1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
# NOTE(review): every line of this listing starts with its original source
# line number and has lost its indentation; gaps in that numbering mean
# statements are missing. Restore the block from version control before
# changing any code here.
#
# Process is a BaseModel subclass with table_name 'processes'.
10 class Process(BaseModel):
11 """Template for, and metadata for, Todos, and their arrangements."""
12 table_name = 'processes'
14 # pylint: disable=too-many-instance-attributes
# Constructor: creates three VersionedAttributes and four empty lists.
16 def __init__(self, id_: int | None) -> None:
# NOTE(review): original line 17 is not visible. No visible statement uses
# id_, so the lost line presumably stored or forwarded it -- confirm.
# Each VersionedAttribute receives the owning Process, a string that looks
# like a table name, and what is presumably its default value -- confirm
# against VersionedAttribute.
18 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
19 self.description = VersionedAttribute(self, 'process_descriptions', '')
20 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
# ProcessSteps owned directly by this Process.
21 self.explicit_steps: list[ProcessStep] = []
# Three separate Condition lists; all start empty.
22 self.conditions: list[Condition] = []
23 self.fulfills: list[Condition] = []
24 self.undoes: list[Condition] = []
# Return every Process as a list: instances already held in
# db_conn.cached_processes are taken as they are, and every id found in the
# processes table that is not among them is loaded through cls.by_id.
# NOTE(review): this listing has embedded line numbers and no indentation.
# Original lines 26 and 29 are missing: 26 is presumably the @classmethod
# decorator (the first parameter is cls) -- confirm.
27 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28 """Collect all Processes and their connected VersionedAttributes."""
# NOTE(review): `processes` is never initialised in the visible lines; it is
# used as an id -> Process mapping, so line 29 presumably created an empty
# dict -- confirm.
30 for id_, process in db_conn.cached_processes.items():
31 processes[id_] = process
# If `processes` is a dict, keys() is a live view, so ids added in the loop
# below are also seen by the membership test.
32 already_recorded = processes.keys()
33 for row in db_conn.exec('SELECT id FROM processes'):
34 if row[0] not in already_recorded:
35 process = cls.by_id(db_conn, row[0])
36 processes[process.id_] = process
37 return list(processes.values())
# Look up one Process by id_. A hit in db_conn.cached_processes is used
# directly; otherwise the processes table is queried and the attribute
# histories, explicit steps and the three Condition lists are read from
# their own tables.
# Raises NotFoundException (the raise at original line 54).
# NOTE(review): this listing has embedded line numbers and no indentation.
# Original lines 39, 46-47, 50-53, 68 and 79 are missing, so several branches
# cannot be read from here (marked below).
40 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
41 create: bool = False) -> Process:
42 """Collect Process, its VersionedAttributes, and its child IDs."""
43 if id_ in db_conn.cached_processes.keys():
44 process = db_conn.cached_processes[id_]
45 assert isinstance(process, Process)
# NOTE(review): lines 46-47 missing -- presumably the cached instance is
# returned here; confirm.
48 for row in db_conn.exec('SELECT * FROM processes '
49 'WHERE id = ?', (id_,)):
# NOTE(review): lines 50-53 missing. The body of the loop above and the
# condition(s) guarding the raise below are not visible, so how `create` is
# consulted cannot be stated from this listing.
54 raise NotFoundException(f'Process not found of id: {id_}')
55 process = Process(id_)
# Fill the three VersionedAttribute histories: each row contributes
# history[row[1]] = row[2] (presumably timestamp -> value; verify against
# the table schema).
56 for row in db_conn.exec('SELECT * FROM process_titles '
57 'WHERE parent_id = ?', (process.id_,)):
58 process.title.history[row[1]] = row[2]
59 for row in db_conn.exec('SELECT * FROM process_descriptions '
60 'WHERE parent_id = ?', (process.id_,)):
61 process.description.history[row[1]] = row[2]
62 for row in db_conn.exec('SELECT * FROM process_efforts '
63 'WHERE parent_id = ?', (process.id_,)):
64 process.effort.history[row[1]] = row[2]
# Steps owned by this Process are built from their table rows.
65 for row in db_conn.exec('SELECT * FROM process_steps '
66 'WHERE owner_id = ?', (process.id_,)):
67 process.explicit_steps += [ProcessStep.from_table_row(db_conn,
# NOTE(review): line 68 missing -- the call above is cut off before its
# remaining argument(s) and closing brackets.
# The three link tables each yield Condition ids, resolved via
# Condition.by_id.
69 for row in db_conn.exec('SELECT condition FROM process_conditions '
70 'WHERE process = ?', (process.id_,)):
71 process.conditions += [Condition.by_id(db_conn, row[0])]
72 for row in db_conn.exec('SELECT condition FROM process_fulfills '
73 'WHERE process = ?', (process.id_,)):
74 process.fulfills += [Condition.by_id(db_conn, row[0])]
75 for row in db_conn.exec('SELECT condition FROM process_undoes '
76 'WHERE process = ?', (process.id_,)):
77 process.undoes += [Condition.by_id(db_conn, row[0])]
78 assert isinstance(process, Process)
# NOTE(review): line 79 missing -- presumably `return process`, given the
# annotated return type; confirm.
# Return the Processes that own a process_steps row whose step_process_id
# is this Process's id; each owner is resolved through by_id.
# NOTE(review): this listing has embedded line numbers and no indentation,
# and original line 83 is missing.
81 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
82 """Return Processes using self for a ProcessStep."""
# NOTE(review): `owner_ids` is not initialised in the visible lines; `.add`
# is called on it, so line 83 presumably created an empty set (which would
# also collapse duplicate owners) -- confirm.
84 for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
85 ' step_process_id = ?', (self.id_,)):
# Each result row is indexed at 0 to get the bare owner id.
86 owner_ids.add(owner_id[0])
87 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
# Build a nested dict describing the ProcessSteps this Process depends on.
# The result maps step id -> node; a node (see make_node) has the keys
# 'process', 'parent_id', 'is_explicit' and 'steps', and walk_steps later
# adds 'seen'. external_owner is replaced by self when the caller passes
# None, and is handed on unchanged to the recursive get_steps calls.
# NOTE(review): this listing has embedded line numbers and no indentation.
# Original lines 94 and 122 are missing (marked below).
89 def get_steps(self, db_conn: DatabaseConnection, external_owner:
90 Process | None = None) -> dict[int, dict[str, object]]:
91 """Return tree of depended-on explicit and implicit ProcessSteps."""
# make_node: wrap one ProcessStep as a node. Loads the Process the step
# points at and recurses into that Process's own get_steps.
93 def make_node(step: ProcessStep) -> dict[str, object]:
# NOTE(review): line 94 missing -- presumably the initial value of
# is_explicit; confirm.
95 if external_owner is not None:
# A step is flagged explicit when its owner is the external owner itself.
96 is_explicit = step.owner_id == external_owner.id_
97 process = self.__class__.by_id(db_conn, step.step_process_id)
98 step_steps = process.get_steps(db_conn, external_owner)
99 return {'process': process, 'parent_id': step.parent_step_id,
100 'is_explicit': is_explicit, 'steps': step_steps}
# walk_steps: add self's explicit steps whose parent_step_id equals node_id
# as children of node, record whether node_id was met before, then descend
# into every child node.
102 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
103 explicit_children = [s for s in self.explicit_steps
104 if s.parent_step_id == node_id]
105 for child in explicit_children:
106 node['steps'][child.id_] = make_node(child)
# 'seen' is evaluated before node_id is recorded, so a first visit gives
# False and any later visit of the same id gives True.
107 node['seen'] = node_id in seen_step_ids
108 seen_step_ids.add(node_id)
109 for id_, step in node['steps'].items():
110 walk_steps(id_, step)
112 steps: dict[int, dict[str, object]] = {}
113 seen_step_ids: Set[int] = set()
114 if external_owner is None:
115 external_owner = self
# Tree roots: explicit steps that have no parent step.
116 for step in [s for s in self.explicit_steps
117 if s.parent_step_id is None]:
118 assert isinstance(step.id_, int)
119 steps[step.id_] = make_node(step)
120 for step_id, step_node in steps.items():
121 walk_steps(step_id, step_node)
# NOTE(review): line 122 missing -- presumably `return steps`, given the
# annotated return type; confirm.
# Rebuild the list stored under attribute name `trgt` (default 'conditions')
# from Condition ids, resolving each id via Condition.by_id. The list object
# is obtained with getattr and extended with +=, i.e. modified in place.
# NOTE(review): this listing has embedded line numbers and no indentation.
# Original lines 129-130 are missing (marked below).
124 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
125 trgt: str = 'conditions') -> None:
126 """Set self.[target] to Conditions identified by ids."""
127 trgt_list = getattr(self, trgt)
128 while len(trgt_list) > 0:
# NOTE(review): lines 129-130 missing. The body of the while loop above and
# the statement that binds `id_` below are not visible; presumably the list
# is emptied element by element and `ids` is then iterated -- confirm.
131 trgt_list += [Condition.by_id(db_conn, id_)]
def set_fulfills(self, db_conn: DatabaseConnection,
                 ids: list[int]) -> None:
    """Replace self.fulfills with the Conditions named by ids.

    Thin wrapper: forwards to set_conditions with 'fulfills' as the name
    of the attribute to rebuild.
    """
    target = 'fulfills'
    self.set_conditions(db_conn, ids, target)
