home · contact · privacy
Enable deletion of Conditions.
[plomtask] / plomtask / conditions.py
index 80fc13e4f0a68a9784cc1ec510311f76c0e44320..337970924ab25305d64abef13afe24148b0209b1 100644 (file)
@@ -4,16 +4,16 @@ from typing import Any
 from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
-from plomtask.exceptions import NotFoundException
+from plomtask.exceptions import HandledException
 
 
-class Condition(BaseModel):
-    """Non Process-dependency for ProcessSteps and Todos."""
+class Condition(BaseModel[int]):
+    """Non-Process dependency for ProcessSteps and Todos."""
     table_name = 'conditions'
     to_save = ['is_active']
 
     def __init__(self, id_: int | None, is_active: bool = False) -> None:
-        self.set_int_id(id_)
+        super().__init__(id_)
         self.is_active = is_active
         self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'condition_descriptions',
@@ -24,43 +24,10 @@ class Condition(BaseModel):
                        row: Row | list[Any]) -> Condition:
         """Build condition from row, including VersionedAttributes."""
         condition = super().from_table_row(db_conn, row)
-        assert isinstance(condition, Condition)
-        for title_row in db_conn.exec('SELECT * FROM condition_titles '
-                                      'WHERE parent_id = ?', (row[0],)):
-            condition.title.history[title_row[1]]\
-                    = title_row[2]  # pylint: disable=no-member
-        for desc_row in db_conn.exec('SELECT * FROM condition_descriptions '
-                                     'WHERE parent_id = ?', (row[0],)):
-            condition.description.history[desc_row[1]]\
-                    = desc_row[2]  # pylint: disable=no-member
-        return condition
-
-    @classmethod
-    def all(cls, db_conn: DatabaseConnection) -> list[Condition]:
-        """Collect all Conditions and their VersionedAttributes."""
-        conditions = {}
-        for id_, condition in db_conn.cached_conditions.items():
-            conditions[id_] = condition
-        already_recorded = conditions.keys()
-        for row in db_conn.exec('SELECT id FROM conditions'):
-            if row[0] not in already_recorded:
-                condition = cls.by_id(db_conn, row[0])
-                conditions[condition.id_] = condition
-        return list(conditions.values())
-
-    @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
-              create: bool = False) -> Condition:
-        """Collect (or create) Condition and its VersionedAttributes."""
-        condition = None
-        if id_:
-            condition, _ = super()._by_id(db_conn, id_)
-        if not condition:
-            if not create:
-                raise NotFoundException(f'Condition not found of id: {id_}')
-            condition = cls(id_, False)
-            condition.save(db_conn)
-        assert isinstance(condition, Condition)
+        for name in ('title', 'description'):
+            table_name = f'condition_{name}s'
+            for row_ in db_conn.row_where(table_name, 'parent', row[0]):
+                getattr(condition, name).history_from_row(row_)
         return condition
 
     def save(self, db_conn: DatabaseConnection) -> None:
@@ -68,5 +35,36 @@ class Condition(BaseModel):
         self.save_core(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
+
+    def remove(self, db_conn: DatabaseConnection) -> None:
+        """Remove from DB, with dependencies."""
         assert isinstance(self.id_, int)
-        db_conn.cached_conditions[self.id_] = self
+        for item in ('process', 'todo'):
+            for attr in ('conditions', 'enables', 'disables'):
+                table_name = f'{item}_{attr}'
+                for _ in db_conn.row_where(table_name, 'condition', self.id_):
+                    raise HandledException('cannot remove Condition in use')
+        super().remove(db_conn)
+
+
+class ConditionsRelations:
+    """Methods for handling relations to Conditions, for Todo and Process."""
+
+    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
+                       target: str = 'conditions') -> None:
+        """Set self.[target] to Conditions identified by ids."""
+        target_list = getattr(self, target)
+        while len(target_list) > 0:
+            target_list.pop()
+        for id_ in ids:
+            target_list += [Condition.by_id(db_conn, id_)]
+
+    def set_enables(self, db_conn: DatabaseConnection,
+                    ids: list[int]) -> None:
+        """Set self.enables to Conditions identified by ids."""
+        self.set_conditions(db_conn, ids, 'enables')
+
+    def set_disables(self, db_conn: DatabaseConnection,
+                     ids: list[int]) -> None:
+        """Set self.disables to Conditions identified by ids."""
+        self.set_conditions(db_conn, ids, 'disables')