1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from typing import Any, Set
5 from plomtask.db import DatabaseConnection
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import NotFoundException, BadFormatException
12 """Template for, and metadata for, Todos, and their arrangements."""
14 # pylint: disable=too-many-instance-attributes
16 def __init__(self, id_: int | None) -> None:
17 if (id_ is not None) and id_ < 1:
18 raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
20 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
21 self.description = VersionedAttribute(self, 'process_descriptions', '')
22 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
23 self.explicit_steps: list[ProcessStep] = []
24 self.conditions: list[Condition] = []
25 self.fulfills: list[Condition] = []
26 self.undoes: list[Condition] = []
29 def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
30 """Make Process from database row, with empty VersionedAttributes."""
32 assert process.id_ is not None
33 db_conn.cached_processes[process.id_] = process
37 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
38 """Collect all Processes and their connected VersionedAttributes."""
40 for id_, process in db_conn.cached_processes.items():
41 processes[id_] = process
42 already_recorded = processes.keys()
43 for row in db_conn.exec('SELECT id FROM processes'):
44 if row[0] not in already_recorded:
45 process = cls.by_id(db_conn, row[0])
46 processes[process.id_] = process
47 return list(processes.values())
50 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
51 create: bool = False) -> Process:
52 """Collect Process, its VersionedAttributes, and its child IDs."""
53 if id_ in db_conn.cached_processes.keys():
54 process = db_conn.cached_processes[id_]
55 assert isinstance(process, Process)
58 for row in db_conn.exec('SELECT * FROM processes '
59 'WHERE id = ?', (id_,)):
64 raise NotFoundException(f'Process not found of id: {id_}')
65 process = Process(id_)
66 for row in db_conn.exec('SELECT * FROM process_titles '
67 'WHERE parent_id = ?', (process.id_,)):
68 process.title.history[row[1]] = row[2]
69 for row in db_conn.exec('SELECT * FROM process_descriptions '
70 'WHERE parent_id = ?', (process.id_,)):
71 process.description.history[row[1]] = row[2]
72 for row in db_conn.exec('SELECT * FROM process_efforts '
73 'WHERE parent_id = ?', (process.id_,)):
74 process.effort.history[row[1]] = row[2]
75 for row in db_conn.exec('SELECT * FROM process_steps '
76 'WHERE owner_id = ?', (process.id_,)):
77 process.explicit_steps += [ProcessStep.from_table_row(db_conn,
79 for row in db_conn.exec('SELECT condition FROM process_conditions '
80 'WHERE process = ?', (process.id_,)):
81 process.conditions += [Condition.by_id(db_conn, row[0])]
82 for row in db_conn.exec('SELECT condition FROM process_fulfills '
83 'WHERE process = ?', (process.id_,)):
84 process.fulfills += [Condition.by_id(db_conn, row[0])]
85 for row in db_conn.exec('SELECT condition FROM process_undoes '
86 'WHERE process = ?', (process.id_,)):
87 process.undoes += [Condition.by_id(db_conn, row[0])]
88 assert isinstance(process, Process)
91 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
92 """Return Processes using self for a ProcessStep."""
94 for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
95 ' step_process_id = ?', (self.id_,)):
96 owner_ids.add(owner_id[0])
97 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
99 def get_steps(self, db_conn: DatabaseConnection, external_owner:
100 Process | None = None) -> dict[int, dict[str, object]]:
101 """Return tree of depended-on explicit and implicit ProcessSteps."""
103 def make_node(step: ProcessStep) -> dict[str, object]:
105 if external_owner is not None:
106 is_explicit = step.owner_id == external_owner.id_
107 process = self.__class__.by_id(db_conn, step.step_process_id)
108 step_steps = process.get_steps(db_conn, external_owner)
109 return {'process': process, 'parent_id': step.parent_step_id,
110 'is_explicit': is_explicit, 'steps': step_steps}
112 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
113 explicit_children = [s for s in self.explicit_steps
114 if s.parent_step_id == node_id]
115 for child in explicit_children:
116 node['steps'][child.id_] = make_node(child)
117 node['seen'] = node_id in seen_step_ids
118 seen_step_ids.add(node_id)
119 for id_, step in node['steps'].items():
120 walk_steps(id_, step)
122 steps: dict[int, dict[str, object]] = {}
123 seen_step_ids: Set[int] = set()
124 if external_owner is None:
125 external_owner = self
126 for step in [s for s in self.explicit_steps
127 if s.parent_step_id is None]:
128 assert step.id_ is not None # for mypy
129 steps[step.id_] = make_node(step)
130 for step_id, step_node in steps.items():
131 walk_steps(step_id, step_node)
134 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
135 trgt: str = 'conditions') -> None:
136 """Set self.[target] to Conditions identified by ids."""
137 trgt_list = getattr(self, trgt)
138 while len(trgt_list) > 0:
141 trgt_list += [Condition.by_id(db_conn, id_)]
def set_fulfills(self, db_conn: DatabaseConnection,
                 ids: list[int]) -> None:
    """Set self.fulfills to the Conditions identified by ids.

    Thin wrapper: hands db_conn and ids on to self.set_conditions, naming
    'fulfills' as the attribute to operate on.
    """
    self.set_conditions(db_conn, ids, trgt='fulfills')
