home · contact · privacy
Base core models on BaseModel providing sensible defaults.
[plomtask] / plomtask / db.py
index a8bdbec166fedbcfd3531717917b1291d364efce..dfb388b7ec6ed9317f19a788336025a96e4c745e 100644 (file)
@@ -1,39 +1,96 @@
 """Database management."""
 from os.path import isfile
-from sqlite3 import connect as sql_connect
-from plomtask.misc import HandledException
+from difflib import Differ
+from sqlite3 import connect as sql_connect, Cursor
+from typing import Any, Dict
+from plomtask.exceptions import HandledException
 
 PATH_DB_SCHEMA = 'scripts/init.sql'
 
 
-class DatabaseFile:
+class DatabaseFile:  # pylint: disable=too-few-public-methods
     """Represents the sqlite3 database's file."""
 
-    def __init__(self, path):
+    def __init__(self, path: str) -> None:
         self.path = path
-        self.check()
+        self._check()
 
-    def check(self):
-        """Check file exists and is of proper schema."""
-        self.exists = isfile(self.path)
-        if self.exists:
-            self.validate_schema()
-
-    def remake(self):
+    def remake(self) -> None:
         """Create tables in self.path file as per PATH_DB_SCHEMA sql file."""
         with sql_connect(self.path) as conn:
             with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
                 conn.executescript(f.read())
-        self.check()
+        self._check()
+
+    def _check(self) -> None:
+        """Check file exists, and is of proper schema."""
+        self.exists = isfile(self.path)
+        if self.exists:
+            self._validate_schema()
 
-    def validate_schema(self):
+    def _validate_schema(self) -> None:
         """Compare found schema with what's stored at PATH_DB_SCHEMA."""
         sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
-        msg_wrong_schema = 'Database has wrong tables schema.'
+        msg_err = 'Database has wrong tables schema. Diff:\n'
         with sql_connect(self.path) as conn:
             schema_rows = [r[0] for r in conn.execute(sql_for_schema) if r[0]]
             retrieved_schema = ';\n'.join(schema_rows) + ';'
             with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
                 stored_schema = f.read().rstrip()
                 if stored_schema != retrieved_schema:
-                    raise HandledException(msg_wrong_schema)
+                    diff_msg = Differ().compare(retrieved_schema.splitlines(),
+                                                stored_schema.splitlines())
+                    raise HandledException(msg_err + '\n'.join(diff_msg))
+
+
+class DatabaseConnection:
+    """A single connection to the database."""
+
+    def __init__(self, db_file: DatabaseFile) -> None:
+        self.file = db_file
+        self.conn = sql_connect(self.file.path)
+        self.cached_todos: Dict[int, Any] = {}
+        self.cached_days: Dict[str, Any] = {}
+        self.cached_process_steps: Dict[int, Any] = {}
+        self.cached_processes: Dict[int, Any] = {}
+        self.cached_conditions: Dict[int, Any] = {}
+
+    def commit(self) -> None:
+        """Commit SQL transaction."""
+        self.conn.commit()
+
+    def exec(self, code: str, inputs: tuple[Any, ...] = tuple()) -> Cursor:
+        """Add commands to SQL transaction."""
+        return self.conn.execute(code, inputs)
+
+    def close(self) -> None:
+        """Close DB connection."""
+        self.conn.close()
+
+
+class BaseModel:
+    """Template for most of the models we use/derive from the DB."""
+    table_name = ''
+    to_save: list[str] = []
+    id_: None | int | str
+
+    def set_int_id(self, id_: int | None) -> None:
+        """Set id_ if >= 1 or None, else fail."""
+        if (id_ is not None) and id_ < 1:
+            msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
+            raise HandledException(msg)
+        self.id_ = id_
+
+    def save_core(self, db_conn: DatabaseConnection,
+                  update_with_lastrowid: bool = True) -> None:
+        """Write bare-bones self (sans connected items), ensuring self.id_."""
+        q_marks = ','.join(['?'] * (len(self.to_save) + 1))
+        values = tuple([self.id_] + [getattr(self, key)
+                                     for key in self.to_save])
+        table_name = self.table_name
+        cursor = db_conn.exec(f'REPLACE INTO {table_name} VALUES ({q_marks})',
+                              values)
+        if update_with_lastrowid:
+            self.id_ = cursor.lastrowid
+        cache = getattr(db_conn, f'cached_{table_name}')
+        cache[self.id_] = self