home · contact · privacy
Clean up enablers/disablers code and naming conventions.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process(BaseModel):
    """Template for, and metadata for, Todos, and their arrangements.

    Carries three VersionedAttributes (title, description, effort), a list
    of explicit ProcessSteps owned by this Process, and three lists of
    Conditions: .conditions, .enables, and .disables.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Initialize with default attribute values and empty collections."""
        self.set_int_id(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Starts from whatever is in db_conn.cached_processes, then loads via
        cls.by_id every ID listed in the 'processes' table that is not yet
        among the collected ones.
        """
        processes = dict(db_conn.cached_processes.items())
        for id_ in db_conn.column_all('processes', 'id'):
            if id_ not in processes:
                process = cls.by_id(db_conn, id_)
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If nothing is found for id_ (or id_ is falsy), a fresh Process is
        returned when create is set; otherwise NotFoundException is raised.
        """
        process = None
        if id_:
            # NOTE(review): the second value returned by _by_id is discarded
            # here; if it signals a cache hit, the loading below would append
            # steps and conditions to an already populated object a second
            # time -- confirm against BaseModel._by_id.
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        if isinstance(process.id_, int):
            # Versioned attributes: one history table per attribute.
            for name in ('title', 'description', 'effort'):
                table = f'process_{name}s'
                attribute = getattr(process, name)
                for row in db_conn.row_where(table, 'parent', process.id_):
                    attribute.history_from_row(row)
            # Explicit steps owned by this Process.
            for row in db_conn.row_where('process_steps', 'owner',
                                         process.id_):
                step = ProcessStep.from_table_row(db_conn, row)
                process.explicit_steps.append(step)
            # Condition relations: one relation table per list attribute.
            for name in ('conditions', 'enables', 'disables'):
                table = f'process_{name}'
                target = getattr(process, name)
                for cond_id in db_conn.column_where(table, 'condition',
                                                    'process', process.id_):
                    target.append(Condition.by_id(db_conn, cond_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Returns an empty list if self has no (truthy) ID yet.
        """
        if not self.id_:
            return []
        # A set, so an owner using self in several steps is listed only once.
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to node dictionaries with the keys
        'process', 'parent_id', 'is_explicit', 'steps' (the sub-tree in the
        same format), and 'seen' (whether the node's ID was already met
        earlier during this call's walk).

        external_owner is the Process against whose ID each step's owner_id
        is compared to decide 'is_explicit'; it defaults to self.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            """Build node for step, recursing into its Process's steps."""
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return {'process': process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            """Attach self's explicit child steps to node, mark 'seen'."""
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        # Top level: explicit steps without a parent step.
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
                       trgt: str = 'conditions') -> None:
        """Set self.[target] to Conditions identified by ids.

        trgt names the list attribute to fill ('conditions', 'enables', or
        'disables').  All ids are resolved before the list is touched, so
        an exception from Condition.by_id leaves the previous content
        intact.  The list object itself is kept and updated in place.
        """
        trgt_list = getattr(self, trgt)
        new_conditions = [Condition.by_id(db_conn, id_) for id_ in ids]
        trgt_list[:] = new_conditions

    def set_enables(self, db_conn: DatabaseConnection,
                    ids: list[int]) -> None:
        """Set self.enables to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'enables')

    def set_disables(self, db_conn: DatabaseConnection,
                     ids: list[int]) -> None:
        """Set self.disables to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'disables')

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion, raising BadFormatException if
        the step's Process, or any Process reached through its explicit
        steps, is self.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            """Raise if node or anything below it points back to self."""
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for sub_step in step_process.explicit_steps:
                walk_steps(sub_step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)  # Must pass before the step is registered anywhere.
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (step ID or None, step Process ID, parent
        step ID or None).  Existing explicit steps are first dropped from
        db_conn.cached_process_steps and from the 'process_steps' table.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            del db_conn.cached_process_steps[step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_id, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        # Same relation names (and table naming) as by_id reads them with.
        for name in ('conditions', 'enables', 'disables'):
            db_conn.rewrite_relations(f'process_{name}', 'process', self.id_,
                                      [[c.id_] for c in getattr(self, name)])
        # Steps are re-written wholesale: wipe the owned rows, save anew.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
196
197
class ProcessStep(BaseModel):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process used as the step
    (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store ID and the three relation fields listed in to_save."""
        self.set_int_id(id_)
        self.parent_step_id = parent_step_id
        self.step_process_id = step_process_id
        self.owner_id = owner_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        found, _ = super()._by_id(db_conn, id_)
        if not found:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(found, ProcessStep)
        return found

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save self to DB; nothing is needed beyond self.save_core."""
        self.save_core(db_conn)