X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=plomtask%2Fconditions.py;h=d2559272cd876c5b071b59437998e3356409cd07;hb=refs%2Fheads%2Fmaster;hp=9fab77fc118d81bb650116ec307150fc390532ba;hpb=5195f3f36960b76d1b6530ef1822d0806db221d8;p=plomtask diff --git a/plomtask/conditions.py b/plomtask/conditions.py index 9fab77f..15dcb9d 100644 --- a/plomtask/conditions.py +++ b/plomtask/conditions.py @@ -1,74 +1,73 @@ """Non-doable elements of ProcessStep/Todo chains.""" from __future__ import annotations -from sqlite3 import Row from plomtask.db import DatabaseConnection, BaseModel -from plomtask.misc import VersionedAttribute -from plomtask.exceptions import NotFoundException +from plomtask.versioned_attributes import VersionedAttribute +from plomtask.exceptions import HandledException -class Condition(BaseModel): - """Non Process-dependency for ProcessSteps and Todos.""" +class Condition(BaseModel[int]): + """Non-Process dependency for ProcessSteps and Todos.""" table_name = 'conditions' to_save = ['is_active'] + to_save_versioned = ['title', 'description'] + to_search = ['title.newest', 'description.newest'] + can_create_by_id = True + sorters = {'is_active': lambda c: c.is_active, + 'title': lambda c: c.title.newest} def __init__(self, id_: int | None, is_active: bool = False) -> None: - self.set_int_id(id_) + super().__init__(id_) self.is_active = is_active self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED') self.description = VersionedAttribute(self, 'condition_descriptions', '') - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> Condition: - """Build condition from row, including VersionedAttributes.""" - condition = super().from_table_row(db_conn, row) - assert isinstance(condition, Condition) - for title_row in db_conn.exec('SELECT * FROM condition_titles ' - 'WHERE parent_id = ?', (row[0],)): - condition.title.history[title_row[1]]\ - = title_row[2] # pylint: disable=no-member - for desc_row in db_conn.exec('SELECT * FROM condition_descriptions ' - 'WHERE parent_id = ?', (row[0],)): - condition.description.history[desc_row[1]]\ - = desc_row[2] # pylint: disable=no-member - return condition + def remove(self, db_conn: DatabaseConnection) -> None: + """Remove from DB, with VersionedAttributes. - @classmethod - def all(cls, db_conn: DatabaseConnection) -> list[Condition]: - """Collect all Conditions and their VersionedAttributes.""" - conditions = {} - for id_, condition in db_conn.cached_conditions.items(): - conditions[id_] = condition - already_recorded = conditions.keys() - for row in db_conn.exec('SELECT id FROM conditions'): - if row[0] not in already_recorded: - condition = cls.by_id(db_conn, row[0]) - conditions[condition.id_] = condition - return list(conditions.values()) + Checks for Todos and Processes that depend on Condition, prohibits + deletion if found. + """ + if self.id_ is not None: + for item in ('process', 'todo'): + for attr in ('conditions', 'blockers', 'enables', 'disables'): + table_name = f'{item}_{attr}' + for _ in db_conn.row_where(table_name, 'condition', + self.id_): + msg = 'cannot remove Condition in use' + raise HandledException(msg) + super().remove(db_conn) - @classmethod - def by_id(cls, db_conn: DatabaseConnection, id_: int | None, - create: bool = False) -> Condition: - """Collect (or create) Condition and its VersionedAttributes.""" - condition = None - if id_ in db_conn.cached_conditions.keys(): - condition = db_conn.cached_conditions[id_] - else: - for row in db_conn.exec('SELECT * FROM conditions WHERE id = ?', - (id_,)): - condition = cls.from_table_row(db_conn, row) - break - if not condition: - if not create: - raise NotFoundException(f'Condition not found of id: {id_}') - condition = cls(id_, False) - return condition - def save(self, db_conn: DatabaseConnection) -> None: - """Save self and its VersionedAttributes to DB and cache.""" - self.save_core(db_conn) - self.title.save(db_conn) - self.description.save(db_conn) - assert isinstance(self.id_, int) - db_conn.cached_conditions[self.id_] = self +class ConditionsRelations: + """Methods for handling relations to Conditions, for Todo and Process.""" + + def __init__(self) -> None: + self.conditions: list[Condition] = [] + self.blockers: list[Condition] = [] + self.enables: list[Condition] = [] + self.disables: list[Condition] = [] + + def set_conditions(self, db_conn: DatabaseConnection, ids: list[int], + target: str = 'conditions') -> None: + """Set self.[target] to Conditions identified by ids.""" + target_list = getattr(self, target) + while len(target_list) > 0: + target_list.pop() + for id_ in ids: + target_list += [Condition.by_id(db_conn, id_)] + + def set_blockers(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.enables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'blockers') + + def set_enables(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.enables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'enables') + + def set_disables(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.disables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'disables')