home · contact · privacy
008f7a275ab42aa9bbbcfed1933c81f491854211
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process, ProcessStepsNode
8 from plomtask.versioned_attributes import VersionedAttribute
9 from plomtask.conditions import Condition, ConditionsRelations
10 from plomtask.exceptions import (NotFoundException, BadFormatException,
11                                  HandledException)
12 from plomtask.dating import valid_date
13
14
15 @dataclass
16 class TodoNode:
17     """Collects what's useful to know for Todo/Condition tree display."""
18     todo: Todo
19     seen: bool
20     children: list[TodoNode]
21
22
23 class Todo(BaseModel[int], ConditionsRelations):
24     """Individual actionable."""
25     # pylint: disable=too-many-instance-attributes
26     # pylint: disable=too-many-public-methods
27     table_name = 'todos'
28     to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
29                'calendarize']
30     to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
31                          ('todo_blockers', 'todo', 'blockers', 0),
32                          ('todo_enables', 'todo', 'enables', 0),
33                          ('todo_disables', 'todo', 'disables', 0),
34                          ('todo_children', 'parent', 'children', 0),
35                          ('todo_children', 'child', 'parents', 1)]
36     to_search = ['comment']
37
38     # pylint: disable=too-many-arguments
39     def __init__(self, id_: int | None,
40                  process: Process,
41                  is_done: bool,
42                  date: str, comment: str = '',
43                  effort: None | float = None,
44                  calendarize: bool = False) -> None:
45         BaseModel.__init__(self, id_)
46         ConditionsRelations.__init__(self)
47         if process.id_ is None:
48             raise NotFoundException('Process of Todo without ID (not saved?)')
49         self.process = process
50         self._is_done = is_done
51         self.date = valid_date(date)
52         self.comment = comment
53         self.effort = effort
54         self.children: list[Todo] = []
55         self.parents: list[Todo] = []
56         self.calendarize = calendarize
57         if not self.id_:
58             self.calendarize = self.process.calendarize
59             self.conditions = self.process.conditions[:]
60             self.blockers = self.process.blockers[:]
61             self.enables = self.process.enables[:]
62             self.disables = self.process.disables[:]
63
64     @classmethod
65     def by_date_range(cls, db_conn: DatabaseConnection,
66                       date_range: tuple[str, str] = ('', '')) -> list[Todo]:
67         """Collect Todos of Days within date_range."""
68         todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
69         return todos
70
71     @classmethod
72     def create_with_children(cls, db_conn: DatabaseConnection,
73                              process_id: int, date: str) -> Todo:
74         """Create Todo of process for date, ensure children."""
75
76         def key_order_func(n: ProcessStepsNode) -> int:
77             assert isinstance(n.process.id_, int)
78             return n.process.id_
79
80         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
81             adoptables = [t for t in cls.by_date(db_conn, date)
82                           if (t not in parent.children)
83                           and (t != parent)
84                           and step_node.process == t.process]
85             satisfier = None
86             for adoptable in adoptables:
87                 satisfier = adoptable
88                 break
89             if not satisfier:
90                 satisfier = cls(None, step_node.process, False, date)
91                 satisfier.save(db_conn)
92             sub_step_nodes = list(step_node.steps.values())
93             sub_step_nodes.sort(key=key_order_func)
94             for sub_node in sub_step_nodes:
95                 if sub_node.is_suppressed:
96                     continue
97                 n_slots = len([n for n in sub_step_nodes
98                                if n.process == sub_node.process])
99                 filled_slots = len([t for t in satisfier.children
100                                     if t.process == sub_node.process])
101                 # if we did not newly create satisfier, it may already fill
102                 # some step dependencies, so only fill what remains open
103                 if n_slots - filled_slots > 0:
104                     satisfier.add_child(walk_steps(satisfier, sub_node))
105             satisfier.save(db_conn)
106             return satisfier
107
108         process = Process.by_id(db_conn, process_id)
109         todo = cls(None, process, False, date)
110         todo.save(db_conn)
111         steps_tree = process.get_steps(db_conn)
112         for step_node in steps_tree.values():
113             if step_node.is_suppressed:
114                 continue
115             todo.add_child(walk_steps(todo, step_node))
116         todo.save(db_conn)
117         return todo
118
119     @classmethod
120     def from_table_row(cls, db_conn: DatabaseConnection,
121                        row: Row | list[Any]) -> Todo:
122         """Make from DB row, with dependencies."""
123         if row[1] == 0:
124             raise NotFoundException('calling Todo of '
125                                     'unsaved Process')
126         row_as_list = list(row)
127         row_as_list[1] = Process.by_id(db_conn, row[1])
128         todo = super().from_table_row(db_conn, row_as_list)
129         assert isinstance(todo.id_, int)
130         for t_id in db_conn.column_where('todo_children', 'child',
131                                          'parent', todo.id_):
132             # pylint: disable=no-member
133             todo.children += [cls.by_id(db_conn, t_id)]
134         for t_id in db_conn.column_where('todo_children', 'parent',
135                                          'child', todo.id_):
136             # pylint: disable=no-member
137             todo.parents += [cls.by_id(db_conn, t_id)]
138         for name in ('conditions', 'blockers', 'enables', 'disables'):
139             table = f'todo_{name}'
140             assert isinstance(todo.id_, int)
141             for cond_id in db_conn.column_where(table, 'condition',
142                                                 'todo', todo.id_):
143                 target = getattr(todo, name)
144                 target += [Condition.by_id(db_conn, cond_id)]
145         return todo
146
147     @classmethod
148     def by_process_id(cls, db_conn: DatabaseConnection,
149                       process_id: int | None) -> list[Todo]:
150         """Collect all Todos of Process of process_id."""
151         return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
152
153     @classmethod
154     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
155         """Collect all Todos for Day of date."""
156         return cls.by_date_range(db_conn, (date, date))
157
158     @classmethod
159     def total_effort_at_date(cls, db_conn: DatabaseConnection, date: str
160                              ) -> float:
161         """Sum all .performed_effort of Todos at Day of date."""
162         total_effort = 0.0
163         days_todos = cls.by_date(db_conn, date)
164         for todo in days_todos:
165             total_effort += todo.performed_effort
166         return total_effort
167
168     @property
169     def is_doable(self) -> bool:
170         """Decide whether .is_done settable based on children, Conditions."""
171         for child in self.children:
172             if not child.is_done:
173                 return False
174         for condition in self.conditions:
175             if not condition.is_active:
176                 return False
177         for condition in self.blockers:
178             if condition.is_active:
179                 return False
180         return True
181
182     @property
183     def is_deletable(self) -> bool:
184         """Decide whether self be deletable (not if preserve-worthy values)."""
185         if self.comment:
186             return False
187         if self.effort and self.effort >= 0:
188             return False
189         return True
190
191     @property
192     def performed_effort(self) -> float:
193         """Return performed effort, i.e. self.effort or default if done.."""
194         if self.effort is not None:
195             return self.effort
196         if self.is_done:
197             return self.effort_then
198         return 0
199
200     @property
201     def process_id(self) -> int | str | None:
202         """Needed for super().save to save Processes as attributes."""
203         return self.process.id_
204
205     @property
206     def is_done(self) -> bool:
207         """Wrapper around self._is_done so we can control its setter."""
208         return self._is_done
209
210     @is_done.setter
211     def is_done(self, value: bool) -> None:
212         if value != self.is_done and not self.is_doable:
213             raise BadFormatException('cannot change doneness of undoable Todo')
214         if self._is_done != value:
215             self._is_done = value
216             if value is True:
217                 for condition in self.enables:
218                     condition.is_active = True
219                 for condition in self.disables:
220                     condition.is_active = False
221
222     @property
223     def title(self) -> VersionedAttribute:
224         """Shortcut to .process.title."""
225         return self.process.title
226
227     @property
228     def title_then(self) -> str:
229         """Shortcut to .process.title.at(self.date)"""
230         title_then = self.process.title.at(self.date)
231         assert isinstance(title_then, str)
232         return title_then
233
234     @property
235     def effort_then(self) -> float:
236         """Shortcut to .process.effort.at(self.date)"""
237         effort_then = self.process.effort.at(self.date)
238         assert isinstance(effort_then, float)
239         return effort_then
240
241     @property
242     def has_doneness_in_path(self) -> bool:
243         """Check whether self is done or has any children that are."""
244         if self.is_done:
245             return True
246         for child in self.children:
247             if child.is_done:
248                 return True
249             if child.has_doneness_in_path:
250                 return True
251         return False
252
253     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
254         """Return tree of depended-on Todos."""
255
256         def make_node(todo: Todo) -> TodoNode:
257             children = []
258             seen = todo.id_ in seen_todos
259             assert isinstance(todo.id_, int)
260             seen_todos.add(todo.id_)
261             for child in todo.children:
262                 children += [make_node(child)]
263             return TodoNode(todo, seen, children)
264
265         return make_node(self)
266
267     @property
268     def tree_effort(self) -> float:
269         """Return sum of performed efforts of self and all descendants."""
270
271         def walk_tree(node: Todo) -> float:
272             local_effort = 0.0
273             for child in node.children:
274                 local_effort += walk_tree(child)
275             return node.performed_effort + local_effort
276
277         return walk_tree(self)
278
279     def add_child(self, child: Todo) -> None:
280         """Add child to self.children, avoid recursion, update parenthoods."""
281
282         def walk_steps(node: Todo) -> None:
283             if node.id_ == self.id_:
284                 raise BadFormatException('bad child choice causes recursion')
285             for child in node.children:
286                 walk_steps(child)
287
288         if self.id_ is None:
289             raise HandledException('Can only add children to saved Todos.')
290         if child.id_ is None:
291             raise HandledException('Can only add saved children to Todos.')
292         if child in self.children:
293             raise BadFormatException('cannot adopt same child twice')
294         walk_steps(child)
295         self.children += [child]
296         child.parents += [self]
297
298     def remove_child(self, child: Todo) -> None:
299         """Remove child from self.children, update counter relations."""
300         if child not in self.children:
301             raise HandledException('Cannot remove un-parented child.')
302         self.children.remove(child)
303         child.parents.remove(self)
304
305     def save(self, db_conn: DatabaseConnection) -> None:
306         """On save calls, also check if auto-deletion by effort < 0."""
307         if self.effort and self.effort < 0 and self.is_deletable:
308             self.remove(db_conn)
309             return
310         super().save(db_conn)
311
312     def remove(self, db_conn: DatabaseConnection) -> None:
313         """Remove from DB, including relations."""
314         if not self.is_deletable:
315             raise HandledException('Cannot remove non-deletable Todo.')
316         children_to_remove = self.children[:]
317         parents_to_remove = self.parents[:]
318         for child in children_to_remove:
319             self.remove_child(child)
320         for parent in parents_to_remove:
321             parent.remove_child(self)
322         super().remove(db_conn)