"""Collecting Processes and Process-related items."""
from __future__ import annotations
from typing import Any, Set

from plomtask.db import DatabaseConnection, BaseModel
from plomtask.misc import VersionedAttribute
from plomtask.conditions import Condition
from plomtask.exceptions import NotFoundException, BadFormatException
class Process(BaseModel):
    """Template for, and metadata for, Todos, and their arrangements.

    Besides its versioned title, description and effort, a Process holds
    its explicit ProcessSteps (each pointing to another Process) and three
    lists of Conditions: conditions, fulfills and undoes.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Start with default versioned values and empty child lists."""
        super().__init__(id_)
        # Second argument is the DB table by_id() reads the history from;
        # third is presumably the default value -- confirm in plomtask.misc.
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.fulfills: list[Condition] = []
        self.undoes: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Cached Processes are taken as they are; only IDs found in the
        processes table but not in the cache are loaded via by_id.
        """
        processes = dict(db_conn.cached_processes.items())
        for row in db_conn.exec('SELECT id FROM processes'):
            if row[0] not in processes:
                process = cls.by_id(db_conn, row[0])
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If no Process is found for id_ (or id_ is falsy), a fresh
        Process(id_) is used when create is set; otherwise
        NotFoundException is raised.
        """
        process = None
        if id_:
            # NOTE(review): the second value returned by _by_id is
            # discarded.  If it signals a cache hit, the loops below append
            # children to an already populated instance -- confirm.
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        # Table names interpolated below are fixed literals, never input.
        # Rows are presumably (parent_id, timestamp, value) -- confirm
        # against the schema.
        for table, attribute in (
                ('process_titles', process.title),
                ('process_descriptions', process.description),
                ('process_efforts', process.effort)):
            for row in db_conn.exec(f'SELECT * FROM {table} '
                                    'WHERE parent_id = ?', (process.id_,)):
                attribute.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps.append(
                ProcessStep.from_table_row(db_conn, row))
        for table, conditions in (
                ('process_conditions', process.conditions),
                ('process_fulfills', process.fulfills),
                ('process_undoes', process.undoes)):
            for row in db_conn.exec(f'SELECT condition FROM {table} '
                                    'WHERE process = ?', (process.id_,)):
                conditions.append(Condition.by_id(db_conn, row[0]))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # A set, so an owner using self in several steps is listed once.
        owner_ids = {row[0] for row in db_conn.exec(
            'SELECT owner_id FROM process_steps WHERE'
            ' step_process_id = ?', (self.id_,))}
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to node dicts with the keys 'process',
        'parent_id', 'is_explicit', 'steps' (a nested mapping of the same
        shape) and 'seen'.  A step counts as explicit if it is owned by
        external_owner, which defaults to self.
        """
        # Resolve the owner once, so the helpers never deal with None.
        owner = self if external_owner is None else external_owner
        seen_step_ids: Set[int] = set()

        def make_node(step: ProcessStep) -> dict[str, object]:
            process = self.__class__.by_id(db_conn, step.step_process_id)
            return {'process': process,
                    'parent_id': step.parent_step_id,
                    'is_explicit': step.owner_id == owner.id_,
                    'steps': process.get_steps(db_conn, owner)}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Hook self's explicit steps below the node they name as
            # parent, then descend into all of the node's children.
            for child in [s for s in self.explicit_steps
                          if s.parent_step_id == node_id]:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
                       trgt: str = 'conditions') -> None:
        """Set self.[target] to Conditions identified by ids.

        trgt names the list attribute to fill.  The list is replaced in
        place, and only after every ID was resolved, so that a lookup that
        raises leaves the previous content untouched.
        """
        trgt_list = getattr(self, trgt)
        trgt_list[:] = [Condition.by_id(db_conn, id_) for id_ in ids]

    def set_fulfills(self, db_conn: DatabaseConnection,
                     ids: list[int]) -> None:
        """Set self.fulfills to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'fulfills')

    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
        """Set self.undoes to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'undoes')

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            # Follow the step's Process and all of its explicit steps;
            # arriving back at self means the new step would be circular.
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)  # Raises before anything is stored.
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID, step Process ID, parent step ID).
        Previously held steps are dropped from cache, self, and DB first.
        """
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            del db_conn.cached_process_steps[step.id_]
        self.explicit_steps = []
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for id_, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, id_, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        for attribute in (self.title, self.description, self.effort):
            attribute.save(db_conn)
        # Condition links are rewritten from scratch per table; the table
        # names interpolated here are fixed literals, never input.
        for table, conditions in (('process_conditions', self.conditions),
                                  ('process_fulfills', self.fulfills),
                                  ('process_undoes', self.undoes)):
            db_conn.exec(f'DELETE FROM {table} WHERE process = ?',
                         (self.id_,))
            for condition in conditions:
                db_conn.exec(f'INSERT INTO {table} VALUES (?,?)',
                             (self.id_, condition.id_))
        assert isinstance(self.id_, int)
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
class ProcessStep(BaseModel):
    """Sub-unit of Processes.

    A step belongs to one Process (owner_id), points to the Process it
    stands for (step_process_id), and may name another ProcessStep as its
    parent (parent_step_id).
    """
    table_name = 'process_steps'
    # Presumably the attributes save_core persists -- confirm in BaseModel.
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the IDs of owner, step Process and (optional) parent."""
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        step, _ = super()._by_id(db_conn, id_)
        if not step:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(step, ProcessStep)
        return step

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)