home · contact · privacy
Re-write caching.
[plomtask] / plomtask / conditions.py
index 80fc13e4f0a68a9784cc1ec510311f76c0e44320..4b012491d6151ed74995849378fa8dc1c04f20f8 100644 (file)
@@ -7,7 +7,7 @@ from plomtask.misc import VersionedAttribute
 from plomtask.exceptions import NotFoundException
 
 
-class Condition(BaseModel):
+class Condition(BaseModel[int]):
     """Non Process-dependency for ProcessSteps and Todos."""
     table_name = 'conditions'
     to_save = ['is_active']
@@ -25,26 +25,23 @@ class Condition(BaseModel):
         """Build condition from row, including VersionedAttributes."""
         condition = super().from_table_row(db_conn, row)
         assert isinstance(condition, Condition)
-        for title_row in db_conn.exec('SELECT * FROM condition_titles '
-                                      'WHERE parent_id = ?', (row[0],)):
-            condition.title.history[title_row[1]]\
-                    = title_row[2]  # pylint: disable=no-member
-        for desc_row in db_conn.exec('SELECT * FROM condition_descriptions '
-                                     'WHERE parent_id = ?', (row[0],)):
-            condition.description.history[desc_row[1]]\
-                    = desc_row[2]  # pylint: disable=no-member
+        for name in ('title', 'description'):
+            table_name = f'condition_{name}s'
+            for row_ in db_conn.row_where(table_name, 'parent', row[0]):
+                getattr(condition, name).history_from_row(row_)
         return condition
 
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Condition]:
         """Collect all Conditions and their VersionedAttributes."""
         conditions = {}
-        for id_, condition in db_conn.cached_conditions.items():
+        for id_, condition in cls.cache_.items():
             conditions[id_] = condition
         already_recorded = conditions.keys()
-        for row in db_conn.exec('SELECT id FROM conditions'):
-            if row[0] not in already_recorded:
-                condition = cls.by_id(db_conn, row[0])
+        for id_ in db_conn.column_all('conditions', 'id'):
+            if id_ not in already_recorded:
+                condition = cls.by_id(db_conn, id_)
+                assert isinstance(condition.id_, int)
                 conditions[condition.id_] = condition
         return list(conditions.values())
 
@@ -69,4 +66,26 @@ class Condition(BaseModel):
         self.title.save(db_conn)
         self.description.save(db_conn)
         assert isinstance(self.id_, int)
-        db_conn.cached_conditions[self.id_] = self
+
+
+class ConditionsRelations:
+    """Methods for handling relations to Conditions, for Todo and Process."""
+
+    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
+                       target: str = 'conditions') -> None:
+        """Set self.[target] to Conditions identified by ids."""
+        target_list = getattr(self, target)
+        while len(target_list) > 0:
+            target_list.pop()
+        for id_ in ids:
+            target_list += [Condition.by_id(db_conn, id_)]
+
+    def set_enables(self, db_conn: DatabaseConnection,
+                    ids: list[int]) -> None:
+        """Set self.enables to Conditions identified by ids."""
+        self.set_conditions(db_conn, ids, 'enables')
+
+    def set_disables(self, db_conn: DatabaseConnection,
+                     ids: list[int]) -> None:
+        """Set self.disables to Conditions identified by ids."""
+        self.set_conditions(db_conn, ids, 'disables')