home · contact · privacy
Refactor object retrieval and creation.
[plomtask] / plomtask / db.py
index fe67e5cc799303efada094f510b537616a30bbd0..ebd8c6c544fd9dd3aca7e040bccb3d354629806d 100644 (file)
@@ -4,7 +4,7 @@ from os.path import isfile
 from difflib import Differ
 from sqlite3 import connect as sql_connect, Cursor, Row
 from typing import Any, Self, TypeVar, Generic
-from plomtask.exceptions import HandledException
+from plomtask.exceptions import HandledException, NotFoundException
 
 PATH_DB_SCHEMA = 'scripts/init.sql'
 EXPECTED_DB_VERSION = 0
@@ -123,6 +123,12 @@ class BaseModel(Generic[BaseModelId]):
     id_: None | BaseModelId
     cache_: dict[BaseModelId, Self]
 
+    def __init__(self, id_: BaseModelId | None) -> None:
+        if isinstance(id_, int) and id_ < 1:
+            msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
+            raise HandledException(msg)
+        self.id_ = id_
+
     @classmethod
     def get_cached(cls: type[BaseModelInstance],
                    id_: BaseModelId) -> BaseModelInstance | None:
@@ -173,20 +179,32 @@ class BaseModel(Generic[BaseModelId]):
         return obj
 
     @classmethod
-    def _by_id(cls,
-               db_conn: DatabaseConnection,
-               id_: BaseModelId) -> tuple[Self | None, bool]:
+    def _by_id(cls, db_conn: DatabaseConnection,
+               id_: BaseModelId) -> Self | None:
         """Return instance found by ID, or None, and if from cache or not."""
-        from_cache = False
         obj = cls.get_cached(id_)
-        if obj:
-            from_cache = True
-        else:
+        if not obj:
             for row in db_conn.row_where(cls.table_name, 'id', id_):
                 obj = cls.from_table_row(db_conn, row)
                 obj.cache()
                 break
-        return obj, from_cache
+        return obj
+
+    @classmethod
+    def by_id(cls, db_conn: DatabaseConnection,
+              id_: BaseModelId | None,
+              # pylint: disable=unused-argument
+              create: bool = False) -> Self:
+        """Retrieve by id_, on failure throw NotFoundException."""
+        obj = None
+        if id_ is not None:
+            obj = cls._by_id(db_conn, id_)
+        if obj:
+            return obj
+        if create:
+            obj = cls(id_)
+            return obj
+        raise NotFoundException(f'found no object of ID {id_}')
 
     @classmethod
     def all(cls: type[BaseModelInstance],
@@ -199,18 +217,11 @@ class BaseModel(Generic[BaseModelId]):
         already_recorded = items.keys()
         for id_ in db_conn.column_all(cls.table_name, 'id'):
             if id_ not in already_recorded:
-                # pylint: disable=no-member
-                item = cls.by_id(db_conn, id_)  # type: ignore[attr-defined]
+                item = cls.by_id(db_conn, id_)
+                assert item.id_ is not None
                 items[item.id_] = item
         return list(items.values())
 
-    def set_int_id(self, id_: int | None) -> None:
-        """Set id_ if >= 1 or None, else fail."""
-        if (id_ is not None) and id_ < 1:
-            msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
-            raise HandledException(msg)
-        self.id_ = id_  # type: ignore[assignment]
-
     def save_core(self, db_conn: DatabaseConnection,
                   update_with_lastrowid: bool = True) -> None:
         """Write bare-bones self (sans connected items), ensuring self.id_."""