"""Collecting Processes and Process-related items."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Set, Any
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
                                 HandledException)
# NOTE(review): the reviewed text of this class had its decorator and all
# fields except `steps` elided.  The field order and types below follow the
# one visible construction, ProcessStepsNode(process, step.parent_step_id,
# is_explicit, step_steps, False), and the visible attribute accesses
# `node.steps` and `node.seen`.  The names `process`, `parent_id` and
# `is_explicit` are inferred from that call's argument names -- confirm
# against the canonical file before merging.
@dataclass
class ProcessStepsNode:
    """Collects what's useful to know for ProcessSteps tree display."""
    # Process that the displayed ProcessStep points to.
    process: Process
    # ID of the parent ProcessStep; None for a step without parent.
    parent_id: int | None
    # Whether the step is owned by the Process the tree is judged against.
    is_explicit: bool
    # Child nodes, keyed by ProcessStep ID.
    steps: dict[int, ProcessStepsNode]
    # Whether the step's ID was already met earlier in the same tree walk.
    seen: bool
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Carries versioned title, description and effort, three lists of
    Conditions (conditions, enables, disables), and the explicit
    ProcessSteps it owns, i.e. other Processes included as sub-units.

    NOTE(review): the reviewed text of this class had its indentation
    stripped and several lines elided.  Every statement that had to be
    restored is marked with its own NOTE(review) comment below; confirm
    those against the canonical file.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Start out with default attribute values and empty relations."""
        # NOTE(review): restored (elided in reviewed text), by analogy with
        # ProcessStep.__init__ -- confirm.
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    # NOTE(review): decorator restored (elided in reviewed text); implied by
    # the `cls` parameter and the super().from_table_row call -- confirm.
    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Process:
        """Make from DB row, with dependencies.

        On top of what the inherited from_table_row builds, loads the
        histories of title, description and effort, the ProcessSteps owned
        by the Process, and its three Condition lists.
        """
        process = super().from_table_row(db_conn, row)
        assert isinstance(process.id_, int)
        # Versioned attributes live in tables named after them, pluralized.
        for name in ('title', 'description', 'effort'):
            table = f'process_{name}s'
            for row_ in db_conn.row_where(table, 'parent', process.id_):
                getattr(process, name).history_from_row(row_)
        # NOTE(review): the closing argument `process.id_` of this call was
        # elided in the reviewed text; restored after the pattern of the
        # other row_where calls -- confirm.
        for row_ in db_conn.row_where('process_steps', 'owner',
                                      process.id_):
            step = ProcessStep.from_table_row(db_conn, row_)
            process.explicit_steps += [step]  # pylint: disable=no-member
        # Condition relation tables are named after the list they fill.
        for name in ('conditions', 'enables', 'disables'):
            table = f'process_{name}'
            assert isinstance(process.id_, int)
            for c_id in db_conn.column_where(table, 'condition',
                                             'process', process.id_):
                target = getattr(process, name)
                target += [Condition.by_id(db_conn, c_id)]
        # NOTE(review): return restored (elided in reviewed text); required
        # by the declared return type -- confirm.
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Each owning Process is returned once, however many of its steps
        point to self.
        """
        # NOTE(review): the set-up and filling of owner_ids were elided in
        # the reviewed text.  The early return for an unsaved Process and
        # the choice of a set (one entry per owner) are reconstructions --
        # confirm.
        if self.id_ is None:
            return []
        owner_ids: Set[int] = set()
        for id_ in db_conn.column_where('process_steps', 'owner',
                                        'step_process', self.id_):
            owner_ids.add(id_)
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps ProcessStep IDs to ProcessStepsNodes, starting from
        those of self.explicit_steps that have no parent step.  Each node's
        .steps holds the step tree of the Process the step points to, plus
        those of self.explicit_steps that name the node's step as parent.

        external_owner is the Process against which steps are judged to be
        explicit: a step counts as explicit if that Process owns it.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            # NOTE(review): default restored (elided in reviewed text);
            # is_explicit must be bound when the branch below is skipped.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            # The stepped Process contributes its own tree, judged against
            # the same external_owner.
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            # Mark step IDs met before in this walk.
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                # NOTE(review): recursive call restored (elided in reviewed
                # text) -- confirm.
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            # NOTE(review): fallback to self restored (elided in reviewed
            # text) -- confirm.
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        # NOTE(review): return restored (elided in reviewed text); required
        # by the declared return type -- confirm.
        return steps

    # NOTE(review): the `def` line and the `id_` parameter were elided in
    # the reviewed text.  Name and argument order follow the call in
    # set_steps, the `id_` type follows the first element of its step
    # tuples -- confirm.
    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step, directly or through the
        explicit steps of the Process it points to, leads back to self.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                # NOTE(review): recursive call restored (elided in reviewed
                # text) -- confirm.
                walk_steps(step)

        if parent_step_id is not None:
            # NOTE(review): `try:` restored (elided in reviewed text);
            # required by the except clause below.
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # NOTE(review): recursion check call restored (elided in reviewed
        # text); placed before the step gets attached or saved -- confirm.
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        # NOTE(review): return restored (elided in reviewed text); required
        # by the declared return type -- confirm.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Drops all current steps of self from self.explicit_steps and the
        'process_steps' table, then adds one new step per tuple of
        (step ID or None, step Process ID, parent step ID or None).
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            # NOTE(review): this loop's body was elided in the reviewed
            # text and cannot be derived from it.  Presumably it evicts the
            # step from a BaseModel object cache; the method name is an
            # assumption -- verify against BaseModel before merging.
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_tuple in steps:
            self._add_step(db_conn, step_tuple[0],
                           step_tuple[1], step_tuple[2])

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_enables', 'process', self.id_,
                                  [[c.id_] for c in self.enables])
        db_conn.rewrite_relations('process_disables', 'process', self.id_,
                                  [[c.id_] for c in self.disables])
        # Step rows are wiped, then written anew from self.explicit_steps.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            # NOTE(review): loop body restored (elided in reviewed text)
            # -- confirm.
            step.save(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, with dependencies.

        Guard against removal of Processes in use.

        Raises HandledException if any ProcessStep points to self, or any
        row of the 'todos' table references it.
        """
        assert isinstance(self.id_, int)
        # Any row at all found by these lookups means self is in use.
        for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
            raise HandledException('cannot remove Process in use')
        for _ in db_conn.row_where('todos', 'process', self.id_):
            raise HandledException('cannot remove Process in use')
        db_conn.delete_where('process_conditions', 'process', self.id_)
        db_conn.delete_where('process_enables', 'process', self.id_)
        db_conn.delete_where('process_disables', 'process', self.id_)
        # Walk a copy: ProcessStep.remove deletes the step from its owner's
        # .explicit_steps, which may be the very list walked here.
        for step in list(self.explicit_steps):
            # NOTE(review): loop body restored (elided in reviewed text)
            # -- confirm.
            step.remove(db_conn)
        db_conn.delete_where('process_titles', 'parent', self.id_)
        db_conn.delete_where('process_descriptions', 'parent', self.id_)
        db_conn.delete_where('process_efforts', 'parent', self.id_)
        super().remove(db_conn)
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes.

    Ties an owning Process (owner_id) to the Process included in it as a
    step (step_process_id), optionally below another ProcessStep
    (parent_step_id; None if the step has no parent).
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the IDs of owner, stepped Process, and parent step."""
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, and owner's .explicit_steps."""
        owner = Process.by_id(db_conn, self.owner_id)
        # list.remove raises ValueError if self is not in the owner's list.
        owner.explicit_steps.remove(self)
        super().remove(db_conn)