1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition
7 from plomtask.exceptions import NotFoundException, BadFormatException
10 class Process(BaseModel):
11 """Template for, and metadata for, Todos, and their arrangements."""
12 table_name = 'processes'
14 # pylint: disable=too-many-instance-attributes
16 def __init__(self, id_: int | None) -> None:
18 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
19 self.description = VersionedAttribute(self, 'process_descriptions', '')
20 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
21 self.explicit_steps: list[ProcessStep] = []
22 self.conditions: list[Condition] = []
23 self.enables: list[Condition] = []
24 self.disables: list[Condition] = []
27 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28 """Collect all Processes and their connected VersionedAttributes."""
30 for id_, process in db_conn.cached_processes.items():
31 processes[id_] = process
32 already_recorded = processes.keys()
33 for id_ in db_conn.column_all('processes', 'id'):
34 if id_ not in already_recorded:
35 process = cls.by_id(db_conn, id_)
36 processes[process.id_] = process
37 return list(processes.values())
40 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
41 create: bool = False) -> Process:
42 """Collect Process, its VersionedAttributes, and its child IDs."""
45 process, _ = super()._by_id(db_conn, id_)
48 raise NotFoundException(f'Process not found of id: {id_}')
49 process = Process(id_)
50 if isinstance(process.id_, int):
51 for name in ('title', 'description', 'effort'):
52 table = f'process_{name}s'
53 for row in db_conn.row_where(table, 'parent', process.id_):
54 getattr(process, name).history_from_row(row)
55 for row in db_conn.row_where('process_steps', 'owner',
57 step = ProcessStep.from_table_row(db_conn, row)
58 process.explicit_steps += [step]
59 for name in ('conditions', 'enables', 'disables'):
60 table = f'process_{name}'
61 for cond_id in db_conn.column_where(table, 'condition',
62 'process', process.id_):
63 target = getattr(process, name)
64 target += [Condition.by_id(db_conn, cond_id)]
65 assert isinstance(process, Process)
68 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
69 """Return Processes using self for a ProcessStep."""
73 for id_ in db_conn.column_where('process_steps', 'owner',
74 'step_process', self.id_):
76 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
78 def get_steps(self, db_conn: DatabaseConnection, external_owner:
79 Process | None = None) -> dict[int, dict[str, object]]:
80 """Return tree of depended-on explicit and implicit ProcessSteps."""
82 def make_node(step: ProcessStep) -> dict[str, object]:
84 if external_owner is not None:
85 is_explicit = step.owner_id == external_owner.id_
86 process = self.__class__.by_id(db_conn, step.step_process_id)
87 step_steps = process.get_steps(db_conn, external_owner)
88 return {'process': process, 'parent_id': step.parent_step_id,
89 'is_explicit': is_explicit, 'steps': step_steps}
91 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
92 explicit_children = [s for s in self.explicit_steps
93 if s.parent_step_id == node_id]
94 for child in explicit_children:
95 node['steps'][child.id_] = make_node(child)
96 node['seen'] = node_id in seen_step_ids
97 seen_step_ids.add(node_id)
98 for id_, step in node['steps'].items():
101 steps: dict[int, dict[str, object]] = {}
102 seen_step_ids: Set[int] = set()
103 if external_owner is None:
104 external_owner = self
105 for step in [s for s in self.explicit_steps
106 if s.parent_step_id is None]:
107 assert isinstance(step.id_, int)
108 steps[step.id_] = make_node(step)
109 for step_id, step_node in steps.items():
110 walk_steps(step_id, step_node)
113 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
114 trgt: str = 'conditions') -> None:
115 """Set self.[target] to Conditions identified by ids."""
116 trgt_list = getattr(self, trgt)
117 while len(trgt_list) > 0:
120 trgt_list += [Condition.by_id(db_conn, id_)]
122 def set_enables(self, db_conn: DatabaseConnection,
123 ids: list[int]) -> None:
124 """Set self.enables to Conditions identified by ids."""
125 self.set_conditions(db_conn, ids, 'enables')
127 def set_disables(self, db_conn: DatabaseConnection,
128 ids: list[int]) -> None:
129 """Set self.disables to Conditions identified by ids."""
130 self.set_conditions(db_conn, ids, 'disables')
133 db_conn: DatabaseConnection,
135 step_process_id: int,
136 parent_step_id: int | None) -> ProcessStep:
137 """Create new ProcessStep, save and add it to self.explicit_steps.
139 Also checks against step recursion.
141 The new step's parent_step_id will fall back to None either if no
142 matching ProcessStep is found (which can be assumed in case it was
143 just deleted under its feet), or if the parent step would not be
144 owned by the current Process.
146 def walk_steps(node: ProcessStep) -> None:
147 if node.step_process_id == self.id_:
148 raise BadFormatException('bad step selection causes recursion')
149 step_process = self.by_id(db_conn, node.step_process_id)
150 for step in step_process.explicit_steps:
152 if parent_step_id is not None:
154 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
155 if parent_step.owner_id != self.id_:
156 parent_step_id = None
157 except NotFoundException:
158 parent_step_id = None
159 assert isinstance(self.id_, int)
160 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
162 self.explicit_steps += [step]
163 step.save(db_conn) # NB: This ensures a non-None step.id_.
166 def set_steps(self, db_conn: DatabaseConnection,
167 steps: list[tuple[int | None, int, int | None]]) -> None:
168 """Set self.explicit_steps in bulk."""
169 assert isinstance(self.id_, int)
170 for step in self.explicit_steps:
171 assert isinstance(step.id_, int)
172 del db_conn.cached_process_steps[step.id_]
173 self.explicit_steps = []
174 db_conn.delete_where('process_steps', 'owner', self.id_)
175 for step_tuple in steps:
176 self._add_step(db_conn, step_tuple[0],
177 step_tuple[1], step_tuple[2])
179 def save(self, db_conn: DatabaseConnection) -> None:
180 """Add (or re-write) self and connected items to DB."""
181 self.save_core(db_conn)
182 assert isinstance(self.id_, int)
183 self.title.save(db_conn)
184 self.description.save(db_conn)
185 self.effort.save(db_conn)
186 db_conn.rewrite_relations('process_conditions', 'process', self.id_,
187 [[c.id_] for c in self.conditions])
188 db_conn.rewrite_relations('process_enables', 'process', self.id_,
189 [[c.id_] for c in self.enables])
190 db_conn.rewrite_relations('process_disables', 'process', self.id_,
191 [[c.id_] for c in self.disables])
192 db_conn.delete_where('process_steps', 'owner', self.id_)
193 for step in self.explicit_steps:
195 db_conn.cached_processes[self.id_] = self
198 class ProcessStep(BaseModel):
199 """Sub-unit of Processes."""
200 table_name = 'process_steps'
201 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
203 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
204 parent_step_id: int | None) -> None:
206 self.owner_id = owner_id
207 self.step_process_id = step_process_id
208 self.parent_step_id = parent_step_id
211 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
212 """Retrieve ProcessStep by id_, or throw NotFoundException."""
213 step, _ = super()._by_id(db_conn, id_)
215 assert isinstance(step, ProcessStep)
217 raise NotFoundException(f'found no ProcessStep of ID {id_}')
219 def save(self, db_conn: DatabaseConnection) -> None:
220 """Default to simply calling self.save_core for simple cases."""
221 self.save_core(db_conn)