home · contact · privacy
76ed30df049b5d801b283ab904364da27f05f240
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from typing import Any, Set
5 from plomtask.db import DatabaseConnection
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import NotFoundException, BadFormatException
9
10
11 class Process:
12     """Template for, and metadata for, Todos, and their arrangements."""
13
14     # pylint: disable=too-many-instance-attributes
15
16     def __init__(self, id_: int | None) -> None:
17         if (id_ is not None) and id_ < 1:
18             raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
19         self.id_ = id_
20         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
21         self.description = VersionedAttribute(self, 'process_descriptions', '')
22         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
23         self.explicit_steps: list[ProcessStep] = []
24         self.conditions: list[Condition] = []
25         self.fulfills: list[Condition] = []
26         self.undoes: list[Condition] = []
27
28     @classmethod
29     def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
30         """Make Process from database row, with empty VersionedAttributes."""
31         process = cls(row[0])
32         assert process.id_ is not None
33         db_conn.cached_processes[process.id_] = process
34         return process
35
36     @classmethod
37     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
38         """Collect all Processes and their connected VersionedAttributes."""
39         processes = {}
40         for id_, process in db_conn.cached_processes.items():
41             processes[id_] = process
42         already_recorded = processes.keys()
43         for row in db_conn.exec('SELECT id FROM processes'):
44             if row[0] not in already_recorded:
45                 process = cls.by_id(db_conn, row[0])
46                 processes[process.id_] = process
47         return list(processes.values())
48
49     @classmethod
50     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
51               create: bool = False) -> Process:
52         """Collect Process, its VersionedAttributes, and its child IDs."""
53         if id_ in db_conn.cached_processes.keys():
54             process = db_conn.cached_processes[id_]
55             assert isinstance(process, Process)
56             return process
57         process = None
58         for row in db_conn.exec('SELECT * FROM processes '
59                                 'WHERE id = ?', (id_,)):
60             process = cls(row[0])
61             break
62         if not process:
63             if not create:
64                 raise NotFoundException(f'Process not found of id: {id_}')
65             process = Process(id_)
66         for row in db_conn.exec('SELECT * FROM process_titles '
67                                 'WHERE parent_id = ?', (process.id_,)):
68             process.title.history[row[1]] = row[2]
69         for row in db_conn.exec('SELECT * FROM process_descriptions '
70                                 'WHERE parent_id = ?', (process.id_,)):
71             process.description.history[row[1]] = row[2]
72         for row in db_conn.exec('SELECT * FROM process_efforts '
73                                 'WHERE parent_id = ?', (process.id_,)):
74             process.effort.history[row[1]] = row[2]
75         for row in db_conn.exec('SELECT * FROM process_steps '
76                                 'WHERE owner_id = ?', (process.id_,)):
77             process.explicit_steps += [ProcessStep.from_table_row(db_conn,
78                                                                   row)]
79         for row in db_conn.exec('SELECT condition FROM process_conditions '
80                                 'WHERE process = ?', (process.id_,)):
81             process.conditions += [Condition.by_id(db_conn, row[0])]
82         for row in db_conn.exec('SELECT condition FROM process_fulfills '
83                                 'WHERE process = ?', (process.id_,)):
84             process.fulfills += [Condition.by_id(db_conn, row[0])]
85         for row in db_conn.exec('SELECT condition FROM process_undoes '
86                                 'WHERE process = ?', (process.id_,)):
87             process.undoes += [Condition.by_id(db_conn, row[0])]
88         assert isinstance(process, Process)
89         return process
90
91     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
92         """Return Processes using self for a ProcessStep."""
93         owner_ids = set()
94         for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
95                                      ' step_process_id = ?', (self.id_,)):
96             owner_ids.add(owner_id[0])
97         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
98
99     def get_steps(self, db_conn: DatabaseConnection, external_owner:
100                   Process | None = None) -> dict[int, dict[str, object]]:
101         """Return tree of depended-on explicit and implicit ProcessSteps."""
102
103         def make_node(step: ProcessStep) -> dict[str, object]:
104             is_explicit = False
105             if external_owner is not None:
106                 is_explicit = step.owner_id == external_owner.id_
107             process = self.__class__.by_id(db_conn, step.step_process_id)
108             step_steps = process.get_steps(db_conn, external_owner)
109             return {'process': process, 'parent_id': step.parent_step_id,
110                     'is_explicit': is_explicit, 'steps': step_steps}
111
112         def walk_steps(node_id: int, node: dict[str, Any]) -> None:
113             explicit_children = [s for s in self.explicit_steps
114                                  if s.parent_step_id == node_id]
115             for child in explicit_children:
116                 node['steps'][child.id_] = make_node(child)
117             node['seen'] = node_id in seen_step_ids
118             seen_step_ids.add(node_id)
119             for id_, step in node['steps'].items():
120                 walk_steps(id_, step)
121
122         steps: dict[int, dict[str, object]] = {}
123         seen_step_ids: Set[int] = set()
124         if external_owner is None:
125             external_owner = self
126         for step in [s for s in self.explicit_steps
127                      if s.parent_step_id is None]:
128             assert step.id_ is not None  # for mypy
129             steps[step.id_] = make_node(step)
130         for step_id, step_node in steps.items():
131             walk_steps(step_id, step_node)
132         return steps
133
134     def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
135                        trgt: str = 'conditions') -> None:
136         """Set self.[target] to Conditions identified by ids."""
137         trgt_list = getattr(self, trgt)
138         while len(trgt_list) > 0:
139             trgt_list.pop()
140         for id_ in ids:
141             trgt_list += [Condition.by_id(db_conn, id_)]
142
143     def set_fulfills(self, db_conn: DatabaseConnection,
144                      ids: list[int]) -> None:
145         """Set self.fulfills to Conditions identified by ids."""
146         self.set_conditions(db_conn, ids, 'fulfills')
147
148     def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
149         """Set self.undoes to Conditions identified by ids."""
150         self.set_conditions(db_conn, ids, 'undoes')
151
152     def _add_step(self,
153                   db_conn: DatabaseConnection,
154                   id_: int | None,
155                   step_process_id: int,
156                   parent_step_id: int | None) -> ProcessStep:
157         """Create new ProcessStep, save and add it to self.explicit_steps.
158
159         Also checks against step recursion.
160
161         The new step's parent_step_id will fall back to None either if no
162         matching ProcessStep is found (which can be assumed in case it was
163         just deleted under its feet), or if the parent step would not be
164         owned by the current Process.
165         """
166         def walk_steps(node: ProcessStep) -> None:
167             if node.step_process_id == self.id_:
168                 raise BadFormatException('bad step selection causes recursion')
169             step_process = self.by_id(db_conn, node.step_process_id)
170             for step in step_process.explicit_steps:
171                 walk_steps(step)
172         if parent_step_id is not None:
173             try:
174                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
175                 if parent_step.owner_id != self.id_:
176                     parent_step_id = None
177             except NotFoundException:
178                 parent_step_id = None
179         assert self.id_ is not None
180         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
181         walk_steps(step)
182         self.explicit_steps += [step]
183         step.save(db_conn)  # NB: This ensures a non-None step.id_.
184         return step
185
186     def set_steps(self, db_conn: DatabaseConnection,
187                   steps: list[tuple[int | None, int, int | None]]) -> None:
188         """Set self.explicit_steps in bulk."""
189         for step in self.explicit_steps:
190             assert step.id_ is not None
191             del db_conn.cached_process_steps[step.id_]
192         self.explicit_steps = []
193         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
194                      (self.id_,))
195         for step_tuple in steps:
196             self._add_step(db_conn, step_tuple[0],
197                            step_tuple[1], step_tuple[2])
198
199     def save_id(self, db_conn: DatabaseConnection) -> None:
200         """Write bare-bones self (sans connected items), ensuring self.id_."""
201         cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
202         self.id_ = cursor.lastrowid
203
204     def save(self, db_conn: DatabaseConnection) -> None:
205         """Add (or re-write) self and connected items to DB."""
206         self.save_id(db_conn)
207         self.title.save(db_conn)
208         self.description.save(db_conn)
209         self.effort.save(db_conn)
210         db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
211                      (self.id_,))
212         for condition in self.conditions:
213             db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
214                          (self.id_, condition.id_))
215         db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
216                      (self.id_,))
217         for condition in self.fulfills:
218             db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
219                          (self.id_, condition.id_))
220         db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
221                      (self.id_,))
222         for condition in self.undoes:
223             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
224                          (self.id_, condition.id_))
225         assert self.id_ is not None
226         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
227                      (self.id_,))
228         for step in self.explicit_steps:
229             step.save(db_conn)
230         db_conn.cached_processes[self.id_] = self
231
232
233 class ProcessStep:
234     """Sub-unit of Processes."""
235
236     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
237                  parent_step_id: int | None) -> None:
238         self.id_ = id_
239         self.owner_id = owner_id
240         self.step_process_id = step_process_id
241         self.parent_step_id = parent_step_id
242
243     @classmethod
244     def from_table_row(cls, db_conn: DatabaseConnection,
245                        row: Row) -> ProcessStep:
246         """Make ProcessStep from database row, store in DB cache."""
247         step = cls(row[0], row[1], row[2], row[3])
248         assert step.id_ is not None
249         db_conn.cached_process_steps[step.id_] = step
250         return step
251
252     @classmethod
253     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
254         """Retrieve ProcessStep by id_, or throw NotFoundException."""
255         if id_ in db_conn.cached_process_steps.keys():
256             step = db_conn.cached_process_steps[id_]
257             assert isinstance(step, ProcessStep)
258             return step
259         for row in db_conn.exec('SELECT * FROM process_steps '
260                                 'WHERE step_id = ?', (id_,)):
261             return cls.from_table_row(db_conn, row)
262         raise NotFoundException(f'found no ProcessStep of ID {id_}')
263
264     def save(self, db_conn: DatabaseConnection) -> None:
265         """Save to database and cache."""
266         cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
267                               (self.id_, self.owner_id, self.step_process_id,
268                                self.parent_step_id))
269         self.id_ = cursor.lastrowid
270         assert self.id_ is not None
271         db_conn.cached_process_steps[self.id_] = self