2 from __future__ import annotations
3 from typing import Any, Set
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.processes import Process, ProcessStepsNode
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
11 from plomtask.dating import valid_date
15 """Collects what's useful to know for Todo/Condition tree display."""
16 # pylint: disable=too-few-public-methods
19 children: list[TodoNode]
24 children: list[TodoNode]) -> None:
27 self.children = children
30 def as_dict(self) -> dict[str, object]:
31 """Return self as (json.dumps-compatible) dict."""
32 return {'todo': self.todo.id_,
34 'children': [c.as_dict for c in self.children]}
38 """Collect what's useful for Todo steps tree display."""
39 # pylint: disable=too-few-public-methods
42 process: Process | None
43 children: list[TodoStepsNode] # pylint: disable=undefined-variable
49 process: Process | None,
50 children: list[TodoStepsNode],
51 fillable: bool = False):
52 # pylint: disable=too-many-arguments
55 self.process = process
56 self.children = children
57 self.fillable = fillable
def as_dict(self) -> dict[str, object]:
    """Serialize this steps-tree node into a json.dumps-compatible dict.

    Todo and Process are represented by their IDs, or by None where the
    node has none; each child contributes its own .as_dict value.
    """
    node: dict[str, object] = {'id': self.id_}
    node['todo'] = self.todo.id_ if self.todo else None
    node['process'] = self.process.id_ if self.process else None
    # .as_dict is read as an attribute, not called -- presumably it is
    # declared as a property on the node classes; confirm.
    node['children'] = [child.as_dict for child in self.children]
    node['fillable'] = self.fillable
    return node
