home · contact · privacy
e67b13414e26d40a87e77545ce57d238bc050028
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.misc import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import NotFoundException, BadFormatException
10
11
12 @dataclass
13 class ProcessStepsNode:
14     """Collects what's useful to know for ProcessSteps tree display."""
15     process: Process
16     parent_id: int | None
17     is_explicit: bool
18     steps: dict[int, ProcessStepsNode]
19     seen: bool
20
21
22 class Process(BaseModel[int], ConditionsRelations):
23     """Template for, and metadata for, Todos, and their arrangements."""
24     table_name = 'processes'
25
26     # pylint: disable=too-many-instance-attributes
27
28     def __init__(self, id_: int | None) -> None:
29         super().__init__(id_)
30         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
31         self.description = VersionedAttribute(self, 'process_descriptions', '')
32         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
33         self.explicit_steps: list[ProcessStep] = []
34         self.conditions: list[Condition] = []
35         self.enables: list[Condition] = []
36         self.disables: list[Condition] = []
37
38     @classmethod
39     def from_table_row(cls, db_conn: DatabaseConnection,
40                        row: Row | list[Any]) -> Process:
41         """Make from DB row, with dependencies."""
42         process = super().from_table_row(db_conn, row)
43         assert isinstance(process.id_, int)
44         for name in ('title', 'description', 'effort'):
45             table = f'process_{name}s'
46             for row_ in db_conn.row_where(table, 'parent', process.id_):
47                 getattr(process, name).history_from_row(row_)
48         for row_ in db_conn.row_where('process_steps', 'owner',
49                                       process.id_):
50             step = ProcessStep.from_table_row(db_conn, row_)
51             process.explicit_steps += [step]  # pylint: disable=no-member
52         for name in ('conditions', 'enables', 'disables'):
53             table = f'process_{name}'
54             assert isinstance(process.id_, int)
55             for c_id in db_conn.column_where(table, 'condition',
56                                              'process', process.id_):
57                 target = getattr(process, name)
58                 target += [Condition.by_id(db_conn, c_id)]
59         return process
60
61     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
62         """Return Processes using self for a ProcessStep."""
63         if not self.id_:
64             return []
65         owner_ids = set()
66         for id_ in db_conn.column_where('process_steps', 'owner',
67                                         'step_process', self.id_):
68             owner_ids.add(id_)
69         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
70
71     def get_steps(self, db_conn: DatabaseConnection, external_owner:
72                   Process | None = None) -> dict[int, ProcessStepsNode]:
73         """Return tree of depended-on explicit and implicit ProcessSteps."""
74
75         def make_node(step: ProcessStep) -> ProcessStepsNode:
76             is_explicit = False
77             if external_owner is not None:
78                 is_explicit = step.owner_id == external_owner.id_
79             process = self.__class__.by_id(db_conn, step.step_process_id)
80             step_steps = process.get_steps(db_conn, external_owner)
81             return ProcessStepsNode(process, step.parent_step_id,
82                                     is_explicit, step_steps, False)
83
84         def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
85             explicit_children = [s for s in self.explicit_steps
86                                  if s.parent_step_id == node_id]
87             for child in explicit_children:
88                 assert isinstance(child.id_, int)
89                 node.steps[child.id_] = make_node(child)
90             node.seen = node_id in seen_step_ids
91             seen_step_ids.add(node_id)
92             for id_, step in node.steps.items():
93                 walk_steps(id_, step)
94
95         steps: dict[int, ProcessStepsNode] = {}
96         seen_step_ids: Set[int] = set()
97         if external_owner is None:
98             external_owner = self
99         for step in [s for s in self.explicit_steps
100                      if s.parent_step_id is None]:
101             assert isinstance(step.id_, int)
102             steps[step.id_] = make_node(step)
103         for step_id, step_node in steps.items():
104             walk_steps(step_id, step_node)
105         return steps
106
107     def _add_step(self,
108                   db_conn: DatabaseConnection,
109                   id_: int | None,
110                   step_process_id: int,
111                   parent_step_id: int | None) -> ProcessStep:
112         """Create new ProcessStep, save and add it to self.explicit_steps.
113
114         Also checks against step recursion.
115
116         The new step's parent_step_id will fall back to None either if no
117         matching ProcessStep is found (which can be assumed in case it was
118         just deleted under its feet), or if the parent step would not be
119         owned by the current Process.
120         """
121
122         def walk_steps(node: ProcessStep) -> None:
123             if node.step_process_id == self.id_:
124                 raise BadFormatException('bad step selection causes recursion')
125             step_process = self.by_id(db_conn, node.step_process_id)
126             for step in step_process.explicit_steps:
127                 walk_steps(step)
128
129         if parent_step_id is not None:
130             try:
131                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
132                 if parent_step.owner_id != self.id_:
133                     parent_step_id = None
134             except NotFoundException:
135                 parent_step_id = None
136         assert isinstance(self.id_, int)
137         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
138         walk_steps(step)
139         self.explicit_steps += [step]
140         step.save(db_conn)  # NB: This ensures a non-None step.id_.
141         return step
142
143     def set_steps(self, db_conn: DatabaseConnection,
144                   steps: list[tuple[int | None, int, int | None]]) -> None:
145         """Set self.explicit_steps in bulk."""
146         assert isinstance(self.id_, int)
147         for step in self.explicit_steps:
148             step.uncache()
149         self.explicit_steps = []
150         db_conn.delete_where('process_steps', 'owner', self.id_)
151         for step_tuple in steps:
152             self._add_step(db_conn, step_tuple[0],
153                            step_tuple[1], step_tuple[2])
154
155     def save(self, db_conn: DatabaseConnection) -> None:
156         """Add (or re-write) self and connected items to DB."""
157         self.save_core(db_conn)
158         assert isinstance(self.id_, int)
159         self.title.save(db_conn)
160         self.description.save(db_conn)
161         self.effort.save(db_conn)
162         db_conn.rewrite_relations('process_conditions', 'process', self.id_,
163                                   [[c.id_] for c in self.conditions])
164         db_conn.rewrite_relations('process_enables', 'process', self.id_,
165                                   [[c.id_] for c in self.enables])
166         db_conn.rewrite_relations('process_disables', 'process', self.id_,
167                                   [[c.id_] for c in self.disables])
168         db_conn.delete_where('process_steps', 'owner', self.id_)
169         for step in self.explicit_steps:
170             step.save(db_conn)
171
172     def remove(self, db_conn: DatabaseConnection) -> None:
173         """Remove from DB, with dependencies."""
174         assert isinstance(self.id_, int)
175         db_conn.delete_where('process_conditions', 'process', self.id_)
176         db_conn.delete_where('process_enables', 'process', self.id_)
177         db_conn.delete_where('process_disables', 'process', self.id_)
178         for step in self.explicit_steps:
179             step.remove(db_conn)
180         super().remove(db_conn)
181
182
183 class ProcessStep(BaseModel[int]):
184     """Sub-unit of Processes."""
185     table_name = 'process_steps'
186     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
187
188     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
189                  parent_step_id: int | None) -> None:
190         super().__init__(id_)
191         self.owner_id = owner_id
192         self.step_process_id = step_process_id
193         self.parent_step_id = parent_step_id
194
195     def save(self, db_conn: DatabaseConnection) -> None:
196         """Default to simply calling self.save_core for simple cases."""
197         self.save_core(db_conn)
198
199     def remove(self, db_conn: DatabaseConnection) -> None:
200         """Remove from DB, and owner's .explicit_steps."""
201         owner = Process.by_id(db_conn, self.owner_id)
202         owner.explicit_steps.remove(self)
203         super().remove(db_conn)