1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.exceptions import NotFoundException, BadFormatException
10 """Template for, and metadata for, Todos, and their arrangements."""
12 def __init__(self, id_: int | None) -> None:
13 if (id_ is not None) and id_ < 1:
14 raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
16 self.title = VersionedAttribute(self, 'title', 'UNNAMED')
17 self.description = VersionedAttribute(self, 'description', '')
18 self.effort = VersionedAttribute(self, 'effort', 1.0)
19 self.child_ids: list[int] = []
22 def from_table_row(cls, row: Row) -> Process:
23 """Make Process from database row, with empty VersionedAttributes."""
27 def all(cls, db_conn: DatabaseConnection) -> list[Process]:
28 """Collect all Processes and their connected VersionedAttributes."""
30 for row in db_conn.exec('SELECT * FROM processes'):
31 process = cls.from_table_row(row)
32 processes[process.id_] = process
33 for row in db_conn.exec('SELECT * FROM process_titles'):
34 processes[row[0]].title.history[row[1]] = row[2]
35 for row in db_conn.exec('SELECT * FROM process_descriptions'):
36 processes[row[0]].description.history[row[1]] = row[2]
37 for row in db_conn.exec('SELECT * FROM process_efforts'):
38 processes[row[0]].effort.history[row[1]] = row[2]
39 return list(processes.values())
42 def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
43 create: bool = False) -> Process:
44 """Collect all Processes and their connected VersionedAttributes."""
46 for row in db_conn.exec('SELECT * FROM processes '
47 'WHERE id = ?', (id_,)):
52 raise NotFoundException(f'Process not found of id: {id_}')
53 process = Process(id_)
55 for row in db_conn.exec('SELECT * FROM process_titles '
56 'WHERE process_id = ?', (process.id_,)):
57 process.title.history[row[1]] = row[2]
58 for row in db_conn.exec('SELECT * FROM process_descriptions '
59 'WHERE process_id = ?', (process.id_,)):
60 process.description.history[row[1]] = row[2]
61 for row in db_conn.exec('SELECT * FROM process_efforts '
62 'WHERE process_id = ?', (process.id_,)):
63 process.effort.history[row[1]] = row[2]
64 for row in db_conn.exec('SELECT * FROM process_children '
65 'WHERE parent_id = ?', (process.id_,)):
66 process.child_ids += [row[1]]
69 def children(self, db_conn: DatabaseConnection) -> list[Process]:
70 """Return child Processes as determined by self.child_ids."""
71 return [self.__class__.by_id(db_conn, id_) for id_ in self.child_ids]
73 def save(self, db_conn: DatabaseConnection) -> None:
74 """Add (or re-write) self and connected VersionedAttributes to DB."""
75 cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
76 self.id_ = cursor.lastrowid
77 self.title.save(db_conn)
78 self.description.save(db_conn)
79 self.effort.save(db_conn)
80 db_conn.exec('DELETE FROM process_children WHERE parent_id = ?',
82 for child_id in self.child_ids:
83 db_conn.exec('INSERT INTO process_children VALUES (?, ?)',
87 class VersionedAttribute:
88 """Attributes whose values are recorded as a timestamped history."""
91 parent: Process, name: str, default: str | float) -> None:
94 self.default = default
95 self.history: dict[str, str | float] = {}
98 def _newest_timestamp(self) -> str:
99 """Return most recent timestamp."""
100 return sorted(self.history.keys())[-1]
103 def newest(self) -> str | float:
104 """Return most recent value, or self.default if self.history empty."""
105 if 0 == len(self.history):
107 return self.history[self._newest_timestamp]
109 def set(self, value: str | float) -> None:
110 """Add to self.history if and only if not same value as newest one."""
111 if 0 == len(self.history) \
112 or value != self.history[self._newest_timestamp]:
113 self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
115 def at(self, queried_time: str) -> str | float:
116 """Retrieve value of timestamp nearest queried_time from the past."""
117 sorted_timestamps = sorted(self.history.keys())
118 if 0 == len(sorted_timestamps):
120 selected_timestamp = sorted_timestamps[0]
121 for timestamp in sorted_timestamps[1:]:
122 if timestamp > queried_time:
124 selected_timestamp = timestamp
125 return self.history[selected_timestamp]
127 def save(self, db_conn: DatabaseConnection) -> None:
128 """Save as self.history entries, but first wipe old ones."""
129 db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
131 for timestamp, value in self.history.items():
132 db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
133 (self.parent.id_, timestamp, value))