69 class Todo(BaseModel[int], ConditionsRelations):
70 """Individual actionable."""
71 # pylint: disable=too-many-instance-attributes
72 # pylint: disable=too-many-public-methods
74 to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
76 to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
77 ('todo_blockers', 'todo', 'blockers', 0),
78 ('todo_enables', 'todo', 'enables', 0),
79 ('todo_disables', 'todo', 'disables', 0),
80 ('todo_children', 'parent', 'children', 0),
81 ('todo_children', 'child', 'parents', 1)]
82 to_search = ['comment']
83 days_to_update: Set[str] = set()
86 sorters = {'doneness': lambda t: t.is_done,
87 'title': lambda t: t.title_then,
88 'comment': lambda t: t.comment,
89 'date': lambda t: t.date}
91 # pylint: disable=too-many-arguments
92 def __init__(self, id_: int | None,
95 date: str, comment: str = '',
96 effort: None | float = None,
97 calendarize: bool = False) -> None:
98 BaseModel.__init__(self, id_)
99 ConditionsRelations.__init__(self)
100 if process.id_ is None:
101 raise NotFoundException('Process of Todo without ID (not saved?)')
102 self.process = process
103 self._is_done = is_done
104 self.date = valid_date(date)
105 self.comment = comment
109 self.calendarize = calendarize
111 self.calendarize = self.process.calendarize
112 self.conditions = self.process.conditions[:]
113 self.blockers = self.process.blockers[:]
114 self.enables = self.process.enables[:]
115 self.disables = self.process.disables[:]
118 def by_date_range(cls, db_conn: DatabaseConnection,
119 date_range: tuple[str, str] = ('', '')) -> list[Todo]:
120 """Collect Todos of Days within date_range."""
121 todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
125 def create_with_children(cls, db_conn: DatabaseConnection,
126 process_id: int, date: str) -> Todo:
127 """Create Todo of process for date, ensure children."""
129 def key_order_func(n: ProcessStepsNode) -> int:
130 assert isinstance(n.process.id_, int)
133 def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
134 adoptables = [t for t in cls.by_date(db_conn, date)
135 if (t not in parent.children)
137 and step_node.process == t.process]
139 for adoptable in adoptables:
140 satisfier = adoptable
143 satisfier = cls(None, step_node.process, False, date)
144 satisfier.save(db_conn)
145 sub_step_nodes = list(step_node.steps.values())
146 sub_step_nodes.sort(key=key_order_func)
147 for sub_node in sub_step_nodes:
148 if sub_node.is_suppressed:
150 n_slots = len([n for n in sub_step_nodes
151 if n.process == sub_node.process])
152 filled_slots = len([t for t in satisfier.children
153 if t.process == sub_node.process])
154 # if we did not newly create satisfier, it may already fill
155 # some step dependencies, so only fill what remains open
156 if n_slots - filled_slots > 0:
157 satisfier.add_child(walk_steps(satisfier, sub_node))
158 satisfier.save(db_conn)
161 process = Process.by_id(db_conn, process_id)
162 todo = cls(None, process, False, date)
164 steps_tree = process.get_steps(db_conn)
165 for step_node in steps_tree.values():
166 if step_node.is_suppressed:
168 todo.add_child(walk_steps(todo, step_node))
173 def from_table_row(cls, db_conn: DatabaseConnection,
174 row: Row | list[Any]) -> Todo:
175 """Make from DB row, with dependencies."""
177 raise NotFoundException('calling Todo of '
179 row_as_list = list(row)
180 row_as_list[1] = Process.by_id(db_conn, row[1])
181 todo = super().from_table_row(db_conn, row_as_list)
182 assert isinstance(todo.id_, int)
183 for t_id in db_conn.column_where('todo_children', 'child',
185 todo.children += [cls.by_id(db_conn, t_id)]
186 for t_id in db_conn.column_where('todo_children', 'parent',
188 todo.parents += [cls.by_id(db_conn, t_id)]
189 for name in ('conditions', 'blockers', 'enables', 'disables'):
190 table = f'todo_{name}'
191 assert isinstance(todo.id_, int)
192 for cond_id in db_conn.column_where(table, 'condition',
194 target = getattr(todo, name)
195 target += [Condition.by_id(db_conn, cond_id)]
def by_process_id(cls, db_conn: DatabaseConnection,
                  process_id: int | None) -> list[Todo]:
    """Return every Todo whose Process carries the ID process_id.

    Walks over everything cls.all(db_conn) provides and keeps, in that
    order, the entries whose .process.id_ equals process_id.
    """
    matching: list[Todo] = []
    for todo in cls.all(db_conn):
        if todo.process.id_ == process_id:
            matching.append(todo)
    return matching
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
    """Return the Todos of the single Day identified by date.

    Delegates to cls.by_date_range with a range that starts and ends on date.
    """
    single_day_range = (date, date)
    return cls.by_date_range(db_conn, single_day_range)
210 def is_doable(self) -> bool:
211 """Decide whether .is_done settable based on children, Conditions."""
212 for child in self.children:
213 if not child.is_done:
215 for condition in self.conditions:
216 if not condition.is_active:
218 for condition in self.blockers:
219 if condition.is_active:
224 def is_deletable(self) -> bool:
225 """Decide whether self be deletable (not if preserve-worthy values)."""
228 if self.effort and self.effort >= 0:
233 def performed_effort(self) -> float:
234 """Return performed effort, i.e. self.effort or default if done."""
235 if self.effort is not None:
238 return self.effort_then
def process_id(self) -> int | str | None:
    """Expose the ID of self.process; super().save needs it as attribute."""
    owning_process = self.process
    return owning_process.id_
247 def is_done(self) -> bool:
248 """Wrapper around self._is_done so we can control its setter."""
252 def is_done(self, value: bool) -> None:
253 if value != self.is_done and not self.is_doable:
254 raise BadFormatException('cannot change doneness of undoable Todo')
255 if self._is_done != value:
256 self._is_done = value
258 for condition in self.enables:
259 condition.is_active = True
260 for condition in self.disables:
261 condition.is_active = False
def title(self) -> VersionedAttribute:
    """Return the title of self.process, checked to be a VersionedAttribute."""
    process_title = self.process.title
    assert isinstance(process_title, VersionedAttribute)
    return process_title
270 def title_then(self) -> str:
271 """Shortcut to .process.title.at(self.date)"""
272 title_then = self.process.title.at(self.date)
273 assert isinstance(title_then, str)
277 def effort_then(self) -> float:
278 """Shortcut to .process.effort.at(self.date)"""
279 effort_then = self.process.effort.at(self.date)
280 assert isinstance(effort_then, float)
284 def has_doneness_in_path(self) -> bool:
285 """Check whether self is done or has any children that are."""
288 for child in self.children:
291 if child.has_doneness_in_path:
295 def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
296 """Return tree of depended-on Todos."""
298 def make_node(todo: Todo) -> TodoNode:
300 seen = todo.id_ in seen_todos
301 assert isinstance(todo.id_, int)
302 seen_todos.add(todo.id_)
303 for child in todo.children:
304 children += [make_node(child)]
305 return TodoNode(todo, seen, children)
307 return make_node(self)
310 def tree_effort(self) -> float:
311 """Return sum of performed efforts of self and all descendants."""
313 def walk_tree(node: Todo) -> float:
315 for child in node.children:
316 local_effort += walk_tree(child)
317 return node.performed_effort + local_effort
319 return walk_tree(self)
321 def add_child(self, child: Todo) -> None:
322 """Add child to self.children, avoid recursion, update parenthoods."""
324 def walk_steps(node: Todo) -> None:
325 if node.id_ == self.id_:
326 raise BadFormatException('bad child choice causes recursion')
327 for child in node.children:
331 raise HandledException('Can only add children to saved Todos.')
332 if child.id_ is None:
333 raise HandledException('Can only add saved children to Todos.')
334 if child in self.children:
335 raise BadFormatException('cannot adopt same child twice')
337 self.children += [child]
338 child.parents += [self]
def remove_child(self, child: Todo) -> None:
    """Detach child from self, on both sides of the relation.

    Drops child from self.children and self from child.parents; raises
    HandledException if child is not currently among self.children.
    """
    if child in self.children:
        self.children.remove(child)
        child.parents.remove(self)
        return
    raise HandledException('Cannot remove un-parented child.')
347 def save(self, db_conn: DatabaseConnection) -> None:
348 """On save calls, also check if auto-deletion by effort < 0."""
349 if self.effort and self.effort < 0 and self.is_deletable:
353 self.__class__.days_to_update.add(self.date)
354 super().save(db_conn)
355 for condition in self.enables + self.disables + self.conditions:
356 condition.save(db_conn)
def remove(self, db_conn: DatabaseConnection) -> None:
    """Remove self from DB (via super().remove) after cutting its relations.

    Raises HandledException if self.is_deletable is false.
    """
    if not self.is_deletable:
        raise HandledException('Cannot remove non-deletable Todo.')
    self.__class__.days_to_update.add(self.date)
    # Snapshot both link lists up front: remove_child mutates .children of
    # the parent and .parents of the child while we iterate.
    former_children = list(self.children)
    former_parents = list(self.parents)
    for former_child in former_children:
        self.remove_child(former_child)
    for former_parent in former_parents:
        former_parent.remove_child(self)
    super().remove(db_conn)