home · contact · privacy
Some test clean-ups.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from typing import Any, Set
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.processes import Process, ProcessStepsNode
7 from plomtask.versioned_attributes import VersionedAttribute
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11 from plomtask.dating import valid_date
12
13
14 class TodoNode:
15     """Collects what's useful to know for Todo/Condition tree display."""
16     # pylint: disable=too-few-public-methods
17     todo: Todo
18     seen: bool
19     children: list[TodoNode]
20
21     def __init__(self,
22                  todo: Todo,
23                  seen: bool,
24                  children: list[TodoNode]) -> None:
25         self.todo = todo
26         self.seen = seen
27         self.children = children
28
29     @property
30     def as_dict(self) -> dict[str, object]:
31         """Return self as (json.dumps-coompatible) dict."""
32         return {'todo': self.todo.id_,
33                 'seen': self.seen,
34                 'children': [c.as_dict for c in self.children]}
35
36
37 class TodoStepsNode:
38     """Collect what's useful for Todo steps tree display."""
39     # pylint: disable=too-few-public-methods
40     id_: int
41     todo: Todo | None
42     process: Process | None
43     children: list[TodoStepsNode]  # pylint: disable=undefined-variable
44     fillable: bool
45
46     def __init__(self,
47                  id_: int,
48                  todo: Todo | None,
49                  process: Process | None,
50                  children: list[TodoStepsNode],
51                  fillable: bool = False):
52         # pylint: disable=too-many-arguments
53         self.id_ = id_
54         self.todo = todo
55         self.process = process
56         self.children = children
57         self.fillable = fillable
58
59     @property
60     def as_dict(self) -> dict[str, object]:
61         """Return self as (json.dumps-compatible) dict."""
62         return {'id': self.id_,
63                 'todo': self.todo.id_ if self.todo else None,
64                 'process': self.process.id_ if self.process else None,
65                 'children': [c.as_dict for c in self.children],
66                 'fillable': self.fillable}
67
68
69 class Todo(BaseModel[int], ConditionsRelations):
70     """Individual actionable."""
71     # pylint: disable=too-many-instance-attributes
72     # pylint: disable=too-many-public-methods
73     table_name = 'todos'
74     to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
75                        'calendarize']
76     to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
77                          ('todo_blockers', 'todo', 'blockers', 0),
78                          ('todo_enables', 'todo', 'enables', 0),
79                          ('todo_disables', 'todo', 'disables', 0),
80                          ('todo_children', 'parent', 'children', 0),
81                          ('todo_children', 'child', 'parents', 1)]
82     to_search = ['comment']
83     days_to_update: Set[str] = set()
84     children: list[Todo]
85     parents: list[Todo]
86     sorters = {'doneness': lambda t: t.is_done,
87                'title': lambda t: t.title_then,
88                'comment': lambda t: t.comment,
89                'date': lambda t: t.date}
90
91     # pylint: disable=too-many-arguments
92     def __init__(self, id_: int | None,
93                  process: Process,
94                  is_done: bool,
95                  date: str, comment: str = '',
96                  effort: None | float = None,
97                  calendarize: bool = False) -> None:
98         BaseModel.__init__(self, id_)
99         ConditionsRelations.__init__(self)
100         if process.id_ is None:
101             raise NotFoundException('Process of Todo without ID (not saved?)')
102         self.process = process
103         self._is_done = is_done
104         self.date = valid_date(date)
105         self.comment = comment
106         self.effort = effort
107         self.children = []
108         self.parents = []
109         self.calendarize = calendarize
110         if not self.id_:
111             self.calendarize = self.process.calendarize
112             self.conditions = self.process.conditions[:]
113             self.blockers = self.process.blockers[:]
114             self.enables = self.process.enables[:]
115             self.disables = self.process.disables[:]
116
117     @classmethod
118     def by_date_range(cls, db_conn: DatabaseConnection,
119                       date_range: tuple[str, str] = ('', '')) -> list[Todo]:
120         """Collect Todos of Days within date_range."""
121         todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
122         return todos
123
124     @classmethod
125     def create_with_children(cls, db_conn: DatabaseConnection,
126                              process_id: int, date: str) -> Todo:
127         """Create Todo of process for date, ensure children."""
128
129         def key_order_func(n: ProcessStepsNode) -> int:
130             assert isinstance(n.process.id_, int)
131             return n.process.id_
132
133         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
134             adoptables = [t for t in cls.by_date(db_conn, date)
135                           if (t not in parent.children)
136                           and (t != parent)
137                           and step_node.process == t.process]
138             satisfier = None
139             for adoptable in adoptables:
140                 satisfier = adoptable
141                 break
142             if not satisfier:
143                 satisfier = cls(None, step_node.process, False, date)
144                 satisfier.save(db_conn)
145             sub_step_nodes = list(step_node.steps.values())
146             sub_step_nodes.sort(key=key_order_func)
147             for sub_node in sub_step_nodes:
148                 if sub_node.is_suppressed:
149                     continue
150                 n_slots = len([n for n in sub_step_nodes
151                                if n.process == sub_node.process])
152                 filled_slots = len([t for t in satisfier.children
153                                     if t.process == sub_node.process])
154                 # if we did not newly create satisfier, it may already fill
155                 # some step dependencies, so only fill what remains open
156                 if n_slots - filled_slots > 0:
157                     satisfier.add_child(walk_steps(satisfier, sub_node))
158             satisfier.save(db_conn)
159             return satisfier
160
161         process = Process.by_id(db_conn, process_id)
162         todo = cls(None, process, False, date)
163         todo.save(db_conn)
164         steps_tree = process.get_steps(db_conn)
165         for step_node in steps_tree.values():
166             if step_node.is_suppressed:
167                 continue
168             todo.add_child(walk_steps(todo, step_node))
169         todo.save(db_conn)
170         return todo
171
172     @classmethod
173     def from_table_row(cls, db_conn: DatabaseConnection,
174                        row: Row | list[Any]) -> Todo:
175         """Make from DB row, with dependencies."""
176         if row[1] == 0:
177             raise NotFoundException('calling Todo of '
178                                     'unsaved Process')
179         row_as_list = list(row)
180         row_as_list[1] = Process.by_id(db_conn, row[1])
181         todo = super().from_table_row(db_conn, row_as_list)
182         assert isinstance(todo.id_, int)
183         for t_id in db_conn.column_where('todo_children', 'child',
184                                          'parent', todo.id_):
185             todo.children += [cls.by_id(db_conn, t_id)]
186         for t_id in db_conn.column_where('todo_children', 'parent',
187                                          'child', todo.id_):
188             todo.parents += [cls.by_id(db_conn, t_id)]
189         for name in ('conditions', 'blockers', 'enables', 'disables'):
190             table = f'todo_{name}'
191             assert isinstance(todo.id_, int)
192             for cond_id in db_conn.column_where(table, 'condition',
193                                                 'todo', todo.id_):
194                 target = getattr(todo, name)
195                 target += [Condition.by_id(db_conn, cond_id)]
196         return todo
197
198     @classmethod
199     def by_process_id(cls, db_conn: DatabaseConnection,
200                       process_id: int | None) -> list[Todo]:
201         """Collect all Todos of Process of process_id."""
202         return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
203
204     @classmethod
205     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
206         """Collect all Todos for Day of date."""
207         return cls.by_date_range(db_conn, (date, date))
208
209     @property
210     def is_doable(self) -> bool:
211         """Decide whether .is_done settable based on children, Conditions."""
212         for child in self.children:
213             if not child.is_done:
214                 return False
215         for condition in self.conditions:
216             if not condition.is_active:
217                 return False
218         for condition in self.blockers:
219             if condition.is_active:
220                 return False
221         return True
222
223     @property
224     def is_deletable(self) -> bool:
225         """Decide whether self be deletable (not if preserve-worthy values)."""
226         if self.comment:
227             return False
228         if self.effort and self.effort >= 0:
229             return False
230         return True
231
232     @property
233     def performed_effort(self) -> float:
234         """Return performed effort, i.e. self.effort or default if done.."""
235         if self.effort is not None:
236             return self.effort
237         if self.is_done:
238             return self.effort_then
239         return 0
240
241     @property
242     def process_id(self) -> int | str | None:
243         """Needed for super().save to save Processes as attributes."""
244         return self.process.id_
245
246     @property
247     def is_done(self) -> bool:
248         """Wrapper around self._is_done so we can control its setter."""
249         return self._is_done
250
251     @is_done.setter
252     def is_done(self, value: bool) -> None:
253         if value != self.is_done and not self.is_doable:
254             raise BadFormatException('cannot change doneness of undoable Todo')
255         if self._is_done != value:
256             self._is_done = value
257             if value is True:
258                 for condition in self.enables:
259                     condition.is_active = True
260                 for condition in self.disables:
261                     condition.is_active = False
262
263     @property
264     def title(self) -> VersionedAttribute:
265         """Shortcut to .process.title."""
266         assert isinstance(self.process.title, VersionedAttribute)
267         return self.process.title
268
269     @property
270     def title_then(self) -> str:
271         """Shortcut to .process.title.at(self.date)"""
272         title_then = self.process.title.at(self.date)
273         assert isinstance(title_then, str)
274         return title_then
275
276     @property
277     def effort_then(self) -> float:
278         """Shortcut to .process.effort.at(self.date)"""
279         effort_then = self.process.effort.at(self.date)
280         assert isinstance(effort_then, float)
281         return effort_then
282
283     @property
284     def has_doneness_in_path(self) -> bool:
285         """Check whether self is done or has any children that are."""
286         if self.is_done:
287             return True
288         for child in self.children:
289             if child.is_done:
290                 return True
291             if child.has_doneness_in_path:
292                 return True
293         return False
294
295     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
296         """Return tree of depended-on Todos."""
297
298         def make_node(todo: Todo) -> TodoNode:
299             children = []
300             seen = todo.id_ in seen_todos
301             assert isinstance(todo.id_, int)
302             seen_todos.add(todo.id_)
303             for child in todo.children:
304                 children += [make_node(child)]
305             return TodoNode(todo, seen, children)
306
307         return make_node(self)
308
309     @property
310     def tree_effort(self) -> float:
311         """Return sum of performed efforts of self and all descendants."""
312
313         def walk_tree(node: Todo) -> float:
314             local_effort = 0.0
315             for child in node.children:
316                 local_effort += walk_tree(child)
317             return node.performed_effort + local_effort
318
319         return walk_tree(self)
320
321     def add_child(self, child: Todo) -> None:
322         """Add child to self.children, avoid recursion, update parenthoods."""
323
324         def walk_steps(node: Todo) -> None:
325             if node.id_ == self.id_:
326                 raise BadFormatException('bad child choice causes recursion')
327             for child in node.children:
328                 walk_steps(child)
329
330         if self.id_ is None:
331             raise HandledException('Can only add children to saved Todos.')
332         if child.id_ is None:
333             raise HandledException('Can only add saved children to Todos.')
334         if child in self.children:
335             raise BadFormatException('cannot adopt same child twice')
336         walk_steps(child)
337         self.children += [child]
338         child.parents += [self]
339
340     def remove_child(self, child: Todo) -> None:
341         """Remove child from self.children, update counter relations."""
342         if child not in self.children:
343             raise HandledException('Cannot remove un-parented child.')
344         self.children.remove(child)
345         child.parents.remove(self)
346
347     def save(self, db_conn: DatabaseConnection) -> None:
348         """On save calls, also check if auto-deletion by effort < 0."""
349         if self.effort and self.effort < 0 and self.is_deletable:
350             self.remove(db_conn)
351             return
352         if self.id_ is None:
353             self.__class__.days_to_update.add(self.date)
354         super().save(db_conn)
355         for condition in self.enables + self.disables + self.conditions:
356             condition.save(db_conn)
357
358     def remove(self, db_conn: DatabaseConnection) -> None:
359         """Remove from DB, including relations."""
360         if not self.is_deletable:
361             raise HandledException('Cannot remove non-deletable Todo.')
362         self.__class__.days_to_update.add(self.date)
363         children_to_remove = self.children[:]
364         parents_to_remove = self.parents[:]
365         for child in children_to_remove:
366             self.remove_child(child)
367         for parent in parents_to_remove:
368             parent.remove_child(self)
369         super().remove(db_conn)