home · contact · privacy
e5851d0e0d5586215d6521e495a25da95f360e43
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process(BaseModel):
    """Template for, and metadata for, Todos, and their arrangements.

    Besides its versioned title, description, and effort, a Process holds
    the ProcessSteps it explicitly owns (explicit_steps) and three lists
    of Conditions: conditions, fulfills, and undoes.
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set ID, and default values for all attributes and relations."""
        self.set_int_id(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.fulfills: list[Condition] = []
        self.undoes: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Starts from whatever is in db_conn.cached_processes, and loads via
        cls.by_id only those IDs of the 'processes' table not found there.
        """
        processes = dict(db_conn.cached_processes.items())
        for row in db_conn.exec('SELECT id FROM processes'):
            if row[0] not in processes:
                process = cls.by_id(db_conn, row[0])
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If nothing is found for id_ (or id_ is falsy), a new Process(id_)
        is made if create is set; otherwise NotFoundException is raised.
        Attribute histories, explicit steps, and Conditions are only read
        from the DB if the resulting Process has an integer ID.
        """
        process = None
        if id_:
            # NOTE(review): the second value returned by _by_id is ignored;
            # if _by_id can hand back an already populated (e.g. cached)
            # instance, the appends below would add its steps and Conditions
            # a second time -- confirm against BaseModel._by_id.
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        if isinstance(process.id_, int):
            for name in ('title', 'description', 'effort'):
                attribute = getattr(process, name)
                table = f'process_{name}s'
                for row in db_conn.all_where(table, 'parent', process.id_):
                    attribute.history_from_row(row)
            for row in db_conn.all_where('process_steps', 'owner',
                                         process.id_):
                step = ProcessStep.from_table_row(db_conn, row)
                process.explicit_steps.append(step)
            for name in ('conditions', 'fulfills', 'undoes'):
                target = getattr(process, name)
                table = f'process_{name}'
                for row in db_conn.all_where(table, 'process', process.id_):
                    # Second column of the relation row is the Condition ID.
                    target.append(Condition.by_id(db_conn, row[1]))
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # Collected into a set so each owning Process is returned only once.
        owner_ids = {
            row[0]
            for row in db_conn.exec('SELECT owner FROM process_steps WHERE'
                                    ' step_process = ?', (self.id_,))}
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to nodes, each a dict with the keys
        'process' (the Process the step points to), 'parent_id',
        'is_explicit' (whether the step is owned by external_owner),
        'steps' (sub-tree of the same shape), and 'seen' (whether the
        step ID was already encountered earlier in this walk).

        external_owner defaults to self; it is handed down unchanged into
        the recursive calls on the steps' Processes.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            # Build node for step, including the step tree of its Process.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return {'process': process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Attach those of self's explicit steps that name node_id as
            # their parent, mark repeat encounters, then descend.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        # Top level of the tree: explicit steps without a parent step.
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
                       trgt: str = 'conditions') -> None:
        """Set self.[target] to Conditions identified by ids.

        trgt names the attribute to fill ('conditions', 'fulfills', or
        'undoes'); the list it holds is emptied and refilled in place.
        """
        trgt_list = getattr(self, trgt)
        trgt_list.clear()
        for id_ in ids:
            trgt_list.append(Condition.by_id(db_conn, id_))

    def set_fulfills(self, db_conn: DatabaseConnection,
                     ids: list[int]) -> None:
        """Set self.fulfills to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'fulfills')

    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
        """Set self.undoes to Conditions identified by ids."""
        self.set_conditions(db_conn, ids, 'undoes')

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion: BadFormatException is raised
        if the step's Process, or any Process reachable through its
        explicit steps, is self.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            # Depth-first search for self among the Processes stepped to.
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Each tuple in steps is (id_, step_process_id, parent_step_id), as
        passed on to self._add_step. All previous steps of self are first
        dropped from db_conn.cached_process_steps, self.explicit_steps,
        and the 'process_steps' table.
        """
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            # pop() rather than del: a step that is not cached (or listed
            # twice in explicit_steps) must not abort the replacement with
            # a KeyError.
            db_conn.cached_process_steps.pop(step.id_, None)
        self.explicit_steps = []
        db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
                     (self.id_,))
        for step_tuple in steps:
            self._add_step(db_conn, step_tuple[0],
                           step_tuple[1], step_tuple[2])

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        Saves the core row, the three VersionedAttributes, the relations
        to Conditions, and the explicit steps (whose table rows are first
        deleted, then written anew), then stores self in
        db_conn.cached_processes.
        """
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_fulfills', 'process', self.id_,
                                  [[c.id_] for c in self.fulfills])
        db_conn.rewrite_relations('process_undoes', 'process', self.id_,
                                  [[c.id_] for c in self.undoes])
        db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
193
194
class ProcessStep(BaseModel):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process to be done as the
    step (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Set ID and the three fields named in to_save."""
        self.set_int_id(id_)
        self.parent_step_id = parent_step_id
        self.step_process_id = step_process_id
        self.owner_id = owner_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        found, _ = super()._by_id(db_conn, id_)
        if not found:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(found, ProcessStep)
        return found

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save self by nothing more than a call to self.save_core."""
        self.save_core(db_conn)