home · contact · privacy
0300e73ccbaec2f15a5478a892b9fcc9d231eaa3
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.exceptions import NotFoundException, BadFormatException
7
8
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    Title, description and effort are VersionedAttributes, i.e. each keeps a
    timestamped history of values rather than a single one.
    """

    # Names of the VersionedAttributes set up in __init__. Each is stored in
    # a DB table named process_{name}s, with rows read and written as
    # (process_id, timestamp, value).
    _versioned_names = ('title', 'description', 'effort')

    def __init__(self, id_: int | None) -> None:
        """Set up Process of id_, with empty VersionedAttributes.

        id_ may be None, otherwise must be >=1, else BadFormatException is
        raised.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes."""
        processes = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # One query per attribute table, rows then sorted into the histories
        # of whatever Process they name in their first column.
        for name in cls._versioned_names:
            for row in db_conn.exec(f'SELECT * FROM process_{name}s'):
                getattr(processes[row[0]], name).history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process of id_ and its connected VersionedAttributes.

        If the processes table has no row of id_, raise NotFoundException –
        unless create is set, in which case return a new (not yet saved)
        Process of id_ instead.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            process = cls(id_)
        for name in cls._versioned_names:
            history = getattr(process, name).history
            for row in db_conn.exec(f'SELECT * FROM process_{name}s '
                                    'WHERE process_id = ?', (process.id_,)):
                history[row[1]] = row[2]
        return process

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB."""
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # Adopt the written row's ID before saving the attributes, since they
        # write self.id_ as their process_id (presumably this is how a
        # Process created with id_=None gets its ID – confirm against DB
        # schema).
        self.id_ = cursor.lastrowid
        for name in self._versioned_names:
            getattr(self, name).save(db_conn)
72
73
class VersionedAttribute:
    """Value attached to a Process, kept as history of timestamped states."""

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        # Maps timestamp strings to the value set at that time.
        self.history: dict[str, str | float] = {}
        self.default = default
        self.name = name
        self.parent = parent

    @property
    def _newest_timestamp(self) -> str:
        """Return greatest of self.history's keys (fails if there are none)."""
        timestamps = list(self.history)
        timestamps.sort()
        return timestamps[-1]

    @property
    def newest(self) -> str | float:
        """Return value of newest timestamp, or self.default if none set."""
        if self.history:
            return self.history[self._newest_timestamp]
        return self.default

    def set(self, value: str | float) -> None:
        """Record value under current time, unless it equals newest entry."""
        is_change = (not self.history
                     or value != self.history[self._newest_timestamp])
        if is_change:
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.history[now] = value

    def at(self, queried_time: str) -> str | float:
        """Return value valid at queried_time, or self.default if no history.

        That is the value of the latest timestamp not after queried_time; if
        all timestamps are after it, the earliest one's value is returned.
        """
        timestamps = sorted(self.history)
        if not timestamps:
            return self.default
        # Walk backwards from newest; first timestamp not past queried_time
        # wins. The earliest one is excluded here, as it is the fallback.
        for candidate in reversed(timestamps[1:]):
            if not candidate > queried_time:
                return self.history[candidate]
        return self.history[timestamps[0]]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write self.history to DB, replacing any rows stored for parent."""
        table = f'process_{self.name}s'
        db_conn.exec(f'DELETE FROM {table} WHERE process_id = ?',
                     (self.parent.id_,))
        for timestamp in self.history:
            db_conn.exec(f'INSERT INTO {table} VALUES (?, ?, ?)',
                         (self.parent.id_, timestamp, self.history[timestamp]))