"""Actionables."""
from __future__ import annotations
+from typing import Any, Set
from sqlite3 import Row
-from plomtask.db import DatabaseConnection
-from plomtask.days import Day
-from plomtask.processes import Process
+from plomtask.misc import DictableNode
+from plomtask.db import DatabaseConnection, BaseModel
+from plomtask.processes import Process, ProcessStepsNode
+from plomtask.versioned_attributes import VersionedAttribute
+from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
+from plomtask.dating import valid_date
-class Todo:
+class TodoNode(DictableNode):
+ """Collects what's useful to know for Todo/Condition tree display."""
+ # pylint: disable=too-few-public-methods
+ todo: Todo
+ seen: bool
+ children: list[TodoNode]
+ _to_dict = ['todo', 'seen', 'children']
+
+
+class TodoOrProcStepNode(DictableNode):
+ """Collect what's useful for Todo-or-ProcessStep tree display."""
+ # pylint: disable=too-few-public-methods
+ node_id: int
+ todo: Todo | None
+ process: Process | None
+ children: list[TodoOrProcStepNode] # pylint: disable=undefined-variable
+ fillable: bool = False
+ _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
+
+
+class Todo(BaseModel[int], ConditionsRelations):
"""Individual actionable."""
+ # pylint: disable=too-many-instance-attributes
+ # pylint: disable=too-many-public-methods
+ table_name = 'todos'
+ to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
+ 'calendarize']
+ to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
+ ('todo_blockers', 'todo', 'blockers', 0),
+ ('todo_enables', 'todo', 'enables', 0),
+ ('todo_disables', 'todo', 'disables', 0),
+ ('todo_children', 'parent', 'children', 0),
+ ('todo_children', 'child', 'parents', 1)]
+ to_search = ['comment']
+ days_to_update: Set[str] = set()
+ children: list[Todo]
+ parents: list[Todo]
+ sorters = {'doneness': lambda t: t.is_done,
+ 'title': lambda t: t.title_then,
+ 'comment': lambda t: t.comment,
+ 'date': lambda t: t.date}
- def __init__(self, id_: int | None, process: Process,
- is_done: bool, day: Day) -> None:
- self.id_ = id_
+ # pylint: disable=too-many-arguments
+ def __init__(self, id_: int | None,
+ process: Process,
+ is_done: bool,
+ date: str, comment: str = '',
+ effort: None | float = None,
+ calendarize: bool = False) -> None:
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
+ if process.id_ is None:
+ raise NotFoundException('Process of Todo without ID (not saved?)')
self.process = process
- self.is_done = is_done
- self.day = day
- self.children: list[Todo] = []
+ self._is_done = is_done
+ self.date = valid_date(date)
+ self.comment = comment
+ self.effort = effort
+ self.children = []
+ self.parents = []
+ self.calendarize = calendarize
+ if not self.id_:
+ self.calendarize = self.process.calendarize
+ self.conditions = self.process.conditions[:]
+ self.blockers = self.process.blockers[:]
+ self.enables = self.process.enables[:]
+ self.disables = self.process.disables[:]
@classmethod
- def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Todo:
- """Make Todo from database row, write to DB cache."""
- todo = cls(id_=row[0],
- process=Process.by_id(db_conn, row[1]),
- is_done=row[2],
- day=Day.by_date(db_conn, row[3]))
- assert todo.id_ is not None
- db_conn.cached_todos[todo.id_] = todo
- return todo
+ def by_date_range(cls, db_conn: DatabaseConnection,
+ date_range: tuple[str, str] = ('', '')) -> list[Todo]:
+ """Collect Todos of Days within date_range."""
+ todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
+ return todos
- @classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: int | None) -> Todo:
- """Get Todo of .id_=id_ and children (from DB cache if possible)."""
- if id_ in db_conn.cached_todos.keys():
- todo = db_conn.cached_todos[id_]
- else:
- todo = None
- for row in db_conn.exec('SELECT * FROM todos WHERE id = ?',
- (id_,)):
- todo = cls.from_table_row(db_conn, row)
+ def ensure_children(self, db_conn: DatabaseConnection) -> None:
+ """Ensure Todo children (create or adopt) demanded by Process chain."""
+
+ def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
+ adoptables = [t for t in Todo.by_date(db_conn, parent.date)
+ if (t not in parent.children)
+ and (t != parent)
+ and step_node.process.id_ == t.process_id]
+ satisfier = None
+ for adoptable in adoptables:
+ satisfier = adoptable
break
- if todo is None:
- raise NotFoundException(f'Todo of ID not found: {id_}')
- for row in db_conn.exec('SELECT child FROM todo_children '
- 'WHERE parent = ?', (id_,)):
- todo.children += [cls.by_id(db_conn, row[0])]
- assert isinstance(todo, Todo)
+ if not satisfier:
+ satisfier = Todo(None, step_node.process, False, parent.date)
+ satisfier.save(db_conn)
+ sub_step_nodes = sorted(
+ step_node.steps,
+ key=lambda s: s.process.id_ if s.process.id_ else 0)
+ for sub_node in sub_step_nodes:
+ if sub_node.is_suppressed:
+ continue
+ n_slots = len([n for n in sub_step_nodes
+ if n.process == sub_node.process])
+ filled_slots = len([t for t in satisfier.children
+ if t.process.id_ == sub_node.process.id_])
+ # if we did not newly create satisfier, it may already fill
+ # some step dependencies, so only fill what remains open
+ if n_slots - filled_slots > 0:
+ satisfier.add_child(walk_steps(satisfier, sub_node))
+ satisfier.save(db_conn)
+ return satisfier
+
+ process = Process.by_id(db_conn, self.process_id)
+ steps_tree = process.get_steps(db_conn)
+ for step_node in steps_tree:
+ if step_node.is_suppressed:
+ continue
+ self.add_child(walk_steps(self, step_node))
+ self.save(db_conn)
+
+ @classmethod
+ def from_table_row(cls, db_conn: DatabaseConnection,
+ row: Row | list[Any]) -> Todo:
+ """Make from DB row, with dependencies."""
+ if row[1] == 0:
+ raise NotFoundException('calling Todo of '
+ 'unsaved Process')
+ row_as_list = list(row)
+ row_as_list[1] = Process.by_id(db_conn, row[1])
+ todo = super().from_table_row(db_conn, row_as_list)
+ assert isinstance(todo.id_, int)
+ for t_id in db_conn.column_where('todo_children', 'child',
+ 'parent', todo.id_):
+ todo.children += [cls.by_id(db_conn, t_id)]
+ for t_id in db_conn.column_where('todo_children', 'parent',
+ 'child', todo.id_):
+ todo.parents += [cls.by_id(db_conn, t_id)]
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
+ table = f'todo_{name}'
+ assert isinstance(todo.id_, int)
+ for cond_id in db_conn.column_where(table, 'condition',
+ 'todo', todo.id_):
+ target = getattr(todo, name)
+ target += [Condition.by_id(db_conn, cond_id)]
return todo
+ @classmethod
+ def by_process_id(cls, db_conn: DatabaseConnection,
+ process_id: int | None) -> list[Todo]:
+ """Collect all Todos of Process of process_id."""
+ return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
+
@classmethod
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
"""Collect all Todos for Day of date."""
- todos = []
- for row in db_conn.exec('SELECT id FROM todos WHERE day = ?', (date,)):
- todos += [cls.by_id(db_conn, row[0])]
- return todos
+ return cls.by_date_range(db_conn, (date, date))
+
+ @property
+ def is_doable(self) -> bool:
+ """Decide whether .is_done settable based on children, Conditions."""
+ for child in self.children:
+ if not child.is_done:
+ return False
+ for condition in self.conditions:
+ if not condition.is_active:
+ return False
+ for condition in self.blockers:
+ if condition.is_active:
+ return False
+ return True
+
+ @property
+ def is_deletable(self) -> bool:
+ """Decide whether self be deletable (not if preserve-worthy values)."""
+ if self.comment:
+ return False
+ if self.effort and self.effort >= 0:
+ return False
+ return True
+
+ @property
+ def performed_effort(self) -> float:
+ """Return performed effort, i.e. self.effort or default if done.."""
+ if self.effort is not None:
+ return self.effort
+ if self.is_done:
+ return self.effort_then
+ return 0
+
+ @property
+ def process_id(self) -> int:
+ """Needed for super().save to save Processes as attributes."""
+ assert isinstance(self.process.id_, int)
+ return self.process.id_
+
+ @property
+ def is_done(self) -> bool:
+ """Wrapper around self._is_done so we can control its setter."""
+ return self._is_done
+
+ @is_done.setter
+ def is_done(self, value: bool) -> None:
+ if value != self.is_done and not self.is_doable:
+ raise BadFormatException('cannot change doneness of undoable Todo')
+ if self._is_done != value:
+ self._is_done = value
+ if value is True:
+ for condition in self.enables:
+ condition.is_active = True
+ for condition in self.disables:
+ condition.is_active = False
+
+ @property
+ def title(self) -> VersionedAttribute:
+ """Shortcut to .process.title."""
+ assert isinstance(self.process.title, VersionedAttribute)
+ return self.process.title
+
+ @property
+ def title_then(self) -> str:
+ """Shortcut to .process.title.at(self.date)"""
+ title_then = self.process.title.at(self.date)
+ assert isinstance(title_then, str)
+ return title_then
+
+ @property
+ def effort_then(self) -> float:
+ """Shortcut to .process.effort.at(self.date)"""
+ effort_then = self.process.effort.at(self.date)
+ assert isinstance(effort_then, float)
+ return effort_then
+
+ @property
+ def has_doneness_in_path(self) -> bool:
+ """Check whether self is done or has any children that are."""
+ if self.is_done:
+ return True
+ for child in self.children:
+ if child.is_done:
+ return True
+ if child.has_doneness_in_path:
+ return True
+ return False
+
+ def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
+ """Return tree of depended-on Todos."""
+
+ def make_node(todo: Todo) -> TodoNode:
+ children = []
+ seen = todo.id_ in seen_todos
+ assert isinstance(todo.id_, int)
+ seen_todos.add(todo.id_)
+ for child in todo.children:
+ children += [make_node(child)]
+ return TodoNode(todo, seen, children)
+
+ return make_node(self)
+
+ @property
+ def tree_effort(self) -> float:
+ """Return sum of performed efforts of self and all descendants."""
+
+ def walk_tree(node: Todo) -> float:
+ local_effort = 0.0
+ for child in node.children:
+ local_effort += walk_tree(child)
+ return node.performed_effort + local_effort
+
+ return walk_tree(self)
def add_child(self, child: Todo) -> None:
- """Add child to self.children, guard against recursion"""
+ """Add child to self.children, avoid recursion, update parenthoods."""
+
def walk_steps(node: Todo) -> None:
if node.id_ == self.id_:
raise BadFormatException('bad child choice causes recursion')
for child in node.children:
walk_steps(child)
+
if self.id_ is None:
raise HandledException('Can only add children to saved Todos.')
if child.id_ is None:
raise BadFormatException('cannot adopt same child twice')
walk_steps(child)
self.children += [child]
+ child.parents += [self]
+
+ def remove_child(self, child: Todo) -> None:
+ """Remove child from self.children, update counter relations."""
+ if child not in self.children:
+ raise HandledException('Cannot remove un-parented child.')
+ self.children.remove(child)
+ child.parents.remove(self)
+
+ def update_attrs(self, **kwargs: Any) -> None:
+ """Update self's attributes listed in kwargs."""
+ for k, v in kwargs.items():
+ setattr(self, k, v)
def save(self, db_conn: DatabaseConnection) -> None:
- """Write self and children to DB and its cache."""
- if self.process.id_ is None:
- raise NotFoundException('Process of Todo without ID (not saved?)')
- cursor = db_conn.exec('REPLACE INTO todos VALUES (?,?,?,?)',
- (self.id_, self.process.id_,
- self.is_done, self.day.date))
- self.id_ = cursor.lastrowid
- assert self.id_ is not None
- db_conn.cached_todos[self.id_] = self
- db_conn.exec('DELETE FROM todo_children WHERE parent = ?',
- (self.id_,))
- for child in self.children:
- db_conn.exec('INSERT INTO todo_children VALUES (?, ?)',
- (self.id_, child.id_))
+ """On save calls, also check if auto-deletion by effort < 0."""
+ if self.effort and self.effort < 0 and self.is_deletable:
+ self.remove(db_conn)
+ return
+ if self.id_ is None:
+ self.__class__.days_to_update.add(self.date)
+ super().save(db_conn)
+ for condition in self.enables + self.disables + self.conditions:
+ condition.save(db_conn)
+
+ def remove(self, db_conn: DatabaseConnection) -> None:
+ """Remove from DB, including relations."""
+ if not self.is_deletable:
+ raise HandledException('Cannot remove non-deletable Todo.')
+ self.__class__.days_to_update.add(self.date)
+ children_to_remove = self.children[:]
+ parents_to_remove = self.parents[:]
+ for child in children_to_remove:
+ self.remove_child(child)
+ for parent in parents_to_remove:
+ parent.remove_child(self)
+ super().remove(db_conn)