home · contact · privacy
Refactor VersionedAttributes, ProcessSteps, and Conditions retrieval.
[plomtask] / plomtask / conditions.py
index 5c57d85b9bfb7c402c538aba5716954e5a0a65d7..b87e3ac3e16f1500da87962d2adfb333093f0f54 100644 (file)
@@ -1,19 +1,19 @@
 """Non-doable elements of ProcessStep/Todo chains."""
 from __future__ import annotations
+from typing import Any
 from sqlite3 import Row
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
-from plomtask.exceptions import BadFormatException, NotFoundException
+from plomtask.exceptions import NotFoundException
 
 
-class Condition:
+class Condition(BaseModel):
     """Non Process-dependency for ProcessSteps and Todos."""
+    table_name = 'conditions'
+    to_save = ['is_active']
 
     def __init__(self, id_: int | None, is_active: bool = False) -> None:
-        if (id_ is not None) and id_ < 1:
-            msg = f'illegal Condition ID, must be >=1: {id_}'
-            raise BadFormatException(msg)
-        self.id_ = id_
+        self.set_int_id(id_)
         self.is_active = is_active
         self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'condition_descriptions',
@@ -21,15 +21,14 @@ class Condition:
 
     @classmethod
     def from_table_row(cls, db_conn: DatabaseConnection,
-                       row: Row) -> Condition:
+                       row: Row | list[Any]) -> Condition:
         """Build condition from row, including VersionedAttributes."""
-        condition = cls(row[0], row[1])
-        for title_row in db_conn.exec('SELECT * FROM condition_titles '
-                                      'WHERE parent_id = ?', (row[0],)):
-            condition.title.history[title_row[1]] = title_row[2]
-        for desc_row in db_conn.exec('SELECT * FROM condition_descriptions '
-                                     'WHERE parent_id = ?', (row[0],)):
-            condition.description.history[desc_row[1]] = desc_row[2]
+        condition = super().from_table_row(db_conn, row)
+        assert isinstance(condition, Condition)
+        for name in ('title', 'description'):
+            table_name = f'condition_{name}s'
+            for row_ in db_conn.all_where(table_name, 'parent', row[0]):
+                getattr(condition, name).history_from_row(row_)
         return condition
 
     @classmethod
@@ -50,25 +49,20 @@ class Condition:
               create: bool = False) -> Condition:
         """Collect (or create) Condition and its VersionedAttributes."""
         condition = None
-        if id_ in db_conn.cached_conditions.keys():
-            condition = db_conn.cached_conditions[id_]
-        else:
-            for row in db_conn.exec('SELECT * FROM conditions WHERE id = ?',
-                                    (id_,)):
-                condition = cls.from_table_row(db_conn, row)
-                break
+        if id_:
+            condition, _ = super()._by_id(db_conn, id_)
         if not condition:
             if not create:
                 raise NotFoundException(f'Condition not found of id: {id_}')
             condition = cls(id_, False)
+            condition.save(db_conn)
+        assert isinstance(condition, Condition)
         return condition
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Save self and its VersionedAttributes to DB and cache."""
-        cursor = db_conn.exec('REPLACE INTO conditions VALUES (?, ?)',
-                              (self.id_, self.is_active))
-        self.id_ = cursor.lastrowid
+        self.save_core(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
-        assert self.id_ is not None
+        assert isinstance(self.id_, int)
         db_conn.cached_conditions[self.id_] = self