from typing import Any
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.processes import Process
+from plomtask.processes import Process, ProcessStepsNode
+from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
+from plomtask.dating import valid_date
@dataclass
"""Individual actionable."""
# pylint: disable=too-many-instance-attributes
table_name = 'todos'
- to_save = ['process_id', 'is_done', 'date', 'comment']
- to_save_relations = [('todo_conditions', 'todo', 'conditions'),
- ('todo_enables', 'todo', 'enables'),
- ('todo_disables', 'todo', 'disables'),
- ('todo_children', 'parent', 'children'),
- ('todo_children', 'child', 'parents')]
+ to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
+ 'calendarize']
+ to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
+ ('todo_blockers', 'todo', 'blockers', 0),
+ ('todo_enables', 'todo', 'enables', 0),
+ ('todo_disables', 'todo', 'disables', 0),
+ ('todo_children', 'parent', 'children', 0),
+ ('todo_children', 'child', 'parents', 1)]
+ to_search = ['comment']
# pylint: disable=too-many-arguments
- def __init__(self, id_: int | None, process: Process,
- is_done: bool, date: str, comment: str = '') -> None:
- super().__init__(id_)
+ def __init__(self, id_: int | None,
+ process: Process,
+ is_done: bool,
+ date: str, comment: str = '',
+ effort: None | float = None,
+ calendarize: bool = False) -> None:
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
if process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
self.process = process
self._is_done = is_done
- self.date = date
+ self.date = valid_date(date)
self.comment = comment
+ self.effort = effort
self.children: list[Todo] = []
self.parents: list[Todo] = []
- self.conditions: list[Condition] = []
- self.enables: list[Condition] = []
- self.disables: list[Condition] = []
+ self.calendarize = calendarize
if not self.id_:
+ self.calendarize = self.process.calendarize
self.conditions = self.process.conditions[:]
+ self.blockers = self.process.blockers[:]
self.enables = self.process.enables[:]
self.disables = self.process.disables[:]
+ @classmethod
+ def by_date_range(cls, db_conn: DatabaseConnection,
+ date_range: tuple[str, str] = ('', '')) -> list[Todo]:
+ """Collect Todos of Days within date_range."""
+ todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
+ return todos
+
+ @classmethod
+ def create_with_children(cls, db_conn: DatabaseConnection,
+ process_id: int, date: str) -> Todo:
+ """Create Todo of process for date, ensure children."""
+
+ def key_order_func(n: ProcessStepsNode) -> int:
+ assert isinstance(n.process.id_, int)
+ return n.process.id_
+
+ def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
+ adoptables = [t for t in cls.by_date(db_conn, date)
+ if (t not in parent.children)
+ and (t != parent)
+ and step_node.process == t.process]
+ satisfier = None
+ for adoptable in adoptables:
+ satisfier = adoptable
+ break
+ if not satisfier:
+ satisfier = cls(None, step_node.process, False, date)
+ satisfier.save(db_conn)
+ sub_step_nodes = list(step_node.steps.values())
+ sub_step_nodes.sort(key=key_order_func)
+ for sub_node in sub_step_nodes:
+ if sub_node.is_suppressed:
+ continue
+ n_slots = len([n for n in sub_step_nodes
+ if n.process == sub_node.process])
+ filled_slots = len([t for t in satisfier.children
+ if t.process == sub_node.process])
+ # if we did not newly create satisfier, it may already fill
+ # some step dependencies, so only fill what remains open
+ if n_slots - filled_slots > 0:
+ satisfier.add_child(walk_steps(satisfier, sub_node))
+ satisfier.save(db_conn)
+ return satisfier
+
+ process = Process.by_id(db_conn, process_id)
+ todo = cls(None, process, False, date)
+ todo.save(db_conn)
+ steps_tree = process.get_steps(db_conn)
+ for step_node in steps_tree.values():
+ if step_node.is_suppressed:
+ continue
+ todo.add_child(walk_steps(todo, step_node))
+ todo.save(db_conn)
+ return todo
+
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Todo:
'child', todo.id_):
# pylint: disable=no-member
todo.parents += [cls.by_id(db_conn, t_id)]
- for name in ('conditions', 'enables', 'disables'):
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'todo_{name}'
assert isinstance(todo.id_, int)
for cond_id in db_conn.column_where(table, 'condition',
target += [Condition.by_id(db_conn, cond_id)]
return todo
+ @classmethod
+ def by_process_id(cls, db_conn: DatabaseConnection,
+ process_id: int | None) -> list[Todo]:
+ """Collect all Todos of Process of process_id."""
+ return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
+
@classmethod
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
"""Collect all Todos for Day of date."""
- todos = []
- for id_ in db_conn.column_where('todos', 'id', 'day', date):
- todos += [cls.by_id(db_conn, id_)]
- return todos
+ return cls.by_date_range(db_conn, (date, date))
@property
def is_doable(self) -> bool:
for condition in self.conditions:
if not condition.is_active:
return False
+ for condition in self.blockers:
+ if condition.is_active:
+ return False
return True
+ @property
+ def is_deletable(self) -> bool:
+ """Decide whether self be deletable (not if preserve-worthy values)."""
+ if self.comment:
+ return False
+ if self.effort and self.effort >= 0:
+ return False
+ return True
+
+ @property
+ def performed_effort(self) -> float:
+ """Return performed effort, i.e. self.effort or default if done.."""
+ if self.effort is not None:
+ return self.effort
+ if self.is_done:
+ return self.effort_then
+ return 0
+
@property
def process_id(self) -> int | str | None:
"""Needed for super().save to save Processes as attributes."""
return self.process.id_
- @property
- def unsatisfied_dependencies(self) -> list[int]:
- """Return Process IDs of .process.explicit_steps not in .children."""
- unsatisfied = [s.step_process_id for s in self.process.explicit_steps
- if s.parent_step_id is None]
- for child_process_id in [c.process.id_ for c in self.children]:
- if child_process_id in unsatisfied:
- unsatisfied.remove(child_process_id)
- return unsatisfied
-
@property
def is_done(self) -> bool:
"""Wrapper around self._is_done so we can control its setter."""
for condition in self.disables:
condition.is_active = False
- def adopt_from(self, todos: list[Todo]) -> bool:
- """As far as possible, fill unsatisfied dependencies from todos."""
- adopted = False
- for process_id in self.unsatisfied_dependencies:
- for todo in [t for t in todos if t.process.id_ == process_id
- and t not in self.children]:
- self.add_child(todo)
- adopted = True
- break
- return adopted
+ @property
+ def title(self) -> VersionedAttribute:
+ """Shortcut to .process.title."""
+ return self.process.title
- def make_missing_children(self, db_conn: DatabaseConnection) -> None:
- """Fill unsatisfied dependencies with new Todos."""
- for process_id in self.unsatisfied_dependencies:
- process = Process.by_id(db_conn, process_id)
- todo = self.__class__(None, process, False, self.date)
- todo.save(db_conn)
- self.add_child(todo)
+ @property
+ def title_then(self) -> str:
+ """Shortcut to .process.title.at(self.date)"""
+ title_then = self.process.title.at(self.date)
+ assert isinstance(title_then, str)
+ return title_then
+
+ @property
+ def effort_then(self) -> float:
+ """Shortcut to .process.effort.at(self.date)"""
+ effort_then = self.process.effort.at(self.date)
+ assert isinstance(effort_then, float)
+ return effort_then
+
+ @property
+ def has_doneness_in_path(self) -> bool:
+ """Check whether self is done or has any children that are."""
+ if self.is_done:
+ return True
+ for child in self.children:
+ if child.is_done:
+ return True
+ if child.has_doneness_in_path:
+ return True
+ return False
def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
"""Return tree of depended-on Todos."""
return make_node(self)
+ @property
+ def tree_effort(self) -> float:
+ """Return sum of performed efforts of self and all descendants."""
+
+ def walk_tree(node: Todo) -> float:
+ local_effort = 0
+ for child in node.children:
+ local_effort += walk_tree(child)
+ return node.performed_effort + local_effort
+
+ return walk_tree(self)
+
def add_child(self, child: Todo) -> None:
"""Add child to self.children, avoid recursion, update parenthoods."""
self.children.remove(child)
child.parents.remove(self)
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """On save calls, also check if auto-deletion by effort < 0."""
+ if self.effort and self.effort < 0 and self.is_deletable:
+ self.remove(db_conn)
+ return
+ super().save(db_conn)
+
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, including relations."""
+ if not self.is_deletable:
+ raise HandledException('Cannot remove non-deletable Todo.')
children_to_remove = self.children[:]
parents_to_remove = self.parents[:]
for child in children_to_remove: