"""Actionables."""
from __future__ import annotations
+from dataclasses import dataclass
+from typing import Any
+from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.processes import Process
-from plomtask.conditions import Condition
+from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
-class Todo(BaseModel):
- """Individual actionable."""
+@dataclass
+class TodoStepsNode:
+ """Collects what's useful to know for Todo/Condition tree display."""
+ item: Todo | Condition
+ is_todo: bool
+ children: list[TodoStepsNode]
+ seen: bool
+ hide: bool
- # pylint: disable=too-many-instance-attributes
+class Todo(BaseModel[int], ConditionsRelations):
+ """Individual actionable."""
+ # pylint: disable=too-many-instance-attributes
table_name = 'todos'
to_save = ['process_id', 'is_done', 'date']
def __init__(self, id_: int | None, process: Process,
is_done: bool, date: str) -> None:
- self.set_int_id(id_)
+ super().__init__(id_)
self.process = process
self._is_done = is_done
self.date = date
self.children: list[Todo] = []
self.parents: list[Todo] = []
self.conditions: list[Condition] = []
- self.fulfills: list[Condition] = []
- self.undoes: list[Condition] = []
+ self.enables: list[Condition] = []
+ self.disables: list[Condition] = []
if not self.id_:
- self.conditions = process.conditions[:]
- self.fulfills = process.fulfills[:]
- self.undoes = process.undoes[:]
+ self.conditions = self.process.conditions[:]
+ self.enables = self.process.enables[:]
+ self.disables = self.process.disables[:]
@classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: int | None) -> Todo:
- """Get Todo of .id_=id_ and children (from DB cache if possible)."""
- if id_ in db_conn.cached_todos.keys():
- todo = db_conn.cached_todos[id_]
- else:
- todo = None
- for row in db_conn.exec('SELECT * FROM todos WHERE id = ?',
- (id_,)):
- row = list(row)
- if row[1] == 0:
- raise NotFoundException('calling Todo of '
- 'unsaved Process')
- row[1] = Process.by_id(db_conn, row[1])
- todo = cls.from_table_row(db_conn, row)
- break
- if todo is None:
- raise NotFoundException(f'Todo of ID not found: {id_}')
- for row in db_conn.exec('SELECT child FROM todo_children '
- 'WHERE parent = ?', (id_,)):
- todo.children += [cls.by_id(db_conn, row[0])]
- for row in db_conn.exec('SELECT parent FROM todo_children '
- 'WHERE child = ?', (id_,)):
- todo.parents += [cls.by_id(db_conn, row[0])]
- for row in db_conn.exec('SELECT condition FROM todo_conditions '
- 'WHERE todo = ?', (id_,)):
- todo.conditions += [Condition.by_id(db_conn, row[0])]
- for row in db_conn.exec('SELECT condition FROM todo_fulfills '
- 'WHERE todo = ?', (id_,)):
- todo.fulfills += [Condition.by_id(db_conn, row[0])]
- for row in db_conn.exec('SELECT condition FROM todo_undoes '
- 'WHERE todo = ?', (id_,)):
- todo.undoes += [Condition.by_id(db_conn, row[0])]
- assert isinstance(todo, Todo)
+ def from_table_row(cls, db_conn: DatabaseConnection,
+ row: Row | list[Any]) -> Todo:
+ """Make from DB row, with dependencies."""
+ if row[1] == 0:
+ raise NotFoundException('calling Todo of '
+ 'unsaved Process')
+ row_as_list = list(row)
+ row_as_list[1] = Process.by_id(db_conn, row[1])
+ todo = super().from_table_row(db_conn, row_as_list)
+ assert isinstance(todo.id_, int)
+ for t_id in db_conn.column_where('todo_children', 'child',
+ 'parent', todo.id_):
+ # pylint: disable=no-member
+ todo.children += [cls.by_id(db_conn, t_id)]
+ for t_id in db_conn.column_where('todo_children', 'parent',
+ 'child', todo.id_):
+ # pylint: disable=no-member
+ todo.parents += [cls.by_id(db_conn, t_id)]
+ for name in ('conditions', 'enables', 'disables'):
+ table = f'todo_{name}'
+ assert isinstance(todo.id_, int)
+ for cond_id in db_conn.column_where(table, 'condition',
+ 'todo', todo.id_):
+ target = getattr(todo, name)
+ target += [Condition.by_id(db_conn, cond_id)]
return todo
@classmethod
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
"""Collect all Todos for Day of date."""
todos = []
- for row in db_conn.exec('SELECT id FROM todos WHERE day = ?', (date,)):
- todos += [cls.by_id(db_conn, row[0])]
+ for id_ in db_conn.column_where('todos', 'id', 'day', date):
+ todos += [cls.by_id(db_conn, id_)]
return todos
+ @staticmethod
+ def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
+ cond: Condition, date: str) -> list[Todo]:
+ """Collect all Todos of day that [name] condition."""
+ assert isinstance(cond.id_, int)
+ x_ablers = []
+ table = f'todo_{name}'
+ for id_ in db_conn.column_where(table, 'todo', 'condition', cond.id_):
+ todo = Todo.by_id(db_conn, id_)
+ if todo.date == date:
+ x_ablers += [todo]
+ return x_ablers
+
@classmethod
- def enablers_for_at(cls, db_conn: DatabaseConnection, condition: Condition,
- date: str) -> list[Todo]:
+ def enablers_for_at(cls, db_conn: DatabaseConnection,
+ condition: Condition, date: str) -> list[Todo]:
"""Collect all Todos of day that enable condition."""
- enablers = []
- for row in db_conn.exec('SELECT todo FROM todo_fulfills '
- 'WHERE condition = ?', (condition.id_,)):
- todo = cls.by_id(db_conn, row[0])
- if todo.date == date:
- enablers += [todo]
- return enablers
+ return cls._x_ablers_for_at(db_conn, 'enables', condition, date)
@classmethod
def disablers_for_at(cls, db_conn: DatabaseConnection,
condition: Condition, date: str) -> list[Todo]:
"""Collect all Todos of day that disable condition."""
- disablers = []
- for row in db_conn.exec('SELECT todo FROM todo_undoes '
- 'WHERE condition = ?', (condition.id_,)):
- todo = cls.by_id(db_conn, row[0])
- if todo.date == date:
- disablers += [todo]
- return disablers
+ return cls._x_ablers_for_at(db_conn, 'disables', condition, date)
@property
def is_doable(self) -> bool:
"""Return ID of tasked Process."""
return self.process.id_
+ @property
+ def unsatisfied_dependencies(self) -> list[int]:
+ """Return Process IDs of .process.explicit_steps not in .children."""
+ unsatisfied = [s.step_process_id for s in self.process.explicit_steps
+ if s.parent_step_id is None]
+ for child_process_id in [c.process.id_ for c in self.children]:
+ if child_process_id in unsatisfied:
+ unsatisfied.remove(child_process_id)
+ return unsatisfied
+
@property
def is_done(self) -> bool:
"""Wrapper around self._is_done so we can control its setter."""
if self._is_done != value:
self._is_done = value
if value is True:
- for condition in self.fulfills:
+ for condition in self.enables:
condition.is_active = True
- for condition in self.undoes:
+ for condition in self.disables:
condition.is_active = False
- def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
- """Set self.undoes to Conditions identified by ids."""
- self.set_conditions(db_conn, ids, 'undoes')
+ def adopt_from(self, todos: list[Todo]) -> None:
+ """As far as possible, fill unsatisfied dependencies from todos."""
+ for process_id in self.unsatisfied_dependencies:
+ for todo in [t for t in todos if t.process.id_ == process_id
+ and t not in self.children]:
+ self.add_child(todo)
+ break
+
+ def make_missing_children(self, db_conn: DatabaseConnection) -> None:
+ """Fill unsatisfied dependencies with new Todos."""
+ for process_id in self.unsatisfied_dependencies:
+ process = Process.by_id(db_conn, process_id)
+ todo = self.__class__(None, process, False, self.date)
+ todo.save(db_conn)
+ self.add_child(todo)
+
+ def get_step_tree(self, seen_todos: set[int],
+ seen_conditions: set[int]) -> TodoStepsNode:
+ """Return tree of depended-on Todos and Conditions."""
+
+ def make_node(step: Todo | Condition) -> TodoStepsNode:
+ assert isinstance(step.id_, int)
+ is_todo = isinstance(step, Todo)
+ children = []
+ if is_todo:
+ assert isinstance(step, Todo)
+ seen = step.id_ in seen_todos
+ seen_todos.add(step.id_)
+ potentially_enabled = set()
+ for child in step.children:
+ for condition in child.enables:
+ potentially_enabled.add(condition)
+ children += [make_node(child)]
+ for condition in [c for c in step.conditions
+ if (not c.is_active)
+ and (c not in potentially_enabled)]:
+ children += [make_node(condition)]
+ else:
+ seen = step.id_ in seen_conditions
+ seen_conditions.add(step.id_)
+ return TodoStepsNode(step, is_todo, children, seen, False)
- def set_fulfills(self, db_conn: DatabaseConnection,
- ids: list[int]) -> None:
- """Set self.fulfills to Conditions identified by ids."""
- self.set_conditions(db_conn, ids, 'fulfills')
+ node = make_node(self)
+ return node
- def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
- target: str = 'conditions') -> None:
- """Set self.[target] to Conditions identified by ids."""
- target_list = getattr(self, target)
- while len(target_list) > 0:
- target_list.pop()
- for id_ in ids:
- target_list += [Condition.by_id(db_conn, id_)]
+ def get_undone_steps_tree(self) -> TodoStepsNode:
+ """Return tree of depended-on undone Todos and Conditions."""
+
+ def walk_tree(node: TodoStepsNode) -> None:
+ if isinstance(node.item, Todo) and node.item.is_done:
+ node.hide = True
+ for child in node.children:
+ walk_tree(child)
+
+ seen_todos: set[int] = set()
+ seen_conditions: set[int] = set()
+ step_tree = self.get_step_tree(seen_todos, seen_conditions)
+ walk_tree(step_tree)
+ return step_tree
+
+ def get_done_steps_tree(self) -> list[TodoStepsNode]:
+ """Return tree of depended-on done Todos."""
+
+ def make_nodes(node: TodoStepsNode) -> list[TodoStepsNode]:
+ children: list[TodoStepsNode] = []
+ if not isinstance(node.item, Todo):
+ return children
+ for child in node.children:
+ children += make_nodes(child)
+ if node.item.is_done:
+ node.children = children
+ return [node]
+ return children
+
+ seen_todos: set[int] = set()
+ seen_conditions: set[int] = set()
+ step_tree = self.get_step_tree(seen_todos, seen_conditions)
+ nodes = make_nodes(step_tree)
+ return nodes
def add_child(self, child: Todo) -> None:
- """Add child to self.children, guard against recursion"""
+ """Add child to self.children, avoid recursion, update parenthoods."""
+
def walk_steps(node: Todo) -> None:
if node.id_ == self.id_:
raise BadFormatException('bad child choice causes recursion')
for child in node.children:
walk_steps(child)
+
if self.id_ is None:
raise HandledException('Can only add children to saved Todos.')
if child.id_ is None:
self.children += [child]
child.parents += [self]
+ def remove_child(self, child: Todo) -> None:
+ """Remove child from self.children, update counter relations."""
+ if child not in self.children:
+ raise HandledException('Cannot remove un-parented child.')
+ self.children.remove(child)
+ child.parents.remove(self)
+
def save(self, db_conn: DatabaseConnection) -> None:
"""Write self and children to DB and its cache."""
if self.process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
self.save_core(db_conn)
assert isinstance(self.id_, int)
- db_conn.cached_todos[self.id_] = self
- db_conn.exec('DELETE FROM todo_children WHERE parent = ?',
- (self.id_,))
- for child in self.children:
- db_conn.exec('INSERT INTO todo_children VALUES (?, ?)',
- (self.id_, child.id_))
- db_conn.exec('DELETE FROM todo_fulfills WHERE todo = ?', (self.id_,))
- for condition in self.fulfills:
- if condition.id_ is None:
- raise NotFoundException('Fulfilled Condition of Todo '
- 'without ID (not saved?)')
- db_conn.exec('INSERT INTO todo_fulfills VALUES (?, ?)',
- (self.id_, condition.id_))
- db_conn.exec('DELETE FROM todo_undoes WHERE todo = ?', (self.id_,))
- for condition in self.undoes:
- if condition.id_ is None:
- raise NotFoundException('Undone Condition of Todo '
- 'without ID (not saved?)')
- db_conn.exec('INSERT INTO todo_undoes VALUES (?, ?)',
- (self.id_, condition.id_))
- db_conn.exec('DELETE FROM todo_conditions WHERE todo = ?', (self.id_,))
- for condition in self.conditions:
- if condition.id_ is None:
- raise NotFoundException('Condition of Todo '
- 'without ID (not saved?)')
- db_conn.exec('INSERT INTO todo_conditions VALUES (?, ?)',
- (self.id_, condition.id_))
+ db_conn.rewrite_relations('todo_children', 'child', self.id_,
+ [[p.id_] for p in self.parents])
+ db_conn.rewrite_relations('todo_children', 'parent', self.id_,
+ [[c.id_] for c in self.children])
+ db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
+ [[c.id_] for c in self.conditions])
+ db_conn.rewrite_relations('todo_enables', 'todo', self.id_,
+ [[c.id_] for c in self.enables])
+ db_conn.rewrite_relations('todo_disables', 'todo', self.id_,
+ [[c.id_] for c in self.disables])
+
+ def remove(self, db_conn: DatabaseConnection) -> None:
+ """Remove from DB, including relations."""
+ assert isinstance(self.id_, int)
+ children_to_remove = self.children[:]
+ parents_to_remove = self.parents[:]
+ for child in children_to_remove:
+ self.remove_child(child)
+ for parent in parents_to_remove:
+ parent.remove_child(self)
+ db_conn.delete_where('todo_children', 'parent', self.id_)
+ db_conn.delete_where('todo_children', 'child', self.id_)
+ db_conn.delete_where('todo_conditions', 'todo', self.id_)
+ db_conn.delete_where('todo_enables', 'todo', self.id_)
+ db_conn.delete_where('todo_disables', 'todo', self.id_)
+ super().remove(db_conn)