home · contact · privacy
bebd39456a804de397b6357919a0ceb0f245be3d
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.exceptions import NotFoundException, BadFormatException
7
8
9 class Process:
10     """Template for, and metadata for, Todos, and their arrangements."""
11
12     def __init__(self, id_: int | None) -> None:
13         if (id_ is not None) and id_ < 1:
14             raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
15         self.id_ = id_
16         self.title = VersionedAttribute(self, 'title', 'UNNAMED')
17         self.description = VersionedAttribute(self, 'description', '')
18         self.effort = VersionedAttribute(self, 'effort', 1.0)
19         self.child_ids: list[int] = []
20
21     @classmethod
22     def from_table_row(cls, row: Row) -> Process:
23         """Make Process from database row, with empty VersionedAttributes."""
24         return cls(row[0])
25
26     @classmethod
27     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28         """Collect all Processes and their connected VersionedAttributes."""
29         processes = {}
30         for row in db_conn.exec('SELECT * FROM processes'):
31             process = cls.from_table_row(row)
32             processes[process.id_] = process
33         for row in db_conn.exec('SELECT * FROM process_titles'):
34             processes[row[0]].title.history[row[1]] = row[2]
35         for row in db_conn.exec('SELECT * FROM process_descriptions'):
36             processes[row[0]].description.history[row[1]] = row[2]
37         for row in db_conn.exec('SELECT * FROM process_efforts'):
38             processes[row[0]].effort.history[row[1]] = row[2]
39         return list(processes.values())
40
41     @classmethod
42     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
43               create: bool = False) -> Process:
44         """Collect all Processes and their connected VersionedAttributes."""
45         process = None
46         for row in db_conn.exec('SELECT * FROM processes '
47                                 'WHERE id = ?', (id_,)):
48             process = cls(row[0])
49             break
50         if not process:
51             if not create:
52                 raise NotFoundException(f'Process not found of id: {id_}')
53             process = Process(id_)
54         if process:
55             for row in db_conn.exec('SELECT * FROM process_titles '
56                                     'WHERE process_id = ?', (process.id_,)):
57                 process.title.history[row[1]] = row[2]
58             for row in db_conn.exec('SELECT * FROM process_descriptions '
59                                     'WHERE process_id = ?', (process.id_,)):
60                 process.description.history[row[1]] = row[2]
61             for row in db_conn.exec('SELECT * FROM process_efforts '
62                                     'WHERE process_id = ?', (process.id_,)):
63                 process.effort.history[row[1]] = row[2]
64             for row in db_conn.exec('SELECT * FROM process_children '
65                                     'WHERE parent_id = ?', (process.id_,)):
66                 process.child_ids += [row[1]]
67         return process
68
69     def children(self, db_conn: DatabaseConnection) -> list[Process]:
70         """Return child Processes as determined by self.child_ids."""
71         return [self.__class__.by_id(db_conn, id_) for id_ in self.child_ids]
72
73     def save(self, db_conn: DatabaseConnection) -> None:
74         """Add (or re-write) self and connected VersionedAttributes to DB."""
75         cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
76         self.id_ = cursor.lastrowid
77         self.title.save(db_conn)
78         self.description.save(db_conn)
79         self.effort.save(db_conn)
80         db_conn.exec('DELETE FROM process_children WHERE parent_id = ?',
81                      (self.id_,))
82         for child_id in self.child_ids:
83             db_conn.exec('INSERT INTO process_children VALUES (?, ?)',
84                          (self.id_, child_id))
85
86
87 class VersionedAttribute:
88     """Attributes whose values are recorded as a timestamped history."""
89
90     def __init__(self,
91                  parent: Process, name: str, default: str | float) -> None:
92         self.parent = parent
93         self.name = name
94         self.default = default
95         self.history: dict[str, str | float] = {}
96
97     @property
98     def _newest_timestamp(self) -> str:
99         """Return most recent timestamp."""
100         return sorted(self.history.keys())[-1]
101
102     @property
103     def newest(self) -> str | float:
104         """Return most recent value, or self.default if self.history empty."""
105         if 0 == len(self.history):
106             return self.default
107         return self.history[self._newest_timestamp]
108
109     def set(self, value: str | float) -> None:
110         """Add to self.history if and only if not same value as newest one."""
111         if 0 == len(self.history) \
112                 or value != self.history[self._newest_timestamp]:
113             self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
114
115     def at(self, queried_time: str) -> str | float:
116         """Retrieve value of timestamp nearest queried_time from the past."""
117         sorted_timestamps = sorted(self.history.keys())
118         if 0 == len(sorted_timestamps):
119             return self.default
120         selected_timestamp = sorted_timestamps[0]
121         for timestamp in sorted_timestamps[1:]:
122             if timestamp > queried_time:
123                 break
124             selected_timestamp = timestamp
125         return self.history[selected_timestamp]
126
127     def save(self, db_conn: DatabaseConnection) -> None:
128         """Save as self.history entries, but first wipe old ones."""
129         db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
130                      (self.parent.id_,))
131         for timestamp, value in self.history.items():
132             db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
133                          (self.parent.id_, timestamp, value))