home · contact · privacy
Add TaskHandler code to actually make previous commit work.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Set, Any
4 from sqlite3 import Row
5 from plomtask.misc import DictableNode
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
class ProcessStepsNode(DictableNode):
    """One node of the ProcessSteps tree assembled for display.

    Nodes are created by Process.get_steps(), which fills the fields in
    the order they are declared here, so that order must not change.
    """
    # pylint: disable=too-few-public-methods

    # The ProcessStep this node stands for.
    step: ProcessStep
    # The Process that get_steps() looked up via step.step_process_id.
    process: Process
    # Whether step.owner_id equals the id of the Process the tree is
    # built for (see Process.get_steps).
    is_explicit: bool
    # Child nodes below this one.
    steps: list[ProcessStepsNode]
    # Set during the tree walk if the step's id was met earlier in it.
    seen: bool = False
    # Set if the step is among the tree owner's suppressed_steps.
    is_suppressed: bool = False

    # NOTE(review): presumably the attribute names DictableNode exports
    # when turning a node into a dict -- confirm against plomtask.misc.
    _to_dict = [
        'step',
        'process',
        'is_explicit',
        'steps',
        'seen',
        'is_suppressed',
    ]
24
25
class Process(BaseModel[int], ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    A Process owns a list of ProcessSteps (.explicit_steps), each of which
    points at another Process to be used as a sub-step, and may list
    ProcessSteps it wants suppressed (.suppressed_steps).  Title,
    description and effort are kept as VersionedAttributes.
    """
    # pylint: disable=too-many-instance-attributes
    # NOTE(review): the class attributes below are presumably consumed by
    # BaseModel for persistence, search and sorting -- confirm in plomtask.db.
    table_name = 'processes'
    to_save_simples = ['calendarize']
    to_save_relations = [('process_conditions', 'process', 'conditions', 0),
                         ('process_blockers', 'process', 'blockers', 0),
                         ('process_enables', 'process', 'enables', 0),
                         ('process_disables', 'process', 'disables', 0),
                         ('process_step_suppressions', 'process',
                          'suppressed_steps', 0)]
    add_to_dict = ['explicit_steps']
    versioned_defaults = {'title': 'UNNAMED', 'description': '', 'effort': 1.0}
    to_search = ['title.newest', 'description.newest']
    can_create_by_id = True
    sorters = {'steps': lambda p: len(p.explicit_steps),
               'owners': lambda p: p.n_owners,
               'effort': lambda p: p.effort.newest,
               'title': lambda p: p.title.newest}

    def __init__(self, id_: int | None, calendarize: bool = False) -> None:
        """Set up an empty Process with default versioned attributes.

        id_ is passed on to BaseModel; calendarize is stored as-is.
        """
        BaseModel.__init__(self, id_)
        ConditionsRelations.__init__(self)
        # One VersionedAttribute per name, backed by table process_{name}s
        # and starting from the matching entry of versioned_defaults.
        for name in ['title', 'description', 'effort']:
            attr = VersionedAttribute(self, f'process_{name}s',
                                      self.versioned_defaults[name])
            setattr(self, name, attr)
        self.explicit_steps: list[ProcessStep] = []
        self.suppressed_steps: list[ProcessStep] = []
        self.calendarize = calendarize
        self.n_owners: int | None = None  # only set by from_table_row

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Process:
        """Make from DB row, with dependencies.

        Besides what the parent class builds from row, this loads the
        Process's conditions, blockers, enables and disables, its explicit
        and suppressed steps, and counts the Processes using it as a step
        into .n_owners.
        """
        process = super().from_table_row(db_conn, row)
        assert process.id_ is not None
        for name in ('conditions', 'blockers', 'enables', 'disables'):
            table = f'process_{name}'
            assert isinstance(process.id_, int)
            for c_id in db_conn.column_where(table, 'condition',
                                             'process', process.id_):
                target = getattr(process, name)
                target += [Condition.by_id(db_conn, c_id)]
        for row_ in db_conn.row_where('process_steps', 'owner', process.id_):
            step = ProcessStep.from_table_row(db_conn, row_)
            process.explicit_steps += [step]
        for row_ in db_conn.row_where('process_step_suppressions', 'process',
                                      process.id_):
            # NOTE(review): row_[1] is presumably the suppressed step's id
            # column -- confirm against the table schema.
            step = ProcessStep.by_id(db_conn, row_[1])
            process.suppressed_steps += [step]
        process.n_owners = len(process.used_as_step_by(db_conn))
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Returns an empty list if self has no (truthy) id yet.  Each owner
        appears once, however many of its steps point at self.
        """
        if not self.id_:
            return []
        owner_ids = set()
        for id_ in db_conn.column_where('process_steps', 'owner',
                                        'step_process', self.id_):
            owner_ids.add(id_)
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> list[ProcessStepsNode]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        external_owner is the Process from whose point of view the tree is
        built: its id decides each node's .is_explicit, and its
        .suppressed_steps decides which top-level steps are marked
        suppressed.  It defaults to self.
        """

        def make_node(step: ProcessStep, suppressed: bool) -> ProcessStepsNode:
            """Wrap step into a node, with its own Process's step tree."""
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = []
            # Suppressed steps are shown without what lies below them.
            if not suppressed:
                step_steps = process.get_steps(db_conn, external_owner)
            return ProcessStepsNode(step, process, is_explicit, step_steps,
                                    False, suppressed)

        def walk_steps(node: ProcessStepsNode) -> None:
            """Mark repeats as seen, attach self's sub-steps, recurse."""
            node.seen = node.step.id_ in seen_step_ids
            assert isinstance(node.step.id_, int)
            seen_step_ids.add(node.step.id_)
            if node.is_suppressed:
                return
            # Steps of self that name this node's step as their parent.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node.step.id_]
            for child in explicit_children:
                node.steps += [make_node(child, False)]
            for step in node.steps:
                walk_steps(step)

        step_nodes: list[ProcessStepsNode] = []
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        # Start from self's top-level steps, i.e. those without a parent.
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            new_node = make_node(step, step in external_owner.suppressed_steps)
            step_nodes += [new_node]
        for step_node in step_nodes:
            walk_steps(step_node)
        return step_nodes

    def set_step_relations(self,
                           db_conn: DatabaseConnection,
                           owners: list[int],
                           suppressions: list[int],
                           owned_steps: list[ProcessStep]
                           ) -> None:
        """Set step owners, suppressions, and owned steps.

        owners are ids of Processes that should use self as a step,
        suppressions are ids of ProcessSteps to suppress, and owned_steps
        is the full new list for self.explicit_steps.
        """
        self._set_owners(db_conn, owners)
        self._set_step_suppressions(db_conn, suppressions)
        self.set_steps(db_conn, owned_steps)

    def _set_step_suppressions(self,
                               db_conn: DatabaseConnection,
                               step_ids: list[int]
                               ) -> None:
        """Set self.suppressed_steps from step_ids.

        Existing process_step_suppressions rows of self are deleted here;
        this method does not itself write the new ones.
        """
        assert isinstance(self.id_, int)
        db_conn.delete_where('process_step_suppressions', 'process', self.id_)
        self.suppressed_steps = [ProcessStep.by_id(db_conn, s)
                                 for s in step_ids]

    def _set_owners(self,
                    db_conn: DatabaseConnection,
                    owner_ids: list[int]
                    ) -> None:
        """Re-set owners to those identified in owner_ids.

        Previous owners missing from owner_ids lose every step of theirs
        that points at self; ids that are new in owner_ids each get one
        new top-level step pointing at self.
        """
        owners_old = self.used_as_step_by(db_conn)
        losers = [o for o in owners_old if o.id_ not in owner_ids]
        owners_old_ids = [o.id_ for o in owners_old]
        winners = [Process.by_id(db_conn, id_) for id_ in owner_ids
                   if id_ not in owners_old_ids]
        # Collect first, remove afterwards: ProcessStep.remove() edits the
        # owner's explicit_steps list that is being read here.
        steps_to_remove = []
        for loser in losers:
            steps_to_remove += [s for s in loser.explicit_steps
                                if s.step_process_id == self.id_]
        for step in steps_to_remove:
            step.remove(db_conn)
        for winner in winners:
            assert isinstance(winner.id_, int)
            assert isinstance(self.id_, int)
            new_step = ProcessStep(None, winner.id_, self.id_, None)
            new_explicit_steps = winner.explicit_steps + [new_step]
            winner.set_steps(db_conn, new_explicit_steps)

    def set_steps(self,
                  db_conn: DatabaseConnection,
                  steps: list[ProcessStep]
                  ) -> None:
        """Set self.explicit_steps in bulk.

        Checks against recursion, and turns into top-level steps any of
        unknown or non-owned parent.

        Raises BadFormatException if a new step would, directly or through
        the explicit steps of its Process, lead back to self.  Steps
        handled before the offending one have already been saved then.
        """
        def walk_steps(node: ProcessStep) -> None:
            """Raise if node, or anything below it, points at self."""
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        assert isinstance(self.id_, int)
        # Both loops run over freshly built lists, because removing and
        # saving steps changes self.explicit_steps underneath.
        for step in [s for s in self.explicit_steps if s not in steps]:
            step.remove(db_conn)
        for step in [s for s in steps if s not in self.explicit_steps]:
            if step.parent_step_id is not None:
                try:
                    parent_step = ProcessStep.by_id(db_conn,
                                                    step.parent_step_id)
                    if parent_step.owner_id != self.id_:
                        step.parent_step_id = None
                except NotFoundException:
                    step.parent_step_id = None
            walk_steps(step)
            step.save(db_conn)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        All process_steps rows owned by self are deleted and then written
        anew from self.explicit_steps.
        """
        super().save(db_conn)
        assert isinstance(self.id_, int)
        db_conn.delete_where('process_steps', 'owner', self.id_)
        # Iterate over a snapshot: ProcessStep.save() re-sorts (and may
        # extend) its owner's explicit_steps, which can be this very list,
        # and re-ordering a list while looping over it may skip or repeat
        # entries.
        for step in list(self.explicit_steps):
            step.save(db_conn)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, with dependencies.

        Guard against removal of Processes in use.

        Raises HandledException if any ProcessStep or Todo still refers to
        self.
        """
        assert isinstance(self.id_, int)
        for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
            raise HandledException('cannot remove Process in use')
        for _ in db_conn.row_where('todos', 'process', self.id_):
            raise HandledException('cannot remove Process in use')
        # Iterate over a snapshot: ProcessStep.remove() deletes the step
        # from its owner's explicit_steps, which can be this very list, and
        # looping over the shrinking list would skip every second step.
        for step in list(self.explicit_steps):
            step.remove(db_conn)
        super().remove(db_conn)
228
229
class ProcessStep(BaseModel[int]):
    """Sub-unit of Processes.

    Ties an owning Process (owner_id) to the Process used as the step
    (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    to_save_simples = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self,
                 id_: int | None,
                 owner_id: int,
                 step_process_id: int,
                 parent_step_id: int | None
                 ) -> None:
        """Store the three ids describing the step; id_ goes to BaseModel."""
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    def save(self, db_conn: DatabaseConnection) -> None:
        """Update into DB/cache, and owner's .explicit_steps.

        If self is not yet among the owner's explicit steps, any listed
        step sharing self's id is removed first and self is added.  The
        owner's list is re-sorted by hash in every case.
        """
        super().save(db_conn)
        owner = Process.by_id(db_conn, self.owner_id)
        if self not in owner.explicit_steps:
            # Gather the same-id entries up front, since removing them
            # edits the list being searched.
            same_id_steps = [known for known in owner.explicit_steps
                             if known.id_ == self.id_]
            for outdated in same_id_steps:
                outdated.remove(db_conn)
            owner.explicit_steps.append(self)
        owner.explicit_steps.sort(key=hash)

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB, and owner's .explicit_steps."""
        owning_process = Process.by_id(db_conn, self.owner_id)
        owning_process.explicit_steps.remove(self)
        super().remove(db_conn)
256         super().remove(db_conn)