home · contact · privacy
Extend tests.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
@dataclass
class ProcessStepsNode:
    """Collects what's useful to know for ProcessSteps tree display.

    Nodes are built by Process.get_steps, one per ProcessStep met while
    walking a Process's tree of steps.
    """
    # Process that the represented ProcessStep points to as its step.
    process: Process
    # ID of the ProcessStep this one is nested under; None if top-level.
    parent_id: int | None
    # Whether the ProcessStep is owned by the Process the tree is built for
    # (as opposed to being pulled in via a step Process's own steps).
    is_explicit: bool
    # Child nodes, keyed by ProcessStep ID.
    steps: dict[int, ProcessStepsNode]
    # Whether the same ProcessStep ID was already met earlier in the walk.
    seen: bool
21
22
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Besides its versioned title, description, and effort, a Process holds
    three lists of Conditions (conditions, enables, disables) and the
    ProcessSteps it explicitly owns, each of which points to another Process.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up with default versioned attributes and empty relations."""
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Process:
        """Make from DB row, with dependencies.

        On top of what the parent class builds from row, this loads the
        histories of the versioned attributes, the ProcessSteps owned by the
        Process, and its three Condition relation lists.
        """
        process = super().from_table_row(db_conn, row)
        assert isinstance(process.id_, int)
        # Versioned attributes live in tables named after the pluralized
        # attribute: process_titles, process_descriptions, process_efforts.
        for name in ('title', 'description', 'effort'):
            table = f'process_{name}s'
            for row_ in db_conn.row_where(table, 'parent', process.id_):
                getattr(process, name).history_from_row(row_)
        for row_ in db_conn.row_where('process_steps', 'owner',
                                      process.id_):
            step = ProcessStep.from_table_row(db_conn, row_)
            process.explicit_steps += [step]  # pylint: disable=no-member
        # Condition relations live in tables named after the list attribute:
        # process_conditions, process_enables, process_disables.
        for name in ('conditions', 'enables', 'disables'):
            table = f'process_{name}'
            assert isinstance(process.id_, int)
            for c_id in db_conn.column_where(table, 'condition',
                                             'process', process.id_):
                target = getattr(process, name)
                target += [Condition.by_id(db_conn, c_id)]
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Each owning Process is listed once, even if it uses self in several
        of its steps. Returns an empty list if self has no ID yet.
        """
        if not self.id_:
            return []
        # A set de-duplicates owners that reference self more than once.
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        Top-level keys are the IDs of self's explicit steps that have no
        parent step. Each node nests the steps of its own step Process
        (collected by calling get_steps on that Process) plus those explicit
        steps of self whose parent_step_id is the node's step ID.

        external_owner is the Process whose ID a step's owner_id must equal
        for its node to be marked explicit; it defaults to self.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            # Steps of the step's Process become this node's initial children.
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            # Attach self's explicit steps that name this node as parent.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            # Mark nodes whose step ID was already met earlier in this walk.
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion, raising BadFormatException if
        the step's Process is self or reaches self through its own explicit
        steps.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Check for recursion before the step is attached or saved.
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Drops all current explicit steps (from cache, self, and DB), then
        re-creates them from steps, whose tuples are read as (step ID or
        None, step Process ID, parent step ID or None).
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_tuple in steps:
            self._add_step(db_conn, step_tuple[0],
                           step_tuple[1], step_tuple[2])

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_enables', 'process', self.id_,
                                  [[c.id_] for c in self.enables])
        db_conn.rewrite_relations('process_disables', 'process', self.id_,
                                  [[c.id_] for c in self.disables])
        # Step rows are wiped and re-written from self.explicit_steps.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, with dependencies.

        Guard against removal of Processes in use: raises HandledException
        if any ProcessStep points to self as its step Process, or any Todo
        row references self as its process.
        """
        assert isinstance(self.id_, int)
        for table, column in (('process_steps', 'step_process'),
                              ('todos', 'process')):
            for _ in db_conn.row_where(table, column, self.id_):
                raise HandledException('cannot remove Process in use')
        db_conn.delete_where('process_conditions', 'process', self.id_)
        db_conn.delete_where('process_enables', 'process', self.id_)
        db_conn.delete_where('process_disables', 'process', self.id_)
        # Iterate over a copy: ProcessStep.remove deletes the step from its
        # owner's explicit_steps, which may be this very list; mutating it
        # while looping over it would skip every second step.
        for step in self.explicit_steps[:]:
            step.remove(db_conn)
        db_conn.delete_where('process_titles', 'parent', self.id_)
        db_conn.delete_where('process_descriptions', 'parent', self.id_)
        db_conn.delete_where('process_efforts', 'parent', self.id_)
        super().remove(db_conn)
192
193
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes.

    Ties an owning Process to the Process serving as its step, optionally
    nested below another ProcessStep.
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self,
                 id_: int | None,
                 owner_id: int,
                 step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store IDs of owner Process, step Process, and parent step."""
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write self to DB; nothing is needed beyond self.save_core."""
        self.save_core(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Drop self from owner's .explicit_steps, then from DB."""
        Process.by_id(db_conn, self.owner_id).explicit_steps.remove(self)
        super().remove(db_conn)