From: Christian Heller Date: Thu, 18 Apr 2024 22:50:06 +0000 (+0200) Subject: Base core models on BaseModel providing sensible defaults. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/%7B%7Bprefix%7D%7D/processes?a=commitdiff_plain;h=c5fab0b28785bb8f3a8e2b8e455fd679cfe83d25;p=plomtask Base core models on BaseModel providing sensible defaults. --- diff --git a/plomtask/conditions.py b/plomtask/conditions.py index 5c57d85..27ab62c 100644 --- a/plomtask/conditions.py +++ b/plomtask/conditions.py @@ -1,19 +1,18 @@ """Non-doable elements of ProcessStep/Todo chains.""" from __future__ import annotations from sqlite3 import Row -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute -from plomtask.exceptions import BadFormatException, NotFoundException +from plomtask.exceptions import NotFoundException -class Condition: +class Condition(BaseModel): """Non Process-dependency for ProcessSteps and Todos.""" + table_name = 'conditions' + to_save = ['is_active'] def __init__(self, id_: int | None, is_active: bool = False) -> None: - if (id_ is not None) and id_ < 1: - msg = f'illegal Condition ID, must be >=1: {id_}' - raise BadFormatException(msg) - self.id_ = id_ + self.set_int_id(id_) self.is_active = is_active self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED') self.description = VersionedAttribute(self, 'condition_descriptions', @@ -65,10 +64,8 @@ class Condition: def save(self, db_conn: DatabaseConnection) -> None: """Save self and its VersionedAttributes to DB and cache.""" - cursor = db_conn.exec('REPLACE INTO conditions VALUES (?, ?)', - (self.id_, self.is_active)) - self.id_ = cursor.lastrowid + self.save_core(db_conn) self.title.save(db_conn) self.description.save(db_conn) - assert self.id_ is not None + assert isinstance(self.id_, int) db_conn.cached_conditions[self.id_] = self diff --git a/plomtask/days.py b/plomtask/days.py index abfce06..553579e 100644 --- a/plomtask/days.py +++ b/plomtask/days.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime, timedelta from sqlite3 import Row from plomtask.exceptions import BadFormatException, NotFoundException -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel DATE_FORMAT = '%Y-%m-%d' @@ -25,11 +25,13 @@ def todays_date() -> str: return datetime.now().strftime(DATE_FORMAT) -class Day: +class Day(BaseModel): """Individual days defined by their dates.""" + table_name = 'days' + to_save = ['comment'] def __init__(self, date: str, comment: str = '') -> None: - self.date = valid_date(date) + self.id_: str = valid_date(date) self.datetime = datetime.strptime(self.date, DATE_FORMAT) self.comment = comment @@ -88,6 +90,11 @@ class Day: assert isinstance(day, Day) return day + @property + def date(self) -> str: + """Return self.id_ under the assumption it's a date string.""" + return self.id_ + @property def weekday(self) -> str: """Return what weekday matches self.date.""" @@ -107,6 +114,4 @@ class Day: def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self to DB and cache.""" - db_conn.exec('REPLACE INTO days VALUES (?, ?)', - (self.date, self.comment)) - db_conn.cached_days[self.date] = self + self.save_core(db_conn, update_with_lastrowid=False) diff --git a/plomtask/db.py b/plomtask/db.py index f53a94c..dfb388b 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -66,3 +66,31 @@ class DatabaseConnection: def close(self) -> None: """Close DB connection.""" self.conn.close() + + +class BaseModel: + """Template for most of the models we use/derive from the DB.""" + table_name = '' + to_save: list[str] = [] + id_: None | int | str + + def set_int_id(self, id_: int | None) -> None: + """Set id_ if >= 1 or None, else fail.""" + if (id_ is not None) and id_ < 1: + msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}' + raise HandledException(msg) + self.id_ = id_ + + def save_core(self, db_conn: DatabaseConnection, + update_with_lastrowid: bool = True) -> None: + """Write bare-bones self (sans connected items), ensuring self.id_.""" + q_marks = ','.join(['?'] * (len(self.to_save) + 1)) + values = tuple([self.id_] + [getattr(self, key) + for key in self.to_save]) + table_name = self.table_name + cursor = db_conn.exec(f'REPLACE INTO {table_name} VALUES ({q_marks})', + values) + if update_with_lastrowid: + self.id_ = cursor.lastrowid + cache = getattr(db_conn, f'cached_{table_name}') + cache[self.id_] = self diff --git a/plomtask/http.py b/plomtask/http.py index 25be899..a14233a 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -225,7 +225,7 @@ class TaskHandler(BaseHTTPRequestHandler): self.form_data.get_all_int('condition')) process.set_fulfills(self.conn, self.form_data.get_all_int('fulfills')) process.set_undoes(self.conn, self.form_data.get_all_int('undoes')) - process.save_id(self.conn) + process.save_core(self.conn) assert process.id_ is not None # for mypy process.explicit_steps = [] steps: list[tuple[int | None, int, int | None]] = [] diff --git a/plomtask/processes.py b/plomtask/processes.py index 76ed30d..45de9db 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -2,21 +2,20 @@ from __future__ import annotations from sqlite3 import Row from typing import Any, Set -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute from plomtask.conditions import Condition from plomtask.exceptions import NotFoundException, BadFormatException -class Process: +class Process(BaseModel): """Template for, and metadata for, Todos, and their arrangements.""" + table_name = 'processes' # pylint: disable=too-many-instance-attributes def __init__(self, id_: int | None) -> None: - if (id_ is not None) and id_ < 1: - raise BadFormatException(f'illegal Process ID, must be >=1: {id_}') - self.id_ = id_ + self.set_int_id(id_) self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED') self.description = VersionedAttribute(self, 'process_descriptions', '') self.effort = VersionedAttribute(self, 'process_efforts', 1.0) @@ -29,7 +28,7 @@ class Process: def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process: """Make Process from database row, with empty VersionedAttributes.""" process = cls(row[0]) - assert process.id_ is not None + assert isinstance(process.id_, int) db_conn.cached_processes[process.id_] = process return process @@ -125,7 +124,7 @@ class Process: external_owner = self for step in [s for s in self.explicit_steps if s.parent_step_id is None]: - assert step.id_ is not None # for mypy + assert isinstance(step.id_, int) steps[step.id_] = make_node(step) for step_id, step_node in steps.items(): walk_steps(step_id, step_node) @@ -176,7 +175,7 @@ class Process: parent_step_id = None except NotFoundException: parent_step_id = None - assert self.id_ is not None + assert isinstance(self.id_, int) step = ProcessStep(id_, self.id_, step_process_id, parent_step_id) walk_steps(step) self.explicit_steps += [step] @@ -187,7 +186,7 @@ class Process: steps: list[tuple[int | None, int, int | None]]) -> None: """Set self.explicit_steps in bulk.""" for step in self.explicit_steps: - assert step.id_ is not None + assert isinstance(step.id_, int) del db_conn.cached_process_steps[step.id_] self.explicit_steps = [] db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', @@ -196,14 +195,9 @@ class Process: self._add_step(db_conn, step_tuple[0], step_tuple[1], step_tuple[2]) - def save_id(self, db_conn: DatabaseConnection) -> None: - """Write bare-bones self (sans connected items), ensuring self.id_.""" - cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,)) - self.id_ = cursor.lastrowid - def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" - self.save_id(db_conn) + self.save_core(db_conn) self.title.save(db_conn) self.description.save(db_conn) self.effort.save(db_conn) @@ -222,7 +216,7 @@ class Process: for condition in self.undoes: db_conn.exec('INSERT INTO process_undoes VALUES (?,?)', (self.id_, condition.id_)) - assert self.id_ is not None + assert isinstance(self.id_, int) db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', (self.id_,)) for step in self.explicit_steps: @@ -230,12 +224,14 @@ class Process: db_conn.cached_processes[self.id_] = self -class ProcessStep: +class ProcessStep(BaseModel): """Sub-unit of Processes.""" + table_name = 'process_steps' + to_save = ['owner_id', 'step_process_id', 'parent_step_id'] def __init__(self, id_: int | None, owner_id: int, step_process_id: int, parent_step_id: int | None) -> None: - self.id_ = id_ + self.set_int_id(id_) self.owner_id = owner_id self.step_process_id = step_process_id self.parent_step_id = parent_step_id @@ -245,7 +241,7 @@ class ProcessStep: row: Row) -> ProcessStep: """Make ProcessStep from database row, store in DB cache.""" step = cls(row[0], row[1], row[2], row[3]) - assert step.id_ is not None + assert isinstance(step.id_, int) db_conn.cached_process_steps[step.id_] = step return step @@ -262,10 +258,5 @@ class ProcessStep: raise NotFoundException(f'found no ProcessStep of ID {id_}') def save(self, db_conn: DatabaseConnection) -> None: - """Save to database and cache.""" - cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)', - (self.id_, self.owner_id, self.step_process_id, - self.parent_step_id)) - self.id_ = cursor.lastrowid - assert self.id_ is not None - db_conn.cached_process_steps[self.id_] = self + """Default to simply calling self.save_core for simple cases.""" + self.save_core(db_conn) diff --git a/plomtask/todos.py b/plomtask/todos.py index ce83fad..5d46b30 100644 --- a/plomtask/todos.py +++ b/plomtask/todos.py @@ -1,7 +1,7 @@ """Actionables.""" from __future__ import annotations from sqlite3 import Row -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel from plomtask.days import Day from plomtask.processes import Process from plomtask.conditions import Condition @@ -9,14 +9,18 @@ from plomtask.exceptions import (NotFoundException, BadFormatException, HandledException) -class Todo: +class Todo(BaseModel): """Individual actionable.""" # pylint: disable=too-many-instance-attributes + name = 'Todo' + table_name = 'todos' + to_save = ['process_id', 'is_done', 'date'] + def __init__(self, id_: int | None, process: Process, is_done: bool, day: Day) -> None: - self.id_ = id_ + self.set_int_id(id_) self.process = process self._is_done = is_done self.day = day @@ -37,7 +41,7 @@ class Todo: process=Process.by_id(db_conn, row[1]), is_done=bool(row[2]), day=Day.by_date(db_conn, row[3])) - assert todo.id_ is not None + assert isinstance(todo.id_, int) db_conn.cached_todos[todo.id_] = todo return todo @@ -115,6 +119,16 @@ class Todo: return False return True + @property + def process_id(self) -> int | str | None: + """Return ID of tasked Process.""" + return self.process.id_ + + @property + def date(self) -> str: + """Return date of used Day.""" + return self.day.date + @property def is_done(self) -> bool: """Wrapper around self._is_done so we can control its setter.""" @@ -171,11 +185,8 @@ class Todo: """Write self and children to DB and its cache.""" if self.process.id_ is None: raise NotFoundException('Process of Todo without ID (not saved?)') - cursor = db_conn.exec('REPLACE INTO todos VALUES (?,?,?,?)', - (self.id_, self.process.id_, - self.is_done, self.day.date)) - self.id_ = cursor.lastrowid - assert self.id_ is not None + self.save_core(db_conn) + assert isinstance(self.id_, int) db_conn.cached_todos[self.id_] = self db_conn.exec('DELETE FROM todo_children WHERE parent = ?', (self.id_,)) diff --git a/tests/conditions.py b/tests/conditions.py index 4781246..b6510a1 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -50,6 +50,6 @@ class TestsWithServer(TestCaseWithServer): """Test /condition and /conditions response codes.""" self.check_get('/condition', 200) self.check_get('/condition?id=', 200) - self.check_get('/condition?id=0', 400) + self.check_get('/condition?id=0', 500) self.check_get('/condition?id=FOO', 400) self.check_get('/conditions', 200) diff --git a/tests/processes.py b/tests/processes.py index 960fd70..491a48f 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -4,7 +4,7 @@ from typing import Any from tests.utils import TestCaseWithDB, TestCaseWithServer from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition -from plomtask.exceptions import NotFoundException, BadFormatException +from plomtask.exceptions import NotFoundException, HandledException class TestsSansDB(TestCase): @@ -18,7 +18,7 @@ class TestsSansDB(TestCase): def test_Process_legal_ID(self) -> None: """Test Process cannot be instantiated with id_=0.""" - with self.assertRaises(BadFormatException): + with self.assertRaises(HandledException): Process(0) @@ -54,8 +54,8 @@ class TestsWithDB(TestCaseWithDB): steps_proc += [step_tuple] proc.set_steps(self.db_conn, steps_proc) steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) - assert self.proc2.id_ is not None - assert self.proc3.id_ is not None + assert isinstance(self.proc2.id_, int) + assert isinstance(self.proc3.id_, int) steps_proc1: list[tuple[int | None, int, int | None]] = [] add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1) p_1_dict: dict[int, dict[str, Any]] = {1: { @@ -65,6 +65,7 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2) step_2 = self.proc1.explicit_steps[-1] + assert isinstance(step_2.id_, int) p_1_dict[2] = { 'process': self.proc3, 'parent_id': None, 'is_explicit': True, 'steps': {}, 'seen': False @@ -111,10 +112,10 @@ class TestsWithDB(TestCaseWithDB): for target in ('conditions', 'fulfills', 'undoes'): c1 = Condition(None, False) c1.save(self.db_conn) - assert c1.id_ is not None + assert isinstance(c1.id_, int) c2 = Condition(None, False) c2.save(self.db_conn) - assert c2.id_ is not None + assert isinstance(c2.id_, int) self.proc1.set_conditions(self.db_conn, [], target) self.assertEqual(getattr(self.proc1, target), []) self.proc1.set_conditions(self.db_conn, [c1.id_], target) @@ -142,23 +143,25 @@ class TestsWithDB(TestCaseWithDB): def test_ProcessStep_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" - assert self.proc2.id_ is not None + assert isinstance(self.proc2.id_, int) self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) step = self.proc1.explicit_steps[-1] - assert step.id_ is not None + assert isinstance(step.id_, int) step_retrieved = ProcessStep.by_id(self.db_conn, step.id_) step.parent_step_id = 99 self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id) def test_Process_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" - assert self.proc2.id_ is not None + assert isinstance(self.proc1.id_, int) + assert isinstance(self.proc2.id_, int) self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) p_retrieved = Process.by_id(self.db_conn, self.proc1.id_) self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps) def test_Process_versioned_attributes_singularity(self) -> None: """Test behavior of VersionedAttributes on saving (with .title).""" + assert isinstance(self.proc1.id_, int) self.proc1.title.set('named') p_loaded = Process.by_id(self.db_conn, self.proc1.id_) self.assertEqual(self.proc1.title.history, p_loaded.title.history) @@ -201,6 +204,6 @@ class TestsWithServer(TestCaseWithServer): """Test /process and /processes response codes.""" self.check_get('/process', 200) self.check_get('/process?id=', 200) - self.check_get('/process?id=0', 400) + self.check_get('/process?id=0', 500) self.check_get('/process?id=FOO', 400) self.check_get('/processes', 200)