1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from typing import Any, Set
5 from plomtask.db import DatabaseConnection
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import NotFoundException, BadFormatException
12 """Template for, and metadata for, Todos, and their arrangements."""
14 # pylint: disable=too-many-instance-attributes
def __init__(self, id_: int | None) -> None:
    """Set up a Process without steps or Conditions.

    id_ may be None (save_id later assigns one from the DB cursor);
    otherwise it must be >= 1, else BadFormatException is raised.
    """
    if id_ is not None and id_ < 1:
        raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
    # Kept on the instance: by_id, save_id, save and others read self.id_.
    self.id_ = id_
    # The second argument is the table by_id fills each .history from; the
    # third is presumably the default value -- confirm in plomtask.misc.
    self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
    self.description = VersionedAttribute(self, 'process_descriptions', '')
    self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
    self.explicit_steps: list[ProcessStep] = []
    self.conditions: list[Condition] = []
    self.fulfills: list[Condition] = []
    self.undoes: list[Condition] = []
def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
    """Make Process from database row, with empty VersionedAttributes.

    The first column of row is used as the Process ID.  The new Process is
    registered in db_conn.cached_processes under that ID and returned.
    """
    process = cls(row[0])
    assert process.id_ is not None  # narrows int | None for the cache key
    db_conn.cached_processes[process.id_] = process
    return process
def all(cls, db_conn: DatabaseConnection) -> list[Process]:
    """Collect all Processes and their connected VersionedAttributes.

    Starts from what db_conn.cached_processes already holds, then loads
    through cls.by_id every ID of the processes table not yet among those.
    """
    # Work on a copy of the cache mapping rather than on the cache itself.
    processes = dict(db_conn.cached_processes)
    for row in db_conn.exec('SELECT id FROM processes'):
        if row[0] not in processes:
            process = cls.by_id(db_conn, row[0])
            processes[process.id_] = process
    return list(processes.values())
