"""Actionables."""
from __future__ import annotations
+from typing import Any
from sqlite3 import Row
-from plomtask.db import DatabaseConnection
-from plomtask.days import Day
+from plomtask.db import DatabaseConnection, BaseModel
from plomtask.processes import Process
-from plomtask.exceptions import NotFoundException
+from plomtask.conditions import Condition
+from plomtask.exceptions import (NotFoundException, BadFormatException,
+ HandledException)
-class Todo:
+class Todo(BaseModel):
"""Individual actionable."""
+ # pylint: disable=too-many-instance-attributes
+
+ table_name = 'todos'
+ to_save = ['process_id', 'is_done', 'date']
+
def __init__(self, id_: int | None, process: Process,
- is_done: bool, day: Day) -> None:
- self.id_ = id_
+ is_done: bool, date: str) -> None:
+ self.set_int_id(id_)
self.process = process
- self.is_done = is_done
- self.day = day
-
- def __eq__(self, other: object) -> bool:
- return isinstance(other, self.__class__) and self.id_ == other.id_
+ self._is_done = is_done
+ self.date = date
+ self.children: list[Todo] = []
+ self.parents: list[Todo] = []
+ self.conditions: list[Condition] = []
+ self.fulfills: list[Condition] = []
+ self.undoes: list[Condition] = []
+ if not self.id_:
+ self.conditions = process.conditions[:]
+ self.fulfills = process.fulfills[:]
+ self.undoes = process.undoes[:]
@classmethod
- def from_table_row(cls, row: Row, db_conn: DatabaseConnection) -> Todo:
- """Make Todo from database row."""
- return cls(id_=row[0],
- process=Process.by_id(db_conn, row[1]),
- is_done=row[2],
- day=Day.by_date(db_conn, row[3]))
+ def from_table_row(cls, db_conn: DatabaseConnection,
+ row: Row | list[Any]) -> Todo:
+ """Make from DB row, write to DB cache."""
+ if row[1] == 0:
+ raise NotFoundException('calling Todo of '
+ 'unsaved Process')
+ row_as_list = list(row)
+ row_as_list[1] = Process.by_id(db_conn, row[1])
+ todo = super().from_table_row(db_conn, row_as_list)
+ assert isinstance(todo, Todo)
+ return todo
@classmethod
def by_id(cls, db_conn: DatabaseConnection, id_: int) -> Todo:
- """Get Todo of .id_=id_."""
- for row in db_conn.exec('SELECT * FROM todos WHERE id = ?', (id_,)):
- return cls.from_table_row(row, db_conn)
- raise NotFoundException(f'Todo of ID not found: {id_}')
+ """Get Todo of .id_=id_ and children (from DB cache if possible)."""
+ todo, from_cache = super()._by_id(db_conn, id_)
+ if todo is None:
+ raise NotFoundException(f'Todo of ID not found: {id_}')
+ if not from_cache:
+ for t_id in db_conn.column_where('todo_children', 'child',
+ 'parent', id_):
+ todo.children += [cls.by_id(db_conn, t_id)]
+ for t_id in db_conn.column_where('todo_children', 'parent',
+ 'child', id_):
+ todo.parents += [cls.by_id(db_conn, t_id)]
+ for name in ('conditions', 'fulfills', 'undoes'):
+ table = f'todo_{name}'
+ for cond_id in db_conn.column_where(table, 'condition',
+ 'todo', todo.id_):
+ target = getattr(todo, name)
+ target += [Condition.by_id(db_conn, cond_id)]
+ assert isinstance(todo, Todo)
+ return todo
@classmethod
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
"""Collect all Todos for Day of date."""
todos = []
- for row in db_conn.exec('SELECT * FROM todos WHERE day = ?', (date,)):
- todos += [cls.from_table_row(row, db_conn)]
+ for id_ in db_conn.column_where('todos', 'id', 'day', date):
+ todos += [cls.by_id(db_conn, id_)]
return todos
+ @classmethod
+ def enablers_for_at(cls, db_conn: DatabaseConnection, condition: Condition,
+ date: str) -> list[Todo]:
+ """Collect all Todos of day that enable condition."""
+ assert isinstance(condition.id_, int)
+ enablers = []
+ for id_ in db_conn.column_where('todo_fulfills', 'todo', 'condition',
+ condition.id_):
+ todo = cls.by_id(db_conn, id_)
+ if todo.date == date:
+ enablers += [todo]
+ return enablers
+
+ @classmethod
+ def disablers_for_at(cls, db_conn: DatabaseConnection,
+ condition: Condition, date: str) -> list[Todo]:
+ """Collect all Todos of day that disable condition."""
+ assert isinstance(condition.id_, int)
+ disablers = []
+ for id_ in db_conn.column_where('todo_undoes', 'todo', 'condition',
+ condition.id_):
+ todo = cls.by_id(db_conn, id_)
+ if todo.date == date:
+ disablers += [todo]
+ return disablers
+
+ @property
+ def is_doable(self) -> bool:
+ """Decide whether .is_done settable based on children, Conditions."""
+ for child in self.children:
+ if not child.is_done:
+ return False
+ for condition in self.conditions:
+ if not condition.is_active:
+ return False
+ return True
+
+ @property
+ def process_id(self) -> int | str | None:
+ """Return ID of tasked Process."""
+ return self.process.id_
+
+ @property
+ def is_done(self) -> bool:
+ """Wrapper around self._is_done so we can control its setter."""
+ return self._is_done
+
+ @is_done.setter
+ def is_done(self, value: bool) -> None:
+ if value != self.is_done and not self.is_doable:
+ raise BadFormatException('cannot change doneness of undoable Todo')
+ if self._is_done != value:
+ self._is_done = value
+ if value is True:
+ for condition in self.fulfills:
+ condition.is_active = True
+ for condition in self.undoes:
+ condition.is_active = False
+
+ def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
+ """Set self.undoes to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'undoes')
+
+ def set_fulfills(self, db_conn: DatabaseConnection,
+ ids: list[int]) -> None:
+ """Set self.fulfills to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'fulfills')
+
+ def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
+ target: str = 'conditions') -> None:
+ """Set self.[target] to Conditions identified by ids."""
+ target_list = getattr(self, target)
+ while len(target_list) > 0:
+ target_list.pop()
+ for id_ in ids:
+ target_list += [Condition.by_id(db_conn, id_)]
+
+ def add_child(self, child: Todo) -> None:
+ """Add child to self.children, guard against recursion"""
+ def walk_steps(node: Todo) -> None:
+ if node.id_ == self.id_:
+ raise BadFormatException('bad child choice causes recursion')
+ for child in node.children:
+ walk_steps(child)
+ if self.id_ is None:
+ raise HandledException('Can only add children to saved Todos.')
+ if child.id_ is None:
+ raise HandledException('Can only add saved children to Todos.')
+ if child in self.children:
+ raise BadFormatException('cannot adopt same child twice')
+ walk_steps(child)
+ self.children += [child]
+ child.parents += [self]
+
def save(self, db_conn: DatabaseConnection) -> None:
- """Write self to DB."""
+ """Write self and children to DB and its cache."""
if self.process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
- cursor = db_conn.exec('REPLACE INTO todos VALUES (?,?,?,?)',
- (self.id_, self.process.id_,
- self.is_done, self.day.date))
- self.id_ = cursor.lastrowid
+ self.save_core(db_conn)
+ assert isinstance(self.id_, int)
+ db_conn.cached_todos[self.id_] = self
+ db_conn.rewrite_relations('todo_children', 'parent', self.id_,
+ [[c.id_] for c in self.children])
+ db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
+ [[c.id_] for c in self.conditions])
+ db_conn.rewrite_relations('todo_fulfills', 'todo', self.id_,
+ [[c.id_] for c in self.fulfills])
+ db_conn.rewrite_relations('todo_undoes', 'todo', self.id_,
+ [[c.id_] for c in self.undoes])