home · contact · privacy
2f8c2d537a168062cee953985242b732f1973a72
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process(BaseModel):
    """Template for, and metadata for, Todos, and their arrangements.

    Holds versioned title/description/effort attributes, the explicitly
    owned ProcessSteps, and three lists of Conditions (conditions, fulfills,
    undoes) that are stored in their own link tables.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up Process of id_ with default attributes and empty lists."""
        self.set_int_id(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.fulfills: list[Condition] = []
        self.undoes: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Cached Processes take precedence; only IDs found in the DB but not
        in the cache are loaded via by_id.
        """
        processes = dict(db_conn.cached_processes)
        for row in db_conn.exec('SELECT id FROM processes'):
            if row[0] not in processes:
                process = cls.by_id(db_conn, row[0])
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        Returns the cached instance if there is one. Otherwise builds the
        Process from its DB row, or, if no row exists, from scratch when
        create is set.

        Raises NotFoundException if no Process of id_ exists and create is
        False.
        """
        if id_ in db_conn.cached_processes:
            process = db_conn.cached_processes[id_]
            assert isinstance(process, Process)
            return process
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls(row[0])
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = cls(id_)
        # NB: Table names below are fixed literals, never external input, so
        # formatting them into the SQL is safe; values stay parameterized.
        versioned = ((process.title, 'process_titles'),
                     (process.description, 'process_descriptions'),
                     (process.effort, 'process_efforts'))
        for attribute, table in versioned:
            for row in db_conn.exec(f'SELECT * FROM {table} '
                                    'WHERE parent_id = ?', (process.id_,)):
                attribute.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps.append(
                    ProcessStep.from_table_row(db_conn, row))
        condition_links = ((process.conditions, 'process_conditions'),
                           (process.fulfills, 'process_fulfills'),
                           (process.undoes, 'process_undoes'))
        for conditions, table in condition_links:
            for row in db_conn.exec(f'SELECT condition FROM {table} '
                                    'WHERE process = ?', (process.id_,)):
                conditions.append(Condition.by_id(db_conn, row[0]))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # A set, so an owner using self in several steps is listed only once.
        owner_ids = {row[0] for row in db_conn.exec(
            'SELECT owner_id FROM process_steps WHERE'
            ' step_process_id = ?', (self.id_,))}
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to nodes, each a dict with the keys
        'process' (the step's Process), 'parent_id', 'is_explicit' (whether
        the step is owned by external_owner), 'steps' (sub-tree of the same
        shape), and 'seen' (whether that step ID was met earlier in this
        walk). external_owner defaults to self and is passed down unchanged
        into the recursive calls on step Processes.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            """Build node for step, recursing into its Process's steps."""
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return {'process': process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            """Attach self's explicit children to node, mark seen, recurse."""
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        # NB: Set before any make_node call, so the closures above see it.
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
                       trgt: str = 'conditions') -> None:
        """Set self.[target] to Conditions identified by ids.

        All ids are resolved before the target list is touched, so a failed
        lookup leaves the previous content intact. The list object itself is
        kept and refilled in place.
        """
        new_conditions = [Condition.by_id(db_conn, id_) for id_ in ids]
        getattr(self, trgt)[:] = new_conditions

    def set_fulfills(self, db_conn: DatabaseConnection,
                     ids: list[int]) -> None:
        """Set self.fulfills to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'fulfills')

    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
        """Set self.undoes to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'undoes')

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion, raising BadFormatException if
        the step's Process is, or transitively depends on, self.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            """Raise if node or any step below it points back at self."""
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)  # NB: Must pass before step is added or saved.
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID or None, step Process ID, parent
        step ID or None). Previous steps are dropped from cache and DB
        before the new ones are added one by one via _add_step.
        """
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            # NB: Tolerate steps that are not in the cache (anymore).
            db_conn.cached_process_steps.pop(step.id_, None)
        self.explicit_steps = []
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step_id, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        Condition links and steps are re-written by deleting all rows of
        self and inserting the current state; finally self is cached.
        """
        self.save_core(db_conn)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        # NB: Table names below are fixed literals, never external input, so
        # formatting them into the SQL is safe; values stay parameterized.
        condition_links = (('process_conditions', self.conditions),
                           ('process_fulfills', self.fulfills),
                           ('process_undoes', self.undoes))
        for table, conditions in condition_links:
            db_conn.exec(f'DELETE FROM {table} WHERE process = ?',
                         (self.id_,))
            for condition in conditions:
                db_conn.exec(f'INSERT INTO {table} VALUES (?,?)',
                             (self.id_, condition.id_))
        assert isinstance(self.id_, int)
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
216
217
class ProcessStep(BaseModel):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process used as its step
    (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Set up ProcessStep of id_ with its owner, Process, and parent."""
        self.set_int_id(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException.

        Returns the cached instance if there is one, else the step built
        from its DB row.
        """
        if id_ in db_conn.cached_process_steps:
            step = db_conn.cached_process_steps[id_]
            assert isinstance(step, ProcessStep)
            return step
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE step_id = ?', (id_,)):
            step = cls.from_table_row(db_conn, row)
            assert isinstance(step, ProcessStep)
            # NB: Without this return, a found step would still end in the
            # NotFoundException below.
            return step
        raise NotFoundException(f'found no ProcessStep of ID {id_}')

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)