1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
10 class Process(BaseModel):
11 """Template for, and metadata for, Todos, and their arrangements."""
12 table_name = 'processes'
14 # pylint: disable=too-many-instance-attributes
16 def __init__(self, id_: int | None) -> None:
18 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
19 self.description = VersionedAttribute(self, 'process_descriptions', '')
20 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
21 self.explicit_steps: list[ProcessStep] = []
22 self.conditions: list[Condition] = []
23 self.fulfills: list[Condition] = []
24 self.undoes: list[Condition] = []
27 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28 """Collect all Processes and their connected VersionedAttributes."""
30 for id_, process in db_conn.cached_processes.items():
31 processes[id_] = process
32 already_recorded = processes.keys()
33 for row in db_conn.exec('SELECT id FROM processes'):
34 if row[0] not in already_recorded:
35 process = cls.by_id(db_conn, row[0])
36 processes[process.id_] = process
37 return list(processes.values())
40 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
41 create: bool = False) -> Process:
42 """Collect Process, its VersionedAttributes, and its child IDs."""
45 process, _ = super()._by_id(db_conn, id_)
48 raise NotFoundException(f'Process not found of id: {id_}')
49 process = Process(id_)
50 if isinstance(process.id_, int):
51 for name in ('title', 'description', 'effort'):
52 table = f'process_{name}s'
53 for row in db_conn.all_where(table, 'parent', process.id_):
54 getattr(process, name).history_from_row(row)
55 for row in db_conn.all_where('process_steps', 'owner',
57 step = ProcessStep.from_table_row(db_conn, row)
58 process.explicit_steps += [step]
59 for name in ('conditions', 'fulfills', 'undoes'):
60 table = f'process_{name}'
61 for row in db_conn.all_where(table, 'process', process.id_):
62 target = getattr(process, name)
63 target += [Condition.by_id(db_conn, row[1])]
64 assert isinstance(process, Process)
67 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
68 """Return Processes using self for a ProcessStep."""
70 for owner_id in db_conn.exec('SELECT owner FROM process_steps WHERE'
71 ' step_process = ?', (self.id_,)):
72 owner_ids.add(owner_id[0])
73 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
75 def get_steps(self, db_conn: DatabaseConnection, external_owner:
76 Process | None = None) -> dict[int, dict[str, object]]:
77 """Return tree of depended-on explicit and implicit ProcessSteps."""
79 def make_node(step: ProcessStep) -> dict[str, object]:
81 if external_owner is not None:
82 is_explicit = step.owner_id == external_owner.id_
83 process = self.__class__.by_id(db_conn, step.step_process_id)
84 step_steps = process.get_steps(db_conn, external_owner)
85 return {'process': process, 'parent_id': step.parent_step_id,
86 'is_explicit': is_explicit, 'steps': step_steps}
88 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
89 explicit_children = [s for s in self.explicit_steps
90 if s.parent_step_id == node_id]
91 for child in explicit_children:
92 node['steps'][child.id_] = make_node(child)
93 node['seen'] = node_id in seen_step_ids
94 seen_step_ids.add(node_id)
95 for id_, step in node['steps'].items():
98 steps: dict[int, dict[str, object]] = {}
99 seen_step_ids: Set[int] = set()
100 if external_owner is None:
101 external_owner = self
102 for step in [s for s in self.explicit_steps
103 if s.parent_step_id is None]:
104 assert isinstance(step.id_, int)
105 steps[step.id_] = make_node(step)
106 for step_id, step_node in steps.items():
107 walk_steps(step_id, step_node)
110 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
111 trgt: str = 'conditions') -> None:
112 """Set self.[target] to Conditions identified by ids."""
113 trgt_list = getattr(self, trgt)
114 while len(trgt_list) > 0:
117 trgt_list += [Condition.by_id(db_conn, id_)]
119 def set_fulfills(self, db_conn: DatabaseConnection,
120 ids: list[int]) -> None:
121 """Set self.fulfills to Conditions identified by ids."""
122 self.set_conditions(db_conn, ids, 'fulfills')
124 def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
125 """Set self.undoes to Conditions identified by ids."""
126 self.set_conditions(db_conn, ids, 'undoes')
129 db_conn: DatabaseConnection,
131 step_process_id: int,
132 parent_step_id: int | None) -> ProcessStep:
133 """Create new ProcessStep, save and add it to self.explicit_steps.
135 Also checks against step recursion.
137 The new step's parent_step_id will fall back to None either if no
138 matching ProcessStep is found (which can be assumed in case it was
139 just deleted under its feet), or if the parent step would not be
140 owned by the current Process.
142 def walk_steps(node: ProcessStep) -> None:
143 if node.step_process_id == self.id_:
144 raise BadFormatException('bad step selection causes recursion')
145 step_process = self.by_id(db_conn, node.step_process_id)
146 for step in step_process.explicit_steps:
148 if parent_step_id is not None:
150 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
151 if parent_step.owner_id != self.id_:
152 parent_step_id = None
153 except NotFoundException:
154 parent_step_id = None
155 assert isinstance(self.id_, int)
156 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
158 self.explicit_steps += [step]
159 step.save(db_conn) # NB: This ensures a non-None step.id_.
162 def set_steps(self, db_conn: DatabaseConnection,
163 steps: list[tuple[int | None, int, int | None]]) -> None:
164 """Set self.explicit_steps in bulk."""
165 for step in self.explicit_steps:
166 assert isinstance(step.id_, int)
167 del db_conn.cached_process_steps[step.id_]
168 self.explicit_steps = []
169 db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
171 for step_tuple in steps:
172 self._add_step(db_conn, step_tuple[0],
173 step_tuple[1], step_tuple[2])
175 def save(self, db_conn: DatabaseConnection) -> None:
176 """Add (or re-write) self and connected items to DB."""
177 self.save_core(db_conn)
178 assert isinstance(self.id_, int)
179 self.title.save(db_conn)
180 self.description.save(db_conn)
181 self.effort.save(db_conn)
182 db_conn.rewrite_relations('process_conditions', 'process', self.id_,
183 [[c.id_] for c in self.conditions])
184 db_conn.rewrite_relations('process_fulfills', 'process', self.id_,
185 [[c.id_] for c in self.fulfills])
186 db_conn.rewrite_relations('process_undoes', 'process', self.id_,
187 [[c.id_] for c in self.undoes])
188 db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
190 for step in self.explicit_steps:
192 db_conn.cached_processes[self.id_] = self
195 class ProcessStep(BaseModel):
196 """Sub-unit of Processes."""
197 table_name = 'process_steps'
198 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
200 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
201 parent_step_id: int | None) -> None:
203 self.owner_id = owner_id
204 self.step_process_id = step_process_id
205 self.parent_step_id = parent_step_id
208 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
209 """Retrieve ProcessStep by id_, or throw NotFoundException."""
210 step, _ = super()._by_id(db_conn, id_)
212 assert isinstance(step, ProcessStep)
214 raise NotFoundException(f'found no ProcessStep of ID {id_}')
216 def save(self, db_conn: DatabaseConnection) -> None:
217 """Default to simply calling self.save_core for simple cases."""
218 self.save_core(db_conn)