1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
@dataclass
class ProcessStepsNode:
    """Collects what's useful to know for ProcessSteps tree display."""
    # Field order matters: Process.get_steps builds nodes positionally as
    # (process, step.parent_step_id, is_explicit, sub-steps, False).
    # NOTE(review): the names of the first three fields are inferred from
    # that call site -- confirm against any external users of the node.
    process: Process
    parent_id: int | None
    is_explicit: bool
    # Child nodes, keyed by ProcessStep ID.
    steps: dict[int, ProcessStepsNode]
    # Written by Process.get_steps: True if the node's step ID had already
    # been visited earlier in the same tree walk.
    seen: bool
class Process(BaseModel, ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements."""
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up versioned attributes and empty step/condition lists.

        NOTE(review): self.id_ is assumed to be set by the parent
        initializer -- confirm against BaseModel.
        """
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Starts from db_conn.cached_processes, then loads via cls.by_id every
        ID of the 'processes' table that is not among those yet.
        """
        processes = dict(db_conn.cached_processes)
        for id_ in db_conn.column_all('processes', 'id'):
            if id_ not in processes:
                process = cls.by_id(db_conn, id_)
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        Raises NotFoundException if nothing is found for id_ and create is
        not set; with create set, a fresh Process(id_) is used instead.
        """
        process = None
        if id_ is not None:
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        # NOTE(review): if _by_id can hand back an already populated cached
        # instance, the loops below would append its steps and conditions
        # again -- confirm against BaseModel._by_id.
        if isinstance(process.id_, int):
            for name in ('title', 'description', 'effort'):
                table = f'process_{name}s'
                for row in db_conn.row_where(table, 'parent', process.id_):
                    getattr(process, name).history_from_row(row)
            for row in db_conn.row_where('process_steps', 'owner',
                                         process.id_):
                step = ProcessStep.from_table_row(db_conn, row)
                process.explicit_steps.append(step)
            for name in ('conditions', 'enables', 'disables'):
                table = f'process_{name}'
                target = getattr(process, name)
                for cond_id in db_conn.column_where(table, 'condition',
                                                    'process', process.id_):
                    target.append(Condition.by_id(db_conn, cond_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        if self.id_ is None:
            # Without an ID, no 'process_steps' row can point at self.
            return []
        # A set, so an owner that uses self in several steps shows up once.
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection,
                  external_owner: Process | None = None
                  ) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The top level holds self's explicit steps that have no parent step.
        Each node's sub-steps are the steps of its step process (collected
        recursively with the same external_owner), plus those explicit steps
        of self whose parent_step_id is the node's step ID.  A node counts
        as explicit if its step is owned by external_owner, which defaults
        to self.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            # Flag step IDs that were met before during this walk.
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step's process is, or transitively
        has as one of its steps, the current Process.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Run the recursion check before the step is recorded or saved.
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID, step process ID, parent step ID).
        Current steps are first dropped from db_conn.cached_process_steps
        and from the 'process_steps' table, then the new ones are added one
        by one via self._add_step.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            del db_conn.cached_process_steps[step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_id, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_enables', 'process', self.id_,
                                  [[c.id_] for c in self.enables])
        db_conn.rewrite_relations('process_disables', 'process', self.id_,
                                  [[c.id_] for c in self.disables])
        # Step rows are wiped and then written anew from explicit_steps.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
class ProcessStep(BaseModel):
    """Sub-unit of Processes."""
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the owning Process ID, the step's Process ID, and parent.

        NOTE(review): self.id_ is assumed to be set by the parent
        initializer -- confirm against BaseModel.
        """
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        step, _ = super()._by_id(db_conn, id_)
        if not step:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(step, ProcessStep)
        return step

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)