home · contact · privacy
Add rump Processes, and to those VersionedAttributes.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6
7
8 class Process:
9     """Template for, and metadata for, Todos, and their arrangements."""
10
11     def __init__(self, id_: int | None) -> None:
12         self.id_ = id_ if id_ != 0 else None  # to avoid DB-confusing rowid=0
13         self.title = VersionedAttribute(self, 'title', 'UNNAMED')
14         self.description = VersionedAttribute(self, 'description', '')
15         self.effort = VersionedAttribute(self, 'effort', 1.0)
16
17     @classmethod
18     def from_table_row(cls, row: Row) -> Process:
19         """Make Process from database row, with empty VersionedAttributes."""
20         return cls(row[0])
21
22     @classmethod
23     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
24         """Collect all Processes and their connected VersionedAttributes."""
25         processes = {}
26         for row in db_conn.exec('SELECT * FROM processes'):
27             process = cls.from_table_row(row)
28             processes[process.id_] = process
29         for row in db_conn.exec('SELECT * FROM process_titles'):
30             processes[row[0]].title.history[row[1]] = row[2]
31         for row in db_conn.exec('SELECT * FROM process_descriptions'):
32             processes[row[0]].description.history[row[1]] = row[2]
33         for row in db_conn.exec('SELECT * FROM process_efforts'):
34             processes[row[0]].effort.history[row[1]] = row[2]
35         return list(processes.values())
36
37     @classmethod
38     def by_id(cls, db_conn: DatabaseConnection,
39               id_: int | None, create: bool = False) -> Process | None:
40         """Collect all Processes and their connected VersionedAttributes."""
41         process = None
42         for row in db_conn.exec('SELECT * FROM processes '
43                                 'WHERE id = ?', (id_,)):
44             process = cls(row[0])
45             break
46         if create and not process:
47             process = Process(id_)
48         if process:
49             for row in db_conn.exec('SELECT * FROM process_titles '
50                                     'WHERE process_id = ?', (process.id_,)):
51                 process.title.history[row[1]] = row[2]
52             for row in db_conn.exec('SELECT * FROM process_descriptions '
53                                     'WHERE process_id = ?', (process.id_,)):
54                 process.description.history[row[1]] = row[2]
55             for row in db_conn.exec('SELECT * FROM process_efforts '
56                                     'WHERE process_id = ?', (process.id_,)):
57                 process.effort.history[row[1]] = row[2]
58         return process
59
60     def save(self, db_conn: DatabaseConnection) -> None:
61         """Add (or re-write) self and connected VersionedAttributes to DB."""
62         cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
63         self.id_ = cursor.lastrowid
64         self.title.save(db_conn)
65         self.description.save(db_conn)
66         self.effort.save(db_conn)
67
68
69 class VersionedAttribute:
70     """Attributes whose values are recorded as a timestamped history."""
71
72     def __init__(self,
73                  parent: Process, name: str, default: str | float) -> None:
74         self.parent = parent
75         self.name = name
76         self.default = default
77         self.history: dict[str, str | float] = {}
78
79     @property
80     def _newest_timestamp(self) -> str:
81         """Return most recent timestamp."""
82         return sorted(self.history.keys())[-1]
83
84     @property
85     def newest(self) -> str | float:
86         """Return most recent value, or self.default if self.history empty."""
87         if 0 == len(self.history):
88             return self.default
89         return self.history[self._newest_timestamp]
90
91     def set(self, value: str | float) -> None:
92         """Add to self.history if and only if not same value as newest one."""
93         if 0 == len(self.history) \
94                 or value != self.history[self._newest_timestamp]:
95             self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
96
97     def at(self, queried_time: str) -> str | float:
98         """Retrieve value of timestamp nearest queried_time from the past."""
99         sorted_timestamps = sorted(self.history.keys())
100         if 0 == len(sorted_timestamps):
101             return self.default
102         selected_timestamp = sorted_timestamps[0]
103         for timestamp in sorted_timestamps[1:]:
104             if timestamp > queried_time:
105                 break
106             selected_timestamp = timestamp
107         return self.history[selected_timestamp]
108
109     def save(self, db_conn: DatabaseConnection) -> None:
110         """Save as self.history entries, but first wipe old ones."""
111         db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
112                      (self.parent.id_,))
113         for timestamp, value in self.history.items():
114             db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
115                          (self.parent.id_, timestamp, value))