From: Christian Heller Date: Fri, 19 Apr 2024 04:46:11 +0000 (+0200) Subject: Refactor VersionedAttributes, ProcessSteps, and Conditions retrieval. X-Git-Url: https://plomlompom.com/repos/balance?a=commitdiff_plain;h=7adcf651f0053f0dc5d719457a016a0d5b12253b;p=plomtask Refactor VersionedAttributes, ProcessSteps, and Conditions retrieval. --- diff --git a/plomtask/conditions.py b/plomtask/conditions.py index 98a0b99..b87e3ac 100644 --- a/plomtask/conditions.py +++ b/plomtask/conditions.py @@ -25,14 +25,10 @@ class Condition(BaseModel): """Build condition from row, including VersionedAttributes.""" condition = super().from_table_row(db_conn, row) assert isinstance(condition, Condition) - for title_row in db_conn.exec('SELECT * FROM condition_titles ' - 'WHERE parent = ?', (row[0],)): - condition.title.history[title_row[1]]\ - = title_row[2] # pylint: disable=no-member - for desc_row in db_conn.exec('SELECT * FROM condition_descriptions ' - 'WHERE parent = ?', (row[0],)): - condition.description.history[desc_row[1]]\ - = desc_row[2] # pylint: disable=no-member + for name in ('title', 'description'): + table_name = f'condition_{name}s' + for row_ in db_conn.all_where(table_name, 'parent', row[0]): + getattr(condition, name).history_from_row(row_) return condition @classmethod diff --git a/plomtask/db.py b/plomtask/db.py index bf633e2..f45d2b6 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -76,6 +76,11 @@ class DatabaseConnection: q_marks = self.__class__.q_marks_from_values(values) self.exec(f'INSERT INTO {table_name} VALUES {q_marks}', values) + def all_where(self, table_name: str, key: str, target: int) -> list[Row]: + """Return list of Rows at table where key == target.""" + return list(self.exec(f'SELECT * FROM {table_name} WHERE {key} = ?', + (target,))) + @staticmethod def q_marks_from_values(values: tuple[Any]) -> str: """Return placeholder to insert values into SQL code.""" diff --git a/plomtask/misc.py b/plomtask/misc.py index efa8898..5759c0d 100644 --- a/plomtask/misc.py +++ b/plomtask/misc.py @@ -1,6 +1,7 @@ """Attributes whose values are recorded as a timestamped history.""" from datetime import datetime from typing import Any +from sqlite3 import Row from plomtask.db import DatabaseConnection @@ -32,6 +33,10 @@ class VersionedAttribute: or value != self.history[self._newest_timestamp]: self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value + def history_from_row(self, row: Row) -> None: + """Extend self.history from expected table row format.""" + self.history[row[1]] = row[2] + def at(self, queried_time: str) -> str | float: """Retrieve value of timestamp nearest queried_time from the past.""" sorted_timestamps = sorted(self.history.keys()) diff --git a/plomtask/processes.py b/plomtask/processes.py index a3682c5..e5851d0 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -47,28 +47,20 @@ class Process(BaseModel): if not create: raise NotFoundException(f'Process not found of id: {id_}') process = Process(id_) - for row in db_conn.exec('SELECT * FROM process_titles ' - 'WHERE parent = ?', (process.id_,)): - process.title.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_descriptions ' - 'WHERE parent = ?', (process.id_,)): - process.description.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_efforts ' - 'WHERE parent = ?', (process.id_,)): - process.effort.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE owner = ?', (process.id_,)): - process.explicit_steps += [ProcessStep.from_table_row(db_conn, - row)] - for row in db_conn.exec('SELECT condition FROM process_conditions ' - 'WHERE process = ?', (process.id_,)): - process.conditions += [Condition.by_id(db_conn, row[0])] - for row in db_conn.exec('SELECT condition FROM process_fulfills ' - 'WHERE process = ?', (process.id_,)): - process.fulfills += [Condition.by_id(db_conn, row[0])] - for row in db_conn.exec('SELECT condition FROM process_undoes ' - 'WHERE process = ?', (process.id_,)): - process.undoes += [Condition.by_id(db_conn, row[0])] + if isinstance(process.id_, int): + for name in ('title', 'description', 'effort'): + table = f'process_{name}s' + for row in db_conn.all_where(table, 'parent', process.id_): + getattr(process, name).history_from_row(row) + for row in db_conn.all_where('process_steps', 'owner', + process.id_): + step = ProcessStep.from_table_row(db_conn, row) + process.explicit_steps += [step] + for name in ('conditions', 'fulfills', 'undoes'): + table = f'process_{name}' + for row in db_conn.all_where(table, 'process', process.id_): + target = getattr(process, name) + target += [Condition.by_id(db_conn, row[1])] assert isinstance(process, Process) return process