def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
    """Replace self.undoes with the Conditions named by ids.

    Thin wrapper: forwards to set_conditions with 'undoes' as the name of
    the attribute to rebuild.
    """
    target = 'undoes'
    self.set_conditions(db_conn, ids, target)
# NOTE(review): this listing has embedded line numbers and no indentation,
# and the start of this definition is missing: the `def` line (original 142)
# and one parameter line (144) are not visible. Judging by the call
# `self._add_step(db_conn, step_tuple[0], step_tuple[1], step_tuple[2])` in
# set_steps and the use of `id_` below, this is presumably
# `_add_step(self, db_conn, id_, step_process_id, parent_step_id)` --
# confirm. Lines 155, 161, 163, 171 and 174-175 are missing too; the
# docstring's closing quotes were presumably on line 155.
143 db_conn: DatabaseConnection,
145 step_process_id: int,
146 parent_step_id: int | None) -> ProcessStep:
147 """Create new ProcessStep, save and add it to self.explicit_steps.
149 Also checks against step recursion.
151 The new step's parent_step_id will fall back to None either if no
152 matching ProcessStep is found (which can be assumed in case it was
153 just deleted under its feet), or if the parent step would not be
154 owned by the current Process.
# walk_steps: raise BadFormatException when the given step points back at
# this Process, otherwise load the step's Process and look at its own
# explicit steps.
156 def walk_steps(node: ProcessStep) -> None:
157 if node.step_process_id == self.id_:
158 raise BadFormatException('bad step selection causes recursion')
159 step_process = self.by_id(db_conn, node.step_process_id)
160 for step in step_process.explicit_steps:
# NOTE(review): line 161 missing -- the loop body, presumably the recursive
# walk_steps(step) call; confirm.
162 if parent_step_id is not None:
# NOTE(review): line 163 missing -- presumably `try:`, matching the `except`
# below.
164 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
# A parent step owned by another Process is dropped rather than rejected.
165 if parent_step.owner_id != self.id_:
166 parent_step_id = None
# An unknown parent step id is likewise treated as "no parent".
167 except NotFoundException:
168 parent_step_id = None
169 assert isinstance(self.id_, int)
170 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
# NOTE(review): line 171 missing -- presumably where walk_steps is invoked
# on the new step; confirm.
172 self.explicit_steps += [step]
173 step.save(db_conn) # NB: This ensures a non-None step.id_.
# NOTE(review): lines 174-175 missing -- presumably `return step`, given the
# annotated return type; confirm.
# Replace all explicit steps of this Process: evict each current step from
# db_conn.cached_process_steps, reset self.explicit_steps, delete this
# owner's rows from process_steps, then re-create one step per tuple via
# self._add_step.
# Each tuple is passed on positionally as (step_tuple[0], step_tuple[1],
# step_tuple[2]) -- presumably (step id or None, step_process_id,
# parent_step_id or None); verify against _add_step.
# NOTE(review): this listing has embedded line numbers and no indentation,
# and original line 184 is missing.
176 def set_steps(self, db_conn: DatabaseConnection,
177 steps: list[tuple[int | None, int, int | None]]) -> None:
178 """Set self.explicit_steps in bulk."""
179 for step in self.explicit_steps:
180 assert isinstance(step.id_, int)
# Plain `del`: a step id absent from the cache would raise KeyError here.
181 del db_conn.cached_process_steps[step.id_]
182 self.explicit_steps = []
183 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
# NOTE(review): line 184 missing -- the parameter tuple and closing bracket
# of the exec call above, presumably `(self.id_,))`.
185 for step_tuple in steps:
186 self._add_step(db_conn, step_tuple[0],
187 step_tuple[1], step_tuple[2])
# Persist this Process: save_core first, then the three VersionedAttributes,
# then each Condition link table is cleared for this Process and re-filled
# from the in-memory lists, then process_steps rows for this owner are
# deleted, and finally self is stored in db_conn.cached_processes.
# NOTE(review): this listing has embedded line numbers and no indentation.
# Original lines 196, 201, 206, 212 and 214 are missing: the first four are
# the cut-off argument lines of the DELETE calls (presumably `(self.id_,))`),
# and 214 is the body of the final loop.
189 def save(self, db_conn: DatabaseConnection) -> None:
190 """Add (or re-write) self and connected items to DB."""
191 self.save_core(db_conn)
192 self.title.save(db_conn)
193 self.description.save(db_conn)
194 self.effort.save(db_conn)
# Delete-then-insert keeps each link table in step with the current list.
195 db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
197 for condition in self.conditions:
198 db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
199 (self.id_, condition.id_))
200 db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
202 for condition in self.fulfills:
203 db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
204 (self.id_, condition.id_))
205 db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
207 for condition in self.undoes:
208 db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
209 (self.id_, condition.id_))
210 assert isinstance(self.id_, int)
211 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
213 for step in self.explicit_steps:
# NOTE(review): line 214 missing -- the loop body, presumably re-saving each
# step after the DELETE above; confirm.
215 db_conn.cached_processes[self.id_] = self
# ProcessStep is a BaseModel subclass with table_name 'process_steps'.
# to_save lists the three attribute names assigned in __init__; presumably
# BaseModel uses it to decide which fields to persist -- confirm.
# NOTE(review): this listing has embedded line numbers and no indentation,
# and original line 225 is missing.
218 class ProcessStep(BaseModel):
219 """Sub-unit of Processes."""
220 table_name = 'process_steps'
221 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
# Constructor: stores the owner id, the id of the Process used as the step,
# and the optional parent step id.
223 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
224 parent_step_id: int | None) -> None:
# NOTE(review): line 225 missing. No visible statement uses id_, so the lost
# line presumably stored or forwarded it -- confirm.
226 self.owner_id = owner_id
227 self.step_process_id = step_process_id
228 self.parent_step_id = parent_step_id
# Look up one ProcessStep by id_: db_conn.cached_process_steps is checked
# first, then the process_steps table is queried on step_id and a match is
# built with cls.from_table_row.
# Raises NotFoundException (the final statement).
# NOTE(review): this listing has embedded line numbers and no indentation.
# Original lines 230 and 236 are missing: 230 is presumably the @classmethod
# decorator (the first parameter is cls) -- confirm.
231 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
232 """Retrieve ProcessStep by id_, or throw NotFoundException."""
233 if id_ in db_conn.cached_process_steps.keys():
234 step = db_conn.cached_process_steps[id_]
235 assert isinstance(step, ProcessStep)
# NOTE(review): line 236 missing -- presumably the cached step is returned
# here; confirm.
237 for row in db_conn.exec('SELECT * FROM process_steps '
238 'WHERE step_id = ?', (id_,)):
239 step = cls.from_table_row(db_conn, row)
240 assert isinstance(step, ProcessStep)
# NOTE(review): no `return step` is visible for the loop above, and the
# embedded numbering (240 -> 241) shows no gap. As listed, a step loaded
# from the DB would fall through to the raise below. Check the original
# file for a lost or missing return.
241 raise NotFoundException(f'found no ProcessStep of ID {id_}')
def save(self, db_conn: DatabaseConnection) -> None:
    """Persist this ProcessStep.

    Nothing beyond self.save_core(db_conn) is done here.
    """
    self.save_core(db_conn)