home · contact · privacy
Re-organize testing.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.processes import Process, ProcessStepsNode
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11 from plomtask.dating import valid_date
12
13
14 class DictableNode:
15     """Template for TodoNode, TodoOrStepsNode providing .as_dict_and_refs."""
16     # pylint: disable=too-few-public-methods
17     _to_dict: list[str] = []
18
19     def __init__(self, *args: Any) -> None:
20         for i, arg in enumerate(args):
21             setattr(self, self._to_dict[i], arg)
22
23     @property
24     def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
25         """Return self as json.dumps-ready dict, list of referenced objects."""
26         d = {}
27         refs = []
28         for name in self._to_dict:
29             attr = getattr(self, name)
30             if hasattr(attr, 'id_'):
31                 d[name] = attr.id_
32                 continue
33             if isinstance(attr, list):
34                 d[name] = []
35                 for item in attr:
36                     item_d, item_refs = item.as_dict_and_refs
37                     d[name] += [item_d]
38                     for item_ref in [r for r in item_refs if r not in refs]:
39                         refs += [item_ref]
40                 continue
41             d[name] = attr
42         return d, refs
43
44
45 class TodoNode(DictableNode):
46     """Collects what's useful to know for Todo/Condition tree display."""
47     # pylint: disable=too-few-public-methods
48     todo: Todo
49     seen: bool
50     children: list[TodoNode]
51     _to_dict = ['todo', 'seen', 'children']
52
53
54 class TodoOrProcStepNode(DictableNode):
55     """Collect what's useful for Todo-or-ProcessStep tree display."""
56     # pylint: disable=too-few-public-methods
57     node_id: int
58     todo: Todo | None
59     process: Process | None
60     children: list[TodoOrProcStepNode]  # pylint: disable=undefined-variable
61     fillable: bool = False
62     _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
63
64
65 class Todo(BaseModel[int], ConditionsRelations):
66     """Individual actionable."""
67     # pylint: disable=too-many-instance-attributes
68     # pylint: disable=too-many-public-methods
69     table_name = 'todos'
70     to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
71                        'calendarize']
72     to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
73                          ('todo_blockers', 'todo', 'blockers', 0),
74                          ('todo_enables', 'todo', 'enables', 0),
75                          ('todo_disables', 'todo', 'disables', 0),
76                          ('todo_children', 'parent', 'children', 0),
77                          ('todo_children', 'child', 'parents', 1)]
78     to_search = ['comment']
79     days_to_update: Set[str] = set()
80     children: list[Todo]
81     parents: list[Todo]
82     sorters = {'doneness': lambda t: t.is_done,
83                'title': lambda t: t.title_then,
84                'comment': lambda t: t.comment,
85                'date': lambda t: t.date}
86
87     # pylint: disable=too-many-arguments
88     def __init__(self, id_: int | None,
89                  process: Process,
90                  is_done: bool,
91                  date: str, comment: str = '',
92                  effort: None | float = None,
93                  calendarize: bool = False) -> None:
94         BaseModel.__init__(self, id_)
95         ConditionsRelations.__init__(self)
96         if process.id_ is None:
97             raise NotFoundException('Process of Todo without ID (not saved?)')
98         self.process = process
99         self._is_done = is_done
100         self.date = valid_date(date)
101         self.comment = comment
102         self.effort = effort
103         self.children = []
104         self.parents = []
105         self.calendarize = calendarize
106         if not self.id_:
107             self.calendarize = self.process.calendarize
108             self.conditions = self.process.conditions[:]
109             self.blockers = self.process.blockers[:]
110             self.enables = self.process.enables[:]
111             self.disables = self.process.disables[:]
112
113     @classmethod
114     def by_date_range(cls, db_conn: DatabaseConnection,
115                       date_range: tuple[str, str] = ('', '')) -> list[Todo]:
116         """Collect Todos of Days within date_range."""
117         todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
118         return todos
119
120     def ensure_children(self, db_conn: DatabaseConnection) -> None:
121         """Ensure Todo children (create or adopt) demanded by Process chain."""
122
123         def key_order_func(n: ProcessStepsNode) -> int:
124             assert isinstance(n.process.id_, int)
125             return n.process.id_
126
127         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
128             adoptables = [t for t in Todo.by_date(db_conn, parent.date)
129                           if (t not in parent.children)
130                           and (t != parent)
131                           and step_node.process == t.process]
132             satisfier = None
133             for adoptable in adoptables:
134                 satisfier = adoptable
135                 break
136             if not satisfier:
137                 satisfier = Todo(None, step_node.process, False, parent.date)
138                 satisfier.save(db_conn)
139             sub_step_nodes = list(step_node.steps.values())
140             sub_step_nodes.sort(key=key_order_func)
141             for sub_node in sub_step_nodes:
142                 if sub_node.is_suppressed:
143                     continue
144                 n_slots = len([n for n in sub_step_nodes
145                                if n.process == sub_node.process])
146                 filled_slots = len([t for t in satisfier.children
147                                     if t.process == sub_node.process])
148                 # if we did not newly create satisfier, it may already fill
149                 # some step dependencies, so only fill what remains open
150                 if n_slots - filled_slots > 0:
151                     satisfier.add_child(walk_steps(satisfier, sub_node))
152             satisfier.save(db_conn)
153             return satisfier
154
155         process = Process.by_id(db_conn, self.process_id)
156         steps_tree = process.get_steps(db_conn)
157         for step_node in steps_tree.values():
158             if step_node.is_suppressed:
159                 continue
160             self.add_child(walk_steps(self, step_node))
161         self.save(db_conn)
162
163     @classmethod
164     def from_table_row(cls, db_conn: DatabaseConnection,
165                        row: Row | list[Any]) -> Todo:
166         """Make from DB row, with dependencies."""
167         if row[1] == 0:
168             raise NotFoundException('calling Todo of '
169                                     'unsaved Process')
170         row_as_list = list(row)
171         row_as_list[1] = Process.by_id(db_conn, row[1])
172         todo = super().from_table_row(db_conn, row_as_list)
173         assert isinstance(todo.id_, int)
174         for t_id in db_conn.column_where('todo_children', 'child',
175                                          'parent', todo.id_):
176             todo.children += [cls.by_id(db_conn, t_id)]
177         for t_id in db_conn.column_where('todo_children', 'parent',
178                                          'child', todo.id_):
179             todo.parents += [cls.by_id(db_conn, t_id)]
180         for name in ('conditions', 'blockers', 'enables', 'disables'):
181             table = f'todo_{name}'
182             assert isinstance(todo.id_, int)
183             for cond_id in db_conn.column_where(table, 'condition',
184                                                 'todo', todo.id_):
185                 target = getattr(todo, name)
186                 target += [Condition.by_id(db_conn, cond_id)]
187         return todo
188
189     @classmethod
190     def by_process_id(cls, db_conn: DatabaseConnection,
191                       process_id: int | None) -> list[Todo]:
192         """Collect all Todos of Process of process_id."""
193         return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
194
195     @classmethod
196     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
197         """Collect all Todos for Day of date."""
198         return cls.by_date_range(db_conn, (date, date))
199
200     @property
201     def is_doable(self) -> bool:
202         """Decide whether .is_done settable based on children, Conditions."""
203         for child in self.children:
204             if not child.is_done:
205                 return False
206         for condition in self.conditions:
207             if not condition.is_active:
208                 return False
209         for condition in self.blockers:
210             if condition.is_active:
211                 return False
212         return True
213
214     @property
215     def is_deletable(self) -> bool:
216         """Decide whether self be deletable (not if preserve-worthy values)."""
217         if self.comment:
218             return False
219         if self.effort and self.effort >= 0:
220             return False
221         return True
222
223     @property
224     def performed_effort(self) -> float:
225         """Return performed effort, i.e. self.effort or default if done.."""
226         if self.effort is not None:
227             return self.effort
228         if self.is_done:
229             return self.effort_then
230         return 0
231
232     @property
233     def process_id(self) -> int:
234         """Needed for super().save to save Processes as attributes."""
235         assert isinstance(self.process.id_, int)
236         return self.process.id_
237
238     @property
239     def is_done(self) -> bool:
240         """Wrapper around self._is_done so we can control its setter."""
241         return self._is_done
242
243     @is_done.setter
244     def is_done(self, value: bool) -> None:
245         if value != self.is_done and not self.is_doable:
246             raise BadFormatException('cannot change doneness of undoable Todo')
247         if self._is_done != value:
248             self._is_done = value
249             if value is True:
250                 for condition in self.enables:
251                     condition.is_active = True
252                 for condition in self.disables:
253                     condition.is_active = False
254
255     @property
256     def title(self) -> VersionedAttribute:
257         """Shortcut to .process.title."""
258         assert isinstance(self.process.title, VersionedAttribute)
259         return self.process.title
260
261     @property
262     def title_then(self) -> str:
263         """Shortcut to .process.title.at(self.date)"""
264         title_then = self.process.title.at(self.date)
265         assert isinstance(title_then, str)
266         return title_then
267
268     @property
269     def effort_then(self) -> float:
270         """Shortcut to .process.effort.at(self.date)"""
271         effort_then = self.process.effort.at(self.date)
272         assert isinstance(effort_then, float)
273         return effort_then
274
275     @property
276     def has_doneness_in_path(self) -> bool:
277         """Check whether self is done or has any children that are."""
278         if self.is_done:
279             return True
280         for child in self.children:
281             if child.is_done:
282                 return True
283             if child.has_doneness_in_path:
284                 return True
285         return False
286
287     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
288         """Return tree of depended-on Todos."""
289
290         def make_node(todo: Todo) -> TodoNode:
291             children = []
292             seen = todo.id_ in seen_todos
293             assert isinstance(todo.id_, int)
294             seen_todos.add(todo.id_)
295             for child in todo.children:
296                 children += [make_node(child)]
297             return TodoNode(todo, seen, children)
298
299         return make_node(self)
300
301     @property
302     def tree_effort(self) -> float:
303         """Return sum of performed efforts of self and all descendants."""
304
305         def walk_tree(node: Todo) -> float:
306             local_effort = 0.0
307             for child in node.children:
308                 local_effort += walk_tree(child)
309             return node.performed_effort + local_effort
310
311         return walk_tree(self)
312
313     def add_child(self, child: Todo) -> None:
314         """Add child to self.children, avoid recursion, update parenthoods."""
315
316         def walk_steps(node: Todo) -> None:
317             if node.id_ == self.id_:
318                 raise BadFormatException('bad child choice causes recursion')
319             for child in node.children:
320                 walk_steps(child)
321
322         if self.id_ is None:
323             raise HandledException('Can only add children to saved Todos.')
324         if child.id_ is None:
325             raise HandledException('Can only add saved children to Todos.')
326         if child in self.children:
327             raise BadFormatException('cannot adopt same child twice')
328         walk_steps(child)
329         self.children += [child]
330         child.parents += [self]
331
332     def remove_child(self, child: Todo) -> None:
333         """Remove child from self.children, update counter relations."""
334         if child not in self.children:
335             raise HandledException('Cannot remove un-parented child.')
336         self.children.remove(child)
337         child.parents.remove(self)
338
339     def update_attrs(self, **kwargs: Any) -> None:
340         """Update self's attributes listed in kwargs."""
341         for k, v in kwargs.items():
342             setattr(self, k, v)
343
344     def save(self, db_conn: DatabaseConnection) -> None:
345         """On save calls, also check if auto-deletion by effort < 0."""
346         if self.effort and self.effort < 0 and self.is_deletable:
347             self.remove(db_conn)
348             return
349         if self.id_ is None:
350             self.__class__.days_to_update.add(self.date)
351         super().save(db_conn)
352         for condition in self.enables + self.disables + self.conditions:
353             condition.save(db_conn)
354
355     def remove(self, db_conn: DatabaseConnection) -> None:
356         """Remove from DB, including relations."""
357         if not self.is_deletable:
358             raise HandledException('Cannot remove non-deletable Todo.')
359         self.__class__.days_to_update.add(self.date)
360         children_to_remove = self.children[:]
361         parents_to_remove = self.parents[:]
362         for child in children_to_remove:
363             self.remove_child(child)
364         for parent in parents_to_remove:
365             parent.remove_child(self)
366         super().remove(db_conn)