"""Collecting Processes and Process-related items."""
from __future__ import annotations
from datetime import datetime
from sqlite3 import Row

from plomtask.db import DatabaseConnection


class Process:
    """Template for, and metadata for, Todos, and their arrangements.

    Carries an id_ plus three VersionedAttributes (title, description,
    effort), each of which keeps a timestamped history of its values.
    """

    def __init__(self, id_: int | None) -> None:
        """Set up Process of id_, with empty VersionedAttributes.

        An id_ of 0 is stored as None.
        """
        self.id_ = id_ if id_ != 0 else None  # to avoid DB-confusing rowid=0
        # Defaults are what each attribute reports while its history is empty.
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        # First column of a processes row is the id (see save()).
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Reads the processes table first, then fills each Process's
        title/description/effort histories from the process_titles,
        process_descriptions and process_efforts tables.  A history row
        whose process id has no processes entry raises KeyError.
        """
        processes: dict[int | None, Process] = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # History rows are read as (process_id, timestamp, value).
        for name in ('title', 'description', 'effort'):
            for row in db_conn.exec(f'SELECT * FROM process_{name}s'):
                getattr(processes[row[0]], name).history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection,
              id_: int | None, create: bool = False) -> Process | None:
        """Collect Process of id_ and its connected VersionedAttributes.

        Returns None if the processes table has no row of id_ -- unless
        create is set, in which case a new (not yet saved) Process of id_
        is returned instead.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break  # only the first matching row is used
        if create and process is None:
            process = cls(id_)
        if process is not None:
            # Same order of queries as the tables are listed in all().
            for attr in (process.title, process.description, process.effort):
                for row in db_conn.exec(
                        f'SELECT * FROM process_{attr.name}s '
                        'WHERE process_id = ?', (process.id_,)):
                    attr.history[row[1]] = row[2]
        return process

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB."""
        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # Adopt the row id reported by the cursor *before* saving the
        # attributes: they write self.id_ as their process_id.
        # NOTE(review): assumes db_conn.exec returns a sqlite3 cursor -- confirm.
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)


class VersionedAttribute:
    """Attributes whose values are recorded as a timestamped history.

    self.history maps timestamp strings of format '%Y-%m-%d %H:%M:%S' (as
    written by set()) to values.  Timestamps are compared as plain strings.
    """

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Set up empty history for attribute name of parent.

        parent:  owner whose id_ is written as process_id on save()
        name:    attribute name; save() derives the table name
                 process_{name}s from it
        default: value reported while self.history is empty
        """
        self.parent = parent
        self.name = name
        self.default = default
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp.

        Raises ValueError if self.history is empty; callers check first.
        """
        return max(self.history)

    @property
    def newest(self) -> str | float:
        """Return most recent value, or self.default if self.history empty."""
        if not self.history:
            return self.default
        return self.history[self._newest_timestamp]

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one.

        The new entry is keyed by the current local time at one-second
        resolution, so a second differing value set within the same second
        replaces the entry made for the first.
        """
        if not self.history or value != self.newest:
            self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        Returns self.default if self.history is empty.  If queried_time lies
        before every recorded timestamp, the oldest recorded value is
        returned.
        """
        sorted_timestamps = sorted(self.history)
        if not sorted_timestamps:
            return self.default
        # Start from the oldest entry, then advance while entries are not
        # later than queried_time.
        selected_timestamp = sorted_timestamps[0]
        for timestamp in sorted_timestamps[1:]:
            if timestamp > queried_time:
                break
            selected_timestamp = timestamp
        return self.history[selected_timestamp]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones."""
        # Table name is built from self.name; the values go in as parameters.
        db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
                     (self.parent.id_,))
        for timestamp, value in self.history.items():
            db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
                         (self.parent.id_, timestamp, value))