home · contact · privacy
Make explicit ProcessSteps overwrite implicit ones.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
13 @dataclass
14 class ProcessStepsNode:
15     """Collects what's useful to know for ProcessSteps tree display."""
16     process: Process
17     parent_id: int | None
18     is_explicit: bool
19     steps: dict[int, ProcessStepsNode]
20     seen: bool
21
22
23 class Process(BaseModel[int], ConditionsRelations):
24     """Template for, and metadata for, Todos, and their arrangements."""
25     # pylint: disable=too-many-instance-attributes
26     table_name = 'processes'
27     to_save = ['calendarize']
28     to_save_versioned = ['title', 'description', 'effort']
29     to_save_relations = [('process_conditions', 'process', 'conditions', 0),
30                          ('process_blockers', 'process', 'blockers', 0),
31                          ('process_enables', 'process', 'enables', 0),
32                          ('process_disables', 'process', 'disables', 0)]
33     to_search = ['title.newest', 'description.newest']
34
35     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
36         BaseModel.__init__(self, id_)
37         ConditionsRelations.__init__(self)
38         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
39         self.description = VersionedAttribute(self, 'process_descriptions', '')
40         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
41         self.explicit_steps: list[ProcessStep] = []
42         self.calendarize = calendarize
43
44     @classmethod
45     def from_table_row(cls, db_conn: DatabaseConnection,
46                        row: Row | list[Any]) -> Process:
47         """Make from DB row, with dependencies."""
48         process = super().from_table_row(db_conn, row)
49         assert isinstance(process.id_, int)
50         for name in ('title', 'description', 'effort'):
51             table = f'process_{name}s'
52             for row_ in db_conn.row_where(table, 'parent', process.id_):
53                 getattr(process, name).history_from_row(row_)
54         for row_ in db_conn.row_where('process_steps', 'owner',
55                                       process.id_):
56             step = ProcessStep.from_table_row(db_conn, row_)
57             process.explicit_steps += [step]  # pylint: disable=no-member
58         for name in ('conditions', 'blockers', 'enables', 'disables'):
59             table = f'process_{name}'
60             assert isinstance(process.id_, int)
61             for c_id in db_conn.column_where(table, 'condition',
62                                              'process', process.id_):
63                 target = getattr(process, name)
64                 target += [Condition.by_id(db_conn, c_id)]
65         return process
66
67     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
68         """Return Processes using self for a ProcessStep."""
69         if not self.id_:
70             return []
71         owner_ids = set()
72         for id_ in db_conn.column_where('process_steps', 'owner',
73                                         'step_process', self.id_):
74             owner_ids.add(id_)
75         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
76
77     def get_steps(self, db_conn: DatabaseConnection, external_owner:
78                   Process | None = None) -> dict[int, ProcessStepsNode]:
79         """Return tree of depended-on explicit and implicit ProcessSteps."""
80
81         def make_node(step: ProcessStep) -> ProcessStepsNode:
82             is_explicit = False
83             if external_owner is not None:
84                 is_explicit = step.owner_id == external_owner.id_
85             process = self.__class__.by_id(db_conn, step.step_process_id)
86             step_steps = process.get_steps(db_conn, external_owner)
87             return ProcessStepsNode(process, step.parent_step_id,
88                                     is_explicit, step_steps, False)
89
90         def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
91             explicit_children = [s for s in self.explicit_steps
92                                  if s.parent_step_id == node_id]
93             for child in explicit_children:
94                 assert isinstance(child.id_, int)
95                 node.steps[child.id_] = make_node(child)
96                 # ensure that one (!) explicit step of process replaces
97                 # one (!) implicit step of same process
98                 for i in [i for i, s in node.steps.items()
99                           if not s.is_explicit
100                           and s.process.id_ == child.step_process_id]:
101                     del node.steps[i]
102                     break
103             node.seen = node_id in seen_step_ids
104             seen_step_ids.add(node_id)
105             for id_, step in node.steps.items():
106                 walk_steps(id_, step)
107
108         steps: dict[int, ProcessStepsNode] = {}
109         seen_step_ids: Set[int] = set()
110         if external_owner is None:
111             external_owner = self
112         for step in [s for s in self.explicit_steps
113                      if s.parent_step_id is None]:
114             assert isinstance(step.id_, int)
115             steps[step.id_] = make_node(step)
116         for step_id, step_node in steps.items():
117             walk_steps(step_id, step_node)
118         return steps
119
120     def _add_step(self,
121                   db_conn: DatabaseConnection,
122                   id_: int | None,
123                   step_process_id: int,
124                   parent_step_id: int | None) -> ProcessStep:
125         """Create new ProcessStep, save and add it to self.explicit_steps.
126
127         Also checks against step recursion.
128
129         The new step's parent_step_id will fall back to None either if no
130         matching ProcessStep is found (which can be assumed in case it was
131         just deleted under its feet), or if the parent step would not be
132         owned by the current Process.
133         """
134         def walk_steps(node: ProcessStep) -> None:
135             if node.step_process_id == self.id_:
136                 raise BadFormatException('bad step selection causes recursion')
137             step_process = self.by_id(db_conn, node.step_process_id)
138             for step in step_process.explicit_steps:
139                 walk_steps(step)
140
141         if parent_step_id is not None:
142             try:
143                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
144                 if parent_step.owner_id != self.id_:
145                     parent_step_id = None
146             except NotFoundException:
147                 parent_step_id = None
148         assert isinstance(self.id_, int)
149         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
150         walk_steps(step)
151         self.explicit_steps += [step]
152         step.save(db_conn)  # NB: This ensures a non-None step.id_.
153         return step
154
155     def set_steps(self, db_conn: DatabaseConnection,
156                   steps: list[tuple[int | None, int, int | None]]) -> None:
157         """Set self.explicit_steps in bulk."""
158         assert isinstance(self.id_, int)
159         for step in self.explicit_steps:
160             step.uncache()
161         self.explicit_steps = []
162         db_conn.delete_where('process_steps', 'owner', self.id_)
163         for step_tuple in steps:
164             self._add_step(db_conn, step_tuple[0],
165                            step_tuple[1], step_tuple[2])
166
167     def save(self, db_conn: DatabaseConnection) -> None:
168         """Add (or re-write) self and connected items to DB."""
169         super().save(db_conn)
170         assert isinstance(self.id_, int)
171         db_conn.delete_where('process_steps', 'owner', self.id_)
172         for step in self.explicit_steps:
173             step.save(db_conn)
174
175     def remove(self, db_conn: DatabaseConnection) -> None:
176         """Remove from DB, with dependencies.
177
178         Guard against removal of Processes in use.
179         """
180         assert isinstance(self.id_, int)
181         for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
182             raise HandledException('cannot remove Process in use')
183         for _ in db_conn.row_where('todos', 'process', self.id_):
184             raise HandledException('cannot remove Process in use')
185         for step in self.explicit_steps:
186             step.remove(db_conn)
187         super().remove(db_conn)
188
189
190 class ProcessStep(BaseModel[int]):
191     """Sub-unit of Processes."""
192     table_name = 'process_steps'
193     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
194
195     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
196                  parent_step_id: int | None) -> None:
197         super().__init__(id_)
198         self.owner_id = owner_id
199         self.step_process_id = step_process_id
200         self.parent_step_id = parent_step_id
201
202     def remove(self, db_conn: DatabaseConnection) -> None:
203         """Remove from DB, and owner's .explicit_steps."""
204         owner = Process.by_id(db_conn, self.owner_id)
205         owner.explicit_steps.remove(self)
206         super().remove(db_conn)