1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
@dataclass
class ProcessStepsNode:
    """Collects what's useful to know for ProcessSteps tree display.

    Instances are built positionally by Process.get_steps, in the field
    order below, and nest further nodes in .steps to form a tree.
    """
    # NOTE(review): only the `steps` annotation survives in the visible
    # source; the other fields are restored from the five-argument
    # constructor call and the `.seen` assignment in Process.get_steps.
    # The names `process`, `parent_id` and `is_explicit` are inferred from
    # the argument expressions there -- confirm against template usage.
    process: Process  # the Process that is performed as this step
    parent_id: int | None  # ID of the parent ProcessStep, None at top level
    is_explicit: bool  # whether the step is owned by the tree's root Process
    steps: dict[int, ProcessStepsNode]  # sub-nodes, keyed by ProcessStep ID
    seen: bool  # set if the step ID was already met during the tree walk
class Process(BaseModel, ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Title, description and effort are VersionedAttributes; explicit_steps
    and the three Condition lists are plain lists that by_id() fills from
    the DB and save() writes back.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up an empty Process; id_ may be None if not yet saved."""
        # NOTE(review): the statement storing id_ is absent from the visible
        # source although every method reads self.id_.  Plain assignment
        # restores the attribute; confirm whether BaseModel offers a
        # validating setter that should be called here instead.
        self.id_ = id_
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Processes already in db_conn.cached_processes are taken as they are;
        any further ID found in the 'processes' table is loaded via by_id().
        """
        processes = dict(db_conn.cached_processes.items())
        for id_ in db_conn.column_all('processes', 'id'):
            if id_ not in processes:
                process = cls.by_id(db_conn, id_)
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If nothing is found for id_, a fresh Process(id_) is returned when
        create is set; otherwise NotFoundException is raised.
        """
        process = None
        from_cache = False
        if id_ is not None:
            process, from_cache = super()._by_id(db_conn, id_)
        # A cached Process is already populated; loading its relations a
        # second time would append duplicates to its lists.
        if not from_cache:
            if not process:
                if not create:
                    raise NotFoundException(f'Process not found of id: {id_}')
                process = Process(id_)
            if isinstance(process.id_, int):
                for name in ('title', 'description', 'effort'):
                    table = f'process_{name}s'
                    for row in db_conn.row_where(table, 'parent', process.id_):
                        getattr(process, name).history_from_row(row)
                for row in db_conn.row_where('process_steps', 'owner',
                                             process.id_):
                    step = ProcessStep.from_table_row(db_conn, row)
                    process.explicit_steps.append(step)
                for name in ('conditions', 'enables', 'disables'):
                    table = f'process_{name}'
                    target = getattr(process, name)
                    for c_id in db_conn.column_where(table, 'condition',
                                                     'process', process.id_):
                        target.append(Condition.by_id(db_conn, c_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        if self.id_ is None:  # an unsaved Process cannot be referenced yet
            return []
        # A set, so an owner using self in several steps is listed once.
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps ProcessStep IDs to ProcessStepsNodes.  external_owner
        is the Process at the root of the whole tree (self if not given); a
        node counts as explicit if its step is owned by that Process.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            # The step Process' own steps enter the tree as sub-nodes.
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            # Attach self's explicit steps that name node_id as parent.
            for child in [s for s in self.explicit_steps
                          if s.parent_step_id == node_id]:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for sub_id, sub_node in node.steps.items():
                walk_steps(sub_id, sub_node)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in self.explicit_steps:
            if step.parent_step_id is None:
                assert isinstance(step.id_, int)
                steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step would make self, directly or
        through the steps of its step Processes, depend on itself.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for sub_step in step_process.explicit_steps:
                walk_steps(sub_step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)  # reject before anything is stored
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID or None, step Process ID, parent
        step ID or None).  All current steps are dropped from the cache and
        from the 'process_steps' table before the new ones are added.
        """
        assert isinstance(self.id_, int)
        for old_step in self.explicit_steps:
            assert isinstance(old_step.id_, int)
            del db_conn.cached_process_steps[old_step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_id, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        # save_core is expected to leave self.id_ set -- hence the assert.
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_enables', 'process', self.id_,
                                  [[c.id_] for c in self.enables])
        db_conn.rewrite_relations('process_disables', 'process', self.id_,
                                  [[c.id_] for c in self.disables])
        # Step rows are replaced wholesale: wipe them, then re-save each.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
class ProcessStep(BaseModel):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process performed as the step
    (step_process_id), optionally below another ProcessStep
    (parent_step_id).
    """
    table_name = 'process_steps'
    # presumably the attributes BaseModel.save_core writes -- TODO confirm
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the step's own ID and the IDs it connects."""
        # NOTE(review): the statement storing id_ is absent from the visible
        # source although step.id_ is read elsewhere.  Plain assignment
        # restores it; confirm whether BaseModel offers a validating setter
        # that should be called here instead.
        self.id_ = id_
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        step, _ = super()._by_id(db_conn, id_)
        if not step:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(step, ProcessStep)
        return step

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)