home · contact · privacy
Refactor save and remove methods of BaseObject subclasses.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
13 @dataclass
14 class ProcessStepsNode:
15     """Collects what's useful to know for ProcessSteps tree display."""
16     process: Process
17     parent_id: int | None
18     is_explicit: bool
19     steps: dict[int, ProcessStepsNode]
20     seen: bool
21
22
23 class Process(BaseModel[int], ConditionsRelations):
24     """Template for, and metadata for, Todos, and their arrangements."""
25     table_name = 'processes'
26     to_save_versioned = ['title', 'description', 'effort']
27     to_save_relations = [('process_conditions', 'process', 'conditions'),
28                          ('process_enables', 'process', 'enables'),
29                          ('process_disables', 'process', 'disables')]
30
31     # pylint: disable=too-many-instance-attributes
32
33     def __init__(self, id_: int | None) -> None:
34         super().__init__(id_)
35         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
36         self.description = VersionedAttribute(self, 'process_descriptions', '')
37         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
38         self.explicit_steps: list[ProcessStep] = []
39         self.conditions: list[Condition] = []
40         self.enables: list[Condition] = []
41         self.disables: list[Condition] = []
42
43     @classmethod
44     def from_table_row(cls, db_conn: DatabaseConnection,
45                        row: Row | list[Any]) -> Process:
46         """Make from DB row, with dependencies."""
47         process = super().from_table_row(db_conn, row)
48         assert isinstance(process.id_, int)
49         for name in ('title', 'description', 'effort'):
50             table = f'process_{name}s'
51             for row_ in db_conn.row_where(table, 'parent', process.id_):
52                 getattr(process, name).history_from_row(row_)
53         for row_ in db_conn.row_where('process_steps', 'owner',
54                                       process.id_):
55             step = ProcessStep.from_table_row(db_conn, row_)
56             process.explicit_steps += [step]  # pylint: disable=no-member
57         for name in ('conditions', 'enables', 'disables'):
58             table = f'process_{name}'
59             assert isinstance(process.id_, int)
60             for c_id in db_conn.column_where(table, 'condition',
61                                              'process', process.id_):
62                 target = getattr(process, name)
63                 target += [Condition.by_id(db_conn, c_id)]
64         return process
65
66     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
67         """Return Processes using self for a ProcessStep."""
68         if not self.id_:
69             return []
70         owner_ids = set()
71         for id_ in db_conn.column_where('process_steps', 'owner',
72                                         'step_process', self.id_):
73             owner_ids.add(id_)
74         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
75
76     def get_steps(self, db_conn: DatabaseConnection, external_owner:
77                   Process | None = None) -> dict[int, ProcessStepsNode]:
78         """Return tree of depended-on explicit and implicit ProcessSteps."""
79
80         def make_node(step: ProcessStep) -> ProcessStepsNode:
81             is_explicit = False
82             if external_owner is not None:
83                 is_explicit = step.owner_id == external_owner.id_
84             process = self.__class__.by_id(db_conn, step.step_process_id)
85             step_steps = process.get_steps(db_conn, external_owner)
86             return ProcessStepsNode(process, step.parent_step_id,
87                                     is_explicit, step_steps, False)
88
89         def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
90             explicit_children = [s for s in self.explicit_steps
91                                  if s.parent_step_id == node_id]
92             for child in explicit_children:
93                 assert isinstance(child.id_, int)
94                 node.steps[child.id_] = make_node(child)
95             node.seen = node_id in seen_step_ids
96             seen_step_ids.add(node_id)
97             for id_, step in node.steps.items():
98                 walk_steps(id_, step)
99
100         steps: dict[int, ProcessStepsNode] = {}
101         seen_step_ids: Set[int] = set()
102         if external_owner is None:
103             external_owner = self
104         for step in [s for s in self.explicit_steps
105                      if s.parent_step_id is None]:
106             assert isinstance(step.id_, int)
107             steps[step.id_] = make_node(step)
108         for step_id, step_node in steps.items():
109             walk_steps(step_id, step_node)
110         return steps
111
112     def _add_step(self,
113                   db_conn: DatabaseConnection,
114                   id_: int | None,
115                   step_process_id: int,
116                   parent_step_id: int | None) -> ProcessStep:
117         """Create new ProcessStep, save and add it to self.explicit_steps.
118
119         Also checks against step recursion.
120
121         The new step's parent_step_id will fall back to None either if no
122         matching ProcessStep is found (which can be assumed in case it was
123         just deleted under its feet), or if the parent step would not be
124         owned by the current Process.
125         """
126
127         def walk_steps(node: ProcessStep) -> None:
128             if node.step_process_id == self.id_:
129                 raise BadFormatException('bad step selection causes recursion')
130             step_process = self.by_id(db_conn, node.step_process_id)
131             for step in step_process.explicit_steps:
132                 walk_steps(step)
133
134         if parent_step_id is not None:
135             try:
136                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
137                 if parent_step.owner_id != self.id_:
138                     parent_step_id = None
139             except NotFoundException:
140                 parent_step_id = None
141         assert isinstance(self.id_, int)
142         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
143         walk_steps(step)
144         self.explicit_steps += [step]
145         step.save(db_conn)  # NB: This ensures a non-None step.id_.
146         return step
147
148     def set_steps(self, db_conn: DatabaseConnection,
149                   steps: list[tuple[int | None, int, int | None]]) -> None:
150         """Set self.explicit_steps in bulk."""
151         assert isinstance(self.id_, int)
152         for step in self.explicit_steps:
153             step.uncache()
154         self.explicit_steps = []
155         db_conn.delete_where('process_steps', 'owner', self.id_)
156         for step_tuple in steps:
157             self._add_step(db_conn, step_tuple[0],
158                            step_tuple[1], step_tuple[2])
159
160     def save(self, db_conn: DatabaseConnection) -> None:
161         """Add (or re-write) self and connected items to DB."""
162         super().save(db_conn)
163         assert isinstance(self.id_, int)
164         db_conn.delete_where('process_steps', 'owner', self.id_)
165         for step in self.explicit_steps:
166             step.save(db_conn)
167
168     def remove(self, db_conn: DatabaseConnection) -> None:
169         """Remove from DB, with dependencies.
170
171         Guard against removal of Processes in use.
172         """
173         assert isinstance(self.id_, int)
174         for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
175             raise HandledException('cannot remove Process in use')
176         for _ in db_conn.row_where('todos', 'process', self.id_):
177             raise HandledException('cannot remove Process in use')
178         for step in self.explicit_steps:
179             step.remove(db_conn)
180         super().remove(db_conn)
181
182
183 class ProcessStep(BaseModel[int]):
184     """Sub-unit of Processes."""
185     table_name = 'process_steps'
186     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
187
188     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
189                  parent_step_id: int | None) -> None:
190         super().__init__(id_)
191         self.owner_id = owner_id
192         self.step_process_id = step_process_id
193         self.parent_step_id = parent_step_id
194
195     def remove(self, db_conn: DatabaseConnection) -> None:
196         """Remove from DB, and owner's .explicit_steps."""
197         owner = Process.by_id(db_conn, self.owner_id)
198         owner.explicit_steps.remove(self)
199         super().remove(db_conn)