home · contact · privacy
Base core models on BaseModel providing sensible defaults.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 18 Apr 2024 22:50:06 +0000 (00:50 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 18 Apr 2024 22:50:06 +0000 (00:50 +0200)
plomtask/conditions.py
plomtask/days.py
plomtask/db.py
plomtask/http.py
plomtask/processes.py
plomtask/todos.py
tests/conditions.py
tests/processes.py

index 5c57d85b9bfb7c402c538aba5716954e5a0a65d7..27ab62cf303084f4ae62add99ac4f6d39616d4f0 100644 (file)
@@ -1,19 +1,18 @@
 """Non-doable elements of ProcessStep/Todo chains."""
 from __future__ import annotations
 from sqlite3 import Row
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
-from plomtask.exceptions import BadFormatException, NotFoundException
+from plomtask.exceptions import NotFoundException
 
 
-class Condition:
+class Condition(BaseModel):
     """Non Process-dependency for ProcessSteps and Todos."""
+    table_name = 'conditions'
+    to_save = ['is_active']
 
     def __init__(self, id_: int | None, is_active: bool = False) -> None:
-        if (id_ is not None) and id_ < 1:
-            msg = f'illegal Condition ID, must be >=1: {id_}'
-            raise BadFormatException(msg)
-        self.id_ = id_
+        self.set_int_id(id_)
         self.is_active = is_active
         self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'condition_descriptions',
@@ -65,10 +64,8 @@ class Condition:
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Save self and its VersionedAttributes to DB and cache."""
-        cursor = db_conn.exec('REPLACE INTO conditions VALUES (?, ?)',
-                              (self.id_, self.is_active))
-        self.id_ = cursor.lastrowid
+        self.save_core(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
-        assert self.id_ is not None
+        assert isinstance(self.id_, int)
         db_conn.cached_conditions[self.id_] = self
index abfce06f51fc7ab43607ccd177dddc2194667391..553579e35ee694989433cab6cc0d01310847b180 100644 (file)
@@ -3,7 +3,7 @@ from __future__ import annotations
 from datetime import datetime, timedelta
 from sqlite3 import Row
 from plomtask.exceptions import BadFormatException, NotFoundException
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
 
 DATE_FORMAT = '%Y-%m-%d'
 
@@ -25,11 +25,13 @@ def todays_date() -> str:
     return datetime.now().strftime(DATE_FORMAT)
 
 
-class Day:
+class Day(BaseModel):
     """Individual days defined by their dates."""
+    table_name = 'days'
+    to_save = ['comment']
 
     def __init__(self, date: str, comment: str = '') -> None:
-        self.date = valid_date(date)
+        self.id_: str = valid_date(date)
         self.datetime = datetime.strptime(self.date, DATE_FORMAT)
         self.comment = comment
 
@@ -88,6 +90,11 @@ class Day:
         assert isinstance(day, Day)
         return day
 
+    @property
+    def date(self) -> str:
+        """Return self.id_ under the assumption it's a date string."""
+        return self.id_
+
     @property
     def weekday(self) -> str:
         """Return what weekday matches self.date."""
@@ -107,6 +114,4 @@ class Day:
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self to DB and cache."""
-        db_conn.exec('REPLACE INTO days VALUES (?, ?)',
-                     (self.date, self.comment))
-        db_conn.cached_days[self.date] = self
+        self.save_core(db_conn, update_with_lastrowid=False)
index f53a94c4f085f3664d66f0b0cc38863b217741e1..dfb388b7ec6ed9317f19a788336025a96e4c745e 100644 (file)
@@ -66,3 +66,31 @@ class DatabaseConnection:
     def close(self) -> None:
         """Close DB connection."""
         self.conn.close()
+
+
+class BaseModel:
+    """Template for most of the models we use/derive from the DB."""
+    table_name = ''
+    to_save: list[str] = []
+    id_: None | int | str
+
+    def set_int_id(self, id_: int | None) -> None:
+        """Set id_ if >= 1 or None, else fail."""
+        if (id_ is not None) and id_ < 1:
+            msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
+            raise HandledException(msg)
+        self.id_ = id_
+
+    def save_core(self, db_conn: DatabaseConnection,
+                  update_with_lastrowid: bool = True) -> None:
+        """Write bare-bones self (sans connected items), ensuring self.id_."""
+        q_marks = ','.join(['?'] * (len(self.to_save) + 1))
+        values = tuple([self.id_] + [getattr(self, key)
+                                     for key in self.to_save])
+        table_name = self.table_name
+        cursor = db_conn.exec(f'REPLACE INTO {table_name} VALUES ({q_marks})',
+                              values)
+        if update_with_lastrowid:
+            self.id_ = cursor.lastrowid
+        cache = getattr(db_conn, f'cached_{table_name}')
+        cache[self.id_] = self
index 25be899044d652145f166ad1ebed5ff3528b0db0..a14233ae9cb81e0d65c18bbaf37497e63ab311e2 100644 (file)
@@ -225,7 +225,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                                self.form_data.get_all_int('condition'))
         process.set_fulfills(self.conn, self.form_data.get_all_int('fulfills'))
         process.set_undoes(self.conn, self.form_data.get_all_int('undoes'))
-        process.save_id(self.conn)
+        process.save_core(self.conn)
         assert process.id_ is not None  # for mypy
         process.explicit_steps = []
         steps: list[tuple[int | None, int, int | None]] = []
index 76ed30df049b5d801b283ab904364da27f05f240..45de9dbe8e879f3ec06aae5575305e980df8c0d4 100644 (file)
@@ -2,21 +2,20 @@
 from __future__ import annotations
 from sqlite3 import Row
 from typing import Any, Set
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
-class Process:
+class Process(BaseModel):
     """Template for, and metadata for, Todos, and their arrangements."""
+    table_name = 'processes'
 
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
-        if (id_ is not None) and id_ < 1:
-            raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
-        self.id_ = id_
+        self.set_int_id(id_)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
@@ -29,7 +28,7 @@ class Process:
     def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
         """Make Process from database row, with empty VersionedAttributes."""
         process = cls(row[0])
-        assert process.id_ is not None
+        assert isinstance(process.id_, int)
         db_conn.cached_processes[process.id_] = process
         return process
 
@@ -125,7 +124,7 @@ class Process:
             external_owner = self
         for step in [s for s in self.explicit_steps
                      if s.parent_step_id is None]:
-            assert step.id_ is not None  # for mypy
+            assert isinstance(step.id_, int)
             steps[step.id_] = make_node(step)
         for step_id, step_node in steps.items():
             walk_steps(step_id, step_node)
@@ -176,7 +175,7 @@ class Process:
                     parent_step_id = None
             except NotFoundException:
                 parent_step_id = None
-        assert self.id_ is not None
+        assert isinstance(self.id_, int)
         step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
         walk_steps(step)
         self.explicit_steps += [step]
@@ -187,7 +186,7 @@ class Process:
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
         for step in self.explicit_steps:
-            assert step.id_ is not None
+            assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
@@ -196,14 +195,9 @@ class Process:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
 
-    def save_id(self, db_conn: DatabaseConnection) -> None:
-        """Write bare-bones self (sans connected items), ensuring self.id_."""
-        cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
-        self.id_ = cursor.lastrowid
-
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
-        self.save_id(db_conn)
+        self.save_core(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
         self.effort.save(db_conn)
@@ -222,7 +216,7 @@ class Process:
         for condition in self.undoes:
             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
                          (self.id_, condition.id_))
-        assert self.id_ is not None
+        assert isinstance(self.id_, int)
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                      (self.id_,))
         for step in self.explicit_steps:
@@ -230,12 +224,14 @@ class Process:
         db_conn.cached_processes[self.id_] = self
 
 
-class ProcessStep:
+class ProcessStep(BaseModel):
     """Sub-unit of Processes."""
+    table_name = 'process_steps'
+    to_save = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
-        self.id_ = id_
+        self.set_int_id(id_)
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
@@ -245,7 +241,7 @@ class ProcessStep:
                        row: Row) -> ProcessStep:
         """Make ProcessStep from database row, store in DB cache."""
         step = cls(row[0], row[1], row[2], row[3])
-        assert step.id_ is not None
+        assert isinstance(step.id_, int)
         db_conn.cached_process_steps[step.id_] = step
         return step
 
@@ -262,10 +258,5 @@ class ProcessStep:
         raise NotFoundException(f'found no ProcessStep of ID {id_}')
 
     def save(self, db_conn: DatabaseConnection) -> None:
-        """Save to database and cache."""
-        cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
-                              (self.id_, self.owner_id, self.step_process_id,
-                               self.parent_step_id))
-        self.id_ = cursor.lastrowid
-        assert self.id_ is not None
-        db_conn.cached_process_steps[self.id_] = self
+        """Default to simply calling self.save_core for simple cases."""
+        self.save_core(db_conn)
index ce83faddff55278c0efe44f604f6a5fddfaa5851..5d46b30d93640e40ba39f6e0cd1ba4b114241396 100644 (file)
@@ -1,7 +1,7 @@
 """Actionables."""
 from __future__ import annotations
 from sqlite3 import Row
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.days import Day
 from plomtask.processes import Process
 from plomtask.conditions import Condition
@@ -9,14 +9,18 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
-class Todo:
+class Todo(BaseModel):
     """Individual actionable."""
 
     # pylint: disable=too-many-instance-attributes
 
+    name = 'Todo'
+    table_name = 'todos'
+    to_save = ['process_id', 'is_done', 'date']
+
     def __init__(self, id_: int | None, process: Process,
                  is_done: bool, day: Day) -> None:
-        self.id_ = id_
+        self.set_int_id(id_)
         self.process = process
         self._is_done = is_done
         self.day = day
@@ -37,7 +41,7 @@ class Todo:
                    process=Process.by_id(db_conn, row[1]),
                    is_done=bool(row[2]),
                    day=Day.by_date(db_conn, row[3]))
-        assert todo.id_ is not None
+        assert isinstance(todo.id_, int)
         db_conn.cached_todos[todo.id_] = todo
         return todo
 
@@ -115,6 +119,16 @@ class Todo:
                 return False
         return True
 
+    @property
+    def process_id(self) -> int | str | None:
+        """Return ID of tasked Process."""
+        return self.process.id_
+
+    @property
+    def date(self) -> str:
+        """Return date of used Day."""
+        return self.day.date
+
     @property
     def is_done(self) -> bool:
         """Wrapper around self._is_done so we can control its setter."""
@@ -171,11 +185,8 @@ class Todo:
         """Write self and children to DB and its cache."""
         if self.process.id_ is None:
             raise NotFoundException('Process of Todo without ID (not saved?)')
-        cursor = db_conn.exec('REPLACE INTO todos VALUES (?,?,?,?)',
-                              (self.id_, self.process.id_,
-                               self.is_done, self.day.date))
-        self.id_ = cursor.lastrowid
-        assert self.id_ is not None
+        self.save_core(db_conn)
+        assert isinstance(self.id_, int)
         db_conn.cached_todos[self.id_] = self
         db_conn.exec('DELETE FROM todo_children WHERE parent = ?',
                      (self.id_,))
index 478124650bd7dd72d984920de105cab3dce033dd..b6510a1afbf2b7435679573ad45fbd8a8ac5ff57 100644 (file)
@@ -50,6 +50,6 @@ class TestsWithServer(TestCaseWithServer):
         """Test /condition and /conditions response codes."""
         self.check_get('/condition', 200)
         self.check_get('/condition?id=', 200)
-        self.check_get('/condition?id=0', 400)
+        self.check_get('/condition?id=0', 500)
         self.check_get('/condition?id=FOO', 400)
         self.check_get('/conditions', 200)
index 960fd707eee4345cb4350c065d9dcc974e94405f..491a48f170a7375bdfc64ed067e00ef6fef8f69c 100644 (file)
@@ -4,7 +4,7 @@ from typing import Any
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.processes import Process, ProcessStep
 from plomtask.conditions import Condition
-from plomtask.exceptions import NotFoundException, BadFormatException
+from plomtask.exceptions import NotFoundException, HandledException
 
 
 class TestsSansDB(TestCase):
@@ -18,7 +18,7 @@ class TestsSansDB(TestCase):
 
     def test_Process_legal_ID(self) -> None:
         """Test Process cannot be instantiated with id_=0."""
-        with self.assertRaises(BadFormatException):
+        with self.assertRaises(HandledException):
             Process(0)
 
 
@@ -54,8 +54,8 @@ class TestsWithDB(TestCaseWithDB):
             steps_proc += [step_tuple]
             proc.set_steps(self.db_conn, steps_proc)
             steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
-        assert self.proc2.id_ is not None
-        assert self.proc3.id_ is not None
+        assert isinstance(self.proc2.id_, int)
+        assert isinstance(self.proc3.id_, int)
         steps_proc1: list[tuple[int | None, int, int | None]] = []
         add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
         p_1_dict: dict[int, dict[str, Any]] = {1: {
@@ -65,6 +65,7 @@ class TestsWithDB(TestCaseWithDB):
         self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
         add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
         step_2 = self.proc1.explicit_steps[-1]
+        assert isinstance(step_2.id_, int)
         p_1_dict[2] = {
             'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
@@ -111,10 +112,10 @@ class TestsWithDB(TestCaseWithDB):
         for target in ('conditions', 'fulfills', 'undoes'):
             c1 = Condition(None, False)
             c1.save(self.db_conn)
-            assert c1.id_ is not None
+            assert isinstance(c1.id_, int)
             c2 = Condition(None, False)
             c2.save(self.db_conn)
-            assert c2.id_ is not None
+            assert isinstance(c2.id_, int)
             self.proc1.set_conditions(self.db_conn, [], target)
             self.assertEqual(getattr(self.proc1, target), [])
             self.proc1.set_conditions(self.db_conn, [c1.id_], target)
@@ -142,23 +143,25 @@ class TestsWithDB(TestCaseWithDB):
 
     def test_ProcessStep_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
-        assert self.proc2.id_ is not None
+        assert isinstance(self.proc2.id_, int)
         self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
         step = self.proc1.explicit_steps[-1]
-        assert step.id_ is not None
+        assert isinstance(step.id_, int)
         step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
         step.parent_step_id = 99
         self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id)
 
     def test_Process_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
-        assert self.proc2.id_ is not None
+        assert isinstance(self.proc1.id_, int)
+        assert isinstance(self.proc2.id_, int)
         self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
         p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
         self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
 
     def test_Process_versioned_attributes_singularity(self) -> None:
         """Test behavior of VersionedAttributes on saving (with .title)."""
+        assert isinstance(self.proc1.id_, int)
         self.proc1.title.set('named')
         p_loaded = Process.by_id(self.db_conn, self.proc1.id_)
         self.assertEqual(self.proc1.title.history, p_loaded.title.history)
@@ -201,6 +204,6 @@ class TestsWithServer(TestCaseWithServer):
         """Test /process and /processes response codes."""
         self.check_get('/process', 200)
         self.check_get('/process?id=', 200)
-        self.check_get('/process?id=0', 400)
+        self.check_get('/process?id=0', 500)
         self.check_get('/process?id=FOO', 400)
         self.check_get('/processes', 200)