home · contact · privacy
Remove more redundant code.
[plomtask] / plomtask / conditions.py
index 80fc13e4f0a68a9784cc1ec510311f76c0e44320..8d67e5a78c85b1672aaaaac133ab0422e80a6d73 100644 (file)
@@ -7,7 +7,7 @@ from plomtask.misc import VersionedAttribute
 from plomtask.exceptions import NotFoundException
 
 
-class Condition(BaseModel):
+class Condition(BaseModel[int]):
     """Non Process-dependency for ProcessSteps and Todos."""
     table_name = 'conditions'
     to_save = ['is_active']
@@ -24,30 +24,12 @@ class Condition(BaseModel):
                        row: Row | list[Any]) -> Condition:
         """Build condition from row, including VersionedAttributes."""
         condition = super().from_table_row(db_conn, row)
-        assert isinstance(condition, Condition)
-        for title_row in db_conn.exec('SELECT * FROM condition_titles '
-                                      'WHERE parent_id = ?', (row[0],)):
-            condition.title.history[title_row[1]]\
-                    = title_row[2]  # pylint: disable=no-member
-        for desc_row in db_conn.exec('SELECT * FROM condition_descriptions '
-                                     'WHERE parent_id = ?', (row[0],)):
-            condition.description.history[desc_row[1]]\
-                    = desc_row[2]  # pylint: disable=no-member
+        for name in ('title', 'description'):
+            table_name = f'condition_{name}s'
+            for row_ in db_conn.row_where(table_name, 'parent', row[0]):
+                getattr(condition, name).history_from_row(row_)
         return condition
 
-    @classmethod
-    def all(cls, db_conn: DatabaseConnection) -> list[Condition]:
-        """Collect all Conditions and their VersionedAttributes."""
-        conditions = {}
-        for id_, condition in db_conn.cached_conditions.items():
-            conditions[id_] = condition
-        already_recorded = conditions.keys()
-        for row in db_conn.exec('SELECT id FROM conditions'):
-            if row[0] not in already_recorded:
-                condition = cls.by_id(db_conn, row[0])
-                conditions[condition.id_] = condition
-        return list(conditions.values())
-
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
               create: bool = False) -> Condition:
@@ -60,7 +42,6 @@ class Condition(BaseModel):
                 raise NotFoundException(f'Condition not found of id: {id_}')
             condition = cls(id_, False)
             condition.save(db_conn)
-        assert isinstance(condition, Condition)
         return condition
 
     def save(self, db_conn: DatabaseConnection) -> None:
@@ -68,5 +49,26 @@ class Condition(BaseModel):
         self.save_core(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
-        assert isinstance(self.id_, int)
-        db_conn.cached_conditions[self.id_] = self
+
+
+class ConditionsRelations:
+    """Methods for handling relations to Conditions, for Todo and Process."""
+
+    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
+                       target: str = 'conditions') -> None:
+        """Set self.[target] to Conditions identified by ids."""
+        target_list = getattr(self, target)
+        while len(target_list) > 0:
+            target_list.pop()
+        for id_ in ids:
+            target_list += [Condition.by_id(db_conn, id_)]
+
+    def set_enables(self, db_conn: DatabaseConnection,
+                    ids: list[int]) -> None:
+        """Set self.enables to Conditions identified by ids."""
+        self.set_conditions(db_conn, ids, 'enables')
+
+    def set_disables(self, db_conn: DatabaseConnection,
+                     ids: list[int]) -> None:
+        """Set self.disables to Conditions identified by ids."""
+        self.set_conditions(db_conn, ids, 'disables')