home · contact · privacy
Refactor object retrieval and creation.
[plomtask] / plomtask / processes.py
index 9705f17a5672336371ba186ff5d52cdd5fe002ef..c0b13b551862b0c8a00c927c4cc0665ae178daeb 100644 (file)
@@ -1,7 +1,8 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 from dataclasses import dataclass
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 from dataclasses import dataclass
-from typing import Set
+from typing import Set, Any
+from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
@@ -25,7 +26,7 @@ class Process(BaseModel[int], ConditionsRelations):
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
-        self.set_int_id(id_)
+        super().__init__(id_)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
@@ -35,34 +36,26 @@ class Process(BaseModel[int], ConditionsRelations):
         self.disables: list[Condition] = []
 
     @classmethod
         self.disables: list[Condition] = []
 
     @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
-              create: bool = False) -> Process:
-        """Collect Process, its VersionedAttributes, and its child IDs."""
-        process = None
-        from_cache = False
-        if id_:
-            process, from_cache = super()._by_id(db_conn, id_)
-        if not from_cache:
-            if not process:
-                if not create:
-                    raise NotFoundException(f'Process not found of id: {id_}')
-                process = Process(id_)
-            if isinstance(process.id_, int):
-                for name in ('title', 'description', 'effort'):
-                    table = f'process_{name}s'
-                    for row in db_conn.row_where(table, 'parent', process.id_):
-                        getattr(process, name).history_from_row(row)
-                for row in db_conn.row_where('process_steps', 'owner',
-                                             process.id_):
-                    step = ProcessStep.from_table_row(db_conn, row)
-                    process.explicit_steps += [step]
-                for name in ('conditions', 'enables', 'disables'):
-                    table = f'process_{name}'
-                    for c_id in db_conn.column_where(table, 'condition',
-                                                     'process', process.id_):
-                        target = getattr(process, name)
-                        target += [Condition.by_id(db_conn, c_id)]
-        assert isinstance(process, Process)
+    def from_table_row(cls, db_conn: DatabaseConnection,
+                       row: Row | list[Any]) -> Process:
+        """Make from DB row, with dependencies."""
+        process = super().from_table_row(db_conn, row)
+        assert isinstance(process.id_, int)
+        for name in ('title', 'description', 'effort'):
+            table = f'process_{name}s'
+            for row_ in db_conn.row_where(table, 'parent', process.id_):
+                getattr(process, name).history_from_row(row_)
+        for row_ in db_conn.row_where('process_steps', 'owner',
+                                      process.id_):
+            step = ProcessStep.from_table_row(db_conn, row_)
+            process.explicit_steps += [step]  # pylint: disable=no-member
+        for name in ('conditions', 'enables', 'disables'):
+            table = f'process_{name}'
+            assert isinstance(process.id_, int)
+            for c_id in db_conn.column_where(table, 'condition',
+                                             'process', process.id_):
+                target = getattr(process, name)
+                target += [Condition.by_id(db_conn, c_id)]
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
@@ -184,19 +177,11 @@ class ProcessStep(BaseModel[int]):
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
-        self.set_int_id(id_)
+        super().__init__(id_)
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
-    @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
-        """Retrieve ProcessStep by id_, or throw NotFoundException."""
-        step, _ = super()._by_id(db_conn, id_)
-        if step:
-            return step
-        raise NotFoundException(f'found no ProcessStep of ID {id_}')
-
     def save(self, db_conn: DatabaseConnection) -> None:
         """Default to simply calling self.save_core for simple cases."""
         self.save_core(db_conn)
     def save(self, db_conn: DatabaseConnection) -> None:
         """Default to simply calling self.save_core for simple cases."""
         self.save_core(db_conn)