home · contact · privacy
Fix Process retrieval/display/saving bugs.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
9
10
11 @dataclass
12 class ProcessStepsNode:
13     """Collects what's useful to know for ProcessSteps tree display."""
14     process: Process
15     parent_id: int | None
16     is_explicit: bool
17     steps: dict[int, ProcessStepsNode]
18     seen: bool
19
20
21 class Process(BaseModel, ConditionsRelations):
22     """Template for, and metadata for, Todos, and their arrangements."""
23     table_name = 'processes'
24
25     # pylint: disable=too-many-instance-attributes
26
27     def __init__(self, id_: int | None) -> None:
28         self.set_int_id(id_)
29         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
30         self.description = VersionedAttribute(self, 'process_descriptions', '')
31         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
32         self.explicit_steps: list[ProcessStep] = []
33         self.conditions: list[Condition] = []
34         self.enables: list[Condition] = []
35         self.disables: list[Condition] = []
36
37     @classmethod
38     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
39         """Collect all Processes and their connected VersionedAttributes."""
40         processes = {}
41         for id_, process in db_conn.cached_processes.items():
42             processes[id_] = process
43         already_recorded = processes.keys()
44         for id_ in db_conn.column_all('processes', 'id'):
45             if id_ not in already_recorded:
46                 process = cls.by_id(db_conn, id_)
47                 processes[process.id_] = process
48         return list(processes.values())
49
50     @classmethod
51     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
52               create: bool = False) -> Process:
53         """Collect Process, its VersionedAttributes, and its child IDs."""
54         process = None
55         from_cache = False
56         if id_:
57             process, from_cache = super()._by_id(db_conn, id_)
58         if not from_cache:
59             if not process:
60                 if not create:
61                     raise NotFoundException(f'Process not found of id: {id_}')
62                 process = Process(id_)
63             if isinstance(process.id_, int):
64                 for name in ('title', 'description', 'effort'):
65                     table = f'process_{name}s'
66                     for row in db_conn.row_where(table, 'parent', process.id_):
67                         getattr(process, name).history_from_row(row)
68                 for row in db_conn.row_where('process_steps', 'owner',
69                                              process.id_):
70                     step = ProcessStep.from_table_row(db_conn, row)
71                     process.explicit_steps += [step]
72                 for name in ('conditions', 'enables', 'disables'):
73                     table = f'process_{name}'
74                     for c_id in db_conn.column_where(table, 'condition',
75                                                      'process', process.id_):
76                         target = getattr(process, name)
77                         target += [Condition.by_id(db_conn, c_id)]
78         assert isinstance(process, Process)
79         return process
80
81     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
82         """Return Processes using self for a ProcessStep."""
83         if not self.id_:
84             return []
85         owner_ids = set()
86         for id_ in db_conn.column_where('process_steps', 'owner',
87                                         'step_process', self.id_):
88             owner_ids.add(id_)
89         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
90
91     def get_steps(self, db_conn: DatabaseConnection, external_owner:
92                   Process | None = None) -> dict[int, ProcessStepsNode]:
93         """Return tree of depended-on explicit and implicit ProcessSteps."""
94
95         def make_node(step: ProcessStep) -> ProcessStepsNode:
96             is_explicit = False
97             if external_owner is not None:
98                 is_explicit = step.owner_id == external_owner.id_
99             process = self.__class__.by_id(db_conn, step.step_process_id)
100             step_steps = process.get_steps(db_conn, external_owner)
101             return ProcessStepsNode(process, step.parent_step_id,
102                                     is_explicit, step_steps, False)
103
104         def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
105             explicit_children = [s for s in self.explicit_steps
106                                  if s.parent_step_id == node_id]
107             for child in explicit_children:
108                 assert isinstance(child.id_, int)
109                 node.steps[child.id_] = make_node(child)
110             node.seen = node_id in seen_step_ids
111             seen_step_ids.add(node_id)
112             for id_, step in node.steps.items():
113                 walk_steps(id_, step)
114
115         steps: dict[int, ProcessStepsNode] = {}
116         seen_step_ids: Set[int] = set()
117         if external_owner is None:
118             external_owner = self
119         for step in [s for s in self.explicit_steps
120                      if s.parent_step_id is None]:
121             assert isinstance(step.id_, int)
122             steps[step.id_] = make_node(step)
123         for step_id, step_node in steps.items():
124             walk_steps(step_id, step_node)
125         return steps
126
127     def _add_step(self,
128                   db_conn: DatabaseConnection,
129                   id_: int | None,
130                   step_process_id: int,
131                   parent_step_id: int | None) -> ProcessStep:
132         """Create new ProcessStep, save and add it to self.explicit_steps.
133
134         Also checks against step recursion.
135
136         The new step's parent_step_id will fall back to None either if no
137         matching ProcessStep is found (which can be assumed in case it was
138         just deleted under its feet), or if the parent step would not be
139         owned by the current Process.
140         """
141
142         def walk_steps(node: ProcessStep) -> None:
143             if node.step_process_id == self.id_:
144                 raise BadFormatException('bad step selection causes recursion')
145             step_process = self.by_id(db_conn, node.step_process_id)
146             for step in step_process.explicit_steps:
147                 walk_steps(step)
148
149         if parent_step_id is not None:
150             try:
151                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
152                 if parent_step.owner_id != self.id_:
153                     parent_step_id = None
154             except NotFoundException:
155                 parent_step_id = None
156         assert isinstance(self.id_, int)
157         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
158         walk_steps(step)
159         self.explicit_steps += [step]
160         step.save(db_conn)  # NB: This ensures a non-None step.id_.
161         return step
162
163     def set_steps(self, db_conn: DatabaseConnection,
164                   steps: list[tuple[int | None, int, int | None]]) -> None:
165         """Set self.explicit_steps in bulk."""
166         assert isinstance(self.id_, int)
167         for step in self.explicit_steps:
168             assert isinstance(step.id_, int)
169             del db_conn.cached_process_steps[step.id_]
170         self.explicit_steps = []
171         db_conn.delete_where('process_steps', 'owner', self.id_)
172         for step_tuple in steps:
173             self._add_step(db_conn, step_tuple[0],
174                            step_tuple[1], step_tuple[2])
175
176     def save(self, db_conn: DatabaseConnection) -> None:
177         """Add (or re-write) self and connected items to DB."""
178         self.save_core(db_conn)
179         assert isinstance(self.id_, int)
180         self.title.save(db_conn)
181         self.description.save(db_conn)
182         self.effort.save(db_conn)
183         db_conn.rewrite_relations('process_conditions', 'process', self.id_,
184                                   [[c.id_] for c in self.conditions])
185         db_conn.rewrite_relations('process_enables', 'process', self.id_,
186                                   [[c.id_] for c in self.enables])
187         db_conn.rewrite_relations('process_disables', 'process', self.id_,
188                                   [[c.id_] for c in self.disables])
189         db_conn.delete_where('process_steps', 'owner', self.id_)
190         for step in self.explicit_steps:
191             step.save(db_conn)
192         db_conn.cached_processes[self.id_] = self
193
194
195 class ProcessStep(BaseModel):
196     """Sub-unit of Processes."""
197     table_name = 'process_steps'
198     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
199
200     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
201                  parent_step_id: int | None) -> None:
202         self.set_int_id(id_)
203         self.owner_id = owner_id
204         self.step_process_id = step_process_id
205         self.parent_step_id = parent_step_id
206
207     @classmethod
208     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
209         """Retrieve ProcessStep by id_, or throw NotFoundException."""
210         step, _ = super()._by_id(db_conn, id_)
211         if step:
212             assert isinstance(step, ProcessStep)
213             return step
214         raise NotFoundException(f'found no ProcessStep of ID {id_}')
215
216     def save(self, db_conn: DatabaseConnection) -> None:
217         """Default to simply calling self.save_core for simple cases."""
218         self.save_core(db_conn)