home · contact · privacy
Remove more redundant code.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.conditions import Condition, ConditionsRelations
8 from plomtask.exceptions import NotFoundException, BadFormatException
9
10
11 @dataclass
12 class ProcessStepsNode:
13     """Collects what's useful to know for ProcessSteps tree display."""
14     process: Process
15     parent_id: int | None
16     is_explicit: bool
17     steps: dict[int, ProcessStepsNode]
18     seen: bool
19
20
21 class Process(BaseModel[int], ConditionsRelations):
22     """Template for, and metadata for, Todos, and their arrangements."""
23     table_name = 'processes'
24
25     # pylint: disable=too-many-instance-attributes
26
27     def __init__(self, id_: int | None) -> None:
28         self.set_int_id(id_)
29         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
30         self.description = VersionedAttribute(self, 'process_descriptions', '')
31         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
32         self.explicit_steps: list[ProcessStep] = []
33         self.conditions: list[Condition] = []
34         self.enables: list[Condition] = []
35         self.disables: list[Condition] = []
36
37     @classmethod
38     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
39               create: bool = False) -> Process:
40         """Collect Process, its VersionedAttributes, and its child IDs."""
41         process = None
42         from_cache = False
43         if id_:
44             process, from_cache = super()._by_id(db_conn, id_)
45         if not from_cache:
46             if not process:
47                 if not create:
48                     raise NotFoundException(f'Process not found of id: {id_}')
49                 process = Process(id_)
50             if isinstance(process.id_, int):
51                 for name in ('title', 'description', 'effort'):
52                     table = f'process_{name}s'
53                     for row in db_conn.row_where(table, 'parent', process.id_):
54                         getattr(process, name).history_from_row(row)
55                 for row in db_conn.row_where('process_steps', 'owner',
56                                              process.id_):
57                     step = ProcessStep.from_table_row(db_conn, row)
58                     process.explicit_steps += [step]
59                 for name in ('conditions', 'enables', 'disables'):
60                     table = f'process_{name}'
61                     for c_id in db_conn.column_where(table, 'condition',
62                                                      'process', process.id_):
63                         target = getattr(process, name)
64                         target += [Condition.by_id(db_conn, c_id)]
65         assert isinstance(process, Process)
66         return process
67
68     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
69         """Return Processes using self for a ProcessStep."""
70         if not self.id_:
71             return []
72         owner_ids = set()
73         for id_ in db_conn.column_where('process_steps', 'owner',
74                                         'step_process', self.id_):
75             owner_ids.add(id_)
76         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
77
78     def get_steps(self, db_conn: DatabaseConnection, external_owner:
79                   Process | None = None) -> dict[int, ProcessStepsNode]:
80         """Return tree of depended-on explicit and implicit ProcessSteps."""
81
82         def make_node(step: ProcessStep) -> ProcessStepsNode:
83             is_explicit = False
84             if external_owner is not None:
85                 is_explicit = step.owner_id == external_owner.id_
86             process = self.__class__.by_id(db_conn, step.step_process_id)
87             step_steps = process.get_steps(db_conn, external_owner)
88             return ProcessStepsNode(process, step.parent_step_id,
89                                     is_explicit, step_steps, False)
90
91         def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
92             explicit_children = [s for s in self.explicit_steps
93                                  if s.parent_step_id == node_id]
94             for child in explicit_children:
95                 assert isinstance(child.id_, int)
96                 node.steps[child.id_] = make_node(child)
97             node.seen = node_id in seen_step_ids
98             seen_step_ids.add(node_id)
99             for id_, step in node.steps.items():
100                 walk_steps(id_, step)
101
102         steps: dict[int, ProcessStepsNode] = {}
103         seen_step_ids: Set[int] = set()
104         if external_owner is None:
105             external_owner = self
106         for step in [s for s in self.explicit_steps
107                      if s.parent_step_id is None]:
108             assert isinstance(step.id_, int)
109             steps[step.id_] = make_node(step)
110         for step_id, step_node in steps.items():
111             walk_steps(step_id, step_node)
112         return steps
113
114     def _add_step(self,
115                   db_conn: DatabaseConnection,
116                   id_: int | None,
117                   step_process_id: int,
118                   parent_step_id: int | None) -> ProcessStep:
119         """Create new ProcessStep, save and add it to self.explicit_steps.
120
121         Also checks against step recursion.
122
123         The new step's parent_step_id will fall back to None either if no
124         matching ProcessStep is found (which can be assumed in case it was
125         just deleted under its feet), or if the parent step would not be
126         owned by the current Process.
127         """
128
129         def walk_steps(node: ProcessStep) -> None:
130             if node.step_process_id == self.id_:
131                 raise BadFormatException('bad step selection causes recursion')
132             step_process = self.by_id(db_conn, node.step_process_id)
133             for step in step_process.explicit_steps:
134                 walk_steps(step)
135
136         if parent_step_id is not None:
137             try:
138                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
139                 if parent_step.owner_id != self.id_:
140                     parent_step_id = None
141             except NotFoundException:
142                 parent_step_id = None
143         assert isinstance(self.id_, int)
144         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
145         walk_steps(step)
146         self.explicit_steps += [step]
147         step.save(db_conn)  # NB: This ensures a non-None step.id_.
148         return step
149
150     def set_steps(self, db_conn: DatabaseConnection,
151                   steps: list[tuple[int | None, int, int | None]]) -> None:
152         """Set self.explicit_steps in bulk."""
153         assert isinstance(self.id_, int)
154         for step in self.explicit_steps:
155             step.uncache()
156         self.explicit_steps = []
157         db_conn.delete_where('process_steps', 'owner', self.id_)
158         for step_tuple in steps:
159             self._add_step(db_conn, step_tuple[0],
160                            step_tuple[1], step_tuple[2])
161
162     def save(self, db_conn: DatabaseConnection) -> None:
163         """Add (or re-write) self and connected items to DB."""
164         self.save_core(db_conn)
165         assert isinstance(self.id_, int)
166         self.title.save(db_conn)
167         self.description.save(db_conn)
168         self.effort.save(db_conn)
169         db_conn.rewrite_relations('process_conditions', 'process', self.id_,
170                                   [[c.id_] for c in self.conditions])
171         db_conn.rewrite_relations('process_enables', 'process', self.id_,
172                                   [[c.id_] for c in self.enables])
173         db_conn.rewrite_relations('process_disables', 'process', self.id_,
174                                   [[c.id_] for c in self.disables])
175         db_conn.delete_where('process_steps', 'owner', self.id_)
176         for step in self.explicit_steps:
177             step.save(db_conn)
178
179
180 class ProcessStep(BaseModel[int]):
181     """Sub-unit of Processes."""
182     table_name = 'process_steps'
183     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
184
185     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
186                  parent_step_id: int | None) -> None:
187         self.set_int_id(id_)
188         self.owner_id = owner_id
189         self.step_process_id = step_process_id
190         self.parent_step_id = parent_step_id
191
192     @classmethod
193     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
194         """Retrieve ProcessStep by id_, or throw NotFoundException."""
195         step, _ = super()._by_id(db_conn, id_)
196         if step:
197             return step
198         raise NotFoundException(f'found no ProcessStep of ID {id_}')
199
200     def save(self, db_conn: DatabaseConnection) -> None:
201         """Default to simply calling self.save_core for simple cases."""
202         self.save_core(db_conn)