X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=plomtask%2Fdb.py;h=ebd8c6c544fd9dd3aca7e040bccb3d354629806d;hb=8570f4ce4d44b813a1f02b72c5c45a57d2003bae;hp=fe67e5cc799303efada094f510b537616a30bbd0;hpb=4815fe5c7be508e67ceec144968a81bdd6a923d4;p=plomtask diff --git a/plomtask/db.py b/plomtask/db.py index fe67e5c..ebd8c6c 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -4,7 +4,7 @@ from os.path import isfile from difflib import Differ from sqlite3 import connect as sql_connect, Cursor, Row from typing import Any, Self, TypeVar, Generic -from plomtask.exceptions import HandledException +from plomtask.exceptions import HandledException, NotFoundException PATH_DB_SCHEMA = 'scripts/init.sql' EXPECTED_DB_VERSION = 0 @@ -123,6 +123,12 @@ class BaseModel(Generic[BaseModelId]): id_: None | BaseModelId cache_: dict[BaseModelId, Self] + def __init__(self, id_: BaseModelId | None) -> None: + if isinstance(id_, int) and id_ < 1: + msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}' + raise HandledException(msg) + self.id_ = id_ + @classmethod def get_cached(cls: type[BaseModelInstance], id_: BaseModelId) -> BaseModelInstance | None: @@ -173,20 +179,32 @@ class BaseModel(Generic[BaseModelId]): return obj @classmethod - def _by_id(cls, - db_conn: DatabaseConnection, - id_: BaseModelId) -> tuple[Self | None, bool]: + def _by_id(cls, db_conn: DatabaseConnection, + id_: BaseModelId) -> Self | None: """Return instance found by ID, or None, and if from cache or not.""" - from_cache = False obj = cls.get_cached(id_) - if obj: - from_cache = True - else: + if not obj: for row in db_conn.row_where(cls.table_name, 'id', id_): obj = cls.from_table_row(db_conn, row) obj.cache() break - return obj, from_cache + return obj + + @classmethod + def by_id(cls, db_conn: DatabaseConnection, + id_: BaseModelId | None, + # pylint: disable=unused-argument + create: bool = False) -> Self: + """Retrieve by id_, on failure throw NotFoundException.""" + obj = None + if id_ is not None: + obj = cls._by_id(db_conn, id_) + if obj: + return obj + if create: + obj = cls(id_) + return obj + raise NotFoundException(f'found no object of ID {id_}') @classmethod def all(cls: type[BaseModelInstance], @@ -199,18 +217,11 @@ class BaseModel(Generic[BaseModelId]): already_recorded = items.keys() for id_ in db_conn.column_all(cls.table_name, 'id'): if id_ not in already_recorded: - # pylint: disable=no-member - item = cls.by_id(db_conn, id_) # type: ignore[attr-defined] + item = cls.by_id(db_conn, id_) + assert item.id_ is not None items[item.id_] = item return list(items.values()) - def set_int_id(self, id_: int | None) -> None: - """Set id_ if >= 1 or None, else fail.""" - if (id_ is not None) and id_ < 1: - msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}' - raise HandledException(msg) - self.id_ = id_ # type: ignore[assignment] - def save_core(self, db_conn: DatabaseConnection, update_with_lastrowid: bool = True) -> None: """Write bare-bones self (sans connected items), ensuring self.id_."""