home · contact · privacy
Add Conditions for Todos/Processes to be met or undone by other Todos.
[plomtask] / plomtask / db.py
index a8bdbec166fedbcfd3531717917b1291d364efce..4a82132ff9e89d990c5d3f04f1b4f7395bfb0fda 100644 (file)
@@ -1,39 +1,68 @@
 """Database management."""
 from os.path import isfile
-from sqlite3 import connect as sql_connect
-from plomtask.misc import HandledException
+from difflib import Differ
+from sqlite3 import connect as sql_connect, Cursor
+from typing import Any, Dict
+from plomtask.exceptions import HandledException
 
 PATH_DB_SCHEMA = 'scripts/init.sql'
 
 
-class DatabaseFile:
+class DatabaseFile:  # pylint: disable=too-few-public-methods
     """Represents the sqlite3 database's file."""
 
-    def __init__(self, path):
+    def __init__(self, path: str) -> None:
         self.path = path
-        self.check()
+        self._check()
 
-    def check(self):
-        """Check file exists and is of proper schema."""
-        self.exists = isfile(self.path)
-        if self.exists:
-            self.validate_schema()
-
-    def remake(self):
+    def remake(self) -> None:
         """Create tables in self.path file as per PATH_DB_SCHEMA sql file."""
         with sql_connect(self.path) as conn:
             with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
                 conn.executescript(f.read())
-        self.check()
+        self._check()
+
+    def _check(self) -> None:
+        """Check file exists and is of proper schema."""
+        self.exists = isfile(self.path)
+        if self.exists:
+            self._validate_schema()
 
-    def validate_schema(self):
+    def _validate_schema(self) -> None:
         """Compare found schema with what's stored at PATH_DB_SCHEMA."""
         sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
-        msg_wrong_schema = 'Database has wrong tables schema.'
+        msg_err = 'Database has wrong tables schema. Diff:\n'
         with sql_connect(self.path) as conn:
             schema_rows = [r[0] for r in conn.execute(sql_for_schema) if r[0]]
             retrieved_schema = ';\n'.join(schema_rows) + ';'
             with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
                 stored_schema = f.read().rstrip()
                 if stored_schema != retrieved_schema:
-                    raise HandledException(msg_wrong_schema)
+                    diff_msg = Differ().compare(retrieved_schema.splitlines(),
+                                                stored_schema.splitlines())
+                    raise HandledException(msg_err + '\n'.join(diff_msg))
+
+
+class DatabaseConnection:
+    """A single connection to the database."""
+
+    def __init__(self, db_file: DatabaseFile) -> None:
+        self.file = db_file
+        self.conn = sql_connect(self.file.path)
+        self.cached_todos: Dict[int, Any] = {}
+        self.cached_days: Dict[str, Any] = {}
+        self.cached_process_steps: Dict[int, Any] = {}
+        self.cached_processes: Dict[int, Any] = {}
+        self.cached_conditions: Dict[int, Any] = {}
+
+    def commit(self) -> None:
+        """Commit SQL transaction."""
+        self.conn.commit()
+
+    def exec(self, code: str, inputs: tuple[Any, ...] = tuple()) -> Cursor:
+        """Add commands to SQL transaction."""
+        return self.conn.execute(code, inputs)
+
+    def close(self) -> None:
+        """Close DB connection."""
+        self.conn.close()