2 from __future__ import annotations
3 from dataclasses import dataclass
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process, ProcessStepsNode
8 from plomtask.versioned_attributes import VersionedAttribute
9 from plomtask.conditions import Condition, ConditionsRelations
10 from plomtask.exceptions import (NotFoundException, BadFormatException,
12 from plomtask.dating import valid_date
17 """Collects what's useful to know for Todo/Condition tree display."""
20 children: list[TodoNode]
23 class Todo(BaseModel[int], ConditionsRelations):
24 """Individual actionable."""
25 # pylint: disable=too-many-instance-attributes
27 to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
29 to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
30 ('todo_blockers', 'todo', 'blockers', 0),
31 ('todo_enables', 'todo', 'enables', 0),
32 ('todo_disables', 'todo', 'disables', 0),
33 ('todo_children', 'parent', 'children', 0),
34 ('todo_children', 'child', 'parents', 1)]
35 to_search = ['comment']
37 # pylint: disable=too-many-arguments
38 def __init__(self, id_: int | None,
41 date: str, comment: str = '',
42 effort: None | float = None,
43 calendarize: bool = False) -> None:
44 BaseModel.__init__(self, id_)
45 ConditionsRelations.__init__(self)
46 if process.id_ is None:
47 raise NotFoundException('Process of Todo without ID (not saved?)')
48 self.process = process
49 self._is_done = is_done
50 self.date = valid_date(date)
51 self.comment = comment
53 self.children: list[Todo] = []
54 self.parents: list[Todo] = []
55 self.calendarize = calendarize
57 self.calendarize = self.process.calendarize
58 self.conditions = self.process.conditions[:]
59 self.blockers = self.process.blockers[:]
60 self.enables = self.process.enables[:]
61 self.disables = self.process.disables[:]
64 def by_date_range(cls, db_conn: DatabaseConnection,
65 date_range: tuple[str, str] = ('', '')) -> list[Todo]:
66 """Collect Todos of Days within date_range."""
67 todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
71 def create_with_children(cls, db_conn: DatabaseConnection,
72 process_id: int, date: str) -> Todo:
73 """Create Todo of process for date, ensure children."""
75 def key_order_func(n: ProcessStepsNode) -> int:
76 assert isinstance(n.process.id_, int)
79 def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
80 adoptables = [t for t in cls.by_date(db_conn, date)
81 if (t not in parent.children)
83 and step_node.process == t.process]
85 for adoptable in adoptables:
89 satisfier = cls(None, step_node.process, False, date)
90 satisfier.save(db_conn)
91 sub_step_nodes = list(step_node.steps.values())
92 sub_step_nodes.sort(key=key_order_func)
93 for sub_node in sub_step_nodes:
94 n_slots = len([n for n in sub_step_nodes
95 if n.process == sub_node.process])
96 filled_slots = len([t for t in satisfier.children
97 if t.process == sub_node.process])
98 # if we did not newly create satisfier, it may already fill
99 # some step dependencies, so only fill what remains open
100 if n_slots - filled_slots > 0:
101 satisfier.add_child(walk_steps(satisfier, sub_node))
102 satisfier.save(db_conn)
105 process = Process.by_id(db_conn, process_id)
106 todo = cls(None, process, False, date)
108 steps_tree = process.get_steps(db_conn)
109 for step_node in steps_tree.values():
110 todo.add_child(walk_steps(todo, step_node))
115 def from_table_row(cls, db_conn: DatabaseConnection,
116 row: Row | list[Any]) -> Todo:
117 """Make from DB row, with dependencies."""
119 raise NotFoundException('calling Todo of '
121 row_as_list = list(row)
122 row_as_list[1] = Process.by_id(db_conn, row[1])
123 todo = super().from_table_row(db_conn, row_as_list)
124 assert isinstance(todo.id_, int)
125 for t_id in db_conn.column_where('todo_children', 'child',
127 # pylint: disable=no-member
128 todo.children += [cls.by_id(db_conn, t_id)]
129 for t_id in db_conn.column_where('todo_children', 'parent',
131 # pylint: disable=no-member
132 todo.parents += [cls.by_id(db_conn, t_id)]
133 for name in ('conditions', 'blockers', 'enables', 'disables'):
134 table = f'todo_{name}'
135 assert isinstance(todo.id_, int)
136 for cond_id in db_conn.column_where(table, 'condition',
138 target = getattr(todo, name)
139 target += [Condition.by_id(db_conn, cond_id)]
def by_process_id(cls, db_conn: DatabaseConnection,
                  process_id: int | None) -> list[Todo]:
    """Return all Todos whose Process has the ID process_id.

    Walks everything cls.all(db_conn) delivers and keeps, in delivery
    order, the entries whose .process.id_ equals process_id.
    """
    matches: list[Todo] = []
    for candidate in cls.all(db_conn):
        if candidate.process.id_ == process_id:
            matches.append(candidate)
    return matches
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
    """Return the Todos for the single Day of date.

    Delegates to cls.by_date_range, using date as both start and end
    of the range.
    """
    single_day_range = (date, date)
    return cls.by_date_range(db_conn, single_day_range)
154 def is_doable(self) -> bool:
155 """Decide whether .is_done settable based on children, Conditions."""
156 for child in self.children:
157 if not child.is_done:
159 for condition in self.conditions:
160 if not condition.is_active:
162 for condition in self.blockers:
163 if condition.is_active:
168 def is_deletable(self) -> bool:
169 """Decide whether self be deletable (not if preserve-worthy values)."""
172 if self.effort and self.effort >= 0:
def process_id(self) -> int | str | None:
    """Return the ID of self.process.

    Needed for super().save to save Processes as attributes.
    """
    owning_process = self.process
    return owning_process.id_
182 def is_done(self) -> bool:
183 """Wrapper around self._is_done so we can control its setter."""
187 def is_done(self, value: bool) -> None:
188 if value != self.is_done and not self.is_doable:
189 raise BadFormatException('cannot change doneness of undoable Todo')
190 if self._is_done != value:
191 self._is_done = value
193 for condition in self.enables:
194 condition.is_active = True
195 for condition in self.disables:
196 condition.is_active = False
def title(self) -> VersionedAttribute:
    """Shortcut to the .title attribute of self.process."""
    owning_process = self.process
    return owning_process.title
204 def title_then(self) -> str:
205 """Shortcut to .process.title.at(self.date)"""
206 title_then = self.process.title.at(self.date)
207 assert isinstance(title_then, str)
211 def effort_then(self) -> float:
212 """Shortcut to .process.effort.at(self.date)"""
213 effort_then = self.process.effort.at(self.date)
214 assert isinstance(effort_then, float)
218 def has_doneness_in_path(self) -> bool:
219 """Check whether self is done or has any children that are."""
222 for child in self.children:
225 if child.has_doneness_in_path:
229 def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
230 """Return tree of depended-on Todos."""
232 def make_node(todo: Todo) -> TodoNode:
234 seen = todo.id_ in seen_todos
235 assert isinstance(todo.id_, int)
236 seen_todos.add(todo.id_)
237 for child in todo.children:
238 children += [make_node(child)]
239 return TodoNode(todo, seen, children)
241 return make_node(self)
243 def add_child(self, child: Todo) -> None:
244 """Add child to self.children, avoid recursion, update parenthoods."""
246 def walk_steps(node: Todo) -> None:
247 if node.id_ == self.id_:
248 raise BadFormatException('bad child choice causes recursion')
249 for child in node.children:
253 raise HandledException('Can only add children to saved Todos.')
254 if child.id_ is None:
255 raise HandledException('Can only add saved children to Todos.')
256 if child in self.children:
257 raise BadFormatException('cannot adopt same child twice')
259 self.children += [child]
260 child.parents += [self]
def remove_child(self, child: Todo) -> None:
    """Detach child from self, updating both sides of the relation.

    Raises HandledException if child is not in self.children. Otherwise
    drops child from self.children, then self from child.parents.
    """
    currently_parented = child in self.children
    if not currently_parented:
        raise HandledException('Cannot remove un-parented child.')
    # own list first, then the counter-relation kept on the child
    own_children, childs_parents = self.children, child.parents
    own_children.remove(child)
    childs_parents.remove(self)
269 def save(self, db_conn: DatabaseConnection) -> None:
270 """On save calls, also check if auto-deletion by effort < 0."""
271 if self.effort and self.effort < 0 and self.is_deletable:
274 super().save(db_conn)
def remove(self, db_conn: DatabaseConnection) -> None:
    """Unlink self from its children and parents, then remove it.

    Raises HandledException if self.is_deletable is falsy. Otherwise
    every child/parent link is dissolved via remove_child before the
    actual removal is handed to super().remove(db_conn).
    """
    if not self.is_deletable:
        raise HandledException('Cannot remove non-deletable Todo.')
    # Snapshot both relations up front: remove_child edits the very
    # lists we would otherwise be iterating over.
    former_children = list(self.children)
    former_parents = list(self.parents)
    for former_child in former_children:
        self.remove_child(former_child)
    for former_parent in former_parents:
        former_parent.remove_child(self)
    super().remove(db_conn)