home · contact · privacy
0a7d5b5d67e4f17b33c52800f381295c12bd3b37
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from typing import Any, Set
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import NotFoundException, BadFormatException
8
9
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    Attributes:
        id_: integer ID >= 1, or None if none has been assigned (yet);
            save_without_steps() overwrites it with the row ID used.
        title, description, effort: VersionedAttributes, with the defaults
            'UNNAMED', '' and 1.0 respectively.
        explicit_steps: the ProcessSteps directly owned by this Process.
    """

    def __init__(self, id_: int | None) -> None:
        """Set up Process with default attributes and no steps.

        Raises BadFormatException if id_ is an integer below 1.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        self.explicit_steps: list[ProcessStep] = []

    def __eq__(self, other: object) -> bool:
        """Treat instances of the same class as equal if their IDs match."""
        return isinstance(other, self.__class__) and self.id_ == other.id_

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes.

        Only the row's first column is read, as the Process ID.
        """
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        NB: Does not fill the returned Processes' .explicit_steps.
        """
        processes = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # Attribute rows are read as (process ID, timestamp, value).
        for row in db_conn.exec('SELECT * FROM process_titles'):
            processes[row[0]].title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions'):
            processes[row[0]].description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts'):
            processes[row[0]].effort.history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its explicit steps.

        If no row of id_ exists in the processes table, either raise
        NotFoundException (create=False), or return a fresh, not yet saved
        instance of id_ (create=True).  In both the found and the created
        case the result is an instance of cls, so subclasses get back their
        own type.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = cls(id_)
        # Attribute rows are read as (process ID, timestamp, value).
        for row in db_conn.exec('SELECT * FROM process_titles '
                                'WHERE process_id = ?', (process.id_,)):
            process.title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions '
                                'WHERE process_id = ?', (process.id_,)):
            process.description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts '
                                'WHERE process_id = ?', (process.id_,)):
            process.effort.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_steps '
                                'WHERE owner_id = ?', (process.id_,)):
            process.explicit_steps.append(ProcessStep.from_table_row(row))
        return process

    def get_steps(self, db_conn: DatabaseConnection, external_owner:
                  Process | None = None) -> dict[int, dict[str, object]]:
        """Return tree of depended-on explicit and implicit ProcessSteps.

        The result maps the IDs of self's parent-less explicit steps to node
        dictionaries of these keys:
          'process': the Process the step points to
          'parent_id': the step's .parent_step_id
          'is_explicit': whether the step is owned by external_owner
          'steps': sub-tree of same shape, of the pointed-to Process' own
                   steps plus self's explicit steps parented to this node
          'seen': whether the node's step ID was met before during the walk

        external_owner is the Process against which 'is_explicit' is
        decided; it defaults to self and is handed down unchanged into the
        recursive calls on the steps' Processes.
        """

        def make_node(step: ProcessStep) -> dict[str, object]:
            step_process = self.__class__.by_id(db_conn, step.step_process_id)
            is_explicit = False
            if external_owner is not None:
                is_explicit = step.owner_id == external_owner.id_
            # Recursion: the step's Process contributes its own steps tree.
            step_steps = step_process.get_steps(db_conn, external_owner)
            return {'process': step_process, 'parent_id': step.parent_step_id,
                    'is_explicit': is_explicit, 'steps': step_steps}

        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
            # Hang self's explicit steps below the step they name as parent.
            explicit_children = [s for s in self.explicit_steps
                                 if s.parent_step_id == node_id]
            for child in explicit_children:
                node['steps'][child.id_] = make_node(child)
            # Mark repeated encounters of the same step ID within this walk.
            node['seen'] = node_id in seen_step_ids
            seen_step_ids.add(node_id)
            for id_, step in node['steps'].items():
                walk_steps(id_, step)

        steps: dict[int, dict[str, object]] = {}
        seen_step_ids: Set[int] = set()
        if external_owner is None:
            external_owner = self
        for step in [s for s in self.explicit_steps
                     if s.parent_step_id is None]:
            assert step.id_ is not None  # for mypy
            steps[step.id_] = make_node(step)
        for step_id, step_node in steps.items():
            walk_steps(step_id, step_node)
        return steps

    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
                 step_process_id: int,
                 parent_step_id: int | None) -> ProcessStep:
        """Create new ProcessStep, save and add it to self.explicit_steps.

        Also checks against step recursion.
        The new step's parent_step_id will fall back to None either if no
        matching ProcessStep is found (which can be assumed in case it was
        just deleted under its feet), or if the parent step would not be
        owned by the current Process.

        Raises BadFormatException if step_process_id is self's own ID, or if
        self is reached by following the explicit steps of the Process of
        step_process_id; in that case nothing is saved or added.
        """
        def walk_steps(node: ProcessStep) -> None:
            if node.step_process_id == self.id_:
                raise BadFormatException('bad step selection causes recursion')
            step_process = self.by_id(db_conn, node.step_process_id)
            for step in step_process.explicit_steps:
                walk_steps(step)
        if parent_step_id is not None:
            try:
                parent_step = ProcessStep.by_id(db_conn, parent_step_id)
                if parent_step.owner_id != self.id_:
                    parent_step_id = None
            except NotFoundException:
                parent_step_id = None
        assert self.id_ is not None
        step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
        # Validate before touching self.explicit_steps or the database.
        walk_steps(step)
        self.explicit_steps.append(step)
        step.save(db_conn)  # NB: This ensures a non-None step.id_.
        return step

    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        Sets self.id_ to the cursor's lastrowid before the attributes are
        saved, so that they are stored under the ID the row ended up with.
        """
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)

    def fix_steps(self, db_conn: DatabaseConnection) -> None:
        """Rewrite ProcessSteps from self.explicit_steps.

        This also fixes illegal Step.parent_step_id values, i.e. those pointing
        to steps now absent, or owned by a different Process, fall back into
        .parent_step_id=None
        """
        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                     (self.id_,))
        # NOTE(review): parents are looked up in the table that was just
        # emptied of self's steps, so a parent owned by self is only found if
        # it was already re-saved by an earlier iteration of this loop, i.e.
        # if it precedes its children in self.explicit_steps -- confirm that
        # callers keep that order.
        for step in self.explicit_steps:
            if step.parent_step_id is not None:
                try:
                    parent_step = ProcessStep.by_id(db_conn,
                                                    step.parent_step_id)
                    if parent_step.owner_id != self.id_:
                        step.parent_step_id = None
                except NotFoundException:
                    step.parent_step_id = None
            step.save(db_conn)
166
167
class ProcessStep:
    """One step of a Process: a pointer from an owner to another Process.

    Attributes:
        id_: the step's own ID, None until save() has assigned one.
        owner_id: ID of the Process the step belongs to.
        step_process_id: ID of the Process the step points to.
        parent_step_id: ID of the step this one hangs below, or None.
    """

    def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                 parent_step_id: int | None) -> None:
        self.id_ = id_
        self.owner_id = owner_id
        self.step_process_id = step_process_id
        self.parent_step_id = parent_step_id

    @classmethod
    def from_table_row(cls, row: Row) -> ProcessStep:
        """Build ProcessStep from the first four columns of a database row."""
        step_id = row[0]
        owner_id = row[1]
        step_process_id = row[2]
        parent_step_id = row[3]
        return cls(step_id, owner_id, step_process_id, parent_step_id)

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
        """Return ProcessStep stored under id_, else raise NotFoundException."""
        rows = db_conn.exec('SELECT * FROM process_steps '
                            'WHERE step_id = ?', (id_,))
        first_row = next(iter(rows), None)
        if first_row is None:
            raise NotFoundException(f'found no ProcessStep of ID {id_}')
        return cls.from_table_row(first_row)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write step to database, and adopt the row ID it got there."""
        values = (self.id_, self.owner_id, self.step_process_id,
                  self.parent_step_id)
        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
                              values)
        self.id_ = cursor.lastrowid
