home · contact · privacy
Minor tests refactoring.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from sqlite3 import Row
5 from plomtask.misc import DictableNode
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process, ProcessStepsNode
8 from plomtask.versioned_attributes import VersionedAttribute
9 from plomtask.conditions import Condition, ConditionsRelations
10 from plomtask.exceptions import (NotFoundException, BadFormatException,
11                                  HandledException)
12 from plomtask.dating import valid_date
13
14
15 class TodoNode(DictableNode):
16     """Collects what's useful to know for Todo/Condition tree display."""
17     # pylint: disable=too-few-public-methods
18     todo: Todo
19     seen: bool
20     children: list[TodoNode]
21     _to_dict = ['todo', 'seen', 'children']
22
23
24 class TodoOrProcStepNode(DictableNode):
25     """Collect what's useful for Todo-or-ProcessStep tree display."""
26     # pylint: disable=too-few-public-methods
27     node_id: int
28     todo: Todo | None
29     process: Process | None
30     children: list[TodoOrProcStepNode]  # pylint: disable=undefined-variable
31     fillable: bool = False
32     _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
33
34
35 class Todo(BaseModel[int], ConditionsRelations):
36     """Individual actionable."""
37     # pylint: disable=too-many-instance-attributes
38     # pylint: disable=too-many-public-methods
39     table_name = 'todos'
40     to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
41                        'calendarize']
42     to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
43                          ('todo_blockers', 'todo', 'blockers', 0),
44                          ('todo_enables', 'todo', 'enables', 0),
45                          ('todo_disables', 'todo', 'disables', 0),
46                          ('todo_children', 'parent', 'children', 0),
47                          ('todo_children', 'child', 'parents', 1)]
48     to_search = ['comment']
49     days_to_update: Set[str] = set()
50     children: list[Todo]
51     parents: list[Todo]
52     sorters = {'doneness': lambda t: t.is_done,
53                'title': lambda t: t.title_then,
54                'comment': lambda t: t.comment,
55                'date': lambda t: t.date}
56
57     # pylint: disable=too-many-arguments
58     def __init__(self, id_: int | None,
59                  process: Process,
60                  is_done: bool,
61                  date: str, comment: str = '',
62                  effort: None | float = None,
63                  calendarize: bool = False) -> None:
64         BaseModel.__init__(self, id_)
65         ConditionsRelations.__init__(self)
66         if process.id_ is None:
67             raise NotFoundException('Process of Todo without ID (not saved?)')
68         self.process = process
69         self._is_done = is_done
70         self.date = valid_date(date)
71         self.comment = comment
72         self.effort = effort
73         self.children = []
74         self.parents = []
75         self.calendarize = calendarize
76         if not self.id_:
77             self.calendarize = self.process.calendarize
78             self.conditions = self.process.conditions[:]
79             self.blockers = self.process.blockers[:]
80             self.enables = self.process.enables[:]
81             self.disables = self.process.disables[:]
82
83     @classmethod
84     def by_date_range(cls, db_conn: DatabaseConnection,
85                       date_range: tuple[str, str] = ('', '')) -> list[Todo]:
86         """Collect Todos of Days within date_range."""
87         todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
88         return todos
89
90     def ensure_children(self, db_conn: DatabaseConnection) -> None:
91         """Ensure Todo children (create or adopt) demanded by Process chain."""
92
93         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
94             adoptables = [t for t in Todo.by_date(db_conn, parent.date)
95                           if (t not in parent.children)
96                           and (t != parent)
97                           and step_node.process.id_ == t.process_id]
98             satisfier = None
99             for adoptable in adoptables:
100                 satisfier = adoptable
101                 break
102             if not satisfier:
103                 satisfier = Todo(None, step_node.process, False, parent.date)
104                 satisfier.save(db_conn)
105             sub_step_nodes = sorted(
106                     step_node.steps,
107                     key=lambda s: s.process.id_ if s.process.id_ else 0)
108             for sub_node in sub_step_nodes:
109                 if sub_node.is_suppressed:
110                     continue
111                 n_slots = len([n for n in sub_step_nodes
112                                if n.process == sub_node.process])
113                 filled_slots = len([t for t in satisfier.children
114                                     if t.process.id_ == sub_node.process.id_])
115                 # if we did not newly create satisfier, it may already fill
116                 # some step dependencies, so only fill what remains open
117                 if n_slots - filled_slots > 0:
118                     satisfier.add_child(walk_steps(satisfier, sub_node))
119             satisfier.save(db_conn)
120             return satisfier
121
122         process = Process.by_id(db_conn, self.process_id)
123         steps_tree = process.get_steps(db_conn)
124         for step_node in steps_tree:
125             if step_node.is_suppressed:
126                 continue
127             self.add_child(walk_steps(self, step_node))
128         self.save(db_conn)
129
130     @classmethod
131     def from_table_row(cls, db_conn: DatabaseConnection,
132                        row: Row | list[Any]) -> Todo:
133         """Make from DB row, with dependencies."""
134         if row[1] == 0:
135             raise NotFoundException('calling Todo of '
136                                     'unsaved Process')
137         row_as_list = list(row)
138         row_as_list[1] = Process.by_id(db_conn, row[1])
139         todo = super().from_table_row(db_conn, row_as_list)
140         assert isinstance(todo.id_, int)
141         for t_id in db_conn.column_where('todo_children', 'child',
142                                          'parent', todo.id_):
143             todo.children += [cls.by_id(db_conn, t_id)]
144         for t_id in db_conn.column_where('todo_children', 'parent',
145                                          'child', todo.id_):
146             todo.parents += [cls.by_id(db_conn, t_id)]
147         for name in ('conditions', 'blockers', 'enables', 'disables'):
148             table = f'todo_{name}'
149             assert isinstance(todo.id_, int)
150             for cond_id in db_conn.column_where(table, 'condition',
151                                                 'todo', todo.id_):
152                 target = getattr(todo, name)
153                 target += [Condition.by_id(db_conn, cond_id)]
154         return todo
155
156     @classmethod
157     def by_process_id(cls, db_conn: DatabaseConnection,
158                       process_id: int | None) -> list[Todo]:
159         """Collect all Todos of Process of process_id."""
160         return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
161
162     @classmethod
163     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
164         """Collect all Todos for Day of date."""
165         return cls.by_date_range(db_conn, (date, date))
166
167     @property
168     def is_doable(self) -> bool:
169         """Decide whether .is_done settable based on children, Conditions."""
170         for child in self.children:
171             if not child.is_done:
172                 return False
173         for condition in self.conditions:
174             if not condition.is_active:
175                 return False
176         for condition in self.blockers:
177             if condition.is_active:
178                 return False
179         return True
180
181     @property
182     def is_deletable(self) -> bool:
183         """Decide whether self be deletable (not if preserve-worthy values)."""
184         if self.comment:
185             return False
186         if self.effort and self.effort >= 0:
187             return False
188         return True
189
190     @property
191     def performed_effort(self) -> float:
192         """Return performed effort, i.e. self.effort or default if done.."""
193         if self.effort is not None:
194             return self.effort
195         if self.is_done:
196             return self.effort_then
197         return 0
198
199     @property
200     def process_id(self) -> int:
201         """Needed for super().save to save Processes as attributes."""
202         assert isinstance(self.process.id_, int)
203         return self.process.id_
204
205     @property
206     def is_done(self) -> bool:
207         """Wrapper around self._is_done so we can control its setter."""
208         return self._is_done
209
210     @is_done.setter
211     def is_done(self, value: bool) -> None:
212         if value != self.is_done and not self.is_doable:
213             raise BadFormatException('cannot change doneness of undoable Todo')
214         if self._is_done != value:
215             self._is_done = value
216             if value is True:
217                 for condition in self.enables:
218                     condition.is_active = True
219                 for condition in self.disables:
220                     condition.is_active = False
221
222     @property
223     def title(self) -> VersionedAttribute:
224         """Shortcut to .process.title."""
225         assert isinstance(self.process.title, VersionedAttribute)
226         return self.process.title
227
228     @property
229     def title_then(self) -> str:
230         """Shortcut to .process.title.at(self.date)"""
231         title_then = self.process.title.at(self.date)
232         assert isinstance(title_then, str)
233         return title_then
234
235     @property
236     def effort_then(self) -> float:
237         """Shortcut to .process.effort.at(self.date)"""
238         effort_then = self.process.effort.at(self.date)
239         assert isinstance(effort_then, float)
240         return effort_then
241
242     @property
243     def has_doneness_in_path(self) -> bool:
244         """Check whether self is done or has any children that are."""
245         if self.is_done:
246             return True
247         for child in self.children:
248             if child.is_done:
249                 return True
250             if child.has_doneness_in_path:
251                 return True
252         return False
253
254     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
255         """Return tree of depended-on Todos."""
256
257         def make_node(todo: Todo) -> TodoNode:
258             children = []
259             seen = todo.id_ in seen_todos
260             assert isinstance(todo.id_, int)
261             seen_todos.add(todo.id_)
262             for child in todo.children:
263                 children += [make_node(child)]
264             return TodoNode(todo, seen, children)
265
266         return make_node(self)
267
268     @property
269     def tree_effort(self) -> float:
270         """Return sum of performed efforts of self and all descendants."""
271
272         def walk_tree(node: Todo) -> float:
273             local_effort = 0.0
274             for child in node.children:
275                 local_effort += walk_tree(child)
276             return node.performed_effort + local_effort
277
278         return walk_tree(self)
279
280     def add_child(self, child: Todo) -> None:
281         """Add child to self.children, avoid recursion, update parenthoods."""
282
283         def walk_steps(node: Todo) -> None:
284             if node.id_ == self.id_:
285                 raise BadFormatException('bad child choice causes recursion')
286             for child in node.children:
287                 walk_steps(child)
288
289         if self.id_ is None:
290             raise HandledException('Can only add children to saved Todos.')
291         if child.id_ is None:
292             raise HandledException('Can only add saved children to Todos.')
293         if child in self.children:
294             raise BadFormatException('cannot adopt same child twice')
295         walk_steps(child)
296         self.children += [child]
297         child.parents += [self]
298
299     def remove_child(self, child: Todo) -> None:
300         """Remove child from self.children, update counter relations."""
301         if child not in self.children:
302             raise HandledException('Cannot remove un-parented child.')
303         self.children.remove(child)
304         child.parents.remove(self)
305
306     def update_attrs(self, **kwargs: Any) -> None:
307         """Update self's attributes listed in kwargs."""
308         for k, v in kwargs.items():
309             setattr(self, k, v)
310
311     def save(self, db_conn: DatabaseConnection) -> None:
312         """On save calls, also check if auto-deletion by effort < 0."""
313         if self.effort and self.effort < 0 and self.is_deletable:
314             self.remove(db_conn)
315             return
316         if self.id_ is None:
317             self.__class__.days_to_update.add(self.date)
318         super().save(db_conn)
319         for condition in self.enables + self.disables + self.conditions:
320             condition.save(db_conn)
321
322     def remove(self, db_conn: DatabaseConnection) -> None:
323         """Remove from DB, including relations."""
324         if not self.is_deletable:
325             raise HandledException('Cannot remove non-deletable Todo.')
326         self.__class__.days_to_update.add(self.date)
327         children_to_remove = self.children[:]
328         parents_to_remove = self.parents[:]
329         for child in children_to_remove:
330             self.remove_child(child)
331         for parent in parents_to_remove:
332             parent.remove_child(self)
333         super().remove(db_conn)