1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any, Set
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
11 """Template for, and metadata for, Todos, and their arrangements."""
13 def __init__(self, id_: int | None) -> None:
14 if (id_ is not None) and id_ < 1:
15 raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
17 self.title = VersionedAttribute(self, 'title', 'UNNAMED')
18 self.description = VersionedAttribute(self, 'description', '')
19 self.effort = VersionedAttribute(self, 'effort', 1.0)
20 self.explicit_steps: list[ProcessStep] = []
22 def __eq__(self, other: object) -> bool:
23 return isinstance(other, self.__class__) and self.id_ == other.id_
26 def from_table_row(cls, row: Row) -> Process:
27 """Make Process from database row, with empty VersionedAttributes."""
31 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
32 """Collect all Processes and their connected VersionedAttributes."""
34 for row in db_conn.exec('SELECT * FROM processes'):
35 process = cls.from_table_row(row)
36 processes[process.id_] = process
37 for row in db_conn.exec('SELECT * FROM process_titles'):
38 processes[row[0]].title.history[row[1]] = row[2]
39 for row in db_conn.exec('SELECT * FROM process_descriptions'):
40 processes[row[0]].description.history[row[1]] = row[2]
41 for row in db_conn.exec('SELECT * FROM process_efforts'):
42 processes[row[0]].effort.history[row[1]] = row[2]
43 return list(processes.values())
46 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
47 create: bool = False) -> Process:
48 """Collect Process, its VersionedAttributes, and its child IDs."""
50 for row in db_conn.exec('SELECT * FROM processes '
51 'WHERE id = ?', (id_,)):
56 raise NotFoundException(f'Process not found of id: {id_}')
57 process = Process(id_)
59 for row in db_conn.exec('SELECT * FROM process_titles '
60 'WHERE process_id = ?', (process.id_,)):
61 process.title.history[row[1]] = row[2]
62 for row in db_conn.exec('SELECT * FROM process_descriptions '
63 'WHERE process_id = ?', (process.id_,)):
64 process.description.history[row[1]] = row[2]
65 for row in db_conn.exec('SELECT * FROM process_efforts '
66 'WHERE process_id = ?', (process.id_,)):
67 process.effort.history[row[1]] = row[2]
68 for row in db_conn.exec('SELECT * FROM process_steps '
69 'WHERE owner_id = ?', (process.id_,)):
70 process.explicit_steps += [ProcessStep.from_table_row(row)]
73 def get_steps(self, db_conn: DatabaseConnection, external_owner:
74 Process | None = None) -> dict[int, dict[str, object]]:
75 """Return tree of depended-on explicit and implicit ProcessSteps."""
77 def make_node(step: ProcessStep) -> dict[str, object]:
78 step_process = self.__class__.by_id(db_conn, step.step_process_id)
80 if external_owner is not None:
81 is_explicit = step.owner_id == external_owner.id_
82 step_steps = step_process.get_steps(db_conn, external_owner)
83 return {'process': step_process, 'parent_id': step.parent_step_id,
84 'is_explicit': is_explicit, 'steps': step_steps}
86 def walk_steps(node_id: int, node: dict[str, Any]) -> None:
87 explicit_children = [s for s in self.explicit_steps
88 if s.parent_step_id == node_id]
89 for child in explicit_children:
90 node['steps'][child.id_] = make_node(child)
91 node['seen'] = node_id in seen_step_ids
92 seen_step_ids.add(node_id)
93 for id_, step in node['steps'].items():
96 steps: dict[int, dict[str, object]] = {}
97 seen_step_ids: Set[int] = set()
98 if external_owner is None:
100 for step in [s for s in self.explicit_steps
101 if s.parent_step_id is None]:
102 assert step.id_ is not None # for mypy
103 steps[step.id_] = make_node(step)
104 for step_id, step_node in steps.items():
105 walk_steps(step_id, step_node)
108 def add_step(self, db_conn: DatabaseConnection, id_: int | None,
109 step_process_id: int,
110 parent_step_id: int | None) -> ProcessStep:
111 """Create new ProcessStep, save and add it to self.explicit_steps.
113 Also checks against step recursion.
114 The new step's parent_step_id will fall back to None either if no
115 matching ProcessStep is found (which can be assumed in case it was
116 just deleted under its feet), or if the parent step would not be
117 owned by the current Process.
119 def walk_steps(node: ProcessStep) -> None:
120 if node.step_process_id == self.id_:
121 raise BadFormatException('bad step selection causes recursion')
122 step_process = self.by_id(db_conn, node.step_process_id)
123 for step in step_process.explicit_steps:
125 if parent_step_id is not None:
127 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
128 if parent_step.owner_id != self.id_:
129 parent_step_id = None
130 except NotFoundException:
131 parent_step_id = None
132 assert self.id_ is not None
133 step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
135 self.explicit_steps += [step]
136 step.save(db_conn) # NB: This ensures a non-None step.id_.
139 def save_without_steps(self, db_conn: DatabaseConnection) -> None:
140 """Add (or re-write) self and connected VersionedAttributes to DB."""
141 cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
142 self.id_ = cursor.lastrowid
143 self.title.save(db_conn)
144 self.description.save(db_conn)
145 self.effort.save(db_conn)
147 def fix_steps(self, db_conn: DatabaseConnection) -> None:
148 """Rewrite ProcessSteps from self.explicit_steps.
150 This also fixes illegal Step.parent_step_id values, i.e. those pointing
151 to steps now absent, or owned by a different Process, fall back into
154 db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
156 for step in self.explicit_steps:
157 if step.parent_step_id is not None:
159 parent_step = ProcessStep.by_id(db_conn,
161 if parent_step.owner_id != self.id_:
162 step.parent_step_id = None
163 except NotFoundException:
164 step.parent_step_id = None
169 """Sub-unit of Processes."""
171 def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
172 parent_step_id: int | None) -> None:
174 self.owner_id = owner_id
175 self.step_process_id = step_process_id
176 self.parent_step_id = parent_step_id
179 def from_table_row(cls, row: Row) -> ProcessStep:
180 """Make ProcessStep from database row."""
181 return cls(row[0], row[1], row[2], row[3])
184 def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
185 """Retrieve ProcessStep by id_, or throw NotFoundException."""
186 for row in db_conn.exec('SELECT * FROM process_steps '
187 'WHERE step_id = ?', (id_,)):
188 return cls.from_table_row(row)
189 raise NotFoundException(f'found no ProcessStep of ID {id_}')
191 def save(self, db_conn: DatabaseConnection) -> None:
192 """Save to database."""
193 cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
194 (self.id_, self.owner_id, self.step_process_id,
195 self.parent_step_id))
196 self.id_ = cursor.lastrowid
199 class VersionedAttribute:
200 """Attributes whose values are recorded as a timestamped history."""
203 parent: Process, name: str, default: str | float) -> None:
206 self.default = default
207 self.history: dict[str, str | float] = {}
210 def _newest_timestamp(self) -> str:
211 """Return most recent timestamp."""
212 return sorted(self.history.keys())[-1]
215 def newest(self) -> str | float:
216 """Return most recent value, or self.default if self.history empty."""
217 if 0 == len(self.history):
219 return self.history[self._newest_timestamp]
221 def set(self, value: str | float) -> None:
222 """Add to self.history if and only if not same value as newest one."""
223 if 0 == len(self.history) \
224 or value != self.history[self._newest_timestamp]:
225 self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
227 def at(self, queried_time: str) -> str | float:
228 """Retrieve value of timestamp nearest queried_time from the past."""
229 sorted_timestamps = sorted(self.history.keys())
230 if 0 == len(sorted_timestamps):
232 selected_timestamp = sorted_timestamps[0]
233 for timestamp in sorted_timestamps[1:]:
234 if timestamp > queried_time:
236 selected_timestamp = timestamp
237 return self.history[selected_timestamp]
239 def save(self, db_conn: DatabaseConnection) -> None:
240 """Save as self.history entries, but first wipe old ones."""
241 db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
243 for timestamp, value in self.history.items():
244 db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
245 (self.parent.id_, timestamp, value))