1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any, Set
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
11 """Template for, and metadata for, Todos, and their arrangements."""
13 def __init__(self, id_: int | None) -> None:
14 if (id_ is not None) and id_ < 1:
15 raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
17 self.title = VersionedAttribute(self, 'title', 'UNNAMED')
18 self.description = VersionedAttribute(self, 'description', '')
19 self.effort = VersionedAttribute(self, 'effort', 1.0)
20 self.explicit_steps: list[ProcessStep] = []
23 def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
24 """Make Process from database row, with empty VersionedAttributes."""
26 assert process.id_ is not None
27 db_conn.cached_processes[process.id_] = process
31 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
32 """Collect all Processes and their connected VersionedAttributes."""
34 for id_, process in db_conn.cached_processes.items():
35 processes[id_] = process
36 already_recorded = processes.keys()
37 for row in db_conn.exec('SELECT id FROM processes'):
38 if row[0] not in already_recorded:
39 process = cls.by_id(db_conn, row[0])
40 processes[process.id_] = process
41 return list(processes.values())
44 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
45 create: bool = False) -> Process:
46 """Collect Process, its VersionedAttributes, and its child IDs."""
47 if id_ in db_conn.cached_processes.keys():
48 process = db_conn.cached_processes[id_]
49 assert isinstance(process, Process)
52 for row in db_conn.exec('SELECT * FROM processes '
53 'WHERE id = ?', (id_,)):
58 raise NotFoundException(f'Process not found of id: {id_}')
59 process = Process(id_)
61 for row in db_conn.exec('SELECT * FROM process_titles '
62 'WHERE process_id = ?', (process.id_,)):
63 process.title.history[row[1]] = row[2]
64 for row in db_conn.exec('SELECT * FROM process_descriptions '
65 'WHERE process_id = ?', (process.id_,)):
66 process.description.history[row[1]] = row[2]
67 for row in db_conn.exec('SELECT * FROM process_efforts '
68 'WHERE process_id = ?', (process.id_,)):
69 process.effort.history[row[1]] = row[2]
70 for row in db_conn.exec('SELECT * FROM process_steps '
71 'WHERE owner_id = ?', (process.id_,)):
72 process.explicit_steps += [ProcessStep.from_table_row(db_conn,
74 assert isinstance(process, Process)
77 def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
78 """Return Processes using self for a ProcessStep."""
80 for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
81 ' step_process_id = ?', (self.id_,)):
82 owner_ids.add(owner_id[0])
83 return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
85 def get_steps(self, db_conn: DatabaseConnection, external_owner:
86 Process | None = None) -> dict[int, dict[str, object]]:
87 """Return tree of depended-on explicit and implicit ProcessSteps."""
89 def make_node(step: ProcessStep) -> dict[str, object]:
90 step_process = self.__class__.by_id(db_conn, step.step_process_id)
92 if external_owner is not None:
93 is_explicit = step.owner_id == external_owner.id_
94 step_steps = step_process.get_steps(db_conn, external_owner)
95 return {'process': step_process, 'parent_id': step.parent_step_id,
96 'is_explicit': is_explicit, 'steps': step_steps}
98 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
99 explicit_children = [s for s in self.explicit_steps
100 if s.parent_step_id == node_id]
101 for child in explicit_children:
102 node['steps'][child.id_] = make_node(child)
103 node['seen'] = node_id in seen_step_ids
104 seen_step_ids.add(node_id)
105 for id_, step in node['steps'].items():
106 walk_steps(id_, step)
108 steps: dict[int, dict[str, object]] = {}
109 seen_step_ids: Set[int] = set()
110 if external_owner is None:
111 external_owner = self
112 for step in [s for s in self.explicit_steps
113 if s.parent_step_id is None]:
114 assert step.id_ is not None # for mypy
115 steps[step.id_] = make_node(step)
116 for step_id, step_node in steps.items():
117 walk_steps(step_id, step_node)
120 def add_step(self, db_conn: DatabaseConnection, id_: int | None,
121 step_process_id: int,
122 parent_step_id: int | None) -> ProcessStep:
123 """Create new ProcessStep, save and add it to self.explicit_steps.
125 Also checks against step recursion.
126 The new step's parent_step_id will fall back to None either if no
127 matching ProcessStep is found (which can be assumed in case it was
128 just deleted under its feet), or if the parent step would not be
129 owned by the current Process.
131 def walk_steps(node: ProcessStep) -> None:
132 if node.step_process_id == self.id_:
133 raise BadFormatException('bad step selection causes recursion')
134 step_process = self.by_id(db_conn, node.step_process_id)
135 for step in step_process.explicit_steps:
137 if parent_step_id is not None:
139 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
140 if parent_step.owner_id != self.id_:
141 parent_step_id = None
142 except NotFoundException:
143 parent_step_id = None
144 assert self.id_ is not None
145 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
147 self.explicit_steps += [step]
148 step.save(db_conn) # NB: This ensures a non-None step.id_.
151 def save_without_steps(self, db_conn: DatabaseConnection) -> None:
152 """Add (or re-write) self and connected VersionedAttributes to DB."""
153 cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
154 self.id_ = cursor.lastrowid
155 self.title.save(db_conn)
156 self.description.save(db_conn)
157 self.effort.save(db_conn)
158 assert self.id_ is not None
159 db_conn.cached_processes[self.id_] = self
161 def fix_steps(self, db_conn: DatabaseConnection) -> None:
162 """Rewrite ProcessSteps from self.explicit_steps.
164 This also fixes illegal Step.parent_step_id values, i.e. those pointing
165 to steps now absent, or owned by a different Process, fall back into
168 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
170 for step in self.explicit_steps:
171 if step.parent_step_id is not None:
173 parent_step = ProcessStep.by_id(db_conn,
175 if parent_step.owner_id != self.id_:
176 step.parent_step_id = None
177 except NotFoundException:
178 step.parent_step_id = None
183 """Sub-unit of Processes."""
185 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
186 parent_step_id: int | None) -> None:
188 self.owner_id = owner_id
189 self.step_process_id = step_process_id
190 self.parent_step_id = parent_step_id
193 def from_table_row(cls, db_conn: DatabaseConnection,
194 row: Row) -> ProcessStep:
195 """Make ProcessStep from database row, store in DB cache."""
196 step = cls(row[0], row[1], row[2], row[3])
197 assert step.id_ is not None
198 db_conn.cached_process_steps[step.id_] = step
202 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
203 """Retrieve ProcessStep by id_, or throw NotFoundException."""
204 if id_ in db_conn.cached_process_steps.keys():
205 step = db_conn.cached_process_steps[id_]
206 assert isinstance(step, ProcessStep)
208 for row in db_conn.exec('SELECT * FROM process_steps '
209 'WHERE step_id = ?', (id_,)):
210 return cls.from_table_row(db_conn, row)
211 raise NotFoundException(f'found no ProcessStep of ID {id_}')
213 def save(self, db_conn: DatabaseConnection) -> None:
214 """Save to database and cache."""
215 cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
216 (self.id_, self.owner_id, self.step_process_id,
217 self.parent_step_id))
218 self.id_ = cursor.lastrowid
219 assert self.id_ is not None
220 db_conn.cached_process_steps[self.id_] = self
223 class VersionedAttribute:
224 """Attributes whose values are recorded as a timestamped history."""
227 parent: Process, name: str, default: str | float) -> None:
230 self.default = default
231 self.history: dict[str, str | float] = {}
234 def _newest_timestamp(self) -> str:
235 """Return most recent timestamp."""
236 return sorted(self.history.keys())[-1]
239 def newest(self) -> str | float:
240 """Return most recent value, or self.default if self.history empty."""
241 if 0 == len(self.history):
243 return self.history[self._newest_timestamp]
245 def set(self, value: str | float) -> None:
246 """Add to self.history if and only if not same value as newest one."""
247 if 0 == len(self.history) \
248 or value != self.history[self._newest_timestamp]:
249 self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
251 def at(self, queried_time: str) -> str | float:
252 """Retrieve value of timestamp nearest queried_time from the past."""
253 sorted_timestamps = sorted(self.history.keys())
254 if 0 == len(sorted_timestamps):
256 selected_timestamp = sorted_timestamps[0]
257 for timestamp in sorted_timestamps[1:]:
258 if timestamp > queried_time:
260 selected_timestamp = timestamp
261 return self.history[selected_timestamp]
263 def save(self, db_conn: DatabaseConnection) -> None:
264 """Save as self.history entries, but first wipe old ones."""
265 db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
267 for timestamp, value in self.history.items():
268 db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
269 (self.parent.id_, timestamp, value))