home · contact · privacy
Refactor models' .by_id().
[plomtask] / plomtask / processes.py
index 76ed30df049b5d801b283ab904364da27f05f240..7872c335eebd702e327db08481a83a77827fbc5f 100644 (file)
@@ -1,22 +1,20 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from sqlite3 import Row
 from typing import Any, Set
 from typing import Any, Set
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
-class Process:
+class Process(BaseModel):
     """Template for, and metadata for, Todos, and their arrangements."""
     """Template for, and metadata for, Todos, and their arrangements."""
+    table_name = 'processes'
 
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
 
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
-        if (id_ is not None) and id_ < 1:
-            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
-        self.id_ = id_
+        self.set_int_id(id_)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
@@ -25,14 +23,6 @@ class Process:
         self.fulfills: list[Condition] = []
         self.undoes: list[Condition] = []
 
         self.fulfills: list[Condition] = []
         self.undoes: list[Condition] = []
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
-        """Make Process from database row, with empty VersionedAttributes."""
-        process = cls(row[0])
-        assert process.id_ is not None
-        db_conn.cached_processes[process.id_] = process
-        return process
-
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
         """Collect all Processes and their connected VersionedAttributes."""
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
         """Collect all Processes and their connected VersionedAttributes."""
@@ -50,15 +40,9 @@ class Process:
     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
               create: bool = False) -> Process:
         """Collect Process, its VersionedAttributes, and its child IDs."""
     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
               create: bool = False) -> Process:
         """Collect Process, its VersionedAttributes, and its child IDs."""
-        if id_ in db_conn.cached_processes.keys():
-            process = db_conn.cached_processes[id_]
-            assert isinstance(process, Process)
-            return process
         process = None
         process = None
-        for row in db_conn.exec('SELECT * FROM processes '
-                                'WHERE id = ?', (id_,)):
-            process = cls(row[0])
-            break
+        if id_:
+            process, _ = super()._by_id(db_conn, id_)
         if not process:
             if not create:
                 raise NotFoundException(f'Process not found of id: {id_}')
         if not process:
             if not create:
                 raise NotFoundException(f'Process not found of id: {id_}')
@@ -125,7 +109,7 @@ class Process:
             external_owner = self
         for step in [s for s in self.explicit_steps
                      if s.parent_step_id is None]:
             external_owner = self
         for step in [s for s in self.explicit_steps
                      if s.parent_step_id is None]:
-            assert step.id_ is not None  # for mypy
+            assert isinstance(step.id_, int)
             steps[step.id_] = make_node(step)
         for step_id, step_node in steps.items():
             walk_steps(step_id, step_node)
             steps[step.id_] = make_node(step)
         for step_id, step_node in steps.items():
             walk_steps(step_id, step_node)
@@ -176,7 +160,7 @@ class Process:
                     parent_step_id = None
             except NotFoundException:
                 parent_step_id = None
                     parent_step_id = None
             except NotFoundException:
                 parent_step_id = None
-        assert self.id_ is not None
+        assert isinstance(self.id_, int)
         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
         walk_steps(step)
         self.explicit_steps += [step]
         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
         walk_steps(step)
         self.explicit_steps += [step]
@@ -187,7 +171,7 @@ class Process:
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
         for step in self.explicit_steps:
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
         for step in self.explicit_steps:
-            assert step.id_ is not None
+            assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
@@ -196,14 +180,9 @@ class Process:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
 
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
 
-    def save_id(self, db_conn: DatabaseConnection) -> None:
-        """Write bare-bones self (sans connected items), ensuring self.id_."""
-        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
-        self.id_ = cursor.lastrowid
-
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
-        self.save_id(db_conn)
+        self.save_core(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
         self.effort.save(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
         self.effort.save(db_conn)
@@ -222,7 +201,7 @@ class Process:
         for condition in self.undoes:
             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
                          (self.id_, condition.id_))
         for condition in self.undoes:
             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
                          (self.id_, condition.id_))
-        assert self.id_ is not None
+        assert isinstance(self.id_, int)
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                      (self.id_,))
         for step in self.explicit_steps:
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                      (self.id_,))
         for step in self.explicit_steps:
@@ -230,42 +209,27 @@ class Process:
         db_conn.cached_processes[self.id_] = self
 
 
         db_conn.cached_processes[self.id_] = self
 
 
-class ProcessStep:
+class ProcessStep(BaseModel):
     """Sub-unit of Processes."""
     """Sub-unit of Processes."""
+    table_name = 'process_steps'
+    to_save = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
-        self.id_ = id_
+        self.set_int_id(id_)
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection,
-                       row: Row) -> ProcessStep:
-        """Make ProcessStep from database row, store in DB cache."""
-        step = cls(row[0], row[1], row[2], row[3])
-        assert step.id_ is not None
-        db_conn.cached_process_steps[step.id_] = step
-        return step
-
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
         """Retrieve ProcessStep by id_, or throw NotFoundException."""
     @classmethod
     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
         """Retrieve ProcessStep by id_, or throw NotFoundException."""
-        if id_ in db_conn.cached_process_steps.keys():
-            step = db_conn.cached_process_steps[id_]
+        step, _ = super()._by_id(db_conn, id_)
+        if step:
             assert isinstance(step, ProcessStep)
             return step
             assert isinstance(step, ProcessStep)
             return step
-        for row in db_conn.exec('SELECT * FROM process_steps '
-                                'WHERE step_id = ?', (id_,)):
-            return cls.from_table_row(db_conn, row)
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None:
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None:
-        """Save to database and cache."""
-        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
-                              (self.id_, self.owner_id, self.step_process_id,
-                               self.parent_step_id))
-        self.id_ = cursor.lastrowid
-        assert self.id_ is not None
-        db_conn.cached_process_steps[self.id_] = self
+        """Default to simply calling self.save_core for simple cases."""
+        self.save_core(db_conn)