home · contact · privacy
Refactor models' .by_id().
[plomtask] / plomtask / processes.py
index 45de9dbe8e879f3ec06aae5575305e980df8c0d4..7872c335eebd702e327db08481a83a77827fbc5f 100644 (file)
@@ -1,6 +1,5 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from sqlite3 import Row
 from typing import Any, Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from typing import Any, Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
@@ -24,14 +23,6 @@ class Process(BaseModel):
         self.fulfills: list[Condition] = []
         self.undoes: list[Condition] = []
 
         self.fulfills: list[Condition] = []
         self.undoes: list[Condition] = []
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
-        """Make Process from database row, with empty VersionedAttributes."""
-        process = cls(row[0])
-        assert isinstance(process.id_, int)
-        db_conn.cached_processes[process.id_] = process
-        return process
-
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
         """Collect all Processes and their connected VersionedAttributes."""
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
         """Collect all Processes and their connected VersionedAttributes."""
@@ -49,15 +40,9 @@ class Process(BaseModel):
     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
               create: bool = False) -> Process:
         """Collect Process, its VersionedAttributes, and its child IDs."""
     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
               create: bool = False) -> Process:
         """Collect Process, its VersionedAttributes, and its child IDs."""
-        if id_ in db_conn.cached_processes.keys():
-            process = db_conn.cached_processes[id_]
-            assert isinstance(process, Process)
-            return process
         process = None
         process = None
-        for row in db_conn.exec('SELECT * FROM processes '
-                                'WHERE id = ?', (id_,)):
-            process = cls(row[0])
-            break
+        if id_:
+            process, _ = super()._by_id(db_conn, id_)
         if not process:
             if not create:
                 raise NotFoundException(f'Process not found of id: {id_}')
         if not process:
             if not create:
                 raise NotFoundException(f'Process not found of id: {id_}')
@@ -236,25 +221,13 @@ class ProcessStep(BaseModel):
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection,
-                       row: Row) -> ProcessStep:
-        """Make ProcessStep from database row, store in DB cache."""
-        step = cls(row[0], row[1], row[2], row[3])
-        assert isinstance(step.id_, int)
-        db_conn.cached_process_steps[step.id_] = step
-        return step
-
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
         """Retrieve ProcessStep by id_, or throw NotFoundException."""
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
         """Retrieve ProcessStep by id_, or throw NotFoundException."""
-        if id_ in db_conn.cached_process_steps.keys():
-            step = db_conn.cached_process_steps[id_]
+        step, _ = super()._by_id(db_conn, id_)
+        if step:
             assert isinstance(step, ProcessStep)
             return step
             assert isinstance(step, ProcessStep)
             return step
-        for row in db_conn.exec('SELECT * FROM process_steps '
-                                'WHERE step_id = ?', (id_,)):
-            return cls.from_table_row(db_conn, row)
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None:
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None: