1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from typing import Any, Set
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import NotFoundException, BadFormatException
# Process is a BaseModel subclass; its core rows are read from and written
# to the 'processes' table (see the SELECT statements in by_id and all).
11 class Process(BaseModel):
12 """Template for, and metadata for, Todos, and their arrangements."""
# Table name used for this model's core rows.
13 table_name = 'processes'
15 # pylint: disable=too-many-instance-attributes
# Create a Process with its versioned attributes at their initial values and
# all relation lists empty; id_ may be None.
# NOTE(review): the embedded listing numbers jump from 17 to 19, so one line
# directly after the signature is not visible here (presumably the base-class
# initializer that stores id_) -- confirm against the complete file.
17 def __init__(self, id_: int | None) -> None:
# Each VersionedAttribute is built from self, a name that by_id also queries
# as a table ('process_titles', ...), and what looks like a default value.
19 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
20 self.description = VersionedAttribute(self, 'process_descriptions', '')
21 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
# ProcessSteps whose owner is this Process (by_id loads them by owner_id).
22 self.explicit_steps: list[ProcessStep] = []
# Three separate Condition lists; save() writes them to process_conditions,
# process_fulfills and process_undoes respectively.
23 self.conditions: list[Condition] = []
24 self.fulfills: list[Condition] = []
25 self.undoes: list[Condition] = []
# Build a Process from a database row and register it in the connection's
# process cache.
# NOTE(review): the listing skips numbers 27, 30 and 33. The decorator (the
# `cls` parameter suggests a classmethod), the assignment that creates
# `process`, and the return statement are therefore not visible here --
# confirm against the complete file.
28 def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
29 """Make Process from database row, with empty VersionedAttributes."""
# Narrow id_ from `int | None` to int before using it as the cache key.
31 assert isinstance(process.id_, int)
32 db_conn.cached_processes[process.id_] = process
# Return every Process: those already in db_conn.cached_processes plus any
# whose id is in the 'processes' table but not yet collected.
# NOTE(review): the listing skips numbers 35 and 38, so the decorator and the
# initialisation of `processes` (used as a dict below) are not visible here.
36 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
37 """Collect all Processes and their connected VersionedAttributes."""
# Start from whatever the connection has cached.
39 for id_, process in db_conn.cached_processes.items():
40 processes[id_] = process
# keys() is a live view, so it also reflects entries added in the loop below.
41 already_recorded = processes.keys()
42 for row in db_conn.exec('SELECT id FROM processes'):
43 if row[0] not in already_recorded:
# Load the full object (attributes, steps, conditions) through by_id.
44 process = cls.by_id(db_conn, row[0])
45 processes[process.id_] = process
46 return list(processes.values())
# Look up a Process by id: consult the connection cache first, otherwise query
# the database and populate attribute histories, steps and condition lists.
# Raises NotFoundException when no Process of that id is found (the visible
# `create` flag presumably suppresses that -- see note below).
# NOTE(review): the listing skips numbers 48, 55-56, 59-62, 77 and 88. Not
# visible here: the decorator, what follows the cache hit (presumably an early
# return), the body of the core-row loop and the conditions guarding the
# raise / `Process(id_)` fallback, the continuation of the
# ProcessStep.from_table_row(...) call, and the final return. Confirm all of
# these against the complete file.
49 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
50 create: bool = False) -> Process:
51 """Collect Process, its VersionedAttributes, and its child IDs."""
# Cache lookup.
52 if id_ in db_conn.cached_processes.keys():
53 process = db_conn.cached_processes[id_]
54 assert isinstance(process, Process)
# Core row lookup in the 'processes' table.
57 for row in db_conn.exec('SELECT * FROM processes '
58 'WHERE id = ?', (id_,)):
63 raise NotFoundException(f'Process not found of id: {id_}')
# Fallback object; note this names Process directly rather than cls.
64 process = Process(id_)
# Fill each VersionedAttribute's history: row[1] is the key, row[2] the value
# (presumably a timestamp and the value at that time -- verify against schema).
65 for row in db_conn.exec('SELECT * FROM process_titles '
66 'WHERE parent_id = ?', (process.id_,)):
67 process.title.history[row[1]] = row[2]
68 for row in db_conn.exec('SELECT * FROM process_descriptions '
69 'WHERE parent_id = ?', (process.id_,)):
70 process.description.history[row[1]] = row[2]
71 for row in db_conn.exec('SELECT * FROM process_efforts '
72 'WHERE parent_id = ?', (process.id_,)):
73 process.effort.history[row[1]] = row[2]
# Steps owned by this Process.
74 for row in db_conn.exec('SELECT * FROM process_steps '
75 'WHERE owner_id = ?', (process.id_,)):
76 process.explicit_steps += [ProcessStep.from_table_row(db_conn,
# The three Condition relations, each resolved through Condition.by_id.
78 for row in db_conn.exec('SELECT condition FROM process_conditions '
79 'WHERE process = ?', (process.id_,)):
80 process.conditions += [Condition.by_id(db_conn, row[0])]
81 for row in db_conn.exec('SELECT condition FROM process_fulfills '
82 'WHERE process = ?', (process.id_,)):
83 process.fulfills += [Condition.by_id(db_conn, row[0])]
84 for row in db_conn.exec('SELECT condition FROM process_undoes '
85 'WHERE process = ?', (process.id_,)):
86 process.undoes += [Condition.by_id(db_conn, row[0])]
87 assert isinstance(process, Process)
# Return the Processes that own a ProcessStep whose step_process_id is
# self.id_.
# NOTE(review): the listing skips number 92, so the initialisation of
# `owner_ids` is not visible; the `.add` call below suggests a set, which
# would de-duplicate owners -- confirm against the complete file.
90 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
91 """Return Processes using self for a ProcessStep."""
93 for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
94 ' step_process_id = ?', (self.id_,)):
# Each result is a row; its first column is the owning Process's id.
95 owner_ids.add(owner_id[0])
96 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
# Build a nested dict describing this Process's steps. Keys are step ids; each
# node holds 'process', 'parent_id', 'is_explicit' and 'steps' (the sub-tree of
# the step's own Process), plus a 'seen' flag set while walking.
# external_owner is the Process against which 'is_explicit' is judged; it
# defaults to self for the outermost call and is passed down on recursion.
# NOTE(review): the listing skips numbers 103 and 131, so the initial value of
# `is_explicit` inside make_node and the function's return statement are not
# visible here -- confirm against the complete file.
98 def get_steps(self, db_conn: DatabaseConnection, external_owner:
99 Process | None = None) -> dict[int, dict[str, object]]:
100 """Return tree of depended-on explicit and implicit ProcessSteps."""
# make_node: describe one step, recursing into the steps of its Process.
102 def make_node(step: ProcessStep) -> dict[str, object]:
104 if external_owner is not None:
# A step counts as explicit when it is owned by external_owner itself.
105 is_explicit = step.owner_id == external_owner.id_
106 process = self.__class__.by_id(db_conn, step.step_process_id)
107 step_steps = process.get_steps(db_conn, external_owner)
108 return {'process': process, 'parent_id': step.parent_step_id,
109 'is_explicit': is_explicit, 'steps': step_steps}
# walk_steps: attach self's explicit steps whose parent is node_id beneath
# that node, record whether node_id was met before, then recurse into children.
111 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
112 explicit_children = [s for s in self.explicit_steps
113 if s.parent_step_id == node_id]
114 for child in explicit_children:
115 node['steps'][child.id_] = make_node(child)
# 'seen' is True only if this step id was already encountered in this walk.
116 node['seen'] = node_id in seen_step_ids
117 seen_step_ids.add(node_id)
118 for id_, step in node['steps'].items():
119 walk_steps(id_, step)
121 steps: dict[int, dict[str, object]] = {}
122 seen_step_ids: Set[int] = set()
123 if external_owner is None:
124 external_owner = self
# Top level: explicit steps without a parent step.
125 for step in [s for s in self.explicit_steps
126 if s.parent_step_id is None]:
127 assert isinstance(step.id_, int)
128 steps[step.id_] = make_node(step)
129 for step_id, step_node in steps.items():
130 walk_steps(step_id, step_node)
# Refill one of self's Condition lists, selected by attribute name `trgt`
# ('conditions' by default; set_fulfills and set_undoes pass 'fulfills' and
# 'undoes'), with Conditions looked up by id.
# The list object is obtained via getattr and appended to in place, so the
# attribute keeps referring to the same list.
# NOTE(review): the listing skips numbers 138-139, so the body of the while
# loop (presumably removing elements until the list is empty) and the loop
# header that introduces `id_` (presumably iterating over ids) are not visible
# here -- confirm against the complete file.
133 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
134 trgt: str = 'conditions') -> None:
135 """Set self.[target] to Conditions identified by ids."""
136 trgt_list = getattr(self, trgt)
137 while len(trgt_list) > 0:
140 trgt_list += [Condition.by_id(db_conn, id_)]
def set_fulfills(self, db_conn: DatabaseConnection,
                 ids: list[int]) -> None:
    """Replace self.fulfills with the Conditions that ids refer to.

    Thin wrapper: delegates to self.set_conditions, naming 'fulfills' as
    the attribute to fill.
    """
    target = 'fulfills'
    self.set_conditions(db_conn, ids, target)
