home · contact · privacy
45de9dbe8e879f3ec06aae5575305e980df8c0d4
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from typing import Any, Set
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import NotFoundException, BadFormatException
9
10
11 class Process(BaseModel):
12     """Template for, and metadata for, Todos, and their arrangements."""
13     table_name = 'processes'
14
15     # pylint: disable=too-many-instance-attributes
16
17     def __init__(self, id_: int | None) -> None:
18         self.set_int_id(id_)
19         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
20         self.description = VersionedAttribute(self, 'process_descriptions', '')
21         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
22         self.explicit_steps: list[ProcessStep] = []
23         self.conditions: list[Condition] = []
24         self.fulfills: list[Condition] = []
25         self.undoes: list[Condition] = []
26
27     @classmethod
28     def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
29         """Make Process from database row, with empty VersionedAttributes."""
30         process = cls(row[0])
31         assert isinstance(process.id_, int)
32         db_conn.cached_processes[process.id_] = process
33         return process
34
35     @classmethod
36     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
37         """Collect all Processes and their connected VersionedAttributes."""
38         processes = {}
39         for id_, process in db_conn.cached_processes.items():
40             processes[id_] = process
41         already_recorded = processes.keys()
42         for row in db_conn.exec('SELECT id FROM processes'):
43             if row[0] not in already_recorded:
44                 process = cls.by_id(db_conn, row[0])
45                 processes[process.id_] = process
46         return list(processes.values())
47
48     @classmethod
49     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
50               create: bool = False) -> Process:
51         """Collect Process, its VersionedAttributes, and its child IDs."""
52         if id_ in db_conn.cached_processes.keys():
53             process = db_conn.cached_processes[id_]
54             assert isinstance(process, Process)
55             return process
56         process = None
57         for row in db_conn.exec('SELECT * FROM processes '
58                                 'WHERE id = ?', (id_,)):
59             process = cls(row[0])
60             break
61         if not process:
62             if not create:
63                 raise NotFoundException(f'Process not found of id: {id_}')
64             process = Process(id_)
65         for row in db_conn.exec('SELECT * FROM process_titles '
66                                 'WHERE parent_id = ?', (process.id_,)):
67             process.title.history[row[1]] = row[2]
68         for row in db_conn.exec('SELECT * FROM process_descriptions '
69                                 'WHERE parent_id = ?', (process.id_,)):
70             process.description.history[row[1]] = row[2]
71         for row in db_conn.exec('SELECT * FROM process_efforts '
72                                 'WHERE parent_id = ?', (process.id_,)):
73             process.effort.history[row[1]] = row[2]
74         for row in db_conn.exec('SELECT * FROM process_steps '
75                                 'WHERE owner_id = ?', (process.id_,)):
76             process.explicit_steps += [ProcessStep.from_table_row(db_conn,
77                                                                   row)]
78         for row in db_conn.exec('SELECT condition FROM process_conditions '
79                                 'WHERE process = ?', (process.id_,)):
80             process.conditions += [Condition.by_id(db_conn, row[0])]
81         for row in db_conn.exec('SELECT condition FROM process_fulfills '
82                                 'WHERE process = ?', (process.id_,)):
83             process.fulfills += [Condition.by_id(db_conn, row[0])]
84         for row in db_conn.exec('SELECT condition FROM process_undoes '
85                                 'WHERE process = ?', (process.id_,)):
86             process.undoes += [Condition.by_id(db_conn, row[0])]
87         assert isinstance(process, Process)
88         return process
89
90     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
91         """Return Processes using self for a ProcessStep."""
92         owner_ids = set()
93         for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
94                                      ' step_process_id = ?', (self.id_,)):
95             owner_ids.add(owner_id[0])
96         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
97
98     def get_steps(self, db_conn: DatabaseConnection, external_owner:
99                   Process | None = None) -> dict[int, dict[str, object]]:
100         """Return tree of depended-on explicit and implicit ProcessSteps."""
101
102         def make_node(step: ProcessStep) -> dict[str, object]:
103             is_explicit = False
104             if external_owner is not None:
105                 is_explicit = step.owner_id == external_owner.id_
106             process = self.__class__.by_id(db_conn, step.step_process_id)
107             step_steps = process.get_steps(db_conn, external_owner)
108             return {'process': process, 'parent_id': step.parent_step_id,
109                     'is_explicit': is_explicit, 'steps': step_steps}
110
111         def walk_steps(node_id: int, node: dict[str, Any]) -> None:
112             explicit_children = [s for s in self.explicit_steps
113                                  if s.parent_step_id == node_id]
114             for child in explicit_children:
115                 node['steps'][child.id_] = make_node(child)
116             node['seen'] = node_id in seen_step_ids
117             seen_step_ids.add(node_id)
118             for id_, step in node['steps'].items():
119                 walk_steps(id_, step)
120
121         steps: dict[int, dict[str, object]] = {}
122         seen_step_ids: Set[int] = set()
123         if external_owner is None:
124             external_owner = self
125         for step in [s for s in self.explicit_steps
126                      if s.parent_step_id is None]:
127             assert isinstance(step.id_, int)
128             steps[step.id_] = make_node(step)
129         for step_id, step_node in steps.items():
130             walk_steps(step_id, step_node)
131         return steps
132
133     def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
134                        trgt: str = 'conditions') -> None:
135         """Set self.[target] to Conditions identified by ids."""
136         trgt_list = getattr(self, trgt)
137         while len(trgt_list) > 0:
138             trgt_list.pop()
139         for id_ in ids:
140             trgt_list += [Condition.by_id(db_conn, id_)]
141
142     def set_fulfills(self, db_conn: DatabaseConnection,
143                      ids: list[int]) -> None:
144         """Set self.fulfills to Conditions identified by ids."""
145         self.set_conditions(db_conn, ids, 'fulfills')
146
147     def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
148         """Set self.undoes to Conditions identified by ids."""
149         self.set_conditions(db_conn, ids, 'undoes')
150
151     def _add_step(self,
152                   db_conn: DatabaseConnection,
153                   id_: int | None,
154                   step_process_id: int,
155                   parent_step_id: int | None) -> ProcessStep:
156         """Create new ProcessStep, save and add it to self.explicit_steps.
157
158         Also checks against step recursion.
159
160         The new step's parent_step_id will fall back to None either if no
161         matching ProcessStep is found (which can be assumed in case it was
162         just deleted under its feet), or if the parent step would not be
163         owned by the current Process.
164         """
165         def walk_steps(node: ProcessStep) -> None:
166             if node.step_process_id == self.id_:
167                 raise BadFormatException('bad step selection causes recursion')
168             step_process = self.by_id(db_conn, node.step_process_id)
169             for step in step_process.explicit_steps:
170                 walk_steps(step)
171         if parent_step_id is not None:
172             try:
173                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
174                 if parent_step.owner_id != self.id_:
175                     parent_step_id = None
176             except NotFoundException:
177                 parent_step_id = None
178         assert isinstance(self.id_, int)
179         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
180         walk_steps(step)
181         self.explicit_steps += [step]
182         step.save(db_conn)  # NB: This ensures a non-None step.id_.
183         return step
184
185     def set_steps(self, db_conn: DatabaseConnection,
186                   steps: list[tuple[int | None, int, int | None]]) -> None:
187         """Set self.explicit_steps in bulk."""
188         for step in self.explicit_steps:
189             assert isinstance(step.id_, int)
190             del db_conn.cached_process_steps[step.id_]
191         self.explicit_steps = []
192         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
193                      (self.id_,))
194         for step_tuple in steps:
195             self._add_step(db_conn, step_tuple[0],
196                            step_tuple[1], step_tuple[2])
197
198     def save(self, db_conn: DatabaseConnection) -> None:
199         """Add (or re-write) self and connected items to DB."""
200         self.save_core(db_conn)
201         self.title.save(db_conn)
202         self.description.save(db_conn)
203         self.effort.save(db_conn)
204         db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
205                      (self.id_,))
206         for condition in self.conditions:
207             db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
208                          (self.id_, condition.id_))
209         db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
210                      (self.id_,))
211         for condition in self.fulfills:
212             db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
213                          (self.id_, condition.id_))
214         db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
215                      (self.id_,))
216         for condition in self.undoes:
217             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
218                          (self.id_, condition.id_))
219         assert isinstance(self.id_, int)
220         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
221                      (self.id_,))
222         for step in self.explicit_steps:
223             step.save(db_conn)
224         db_conn.cached_processes[self.id_] = self
225
226
227 class ProcessStep(BaseModel):
228     """Sub-unit of Processes."""
229     table_name = 'process_steps'
230     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
231
232     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
233                  parent_step_id: int | None) -> None:
234         self.set_int_id(id_)
235         self.owner_id = owner_id
236         self.step_process_id = step_process_id
237         self.parent_step_id = parent_step_id
238
239     @classmethod
240     def from_table_row(cls, db_conn: DatabaseConnection,
241                        row: Row) -> ProcessStep:
242         """Make ProcessStep from database row, store in DB cache."""
243         step = cls(row[0], row[1], row[2], row[3])
244         assert isinstance(step.id_, int)
245         db_conn.cached_process_steps[step.id_] = step
246         return step
247
248     @classmethod
249     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
250         """Retrieve ProcessStep by id_, or throw NotFoundException."""
251         if id_ in db_conn.cached_process_steps.keys():
252             step = db_conn.cached_process_steps[id_]
253             assert isinstance(step, ProcessStep)
254             return step
255         for row in db_conn.exec('SELECT * FROM process_steps '
256                                 'WHERE step_id = ?', (id_,)):
257             return cls.from_table_row(db_conn, row)
258         raise NotFoundException(f'found no ProcessStep of ID {id_}')
259
260     def save(self, db_conn: DatabaseConnection) -> None:
261         """Default to simply calling self.save_core for simple cases."""
262         self.save_core(db_conn)