home · contact · privacy
fe9bd4a63ff36a34afdbcbe0abed36edb891c8f5
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any, Set
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    Holds three VersionedAttributes (title, description, effort) and the
    list of ProcessSteps it explicitly owns. Instances get registered in
    db_conn.cached_processes by from_table_row and by save_without_steps.
    """

    def __init__(self, id_: int | None) -> None:
        """Set up Process with default attribute values and no steps.

        id_ may be None; any other value below 1 raises BadFormatException.
        """
        if not (id_ is None or id_ >= 1):
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        self.explicit_steps: list[ProcessStep] = []

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
        """Build Process from row's first column and register it in cache.

        The VersionedAttributes of the result keep their empty histories.
        """
        fresh = cls(row[0])
        assert fresh.id_ is not None
        db_conn.cached_processes[fresh.id_] = fresh
        return fresh

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Return every Process, cached ones first, then those only in DB."""
        collected = dict(db_conn.cached_processes)
        for row in db_conn.exec('SELECT id FROM processes'):
            if row[0] in collected:
                continue  # cached instance takes precedence over DB state
            loaded = cls.by_id(db_conn, row[0])
            collected[loaded.id_] = loaded
        return list(collected.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Return Process of id_ with histories and explicit steps loaded.

        A cached instance is returned as it is. Otherwise the processes
        table is queried; if it has no matching row, a new Process(id_) is
        built when create is set, else NotFoundException is raised. The
        Process built here is not itself put into db_conn.cached_processes.
        """
        cache = db_conn.cached_processes
        if id_ in cache:
            cached = cache[id_]
            assert isinstance(cached, Process)
            return cached
        # Only the first matching row (if any) is of interest.
        found = next(iter(db_conn.exec('SELECT * FROM processes WHERE id = ?',
                                       (id_,))), None)
        if found is not None:
            process = cls(found[0])
        elif create:
            process = Process(id_)
        else:
            raise NotFoundException(f'Process not found of id: {id_}')
        # Each history table yields rows whose second column serves as the
        # history key and whose third column serves as the value.
        for table, attribute in (('process_titles', process.title),
                                 ('process_descriptions', process.description),
                                 ('process_efforts', process.effort)):
            for row in db_conn.exec(f'SELECT * FROM {table} '
                                    'WHERE process_id = ?', (process.id_,)):
                attribute.history[row[1]] = row[2]
        step_rows = db_conn.exec('SELECT * FROM process_steps '
                                 'WHERE owner_id = ?', (process.id_,))
        process.explicit_steps.extend(
            ProcessStep.from_table_row(db_conn, row) for row in step_rows)
        assert isinstance(process, Process)
        return process

    def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
        """Return Processes owning a ProcessStep that points to self."""
        rows = db_conn.exec('SELECT owner_id FROM process_steps WHERE'
                            ' step_process_id = ?', (self.id_,))
        distinct_owner_ids = {row[0] for row in rows}
        return [type(self).by_id(db_conn, owner_id)
                for owner_id in distinct_owner_ids]

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to nodes, i.e. dicts with the keys
        'process' (the step's Process), 'parent_id', 'is_explicit' (whether
        the step is owned by external_owner, which defaults to self) and
        'steps' (child nodes of the same shape). Every node walked also gets
        a 'seen' flag, telling whether its step ID was already encountered
        earlier in the same walk.
        """
        owner = self if external_owner is None else external_owner
        visited_ids: set[int] = set()

        def build_node(step: ProcessStep) -> dict[str, object]:
            target = type(self).by_id(db_conn, step.step_process_id)
            owned_by_owner = step.owner_id == owner.id_
            # The target's own steps form the initial children of the node.
            nested = target.get_steps(db_conn, owner)
            return {'process': target, 'parent_id': step.parent_step_id,
                    'is_explicit': owned_by_owner, 'steps': nested}

        def descend(node_id: int, node: dict[str, Any]) -> None:
            children = node['steps']
            # Hang self's explicit steps naming node_id as parent below it.
            attached = [candidate for candidate in self.explicit_steps
                        if candidate.parent_step_id == node_id]
            for child_step in attached:
                children[child_step.id_] = build_node(child_step)
            node['seen'] = node_id in visited_ids
            visited_ids.add(node_id)
            for child_id, child_node in children.items():
                descend(child_id, child_node)

        tree: dict[int, dict[str, object]] = {}
        roots = [candidate for candidate in self.explicit_steps
                 if candidate.parent_step_id is None]
        for root in roots:
            assert root.id_ is not None  # for mypy
            tree[root.id_] = build_node(root)
        for root_id, root_node in tree.items():
            descend(root_id, root_node)
        return tree

    def _own_parent_step_id(self, db_conn: DatabaseConnection,
                            parent_step_id: int | None) -> int | None:
        """Return parent_step_id if it names a step owned by self, else None.

        None is also returned if no ProcessStep of that ID can be found.
        """
        if parent_step_id is None:
            return None
        try:
            parent_step = ProcessStep.by_id(db_conn, parent_step_id)
        except NotFoundException:
            return None
        if parent_step.owner_id == self.id_:
            return parent_step_id
        return None

    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
                 step_process_id: int,
                 parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Before anything is stored, the Process the step points to and all
        Processes reachable through its explicit steps are checked: should
        any step among them point back to self, BadFormatException is raised.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def refuse_recursion(candidate: ProcessStep) -> None:
            if candidate.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            target = self.by_id(db_conn, candidate.step_process_id)
            for deeper in target.explicit_steps:
                refuse_recursion(deeper)

        parent_step_id = self._own_parent_step_id(db_conn, parent_step_id)
        assert self.id_ is not None
        new_step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        refuse_recursion(new_step)
        self.explicit_steps.append(new_step)
        new_step.save(db_conn)  # NB: This ensures a non-None new_step.id_.
        return new_step

    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        self.id_ is set to the cursor's lastrowid, and self is put into
        db_conn.cached_processes. ProcessSteps are left untouched.
        """
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        self.id_ = cursor.lastrowid
        for attribute in (self.title, self.description, self.effort):
            attribute.save(db_conn)
        assert self.id_ is not None
        db_conn.cached_processes[self.id_] = self

    def fix_steps(self, db_conn: DatabaseConnection) -> None:
        """Rewrite ProcessSteps from self.explicit_steps.

        All process_steps rows owned by self are deleted, then every step in
        self.explicit_steps is saved anew.

        This also fixes illegal Step.parent_step_id values, i.e. those
        pointing to steps now absent, or owned by a different Process, fall
        back into .parent_step_id=None
        """
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            step.parent_step_id = self._own_parent_step_id(
                db_conn, step.parent_step_id)
            step.save(db_conn)
180
181
class ProcessStep:
    """Sub-unit of Processes.

    Links an owning Process (owner_id) to the Process used as step
    (step_process_id), optionally below another step (parent_step_id).
    """

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store the given IDs unchanged; no validation happens here."""
        self.id_ = id_
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row) -> ProcessStep:
        """Make ProcessStep from row's first four columns, store in cache."""
        fresh = cls(*(row[column] for column in range(4)))
        assert fresh.id_ is not None
        db_conn.cached_process_steps[fresh.id_] = fresh
        return fresh

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Retrieve ProcessStep by id_, or throw NotFoundException.

        The cache is consulted first; only on a miss is the database
        queried, with the first matching row being turned into a (then
        cached) ProcessStep.
        """
        cache = db_conn.cached_process_steps
        if id_ in cache:
            cached = cache[id_]
            assert isinstance(cached, ProcessStep)
            return cached
        rows = db_conn.exec('SELECT * FROM process_steps WHERE step_id = ?',
                            (id_,))
        first_row = next(iter(rows), None)
        if first_row is None:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        return cls.from_table_row(db_conn, first_row)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save to database and cache.

        self.id_ is overwritten with the lastrowid of the write's cursor.
        """
        values = (self.id_, self.owner_id, self.step_process_id,
                  self.parent_step_id)
        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
                              values)
        self.id_ = cursor.lastrowid
        assert self.id_ is not None
        db_conn.cached_process_steps[self.id_] = self