197
198
class VersionedAttribute:
    """Value of a Process that remembers its past states by timestamp.

    Attributes:
        parent: the Process the attribute belongs to.
        name: attribute name, also used to build the database table name.
        default: value reported as long as self.history is empty.
        history: maps timestamp strings to the values set at those times.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        self.parent = parent
        self.name = name
        self.default = default
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return the last of self.history's keys in sorted order."""
        ordered = sorted(self.history)
        return ordered[-1]

    @property
    def newest(self) -> str | float:
        """Return latest value of self.history, or self.default if empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Record value under the current time, unless it is the newest one.

        The timestamp is the local time formatted as '%Y-%m-%d %H:%M:%S'.
        """
        if self.history and value == self.history[self._newest_timestamp]:
            return
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.history[now] = value

    def at(self, queried_time: str) -> str | float:
        """Return the value that was current at queried_time.

        That is the value of the latest timestamp not later than
        queried_time. If all timestamps are later, the earliest value is
        returned; if there is no history at all, self.default.
        """
        ordered = sorted(self.history)
        if not ordered:
            return self.default
        # The list is sorted, so timestamps not after queried_time come first.
        not_later = [stamp for stamp in ordered[1:] if stamp <= queried_time]
        chosen = not_later[-1] if not_later else ordered[0]
        return self.history[chosen]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Replace parent's rows in the attribute's table by self.history."""
        table = f'process_{self.name}s'
        owner_id = self.parent.id_
        db_conn.exec(f'DELETE FROM {table} WHERE process_id = ?',
                     (owner_id,))
        for timestamp in self.history:
            db_conn.exec(f'INSERT INTO {table} VALUES (?, ?, ?)',
                         (owner_id, timestamp, self.history[timestamp]))