1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Set, Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
14 class ProcessStepsNode:
15 """Collects what's useful to know for ProcessSteps tree display."""
19 steps: dict[int, ProcessStepsNode]
23 class Process(BaseModel[int], ConditionsRelations):
24 """Template for, and metadata for, Todos, and their arrangements."""
25 # pylint: disable=too-many-instance-attributes
26 table_name = 'processes'
27 to_save = ['calendarize']
28 to_save_versioned = ['title', 'description', 'effort']
29 to_save_relations = [('process_conditions', 'process', 'conditions'),
30 ('process_blockers', 'process', 'blockers'),
31 ('process_enables', 'process', 'enables'),
32 ('process_disables', 'process', 'disables')]
33 to_search = ['title.newest', 'description.newest']
35 def __init__(self, id_: int | None, calendarize: bool = False) -> None:
36 BaseModel.__init__(self, id_)
37 ConditionsRelations.__init__(self)
38 self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
39 self.description = VersionedAttribute(self, 'process_descriptions', '')
40 self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
41 self.explicit_steps: list[ProcessStep] = []
42 self.calendarize = calendarize
45 def from_table_row(cls, db_conn: DatabaseConnection,
46 row: Row | list[Any]) -> Process:
47 """Make from DB row, with dependencies."""
48 process = super().from_table_row(db_conn, row)
49 assert isinstance(process.id_, int)
50 for name in ('title', 'description', 'effort'):
51 table = f'process_{name}s'
52 for row_ in db_conn.row_where(table, 'parent', process.id_):
53 getattr(process, name).history_from_row(row_)
54 for row_ in db_conn.row_where('process_steps', 'owner',
56 step = ProcessStep.from_table_row(db_conn, row_)
57 process.explicit_steps += [step] # pylint: disable=no-member
58 for name in ('conditions', 'blockers', 'enables', 'disables'):
59 table = f'process_{name}'
60 assert isinstance(process.id_, int)
61 for c_id in db_conn.column_where(table, 'condition',
62 'process', process.id_):
63 target = getattr(process, name)
64 target += [Condition.by_id(db_conn, c_id)]
67 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
68 """Return Processes using self for a ProcessStep."""
72 for id_ in db_conn.column_where('process_steps', 'owner',
73 'step_process', self.id_):
75 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
77 def get_steps(self, db_conn: DatabaseConnection, external_owner:
78 Process | None = None) -> dict[int, ProcessStepsNode]:
79 """Return tree of depended-on explicit and implicit ProcessSteps."""
81 def make_node(step: ProcessStep) -> ProcessStepsNode:
83 if external_owner is not None:
84 is_explicit = step.owner_id == external_owner.id_
85 process = self.__class__.by_id(db_conn, step.step_process_id)
86 step_steps = process.get_steps(db_conn, external_owner)
87 return ProcessStepsNode(process, step.parent_step_id,
88 is_explicit, step_steps, False)
90 def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
91 explicit_children = [s for s in self.explicit_steps
92 if s.parent_step_id == node_id]
93 for child in explicit_children:
94 assert isinstance(child.id_, int)
95 node.steps[child.id_] = make_node(child)
96 node.seen = node_id in seen_step_ids
97 seen_step_ids.add(node_id)
98 for id_, step in node.steps.items():
101 steps: dict[int, ProcessStepsNode] = {}
102 seen_step_ids: Set[int] = set()
103 if external_owner is None:
104 external_owner = self
105 for step in [s for s in self.explicit_steps
106 if s.parent_step_id is None]:
107 assert isinstance(step.id_, int)
108 steps[step.id_] = make_node(step)
109 for step_id, step_node in steps.items():
110 walk_steps(step_id, step_node)
114 db_conn: DatabaseConnection,
116 step_process_id: int,
117 parent_step_id: int | None) -> ProcessStep:
118 """Create new ProcessStep, save and add it to self.explicit_steps.
120 Also checks against step recursion.
122 The new step's parent_step_id will fall back to None either if no
123 matching ProcessStep is found (which can be assumed in case it was
124 just deleted under its feet), or if the parent step would not be
125 owned by the current Process.
128 def walk_steps(node: ProcessStep) -> None:
129 if node.step_process_id == self.id_:
130 raise BadFormatException('bad step selection causes recursion')
131 step_process = self.by_id(db_conn, node.step_process_id)
132 for step in step_process.explicit_steps:
135 if parent_step_id is not None:
137 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
138 if parent_step.owner_id != self.id_:
139 parent_step_id = None
140 except NotFoundException:
141 parent_step_id = None
142 assert isinstance(self.id_, int)
143 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
145 self.explicit_steps += [step]
146 step.save(db_conn) # NB: This ensures a non-None step.id_.
149 def set_steps(self, db_conn: DatabaseConnection,
150 steps: list[tuple[int | None, int, int | None]]) -> None:
151 """Set self.explicit_steps in bulk."""
152 assert isinstance(self.id_, int)
153 for step in self.explicit_steps:
155 self.explicit_steps = []
156 db_conn.delete_where('process_steps', 'owner', self.id_)
157 for step_tuple in steps:
158 self._add_step(db_conn, step_tuple[0],
159 step_tuple[1], step_tuple[2])
161 def save(self, db_conn: DatabaseConnection) -> None:
162 """Add (or re-write) self and connected items to DB."""
163 super().save(db_conn)
164 assert isinstance(self.id_, int)
165 db_conn.delete_where('process_steps', 'owner', self.id_)
166 for step in self.explicit_steps:
169 def remove(self, db_conn: DatabaseConnection) -> None:
170 """Remove from DB, with dependencies.
172 Guard against removal of Processes in use.
174 assert isinstance(self.id_, int)
175 for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
176 raise HandledException('cannot remove Process in use')
177 for _ in db_conn.row_where('todos', 'process', self.id_):
178 raise HandledException('cannot remove Process in use')
179 for step in self.explicit_steps:
181 super().remove(db_conn)
184 class ProcessStep(BaseModel[int]):
185 """Sub-unit of Processes."""
186 table_name = 'process_steps'
187 to_save = ['owner_id', 'step_process_id', 'parent_step_id']
189 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
190 parent_step_id: int | None) -> None:
191 super().__init__(id_)
192 self.owner_id = owner_id
193 self.step_process_id = step_process_id
194 self.parent_step_id = parent_step_id
196 def remove(self, db_conn: DatabaseConnection) -> None:
197 """Remove from DB, and owner's .explicit_steps."""
198 owner = Process.by_id(db_conn, self.owner_id)
199 owner.explicit_steps.remove(self)
200 super().remove(db_conn)