1 """Collecting Processes and Process-related items."""
2 from __future__ import annotations
3 from sqlite3 import Row
4 from datetime import datetime
5 from plomtask.db import DatabaseConnection
6 from plomtask.exceptions import NotFoundException, BadFormatException
class Process:
    """Template for, and metadata for, Todos, and their arrangements."""

    def __init__(self, id_: int | None) -> None:
        """Set up Process with default attributes and no children.

        Args:
            id_: database ID of the Process, or None if it has none yet.

        Raises:
            BadFormatException: if id_ is given but smaller than 1.
        """
        if (id_ is not None) and id_ < 1:
            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
        self.id_ = id_
        # Each attribute keeps its own timestamped value history.
        self.title = VersionedAttribute(self, 'title', 'UNNAMED')
        self.description = VersionedAttribute(self, 'description', '')
        self.effort = VersionedAttribute(self, 'effort', 1.0)
        # IDs of direct children; only by_id() fills this from the DB.
        self.child_ids: list[int] = []

    @classmethod
    def from_table_row(cls, row: Row) -> Process:
        """Make Process from database row, with empty VersionedAttributes."""
        # The ID is the first column of a processes row.
        return cls(row[0])

    @classmethod
    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
        """Collect all Processes and their connected VersionedAttributes.

        Child IDs are not loaded here; use by_id() for those.

        Raises:
            KeyError: if a history row refers to a Process ID that has no
                row in the processes table.
        """
        processes: dict[int | None, Process] = {}
        for row in db_conn.exec('SELECT * FROM processes'):
            process = cls.from_table_row(row)
            processes[process.id_] = process
        # History rows are read as (process_id, timestamp, value).
        for name in ('title', 'description', 'effort'):
            for row in db_conn.exec(f'SELECT * FROM process_{name}s'):
                getattr(processes[row[0]], name).history[row[1]] = row[2]
        return list(processes.values())

    @classmethod
    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
              create: bool = False) -> Process:
        """Collect Process, its VersionedAttributes, and its child IDs.

        Args:
            db_conn: connection to read from.
            id_: ID of the Process to look up.
            create: if True, an unknown id_ yields a fresh Process of that
                ID (not written to the DB) instead of an exception.

        Raises:
            NotFoundException: if no Process of id_ exists and create is
                False.
        """
        process = None
        for row in db_conn.exec('SELECT * FROM processes '
                                'WHERE id = ?', (id_,)):
            process = cls.from_table_row(row)
            break
        if process is None:
            if not create:
                raise NotFoundException(f'Process not found of id: {id_}')
            # Use cls rather than a hard-coded class, as the other
            # constructors in this class do.
            process = cls(id_)
        # History rows are read as (process_id, timestamp, value).
        for attribute in (process.title, process.description, process.effort):
            for row in db_conn.exec(
                    f'SELECT * FROM process_{attribute.name}s '
                    'WHERE process_id = ?', (process.id_,)):
                attribute.history[row[1]] = row[2]
        # Child rows are read as (parent_id, child_id).
        for row in db_conn.exec('SELECT * FROM process_children '
                                'WHERE parent_id = ?', (process.id_,)):
            process.child_ids.append(row[1])
        return process

    def get_descendants(self, db_conn: DatabaseConnection) ->\
            list[dict[str, object]]:
        """Return tree of descendant Processes.

        Each node is a dict with the child Process under 'process' and the
        list of that child's own nodes under 'children'.
        """
        descendants: list[dict[str, object]] = []
        for id_ in self.child_ids:
            child = self.__class__.by_id(db_conn, id_)
            descendants.append({'process': child,
                                'children': child.get_descendants(db_conn)})
        return descendants

    def save(self, db_conn: DatabaseConnection) -> None:
        """Add (or re-write) self and connected VersionedAttributes to DB.

        Also is the point at which descendancy recursion is checked.

        Raises:
            BadFormatException: if self shows up among the descendants of
                one of its own children.
        """
        # Nodes already proven free of self; without this, shared
        # sub-trees get re-walked (and re-queried) once per path to them,
        # and a stored cycle that bypasses self would never terminate.
        checked: set[int] = set()

        def walk_descendants(node_id: int) -> None:
            if node_id == self.id_:
                raise BadFormatException('bad child selection: recursion')
            if node_id in checked:
                return
            checked.add(node_id)
            descendant = self.by_id(db_conn, node_id)
            for descendant_id in descendant.child_ids:
                walk_descendants(descendant_id)

        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
        # Adopt whatever row ID the statement ended up writing.
        self.id_ = cursor.lastrowid
        self.title.save(db_conn)
        self.description.save(db_conn)
        self.effort.save(db_conn)
        # NOTE(review): the recursion check below runs only after the
        # writes above; presumably callers discard those on failure --
        # confirm.
        db_conn.exec('DELETE FROM process_children WHERE parent_id = ?',
                     (self.id_,))
        for child_id in self.child_ids:
            walk_descendants(child_id)
            db_conn.exec('INSERT INTO process_children VALUES (?, ?)',
                         (self.id_, child_id))
class VersionedAttribute:
    """Attribute value tracked as a history of timestamped entries."""

    def __init__(self,
                 parent: Process, name: str, default: str | float) -> None:
        """Start with an empty history.

        Args:
            parent: owner whose id_ is used when saving.
            name: attribute name; the DB table used is process_{name}s.
            default: value reported while the history is empty.
        """
        self.parent = parent
        self.name = name
        self.default = default
        # Maps timestamp string to the value recorded at that time.
        self.history: dict[str, str | float] = {}

    @property
    def _newest_timestamp(self) -> str:
        """Return the last of the history's timestamps in sorted order."""
        ordered = sorted(self.history.keys())
        return ordered[-1]

    @property
    def newest(self) -> str | float:
        """Return latest recorded value; self.default if none recorded."""
        if self.history:
            return self.history[self._newest_timestamp]
        return self.default

    def set(self, value: str | float) -> None:
        """Record value under the current time unless it repeats newest."""
        if self.history and value == self.history[self._newest_timestamp]:
            return
        now = datetime.now()
        self.history[now.strftime('%Y-%m-%d %H:%M:%S')] = value

    def at(self, queried_time: str) -> str | float:
        """Return the value in effect at queried_time.

        That is the entry with the latest timestamp not after
        queried_time. If every entry is later, the earliest entry is
        used; if there are no entries, self.default is returned.
        Timestamps are compared as strings.
        """
        if not self.history:
            return self.default
        earliest, *later = sorted(self.history)
        # Sorted order means the entries not after queried_time form a
        # prefix, so the last match is the closest one from the past.
        not_after = [stamp for stamp in later if stamp <= queried_time]
        chosen = not_after[-1] if not_after else earliest
        return self.history[chosen]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Replace the parent's stored entries with those of self.history."""
        db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
                     (self.parent.id_,))
        for stamp, recorded in self.history.items():
            db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
                         (self.parent.id_, stamp, recorded))