home · contact · privacy
Unify ParamsParser and PostvarsParser to InputsParser.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from typing import Any, Set
5 from plomtask.db import DatabaseConnection
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import NotFoundException, BadFormatException
9
10
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    A Process carries three VersionedAttributes (title, description, effort),
    a list of explicit ProcessSteps pointing at other Processes, and three
    lists of Conditions (conditions, fulfills, undoes).
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up an empty Process of id_.

        id_ may be None (save_without_steps() later replaces it with the row
        ID reported by the database); an integer below 1 raises
        BadFormatException.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.fulfills: list[Condition] = []
        self.undoes: list[Condition] = []

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes.

        Only row[0] (the ID) is read. The new Process is also stored in
        db_conn.cached_processes under that ID.
        """
        process = cls(row[0])
        assert process.id_ is not None
        db_conn.cached_processes[process.id_] = process
        return process

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Processes already in db_conn.cached_processes are taken from there;
        any further ID found in the processes table is loaded via by_id().
        """
        # Shallow copy, so collecting does not alter the cache mapping itself.
        processes = dict(db_conn.cached_processes)
        for row in db_conn.exec('SELECT id FROM processes'):
            if row[0] not in processes:
                process = cls.by_id(db_conn, row[0])
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        A Process cached in db_conn.cached_processes under id_ is returned
        as is. Otherwise the processes table is queried; if it has no row of
        id_, either a new (unsaved) Process(id_) is used when create is set,
        or NotFoundException is raised. The Process is then filled with its
        attribute histories, explicit steps and Conditions from the DB.
        """
        if id_ in db_conn.cached_processes:
            process = db_conn.cached_processes[id_]
            assert isinstance(process, Process)
            return process
        # NOTE(review): unlike from_table_row(), this path does not store the
        # loaded Process in db_conn.cached_processes; within this class only
        # save_without_steps() does so later. Confirm that this is intended.
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls(row[0])
            break
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        # Each history row is read as (parent_id, key, value) by position.
        versioned_queries = (
            ('SELECT * FROM process_titles WHERE parent_id = ?',
             process.title),
            ('SELECT * FROM process_descriptions WHERE parent_id = ?',
             process.description),
            ('SELECT * FROM process_efforts WHERE parent_id = ?',
             process.effort),
        )
        for sql, attribute in versioned_queries:
            for row in db_conn.exec(sql, (process.id_,)):
                attribute.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps.append(
                ProcessStep.from_table_row(db_conn, row))
        condition_queries = (
            ('SELECT condition FROM process_conditions WHERE process = ?',
             process.conditions),
            ('SELECT condition FROM process_fulfills WHERE process = ?',
             process.fulfills),
            ('SELECT condition FROM process_undoes WHERE process = ?',
             process.undoes),
        )
        for sql, target in condition_queries:
            for row in db_conn.exec(sql, (process.id_,)):
                target.append(Condition.by_id(db_conn, row[0]))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # A set, so an owner using self in several steps is returned once.
        owner_ids = {
            row[0] for row in
            db_conn.exec('SELECT owner_id FROM process_steps WHERE'
                         ' step_process_id = ?', (self.id_,))}
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to node dicts with the keys 'process' (the
        Process the step points to), 'parent_id', 'is_explicit' (whether the
        step is owned by external_owner), 'steps' (child nodes, same shape)
        and 'seen' (whether that step ID was met earlier during this walk).
        external_owner defaults to self and is handed down unchanged into the
        recursive calls for the step Processes.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            # external_owner is read at call time, i.e. after the default
            # below has been applied.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return {'process': process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Attach self's explicit steps parented to this node first, then
            # descend into all children, implicit ones included.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert step.id_ is not None  # for mypy
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
                       trgt: str = 'conditions') -> None:
        """Set self.[target] to Conditions identified by ids.

        The existing list object is emptied and refilled in place, so other
        references to it see the new content.
        """
        trgt_list = getattr(self, trgt)
        trgt_list.clear()
        for id_ in ids:
            trgt_list.append(Condition.by_id(db_conn, id_))

    def set_fulfills(self, db_conn: DatabaseConnection,
                     ids: list[int]) -> None:
        """Set self.fulfills to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'fulfills')

    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
        """Set self.undoes to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'undoes')

    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
                 step_process_id: int,
                 parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion, raising BadFormatException if
        the step's Process, or any Process reached through its explicit
        steps, is self.
        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)
        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert self.id_ is not None
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        Also rewrites the rows linking self to its conditions, fulfills and
        undoes, takes over the row ID reported by the database as self.id_,
        and stores self in db_conn.cached_processes. Explicit steps are not
        touched here.
        """
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        # For each link table: drop self's old rows, then write current ones.
        link_tables = (
            ('DELETE FROM process_conditions WHERE process = ?',
             'INSERT INTO process_conditions VALUES (?,?)',
             self.conditions),
            ('DELETE FROM process_fulfills WHERE process = ?',
             'INSERT INTO process_fulfills VALUES (?,?)',
             self.fulfills),
            ('DELETE FROM process_undoes WHERE process = ?',
             'INSERT INTO process_undoes VALUES (?,?)',
             self.undoes),
        )
        for delete_sql, insert_sql, conditions in link_tables:
            db_conn.exec(delete_sql, (self.id_,))
            for condition in conditions:
                db_conn.exec(insert_sql, (self.id_, condition.id_))
        assert self.id_ is not None
        db_conn.cached_processes[self.id_] = self

    def fix_steps(self, db_conn: DatabaseConnection) -> None:
        """Rewrite ProcessSteps from self.explicit_steps.

        This also fixes illegal Step.parent_step_id values, i.e. those pointing
        to steps now absent, or owned by a different Process, fall back into
        .parent_step_id=None

        A parent counts as legal only if it is itself one of
        self.explicit_steps and owned by self: after the DELETE below, those
        are the only steps of this owner that will exist in the table.
        ProcessStep.by_id() is deliberately not used for this check, since it
        answers from db_conn.cached_process_steps before querying the DB and
        nothing here evicts the deleted rows from that cache, so it could
        still report a removed step as present.
        """
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        surviving_ids = {step.id_ for step in self.explicit_steps
                         if step.id_ is not None
                         and step.owner_id == self.id_}
        for step in self.explicit_steps:
            if step.parent_step_id not in surviving_ids:
                step.parent_step_id = None
            step.save(db_conn)
