home · contact · privacy
9fac63b3d962aacd867a2ab9f6e1be86074c149a
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process, ProcessStepsNode
8 from plomtask.versioned_attributes import VersionedAttribute
9 from plomtask.conditions import Condition, ConditionsRelations
10 from plomtask.exceptions import (NotFoundException, BadFormatException,
11                                  HandledException)
12 from plomtask.dating import valid_date
13
14
@dataclass
class TodoNode:
    """One node of a Todo tree, as assembled for display purposes."""
    # the Todo this node stands for
    todo: Todo
    # whether that Todo was already encountered when the node got built
    seen: bool
    # nodes for what hangs below this node's Todo
    children: list[TodoNode]
21
22
23 class Todo(BaseModel[int], ConditionsRelations):
24     """Individual actionable."""
25     # pylint: disable=too-many-instance-attributes
26     table_name = 'todos'
27     to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
28                'calendarize']
29     to_save_relations = [('todo_conditions', 'todo', 'conditions'),
30                          ('todo_blockers', 'todo', 'blockers'),
31                          ('todo_enables', 'todo', 'enables'),
32                          ('todo_disables', 'todo', 'disables'),
33                          ('todo_children', 'parent', 'children'),
34                          ('todo_children', 'child', 'parents')]
35     to_search = ['comment']
36
37     # pylint: disable=too-many-arguments
38     def __init__(self, id_: int | None,
39                  process: Process,
40                  is_done: bool,
41                  date: str, comment: str = '',
42                  effort: None | float = None,
43                  calendarize: bool = False) -> None:
44         BaseModel.__init__(self, id_)
45         ConditionsRelations.__init__(self)
46         if process.id_ is None:
47             raise NotFoundException('Process of Todo without ID (not saved?)')
48         self.process = process
49         self._is_done = is_done
50         self.date = valid_date(date)
51         self.comment = comment
52         self.effort = effort
53         self.children: list[Todo] = []
54         self.parents: list[Todo] = []
55         self.calendarize = calendarize
56         if not self.id_:
57             self.calendarize = self.process.calendarize
58             self.conditions = self.process.conditions[:]
59             self.blockers = self.process.blockers[:]
60             self.enables = self.process.enables[:]
61             self.disables = self.process.disables[:]
62
63     @classmethod
64     def by_date_range(cls, db_conn: DatabaseConnection,
65                       date_range: tuple[str, str] = ('', '')) -> list[Todo]:
66         """Collect Todos of Days within date_range."""
67         todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
68         return todos
69
70     @classmethod
71     def create_with_children(cls, db_conn: DatabaseConnection,
72                              process_id: int, date: str) -> Todo:
73         """Create Todo of process for date, ensure children."""
74
75         def key_order_func(n: ProcessStepsNode) -> int:
76             assert isinstance(n.process.id_, int)
77             return n.process.id_
78
79         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
80             adoptables = [t for t in cls.by_date(db_conn, date)
81                           if (t not in parent.children)
82                           and (t != parent)
83                           and step_node.process == t.process]
84             satisfier = None
85             for adoptable in adoptables:
86                 satisfier = adoptable
87                 break
88             if not satisfier:
89                 satisfier = cls(None, step_node.process, False, date)
90                 satisfier.save(db_conn)
91             sub_step_nodes = list(step_node.steps.values())
92             sub_step_nodes.sort(key=key_order_func)
93             for sub_node in sub_step_nodes:
94                 n_slots = len([n for n in sub_step_nodes
95                                if n.process == sub_node.process])
96                 filled_slots = len([t for t in satisfier.children
97                                     if t.process == sub_node.process])
98                 # if we did not newly create satisfier, it may already fill
99                 # some step dependencies, so only fill what remains open
100                 if n_slots - filled_slots > 0:
101                     satisfier.add_child(walk_steps(satisfier, sub_node))
102             satisfier.save(db_conn)
103             return satisfier
104
105         process = Process.by_id(db_conn, process_id)
106         todo = cls(None, process, False, date)
107         todo.save(db_conn)
108         steps_tree = process.get_steps(db_conn)
109         for step_node in steps_tree.values():
110             todo.add_child(walk_steps(todo, step_node))
111         todo.save(db_conn)
112         return todo
113
114     @classmethod
115     def from_table_row(cls, db_conn: DatabaseConnection,
116                        row: Row | list[Any]) -> Todo:
117         """Make from DB row, with dependencies."""
118         if row[1] == 0:
119             raise NotFoundException('calling Todo of '
120                                     'unsaved Process')
121         row_as_list = list(row)
122         row_as_list[1] = Process.by_id(db_conn, row[1])
123         todo = super().from_table_row(db_conn, row_as_list)
124         assert isinstance(todo.id_, int)
125         for t_id in db_conn.column_where('todo_children', 'child',
126                                          'parent', todo.id_):
127             # pylint: disable=no-member
128             todo.children += [cls.by_id(db_conn, t_id)]
129         for t_id in db_conn.column_where('todo_children', 'parent',
130                                          'child', todo.id_):
131             # pylint: disable=no-member
132             todo.parents += [cls.by_id(db_conn, t_id)]
133         for name in ('conditions', 'blockers', 'enables', 'disables'):
134             table = f'todo_{name}'
135             assert isinstance(todo.id_, int)
136             for cond_id in db_conn.column_where(table, 'condition',
137                                                 'todo', todo.id_):
138                 target = getattr(todo, name)
139                 target += [Condition.by_id(db_conn, cond_id)]
140         return todo
141
142     @classmethod
143     def by_process_id(cls, db_conn: DatabaseConnection,
144                       process_id: int | None) -> list[Todo]:
145         """Collect all Todos of Process of process_id."""
146         return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
147
148     @classmethod
149     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
150         """Collect all Todos for Day of date."""
151         return cls.by_date_range(db_conn, (date, date))
152
153     @property
154     def is_doable(self) -> bool:
155         """Decide whether .is_done settable based on children, Conditions."""
156         for child in self.children:
157             if not child.is_done:
158                 return False
159         for condition in self.conditions:
160             if not condition.is_active:
161                 return False
162         for condition in self.blockers:
163             if condition.is_active:
164                 return False
165         return True
166
167     @property
168     def is_deletable(self) -> bool:
169         """Decide whether self be deletable (not if preserve-worthy values)."""
170         if self.comment:
171             return False
172         if self.effort and self.effort >= 0:
173             return False
174         return True
175
176     @property
177     def process_id(self) -> int | str | None:
178         """Needed for super().save to save Processes as attributes."""
179         return self.process.id_
180
181     @property
182     def is_done(self) -> bool:
183         """Wrapper around self._is_done so we can control its setter."""
184         return self._is_done
185
186     @is_done.setter
187     def is_done(self, value: bool) -> None:
188         if value != self.is_done and not self.is_doable:
189             raise BadFormatException('cannot change doneness of undoable Todo')
190         if self._is_done != value:
191             self._is_done = value
192             if value is True:
193                 for condition in self.enables:
194                     condition.is_active = True
195                 for condition in self.disables:
196                     condition.is_active = False
197
198     @property
199     def title(self) -> VersionedAttribute:
200         """Shortcut to .process.title."""
201         return self.process.title
202
203     @property
204     def title_then(self) -> str:
205         """Shortcut to .process.title.at(self.date)"""
206         title_then = self.process.title.at(self.date)
207         assert isinstance(title_then, str)
208         return title_then
209
210     @property
211     def effort_then(self) -> float:
212         """Shortcut to .process.effort.at(self.date)"""
213         effort_then = self.process.effort.at(self.date)
214         assert isinstance(effort_then, float)
215         return effort_then
216
217     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
218         """Return tree of depended-on Todos."""
219
220         def make_node(todo: Todo) -> TodoNode:
221             children = []
222             seen = todo.id_ in seen_todos
223             assert isinstance(todo.id_, int)
224             seen_todos.add(todo.id_)
225             for child in todo.children:
226                 children += [make_node(child)]
227             return TodoNode(todo, seen, children)
228
229         return make_node(self)
230
231     def add_child(self, child: Todo) -> None:
232         """Add child to self.children, avoid recursion, update parenthoods."""
233
234         def walk_steps(node: Todo) -> None:
235             if node.id_ == self.id_:
236                 raise BadFormatException('bad child choice causes recursion')
237             for child in node.children:
238                 walk_steps(child)
239
240         if self.id_ is None:
241             raise HandledException('Can only add children to saved Todos.')
242         if child.id_ is None:
243             raise HandledException('Can only add saved children to Todos.')
244         if child in self.children:
245             raise BadFormatException('cannot adopt same child twice')
246         walk_steps(child)
247         self.children += [child]
248         child.parents += [self]
249
250     def remove_child(self, child: Todo) -> None:
251         """Remove child from self.children, update counter relations."""
252         if child not in self.children:
253             raise HandledException('Cannot remove un-parented child.')
254         self.children.remove(child)
255         child.parents.remove(self)
256
257     def save(self, db_conn: DatabaseConnection) -> None:
258         """On save calls, also check if auto-deletion by effort < 0."""
259         if self.effort and self.effort < 0 and self.is_deletable:
260             self.remove(db_conn)
261             return
262         super().save(db_conn)
263
264     def remove(self, db_conn: DatabaseConnection) -> None:
265         """Remove from DB, including relations."""
266         if not self.is_deletable:
267             raise HandledException('Cannot remove non-deletable Todo.')
268         children_to_remove = self.children[:]
269         parents_to_remove = self.parents[:]
270         for child in children_to_remove:
271             self.remove_child(child)
272         for parent in parents_to_remove:
273             parent.remove_child(self)
274         super().remove(db_conn)