"""Collecting Processes and Process-related items."""
from __future__ import annotations
from dataclasses import dataclass
-from typing import Set
+from typing import Set, Any
+from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.misc import VersionedAttribute
+from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
-from plomtask.exceptions import NotFoundException, BadFormatException
+from plomtask.exceptions import (NotFoundException, BadFormatException,
+ HandledException)
@dataclass
seen: bool
-class Process(BaseModel, ConditionsRelations):
+class Process(BaseModel[int], ConditionsRelations):
"""Template for, and metadata for, Todos, and their arrangements."""
- table_name = 'processes'
-
# pylint: disable=too-many-instance-attributes
-
- def __init__(self, id_: int | None) -> None:
- self.set_int_id(id_)
+ table_name = 'processes'
+ to_save = ['calendarize']
+ to_save_versioned = ['title', 'description', 'effort']
+ to_save_relations = [('process_conditions', 'process', 'conditions', 0),
+ ('process_blockers', 'process', 'blockers', 0),
+ ('process_enables', 'process', 'enables', 0),
+ ('process_disables', 'process', 'disables', 0)]
+ to_search = ['title.newest', 'description.newest']
+
+ def __init__(self, id_: int | None, calendarize: bool = False) -> None:
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
self.description = VersionedAttribute(self, 'process_descriptions', '')
self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
self.explicit_steps: list[ProcessStep] = []
- self.conditions: list[Condition] = []
- self.enables: list[Condition] = []
- self.disables: list[Condition] = []
+ self.calendarize = calendarize
@classmethod
- def all(cls, db_conn: DatabaseConnection) -> list[Process]:
- """Collect all Processes and their connected VersionedAttributes."""
- processes = {}
- for id_, process in db_conn.cached_processes.items():
- processes[id_] = process
- already_recorded = processes.keys()
- for id_ in db_conn.column_all('processes', 'id'):
- if id_ not in already_recorded:
- process = cls.by_id(db_conn, id_)
- processes[process.id_] = process
- return list(processes.values())
-
- @classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
- create: bool = False) -> Process:
- """Collect Process, its VersionedAttributes, and its child IDs."""
- process = None
- if id_:
- process, _ = super()._by_id(db_conn, id_)
- if not process:
- if not create:
- raise NotFoundException(f'Process not found of id: {id_}')
- process = Process(id_)
- if isinstance(process.id_, int):
- for name in ('title', 'description', 'effort'):
- table = f'process_{name}s'
- for row in db_conn.row_where(table, 'parent', process.id_):
- getattr(process, name).history_from_row(row)
- for row in db_conn.row_where('process_steps', 'owner',
- process.id_):
- step = ProcessStep.from_table_row(db_conn, row)
- process.explicit_steps += [step]
- for name in ('conditions', 'enables', 'disables'):
- table = f'process_{name}'
- for cond_id in db_conn.column_where(table, 'condition',
- 'process', process.id_):
- target = getattr(process, name)
- target += [Condition.by_id(db_conn, cond_id)]
- assert isinstance(process, Process)
+ def from_table_row(cls, db_conn: DatabaseConnection,
+ row: Row | list[Any]) -> Process:
+ """Make from DB row, with dependencies."""
+ process = super().from_table_row(db_conn, row)
+ assert isinstance(process.id_, int)
+ for name in ('title', 'description', 'effort'):
+ table = f'process_{name}s'
+ for row_ in db_conn.row_where(table, 'parent', process.id_):
+ getattr(process, name).history_from_row(row_)
+ for row_ in db_conn.row_where('process_steps', 'owner',
+ process.id_):
+ step = ProcessStep.from_table_row(db_conn, row_)
+ process.explicit_steps += [step] # pylint: disable=no-member
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
+ table = f'process_{name}'
+ assert isinstance(process.id_, int)
+ for c_id in db_conn.column_where(table, 'condition',
+ 'process', process.id_):
+ target = getattr(process, name)
+ target += [Condition.by_id(db_conn, c_id)]
return process
def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
for child in explicit_children:
assert isinstance(child.id_, int)
node.steps[child.id_] = make_node(child)
+ # ensure that one (!) explicit step of process replaces
+ # one (!) implicit step of same process
+ for i in [i for i, s in node.steps.items()
+ if not s.is_explicit
+ and s.process.id_ == child.step_process_id]:
+ del node.steps[i]
+ break
node.seen = node_id in seen_step_ids
seen_step_ids.add(node_id)
for id_, step in node.steps.items():
just deleted under its feet), or if the parent step would not be
owned by the current Process.
"""
-
def walk_steps(node: ProcessStep) -> None:
if node.step_process_id == self.id_:
raise BadFormatException('bad step selection causes recursion')
"""Set self.explicit_steps in bulk."""
assert isinstance(self.id_, int)
for step in self.explicit_steps:
- assert isinstance(step.id_, int)
- del db_conn.cached_process_steps[step.id_]
+ step.uncache()
self.explicit_steps = []
db_conn.delete_where('process_steps', 'owner', self.id_)
for step_tuple in steps:
def save(self, db_conn: DatabaseConnection) -> None:
"""Add (or re-write) self and connected items to DB."""
- self.save_core(db_conn)
+ super().save(db_conn)
assert isinstance(self.id_, int)
- self.title.save(db_conn)
- self.description.save(db_conn)
- self.effort.save(db_conn)
- db_conn.rewrite_relations('process_conditions', 'process', self.id_,
- [[c.id_] for c in self.conditions])
- db_conn.rewrite_relations('process_enables', 'process', self.id_,
- [[c.id_] for c in self.enables])
- db_conn.rewrite_relations('process_disables', 'process', self.id_,
- [[c.id_] for c in self.disables])
db_conn.delete_where('process_steps', 'owner', self.id_)
for step in self.explicit_steps:
step.save(db_conn)
- db_conn.cached_processes[self.id_] = self
+ def remove(self, db_conn: DatabaseConnection) -> None:
+ """Remove from DB, with dependencies.
-class ProcessStep(BaseModel):
+ Guard against removal of Processes in use.
+ """
+ assert isinstance(self.id_, int)
+ for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
+ raise HandledException('cannot remove Process in use')
+ for _ in db_conn.row_where('todos', 'process', self.id_):
+ raise HandledException('cannot remove Process in use')
+ for step in self.explicit_steps:
+ step.remove(db_conn)
+ super().remove(db_conn)
+
+
+class ProcessStep(BaseModel[int]):
"""Sub-unit of Processes."""
table_name = 'process_steps'
to_save = ['owner_id', 'step_process_id', 'parent_step_id']
def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
parent_step_id: int | None) -> None:
- self.set_int_id(id_)
+ super().__init__(id_)
self.owner_id = owner_id
self.step_process_id = step_process_id
self.parent_step_id = parent_step_id
- @classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
- """Retrieve ProcessStep by id_, or throw NotFoundException."""
- step, _ = super()._by_id(db_conn, id_)
- if step:
- assert isinstance(step, ProcessStep)
- return step
- raise NotFoundException(f'found no ProcessStep of ID {id_}')
-
- def save(self, db_conn: DatabaseConnection) -> None:
- """Default to simply calling self.save_core for simple cases."""
- self.save_core(db_conn)
+ def remove(self, db_conn: DatabaseConnection) -> None:
+ """Remove from DB, and owner's .explicit_steps."""
+ owner = Process.by_id(db_conn, self.owner_id)
+ owner.explicit_steps.remove(self)
+ super().remove(db_conn)