home · contact · privacy
Hide (almost all) remaining SQL code in DB module.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process(BaseModel):
    """Template for, and metadata for, Todos, and their arrangements.

    Holds three VersionedAttributes (title, description, effort), the
    ProcessSteps it explicitly owns, and three lists of Conditions
    (conditions, fulfills, undoes).
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up Process with default attributes and empty relations."""
        self.set_int_id(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.fulfills: list[Condition] = []
        self.undoes: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Starts from whatever is in db_conn.cached_processes, then adds (via
        cls.by_id) every ID listed in the 'processes' table that is not yet
        among them.
        """
        processes = {id_: process
                     for id_, process in db_conn.cached_processes.items()}
        for id_ in db_conn.column_all('processes', 'id'):
            if id_ not in processes:
                process = cls.by_id(db_conn, id_)
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        A falsy id_ skips the lookup entirely.  If nothing was found, a new
        Process(id_) is built when create is set; otherwise
        NotFoundException is raised.  For any result with an integer ID, the
        versioned attribute histories, the explicit steps, and the three
        Condition lists are then read from their tables.
        """
        process = None
        if id_:
            # NOTE(review): the second value returned by _by_id is discarded
            # here; if it signals a cache hit, the loading below would append
            # steps and conditions onto an already populated instance --
            # confirm against BaseModel._by_id.
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        if isinstance(process.id_, int):
            for name in ('title', 'description', 'effort'):
                table = f'process_{name}s'
                attribute = getattr(process, name)
                for row in db_conn.row_where(table, 'parent', process.id_):
                    attribute.history_from_row(row)
            for row in db_conn.row_where('process_steps', 'owner',
                                         process.id_):
                step = ProcessStep.from_table_row(db_conn, row)
                process.explicit_steps.append(step)
            for name in ('conditions', 'fulfills', 'undoes'):
                table = f'process_{name}'
                target = getattr(process, name)
                for cond_id in db_conn.column_where(table, 'condition',
                                                    'process', process.id_):
                    target.append(Condition.by_id(db_conn, cond_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Owners are de-duplicated, so a Process that uses self in several
        steps is returned only once.  Without an ID, returns an empty list.
        """
        if not self.id_:
            return []
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to node dicts with the keys 'process' (the
        Process the step points to), 'parent_id', 'is_explicit' (whether the
        step is owned by external_owner), 'steps' (child nodes of the same
        shape), and 'seen' (whether that step ID was met earlier in the
        walk).  external_owner defaults to self and is handed down unchanged
        into the recursive calls on step Processes.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            """Build node for step, pre-filled with its Process' own tree."""
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return {'process': process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            """Attach self's explicit children to node, mark seen, recurse."""
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            # All children are attached above, before iteration starts, so
            # the dict is not resized while being walked.
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        # Top level of the tree: explicit steps without a parent step.
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
                       trgt: str = 'conditions') -> None:
        """Set self.[target] to Conditions identified by ids.

        The list object stored under the attribute named by trgt is emptied
        and refilled in place, so references to it held elsewhere stay valid.
        """
        trgt_list = getattr(self, trgt)
        trgt_list.clear()
        for id_ in ids:
            trgt_list.append(Condition.by_id(db_conn, id_))

    def set_fulfills(self, db_conn: DatabaseConnection,
                     ids: list[int]) -> None:
        """Set self.fulfills to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'fulfills')

    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
        """Set self.undoes to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'undoes')

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion: BadFormatException is raised if
        the new step, or any explicit step reachable through its Process,
        points back to self.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            """Descend through node's Process' steps, raise if self met."""
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)  # Validate before the step is recorded anywhere.
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID or None, ID of the Process to use as
        step, parent step ID or None).  All current steps are dropped from
        db_conn.cached_process_steps and from the 'process_steps' table
        before the new ones are added in the given order.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            del db_conn.cached_process_steps[step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_id, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        Order: core row, the three versioned attributes, the three Condition
        relation tables, then the explicit steps (old rows deleted, current
        ones saved anew).  Finally self is put into
        db_conn.cached_processes.
        """
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        for name in ('conditions', 'fulfills', 'undoes'):
            db_conn.rewrite_relations(
                f'process_{name}', 'process', self.id_,
                [[cond.id_] for cond in getattr(self, name)])
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
195
196
class ProcessStep(BaseModel):
    """One step of a Process: a link from an owner to another Process.

    A step records which Process owns it, which Process it stands for, and
    (optionally) which other step it is nested under.
    """
    table_name = 'process_steps'
    # presumably the attributes BaseModel.save_core writes -- TODO confirm
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the step's own ID and the three IDs that position it."""
        self.set_int_id(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Return ProcessStep of id_; raise NotFoundException if none."""
        found, _ = super()._by_id(db_conn, id_)
        if not found:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(found, ProcessStep)
        return found

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save self by delegating entirely to self.save_core."""
        self.save_core(db_conn)