221
222
class VersionedAttribute:
    """Attributes whose values are recorded as a timestamped history.

    self.history maps timestamp strings to values; as long as it is empty,
    self.default stands in as the value.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Remember owner, name (used to derive the table name), default."""
        self.parent = parent
        self.name = name
        self.default = default
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp (by string order of the keys)."""
        ordered = sorted(self.history)
        return ordered[-1]

    @property
    def newest(self) -> str | float:
        """Return most recent value, or self.default if self.history empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one.

        The new entry is keyed by the current time at second resolution.
        """
        if self.history and value == self.history[self._newest_timestamp]:
            return
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.history[stamp] = value

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        Returns self.default on empty history. If queried_time lies before
        all recorded timestamps, the value of the oldest one is returned.
        """
        ordered = sorted(self.history)
        if not ordered:
            return self.default
        oldest, *later = ordered
        reached = [stamp for stamp in later if not stamp > queried_time]
        chosen = reached[-1] if reached else oldest
        return self.history[chosen]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones."""
        table = f'process_{self.name}s'
        owner_id = self.parent.id_
        db_conn.exec(f'DELETE FROM {table} WHERE process_id = ?', (owner_id,))
        for stamp, recorded in self.history.items():
            db_conn.exec(f'INSERT INTO {table} VALUES (?, ?, ?)',
                         (owner_id, stamp, recorded))