home · contact · privacy
Refactor updates of relations tables.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
10 class Process(BaseModel):
11     """Template for, and metadata for, Todos, and their arrangements."""
12     table_name = 'processes'
13
14     # pylint: disable=too-many-instance-attributes
15
16     def __init__(self, id_: int | None) -> None:
17         self.set_int_id(id_)
18         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
19         self.description = VersionedAttribute(self, 'process_descriptions', '')
20         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
21         self.explicit_steps: list[ProcessStep] = []
22         self.conditions: list[Condition] = []
23         self.fulfills: list[Condition] = []
24         self.undoes: list[Condition] = []
25
26     @classmethod
27     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28         """Collect all Processes and their connected VersionedAttributes."""
29         processes = {}
30         for id_, process in db_conn.cached_processes.items():
31             processes[id_] = process
32         already_recorded = processes.keys()
33         for row in db_conn.exec('SELECT id FROM processes'):
34             if row[0] not in already_recorded:
35                 process = cls.by_id(db_conn, row[0])
36                 processes[process.id_] = process
37         return list(processes.values())
38
39     @classmethod
40     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
41               create: bool = False) -> Process:
42         """Collect Process, its VersionedAttributes, and its child IDs."""
43         process = None
44         if id_:
45             process, _ = super()._by_id(db_conn, id_)
46         if not process:
47             if not create:
48                 raise NotFoundException(f'Process not found of id: {id_}')
49             process = Process(id_)
50         for row in db_conn.exec('SELECT * FROM process_titles '
51                                 'WHERE parent = ?', (process.id_,)):
52             process.title.history[row[1]] = row[2]
53         for row in db_conn.exec('SELECT * FROM process_descriptions '
54                                 'WHERE parent = ?', (process.id_,)):
55             process.description.history[row[1]] = row[2]
56         for row in db_conn.exec('SELECT * FROM process_efforts '
57                                 'WHERE parent = ?', (process.id_,)):
58             process.effort.history[row[1]] = row[2]
59         for row in db_conn.exec('SELECT * FROM process_steps '
60                                 'WHERE owner = ?', (process.id_,)):
61             process.explicit_steps += [ProcessStep.from_table_row(db_conn,
62                                                                   row)]
63         for row in db_conn.exec('SELECT condition FROM process_conditions '
64                                 'WHERE process = ?', (process.id_,)):
65             process.conditions += [Condition.by_id(db_conn, row[0])]
66         for row in db_conn.exec('SELECT condition FROM process_fulfills '
67                                 'WHERE process = ?', (process.id_,)):
68             process.fulfills += [Condition.by_id(db_conn, row[0])]
69         for row in db_conn.exec('SELECT condition FROM process_undoes '
70                                 'WHERE process = ?', (process.id_,)):
71             process.undoes += [Condition.by_id(db_conn, row[0])]
72         assert isinstance(process, Process)
73         return process
74
75     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
76         """Return Processes using self for a ProcessStep."""
77         owner_ids = set()
78         for owner_id in db_conn.exec('SELECT owner FROM process_steps WHERE'
79                                      ' step_process = ?', (self.id_,)):
80             owner_ids.add(owner_id[0])
81         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
82
83     def get_steps(self, db_conn: DatabaseConnection, external_owner:
84                   Process | None = None) -> dict[int, dict[str, object]]:
85         """Return tree of depended-on explicit and implicit ProcessSteps."""
86
87         def make_node(step: ProcessStep) -> dict[str, object]:
88             is_explicit = False
89             if external_owner is not None:
90                 is_explicit = step.owner_id == external_owner.id_
91             process = self.__class__.by_id(db_conn, step.step_process_id)
92             step_steps = process.get_steps(db_conn, external_owner)
93             return {'process': process, 'parent_id': step.parent_step_id,
94                     'is_explicit': is_explicit, 'steps': step_steps}
95
96         def walk_steps(node_id: int, node: dict[str, Any]) -> None:
97             explicit_children = [s for s in self.explicit_steps
98                                  if s.parent_step_id == node_id]
99             for child in explicit_children:
100                 node['steps'][child.id_] = make_node(child)
101             node['seen'] = node_id in seen_step_ids
102             seen_step_ids.add(node_id)
103             for id_, step in node['steps'].items():
104                 walk_steps(id_, step)
105
106         steps: dict[int, dict[str, object]] = {}
107         seen_step_ids: Set[int] = set()
108         if external_owner is None:
109             external_owner = self
110         for step in [s for s in self.explicit_steps
111                      if s.parent_step_id is None]:
112             assert isinstance(step.id_, int)
113             steps[step.id_] = make_node(step)
114         for step_id, step_node in steps.items():
115             walk_steps(step_id, step_node)
116         return steps
117
118     def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
119                        trgt: str = 'conditions') -> None:
120         """Set self.[target] to Conditions identified by ids."""
121         trgt_list = getattr(self, trgt)
122         while len(trgt_list) > 0:
123             trgt_list.pop()
124         for id_ in ids:
125             trgt_list += [Condition.by_id(db_conn, id_)]
126
127     def set_fulfills(self, db_conn: DatabaseConnection,
128                      ids: list[int]) -> None:
129         """Set self.fulfills to Conditions identified by ids."""
130         self.set_conditions(db_conn, ids, 'fulfills')
131
132     def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
133         """Set self.undoes to Conditions identified by ids."""
134         self.set_conditions(db_conn, ids, 'undoes')
135
136     def _add_step(self,
137                   db_conn: DatabaseConnection,
138                   id_: int | None,
139                   step_process_id: int,
140                   parent_step_id: int | None) -> ProcessStep:
141         """Create new ProcessStep, save and add it to self.explicit_steps.
142
143         Also checks against step recursion.
144
145         The new step's parent_step_id will fall back to None either if no
146         matching ProcessStep is found (which can be assumed in case it was
147         just deleted under its feet), or if the parent step would not be
148         owned by the current Process.
149         """
150         def walk_steps(node: ProcessStep) -> None:
151             if node.step_process_id == self.id_:
152                 raise BadFormatException('bad step selection causes recursion')
153             step_process = self.by_id(db_conn, node.step_process_id)
154             for step in step_process.explicit_steps:
155                 walk_steps(step)
156         if parent_step_id is not None:
157             try:
158                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
159                 if parent_step.owner_id != self.id_:
160                     parent_step_id = None
161             except NotFoundException:
162                 parent_step_id = None
163         assert isinstance(self.id_, int)
164         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
165         walk_steps(step)
166         self.explicit_steps += [step]
167         step.save(db_conn)  # NB: This ensures a non-None step.id_.
168         return step
169
170     def set_steps(self, db_conn: DatabaseConnection,
171                   steps: list[tuple[int | None, int, int | None]]) -> None:
172         """Set self.explicit_steps in bulk."""
173         for step in self.explicit_steps:
174             assert isinstance(step.id_, int)
175             del db_conn.cached_process_steps[step.id_]
176         self.explicit_steps = []
177         db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
178                      (self.id_,))
179         for step_tuple in steps:
180             self._add_step(db_conn, step_tuple[0],
181                            step_tuple[1], step_tuple[2])
182
183     def save(self, db_conn: DatabaseConnection) -> None:
184         """Add (or re-write) self and connected items to DB."""
185         self.save_core(db_conn)
186         assert isinstance(self.id_, int)
187         self.title.save(db_conn)
188         self.description.save(db_conn)
189         self.effort.save(db_conn)
190         db_conn.rewrite_relations('process_conditions', 'process', self.id_,
191                                   [[c.id_] for c in self.conditions])
192         db_conn.rewrite_relations('process_fulfills', 'process', self.id_,
193                                   [[c.id_] for c in self.fulfills])
194         db_conn.rewrite_relations('process_undoes', 'process', self.id_,
195                                   [[c.id_] for c in self.undoes])
196         db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
197                      (self.id_,))
198         for step in self.explicit_steps:
199             step.save(db_conn)
200         db_conn.cached_processes[self.id_] = self
201
202
203 class ProcessStep(BaseModel):
204     """Sub-unit of Processes."""
205     table_name = 'process_steps'
206     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
207
208     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
209                  parent_step_id: int | None) -> None:
210         self.set_int_id(id_)
211         self.owner_id = owner_id
212         self.step_process_id = step_process_id
213         self.parent_step_id = parent_step_id
214
215     @classmethod
216     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
217         """Retrieve ProcessStep by id_, or throw NotFoundException."""
218         step, _ = super()._by_id(db_conn, id_)
219         if step:
220             assert isinstance(step, ProcessStep)
221             return step
222         raise NotFoundException(f'found no ProcessStep of ID {id_}')
223
224     def save(self, db_conn: DatabaseConnection) -> None:
225         """Default to simply calling self.save_core for simple cases."""
226         self.save_core(db_conn)