"""Non-doable elements of ProcessStep/Todo chains."""
from __future__ import annotations
from sqlite3 import Row
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
from plomtask.misc import VersionedAttribute
-from plomtask.exceptions import BadFormatException, NotFoundException
+from plomtask.exceptions import NotFoundException
-class Condition:
+class Condition(BaseModel):
"""Non Process-dependency for ProcessSteps and Todos."""
+ table_name = 'conditions'
+ to_save = ['is_active']
def __init__(self, id_: int | None, is_active: bool = False) -> None:
- if (id_ is not None) and id_ < 1:
- msg = f'illegal Condition ID, must be >=1: {id_}'
- raise BadFormatException(msg)
- self.id_ = id_
+ self.set_int_id(id_)
self.is_active = is_active
self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
self.description = VersionedAttribute(self, 'condition_descriptions',
def save(self, db_conn: DatabaseConnection) -> None:
"""Save self and its VersionedAttributes to DB and cache."""
- cursor = db_conn.exec('REPLACE INTO conditions VALUES (?, ?)',
- (self.id_, self.is_active))
- self.id_ = cursor.lastrowid
+ self.save_core(db_conn)
self.title.save(db_conn)
self.description.save(db_conn)
- assert self.id_ is not None
+ assert isinstance(self.id_, int)
db_conn.cached_conditions[self.id_] = self
from datetime import datetime, timedelta
from sqlite3 import Row
from plomtask.exceptions import BadFormatException, NotFoundException
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
DATE_FORMAT = '%Y-%m-%d'
return datetime.now().strftime(DATE_FORMAT)
-class Day:
+class Day(BaseModel):
"""Individual days defined by their dates."""
+ table_name = 'days'
+ to_save = ['comment']
def __init__(self, date: str, comment: str = '') -> None:
- self.date = valid_date(date)
+ self.id_: str = valid_date(date)
self.datetime = datetime.strptime(self.date, DATE_FORMAT)
self.comment = comment
assert isinstance(day, Day)
return day
+ @property
+ def date(self) -> str:
+ """Return self.id_ under the assumption it's a date string."""
+ return self.id_
+
@property
def weekday(self) -> str:
"""Return what weekday matches self.date."""
def save(self, db_conn: DatabaseConnection) -> None:
"""Add (or re-write) self to DB and cache."""
- db_conn.exec('REPLACE INTO days VALUES (?, ?)',
- (self.date, self.comment))
- db_conn.cached_days[self.date] = self
+ self.save_core(db_conn, update_with_lastrowid=False)
def close(self) -> None:
"""Close DB connection."""
self.conn.close()
+
+
+class BaseModel:
+ """Template for most of the models we use/derive from the DB."""
+ table_name = ''
+ to_save: list[str] = []
+ id_: None | int | str
+
+ def set_int_id(self, id_: int | None) -> None:
+ """Set id_ if >= 1 or None, else fail."""
+ if (id_ is not None) and id_ < 1:
+ msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
+ raise HandledException(msg)
+ self.id_ = id_
+
+ def save_core(self, db_conn: DatabaseConnection,
+ update_with_lastrowid: bool = True) -> None:
+ """Write bare-bones self (sans connected items), ensuring self.id_."""
+ q_marks = ','.join(['?'] * (len(self.to_save) + 1))
+ values = tuple([self.id_] + [getattr(self, key)
+ for key in self.to_save])
+ table_name = self.table_name
+ cursor = db_conn.exec(f'REPLACE INTO {table_name} VALUES ({q_marks})',
+ values)
+ if update_with_lastrowid:
+ self.id_ = cursor.lastrowid
+ cache = getattr(db_conn, f'cached_{table_name}')
+ cache[self.id_] = self
self.form_data.get_all_int('condition'))
process.set_fulfills(self.conn, self.form_data.get_all_int('fulfills'))
process.set_undoes(self.conn, self.form_data.get_all_int('undoes'))
- process.save_id(self.conn)
+ process.save_core(self.conn)
assert process.id_ is not None # for mypy
process.explicit_steps = []
steps: list[tuple[int | None, int, int | None]] = []
from __future__ import annotations
from sqlite3 import Row
from typing import Any, Set
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
from plomtask.misc import VersionedAttribute
from plomtask.conditions import Condition
from plomtask.exceptions import NotFoundException, BadFormatException
-class Process:
+class Process(BaseModel):
"""Template for, and metadata for, Todos, and their arrangements."""
+ table_name = 'processes'
# pylint: disable=too-many-instance-attributes
def __init__(self, id_: int | None) -> None:
- if (id_ is not None) and id_ < 1:
- raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
- self.id_ = id_
+ self.set_int_id(id_)
self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
self.description = VersionedAttribute(self, 'process_descriptions', '')
self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
"""Make Process from database row, with empty VersionedAttributes."""
process = cls(row[0])
- assert process.id_ is not None
+ assert isinstance(process.id_, int)
db_conn.cached_processes[process.id_] = process
return process
external_owner = self
for step in [s for s in self.explicit_steps
if s.parent_step_id is None]:
- assert step.id_ is not None # for mypy
+ assert isinstance(step.id_, int)
steps[step.id_] = make_node(step)
for step_id, step_node in steps.items():
walk_steps(step_id, step_node)
parent_step_id = None
except NotFoundException:
parent_step_id = None
- assert self.id_ is not None
+ assert isinstance(self.id_, int)
step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
walk_steps(step)
self.explicit_steps += [step]
steps: list[tuple[int | None, int, int | None]]) -> None:
"""Set self.explicit_steps in bulk."""
for step in self.explicit_steps:
- assert step.id_ is not None
+ assert isinstance(step.id_, int)
del db_conn.cached_process_steps[step.id_]
self.explicit_steps = []
db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
self._add_step(db_conn, step_tuple[0],
step_tuple[1], step_tuple[2])
- def save_id(self, db_conn: DatabaseConnection) -> None:
- """Write bare-bones self (sans connected items), ensuring self.id_."""
- cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
- self.id_ = cursor.lastrowid
-
def save(self, db_conn: DatabaseConnection) -> None:
"""Add (or re-write) self and connected items to DB."""
- self.save_id(db_conn)
+ self.save_core(db_conn)
self.title.save(db_conn)
self.description.save(db_conn)
self.effort.save(db_conn)
for condition in self.undoes:
db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
(self.id_, condition.id_))
- assert self.id_ is not None
+ assert isinstance(self.id_, int)
db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
(self.id_,))
for step in self.explicit_steps:
db_conn.cached_processes[self.id_] = self
-class ProcessStep:
+class ProcessStep(BaseModel):
"""Sub-unit of Processes."""
+ table_name = 'process_steps'
+ to_save = ['owner_id', 'step_process_id', 'parent_step_id']
def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
parent_step_id: int | None) -> None:
- self.id_ = id_
+ self.set_int_id(id_)
self.owner_id = owner_id
self.step_process_id = step_process_id
self.parent_step_id = parent_step_id
row: Row) -> ProcessStep:
"""Make ProcessStep from database row, store in DB cache."""
step = cls(row[0], row[1], row[2], row[3])
- assert step.id_ is not None
+ assert isinstance(step.id_, int)
db_conn.cached_process_steps[step.id_] = step
return step
raise NotFoundException(f'found no ProcessStep of ID {id_}')
def save(self, db_conn: DatabaseConnection) -> None:
- """Save to database and cache."""
- cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)',
- (self.id_, self.owner_id, self.step_process_id,
- self.parent_step_id))
- self.id_ = cursor.lastrowid
- assert self.id_ is not None
- db_conn.cached_process_steps[self.id_] = self
+ """Default to simply calling self.save_core for simple cases."""
+ self.save_core(db_conn)
"""Actionables."""
from __future__ import annotations
from sqlite3 import Row
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, BaseModel
from plomtask.days import Day
from plomtask.processes import Process
from plomtask.conditions import Condition
HandledException)
-class Todo:
+class Todo(BaseModel):
"""Individual actionable."""
# pylint: disable=too-many-instance-attributes
+ name = 'Todo'
+ table_name = 'todos'
+ to_save = ['process_id', 'is_done', 'date']
+
def __init__(self, id_: int | None, process: Process,
is_done: bool, day: Day) -> None:
- self.id_ = id_
+ self.set_int_id(id_)
self.process = process
self._is_done = is_done
self.day = day
process=Process.by_id(db_conn, row[1]),
is_done=bool(row[2]),
day=Day.by_date(db_conn, row[3]))
- assert todo.id_ is not None
+ assert isinstance(todo.id_, int)
db_conn.cached_todos[todo.id_] = todo
return todo
return False
return True
+ @property
+ def process_id(self) -> int | str | None:
+ """Return ID of tasked Process."""
+ return self.process.id_
+
+ @property
+ def date(self) -> str:
+ """Return date of used Day."""
+ return self.day.date
+
@property
def is_done(self) -> bool:
"""Wrapper around self._is_done so we can control its setter."""
"""Write self and children to DB and its cache."""
if self.process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
- cursor = db_conn.exec('REPLACE INTO todos VALUES (?,?,?,?)',
- (self.id_, self.process.id_,
- self.is_done, self.day.date))
- self.id_ = cursor.lastrowid
- assert self.id_ is not None
+ self.save_core(db_conn)
+ assert isinstance(self.id_, int)
db_conn.cached_todos[self.id_] = self
db_conn.exec('DELETE FROM todo_children WHERE parent = ?',
(self.id_,))
"""Test /condition and /conditions response codes."""
self.check_get('/condition', 200)
self.check_get('/condition?id=', 200)
- self.check_get('/condition?id=0', 400)
+ self.check_get('/condition?id=0', 500)
self.check_get('/condition?id=FOO', 400)
self.check_get('/conditions', 200)
from tests.utils import TestCaseWithDB, TestCaseWithServer
from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
-from plomtask.exceptions import NotFoundException, BadFormatException
+from plomtask.exceptions import NotFoundException, HandledException
class TestsSansDB(TestCase):
def test_Process_legal_ID(self) -> None:
"""Test Process cannot be instantiated with id_=0."""
- with self.assertRaises(BadFormatException):
+ with self.assertRaises(HandledException):
Process(0)
steps_proc += [step_tuple]
proc.set_steps(self.db_conn, steps_proc)
steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
- assert self.proc2.id_ is not None
- assert self.proc3.id_ is not None
+ assert isinstance(self.proc2.id_, int)
+ assert isinstance(self.proc3.id_, int)
steps_proc1: list[tuple[int | None, int, int | None]] = []
add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
p_1_dict: dict[int, dict[str, Any]] = {1: {
self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
step_2 = self.proc1.explicit_steps[-1]
+ assert isinstance(step_2.id_, int)
p_1_dict[2] = {
'process': self.proc3, 'parent_id': None,
'is_explicit': True, 'steps': {}, 'seen': False
for target in ('conditions', 'fulfills', 'undoes'):
c1 = Condition(None, False)
c1.save(self.db_conn)
- assert c1.id_ is not None
+ assert isinstance(c1.id_, int)
c2 = Condition(None, False)
c2.save(self.db_conn)
- assert c2.id_ is not None
+ assert isinstance(c2.id_, int)
self.proc1.set_conditions(self.db_conn, [], target)
self.assertEqual(getattr(self.proc1, target), [])
self.proc1.set_conditions(self.db_conn, [c1.id_], target)
def test_ProcessStep_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
- assert self.proc2.id_ is not None
+ assert isinstance(self.proc2.id_, int)
self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
step = self.proc1.explicit_steps[-1]
- assert step.id_ is not None
+ assert isinstance(step.id_, int)
step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
step.parent_step_id = 99
self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id)
def test_Process_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
- assert self.proc2.id_ is not None
+ assert isinstance(self.proc1.id_, int)
+ assert isinstance(self.proc2.id_, int)
self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
def test_Process_versioned_attributes_singularity(self) -> None:
"""Test behavior of VersionedAttributes on saving (with .title)."""
+ assert isinstance(self.proc1.id_, int)
self.proc1.title.set('named')
p_loaded = Process.by_id(self.db_conn, self.proc1.id_)
self.assertEqual(self.proc1.title.history, p_loaded.title.history)
"""Test /process and /processes response codes."""
self.check_get('/process', 200)
self.check_get('/process?id=', 200)
- self.check_get('/process?id=0', 400)
+ self.check_get('/process?id=0', 500)
self.check_get('/process?id=FOO', 400)
self.check_get('/processes', 200)