home · contact · privacy
Put mypy into strict mode, adapt code to still pass.
[plomtask] / plomtask / processes.py
1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.misc import HandledException
7
8
class Process:
    """Template for, and metadata for, Todos, and their arrangements."""

    def __init__(self, id_: int | None) -> None:
        """Set up Process of id_, with empty-history VersionedAttributes.

        An id_ of 0 is stored as None. The attributes title, description
        and effort start without history, so they report their defaults
        ('UNNAMED', '' and 1.0) until values are set or loaded.
        """
        self.id_ = id_ if id_ != 0 else None  # to avoid DB-confusing rowid=0
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        return cls(row[0])  # first column of a processes row is the id

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes."""
        # Keyed by id so that the history rows below find their owner.
        processes: dict[int | None, Process] = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # History rows are read as (process id, timestamp, value), the same
        # order in which VersionedAttribute.save writes them.
        for row in db_conn.exec('SELECT * FROM process_titles'):
            processes[row[0]].title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions'):
            processes[row[0]].description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts'):
            processes[row[0]].effort.history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process of id_ and its connected VersionedAttributes.

        If the processes table has no row of id_, the outcome depends on
        create: if True, a new, not yet saved Process of id_ is returned;
        if False, HandledException is raised.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls(row[0])
            break
        if process is None:
            if not create:
                raise HandledException(f'Process not found of id: {id_}')
            # Use cls rather than Process so subclasses get their own type.
            process = cls(id_)
        # History rows are read as (process id, timestamp, value).
        for row in db_conn.exec('SELECT * FROM process_titles '
                                'WHERE process_id = ?', (process.id_,)):
            process.title.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_descriptions '
                                'WHERE process_id = ?', (process.id_,)):
            process.description.history[row[1]] = row[2]
        for row in db_conn.exec('SELECT * FROM process_efforts '
                                'WHERE process_id = ?', (process.id_,)):
            process.effort.history[row[1]] = row[2]
        return process

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB."""
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # Take over the row id reported by the cursor; presumably this is
        # how a Process created with id_=None learns its DB-assigned id --
        # NOTE(review): confirm against the processes table schema.
        self.id_ = cursor.lastrowid
        # Only now save the attributes: they store self.id_ as their owner.
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
70
71
class VersionedAttribute:
    """Attribute value of a Process, kept as history keyed by timestamp."""

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Start with empty history for attribute name of parent."""
        self.history: dict[str, str | float] = {}
        self.default = default
        self.name = name
        self.parent = parent

    @property
    def _newest_timestamp(self) -> str:
        """Return the latest timestamp among the keys of self.history."""
        timestamps = list(self.history)
        timestamps.sort()
        return timestamps[-1]

    @property
    def newest(self) -> str | float:
        """Return value of latest timestamp; self.default on empty history."""
        if self.history:
            return self.history[self._newest_timestamp]
        return self.default

    def set(self, value: str | float) -> None:
        """Record value under current time, unless it equals the newest one."""
        if self.history and value == self.history[self._newest_timestamp]:
            return
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.history[now] = value

    def at(self, queried_time: str) -> str | float:
        """Return value of the latest timestamp not after queried_time.

        Falls back to self.default if there is no history at all, and to
        the earliest recorded value if queried_time precedes all timestamps.
        """
        if not self.history:
            return self.default
        earliest, *later = sorted(self.history)
        chosen = earliest
        for candidate in later:
            if candidate > queried_time:
                break
            chosen = candidate
        return self.history[chosen]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Wipe this attribute's DB rows for parent, then write self.history."""
        table = f'process_{self.name}s'
        owner_id = self.parent.id_
        db_conn.exec(f'DELETE FROM {table} WHERE process_id = ?',
                     (owner_id,))
        for timestamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO {table} VALUES (?, ?, ?)',
                         (owner_id, timestamp, value))