X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=plomtask%2Fconditions.py;h=cd147cb79cafcfd81f4672fae0a0b6296201ff6a;hb=8e1a5416151dbcf506f2435823362e21d85aed2d;hp=9fab77fc118d81bb650116ec307150fc390532ba;hpb=5195f3f36960b76d1b6530ef1822d0806db221d8;p=plomtask diff --git a/plomtask/conditions.py b/plomtask/conditions.py index 9fab77f..cd147cb 100644 --- a/plomtask/conditions.py +++ b/plomtask/conditions.py @@ -1,5 +1,6 @@ """Non-doable elements of ProcessStep/Todo chains.""" from __future__ import annotations +from typing import Any from sqlite3 import Row from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute @@ -20,18 +21,14 @@ class Condition(BaseModel): @classmethod def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> Condition: + row: Row | list[Any]) -> Condition: """Build condition from row, including VersionedAttributes.""" condition = super().from_table_row(db_conn, row) assert isinstance(condition, Condition) - for title_row in db_conn.exec('SELECT * FROM condition_titles ' - 'WHERE parent_id = ?', (row[0],)): - condition.title.history[title_row[1]]\ - = title_row[2] # pylint: disable=no-member - for desc_row in db_conn.exec('SELECT * FROM condition_descriptions ' - 'WHERE parent_id = ?', (row[0],)): - condition.description.history[desc_row[1]]\ - = desc_row[2] # pylint: disable=no-member + for name in ('title', 'description'): + table_name = f'condition_{name}s' + for row_ in db_conn.row_where(table_name, 'parent', row[0]): + getattr(condition, name).history_from_row(row_) return condition @classmethod @@ -41,9 +38,9 @@ class Condition(BaseModel): for id_, condition in db_conn.cached_conditions.items(): conditions[id_] = condition already_recorded = conditions.keys() - for row in db_conn.exec('SELECT id FROM conditions'): - if row[0] not in already_recorded: - condition = cls.by_id(db_conn, row[0]) + for id_ in db_conn.column_all('conditions', 'id'): + if id_ not in already_recorded: + condition = cls.by_id(db_conn, id_) conditions[condition.id_] = condition return list(conditions.values()) @@ -52,17 +49,14 @@ class Condition(BaseModel): create: bool = False) -> Condition: """Collect (or create) Condition and its VersionedAttributes.""" condition = None - if id_ in db_conn.cached_conditions.keys(): - condition = db_conn.cached_conditions[id_] - else: - for row in db_conn.exec('SELECT * FROM conditions WHERE id = ?', - (id_,)): - condition = cls.from_table_row(db_conn, row) - break + if id_: + condition, _ = super()._by_id(db_conn, id_) if not condition: if not create: raise NotFoundException(f'Condition not found of id: {id_}') condition = cls(id_, False) + condition.save(db_conn) + assert isinstance(condition, Condition) return condition def save(self, db_conn: DatabaseConnection) -> None: @@ -72,3 +66,26 @@ class Condition(BaseModel): self.description.save(db_conn) assert isinstance(self.id_, int) db_conn.cached_conditions[self.id_] = self + + +class ConditionsRelations: + """Methods for handling relations to Conditions, for Todo and Process.""" + + def set_conditions(self, db_conn: DatabaseConnection, ids: list[int], + target: str = 'conditions') -> None: + """Set self.[target] to Conditions identified by ids.""" + target_list = getattr(self, target) + while len(target_list) > 0: + target_list.pop() + for id_ in ids: + target_list += [Condition.by_id(db_conn, id_)] + + def set_enables(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.enables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'enables') + + def set_disables(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.disables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'disables')