home · contact · privacy
Draw Process descendant trees, and guard against recursion within them.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.exceptions import NotFoundException, BadFormatException
7
8
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    A Process carries three VersionedAttributes (title, description,
    effort) plus a list of IDs of child Processes.  The ID itself lives in
    the `processes` table, child links in `process_children` as rows of
    (parent_id, child_id).
    """

    # Names of the VersionedAttribute fields.  Each one is persisted in a
    # table named process_<name>s holding (process_id, timestamp, value).
    _versioned_names = ('title', 'description', 'effort')

    def __init__(self, id_: int | None) -> None:
        """Set up an empty Process.

        id_ may be None for a Process that has not been saved yet;
        otherwise it must be >= 1, else BadFormatException is raised.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        self.child_ids: list[int] = []

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes.

        Only the first column of the row is used, as the Process ID.
        """
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        The child_ids of the returned Processes are not populated here;
        use by_id() for a Process including its child IDs.
        """
        processes = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # History rows are (process_id, timestamp, value); a row pointing to
        # an unknown process ID raises KeyError, as there is nowhere to
        # attach it.
        for name in cls._versioned_names:
            for row in db_conn.exec(f'SELECT * FROM process_{name}s'):
                getattr(processes[row[0]], name).history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        If no Process of id_ is stored, either raise NotFoundException
        (create=False, the default) or return a fresh, unsaved Process of
        that id_ (create=True).
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls(row[0])
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            # Use cls, as in the found-in-DB path above, so that subclasses
            # get an instance of their own type on both paths.
            process = cls(id_)
        for name in cls._versioned_names:
            history = getattr(process, name).history
            for row in db_conn.exec(f'SELECT * FROM process_{name}s '
                                    'WHERE process_id = ?', (process.id_,)):
                history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_children '
                                'WHERE parent_id = ?', (process.id_,)):
            process.child_ids.append(row[1])
        return process

    def get_descendants(self, db_conn: DatabaseConnection) ->\
            dict[int, dict[str, object]]:
        """Return tree of descendant Processes.

        The result maps each child ID to a dict with the keys 'process'
        (the child, loaded via by_id()) and 'children' (that child's own
        descendants tree, same shape).  There is no cycle detection here:
        termination relies on the stored child relations being free of
        loops, which save() checks for with regard to the saved Process.
        """
        descendants = {}
        for id_ in self.child_ids:
            child = self.__class__.by_id(db_conn, id_)
            descendants[id_] = {'process': child,
                                'children': child.get_descendants(db_conn)}
        return descendants

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        Also is the point at which descendancy recursion is checked: if
        self is found among the stored descendants of one of its child_ids,
        BadFormatException is raised.  A child ID (or descendant ID) not
        found in the DB makes by_id() raise NotFoundException.  Both happen
        only after earlier statements of this method were already executed
        on db_conn.
        """
        # IDs whose sub-trees were already walked without meeting self.
        # Keeps shared sub-trees from being re-walked once per path, and
        # keeps the walk finite even if stored relations contain a loop
        # that does not pass through self.
        checked: set[int] = set()

        def walk_descendants(node_id: int) -> None:
            if node_id == self.id_:
                raise BadFormatException('bad child selection: recursion')
            if node_id in checked:
                return
            checked.add(node_id)
            descendant = self.by_id(db_conn, node_id)
            for descendant_id in descendant.child_ids:
                walk_descendants(descendant_id)

        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # For a so far unsaved Process (id_ of None) this is where the ID
        # gets assigned, before the attribute tables reference it.
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        # Child links are wiped and re-written wholesale from child_ids.
        db_conn.exec('DELETE FROM process_children WHERE parent_id = ?',
                     (self.id_,))
        for child_id in self.child_ids:
            walk_descendants(child_id)
            db_conn.exec('INSERT INTO process_children VALUES (?, ?)',
                         (self.id_, child_id))
101
102
class VersionedAttribute:
    """Value of a Process that remembers all its past states by timestamp.

    The history maps timestamp strings to values; as long as it is empty,
    the default provided on construction stands in for the value.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        self.history: dict[str, str | float] = {}
        self.default = default
        self.name = name
        self.parent = parent

    @property
    def _newest_timestamp(self) -> str:
        """Return the last of the history's timestamps in sorted order."""
        ordered = sorted(self.history)
        return ordered[-1]

    @property
    def newest(self) -> str | float:
        """Return latest value of history, or the default if there is none."""
        if self.history:
            return self.history[self._newest_timestamp]
        return self.default

    def set(self, value: str | float) -> None:
        """Record value under current time unless it equals the newest one."""
        if self.history and value == self.history[self._newest_timestamp]:
            return
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.history[stamp] = value

    def at(self, queried_time: str) -> str | float:
        """Return value that was valid at queried_time.

        That is the value of the latest timestamp not after queried_time.
        If all timestamps are after queried_time, the oldest value is
        returned; if there is no history at all, the default.
        """
        ordered = sorted(self.history)
        if not ordered:
            return self.default
        oldest, *later = ordered
        chosen = oldest
        for stamp in later:
            if stamp > queried_time:
                break
            chosen = stamp
        return self.history[chosen]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Replace parent's rows in table process_<name>s with the history."""
        table = f'process_{self.name}s'
        db_conn.exec(f'DELETE FROM {table} WHERE process_id = ?',
                     (self.parent.id_,))
        for stamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO {table} VALUES (?, ?, ?)',
                         (self.parent.id_, stamp, value))