1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any, Set
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
class Process:
    """Template for, and metadata for, Todos, and their arrangements."""

    def __init__(self, id_: int | None) -> None:
        """Set up an in-memory Process with default attributes and no steps.

        id_ may be None for a Process that has not been stored yet;
        save_without_steps() overwrites it with the row ID reported by the
        database cursor.

        Raises BadFormatException if id_ is given but smaller than 1.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        # Read by __eq__, by the save methods and by all step handling.
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        self.explicit_steps: list[ProcessStep] = []

    def __eq__(self, other: object) -> bool:
        """Compare by class and by id_ only; attribute values are ignored.

        Two Processes whose id_ is still None therefore compare equal.
        """
        return isinstance(other, self.__class__) and self.id_ == other.id_

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        # The processes table is written with a single value (the ID), see
        # save_without_steps().
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Attribute rows are read as (process ID, timestamp, value).
        ProcessSteps are not loaded here; explicit_steps stays empty.
        """
        processes = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        for row in db_conn.exec('SELECT * FROM process_titles'):
            processes[row[0]].title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions'):
            processes[row[0]].description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts'):
            processes[row[0]].effort.history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If no row of that ID exists, a new unsaved Process(id_) is returned
        when create is true; otherwise NotFoundException is raised.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break  # at most one row is of interest
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = Process(id_)
        for row in db_conn.exec('SELECT * FROM process_titles '
                                'WHERE process_id = ?', (process.id_,)):
            process.title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions '
                                'WHERE process_id = ?', (process.id_,)):
            process.description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts '
                                'WHERE process_id = ?', (process.id_,)):
            process.effort.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps.append(ProcessStep.from_table_row(row))
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes using self for a ProcessStep."""
        # A set, so an owner using self in several steps is loaded only once.
        owner_ids = {row[0] for row in db_conn.exec(
            'SELECT owner_id FROM process_steps WHERE'
            ' step_process_id = ?', (self.id_,))}
        return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to nodes. Each node is a dict with the keys
        'process' (the Process the step points to), 'parent_id',
        'is_explicit' (whether the step is owned by external_owner, which
        defaults to self), 'steps' (the sub-tree, same shape) and 'seen'
        (whether that step ID was already met earlier in this walk).
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            step_process = self.__class__.by_id(db_conn, step.step_process_id)
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            # The step's own Process contributes its tree as implicit steps.
            step_steps = step_process.get_steps(db_conn, external_owner)
            return {'process': step_process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Hang self's explicit steps below the node they name as parent.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert step.id_ is not None  # for mypy
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
                 step_process_id: int,
                 parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.
        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step would make self depend on
        itself, directly or through the steps of the chosen Process.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert self.id_ is not None
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Check before anything is stored or appended.
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB."""
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # Adopt the row ID, which matters when self.id_ was None before.
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)

    def fix_steps(self, db_conn: DatabaseConnection) -> None:
        """Rewrite ProcessSteps from self.explicit_steps.

        This also fixes illegal Step.parent_step_id values, i.e. those pointing
        to steps now absent, or owned by a different Process, fall back into
        None.
        """
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            if step.parent_step_id is not None:
                # NOTE(review): this lookup runs after the DELETE above, so
                # a parent is only found once it was re-saved earlier in
                # this loop; confirm explicit_steps lists parents first.
                try:
                    parent_step = ProcessStep.by_id(db_conn,
                                                    step.parent_step_id)
                    if parent_step.owner_id != self.id_:
                        step.parent_step_id = None
                except NotFoundException:
                    step.parent_step_id = None
            step.save(db_conn)
class ProcessStep:
    """Sub-unit of Processes."""

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the step's own ID and the IDs it links together.

        id_ may be None for a step not yet stored; save() overwrites it
        with the row ID reported by the database cursor. owner_id names the
        Process owning the step, step_process_id the Process the step points
        to, parent_step_id an optional parent step.
        """
        self.id_ = id_
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def from_table_row(cls, row: Row) -> ProcessStep:
        """Make ProcessStep from database row."""
        # Column order mirrors the value order written by save().
        return cls(row[0], row[1], row[2], row[3])

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE step_id = ?', (id_,)):
            # Only the first matching row is used.
            return cls.from_table_row(row)
        raise NotFoundException(f'found no ProcessStep of ID {id_}')

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save to database, then adopt the row ID the cursor reports."""
        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
                              (self.id_, self.owner_id, self.step_process_id,
                               self.parent_step_id))
        self.id_ = cursor.lastrowid
class VersionedAttribute:
    """Attributes whose values are recorded as a timestamped history.

    history maps 'YYYY-MM-DD HH:MM:SS' strings to values; the fixed-width
    format makes plain string order equal to chronological order.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Start with an empty history.

        parent supplies the id_ written by save(); name selects the table
        (process_<name>s); default is what gets returned while history is
        empty.
        """
        self.parent = parent
        self.name = name
        self.default = default
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp; history must not be empty."""
        return max(self.history)

    # NOTE(review): exposed as a property in parallel to _newest_timestamp;
    # confirm that callers read `.newest` without calling it.
    @property
    def newest(self) -> str | float:
        """Return most recent value, or self.default if self.history empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one.

        The key is the current local time at second resolution, so a second
        differing value set within the same second replaces the first.
        """
        if not self.history \
                or value != self.history[self._newest_timestamp]:
            self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        Returns self.default if history is empty. If queried_time lies
        before every recorded timestamp, the oldest value is returned.
        """
        sorted_timestamps = sorted(self.history.keys())
        if not sorted_timestamps:
            return self.default
        selected_timestamp = sorted_timestamps[0]
        for timestamp in sorted_timestamps[1:]:
            if timestamp > queried_time:
                break  # everything from here on lies in the future
            selected_timestamp = timestamp
        return self.history[selected_timestamp]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones."""
        # The table name is built from self.name, which Process sets to
        # fixed literals; all values go through placeholders.
        db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
                     (self.parent.id_,))
        for timestamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
                         (self.parent.id_, timestamp, value))