1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from plomtask.db import DatabaseConnection, BaseModel
5 from plomtask.misc import VersionedAttribute
6 from plomtask.conditions import Condition, ConditionsRelations
7 from plomtask.exceptions import NotFoundException, BadFormatException
class Process(BaseModel, ConditionsRelations):
    """Template for, and metadata for, Todos, and their arrangements.

    Carries versioned title, description and effort values, the explicit
    ProcessSteps owned by this Process, and three lists of Conditions
    (conditions, enables, disables).
    """
    table_name = 'processes'

    # pylint: disable=too-many-instance-attributes

    def __init__(self, id_: int | None) -> None:
        """Set up a Process without steps or conditions.

        id_ is typed as optional; save() asserts it is an int once
        save_core() has run.
        """
        # The rest of this class reads self.id_; handing id_ to the parent
        # initialiser is what is expected to provide it.
        super().__init__(id_)
        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
        self.description = VersionedAttribute(self, 'process_descriptions', '')
        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
        self.explicit_steps: list[ProcessStep] = []
        self.conditions: list[Condition] = []
        self.enables: list[Condition] = []
        self.disables: list[Condition] = []

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Starts from whatever db_conn.cached_processes already holds and
        loads, via by_id, every ID of the 'processes' table not yet in it.
        """
        processes = dict(db_conn.cached_processes.items())
        for id_ in db_conn.column_all('processes', 'id'):
            if id_ not in processes:
                process = cls.by_id(db_conn, id_)
                processes[process.id_] = process
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If nothing is found for id_ (or id_ is None), a fresh Process(id_)
        is returned when create is set; otherwise NotFoundException is
        raised.
        """
        process = None
        if id_ is not None:
            # NOTE(review): the second value returned by _by_id is discarded;
            # if it can hand back an already populated (cached) instance, the
            # loading below would append its steps and conditions a second
            # time -- confirm against BaseModel._by_id.
            process, _ = super()._by_id(db_conn, id_)
        if not process:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        if isinstance(process.id_, int):
            # Only something with an integer ID can have rows pointing at it.
            for name in ('title', 'description', 'effort'):
                table = f'process_{name}s'
                for row in db_conn.row_where(table, 'parent', process.id_):
                    getattr(process, name).history_from_row(row)
            for row in db_conn.row_where('process_steps', 'owner',
                                         process.id_):
                step = ProcessStep.from_table_row(db_conn, row)
                process.explicit_steps += [step]
            for name in ('conditions', 'enables', 'disables'):
                table = f'process_{name}'
                for cond_id in db_conn.column_where(table, 'condition',
                                                    'process', process.id_):
                    target = getattr(process, name)
                    target += [Condition.by_id(db_conn, cond_id)]
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep.

        Each owning Process is listed once, however many of its steps
        point at self.
        """
        if self.id_ is None:
            # Without an ID no 'process_steps' row can reference self.
            return []
        owner_ids = set(db_conn.column_where('process_steps', 'owner',
                                             'step_process', self.id_))
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to node dictionaries with the keys
        'process', 'parent_id', 'is_explicit' and 'steps' (the sub-tree of
        the step's Process); walk_steps additionally stores a 'seen' flag
        on every node it visits.  external_owner is the Process against
        which 'is_explicit' is decided; it defaults to self.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            process = self.__class__.by_id(db_conn, step.step_process_id)
            step_steps = process.get_steps(db_conn, external_owner)
            return {'process': process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Collect the children before building nodes: make_node() calls
            # by_id(), which appends to the explicit_steps of whatever
            # Process it returns.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert isinstance(step.id_, int)
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _add_step(self,
                  db_conn: DatabaseConnection,
                  id_: int | None,
                  step_process_id: int,
                  parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step's Process, or any Process
        reachable through its explicit steps, is self.
        """
        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert isinstance(self.id_, int)
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Run the recursion check before the step is attached or written.
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def set_steps(self, db_conn: DatabaseConnection,
                  steps: list[tuple[int | None, int, int | None]]) -> None:
        """Set self.explicit_steps in bulk.

        Drops the current steps from db_conn.cached_process_steps and from
        the 'process_steps' table, then re-creates one step per tuple of
        (step ID, step Process ID, parent step ID) through _add_step.
        """
        assert isinstance(self.id_, int)
        for step in self.explicit_steps:
            assert isinstance(step.id_, int)
            del db_conn.cached_process_steps[step.id_]
        self.explicit_steps = []
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step_tuple in steps:
            self._add_step(db_conn, step_tuple[0],
                           step_tuple[1], step_tuple[2])

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected items to DB.

        Writes the core row, the three versioned attributes and the three
        Condition relations, replaces the owned 'process_steps' rows, and
        registers self in db_conn.cached_processes.
        """
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                  [[c.id_] for c in self.conditions])
        db_conn.rewrite_relations('process_enables', 'process', self.id_,
                                  [[c.id_] for c in self.enables])
        db_conn.rewrite_relations('process_disables', 'process', self.id_,
                                  [[c.id_] for c in self.disables])
        # Step rows are wiped first and then re-written one by one.
        db_conn.delete_where('process_steps', 'owner', self.id_)
        for step in self.explicit_steps:
            step.save(db_conn)
        db_conn.cached_processes[self.id_] = self
class ProcessStep(BaseModel):
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process to be done as a step
    (step_process_id), optionally below another step (parent_step_id).
    """
    table_name = 'process_steps'
    to_save = ['owner_id', 'step_process_id', 'parent_step_id']

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the step's own ID and the IDs it connects."""
        # self.id_ is read by users of this class; handing id_ to the parent
        # initialiser is what is expected to provide it.
        super().__init__(id_)
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        step, _ = super()._by_id(db_conn, id_)
        if not step:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        assert isinstance(step, ProcessStep)
        return step

    def save(self, db_conn: DatabaseConnection) -> None:
        """Default to simply calling self.save_core for simple cases."""
        self.save_core(db_conn)