home · contact · privacy
Simplify code with namedtuples and dataclasses.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
9
10
@dataclass
class ProcessStepsNode:
    """One node of the ProcessSteps tree, as built by Process.get_steps."""
    # Process that this step stands for.
    process: Process
    # ID of the ProcessStep this one hangs below; None for top-level steps.
    parent_id: int | None
    # Whether the step's owner is the Process the tree was requested for.
    is_explicit: bool
    # Sub-steps below this node, keyed by ProcessStep ID.
    steps: dict[int, ProcessStepsNode]
    # Set while walking the tree: True if this step ID was met before.
    seen: bool
19
20
class Process(BaseModel, ConditionsRelations):
    """Template and metadata for Todos, and how they nest as steps."""
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up an empty Process; by_id() fills in what the DB stores."""
        self.set_int_id(id_)
        # Versioned values; by_id() reads their histories from the tables
        # named here ('process_titles' etc.).
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        # ProcessSteps directly owned by this Process.
        self.explicit_steps: list[ProcessStep] = []
        # Condition relations, stored in 'process_conditions',
        # 'process_enables' and 'process_disables' respectively.
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Return every Process: the cached ones plus the rest from the DB."""
        # Start from a copy of the cache, then load only what it lacks.
        collected = dict(db_conn.cached_processes.items())
        for db_id in db_conn.column_all('processes', 'id'):
            if db_id in collected:
                continue
            loaded = cls.by_id(db_conn, db_id)
            collected[loaded.id_] = loaded
        return list(collected.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Return Process of id_ with histories, steps, conditions loaded.

        If none is found (or id_ is falsy), either return a fresh
        Process(id_) when create is set, or raise NotFoundException.
        """
        found = None
        if id_:
            # NOTE(review): the second value returned by _by_id is ignored,
            # so the loading below runs on every call -- confirm that this
            # cannot append duplicates to an instance handed back by _by_id.
            found, _ = super()._by_id(db_conn, id_)
        if not found:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            found = Process(id_)
        if isinstance(found.id_, int):
            # Histories of the versioned attributes.
            for attr_name in ('title', 'description', 'effort'):
                versioned = getattr(found, attr_name)
                for row in db_conn.row_where(f'process_{attr_name}s',
                                             'parent', found.id_):
                    versioned.history_from_row(row)
            # Steps directly owned by this Process.
            for row in db_conn.row_where('process_steps', 'owner',
                                         found.id_):
                found.explicit_steps.append(
                        ProcessStep.from_table_row(db_conn, row))
            # Related Conditions, one relation table per attribute.
            for relation in ('conditions', 'enables', 'disables'):
                related = getattr(found, relation)
                for cond_id in db_conn.column_where(f'process_{relation}',
                                                    'condition',
                                                    'process', found.id_):
                    related.append(Condition.by_id(db_conn, cond_id))
        assert isinstance(found, Process)
        return found

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return the Processes that own a ProcessStep pointing to self."""
        if not self.id_:
            return []
        # A set, as one owner may use self in more than one of its steps.
        owner_ids = {owner_id for owner_id
                     in db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_)}
        return [self.__class__.by_id(db_conn, owner_id)
                for owner_id in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of explicit and implicit ProcessSteps below self.

        The result maps the IDs of self's top-level steps to their nodes.
        external_owner is the Process against which nodes are marked as
        explicit; it defaults to self and is handed down unchanged when
        recursing into the step Processes.
        """
        owner = self if external_owner is None else external_owner
        visited_ids: Set[int] = set()

        def build_node(step: ProcessStep) -> ProcessStepsNode:
            # The step Process brings along its own sub-tree.
            is_explicit = step.owner_id == owner.id_
            step_process = self.__class__.by_id(db_conn,
                                                step.step_process_id)
            sub_tree = step_process.get_steps(db_conn, owner)
            return ProcessStepsNode(step_process, step.parent_step_id,
                                    is_explicit, sub_tree, False)

        def attach_children(node_id: int, node: ProcessStepsNode) -> None:
            # Hang self's steps that name node_id as parent below the node,
            # record whether node_id was met before, then descend.
            for child in [s for s in self.explicit_steps
                          if s.parent_step_id == node_id]:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = build_node(child)
            node.seen = node_id in visited_ids
            visited_ids.add(node_id)
            for sub_id, sub_node in node.steps.items():
                attach_children(sub_id, sub_node)

        tree: dict[int, ProcessStepsNode] = {}
        for top_step in [s for s in self.explicit_steps
                         if s.parent_step_id is None]:
            assert isinstance(top_step.id_, int)
            tree[top_step.id_] = build_node(top_step)
        for top_id, top_node in tree.items():
            attach_children(top_id, top_node)
        return tree

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Build, check, register and save one new ProcessStep of self.

        Raises BadFormatException if the step's Process, or any Process
        reachable through its explicit steps, is self (i.e. recursion).

        parent_step_id is reset to None if no ProcessStep of that ID is
        found (as may happen when it was deleted just before), or if that
        ProcessStep is not owned by self.
        """

        def check_recursion(candidate: ProcessStep) -> None:
            if candidate.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            target = self.by_id(db_conn, candidate.step_process_id)
            for sub_step in target.explicit_steps:
                check_recursion(sub_step)

        if parent_step_id is not None:
            try:
                parent = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        new_step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        check_recursion(new_step)
        self.explicit_steps.append(new_step)
        new_step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return new_step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Replace all of self's explicit steps by those described in steps.

        Each tuple holds (step ID or None, step Process ID, parent step ID
        or None), in the order _add_step expects them.
        """
        assert isinstance(self.id_, int)
        # Drop the old steps from cache, from self, and from the DB ...
        for old_step in self.explicit_steps:
            assert isinstance(old_step.id_, int)
            del db_conn.cached_process_steps[old_step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        # ... then build the new ones, one by one.
        for entry in steps:
            self._add_step(db_conn, entry[0], entry[1], entry[2])

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write self, its attributes, relations and steps to the DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        for versioned in (self.title, self.description, self.effort):
            versioned.save(db_conn)
        for table, conditions in (('process_conditions', self.conditions),
                                  ('process_enables', self.enables),
                                  ('process_disables', self.disables)):
            db_conn.rewrite_relations(table, 'process', self.id_,
                                      [[c.id_] for c in conditions])
        # Steps get rewritten wholesale: wipe the stored ones, save ours.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
191
192
class ProcessStep(BaseModel):
    """Link that makes one Process a step (sub-unit) of another Process."""
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the step's own ID and the IDs it connects."""
        self.set_int_id(id_)
        # Process that owns this step.
        self.owner_id = owner_id
        # Process that this step stands for.
        self.step_process_id = step_process_id
        # ProcessStep this one is nested below, if any.
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Return ProcessStep of id_, or raise NotFoundException."""
        found, _ = super()._by_id(db_conn, id_)
        if not found:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(found, ProcessStep)
        return found

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save self by just delegating to self.save_core."""
        self.save_core(db_conn)