home · contact · privacy
Replace ProcessChildren with more flexible ProcessStep infrastructure.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process:
    """Template for, and metadata for, Todos, and their arrangements."""

    # Pairs of (database table, attribute name) for the VersionedAttributes
    # whose history rows are laid out as (process_id, timestamp, value).
    _HISTORY_TABLES = (('process_titles', 'title'),
                       ('process_descriptions', 'description'),
                       ('process_efforts', 'effort'))

    def __init__(self, id_: int | None) -> None:
        """Set up Process with default attributes and no explicit steps.

        Raises BadFormatException if id_ is a number smaller than 1; None is
        accepted as ID.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        self.explicit_steps: list[ProcessStep] = []

    def __eq__(self, other: object) -> bool:
        """Treat instances of the same class with the same id_ as equal."""
        return isinstance(other, self.__class__) and self.id_ == other.id_

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        NB: Other than .by_id, this does not fill the .explicit_steps of the
        Processes returned.
        """
        processes = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        for table, attr_name in cls._HISTORY_TABLES:
            for row in db_conn.exec(f'SELECT * FROM {table}'):
                getattr(processes[row[0]], attr_name).history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its explicit steps.

        If no Process of id_ is stored, either build a fresh one of that ID
        (if create is set), or raise NotFoundException.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            # Build via cls rather than Process, so that subclasses get
            # instances of themselves here just as in the found case.
            process = cls(id_)
        for table, attr_name in cls._HISTORY_TABLES:
            history = getattr(process, attr_name).history
            for row in db_conn.exec(f'SELECT * FROM {table} '
                                    'WHERE process_id = ?', (process.id_,)):
                history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps.append(ProcessStep.from_table_row(row))
        return process

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps step IDs to nodes. Each node is a dict of the keys
        'process' (the Process the step points to), 'parent_id' (the step's
        .parent_step_id), 'is_explicit' (whether the step is owned by
        external_owner), and 'steps' (a mapping of the same kind for what is
        below the node).

        external_owner falls back to self if None, and is handed on as it is
        to the .get_steps calls on the steps' Processes.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            step_process = self.__class__.by_id(db_conn, step.step_process_id)
            is_explicit = (external_owner is not None
                           and step.owner_id == external_owner.id_)
            step_steps = step_process.get_steps(db_conn, external_owner)
            return {'process': step_process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # First hang in those of our own steps that name this node as
            # their parent, then descend into everything below the node.
            for child in self.explicit_steps:
                if child.parent_step_id == node_id:
                    node['steps'][child.id_] = make_node(child)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        if external_owner is None:
            external_owner = self
        for step in self.explicit_steps:
            if step.parent_step_id is not None:
                continue  # only parent-less steps form the tree's top level
            assert step.id_ is not None  # for mypy
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def _legal_parent_id(self, db_conn: DatabaseConnection,
                         parent_step_id: int | None) -> int | None:
        """Return parent_step_id if a stored step owned by self, else None."""
        if parent_step_id is None:
            return None
        try:
            parent_step = ProcessStep.by_id(db_conn, parent_step_id)
        except NotFoundException:
            return None
        if parent_step.owner_id != self.id_:
            return None
        return parent_step_id

    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
                 step_process_id: int,
                 parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion, raising BadFormatException if the
        step's Process, or any Process reached through that one's explicit
        steps, is self. In that case nothing is added or saved.

        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.
        """
        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)

        parent_step_id = self._legal_parent_id(db_conn, parent_step_id)
        assert self.id_ is not None
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        Sets self.id_ to the .lastrowid of the write to the processes table.
        """
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)

    def fix_steps(self, db_conn: DatabaseConnection) -> None:
        """Rewrite ProcessSteps from self.explicit_steps.

        This also fixes illegal Step.parent_step_id values, i.e. those pointing
        to steps now absent, or owned by a different Process, fall back into
        .parent_step_id=None

        NB: Parents are looked up in the database after all of self's stored
        steps were deleted there, so a parent is only found if it was already
        re-saved earlier in this same loop, i.e. precedes its child in
        self.explicit_steps.
        """
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        for step in self.explicit_steps:
            step.parent_step_id = self._legal_parent_id(db_conn,
                                                        step.parent_step_id)
            step.save(db_conn)
163
164
class ProcessStep:
    """Step within an owning Process, pointing to a Process to be done."""

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        """Store own ID, owner's ID, pointed-to Process' ID, parent step ID."""
        self.id_, self.owner_id = id_, owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def from_table_row(cls, row: Row) -> ProcessStep:
        """Build ProcessStep from the first four columns of database row."""
        step_id, owner_id = row[0], row[1]
        step_process_id, parent_step_id = row[2], row[3]
        return cls(step_id, owner_id, step_process_id, parent_step_id)

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Return stored ProcessStep of id_, or raise NotFoundException."""
        rows = db_conn.exec('SELECT * FROM process_steps '
                            'WHERE step_id = ?', (id_,))
        first_row = next(iter(rows), None)
        if first_row is None:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        return cls.from_table_row(first_row)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write self to database, and set self.id_ to resulting row's ID."""
        values = (self.id_, self.owner_id, self.step_process_id,
                  self.parent_step_id)
        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
                              values)
        self.id_ = cursor.lastrowid
194
195
class VersionedAttribute:
    """Attributes whose values are recorded as a timestamped history."""

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Start with empty history.

        parent is the object whose .id_ keys the database rows; name selects
        the table to store to (process_{name}s); default is what to return as
        value as long as the history is empty.
        """
        self.parent = parent
        self.name = name
        self.default = default
        # Maps timestamp strings to the values set at those times.
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp; self.history must not be empty."""
        # Timestamps are compared as strings, so max() picks the same key a
        # full sort would put last, without doing the sorting.
        return max(self.history)

    @property
    def newest(self) -> str | float:
        """Return most recent value, or self.default if self.history empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one.

        NB: Timestamps are of second resolution, so a value set within the
        same second as a previous one takes over that one's history entry.
        """
        if not self.history or value != self.history[self._newest_timestamp]:
            self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        Returns self.default if self.history is empty. If queried_time is
        earlier than all timestamps, falls back to the earliest entry.
        """
        if not self.history:
            return self.default
        not_later = [stamp for stamp in self.history if stamp <= queried_time]
        selected = max(not_later) if not_later else min(self.history)
        return self.history[selected]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones."""
        # The table name is built from self.name, the values are passed as
        # query parameters.
        db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
                     (self.parent.id_,))
        for timestamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
                         (self.parent.id_, timestamp, value))