1 """Collecting Processes and Process-related items."""
from __future__ import annotations

from datetime import datetime
from sqlite3 import Row
from typing import Any

from plomtask.db import DatabaseConnection
from plomtask.exceptions import NotFoundException, BadFormatException
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    A Process carries three versioned attributes (title, description,
    effort) and a list of the ProcessSteps it explicitly owns.
    """

    def __init__(self, id_: int | None) -> None:
        """Set up an unsaved Process with default attributes and no steps.

        Raises BadFormatException if id_ is given but smaller than 1.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        # Read by __eq__, the save methods and the step bookkeeping.
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        self.explicit_steps: list[ProcessStep] = []

    def __eq__(self, other: object) -> bool:
        """Treat two Processes as equal if they share class and id_."""
        return isinstance(other, self.__class__) and self.id_ == other.id_

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        # The processes table is written with a single value (the id), see
        # save_without_steps.
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Steps are not loaded here; explicit_steps stays empty.
        """
        processes: dict[int | None, Process] = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # History rows are (process_id, timestamp, value), matching what
        # VersionedAttribute.save inserts.
        for row in db_conn.exec('SELECT * FROM process_titles'):
            processes[row[0]].title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions'):
            processes[row[0]].description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts'):
            processes[row[0]].effort.history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its explicit steps.

        If no row of that id exists, either raise NotFoundException or, with
        create=True, return a fresh (unsaved) Process of that id.
        """
        process: Process | None = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = cls(id_)
        for row in db_conn.exec('SELECT * FROM process_titles '
                                'WHERE process_id = ?', (process.id_,)):
            process.title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions '
                                'WHERE process_id = ?', (process.id_,)):
            process.description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts '
                                'WHERE process_id = ?', (process.id_,)):
            process.effort.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps += [ProcessStep.from_table_row(row)]
        return process

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to nodes of the form
        {'process': ..., 'parent_id': ..., 'is_explicit': ..., 'steps': ...},
        where 'steps' is again such a mapping.  A node is explicit if its
        step is owned by external_owner, which defaults to self.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            step_process = self.__class__.by_id(db_conn, step.step_process_id)
            # Default keeps is_explicit bound even without an external_owner.
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            # The step's own Process contributes its steps as sub-nodes.
            step_steps = step_process.get_steps(db_conn, external_owner)
            return {'process': step_process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Attach those of our own steps that name this node as parent,
            # then descend into all of the node's sub-nodes.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert step.id_ is not None  # for mypy
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
                 step_process_id: int,
                 parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.
        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if the step would (directly or through
        the steps of its Process) lead back to this Process.
        """

        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
            except NotFoundException:
                parent_step_id = None
            else:
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
        assert self.id_ is not None
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Validate before the step is added or written anywhere.
        walk_steps(step)
        self.explicit_steps += [step]
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        self.id_ is updated from the cursor's lastrowid afterwards.
        """
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)

    def fix_steps(self, db_conn: DatabaseConnection) -> None:
        """Rewrite ProcessSteps from self.explicit_steps.

        This also fixes illegal Step.parent_step_id values, i.e. those pointing
        to steps now absent, or owned by a different Process, fall back to
        None.
        """
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            if step.parent_step_id is not None:
                try:
                    parent_step = ProcessStep.by_id(db_conn,
                                                    step.parent_step_id)
                except NotFoundException:
                    step.parent_step_id = None
                else:
                    if parent_step.owner_id != self.id_:
                        step.parent_step_id = None
            # Saved inside the loop, so later steps can find this one as
            # their parent.
            step.save(db_conn)
class ProcessStep:
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process that is to be done as
    the step (step_process_id), optionally below another step of the same
    owner (parent_step_id).
    """

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the values as given; id_ may be None until first saved."""
        self.id_ = id_
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def from_table_row(cls, row: Row) -> ProcessStep:
        """Make ProcessStep from database row."""
        # Column order mirrors the value order used in save().
        return cls(row[0], row[1], row[2], row[3])

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException."""
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE step_id = ?', (id_,)):
            return cls.from_table_row(row)
        raise NotFoundException(f'found no ProcessStep of ID {id_}')

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save to database, then take over the cursor's lastrowid as id_."""
        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
                              (self.id_, self.owner_id, self.step_process_id,
                               self.parent_step_id))
        self.id_ = cursor.lastrowid
class VersionedAttribute:
    """Attributes whose values are recorded as a timestamped history.

    The history maps timestamp strings ('%Y-%m-%d %H:%M:%S') to values;
    since that format sorts lexically in chronological order, plain string
    comparison is used to order entries.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Start with an empty history.

        parent provides the id_ that rows are stored under, name selects the
        table (process_{name}s), default is returned while history is empty.
        """
        self.parent = parent
        self.name = name
        self.default = default
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp; history must not be empty."""
        return max(self.history)

    @property
    def newest(self) -> str | float:
        """Return most recent value, or self.default if self.history empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one."""
        if not self.history \
                or value != self.history[self._newest_timestamp]:
            self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        Returns self.default on an empty history.  If every recorded
        timestamp is later than queried_time, the earliest entry is returned.
        """
        sorted_timestamps = sorted(self.history.keys())
        if not sorted_timestamps:
            return self.default
        # Start at the earliest entry, advance while still <= queried_time.
        selected_timestamp = sorted_timestamps[0]
        for timestamp in sorted_timestamps[1:]:
            if timestamp > queried_time:
                break
            selected_timestamp = timestamp
        return self.history[selected_timestamp]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones."""
        db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
                     (self.parent.id_,))
        for timestamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
                         (self.parent.id_, timestamp, value))