1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
@dataclass
class ProcessStepsNode:
    """Collects what's useful to know for ProcessSteps tree display.

    Instances are built positionally by Process.get_steps, in the order
    (Process of the step, parent step ID, explicitness flag, child nodes,
    seen flag), so the field order below must not change.
    """
    # NOTE(review): only `steps` (declared) and `seen` (assigned by
    # Process.get_steps) are named in the reviewed excerpt; the names of the
    # first three fields are inferred from the positional constructor call
    # and should be confirmed against any template/consumer code.
    process: Process
    parent_id: int | None
    is_explicit: bool
    # Child nodes, keyed by the ID of the ProcessStep each one represents.
    steps: dict[int, ProcessStepsNode]
    # Set during tree walking when the node's step ID was met before.
    seen: bool
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Holds three VersionedAttributes (title, description, effort), three
    lists of related Conditions (conditions, enables, disables), and the
    ProcessSteps this Process explicitly owns (explicit_steps).
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up Process of id_ with default attributes and no relations."""
        # NOTE(review): the base initialiser call was not legible in the
        # reviewed excerpt; restored on the assumption that BaseModel's
        # __init__ is what stores id_ as self.id_ -- confirm.
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        Raises NotFoundException if no Process is found for id_ and create
        is not set; with create set, a fresh Process(id_) is built instead.
        """
        # NOTE(review): the guards around the lookup were not legible in the
        # reviewed excerpt and are reconstructed: a falsy id_ skips the
        # lookup, and an instance coming from the cache is returned without
        # re-reading its relations (re-reading would duplicate list entries).
        # Confirm both against the original file.
        process = None
        from_cache = False
        if id_:
            process, from_cache = super()._by_id(db_conn, id_)
        if not from_cache:
            if not process:
                if not create:
                    raise NotFoundException(f'Process not found of id: {id_}')
                process = Process(id_)
            if isinstance(process.id_, int):
                # Versioned attributes: feed every stored row into the
                # matching attribute's history.
                for name in ('title', 'description', 'effort'):
                    attribute = getattr(process, name)
                    for row in db_conn.row_where(f'process_{name}s',
                                                 'parent', process.id_):
                        attribute.history_from_row(row)
                # Steps explicitly owned by this Process.
                for row in db_conn.row_where('process_steps', 'owner',
                                             process.id_):
                    step = ProcessStep.from_table_row(db_conn, row)
                    process.explicit_steps.append(step)
                # Condition relations, one link table per list attribute.
                for name in ('conditions', 'enables', 'disables'):
                    related = getattr(process, name)
                    for c_id in db_conn.column_where(f'process_{name}',
                                                     'condition',
                                                     'process', process.id_):
                        related.append(Condition.by_id(db_conn, c_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # NOTE(review): the early return for an ID-less Process and the use
        # of a set (so an owner with several steps on self appears once) are
        # reconstructed from context -- confirm.
        if not self.id_:
            return []
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps ProcessStep IDs to ProcessStepsNodes. external_owner
        is the Process against which steps are judged explicit; it defaults
        to self and is handed down unchanged into the recursion.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            # A step counts as explicit only if owned by external_owner.
            is_explicit = (external_owner is not None
                           and step.owner_id == external_owner.id_)
            process = self.__class__.by_id(db_conn, step.step_process_id)
            # Recurse into the step's own Process to collect its sub-tree.
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            # Attach self's explicit steps that name node_id as their parent.
            for child in self.explicit_steps:
                if child.parent_step_id != node_id:
                    continue
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            # Flag repeat encounters of the same step ID within this walk.
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        # Builtin generic instead of typing.Set: no such import is visible.
        seen_step_ids: set[int] = set()
        if external_owner is None:
            external_owner = self
        # Top level: explicit steps without a parent step.
        for step in self.explicit_steps:
            if step.parent_step_id is not None:
                continue
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step's Process, or any Process
        reachable through its explicit steps, is self.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Validate before the step is attached or written anywhere.
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID or None, step Process ID, parent
        step ID or None). Existing steps are dropped from self and from the
        process_steps table before the new ones are added one by one.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            # NOTE(review): this loop's body was not legible in the reviewed
            # excerpt; dropping the old step from BaseModel's cache via
            # uncache() is assumed -- confirm the method name in db.py.
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for id_, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, id_, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        # One link table per Condition list, named process_<attribute>.
        for name in ('conditions', 'enables', 'disables'):
            db_conn.rewrite_relations(f'process_{name}', 'process', self.id_,
                                      [[c.id_] for c in getattr(self, name)])
        # Steps are re-written from scratch: wipe the rows, save each anew.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process it uses as a step
    (step_process_id), optionally nested below another ProcessStep
    (parent_step_id).
    """
    table_name = 'process_steps'
    # Attribute names handed to BaseModel; presumably the columns written by
    # save_core -- confirm against db.py.
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the IDs that define the step; performs no DB access."""
        # NOTE(review): base initialiser call was not legible in the reviewed
        # excerpt; restored on the assumption that it stores id_ as self.id_.
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        # Whether the hit came from cache is of no interest here.
        step, _ = super()._by_id(db_conn, id_)
        if step:
            return step
        raise NotFoundException(f'found no ProcessStep of ID {id_}')

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)