home · contact · privacy
375a0bee622081fd8690cc0e6c1a8ae5241685e1
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.misc import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
@dataclass
class ProcessStepsNode:
    """Node of the tree assembled to display a Process' ProcessSteps."""
    # Process that the represented ProcessStep points to.
    process: Process
    # ID of the ProcessStep this node's step is nested under, if any.
    parent_id: int | None
    # Whether the step is directly owned by the Process the tree is for.
    is_explicit: bool
    # Child nodes, keyed by ProcessStep ID.
    steps: dict[int, ProcessStepsNode]
    # Whether the same step ID was already met earlier in the tree walk.
    seen: bool
21
22
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Besides its versioned title, description and effort, a Process keeps
    three lists of Conditions (conditions, enables, disables) and the list
    of ProcessSteps it directly owns (explicit_steps).
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up Process of id_ with default attributes, empty relations."""
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Process:
        """Make from DB row, with dependencies.

        On top of what the parent class builds from row, this loads the
        histories of title, description and effort, the ProcessSteps owned
        by the Process, and its three kinds of Condition relations.
        """
        process = super().from_table_row(db_conn, row)
        assert isinstance(process.id_, int)
        for name in ('title', 'description', 'effort'):
            table = f'process_{name}s'
            for row_ in db_conn.row_where(table, 'parent', process.id_):
                getattr(process, name).history_from_row(row_)
        for row_ in db_conn.row_where('process_steps', 'owner',
                                      process.id_):
            step = ProcessStep.from_table_row(db_conn, row_)
            process.explicit_steps += [step]  # pylint: disable=no-member
        for name in ('conditions', 'enables', 'disables'):
            table = f'process_{name}'
            target = getattr(process, name)
            for c_id in db_conn.column_where(table, 'condition',
                                             'process', process.id_):
                target += [Condition.by_id(db_conn, c_id)]
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Each owning Process is returned only once, no matter in how many
        of its steps self appears.  Without an ID of its own, self cannot
        be referenced by any step, so the result is empty.
        """
        if not self.id_:
            return []
        owner_ids = {id_ for id_
                     in db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_)}
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps the IDs of self's top-level steps (those without a
        parent step) to ProcessStepsNodes, whose .steps carry the further
        levels: the steps of the Process a step points to, and those steps
        of self that name the step as their parent.

        external_owner is the Process against which nodes get marked as
        explicit (step directly owned by it); it defaults to self, and is
        handed down unchanged when recursing into step Processes.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            """Build node for step, including its Process' own step tree."""
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            """Attach self's steps below node, flag repeated step IDs."""
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            # A node counts as seen if its step ID was already encountered
            # earlier during this walk.
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion: raises BadFormatException if
        the Process of step_process_id is self, or reaches self through
        its own explicit steps.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """

        def walk_steps(node: ProcessStep) -> None:
            """Raise BadFormatException if node leads back to self."""
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Check before registering the step anywhere, so that a rejected
        # step leaves neither self.explicit_steps nor the DB changed.
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Drops all current steps (from cache, self.explicit_steps, and DB),
        then adds anew one step per tuple in steps, each tuple providing
        (step ID or None, step Process ID, parent step ID or None).
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for id_, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, id_, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        for name in ('conditions', 'enables', 'disables'):
            db_conn.rewrite_relations(f'process_{name}', 'process', self.id_,
                                      [[c.id_] for c in getattr(self, name)])
        # Step rows are replaced wholesale: delete whatever is stored for
        # this owner, then write the current explicit steps.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, with dependencies.

        Raises HandledException, without removing anything, if self is
        still used as a step by some ProcessStep, or by any Todo.
        """
        assert isinstance(self.id_, int)
        for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
            raise HandledException('cannot remove Process in use')
        for _ in db_conn.row_where('todos', 'process', self.id_):
            raise HandledException('cannot remove Process in use')
        db_conn.delete_where('process_conditions', 'process', self.id_)
        db_conn.delete_where('process_enables', 'process', self.id_)
        db_conn.delete_where('process_disables', 'process', self.id_)
        # ProcessStep.remove() also removes the step from its owner's
        # .explicit_steps, which may be this very list; so loop over a copy,
        # as looping over a list while it shrinks would skip steps.
        for step in list(self.explicit_steps):
            step.remove(db_conn)
        db_conn.delete_where('process_titles', 'parent', self.id_)
        db_conn.delete_where('process_descriptions', 'parent', self.id_)
        db_conn.delete_where('process_efforts', 'parent', self.id_)
        super().remove(db_conn)
189
190
class ProcessStep(BaseModel[int]):
    """Step owned by one Process, referencing a Process as its content."""
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store IDs of owner, step Process, and (optional) parent step."""
        super().__init__(id_)
        # ID of the Process that directly owns this step.
        self.owner_id = owner_id
        # ID of the Process that this step refers to.
        self.step_process_id = step_process_id
        # ID of the ProcessStep this one is nested under, if any.
        self.parent_step_id = parent_step_id

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save to DB; nothing beyond self.save_core is needed here."""
        self.save_core(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Drop self from owner's .explicit_steps, then remove from DB."""
        Process.by_id(db_conn, self.owner_id).explicit_steps.remove(self)
        super().remove(db_conn)