50 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
51 create: bool = False) -> Process:
52 """Collect Process, its VersionedAttributes, and its child IDs."""
53 if id_ in db_conn.cached_processes.keys():
54 process = db_conn.cached_processes[id_]
55 assert isinstance(process, Process)
58 for row in db_conn.exec('SELECT * FROM processes '
59 'WHERE id = ?', (id_,)):
64 raise NotFoundException(f'Process not found of id: {id_}')
65 process = Process(id_)
66 for row in db_conn.exec('SELECT * FROM process_titles '
67 'WHERE parent_id = ?', (process.id_,)):
68 process.title.history[row[1]] = row[2]
69 for row in db_conn.exec('SELECT * FROM process_descriptions '
70 'WHERE parent_id = ?', (process.id_,)):
71 process.description.history[row[1]] = row[2]
72 for row in db_conn.exec('SELECT * FROM process_efforts '
73 'WHERE parent_id = ?', (process.id_,)):
74 process.effort.history[row[1]] = row[2]
75 for row in db_conn.exec('SELECT * FROM process_steps '
76 'WHERE owner_id = ?', (process.id_,)):
77 process.explicit_steps += [ProcessStep.from_table_row(db_conn,
79 for row in db_conn.exec('SELECT condition FROM process_conditions '
80 'WHERE process = ?', (process.id_,)):
81 process.conditions += [Condition.by_id(db_conn, row[0])]
82 for row in db_conn.exec('SELECT condition FROM process_fulfills '
83 'WHERE process = ?', (process.id_,)):
84 process.fulfills += [Condition.by_id(db_conn, row[0])]
85 for row in db_conn.exec('SELECT condition FROM process_undoes '
86 'WHERE process = ?', (process.id_,)):
87 process.undoes += [Condition.by_id(db_conn, row[0])]
88 assert isinstance(process, Process)
def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
    """Return Processes using self for a ProcessStep.

    Looks up the owner_id of every process_steps row whose step_process_id
    is self.id_, and resolves each distinct owner through by_id.
    """
    rows = db_conn.exec('SELECT owner_id FROM process_steps WHERE'
                        ' step_process_id = ?', (self.id_,))
    # A set, so an owner using self in several steps is listed only once.
    owner_ids = {row[0] for row in rows}
    return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
99 def get_steps(self, db_conn: DatabaseConnection, external_owner:
100 Process | None = None) -> dict[int, dict[str, object]]:
101 """Return tree of depended-on explicit and implicit ProcessSteps."""
103 def make_node(step: ProcessStep) -> dict[str, object]:
105 if external_owner is not None:
106 is_explicit = step.owner_id == external_owner.id_
107 process = self.__class__.by_id(db_conn, step.step_process_id)
108 step_steps = process.get_steps(db_conn, external_owner)
109 return {'process': process, 'parent_id': step.parent_step_id,
110 'is_explicit': is_explicit, 'steps': step_steps}
112 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
113 explicit_children = [s for s in self.explicit_steps
114 if s.parent_step_id == node_id]
115 for child in explicit_children:
116 node['steps'][child.id_] = make_node(child)
117 node['seen'] = node_id in seen_step_ids
118 seen_step_ids.add(node_id)
119 for id_, step in node['steps'].items():
120 walk_steps(id_, step)
122 steps: dict[int, dict[str, object]] = {}
123 seen_step_ids: Set[int] = set()
124 if external_owner is None:
125 external_owner = self
126 for step in [s for s in self.explicit_steps
127 if s.parent_step_id is None]:
128 assert step.id_ is not None # for mypy
129 steps[step.id_] = make_node(step)
130 for step_id, step_node in steps.items():
131 walk_steps(step_id, step_node)
134 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
135 trgt: str = 'conditions') -> None:
136 """Set self.[target] to Conditions identified by ids."""
137 trgt_list = getattr(self, trgt)
138 while len(trgt_list) > 0:
141 trgt_list += [Condition.by_id(db_conn, id_)]
def set_fulfills(self, db_conn: DatabaseConnection,
                 ids: list[int]) -> None:
    """Replace self.fulfills with the Conditions that ids point to.

    Thin wrapper: passes db_conn and ids on to self.set_conditions, naming
    'fulfills' as the target attribute.
    """
    self.set_conditions(db_conn, ids, trgt='fulfills')
def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
    """Replace self.undoes with the Conditions that ids point to.

    Thin wrapper: passes db_conn and ids on to self.set_conditions, naming
    'undoes' as the target attribute.
    """
    self.set_conditions(db_conn, ids, trgt='undoes')
153 db_conn: DatabaseConnection,
155 step_process_id: int,
156 parent_step_id: int | None) -> ProcessStep:
157 """Create new ProcessStep, save and add it to self.explicit_steps.
159 Also checks against step recursion.
161 The new step's parent_step_id will fall back to None either if no
162 matching ProcessStep is found (which can be assumed in case it was
163 just deleted under its feet), or if the parent step would not be
164 owned by the current Process.
166 def walk_steps(node: ProcessStep) -> None:
167 if node.step_process_id == self.id_:
168 raise BadFormatException('bad step selection causes recursion')
169 step_process = self.by_id(db_conn, node.step_process_id)
170 for step in step_process.explicit_steps:
172 if parent_step_id is not None:
174 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
175 if parent_step.owner_id != self.id_:
176 parent_step_id = None
177 except NotFoundException:
178 parent_step_id = None
179 assert self.id_ is not None
180 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
182 self.explicit_steps += [step]
183 step.save(db_conn) # NB: This ensures a non-None step.id_.
def set_steps(self, db_conn: DatabaseConnection,
              steps: list[tuple[int | None, int, int | None]]) -> None:
    """Set self.explicit_steps in bulk.

    Removes the current steps from db_conn.cached_process_steps and from
    the process_steps table, then calls self._add_step once per tuple in
    steps.  Each tuple is (step ID or None, step Process ID, parent step ID
    or None), i.e. the arguments _add_step takes after db_conn.
    """
    for step in self.explicit_steps:
        assert step.id_ is not None
        del db_conn.cached_process_steps[step.id_]
    self.explicit_steps = []
    db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                 (self.id_,))
    for step_id, step_process_id, parent_step_id in steps:
        self._add_step(db_conn, step_id, step_process_id, parent_step_id)
def save_id(self, db_conn: DatabaseConnection) -> None:
    """Store only self's row in the processes table, ensuring self.id_.

    Connected items are not written here.  After the REPLACE, self.id_ is
    taken over from the returned cursor's lastrowid.
    """
    params = (self.id_,)
    self.id_ = db_conn.exec('REPLACE INTO processes VALUES (?)',
                            params).lastrowid
204 def save(self, db_conn: DatabaseConnection) -> None:
205 """Add (or re-write) self and connected items to DB."""
206 self.save_id(db_conn)
207 self.title.save(db_conn)
208 self.description.save(db_conn)
209 self.effort.save(db_conn)
210 db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
212 for condition in self.conditions:
213 db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
214 (self.id_, condition.id_))
215 db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
217 for condition in self.fulfills:
218 db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
219 (self.id_, condition.id_))
220 db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
222 for condition in self.undoes:
223 db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
224 (self.id_, condition.id_))
225 assert self.id_ is not None
226 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
228 for step in self.explicit_steps:
230 db_conn.cached_processes[self.id_] = self
234 """Sub-unit of Processes."""
def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
             parent_step_id: int | None) -> None:
    """Store the step's ID, owner, stepped-to Process and parent step.

    id_ may be None; save() replaces it with the cursor's lastrowid.
    parent_step_id is None for a step without a parent step.
    """
    # Kept on the instance: from_table_row and save read step.id_.
    self.id_ = id_
    self.owner_id = owner_id
    self.step_process_id = step_process_id
    self.parent_step_id = parent_step_id
def from_table_row(cls, db_conn: DatabaseConnection,
                   row: Row) -> ProcessStep:
    """Make ProcessStep from database row, store in DB cache.

    The first four columns of row are handed to the constructor in order
    (id_, owner_id, step_process_id, parent_step_id).  The new step is
    registered in db_conn.cached_process_steps under its ID and returned.
    """
    step = cls(row[0], row[1], row[2], row[3])
    assert step.id_ is not None  # narrows int | None for the cache key
    db_conn.cached_process_steps[step.id_] = step
    return step
def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
    """Retrieve ProcessStep by id_, or throw NotFoundException.

    A step already in db_conn.cached_process_steps is returned as is.
    Otherwise the process_steps table is queried by step_id and the first
    matching row is turned into a (then cached) step by from_table_row.
    """
    if id_ in db_conn.cached_process_steps:
        step = db_conn.cached_process_steps[id_]
        assert isinstance(step, ProcessStep)
        # Cache hit: hand it back instead of falling through to the query.
        return step
    for row in db_conn.exec('SELECT * FROM process_steps '
                            'WHERE step_id = ?', (id_,)):
        return cls.from_table_row(db_conn, row)
    raise NotFoundException(f'found no ProcessStep of ID {id_}')
def save(self, db_conn: DatabaseConnection) -> None:
    """Write self into the process_steps table and into the step cache.

    After the REPLACE, self.id_ is taken over from the returned cursor's
    lastrowid, and self is registered in db_conn.cached_process_steps
    under that ID.
    """
    sql = 'REPLACE INTO process_steps VALUES (?, ?, ?, ?)'
    values = (self.id_, self.owner_id, self.step_process_id,
              self.parent_step_id)
    self.id_ = db_conn.exec(sql, values).lastrowid
    assert self.id_ is not None
    db_conn.cached_process_steps[self.id_] = self