def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
    """Replace self.undoes with the Conditions that ids refer to.

    Thin wrapper: delegates to self.set_conditions, naming 'undoes' as
    the attribute to fill.
    """
    target = 'undoes'
    self.set_conditions(db_conn, ids, target)
# Parameter list and body of the step-adding helper that set_steps calls as
# self._add_step(db_conn, id_, step_process_id, parent_step_id).
# NOTE(review): the listing skips numbers 151, 153, 164, 170, 172, 180 and
# 183. Not visible here: the `def` line itself, the `id_` parameter (used
# below when constructing the ProcessStep), the closing of the docstring, the
# body of the loop inside walk_steps, the statement opening the block that the
# `except NotFoundException` belongs to, one statement between creating the
# step and appending it, and the return. Confirm all against the complete file.
152 db_conn: DatabaseConnection,
154 step_process_id: int,
155 parent_step_id: int | None) -> ProcessStep:
156 """Create new ProcessStep, save and add it to self.explicit_steps.
158 Also checks against step recursion.
160 The new step's parent_step_id will fall back to None either if no
161 matching ProcessStep is found (which can be assumed in case it was
162 just deleted under its feet), or if the parent step would not be
163 owned by the current Process.
# walk_steps: reject a step whose Process is self, then visit the explicit
# steps of that step's Process.
165 def walk_steps(node: ProcessStep) -> None:
166 if node.step_process_id == self.id_:
167 raise BadFormatException('bad step selection causes recursion')
168 step_process = self.by_id(db_conn, node.step_process_id)
169 for step in step_process.explicit_steps:
# Validate the requested parent: drop it if it is owned by another Process
# or cannot be found.
171 if parent_step_id is not None:
173 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
174 if parent_step.owner_id != self.id_:
175 parent_step_id = None
176 except NotFoundException:
177 parent_step_id = None
# Narrow self.id_ to int for the ProcessStep constructor's owner_id.
178 assert isinstance(self.id_, int)
179 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
181 self.explicit_steps += [step]
182 step.save(db_conn) # NB: This ensures a non-None step.id_.
# Replace all of this Process's explicit steps: evict the current ones from
# the step cache, clear the list, delete their rows, then add each new step.
# Each tuple in `steps` is passed positionally to _add_step after db_conn;
# judging by the types, it is (step id or None, step_process_id,
# parent_step_id or None).
# NOTE(review): the listing skips number 193, so the parameter tuple of the
# DELETE statement is not visible here (presumably self.id_) -- confirm.
185 def set_steps(self, db_conn: DatabaseConnection,
186 steps: list[tuple[int | None, int, int | None]]) -> None:
187 """Set self.explicit_steps in bulk."""
188 for step in self.explicit_steps:
189 assert isinstance(step.id_, int)
# `del` raises KeyError if the step is not in the cache.
190 del db_conn.cached_process_steps[step.id_]
191 self.explicit_steps = []
192 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
194 for step_tuple in steps:
195 self._add_step(db_conn, step_tuple[0],
196 step_tuple[1], step_tuple[2])
# Persist this Process: the core row, the three versioned attributes, then the
# condition relations and steps, each as delete-then-reinsert. Finally store
# self in the connection's process cache.
# NOTE(review): the listing skips numbers 205, 210, 215, 221 and 223. The
# parameter tuples of the four DELETE statements (presumably self.id_) and the
# body of the final loop over explicit_steps are not visible here -- confirm
# against the complete file.
198 def save(self, db_conn: DatabaseConnection) -> None:
199 """Add (or re-write) self and connected items to DB."""
200 self.save_core(db_conn)
201 self.title.save(db_conn)
202 self.description.save(db_conn)
203 self.effort.save(db_conn)
# Relation tables are rewritten wholesale as (process id, condition id) rows.
204 db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
206 for condition in self.conditions:
207 db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
208 (self.id_, condition.id_))
209 db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
211 for condition in self.fulfills:
212 db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
213 (self.id_, condition.id_))
214 db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
216 for condition in self.undoes:
217 db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
218 (self.id_, condition.id_))
# Narrow self.id_ to int for use as the cache key below.
219 assert isinstance(self.id_, int)
220 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
222 for step in self.explicit_steps:
224 db_conn.cached_processes[self.id_] = self
# ProcessStep is a BaseModel subclass whose rows live in 'process_steps'.
227 class ProcessStep(BaseModel):
228 """Sub-unit of Processes."""
229 table_name = 'process_steps'
# Names of the attributes set in __init__; presumably consumed by
# BaseModel.save_core to decide which fields to write -- verify in plomtask.db.
230 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
# Store the owning Process's id, the id of the Process this step stands for,
# and the optional id of a parent step.
# NOTE(review): the listing skips number 234, so one line directly after the
# signature is not visible here (presumably the base-class initializer that
# stores id_) -- confirm against the complete file.
232 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
233 parent_step_id: int | None) -> None:
235 self.owner_id = owner_id
236 self.step_process_id = step_process_id
237 self.parent_step_id = parent_step_id
# Build a ProcessStep from a row and register it in the connection's step
# cache.
# NOTE(review): the listing skips numbers 239 and 246, so the decorator (the
# `cls` parameter suggests a classmethod) and the return statement are not
# visible here -- confirm against the complete file.
240 def from_table_row(cls, db_conn: DatabaseConnection,
241 row: Row) -> ProcessStep:
242 """Make ProcessStep from database row, store in DB cache."""
# Columns map positionally to id_, owner_id, step_process_id, parent_step_id.
243 step = cls(row[0], row[1], row[2], row[3])
# Narrow id_ to int before using it as the cache key.
244 assert isinstance(step.id_, int)
245 db_conn.cached_process_steps[step.id_] = step
# Fetch a ProcessStep by id: consult the connection's step cache, otherwise
# query 'process_steps'; raise NotFoundException if no row matches.
# NOTE(review): the listing skips numbers 248 and 254, so the decorator and
# what follows the cache-hit assertion (presumably an early return of `step`)
# are not visible here -- confirm against the complete file.
249 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
250 """Retrieve ProcessStep by id_, or throw NotFoundException."""
251 if id_ in db_conn.cached_process_steps.keys():
252 step = db_conn.cached_process_steps[id_]
253 assert isinstance(step, ProcessStep)
# Cache miss: the first matching row is turned into an object (and cached by
# from_table_row) and returned straight away.
255 for row in db_conn.exec('SELECT * FROM process_steps '
256 'WHERE step_id = ?', (id_,)):
257 return cls.from_table_row(db_conn, row)
258 raise NotFoundException(f'found no ProcessStep of ID {id_}')
def save(self, db_conn: DatabaseConnection) -> None:
    """Persist this ProcessStep by delegating to self.save_core.

    No step-specific work is done here; the call is forwarded unchanged.
    """
    self.save_core(db_conn)