home · contact · privacy
Further refactor Conditions handling.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition, ConditionsRelations
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process(BaseModel, ConditionsRelations):
    """Template and metadata for Todos, including how they are arranged.

    Holds versioned title, description and effort, a list of explicit
    ProcessSteps, and three lists of Conditions: conditions, enables and
    disables.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set ID and start out with default attributes and empty lists."""
        self.set_int_id(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Return every Process, taken from db_conn's cache or the DB.

        Cached Processes are used as they are; any ID listed in the
        'processes' table that is not yet collected is loaded via by_id.
        """
        collected = dict(db_conn.cached_processes.items())
        for process_id in db_conn.column_all('processes', 'id'):
            if process_id in collected:
                continue
            loaded = cls.by_id(db_conn, process_id)
            collected[loaded.id_] = loaded
        return list(collected.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Return Process of id_ with histories, steps and Conditions.

        For a truthy id_, the lookup is delegated to the parent class's
        _by_id. If that yields nothing (or id_ is falsy), a fresh
        Process(id_) is made when create is set; otherwise
        NotFoundException is raised.

        Whenever the resulting Process has an int ID, its versioned
        attributes, explicit steps and Condition lists are filled from
        the matching DB rows.
        """
        process = None
        if id_:
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if create:
                process = Process(id_)
            else:
                raise NotFoundException(f'Process not found of id: {id_}')
        if isinstance(process.id_, int):
            # Histories of the versioned attributes.
            for attr_name in ('title', 'description', 'effort'):
                history_table = f'process_{attr_name}s'
                for row in db_conn.row_where(history_table, 'parent',
                                             process.id_):
                    getattr(process, attr_name).history_from_row(row)
            # Steps owned by this Process.
            for row in db_conn.row_where('process_steps', 'owner',
                                         process.id_):
                step = ProcessStep.from_table_row(db_conn, row)
                process.explicit_steps.append(step)
            # Condition relations, one table per list attribute.
            for relation in ('conditions', 'enables', 'disables'):
                relation_table = f'process_{relation}'
                for condition_id in db_conn.column_where(
                        relation_table, 'condition', 'process', process.id_):
                    getattr(process, relation).append(
                        Condition.by_id(db_conn, condition_id))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return the Processes that own a ProcessStep pointing at self.

        Without a truthy self.id_, the result is an empty list.
        """
        if not self.id_:
            return []
        # A set, so an owner with several matching steps is listed once.
        owner_ids = {owner_id for owner_id
                     in db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_)}
        return [type(self).by_id(db_conn, owner_id)
                for owner_id in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of explicit and implicit ProcessSteps depended on.

        The result maps step IDs to node dicts with these keys:
        'process' (the Process the step points to), 'parent_id' (the
        step's parent_step_id), 'is_explicit' (whether the step is owned
        by external_owner), 'steps' (child nodes in the same format) and
        'seen' (whether the node's ID was already met earlier in this
        call's walk). external_owner falls back to self if not provided.
        """
        tree: dict[int, dict[str, object]] = {}
        visited_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self

        def build_node(step: ProcessStep) -> dict[str, object]:
            owned = (external_owner is not None
                     and step.owner_id == external_owner.id_)
            step_process = type(self).by_id(db_conn, step.step_process_id)
            # The step's own Process contributes its tree as sub-steps.
            sub_steps = step_process.get_steps(db_conn, external_owner)
            return {'process': step_process,
                    'parent_id': step.parent_step_id,
                    'is_explicit': owned,
                    'steps': sub_steps}

        def attach_children(node_id: int, node: dict[str, Any]) -> None:
            # Collect matching children first, then build their nodes.
            children = [candidate for candidate in self.explicit_steps
                        if candidate.parent_step_id == node_id]
            for child in children:
                node['steps'][child.id_] = build_node(child)
            node['seen'] = node_id in visited_ids
            visited_ids.add(node_id)
            for child_id, child_node in node['steps'].items():
                attach_children(child_id, child_node)

        top_level = [candidate for candidate in self.explicit_steps
                     if candidate.parent_step_id is None]
        for step in top_level:
            assert isinstance(step.id_, int)
            tree[step.id_] = build_node(step)
        for top_id, top_node in tree.items():
            attach_children(top_id, top_node)
        return tree

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, add it to self.explicit_steps, save it.

        Before the step is added, the explicit steps reachable from it
        are walked; BadFormatException is raised if any of them points
        back at self, since that would mean step recursion.

        parent_step_id is reset to None if no ProcessStep of that ID is
        found (which can be assumed in case it was just deleted under
        its feet), or if the parent step found is not owned by self.
        """
        def check_recursion(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            target = self.by_id(db_conn, node.step_process_id)
            for sub_step in target.explicit_steps:
                check_recursion(sub_step)

        if parent_step_id is not None:
            try:
                parent = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        new_step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        check_recursion(new_step)
        self.explicit_steps.append(new_step)
        new_step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return new_step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Replace self.explicit_steps in bulk.

        Current steps are dropped from db_conn.cached_process_steps and
        from the 'process_steps' table; then one new step is added per
        (id_, step_process_id, parent_step_id) tuple in steps.
        """
        assert isinstance(self.id_, int)
        for old_step in self.explicit_steps:
            assert isinstance(old_step.id_, int)
            del db_conn.cached_process_steps[old_step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_tuple in steps:
            step_id = step_tuple[0]
            step_process_id = step_tuple[1]
            parent_step_id = step_tuple[2]
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        Saves core data, the versioned attributes, the three Condition
        relation tables and all explicit steps, then registers self in
        db_conn.cached_processes.
        """
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        for attribute in (self.title, self.description, self.effort):
            attribute.save(db_conn)
        for relation in ('conditions', 'enables', 'disables'):
            condition_rows = [[cond.id_] for cond in getattr(self, relation)]
            db_conn.rewrite_relations(f'process_{relation}', 'process',
                                      self.id_, condition_rows)
        # Step rows are wiped and written anew from self.explicit_steps.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
177
178
class ProcessStep(BaseModel):
    """Sub-unit of a Process: links an owner Process to a step Process.

    May name another ProcessStep as its parent via parent_step_id.
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Set ID, owner, step Process, and (optional) parent step."""
        self.set_int_id(id_)
        self.parent_step_id = parent_step_id
        self.step_process_id = step_process_id
        self.owner_id = owner_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Return ProcessStep of id_, or raise NotFoundException."""
        found, _ = super()._by_id(db_conn, id_)
        if not found:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(found, ProcessStep)
        return found

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save by handing over to self.save_core, nothing more."""
        self.save_core(db_conn)