X-Git-Url: https://plomlompom.com/repos/foo.html?a=blobdiff_plain;f=plomtask%2Fconditions.py;h=9a442000a99befa563582346826244739bf13ae7;hb=2d0d3a138de69e5e09208936ac094b53b0785c0b;hp=5c57d85b9bfb7c402c538aba5716954e5a0a65d7;hpb=34741b65438149b4e02f1e2bb4f8fdba5df5a667;p=plomtask diff --git a/plomtask/conditions.py b/plomtask/conditions.py index 5c57d85..9a44200 100644 --- a/plomtask/conditions.py +++ b/plomtask/conditions.py @@ -1,19 +1,19 @@ """Non-doable elements of ProcessStep/Todo chains.""" from __future__ import annotations +from typing import Any from sqlite3 import Row -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute -from plomtask.exceptions import BadFormatException, NotFoundException +from plomtask.exceptions import NotFoundException -class Condition: +class Condition(BaseModel): """Non Process-dependency for ProcessSteps and Todos.""" + table_name = 'conditions' + to_save = ['is_active'] def __init__(self, id_: int | None, is_active: bool = False) -> None: - if (id_ is not None) and id_ < 1: - msg = f'illegal Condition ID, must be >=1: {id_}' - raise BadFormatException(msg) - self.id_ = id_ + self.set_int_id(id_) self.is_active = is_active self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED') self.description = VersionedAttribute(self, 'condition_descriptions', @@ -21,15 +21,14 @@ class Condition: @classmethod def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> Condition: + row: Row | list[Any]) -> Condition: """Build condition from row, including VersionedAttributes.""" - condition = cls(row[0], row[1]) - for title_row in db_conn.exec('SELECT * FROM condition_titles ' - 'WHERE parent_id = ?', (row[0],)): - condition.title.history[title_row[1]] = title_row[2] - for desc_row in db_conn.exec('SELECT * FROM condition_descriptions ' - 'WHERE parent_id = ?', (row[0],)): - condition.description.history[desc_row[1]] = desc_row[2] + condition = super().from_table_row(db_conn, row) + assert isinstance(condition, Condition) + for name in ('title', 'description'): + table_name = f'condition_{name}s' + for row_ in db_conn.row_where(table_name, 'parent', row[0]): + getattr(condition, name).history_from_row(row_) return condition @classmethod @@ -39,9 +38,9 @@ class Condition: for id_, condition in db_conn.cached_conditions.items(): conditions[id_] = condition already_recorded = conditions.keys() - for row in db_conn.exec('SELECT id FROM conditions'): - if row[0] not in already_recorded: - condition = cls.by_id(db_conn, row[0]) + for id_ in db_conn.column_all('conditions', 'id'): + if id_ not in already_recorded: + condition = cls.by_id(db_conn, id_) conditions[condition.id_] = condition return list(conditions.values()) @@ -50,25 +49,20 @@ class Condition: create: bool = False) -> Condition: """Collect (or create) Condition and its VersionedAttributes.""" condition = None - if id_ in db_conn.cached_conditions.keys(): - condition = db_conn.cached_conditions[id_] - else: - for row in db_conn.exec('SELECT * FROM conditions WHERE id = ?', - (id_,)): - condition = cls.from_table_row(db_conn, row) - break + if id_: + condition, _ = super()._by_id(db_conn, id_) if not condition: if not create: raise NotFoundException(f'Condition not found of id: {id_}') condition = cls(id_, False) + condition.save(db_conn) + assert isinstance(condition, Condition) return condition def save(self, db_conn: DatabaseConnection) -> None: """Save self and its VersionedAttributes to DB and cache.""" - cursor = db_conn.exec('REPLACE INTO conditions VALUES (?, ?)', - (self.id_, self.is_active)) - self.id_ = cursor.lastrowid + self.save_core(db_conn) self.title.save(db_conn) self.description.save(db_conn) - assert self.id_ is not None + assert isinstance(self.id_, int) db_conn.cached_conditions[self.id_] = self