1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
@dataclass
class ProcessStepsNode:
    """Collects what's useful to know for ProcessSteps tree display.

    Instances are built positionally by Process.get_steps, in the field
    order declared below; .steps and .seen are updated in place while
    the tree is walked.
    """
    # NOTE(review): only the "steps" field line survived in the reviewed
    # listing.  The field count, order and types follow the five-argument
    # constructor call in Process.get_steps, and "seen" is assigned there
    # by name; the names "process", "parent_id" and "is_explicit" are
    # inferred -- confirm against consumers (e.g. templates).
    process: Process                    # Process the step points to
    parent_id: int | None               # parent_step_id of the step
    is_explicit: bool                   # step owned by the tree's root?
    steps: dict[int, ProcessStepsNode]  # child nodes keyed by step ID
    seen: bool                          # node ID met earlier in the walk?
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements."""
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    # NOTE(review): the reviewed listing of this class had lost its
    # indentation and a number of interior lines.  Lines restored from
    # context rather than read from the listing carry their own
    # "NOTE(review)" marker below -- confirm them against version control.

    def __init__(self, id_: int | None) -> None:
        # NOTE(review): base initialiser call restored; self.id_ is read
        # throughout this class but never assigned in it.
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Starts from whatever is in cls.cache_, then loads via by_id every
        ID of the 'processes' table that is not among those.
        """
        processes: dict[int, Process] = dict(cls.cache_)
        for id_ in db_conn.column_all('processes', 'id'):
            if id_ not in processes:
                process = cls.by_id(db_conn, id_)
                assert isinstance(process.id_, int)
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        With create=True, a fresh Process(id_) is returned where none is
        found; otherwise a miss raises NotFoundException.
        """
        # NOTE(review): the guards between the lookup, the raise and the
        # attribute loading were missing from the listing; they are
        # restored so that "create" and "from_cache" are actually
        # honoured.  Whether the original tested "id_" for truthiness or
        # against None could not be determined.
        process = None
        from_cache = False
        if id_ is not None:
            process, from_cache = super()._by_id(db_conn, id_)
        if not from_cache:
            if not process:
                if not create:
                    raise NotFoundException(f'Process not found of id: {id_}')
                process = Process(id_)
            if isinstance(process.id_, int):
                for name in ('title', 'description', 'effort'):
                    table = f'process_{name}s'
                    for row in db_conn.row_where(table, 'parent', process.id_):
                        getattr(process, name).history_from_row(row)
                for row in db_conn.row_where('process_steps', 'owner',
                                             process.id_):
                    step = ProcessStep.from_table_row(db_conn, row)
                    process.explicit_steps.append(step)
                for name in ('conditions', 'enables', 'disables'):
                    table = f'process_{name}'
                    target = getattr(process, name)
                    for c_id in db_conn.column_where(table, 'condition',
                                                     'process', process.id_):
                        target.append(Condition.by_id(db_conn, c_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # NOTE(review): definition and filling of "owner_ids" were missing
        # from the listing.  Owner IDs are de-duplicated here (keeping
        # first-seen order) so that a Process using self in several steps
        # is returned only once -- confirm that this was the intent.
        if self.id_ is None:
            return []
        owner_ids = dict.fromkeys(
                db_conn.column_where('process_steps', 'owner',
                                     'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps the IDs of self's parentless explicit steps to
        their nodes.  external_owner is handed down through the recursion
        (defaulting to self at the top) and decides each node's
        is_explicit flag.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            # Pre-set so the name is always bound; by the time this runs
            # external_owner has been set to self if it was passed as None.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            # Hang self's explicit steps that name node_id as parent below
            # the node, next to the steps the node's own Process provided.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        # NOTE(review): the "def" line, the "id_" parameter, the "try:",
        # both walk_steps() calls and the final "return" were missing from
        # the listing; name and parameter order follow the call in
        # set_steps and the ProcessStep(...) construction below.

        def walk_steps(node: ProcessStep) -> None:
            # Depth-first through the steps of the step's Process: meeting
            # self anywhere below means the new step would close a cycle.
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple is (step ID or None, step Process ID, parent step ID
        or None), handed to _add_step in that order.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            # NOTE(review): this loop's body was missing from the listing.
            # As the steps' rows are deleted right below, evicting each
            # step from the model cache is the presumed intent;
            # BaseModel.uncache() is assumed to exist -- confirm.
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for id_, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, id_, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        for name in ('conditions', 'enables', 'disables'):
            db_conn.rewrite_relations(f'process_{name}', 'process', self.id_,
                                      [[c.id_] for c in getattr(self, name)])
        # Step rows are dropped wholesale, then written anew one by one.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            # NOTE(review): loop body restored; it was missing from the
            # listing.
            step.save(db_conn)
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes."""
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    # NOTE(review): the reviewed listing of this class had lost its
    # indentation and a few interior lines; restored lines are marked
    # below -- confirm them against version control.

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        # NOTE(review): base initialiser call restored; self.id_ is not
        # assigned anywhere else in this class.
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        # NOTE(review): "@classmethod", the found/not-found branching and
        # the "return" were missing from the listing.
        step, _ = super()._by_id(db_conn, id_)
        if not step:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(step, ProcessStep)
        return step

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)