1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
14 class ProcessStepsNode:
15 """Collects what's useful to know for ProcessSteps tree display."""
19 steps: dict[int, ProcessStepsNode]
23 class Process(BaseModel[int], ConditionsRelations):
24 """Template for, and metadata for, Todos, and their arrangements."""
25 table_name = 'processes'
26 to_save_versioned = ['title', 'description', 'effort']
27 to_save_relations = [('process_conditions', 'process', 'conditions'),
28 ('process_enables', 'process', 'enables'),
29 ('process_disables', 'process', 'disables')]
31 # pylint: disable=too-many-instance-attributes
33 def __init__(self, id_: int | None) -> None:
35 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
36 self.description = VersionedAttribute(self, 'process_descriptions', '')
37 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
38 self.explicit_steps: list[ProcessStep] = []
39 self.conditions: list[Condition] = []
40 self.enables: list[Condition] = []
41 self.disables: list[Condition] = []
44 def from_table_row(cls, db_conn: DatabaseConnection,
45 row: Row | list[Any]) -> Process:
46 """Make from DB row, with dependencies."""
47 process = super().from_table_row(db_conn, row)
48 assert isinstance(process.id_, int)
49 for name in ('title', 'description', 'effort'):
50 table = f'process_{name}s'
51 for row_ in db_conn.row_where(table, 'parent', process.id_):
52 getattr(process, name).history_from_row(row_)
53 for row_ in db_conn.row_where('process_steps', 'owner',
55 step = ProcessStep.from_table_row(db_conn, row_)
56 process.explicit_steps += [step] # pylint: disable=no-member
57 for name in ('conditions', 'enables', 'disables'):
58 table = f'process_{name}'
59 assert isinstance(process.id_, int)
60 for c_id in db_conn.column_where(table, 'condition',
61 'process', process.id_):
62 target = getattr(process, name)
63 target += [Condition.by_id(db_conn, c_id)]
66 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
67 """Return Processes using self for a ProcessStep."""
71 for id_ in db_conn.column_where('process_steps', 'owner',
72 'step_process', self.id_):
74 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
76 def get_steps(self, db_conn: DatabaseConnection, external_owner:
77 Process | None = None) -> dict[int, ProcessStepsNode]:
78 """Return tree of depended-on explicit and implicit ProcessSteps."""
80 def make_node(step: ProcessStep) -> ProcessStepsNode:
82 if external_owner is not None:
83 is_explicit = step.owner_id == external_owner.id_
84 process = self.__class__.by_id(db_conn, step.step_process_id)
85 step_steps = process.get_steps(db_conn, external_owner)
86 return ProcessStepsNode(process, step.parent_step_id,
87 is_explicit, step_steps, False)
89 def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
90 explicit_children = [s for s in self.explicit_steps
91 if s.parent_step_id == node_id]
92 for child in explicit_children:
93 assert isinstance(child.id_, int)
94 node.steps[child.id_] = make_node(child)
95 node.seen = node_id in seen_step_ids
96 seen_step_ids.add(node_id)
97 for id_, step in node.steps.items():
100 steps: dict[int, ProcessStepsNode] = {}
101 seen_step_ids: Set[int] = set()
102 if external_owner is None:
103 external_owner = self
104 for step in [s for s in self.explicit_steps
105 if s.parent_step_id is None]:
106 assert isinstance(step.id_, int)
107 steps[step.id_] = make_node(step)
108 for step_id, step_node in steps.items():
109 walk_steps(step_id, step_node)
113 db_conn: DatabaseConnection,
115 step_process_id: int,
116 parent_step_id: int | None) -> ProcessStep:
117 """Create new ProcessStep, save and add it to self.explicit_steps.
119 Also checks against step recursion.
121 The new step's parent_step_id will fall back to None either if no
122 matching ProcessStep is found (which can be assumed in case it was
123 just deleted under its feet), or if the parent step would not be
124 owned by the current Process.
127 def walk_steps(node: ProcessStep) -> None:
128 if node.step_process_id == self.id_:
129 raise BadFormatException('bad step selection causes recursion')
130 step_process = self.by_id(db_conn, node.step_process_id)
131 for step in step_process.explicit_steps:
134 if parent_step_id is not None:
136 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
137 if parent_step.owner_id != self.id_:
138 parent_step_id = None
139 except NotFoundException:
140 parent_step_id = None
141 assert isinstance(self.id_, int)
142 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
144 self.explicit_steps += [step]
145 step.save(db_conn) # NB: This ensures a non-None step.id_.
148 def set_steps(self, db_conn: DatabaseConnection,
149 steps: list[tuple[int | None, int, int | None]]) -> None:
150 """Set self.explicit_steps in bulk."""
151 assert isinstance(self.id_, int)
152 for step in self.explicit_steps:
154 self.explicit_steps = []
155 db_conn.delete_where('process_steps', 'owner', self.id_)
156 for step_tuple in steps:
157 self._add_step(db_conn, step_tuple[0],
158 step_tuple[1], step_tuple[2])
160 def save(self, db_conn: DatabaseConnection) -> None:
161 """Add (or re-write) self and connected items to DB."""
162 super().save(db_conn)
163 assert isinstance(self.id_, int)
164 db_conn.delete_where('process_steps', 'owner', self.id_)
165 for step in self.explicit_steps:
168 def remove(self, db_conn: DatabaseConnection) -> None:
169 """Remove from DB, with dependencies.
171 Guard against removal of Processes in use.
173 assert isinstance(self.id_, int)
174 for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
175 raise HandledException('cannot remove Process in use')
176 for _ in db_conn.row_where('todos', 'process', self.id_):
177 raise HandledException('cannot remove Process in use')
178 for step in self.explicit_steps:
180 super().remove(db_conn)
183 class ProcessStep(BaseModel[int]):
184 """Sub-unit of Processes."""
185 table_name = 'process_steps'
186 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
188 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
189 parent_step_id: int | None) -> None:
190 super().__init__(id_)
191 self.owner_id = owner_id
192 self.step_process_id = step_process_id
193 self.parent_step_id = parent_step_id
195 def remove(self, db_conn: DatabaseConnection) -> None:
196 """Remove from DB, and owner's .explicit_steps."""
197 owner = Process.by_id(db_conn, self.owner_id)
198 owner.explicit_steps.remove(self)
199 super().remove(db_conn)