home · contact · privacy
Expand POST /todo adoption tests.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.processes import Process, ProcessStepsNode
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11 from plomtask.dating import valid_date
12
13
14 class DictableNode:
15     """Template for TodoNode, TodoOrStepsNode providing .as_dict_and_refs."""
16     # pylint: disable=too-few-public-methods
17     _to_dict: list[str] = []
18
19     def __init__(self, *args: Any) -> None:
20         for i, arg in enumerate(args):
21             setattr(self, self._to_dict[i], arg)
22
23     @property
24     def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
25         """Return self as json.dumps-ready dict, list of referenced objects."""
26         d = {}
27         refs = []
28         for name in self._to_dict:
29             attr = getattr(self, name)
30             if hasattr(attr, 'id_'):
31                 d[name] = attr.id_
32                 continue
33             if isinstance(attr, list):
34                 d[name] = []
35                 for item in attr:
36                     item_d, item_refs = item.as_dict_and_refs
37                     d[name] += [item_d]
38                     for item_ref in [r for r in item_refs if r not in refs]:
39                         refs += [item_ref]
40                 continue
41             d[name] = attr
42         return d, refs
43
44
45 class TodoNode(DictableNode):
46     """Collects what's useful to know for Todo/Condition tree display."""
47     # pylint: disable=too-few-public-methods
48     todo: Todo
49     seen: bool
50     children: list[TodoNode]
51     _to_dict = ['todo', 'seen', 'children']
52
53
54 class TodoOrProcStepNode(DictableNode):
55     """Collect what's useful for Todo-or-ProcessStep tree display."""
56     # pylint: disable=too-few-public-methods
57     node_id: int
58     todo: Todo | None
59     process: Process | None
60     children: list[TodoOrProcStepNode]  # pylint: disable=undefined-variable
61     fillable: bool = False
62     _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
63
64
65 class Todo(BaseModel[int], ConditionsRelations):
66     """Individual actionable."""
67     # pylint: disable=too-many-instance-attributes
68     # pylint: disable=too-many-public-methods
69     table_name = 'todos'
70     to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
71                        'calendarize']
72     to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
73                          ('todo_blockers', 'todo', 'blockers', 0),
74                          ('todo_enables', 'todo', 'enables', 0),
75                          ('todo_disables', 'todo', 'disables', 0),
76                          ('todo_children', 'parent', 'children', 0),
77                          ('todo_children', 'child', 'parents', 1)]
78     to_search = ['comment']
79     days_to_update: Set[str] = set()
80     children: list[Todo]
81     parents: list[Todo]
82     sorters = {'doneness': lambda t: t.is_done,
83                'title': lambda t: t.title_then,
84                'comment': lambda t: t.comment,
85                'date': lambda t: t.date}
86
87     # pylint: disable=too-many-arguments
88     def __init__(self, id_: int | None,
89                  process: Process,
90                  is_done: bool,
91                  date: str, comment: str = '',
92                  effort: None | float = None,
93                  calendarize: bool = False) -> None:
94         BaseModel.__init__(self, id_)
95         ConditionsRelations.__init__(self)
96         if process.id_ is None:
97             raise NotFoundException('Process of Todo without ID (not saved?)')
98         self.process = process
99         self._is_done = is_done
100         self.date = valid_date(date)
101         self.comment = comment
102         self.effort = effort
103         self.children = []
104         self.parents = []
105         self.calendarize = calendarize
106         if not self.id_:
107             self.calendarize = self.process.calendarize
108             self.conditions = self.process.conditions[:]
109             self.blockers = self.process.blockers[:]
110             self.enables = self.process.enables[:]
111             self.disables = self.process.disables[:]
112
113     @classmethod
114     def by_date_range(cls, db_conn: DatabaseConnection,
115                       date_range: tuple[str, str] = ('', '')) -> list[Todo]:
116         """Collect Todos of Days within date_range."""
117         todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
118         return todos
119
120     @classmethod
121     def create_with_children(cls, db_conn: DatabaseConnection,
122                              process_id: int, date: str) -> Todo:
123         """Create Todo of process for date, ensure children demanded by chain.
124
125         At minimum creates Todo of process_id, but checks the respective
126         Process for its step tree, and walks down that to provide the initial
127         Todo with all descendants defined there, either adopting existing
128         Todos, or creating them where necessary.
129         """
130
131         def key_order_func(n: ProcessStepsNode) -> int:
132             assert isinstance(n.process.id_, int)
133             return n.process.id_
134
135         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
136             adoptables = [t for t in cls.by_date(db_conn, date)
137                           if (t not in parent.children)
138                           and (t != parent)
139                           and step_node.process == t.process]
140             satisfier = None
141             for adoptable in adoptables:
142                 satisfier = adoptable
143                 break
144             if not satisfier:
145                 satisfier = cls(None, step_node.process, False, date)
146                 satisfier.save(db_conn)
147             sub_step_nodes = list(step_node.steps.values())
148             sub_step_nodes.sort(key=key_order_func)
149             for sub_node in sub_step_nodes:
150                 if sub_node.is_suppressed:
151                     continue
152                 n_slots = len([n for n in sub_step_nodes
153                                if n.process == sub_node.process])
154                 filled_slots = len([t for t in satisfier.children
155                                     if t.process == sub_node.process])
156                 # if we did not newly create satisfier, it may already fill
157                 # some step dependencies, so only fill what remains open
158                 if n_slots - filled_slots > 0:
159                     satisfier.add_child(walk_steps(satisfier, sub_node))
160             satisfier.save(db_conn)
161             return satisfier
162
163         process = Process.by_id(db_conn, process_id)
164         todo = cls(None, process, False, date)
165         todo.save(db_conn)
166         steps_tree = process.get_steps(db_conn)
167         for step_node in steps_tree.values():
168             if step_node.is_suppressed:
169                 continue
170             todo.add_child(walk_steps(todo, step_node))
171         todo.save(db_conn)
172         return todo
173
174     @classmethod
175     def from_table_row(cls, db_conn: DatabaseConnection,
176                        row: Row | list[Any]) -> Todo:
177         """Make from DB row, with dependencies."""
178         if row[1] == 0:
179             raise NotFoundException('calling Todo of '
180                                     'unsaved Process')
181         row_as_list = list(row)
182         row_as_list[1] = Process.by_id(db_conn, row[1])
183         todo = super().from_table_row(db_conn, row_as_list)
184         assert isinstance(todo.id_, int)
185         for t_id in db_conn.column_where('todo_children', 'child',
186                                          'parent', todo.id_):
187             todo.children += [cls.by_id(db_conn, t_id)]
188         for t_id in db_conn.column_where('todo_children', 'parent',
189                                          'child', todo.id_):
190             todo.parents += [cls.by_id(db_conn, t_id)]
191         for name in ('conditions', 'blockers', 'enables', 'disables'):
192             table = f'todo_{name}'
193             assert isinstance(todo.id_, int)
194             for cond_id in db_conn.column_where(table, 'condition',
195                                                 'todo', todo.id_):
196                 target = getattr(todo, name)
197                 target += [Condition.by_id(db_conn, cond_id)]
198         return todo
199
200     @classmethod
201     def by_process_id(cls, db_conn: DatabaseConnection,
202                       process_id: int | None) -> list[Todo]:
203         """Collect all Todos of Process of process_id."""
204         return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
205
206     @classmethod
207     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
208         """Collect all Todos for Day of date."""
209         return cls.by_date_range(db_conn, (date, date))
210
211     @property
212     def is_doable(self) -> bool:
213         """Decide whether .is_done settable based on children, Conditions."""
214         for child in self.children:
215             if not child.is_done:
216                 return False
217         for condition in self.conditions:
218             if not condition.is_active:
219                 return False
220         for condition in self.blockers:
221             if condition.is_active:
222                 return False
223         return True
224
225     @property
226     def is_deletable(self) -> bool:
227         """Decide whether self be deletable (not if preserve-worthy values)."""
228         if self.comment:
229             return False
230         if self.effort and self.effort >= 0:
231             return False
232         return True
233
234     @property
235     def performed_effort(self) -> float:
236         """Return performed effort, i.e. self.effort or default if done.."""
237         if self.effort is not None:
238             return self.effort
239         if self.is_done:
240             return self.effort_then
241         return 0
242
243     @property
244     def process_id(self) -> int | str | None:
245         """Needed for super().save to save Processes as attributes."""
246         return self.process.id_
247
248     @property
249     def is_done(self) -> bool:
250         """Wrapper around self._is_done so we can control its setter."""
251         return self._is_done
252
253     @is_done.setter
254     def is_done(self, value: bool) -> None:
255         if value != self.is_done and not self.is_doable:
256             raise BadFormatException('cannot change doneness of undoable Todo')
257         if self._is_done != value:
258             self._is_done = value
259             if value is True:
260                 for condition in self.enables:
261                     condition.is_active = True
262                 for condition in self.disables:
263                     condition.is_active = False
264
265     @property
266     def title(self) -> VersionedAttribute:
267         """Shortcut to .process.title."""
268         assert isinstance(self.process.title, VersionedAttribute)
269         return self.process.title
270
271     @property
272     def title_then(self) -> str:
273         """Shortcut to .process.title.at(self.date)"""
274         title_then = self.process.title.at(self.date)
275         assert isinstance(title_then, str)
276         return title_then
277
278     @property
279     def effort_then(self) -> float:
280         """Shortcut to .process.effort.at(self.date)"""
281         effort_then = self.process.effort.at(self.date)
282         assert isinstance(effort_then, float)
283         return effort_then
284
285     @property
286     def has_doneness_in_path(self) -> bool:
287         """Check whether self is done or has any children that are."""
288         if self.is_done:
289             return True
290         for child in self.children:
291             if child.is_done:
292                 return True
293             if child.has_doneness_in_path:
294                 return True
295         return False
296
297     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
298         """Return tree of depended-on Todos."""
299
300         def make_node(todo: Todo) -> TodoNode:
301             children = []
302             seen = todo.id_ in seen_todos
303             assert isinstance(todo.id_, int)
304             seen_todos.add(todo.id_)
305             for child in todo.children:
306                 children += [make_node(child)]
307             return TodoNode(todo, seen, children)
308
309         return make_node(self)
310
311     @property
312     def tree_effort(self) -> float:
313         """Return sum of performed efforts of self and all descendants."""
314
315         def walk_tree(node: Todo) -> float:
316             local_effort = 0.0
317             for child in node.children:
318                 local_effort += walk_tree(child)
319             return node.performed_effort + local_effort
320
321         return walk_tree(self)
322
323     def add_child(self, child: Todo) -> None:
324         """Add child to self.children, avoid recursion, update parenthoods."""
325
326         def walk_steps(node: Todo) -> None:
327             if node.id_ == self.id_:
328                 raise BadFormatException('bad child choice causes recursion')
329             for child in node.children:
330                 walk_steps(child)
331
332         if self.id_ is None:
333             raise HandledException('Can only add children to saved Todos.')
334         if child.id_ is None:
335             raise HandledException('Can only add saved children to Todos.')
336         if child in self.children:
337             raise BadFormatException('cannot adopt same child twice')
338         walk_steps(child)
339         self.children += [child]
340         child.parents += [self]
341
342     def remove_child(self, child: Todo) -> None:
343         """Remove child from self.children, update counter relations."""
344         if child not in self.children:
345             raise HandledException('Cannot remove un-parented child.')
346         self.children.remove(child)
347         child.parents.remove(self)
348
349     def save(self, db_conn: DatabaseConnection) -> None:
350         """On save calls, also check if auto-deletion by effort < 0."""
351         if self.effort and self.effort < 0 and self.is_deletable:
352             self.remove(db_conn)
353             return
354         if self.id_ is None:
355             self.__class__.days_to_update.add(self.date)
356         super().save(db_conn)
357         for condition in self.enables + self.disables + self.conditions:
358             condition.save(db_conn)
359
360     def remove(self, db_conn: DatabaseConnection) -> None:
361         """Remove from DB, including relations."""
362         if not self.is_deletable:
363             raise HandledException('Cannot remove non-deletable Todo.')
364         self.__class__.days_to_update.add(self.date)
365         children_to_remove = self.children[:]
366         parents_to_remove = self.parents[:]
367         for child in children_to_remove:
368             self.remove_child(child)
369         for parent in parents_to_remove:
370             parent.remove_child(self)
371         super().remove(db_conn)