227
228
class ProcessStep:
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process performed as the step
    (step_process_id), optionally nested below another step (parent_step_id).
    """

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the four fields as given; nothing is validated here."""
        self.id_ = id_
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row) -> ProcessStep:
        """Build ProcessStep from a database row and put it into the cache.

        The first four columns are read by position, in the order of the
        constructor's parameters.
        """
        id_ = row[0]
        owner_id = row[1]
        step_process_id = row[2]
        parent_step_id = row[3]
        new_step = cls(id_, owner_id, step_process_id, parent_step_id)
        assert new_step.id_ is not None
        db_conn.cached_process_steps[new_step.id_] = new_step
        return new_step

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Return ProcessStep of id_ from cache or DB, else NotFoundException.

        The cache in db_conn.cached_process_steps is consulted first; only on
        a miss is the process_steps table queried.
        """
        cache = db_conn.cached_process_steps
        if id_ in cache:
            cached = cache[id_]
            assert isinstance(cached, ProcessStep)
            return cached
        found_rows = db_conn.exec(
            'SELECT * FROM process_steps WHERE step_id = ?', (id_,))
        for row in found_rows:
            # Only the first matching row is used.
            return cls.from_table_row(db_conn, row)
        raise NotFoundException(f'found no ProcessStep of ID {id_}')

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write self to the database, adopt the row ID, store in cache."""
        values = (self.id_, self.owner_id, self.step_process_id,
                  self.parent_step_id)
        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
                              values)
        self.id_ = cursor.lastrowid
        assert self.id_ is not None
        db_conn.cached_process_steps[self.id_] = self