1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.misc import HandledException
class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    A Process is identified by its id in the `processes` table. Its title,
    description and effort are VersionedAttributes, whose histories are
    stored in the `process_titles`, `process_descriptions` and
    `process_efforts` tables.
    """

    # Names of the VersionedAttribute fields. The table backing each one is
    # named `process_<name>s`.
    _VERSIONED = ('title', 'description', 'effort')

    def __init__(self, id_: int | None) -> None:
        """Set up a Process whose attributes have defaults but no history.

        An id_ of 0 is stored as None, to avoid DB-confusing rowid=0.
        """
        self.id_ = id_ if id_ != 0 else None
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes.

        Only the first column of row is read; it is used as the id.
        """
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Raises KeyError if a history table holds a row whose process id has
        no matching entry in `processes`.
        """
        processes: dict[int | None, Process] = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # History rows are read as (process id, timestamp, value).
        for name in cls._VERSIONED:
            for row in db_conn.exec(f'SELECT * FROM process_{name}s'):
                getattr(processes[row[0]], name).history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Retrieve the Process of id_ and its VersionedAttributes' histories.

        If `processes` has no row of that id, a new Process of that id is
        returned (without saving it) when create is true; otherwise
        HandledException is raised.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break
        if process is None:
            if not create:
                raise HandledException(f'Process not found of id: {id_}')
            process = cls(id_)
        # History rows are read as (process id, timestamp, value).
        for name in cls._VERSIONED:
            history = getattr(process, name).history
            for row in db_conn.exec(f'SELECT * FROM process_{name}s '
                                    'WHERE process_id = ?', (process.id_,)):
                history[row[1]] = row[2]
        return process

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB."""
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # Adopt the row id the DB reports (presumably DB-assigned when
        # self.id_ was None) before saving the attributes, since they write
        # their parent's id_ as process_id.
        self.id_ = cursor.lastrowid
        for name in self._VERSIONED:
            getattr(self, name).save(db_conn)
class VersionedAttribute:
    """Attributes whose values are recorded as a timestamped history.

    The history maps timestamp strings to values. Timestamps are compared
    as plain strings, so they are expected in the sortable
    '%Y-%m-%d %H:%M:%S' format that set() writes.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Start with an empty history.

        parent is the object whose id_ is written as process_id on save();
        name selects the table `process_<name>s`; default is the value
        reported while the history is empty.
        """
        self.parent = parent
        self.name = name
        self.default = default
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp; ValueError if history is empty."""
        return max(self.history)

    @property
    def newest(self) -> str | float:
        """Return most recent value, or self.default if self.history empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one.

        An empty history always gets an entry, even for the default value.
        """
        if not self.history \
                or value != self.history[self._newest_timestamp]:
            # NOTE: timestamps have one-second resolution, so a second
            # change within the same second replaces the first entry.
            self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        An entry stamped exactly queried_time counts as past. Returns
        self.default if the history is empty, and the oldest entry's value
        if every entry is later than queried_time.
        """
        if not self.history:
            return self.default
        not_later = [ts for ts in self.history if ts <= queried_time]
        chosen = max(not_later) if not_later else min(self.history)
        return self.history[chosen]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones."""
        db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
                     (self.parent.id_,))
        for timestamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
                         (self.parent.id_, timestamp, value))