1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
10 class Process(BaseModel):
11 """Template for, and metadata for, Todos, and their arrangements."""
12 table_name = 'processes'
14 # pylint: disable=too-many-instance-attributes
16 def __init__(self, id_: int | None) -> None:
18 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
19 self.description = VersionedAttribute(self, 'process_descriptions', '')
20 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
21 self.explicit_steps: list[ProcessStep] = []
22 self.conditions: list[Condition] = []
23 self.fulfills: list[Condition] = []
24 self.undoes: list[Condition] = []
27 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28 """Collect all Processes and their connected VersionedAttributes."""
30 for id_, process in db_conn.cached_processes.items():
31 processes[id_] = process
32 already_recorded = processes.keys()
33 for id_ in db_conn.column_all('processes', 'id'):
34 if id_ not in already_recorded:
35 process = cls.by_id(db_conn, id_)
36 processes[process.id_] = process
37 return list(processes.values())
40 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
41 create: bool = False) -> Process:
42 """Collect Process, its VersionedAttributes, and its child IDs."""
45 process, _ = super()._by_id(db_conn, id_)
48 raise NotFoundException(f'Process not found of id: {id_}')
49 process = Process(id_)
50 if isinstance(process.id_, int):
51 for name in ('title', 'description', 'effort'):
52 table = f'process_{name}s'
53 for row in db_conn.row_where(table, 'parent', process.id_):
54 getattr(process, name).history_from_row(row)
55 for row in db_conn.row_where('process_steps', 'owner',
57 step = ProcessStep.from_table_row(db_conn, row)
58 process.explicit_steps += [step]
59 for name in ('conditions', 'fulfills', 'undoes'):
60 table = f'process_{name}'
61 for cond_id in db_conn.column_where(table, 'condition',
62 'process', process.id_):
63 target = getattr(process, name)
64 target += [Condition.by_id(db_conn, cond_id)]
65 assert isinstance(process, Process)
68 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
69 """Return Processes using self for a ProcessStep."""
73 for id_ in db_conn.column_where('process_steps', 'owner',
74 'step_process', self.id_):
76 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
78 def get_steps(self, db_conn: DatabaseConnection, external_owner:
79 Process | None = None) -> dict[int, dict[str, object]]:
80 """Return tree of depended-on explicit and implicit ProcessSteps."""
82 def make_node(step: ProcessStep) -> dict[str, object]:
84 if external_owner is not None:
85 is_explicit = step.owner_id == external_owner.id_
86 process = self.__class__.by_id(db_conn, step.step_process_id)
87 step_steps = process.get_steps(db_conn, external_owner)
88 return {'process': process, 'parent_id': step.parent_step_id,
89 'is_explicit': is_explicit, 'steps': step_steps}
91 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
92 explicit_children = [s for s in self.explicit_steps
93 if s.parent_step_id == node_id]
94 for child in explicit_children:
95 node['steps'][child.id_] = make_node(child)
96 node['seen'] = node_id in seen_step_ids
97 seen_step_ids.add(node_id)
98 for id_, step in node['steps'].items():
101 steps: dict[int, dict[str, object]] = {}
102 seen_step_ids: Set[int] = set()
103 if external_owner is None:
104 external_owner = self
105 for step in [s for s in self.explicit_steps
106 if s.parent_step_id is None]:
107 assert isinstance(step.id_, int)
108 steps[step.id_] = make_node(step)
109 for step_id, step_node in steps.items():
110 walk_steps(step_id, step_node)
113 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
114 trgt: str = 'conditions') -> None:
115 """Set self.[target] to Conditions identified by ids."""
116 trgt_list = getattr(self, trgt)
117 while len(trgt_list) > 0:
120 trgt_list += [Condition.by_id(db_conn, id_)]
122 def set_fulfills(self, db_conn: DatabaseConnection,
123 ids: list[int]) -> None:
124 """Set self.fulfills to Conditions identified by ids."""
125 self.set_conditions(db_conn, ids, 'fulfills')
127 def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
128 """Set self.undoes to Conditions identified by ids."""
129 self.set_conditions(db_conn, ids, 'undoes')
132 db_conn: DatabaseConnection,
134 step_process_id: int,
135 parent_step_id: int | None) -> ProcessStep:
136 """Create new ProcessStep, save and add it to self.explicit_steps.
138 Also checks against step recursion.
140 The new step's parent_step_id will fall back to None either if no
141 matching ProcessStep is found (which can be assumed in case it was
142 just deleted under its feet), or if the parent step would not be
143 owned by the current Process.
145 def walk_steps(node: ProcessStep) -> None:
146 if node.step_process_id == self.id_:
147 raise BadFormatException('bad step selection causes recursion')
148 step_process = self.by_id(db_conn, node.step_process_id)
149 for step in step_process.explicit_steps:
151 if parent_step_id is not None:
153 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
154 if parent_step.owner_id != self.id_:
155 parent_step_id = None
156 except NotFoundException:
157 parent_step_id = None
158 assert isinstance(self.id_, int)
159 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
161 self.explicit_steps += [step]
162 step.save(db_conn) # NB: This ensures a non-None step.id_.
165 def set_steps(self, db_conn: DatabaseConnection,
166 steps: list[tuple[int | None, int, int | None]]) -> None:
167 """Set self.explicit_steps in bulk."""
168 assert isinstance(self.id_, int)
169 for step in self.explicit_steps:
170 assert isinstance(step.id_, int)
171 del db_conn.cached_process_steps[step.id_]
172 self.explicit_steps = []
173 db_conn.delete_where('process_steps', 'owner', self.id_)
174 for step_tuple in steps:
175 self._add_step(db_conn, step_tuple[0],
176 step_tuple[1], step_tuple[2])
178 def save(self, db_conn: DatabaseConnection) -> None:
179 """Add (or re-write) self and connected items to DB."""
180 self.save_core(db_conn)
181 assert isinstance(self.id_, int)
182 self.title.save(db_conn)
183 self.description.save(db_conn)
184 self.effort.save(db_conn)
185 db_conn.rewrite_relations('process_conditions', 'process', self.id_,
186 [[c.id_] for c in self.conditions])
187 db_conn.rewrite_relations('process_fulfills', 'process', self.id_,
188 [[c.id_] for c in self.fulfills])
189 db_conn.rewrite_relations('process_undoes', 'process', self.id_,
190 [[c.id_] for c in self.undoes])
191 db_conn.delete_where('process_steps', 'owner', self.id_)
192 for step in self.explicit_steps:
194 db_conn.cached_processes[self.id_] = self
197 class ProcessStep(BaseModel):
198 """Sub-unit of Processes."""
199 table_name = 'process_steps'
200 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
202 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
203 parent_step_id: int | None) -> None:
205 self.owner_id = owner_id
206 self.step_process_id = step_process_id
207 self.parent_step_id = parent_step_id
210 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
211 """Retrieve ProcessStep by id_, or throw NotFoundException."""
212 step, _ = super()._by_id(db_conn, id_)
214 assert isinstance(step, ProcessStep)
216 raise NotFoundException(f'found no ProcessStep of ID {id_}')
218 def save(self, db_conn: DatabaseConnection) -> None:
219 """Default to simply calling self.save_core for simple cases."""
220 self.save_core(db_conn)