home · contact · privacy
Fix some ProcessStepping bugs.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
@dataclass
class ProcessStepsNode:
    """One node of the ProcessSteps tree as built by Process.get_steps.

    Bundles, for a single step, the data a tree display needs.
    """
    # Process that the step points to (looked up via its step_process_id).
    process: Process
    # ID of the step's parent step; None for a step without parent.
    parent_id: int | None
    # Whether the step's owner is the Process the tree was requested for.
    is_explicit: bool
    # Nodes below this one, keyed by step ID.
    steps: dict[int, ProcessStepsNode]
    # Whether this node's step ID was already met earlier in the tree walk.
    seen: bool
21
22
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Besides its versioned title, description and effort, a Process keeps
    .explicit_steps: the ProcessSteps it directly owns, each of which names
    another Process to be used as a step.
    """
    # pylint: disable=too-many-instance-attributes
    table_name = 'processes'
    to_save = ['calendarize']
    to_save_versioned = ['title', 'description', 'effort']
    to_save_relations = [('process_conditions', 'process', 'conditions'),
                         ('process_blockers', 'process', 'blockers'),
                         ('process_enables', 'process', 'enables'),
                         ('process_disables', 'process', 'disables')]
    to_search = ['title.newest', 'description.newest']

    def __init__(self, id_: int | None, calendarize: bool = False) -> None:
        BaseModel.__init__(self, id_)
        ConditionsRelations.__init__(self)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.calendarize = calendarize

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Process:
        """Make from DB row, with dependencies.

        On top of what the parent class builds from row, this loads the
        histories of the versioned attributes, the ProcessSteps owned by the
        Process, and the Conditions related to it.
        """
        process = super().from_table_row(db_conn, row)
        assert isinstance(process.id_, int)
        for name in ('title', 'description', 'effort'):
            table = f'process_{name}s'
            for row_ in db_conn.row_where(table, 'parent', process.id_):
                getattr(process, name).history_from_row(row_)
        for row_ in db_conn.row_where('process_steps', 'owner',
                                      process.id_):
            step = ProcessStep.from_table_row(db_conn, row_)
            process.explicit_steps += [step]  # pylint: disable=no-member
        for name in ('conditions', 'blockers', 'enables', 'disables'):
            table = f'process_{name}'
            # NOTE(review): presumably repeated for the type checker only.
            assert isinstance(process.id_, int)
            for c_id in db_conn.column_where(table, 'condition',
                                             'process', process.id_):
                target = getattr(process, name)
                target += [Condition.by_id(db_conn, c_id)]
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Returns an empty list if self has no (truthy) ID.
        """
        if not self.id_:
            return []
        # Collect into a set so that an owner using self in several of its
        # steps is returned only once.
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps the IDs of self's explicit steps without parent step
        to ProcessStepsNodes; each node's .steps nests further nodes, keyed
        by step ID in the same way.

        external_owner is the Process against whose ID a step's owner is
        compared to decide the node's .is_explicit. It defaults to self and
        is handed on unchanged into the recursion over the step Processes.
        """

        def make_node(step: ProcessStep) -> ProcessStepsNode:
            # Build the node for step, including the tree of the Process
            # that the step points to.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(process, step.parent_step_id,
                                    is_explicit, step_steps, False)

        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
            # Attach those of self's explicit steps that name this node's
            # step as their parent, then descend into all of its children.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                assert isinstance(child.id_, int)
                node.steps[child.id_] = make_node(child)
            # Flag the node if its step ID was met earlier in this walk.
            node.seen = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node.steps.items():
                walk_steps(id_, step)

        steps: dict[int, ProcessStepsNode] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion: raises BadFormatException if
        self would, directly or via the explicit steps of the step Process,
        end up as a step of itself.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            # Depth-first search for self among the Processes reachable
            # through node.
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.__class__.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Drops all current explicit steps (from cache, self and DB), then
        re-creates steps from tuples of (step ID or None, ID of the Process
        used as step, ID of parent step or None), in the order given.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            step.uncache()
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_id, step_process_id, parent_step_id in steps:
            self._add_step(db_conn, step_id, step_process_id, parent_step_id)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB."""
        super().save(db_conn)
        assert isinstance(self.id_, int)
        # Replace whatever steps the DB holds for self by the current ones.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, with dependencies.

        Guard against removal of Processes in use: raises HandledException
        if any ProcessStep uses self as its step Process, or if any Todo
        refers to self as its Process.
        """
        assert isinstance(self.id_, int)
        for table, column in (('process_steps', 'step_process'),
                              ('todos', 'process')):
            for _ in db_conn.row_where(table, column, self.id_):
                raise HandledException('cannot remove Process in use')
        # Iterate over a copy: ProcessStep.remove() takes the step out of its
        # owner's .explicit_steps, which may be this very list, and mutating
        # a list while iterating over it would skip every other step.
        for step in list(self.explicit_steps):
            step.remove(db_conn)
        super().remove(db_conn)
181
182
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes.

    Ties an owning Process (owner_id) to the Process it uses as a step
    (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        super().__init__(id_)
        self.owner_id, self.step_process_id = owner_id, step_process_id
        self.parent_step_id = parent_step_id

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, and owner's .explicit_steps."""
        # Detach from the owning Process first, then delete the row itself.
        Process.by_id(db_conn, self.owner_id).explicit_steps.remove(self)
        super().remove(db_conn)
197         owner = Process.by_id(db_conn, self.owner_id)
198         owner.explicit_steps.remove(self)
199         super().remove(db_conn)