home · contact · privacy
Refactor from_table_row methods of core DB models.
[plomtask] / plomtask / db.py
index d6966e62f5a974e7fd6f5156988cfe2b8fbf57dc..2cc1d64d54576a46a0b9c7edb513abaa28d4008e 100644 (file)
@@ -1,8 +1,9 @@
 """Database management."""
 from os.path import isfile
 from difflib import Differ
-from sqlite3 import connect as sql_connect
-from plomtask.misc import HandledException
+from sqlite3 import connect as sql_connect, Cursor, Row
+from typing import Any, Dict
+from plomtask.exceptions import HandledException
 
 PATH_DB_SCHEMA = 'scripts/init.sql'
 
@@ -10,24 +11,24 @@ PATH_DB_SCHEMA = 'scripts/init.sql'
 class DatabaseFile:  # pylint: disable=too-few-public-methods
     """Represents the sqlite3 database's file."""
 
-    def __init__(self, path):
+    def __init__(self, path: str) -> None:
         self.path = path
         self._check()
 
-    def remake(self):
+    def remake(self) -> None:
         """Create tables in self.path file as per PATH_DB_SCHEMA sql file."""
         with sql_connect(self.path) as conn:
             with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
                 conn.executescript(f.read())
         self._check()
 
-    def _check(self):
-        """Check file exists and is of proper schema."""
+    def _check(self) -> None:
+        """Check file exists, and is of proper schema."""
         self.exists = isfile(self.path)
         if self.exists:
             self._validate_schema()
 
-    def _validate_schema(self):
+    def _validate_schema(self) -> None:
         """Compare found schema with what's stored at PATH_DB_SCHEMA."""
         sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
         msg_err = 'Database has wrong tables schema. Diff:\n'
@@ -45,18 +46,61 @@ class DatabaseFile:  # pylint: disable=too-few-public-methods
 class DatabaseConnection:
     """A single connection to the database."""
 
-    def __init__(self, db_file: DatabaseFile):
+    def __init__(self, db_file: DatabaseFile) -> None:
         self.file = db_file
         self.conn = sql_connect(self.file.path)
+        self.cached_todos: Dict[int, Any] = {}
+        self.cached_days: Dict[str, Any] = {}
+        self.cached_process_steps: Dict[int, Any] = {}
+        self.cached_processes: Dict[int, Any] = {}
+        self.cached_conditions: Dict[int, Any] = {}
 
-    def commit(self):
+    def commit(self) -> None:
         """Commit SQL transaction."""
         self.conn.commit()
 
-    def exec(self, code: str, inputs: tuple = ()):
+    def exec(self, code: str, inputs: tuple[Any, ...] = tuple()) -> Cursor:
         """Add commands to SQL transaction."""
         return self.conn.execute(code, inputs)
 
-    def close(self):
+    def close(self) -> None:
         """Close DB connection."""
         self.conn.close()
+
+
+class BaseModel:
+    """Template for most of the models we use/derive from the DB."""
+    table_name = ''
+    to_save: list[str] = []
+    id_: None | int | str
+    id_type: type[Any] = int
+
+    @classmethod
+    def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Any:
+        """Make from DB row, write to DB cache."""
+        obj = cls(*row)
+        assert isinstance(obj.id_, cls.id_type)
+        cache = getattr(db_conn, f'cached_{cls.table_name}')
+        cache[obj.id_] = obj
+        return obj
+
+    def set_int_id(self, id_: int | None) -> None:
+        """Set id_ if >= 1 or None, else fail."""
+        if (id_ is not None) and id_ < 1:
+            msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
+            raise HandledException(msg)
+        self.id_ = id_
+
+    def save_core(self, db_conn: DatabaseConnection,
+                  update_with_lastrowid: bool = True) -> None:
+        """Write bare-bones self (sans connected items), ensuring self.id_."""
+        q_marks = ','.join(['?'] * (len(self.to_save) + 1))
+        values = tuple([self.id_] + [getattr(self, key)
+                                     for key in self.to_save])
+        table_name = self.table_name
+        cursor = db_conn.exec(f'REPLACE INTO {table_name} VALUES ({q_marks})',
+                              values)
+        if update_with_lastrowid:
+            self.id_ = cursor.lastrowid
+        cache = getattr(db_conn, f'cached_{table_name}')
+        cache[self.id_] = self