home · contact · privacy
Add text-based search/filter for Conditions and Processes.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
13 @dataclass
14 class ProcessStepsNode:
15     """Collects what's useful to know for ProcessSteps tree display."""
16     process: Process
17     parent_id: int | None
18     is_explicit: bool
19     steps: dict[int, ProcessStepsNode]
20     seen: bool
21
22
23 class Process(BaseModel[int], ConditionsRelations):
24     """Template for, and metadata for, Todos, and their arrangements."""
25     # pylint: disable=too-many-instance-attributes
26     table_name = 'processes'
27     to_save = ['calendarize']
28     to_save_versioned = ['title', 'description', 'effort']
29     to_save_relations = [('process_conditions', 'process', 'conditions'),
30                          ('process_blockers', 'process', 'blockers'),
31                          ('process_enables', 'process', 'enables'),
32                          ('process_disables', 'process', 'disables')]
33     to_search = ['title.newest', 'description.newest']
34
35     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
36         BaseModel.__init__(self, id_)
37         ConditionsRelations.__init__(self)
38         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
39         self.description = VersionedAttribute(self, 'process_descriptions', '')
40         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
41         self.explicit_steps: list[ProcessStep] = []
42         self.calendarize = calendarize
43
44     @classmethod
45     def from_table_row(cls, db_conn: DatabaseConnection,
46                        row: Row | list[Any]) -> Process:
47         """Make from DB row, with dependencies."""
48         process = super().from_table_row(db_conn, row)
49         assert isinstance(process.id_, int)
50         for name in ('title', 'description', 'effort'):
51             table = f'process_{name}s'
52             for row_ in db_conn.row_where(table, 'parent', process.id_):
53                 getattr(process, name).history_from_row(row_)
54         for row_ in db_conn.row_where('process_steps', 'owner',
55                                       process.id_):
56             step = ProcessStep.from_table_row(db_conn, row_)
57             process.explicit_steps += [step]  # pylint: disable=no-member
58         for name in ('conditions', 'blockers', 'enables', 'disables'):
59             table = f'process_{name}'
60             assert isinstance(process.id_, int)
61             for c_id in db_conn.column_where(table, 'condition',
62                                              'process', process.id_):
63                 target = getattr(process, name)
64                 target += [Condition.by_id(db_conn, c_id)]
65         return process
66
67     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
68         """Return Processes using self for a ProcessStep."""
69         if not self.id_:
70             return []
71         owner_ids = set()
72         for id_ in db_conn.column_where('process_steps', 'owner',
73                                         'step_process', self.id_):
74             owner_ids.add(id_)
75         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
76
77     def get_steps(self, db_conn: DatabaseConnection, external_owner:
78                   Process | None = None) -> dict[int, ProcessStepsNode]:
79         """Return tree of depended-on explicit and implicit ProcessSteps."""
80
81         def make_node(step: ProcessStep) -> ProcessStepsNode:
82             is_explicit = False
83             if external_owner is not None:
84                 is_explicit = step.owner_id == external_owner.id_
85             process = self.__class__.by_id(db_conn, step.step_process_id)
86             step_steps = process.get_steps(db_conn, external_owner)
87             return ProcessStepsNode(process, step.parent_step_id,
88                                     is_explicit, step_steps, False)
89
90         def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
91             explicit_children = [s for s in self.explicit_steps
92                                  if s.parent_step_id == node_id]
93             for child in explicit_children:
94                 assert isinstance(child.id_, int)
95                 node.steps[child.id_] = make_node(child)
96             node.seen = node_id in seen_step_ids
97             seen_step_ids.add(node_id)
98             for id_, step in node.steps.items():
99                 walk_steps(id_, step)
100
101         steps: dict[int, ProcessStepsNode] = {}
102         seen_step_ids: Set[int] = set()
103         if external_owner is None:
104             external_owner = self
105         for step in [s for s in self.explicit_steps
106                      if s.parent_step_id is None]:
107             assert isinstance(step.id_, int)
108             steps[step.id_] = make_node(step)
109         for step_id, step_node in steps.items():
110             walk_steps(step_id, step_node)
111         return steps
112
113     def _add_step(self,
114                   db_conn: DatabaseConnection,
115                   id_: int | None,
116                   step_process_id: int,
117                   parent_step_id: int | None) -> ProcessStep:
118         """Create new ProcessStep, save and add it to self.explicit_steps.
119
120         Also checks against step recursion.
121
122         The new step's parent_step_id will fall back to None either if no
123         matching ProcessStep is found (which can be assumed in case it was
124         just deleted under its feet), or if the parent step would not be
125         owned by the current Process.
126         """
127
128         def walk_steps(node: ProcessStep) -> None:
129             if node.step_process_id == self.id_:
130                 raise BadFormatException('bad step selection causes recursion')
131             step_process = self.by_id(db_conn, node.step_process_id)
132             for step in step_process.explicit_steps:
133                 walk_steps(step)
134
135         if parent_step_id is not None:
136             try:
137                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
138                 if parent_step.owner_id != self.id_:
139                     parent_step_id = None
140             except NotFoundException:
141                 parent_step_id = None
142         assert isinstance(self.id_, int)
143         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
144         walk_steps(step)
145         self.explicit_steps += [step]
146         step.save(db_conn)  # NB: This ensures a non-None step.id_.
147         return step
148
149     def set_steps(self, db_conn: DatabaseConnection,
150                   steps: list[tuple[int | None, int, int | None]]) -> None:
151         """Set self.explicit_steps in bulk."""
152         assert isinstance(self.id_, int)
153         for step in self.explicit_steps:
154             step.uncache()
155         self.explicit_steps = []
156         db_conn.delete_where('process_steps', 'owner', self.id_)
157         for step_tuple in steps:
158             self._add_step(db_conn, step_tuple[0],
159                            step_tuple[1], step_tuple[2])
160
161     def save(self, db_conn: DatabaseConnection) -> None:
162         """Add (or re-write) self and connected items to DB."""
163         super().save(db_conn)
164         assert isinstance(self.id_, int)
165         db_conn.delete_where('process_steps', 'owner', self.id_)
166         for step in self.explicit_steps:
167             step.save(db_conn)
168
169     def remove(self, db_conn: DatabaseConnection) -> None:
170         """Remove from DB, with dependencies.
171
172         Guard against removal of Processes in use.
173         """
174         assert isinstance(self.id_, int)
175         for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
176             raise HandledException('cannot remove Process in use')
177         for _ in db_conn.row_where('todos', 'process', self.id_):
178             raise HandledException('cannot remove Process in use')
179         for step in self.explicit_steps:
180             step.remove(db_conn)
181         super().remove(db_conn)
182
183
184 class ProcessStep(BaseModel[int]):
185     """Sub-unit of Processes."""
186     table_name = 'process_steps'
187     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
188
189     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
190                  parent_step_id: int | None) -> None:
191         super().__init__(id_)
192         self.owner_id = owner_id
193         self.step_process_id = step_process_id
194         self.parent_step_id = parent_step_id
195
196     def remove(self, db_conn: DatabaseConnection) -> None:
197         """Remove from DB, and owner's .explicit_steps."""
198         owner = Process.by_id(db_conn, self.owner_id)
199         owner.explicit_steps.remove(self)
200         super().remove(db_conn)