def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
    """Set self.undoes to the Conditions identified by ids.

    Thin wrapper: hands db_conn and ids on to self.set_conditions, naming
    'undoes' as the attribute to operate on.
    """
    self.set_conditions(db_conn, ids, trgt='undoes')
152 def add_step(self, db_conn: DatabaseConnection, id_: int | None,
153 step_process_id: int,
154 parent_step_id: int | None) -> ProcessStep:
155 """Create new ProcessStep, save and add it to self.explicit_steps.
157 Also checks against step recursion.
158 The new step's parent_step_id will fall back to None either if no
159 matching ProcessStep is found (which can be assumed in case it was
160 just deleted under its feet), or if the parent step would not be
161 owned by the current Process.
163 def walk_steps(node: ProcessStep) -> None:
164 if node.step_process_id == self.id_:
165 raise BadFormatException('bad step selection causes recursion')
166 step_process = self.by_id(db_conn, node.step_process_id)
167 for step in step_process.explicit_steps:
169 if parent_step_id is not None:
171 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
172 if parent_step.owner_id != self.id_:
173 parent_step_id = None
174 except NotFoundException:
175 parent_step_id = None
176 assert self.id_ is not None
177 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
179 self.explicit_steps += [step]
180 step.save(db_conn) # NB: This ensures a non-None step.id_.
183 def save_without_steps(self, db_conn: DatabaseConnection) -> None:
184 """Add (or re-write) self and connected VersionedAttributes to DB."""
185 cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
186 self.id_ = cursor.lastrowid
187 self.title.save(db_conn)
188 self.description.save(db_conn)
189 self.effort.save(db_conn)
190 db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
192 for condition in self.conditions:
193 db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
194 (self.id_, condition.id_))
195 db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
197 for condition in self.fulfills:
198 db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
199 (self.id_, condition.id_))
200 db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
202 for condition in self.undoes:
203 db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
204 (self.id_, condition.id_))
205 assert self.id_ is not None
206 db_conn.cached_processes[self.id_] = self
208 def fix_steps(self, db_conn: DatabaseConnection) -> None:
209 """Rewrite ProcessSteps from self.explicit_steps.
211 This also fixes illegal Step.parent_step_id values, i.e. those pointing
212 to steps now absent, or owned by a different Process, fall back into
215 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
217 for step in self.explicit_steps:
218 if step.parent_step_id is not None:
220 parent_step = ProcessStep.by_id(db_conn,
222 if parent_step.owner_id != self.id_:
223 step.parent_step_id = None
224 except NotFoundException:
225 step.parent_step_id = None
230 """Sub-unit of Processes."""
232 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
233 parent_step_id: int | None) -> None:
235 self.owner_id = owner_id
236 self.step_process_id = step_process_id
237 self.parent_step_id = parent_step_id
240 def from_table_row(cls, db_conn: DatabaseConnection,
241 row: Row) -> ProcessStep:
242 """Make ProcessStep from database row, store in DB cache."""
243 step = cls(row[0], row[1], row[2], row[3])
244 assert step.id_ is not None
245 db_conn.cached_process_steps[step.id_] = step
249 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
250 """Retrieve ProcessStep by id_, or throw NotFoundException."""
251 if id_ in db_conn.cached_process_steps.keys():
252 step = db_conn.cached_process_steps[id_]
253 assert isinstance(step, ProcessStep)
255 for row in db_conn.exec('SELECT * FROM process_steps '
256 'WHERE step_id = ?', (id_,)):
257 return cls.from_table_row(db_conn, row)
258 raise NotFoundException(f'found no ProcessStep of ID {id_}')
def save(self, db_conn: DatabaseConnection) -> None:
    """Write this step's row to process_steps and register it in the cache.

    The row is written with REPLACE using the four values id_, owner_id,
    step_process_id and parent_step_id. Afterwards self.id_ is overwritten
    with the cursor's lastrowid, and self is stored in
    db_conn.cached_process_steps under that ID.
    """
    row_values = (self.id_, self.owner_id,
                  self.step_process_id, self.parent_step_id)
    sql = 'REPLACE INTO process_steps VALUES (?, ?, ?, ?)'
    self.id_ = db_conn.exec(sql, row_values).lastrowid
    # Narrow the Optional ID before it is used as the cache key.
    assert self.id_ is not None
    db_conn.cached_process_steps[self.id_] = self