X-Git-Url: https://plomlompom.com/repos/foo.html?a=blobdiff_plain;f=plomtask%2Ftodos.py;h=fd72af6bf8c0842f2a2727185c2b4c3f707a20d4;hb=2d0d3a138de69e5e09208936ac094b53b0785c0b;hp=1b6b740c97930862b8867d5229e01685713e6595;hpb=a4ca74f81ae42abe27cf6dbab7ef18c850db72c2;p=plomtask diff --git a/plomtask/todos.py b/plomtask/todos.py index 1b6b740..fd72af6 100644 --- a/plomtask/todos.py +++ b/plomtask/todos.py @@ -1,46 +1,187 @@ """Actionables.""" from __future__ import annotations +from typing import Any from sqlite3 import Row -from plomtask.db import DatabaseConnection -from plomtask.days import Day +from plomtask.db import DatabaseConnection, BaseModel from plomtask.processes import Process -from plomtask.exceptions import NotFoundException +from plomtask.conditions import Condition +from plomtask.exceptions import (NotFoundException, BadFormatException, + HandledException) -class Todo: +class Todo(BaseModel): """Individual actionable.""" + # pylint: disable=too-many-instance-attributes + + table_name = 'todos' + to_save = ['process_id', 'is_done', 'date'] + def __init__(self, id_: int | None, process: Process, - is_done: bool, day: Day) -> None: - self.id_ = id_ + is_done: bool, date: str) -> None: + self.set_int_id(id_) self.process = process - self.is_done = is_done - self.day = day + self._is_done = is_done + self.date = date + self.children: list[Todo] = [] + self.parents: list[Todo] = [] + self.conditions: list[Condition] = [] + self.fulfills: list[Condition] = [] + self.undoes: list[Condition] = [] + if not self.id_: + self.conditions = process.conditions[:] + self.fulfills = process.fulfills[:] + self.undoes = process.undoes[:] - def __eq__(self, other: object) -> bool: - return isinstance(other, self.__class__) and self.id_ == other.id_ + @classmethod + def from_table_row(cls, db_conn: DatabaseConnection, + row: Row | list[Any]) -> Todo: + """Make from DB row, write to DB cache.""" + if row[1] == 0: + raise NotFoundException('calling Todo of ' + 'unsaved Process') + row_as_list = list(row) + row_as_list[1] = Process.by_id(db_conn, row[1]) + todo = super().from_table_row(db_conn, row_as_list) + assert isinstance(todo, Todo) + return todo @classmethod - def from_table_row(cls, row: Row, db_conn: DatabaseConnection) -> Todo: - """Make Todo from database row.""" - return cls(id_=row[0], - process=Process.by_id(db_conn, row[1]), - is_done=row[2], - day=Day.by_date(db_conn, row[3])) + def by_id(cls, db_conn: DatabaseConnection, id_: int) -> Todo: + """Get Todo of .id_=id_ and children (from DB cache if possible).""" + todo, from_cache = super()._by_id(db_conn, id_) + if todo is None: + raise NotFoundException(f'Todo of ID not found: {id_}') + if not from_cache: + for t_id in db_conn.column_where('todo_children', 'child', + 'parent', id_): + todo.children += [cls.by_id(db_conn, t_id)] + for t_id in db_conn.column_where('todo_children', 'parent', + 'child', id_): + todo.parents += [cls.by_id(db_conn, t_id)] + for name in ('conditions', 'fulfills', 'undoes'): + table = f'todo_{name}' + for cond_id in db_conn.column_where(table, 'condition', + 'todo', todo.id_): + target = getattr(todo, name) + target += [Condition.by_id(db_conn, cond_id)] + assert isinstance(todo, Todo) + return todo @classmethod def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]: """Collect all Todos for Day of date.""" todos = [] - for row in db_conn.exec('SELECT * FROM todos WHERE day = ?', (date,)): - todos += [cls.from_table_row(row, db_conn)] + for id_ in db_conn.column_where('todos', 'id', 'day', date): + todos += [cls.by_id(db_conn, id_)] return todos + @classmethod + def enablers_for_at(cls, db_conn: DatabaseConnection, condition: Condition, + date: str) -> list[Todo]: + """Collect all Todos of day that enable condition.""" + assert isinstance(condition.id_, int) + enablers = [] + for id_ in db_conn.column_where('todo_fulfills', 'todo', 'condition', + condition.id_): + todo = cls.by_id(db_conn, id_) + if todo.date == date: + enablers += [todo] + return enablers + + @classmethod + def disablers_for_at(cls, db_conn: DatabaseConnection, + condition: Condition, date: str) -> list[Todo]: + """Collect all Todos of day that disable condition.""" + assert isinstance(condition.id_, int) + disablers = [] + for id_ in db_conn.column_where('todo_undoes', 'todo', 'condition', + condition.id_): + todo = cls.by_id(db_conn, id_) + if todo.date == date: + disablers += [todo] + return disablers + + @property + def is_doable(self) -> bool: + """Decide whether .is_done settable based on children, Conditions.""" + for child in self.children: + if not child.is_done: + return False + for condition in self.conditions: + if not condition.is_active: + return False + return True + + @property + def process_id(self) -> int | str | None: + """Return ID of tasked Process.""" + return self.process.id_ + + @property + def is_done(self) -> bool: + """Wrapper around self._is_done so we can control its setter.""" + return self._is_done + + @is_done.setter + def is_done(self, value: bool) -> None: + if value != self.is_done and not self.is_doable: + raise BadFormatException('cannot change doneness of undoable Todo') + if self._is_done != value: + self._is_done = value + if value is True: + for condition in self.fulfills: + condition.is_active = True + for condition in self.undoes: + condition.is_active = False + + def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None: + """Set self.undoes to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'undoes') + + def set_fulfills(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.fulfills to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'fulfills') + + def set_conditions(self, db_conn: DatabaseConnection, ids: list[int], + target: str = 'conditions') -> None: + """Set self.[target] to Conditions identified by ids.""" + target_list = getattr(self, target) + while len(target_list) > 0: + target_list.pop() + for id_ in ids: + target_list += [Condition.by_id(db_conn, id_)] + + def add_child(self, child: Todo) -> None: + """Add child to self.children, guard against recursion""" + def walk_steps(node: Todo) -> None: + if node.id_ == self.id_: + raise BadFormatException('bad child choice causes recursion') + for child in node.children: + walk_steps(child) + if self.id_ is None: + raise HandledException('Can only add children to saved Todos.') + if child.id_ is None: + raise HandledException('Can only add saved children to Todos.') + if child in self.children: + raise BadFormatException('cannot adopt same child twice') + walk_steps(child) + self.children += [child] + child.parents += [self] + def save(self, db_conn: DatabaseConnection) -> None: - """Write self to DB.""" + """Write self and children to DB and its cache.""" if self.process.id_ is None: raise NotFoundException('Process of Todo without ID (not saved?)') - cursor = db_conn.exec('REPLACE INTO todos VALUES (?,?,?,?)', - (self.id_, self.process.id_, - self.is_done, self.day.date)) - self.id_ = cursor.lastrowid + self.save_core(db_conn) + assert isinstance(self.id_, int) + db_conn.cached_todos[self.id_] = self + db_conn.rewrite_relations('todo_children', 'parent', self.id_, + [[c.id_] for c in self.children]) + db_conn.rewrite_relations('todo_conditions', 'todo', self.id_, + [[c.id_] for c in self.conditions]) + db_conn.rewrite_relations('todo_fulfills', 'todo', self.id_, + [[c.id_] for c in self.fulfills]) + db_conn.rewrite_relations('todo_undoes', 'todo', self.id_, + [[c.id_] for c in self.undoes])