home · contact · privacy
Display what Processes use focused Process as ProcessStep.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any, Set
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
10 class Process:
11     """Template for, and metadata for, Todos, and their arrangements."""
12
13     def __init__(self, id_: int | None) -> None:
14         if (id_ is not None) and id_ < 1:
15             raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
16         self.id_ = id_
17         self.title = VersionedAttribute(self, 'title', 'UNNAMED')
18         self.description = VersionedAttribute(self, 'description', '')
19         self.effort = VersionedAttribute(self, 'effort', 1.0)
20         self.explicit_steps: list[ProcessStep] = []
21
22     def __eq__(self, other: object) -> bool:
23         return isinstance(other, self.__class__) and self.id_ == other.id_
24
25     @classmethod
26     def from_table_row(cls, row: Row) -> Process:
27         """Make Process from database row, with empty VersionedAttributes."""
28         return cls(row[0])
29
30     @classmethod
31     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
32         """Collect all Processes and their connected VersionedAttributes."""
33         processes = {}
34         for row in db_conn.exec('SELECT * FROM processes'):
35             process = cls.from_table_row(row)
36             processes[process.id_] = process
37         for row in db_conn.exec('SELECT * FROM process_titles'):
38             processes[row[0]].title.history[row[1]] = row[2]
39         for row in db_conn.exec('SELECT * FROM process_descriptions'):
40             processes[row[0]].description.history[row[1]] = row[2]
41         for row in db_conn.exec('SELECT * FROM process_efforts'):
42             processes[row[0]].effort.history[row[1]] = row[2]
43         return list(processes.values())
44
45     @classmethod
46     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
47               create: bool = False) -> Process:
48         """Collect Process, its VersionedAttributes, and its child IDs."""
49         process = None
50         for row in db_conn.exec('SELECT * FROM processes '
51                                 'WHERE id = ?', (id_,)):
52             process = cls(row[0])
53             break
54         if not process:
55             if not create:
56                 raise NotFoundException(f'Process not found of id: {id_}')
57             process = Process(id_)
58         if process:
59             for row in db_conn.exec('SELECT * FROM process_titles '
60                                     'WHERE process_id = ?', (process.id_,)):
61                 process.title.history[row[1]] = row[2]
62             for row in db_conn.exec('SELECT * FROM process_descriptions '
63                                     'WHERE process_id = ?', (process.id_,)):
64                 process.description.history[row[1]] = row[2]
65             for row in db_conn.exec('SELECT * FROM process_efforts '
66                                     'WHERE process_id = ?', (process.id_,)):
67                 process.effort.history[row[1]] = row[2]
68             for row in db_conn.exec('SELECT * FROM process_steps '
69                                     'WHERE owner_id = ?', (process.id_,)):
70                 process.explicit_steps += [ProcessStep.from_table_row(row)]
71         return process
72
73     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
74         """Return Processes using self for a ProcessStep."""
75         owner_ids = set()
76         for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
77                                      ' step_process_id = ?', (self.id_,)):
78             owner_ids.add(owner_id[0])
79         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
80
81     def get_steps(self, db_conn: DatabaseConnection, external_owner:
82                   Process | None = None) -> dict[int, dict[str, object]]:
83         """Return tree of depended-on explicit and implicit ProcessSteps."""
84
85         def make_node(step: ProcessStep) -> dict[str, object]:
86             step_process = self.__class__.by_id(db_conn, step.step_process_id)
87             is_explicit = False
88             if external_owner is not None:
89                 is_explicit = step.owner_id == external_owner.id_
90             step_steps = step_process.get_steps(db_conn, external_owner)
91             return {'process': step_process, 'parent_id': step.parent_step_id,
92                     'is_explicit': is_explicit, 'steps': step_steps}
93
94         def walk_steps(node_id: int, node: dict[str, Any]) -> None:
95             explicit_children = [s for s in self.explicit_steps
96                                  if s.parent_step_id == node_id]
97             for child in explicit_children:
98                 node['steps'][child.id_] = make_node(child)
99             node['seen'] = node_id in seen_step_ids
100             seen_step_ids.add(node_id)
101             for id_, step in node['steps'].items():
102                 walk_steps(id_, step)
103
104         steps: dict[int, dict[str, object]] = {}
105         seen_step_ids: Set[int] = set()
106         if external_owner is None:
107             external_owner = self
108         for step in [s for s in self.explicit_steps
109                      if s.parent_step_id is None]:
110             assert step.id_ is not None  # for mypy
111             steps[step.id_] = make_node(step)
112         for step_id, step_node in steps.items():
113             walk_steps(step_id, step_node)
114         return steps
115
116     def add_step(self, db_conn: DatabaseConnection, id_: int | None,
117                  step_process_id: int,
118                  parent_step_id: int | None) -> ProcessStep:
119         """Create new ProcessStep, save and add it to self.explicit_steps.
120
121         Also checks against step recursion.
122         The new step's parent_step_id will fall back to None either if no
123         matching ProcessStep is found (which can be assumed in case it was
124         just deleted under its feet), or if the parent step would not be
125         owned by the current Process.
126         """
127         def walk_steps(node: ProcessStep) -> None:
128             if node.step_process_id == self.id_:
129                 raise BadFormatException('bad step selection causes recursion')
130             step_process = self.by_id(db_conn, node.step_process_id)
131             for step in step_process.explicit_steps:
132                 walk_steps(step)
133         if parent_step_id is not None:
134             try:
135                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
136                 if parent_step.owner_id != self.id_:
137                     parent_step_id = None
138             except NotFoundException:
139                 parent_step_id = None
140         assert self.id_ is not None
141         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
142         walk_steps(step)
143         self.explicit_steps += [step]
144         step.save(db_conn)  # NB: This ensures a non-None step.id_.
145         return step
146
147     def save_without_steps(self, db_conn: DatabaseConnection) -> None:
148         """Add (or re-write) self and connected VersionedAttributes to DB."""
149         cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
150         self.id_ = cursor.lastrowid
151         self.title.save(db_conn)
152         self.description.save(db_conn)
153         self.effort.save(db_conn)
154
155     def fix_steps(self, db_conn: DatabaseConnection) -> None:
156         """Rewrite ProcessSteps from self.explicit_steps.
157
158         This also fixes illegal Step.parent_step_id values, i.e. those pointing
159         to steps now absent, or owned by a different Process, fall back into
160         .parent_step_id=None
161         """
162         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
163                      (self.id_,))
164         for step in self.explicit_steps:
165             if step.parent_step_id is not None:
166                 try:
167                     parent_step = ProcessStep.by_id(db_conn,
168                                                     step.parent_step_id)
169                     if parent_step.owner_id != self.id_:
170                         step.parent_step_id = None
171                 except NotFoundException:
172                     step.parent_step_id = None
173             step.save(db_conn)
174
175
176 class ProcessStep:
177     """Sub-unit of Processes."""
178
179     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
180                  parent_step_id: int | None) -> None:
181         self.id_ = id_
182         self.owner_id = owner_id
183         self.step_process_id = step_process_id
184         self.parent_step_id = parent_step_id
185
186     @classmethod
187     def from_table_row(cls, row: Row) -> ProcessStep:
188         """Make ProcessStep from database row."""
189         return cls(row[0], row[1], row[2], row[3])
190
191     @classmethod
192     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
193         """Retrieve ProcessStep by id_, or throw NotFoundException."""
194         for row in db_conn.exec('SELECT * FROM process_steps '
195                                 'WHERE step_id = ?', (id_,)):
196             return cls.from_table_row(row)
197         raise NotFoundException(f'found no ProcessStep of ID {id_}')
198
199     def save(self, db_conn: DatabaseConnection) -> None:
200         """Save to database."""
201         cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
202                               (self.id_, self.owner_id, self.step_process_id,
203                                self.parent_step_id))
204         self.id_ = cursor.lastrowid
205
206
207 class VersionedAttribute:
208     """Attributes whose values are recorded as a timestamped history."""
209
210     def __init__(self,
211                  parent: Process, name: str, default: str | float) -> None:
212         self.parent = parent
213         self.name = name
214         self.default = default
215         self.history: dict[str, str | float] = {}
216
217     @property
218     def _newest_timestamp(self) -> str:
219         """Return most recent timestamp."""
220         return sorted(self.history.keys())[-1]
221
222     @property
223     def newest(self) -> str | float:
224         """Return most recent value, or self.default if self.history empty."""
225         if 0 == len(self.history):
226             return self.default
227         return self.history[self._newest_timestamp]
228
229     def set(self, value: str | float) -> None:
230         """Add to self.history if and only if not same value as newest one."""
231         if 0 == len(self.history) \
232                 or value != self.history[self._newest_timestamp]:
233             self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
234
235     def at(self, queried_time: str) -> str | float:
236         """Retrieve value of timestamp nearest queried_time from the past."""
237         sorted_timestamps = sorted(self.history.keys())
238         if 0 == len(sorted_timestamps):
239             return self.default
240         selected_timestamp = sorted_timestamps[0]
241         for timestamp in sorted_timestamps[1:]:
242             if timestamp > queried_time:
243                 break
244             selected_timestamp = timestamp
245         return self.history[selected_timestamp]
246
247     def save(self, db_conn: DatabaseConnection) -> None:
248         """Save as self.history entries, but first wipe old ones."""
249         db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
250                      (self.parent.id_,))
251         for timestamp, value in self.history.items():
252             db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
253                          (self.parent.id_, timestamp, value))