home · contact · privacy
Improve consistency of DB column names.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
10 class Process(BaseModel):
11     """Template for, and metadata for, Todos, and their arrangements."""
12     table_name = 'processes'
13
14     # pylint: disable=too-many-instance-attributes
15
16     def __init__(self, id_: int | None) -> None:
17         self.set_int_id(id_)
18         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
19         self.description = VersionedAttribute(self, 'process_descriptions', '')
20         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
21         self.explicit_steps: list[ProcessStep] = []
22         self.conditions: list[Condition] = []
23         self.fulfills: list[Condition] = []
24         self.undoes: list[Condition] = []
25
26     @classmethod
27     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28         """Collect all Processes and their connected VersionedAttributes."""
29         processes = {}
30         for id_, process in db_conn.cached_processes.items():
31             processes[id_] = process
32         already_recorded = processes.keys()
33         for row in db_conn.exec('SELECT id FROM processes'):
34             if row[0] not in already_recorded:
35                 process = cls.by_id(db_conn, row[0])
36                 processes[process.id_] = process
37         return list(processes.values())
38
39     @classmethod
40     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
41               create: bool = False) -> Process:
42         """Collect Process, its VersionedAttributes, and its child IDs."""
43         process = None
44         if id_:
45             process, _ = super()._by_id(db_conn, id_)
46         if not process:
47             if not create:
48                 raise NotFoundException(f'Process not found of id: {id_}')
49             process = Process(id_)
50         for row in db_conn.exec('SELECT * FROM process_titles '
51                                 'WHERE parent = ?', (process.id_,)):
52             process.title.history[row[1]] = row[2]
53         for row in db_conn.exec('SELECT * FROM process_descriptions '
54                                 'WHERE parent = ?', (process.id_,)):
55             process.description.history[row[1]] = row[2]
56         for row in db_conn.exec('SELECT * FROM process_efforts '
57                                 'WHERE parent = ?', (process.id_,)):
58             process.effort.history[row[1]] = row[2]
59         for row in db_conn.exec('SELECT * FROM process_steps '
60                                 'WHERE owner = ?', (process.id_,)):
61             process.explicit_steps += [ProcessStep.from_table_row(db_conn,
62                                                                   row)]
63         for row in db_conn.exec('SELECT condition FROM process_conditions '
64                                 'WHERE process = ?', (process.id_,)):
65             process.conditions += [Condition.by_id(db_conn, row[0])]
66         for row in db_conn.exec('SELECT condition FROM process_fulfills '
67                                 'WHERE process = ?', (process.id_,)):
68             process.fulfills += [Condition.by_id(db_conn, row[0])]
69         for row in db_conn.exec('SELECT condition FROM process_undoes '
70                                 'WHERE process = ?', (process.id_,)):
71             process.undoes += [Condition.by_id(db_conn, row[0])]
72         assert isinstance(process, Process)
73         return process
74
75     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
76         """Return Processes using self for a ProcessStep."""
77         owner_ids = set()
78         for owner_id in db_conn.exec('SELECT owner FROM process_steps WHERE'
79                                      ' step_process = ?', (self.id_,)):
80             owner_ids.add(owner_id[0])
81         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
82
83     def get_steps(self, db_conn: DatabaseConnection, external_owner:
84                   Process | None = None) -> dict[int, dict[str, object]]:
85         """Return tree of depended-on explicit and implicit ProcessSteps."""
86
87         def make_node(step: ProcessStep) -> dict[str, object]:
88             is_explicit = False
89             if external_owner is not None:
90                 is_explicit = step.owner_id == external_owner.id_
91             process = self.__class__.by_id(db_conn, step.step_process_id)
92             step_steps = process.get_steps(db_conn, external_owner)
93             return {'process': process, 'parent_id': step.parent_step_id,
94                     'is_explicit': is_explicit, 'steps': step_steps}
95
96         def walk_steps(node_id: int, node: dict[str, Any]) -> None:
97             explicit_children = [s for s in self.explicit_steps
98                                  if s.parent_step_id == node_id]
99             for child in explicit_children:
100                 node['steps'][child.id_] = make_node(child)
101             node['seen'] = node_id in seen_step_ids
102             seen_step_ids.add(node_id)
103             for id_, step in node['steps'].items():
104                 walk_steps(id_, step)
105
106         steps: dict[int, dict[str, object]] = {}
107         seen_step_ids: Set[int] = set()
108         if external_owner is None:
109             external_owner = self
110         for step in [s for s in self.explicit_steps
111                      if s.parent_step_id is None]:
112             assert isinstance(step.id_, int)
113             steps[step.id_] = make_node(step)
114         for step_id, step_node in steps.items():
115             walk_steps(step_id, step_node)
116         return steps
117
118     def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
119                        trgt: str = 'conditions') -> None:
120         """Set self.[target] to Conditions identified by ids."""
121         trgt_list = getattr(self, trgt)
122         while len(trgt_list) > 0:
123             trgt_list.pop()
124         for id_ in ids:
125             trgt_list += [Condition.by_id(db_conn, id_)]
126
127     def set_fulfills(self, db_conn: DatabaseConnection,
128                      ids: list[int]) -> None:
129         """Set self.fulfills to Conditions identified by ids."""
130         self.set_conditions(db_conn, ids, 'fulfills')
131
132     def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
133         """Set self.undoes to Conditions identified by ids."""
134         self.set_conditions(db_conn, ids, 'undoes')
135
136     def _add_step(self,
137                   db_conn: DatabaseConnection,
138                   id_: int | None,
139                   step_process_id: int,
140                   parent_step_id: int | None) -> ProcessStep:
141         """Create new ProcessStep, save and add it to self.explicit_steps.
142
143         Also checks against step recursion.
144
145         The new step's parent_step_id will fall back to None either if no
146         matching ProcessStep is found (which can be assumed in case it was
147         just deleted under its feet), or if the parent step would not be
148         owned by the current Process.
149         """
150         def walk_steps(node: ProcessStep) -> None:
151             if node.step_process_id == self.id_:
152                 raise BadFormatException('bad step selection causes recursion')
153             step_process = self.by_id(db_conn, node.step_process_id)
154             for step in step_process.explicit_steps:
155                 walk_steps(step)
156         if parent_step_id is not None:
157             try:
158                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
159                 if parent_step.owner_id != self.id_:
160                     parent_step_id = None
161             except NotFoundException:
162                 parent_step_id = None
163         assert isinstance(self.id_, int)
164         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
165         walk_steps(step)
166         self.explicit_steps += [step]
167         step.save(db_conn)  # NB: This ensures a non-None step.id_.
168         return step
169
170     def set_steps(self, db_conn: DatabaseConnection,
171                   steps: list[tuple[int | None, int, int | None]]) -> None:
172         """Set self.explicit_steps in bulk."""
173         for step in self.explicit_steps:
174             assert isinstance(step.id_, int)
175             del db_conn.cached_process_steps[step.id_]
176         self.explicit_steps = []
177         db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
178                      (self.id_,))
179         for step_tuple in steps:
180             self._add_step(db_conn, step_tuple[0],
181                            step_tuple[1], step_tuple[2])
182
183     def save(self, db_conn: DatabaseConnection) -> None:
184         """Add (or re-write) self and connected items to DB."""
185         self.save_core(db_conn)
186         self.title.save(db_conn)
187         self.description.save(db_conn)
188         self.effort.save(db_conn)
189         db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
190                      (self.id_,))
191         for condition in self.conditions:
192             db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
193                          (self.id_, condition.id_))
194         db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
195                      (self.id_,))
196         for condition in self.fulfills:
197             db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
198                          (self.id_, condition.id_))
199         db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
200                      (self.id_,))
201         for condition in self.undoes:
202             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
203                          (self.id_, condition.id_))
204         assert isinstance(self.id_, int)
205         db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
206                      (self.id_,))
207         for step in self.explicit_steps:
208             step.save(db_conn)
209         db_conn.cached_processes[self.id_] = self
210
211
212 class ProcessStep(BaseModel):
213     """Sub-unit of Processes."""
214     table_name = 'process_steps'
215     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
216
217     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
218                  parent_step_id: int | None) -> None:
219         self.set_int_id(id_)
220         self.owner_id = owner_id
221         self.step_process_id = step_process_id
222         self.parent_step_id = parent_step_id
223
224     @classmethod
225     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
226         """Retrieve ProcessStep by id_, or throw NotFoundException."""
227         step, _ = super()._by_id(db_conn, id_)
228         if step:
229             assert isinstance(step, ProcessStep)
230             return step
231         raise NotFoundException(f'found no ProcessStep of ID {id_}')
232
233     def save(self, db_conn: DatabaseConnection) -> None:
234         """Default to simply calling self.save_core for simple cases."""
235         self.save_core(db_conn)