home · contact · privacy
Refactor from_table_row methods of core DB models.
[plomtask] / plomtask / processes.py
index 45de9dbe8e879f3ec06aae5575305e980df8c0d4..2f8c2d537a168062cee953985242b732f1973a72 100644 (file)
@@ -1,6 +1,5 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from sqlite3 import Row
 from typing import Any, Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from typing import Any, Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
@@ -24,14 +23,6 @@ class Process(BaseModel):
         self.fulfills: list[Condition] = []
         self.undoes: list[Condition] = []
 
         self.fulfills: list[Condition] = []
         self.undoes: list[Condition] = []
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
-        """Make Process from database row, with empty VersionedAttributes."""
-        process = cls(row[0])
-        assert isinstance(process.id_, int)
-        db_conn.cached_processes[process.id_] = process
-        return process
-
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
         """Collect all Processes and their connected VersionedAttributes."""
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
         """Collect all Processes and their connected VersionedAttributes."""
@@ -236,15 +227,6 @@ class ProcessStep(BaseModel):
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection,
-                       row: Row) -> ProcessStep:
-        """Make ProcessStep from database row, store in DB cache."""
-        step = cls(row[0], row[1], row[2], row[3])
-        assert isinstance(step.id_, int)
-        db_conn.cached_process_steps[step.id_] = step
-        return step
-
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
         """Retrieve ProcessStep by id_, or throw NotFoundException."""
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
         """Retrieve ProcessStep by id_, or throw NotFoundException."""
@@ -254,7 +236,8 @@ class ProcessStep(BaseModel):
             return step
         for row in db_conn.exec('SELECT * FROM process_steps '
                                 'WHERE step_id = ?', (id_,)):
             return step
         for row in db_conn.exec('SELECT * FROM process_steps '
                                 'WHERE step_id = ?', (id_,)):
-            return cls.from_table_row(db_conn, row)
+            step = cls.from_table_row(db_conn, row)
+            assert isinstance(step, ProcessStep)
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None:
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None: