home · contact · privacy
0a9b95b3f5faf89fd5a4a9a0991c7bb9a70ca9c5
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.misc import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import NotFoundException, BadFormatException
10
11
@dataclass
class ProcessStepsNode:
    """Node of the ProcessSteps tree assembled by Process.get_steps.

    Bundles, for one ProcessStep, what is needed to display it as part of
    such a tree.
    """
    # The Process that the represented step stands for.
    process: Process
    # The represented step's parent_step_id; None if it has no parent step.
    parent_id: int | None
    # Whether the represented step is owned by the Process the tree was
    # requested for.
    is_explicit: bool
    # Nodes below this one, keyed by ProcessStep id.
    steps: dict[int, ProcessStepsNode]
    # Whether the step's id was already encountered earlier in the tree walk.
    seen: bool
20
21
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Carries versioned title, description and effort, the ProcessSteps it
    explicitly owns, and three lists of related Conditions (conditions,
    enables, disables).
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set default versioned attributes and empty step/Condition lists."""
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Process:
        """Make from DB row, with dependencies.

        On top of what the parent class builds from row, this loads the
        histories of title, description and effort, the ProcessSteps owned
        by the Process, and the Conditions related to it.
        """
        process = super().from_table_row(db_conn, row)
        assert isinstance(process.id_, int)
        for name in ('title', 'description', 'effort'):
            table = f'process_{name}s'
            for row_ in db_conn.row_where(table, 'parent', process.id_):
                getattr(process, name).history_from_row(row_)
        for row_ in db_conn.row_where('process_steps', 'owner',
                                      process.id_):
            step = ProcessStep.from_table_row(db_conn, row_)
            process.explicit_steps += [step]  # pylint: disable=no-member
        for name in ('conditions', 'enables', 'disables'):
            table = f'process_{name}'
            assert isinstance(process.id_, int)
            # The list is extended in place, so one lookup per relation
            # name suffices.
            target = getattr(process, name)
            for c_id in db_conn.column_where(table, 'condition',
                                             'process', process.id_):
                target += [Condition.by_id(db_conn, c_id)]
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Each owning Process is returned once, even if it uses self in
        several of its steps.  Returns an empty list if self.id_ is unset.
        """
        if not self.id_:
            return []
        # Collect into a set to drop duplicate owners.
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        external_owner is the Process against whose id_ each step's owner_id
        is compared to flag its node as explicit; it defaults to self.

        The result maps ProcessStep ids to ProcessStepsNodes, starting from
        those of self.explicit_steps that have no parent step.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            # Build the node for step, pre-filled with the step tree of the
            # Process that the step stands for.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            # Attach those of self's explicit steps that name node_id as
            # their parent, flag the node if its id was met before, then
            # descend into all of the node's children.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion, raising BadFormatException if
        the step's Process, or any Process reachable through its explicit
        steps, is self.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """

        def walk_steps(node: ProcessStep) -> None:
            # Descend through the explicit steps of the step's Process,
            # refusing any path that leads back to self.
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Discards all current explicit steps (uncaching them and deleting
        their DB rows), then adds one new step per tuple in steps, each of
        the form (id_, step_process_id, parent_step_id).
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_tuple in steps:
            self._add_step(db_conn, step_tuple[0],
                           step_tuple[1], step_tuple[2])

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        Connected items are the versioned attributes, the relations to
        Conditions, and the explicit steps (whose rows are first deleted,
        then written anew).
        """
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_enables', 'process', self.id_,
                                  [[c.id_] for c in self.enables])
        db_conn.rewrite_relations('process_disables', 'process', self.id_,
                                  [[c.id_] for c in self.disables])
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, with dependencies.

        Dependencies are the relations to Conditions, the explicit steps,
        and the histories of the versioned attributes.
        """
        assert isinstance(self.id_, int)
        db_conn.delete_where('process_conditions', 'process', self.id_)
        db_conn.delete_where('process_enables', 'process', self.id_)
        db_conn.delete_where('process_disables', 'process', self.id_)
        # Iterate over a copy: ProcessStep.remove deletes the step from its
        # owner's .explicit_steps, which is this very list whenever
        # Process.by_id hands back self; removing from a list while
        # iterating over it would skip every second step.
        for step in list(self.explicit_steps):
            step.remove(db_conn)
        db_conn.delete_where('process_titles', 'parent', self.id_)
        db_conn.delete_where('process_descriptions', 'parent', self.id_)
        db_conn.delete_where('process_efforts', 'parent', self.id_)
        super().remove(db_conn)
184
185
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process used as the step
    (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    # NOTE(review): presumably the attribute names that save_core persists;
    # confirm against BaseModel.
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the ids tying the step to owner, step Process and parent."""
        super().__init__(id_)
        self.owner_id, self.step_process_id, self.parent_step_id = (
            owner_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save via self.save_core only, as there are no dependencies."""
        self.save_core(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Drop self from the owner's .explicit_steps, then from the DB."""
        Process.by_id(db_conn, self.owner_id).explicit_steps.remove(self)
        super().remove(db_conn)