2 from __future__ import annotations
3 from typing import Any, Set
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.processes import Process, ProcessStepsNode
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
11 from plomtask.dating import valid_date
15 """Template for TodoNode, TodoOrStepsNode providing .as_dict_and_refs."""
16 # pylint: disable=too-few-public-methods
17 _to_dict: list[str] = []
19 def __init__(self, *args: Any) -> None:
20 for i, arg in enumerate(args):
21 setattr(self, self._to_dict[i], arg)
24 def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
25 """Return self as json.dumps-ready dict, list of referenced objects."""
28 for name in self._to_dict:
29 attr = getattr(self, name)
30 if hasattr(attr, 'id_'):
33 if isinstance(attr, list):
36 item_d, item_refs = item.as_dict_and_refs
38 for item_ref in [r for r in item_refs if r not in refs]:
45 class TodoNode(DictableNode):
46 """Collects what's useful to know for Todo/Condition tree display."""
47 # pylint: disable=too-few-public-methods
50 children: list[TodoNode]
51 _to_dict = ['todo', 'seen', 'children']
54 class TodoOrProcStepNode(DictableNode):
55 """Collect what's useful for Todo-or-ProcessStep tree display."""
56 # pylint: disable=too-few-public-methods
59 process: Process | None
60 children: list[TodoOrProcStepNode] # pylint: disable=undefined-variable
61 fillable: bool = False
62 _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
65 class Todo(BaseModel[int], ConditionsRelations):
66 """Individual actionable."""
67 # pylint: disable=too-many-instance-attributes
68 # pylint: disable=too-many-public-methods
70 to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
72 to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
73 ('todo_blockers', 'todo', 'blockers', 0),
74 ('todo_enables', 'todo', 'enables', 0),
75 ('todo_disables', 'todo', 'disables', 0),
76 ('todo_children', 'parent', 'children', 0),
77 ('todo_children', 'child', 'parents', 1)]
78 to_search = ['comment']
79 days_to_update: Set[str] = set()
82 sorters = {'doneness': lambda t: t.is_done,
83 'title': lambda t: t.title_then,
84 'comment': lambda t: t.comment,
85 'date': lambda t: t.date}
87 # pylint: disable=too-many-arguments
88 def __init__(self, id_: int | None,
91 date: str, comment: str = '',
92 effort: None | float = None,
93 calendarize: bool = False) -> None:
94 BaseModel.__init__(self, id_)
95 ConditionsRelations.__init__(self)
96 if process.id_ is None:
97 raise NotFoundException('Process of Todo without ID (not saved?)')
98 self.process = process
99 self._is_done = is_done
100 self.date = valid_date(date)
101 self.comment = comment
105 self.calendarize = calendarize
107 self.calendarize = self.process.calendarize
108 self.conditions = self.process.conditions[:]
109 self.blockers = self.process.blockers[:]
110 self.enables = self.process.enables[:]
111 self.disables = self.process.disables[:]
114 def by_date_range(cls, db_conn: DatabaseConnection,
115 date_range: tuple[str, str] = ('', '')) -> list[Todo]:
116 """Collect Todos of Days within date_range."""
117 todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
121 def create_with_children(cls, db_conn: DatabaseConnection,
122 process_id: int, date: str) -> Todo:
123 """Create Todo of process for date, ensure children demanded by chain.
125 At minimum creates Todo of process_id, but checks the respective
126 Process for its step tree, and walks down that to provide the initial
127 Todo with all descendants defined there, either adopting existing
128 Todos, or creating them where necessary.
131 def key_order_func(n: ProcessStepsNode) -> int:
132 assert isinstance(n.process.id_, int)
135 def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
136 adoptables = [t for t in cls.by_date(db_conn, date)
137 if (t not in parent.children)
139 and step_node.process == t.process]
141 for adoptable in adoptables:
142 satisfier = adoptable
145 satisfier = cls(None, step_node.process, False, date)
146 satisfier.save(db_conn)
147 sub_step_nodes = list(step_node.steps.values())
148 sub_step_nodes.sort(key=key_order_func)
149 for sub_node in sub_step_nodes:
150 if sub_node.is_suppressed:
152 n_slots = len([n for n in sub_step_nodes
153 if n.process == sub_node.process])
154 filled_slots = len([t for t in satisfier.children
155 if t.process == sub_node.process])
156 # if we did not newly create satisfier, it may already fill
157 # some step dependencies, so only fill what remains open
158 if n_slots - filled_slots > 0:
159 satisfier.add_child(walk_steps(satisfier, sub_node))
160 satisfier.save(db_conn)
163 process = Process.by_id(db_conn, process_id)
164 todo = cls(None, process, False, date)
166 steps_tree = process.get_steps(db_conn)
167 for step_node in steps_tree.values():
168 if step_node.is_suppressed:
170 todo.add_child(walk_steps(todo, step_node))
175 def from_table_row(cls, db_conn: DatabaseConnection,
176 row: Row | list[Any]) -> Todo:
177 """Make from DB row, with dependencies."""
179 raise NotFoundException('calling Todo of '
181 row_as_list = list(row)
182 row_as_list[1] = Process.by_id(db_conn, row[1])
183 todo = super().from_table_row(db_conn, row_as_list)
184 assert isinstance(todo.id_, int)
185 for t_id in db_conn.column_where('todo_children', 'child',
187 todo.children += [cls.by_id(db_conn, t_id)]
188 for t_id in db_conn.column_where('todo_children', 'parent',
190 todo.parents += [cls.by_id(db_conn, t_id)]
191 for name in ('conditions', 'blockers', 'enables', 'disables'):
192 table = f'todo_{name}'
193 assert isinstance(todo.id_, int)
194 for cond_id in db_conn.column_where(table, 'condition',
196 target = getattr(todo, name)
197 target += [Condition.by_id(db_conn, cond_id)]
201 def by_process_id(cls, db_conn: DatabaseConnection,
202 process_id: int | None) -> list[Todo]:
203 """Collect all Todos of Process of process_id."""
204 return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
207 def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
208 """Collect all Todos for Day of date."""
209 return cls.by_date_range(db_conn, (date, date))
212 def is_doable(self) -> bool:
213 """Decide whether .is_done settable based on children, Conditions."""
214 for child in self.children:
215 if not child.is_done:
217 for condition in self.conditions:
218 if not condition.is_active:
220 for condition in self.blockers:
221 if condition.is_active:
226 def is_deletable(self) -> bool:
227 """Decide whether self be deletable (not if preserve-worthy values)."""
230 if self.effort and self.effort >= 0:
235 def performed_effort(self) -> float:
236 """Return performed effort, i.e. self.effort or default if done.."""
237 if self.effort is not None:
240 return self.effort_then
244 def process_id(self) -> int | str | None:
245 """Needed for super().save to save Processes as attributes."""
246 return self.process.id_
249 def is_done(self) -> bool:
250 """Wrapper around self._is_done so we can control its setter."""
254 def is_done(self, value: bool) -> None:
255 if value != self.is_done and not self.is_doable:
256 raise BadFormatException('cannot change doneness of undoable Todo')
257 if self._is_done != value:
258 self._is_done = value
260 for condition in self.enables:
261 condition.is_active = True
262 for condition in self.disables:
263 condition.is_active = False
266 def title(self) -> VersionedAttribute:
267 """Shortcut to .process.title."""
268 assert isinstance(self.process.title, VersionedAttribute)
269 return self.process.title
272 def title_then(self) -> str:
273 """Shortcut to .process.title.at(self.date)"""
274 title_then = self.process.title.at(self.date)
275 assert isinstance(title_then, str)
279 def effort_then(self) -> float:
280 """Shortcut to .process.effort.at(self.date)"""
281 effort_then = self.process.effort.at(self.date)
282 assert isinstance(effort_then, float)
286 def has_doneness_in_path(self) -> bool:
287 """Check whether self is done or has any children that are."""
290 for child in self.children:
293 if child.has_doneness_in_path:
297 def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
298 """Return tree of depended-on Todos."""
300 def make_node(todo: Todo) -> TodoNode:
302 seen = todo.id_ in seen_todos
303 assert isinstance(todo.id_, int)
304 seen_todos.add(todo.id_)
305 for child in todo.children:
306 children += [make_node(child)]
307 return TodoNode(todo, seen, children)
309 return make_node(self)
312 def tree_effort(self) -> float:
313 """Return sum of performed efforts of self and all descendants."""
315 def walk_tree(node: Todo) -> float:
317 for child in node.children:
318 local_effort += walk_tree(child)
319 return node.performed_effort + local_effort
321 return walk_tree(self)
323 def add_child(self, child: Todo) -> None:
324 """Add child to self.children, avoid recursion, update parenthoods."""
326 def walk_steps(node: Todo) -> None:
327 if node.id_ == self.id_:
328 raise BadFormatException('bad child choice causes recursion')
329 for child in node.children:
333 raise HandledException('Can only add children to saved Todos.')
334 if child.id_ is None:
335 raise HandledException('Can only add saved children to Todos.')
336 if child in self.children:
337 raise BadFormatException('cannot adopt same child twice')
339 self.children += [child]
340 child.parents += [self]
342 def remove_child(self, child: Todo) -> None:
343 """Remove child from self.children, update counter relations."""
344 if child not in self.children:
345 raise HandledException('Cannot remove un-parented child.')
346 self.children.remove(child)
347 child.parents.remove(self)
349 def save(self, db_conn: DatabaseConnection) -> None:
350 """On save calls, also check if auto-deletion by effort < 0."""
351 if self.effort and self.effort < 0 and self.is_deletable:
355 self.__class__.days_to_update.add(self.date)
356 super().save(db_conn)
357 for condition in self.enables + self.disables + self.conditions:
358 condition.save(db_conn)
360 def remove(self, db_conn: DatabaseConnection) -> None:
361 """Remove from DB, including relations."""
362 if not self.is_deletable:
363 raise HandledException('Cannot remove non-deletable Todo.')
364 self.__class__.days_to_update.add(self.date)
365 children_to_remove = self.children[:]
366 parents_to_remove = self.parents[:]
367 for child in children_to_remove:
368 self.remove_child(child)
369 for parent in parents_to_remove:
370 parent.remove_child(self)
371 super().remove(db_conn)