home · contact · privacy
Refactor save and remove methods of BaseObject subclasses.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
13 @dataclass
14 class TodoStepsNode:
15     """Collects what's useful to know for Todo/Condition tree display."""
16     item: Todo | Condition
17     is_todo: bool
18     children: list[TodoStepsNode]
19     seen: bool
20     hide: bool
21
22
23 class Todo(BaseModel[int], ConditionsRelations):
24     """Individual actionable."""
25     # pylint: disable=too-many-instance-attributes
26     table_name = 'todos'
27     to_save = ['process_id', 'is_done', 'date']
28     to_save_relations = [('todo_conditions', 'todo', 'conditions'),
29                          ('todo_enables', 'todo', 'enables'),
30                          ('todo_disables', 'todo', 'disables'),
31                          ('todo_children', 'parent', 'children'),
32                          ('todo_children', 'child', 'parents')]
33
34     def __init__(self, id_: int | None, process: Process,
35                  is_done: bool, date: str) -> None:
36         super().__init__(id_)
37         if process.id_ is None:
38             raise NotFoundException('Process of Todo without ID (not saved?)')
39         self.process = process
40         self._is_done = is_done
41         self.date = date
42         self.children: list[Todo] = []
43         self.parents: list[Todo] = []
44         self.conditions: list[Condition] = []
45         self.enables: list[Condition] = []
46         self.disables: list[Condition] = []
47         if not self.id_:
48             self.conditions = self.process.conditions[:]
49             self.enables = self.process.enables[:]
50             self.disables = self.process.disables[:]
51
52     @classmethod
53     def from_table_row(cls, db_conn: DatabaseConnection,
54                        row: Row | list[Any]) -> Todo:
55         """Make from DB row, with dependencies."""
56         if row[1] == 0:
57             raise NotFoundException('calling Todo of '
58                                     'unsaved Process')
59         row_as_list = list(row)
60         row_as_list[1] = Process.by_id(db_conn, row[1])
61         todo = super().from_table_row(db_conn, row_as_list)
62         assert isinstance(todo.id_, int)
63         for t_id in db_conn.column_where('todo_children', 'child',
64                                          'parent', todo.id_):
65             # pylint: disable=no-member
66             todo.children += [cls.by_id(db_conn, t_id)]
67         for t_id in db_conn.column_where('todo_children', 'parent',
68                                          'child', todo.id_):
69             # pylint: disable=no-member
70             todo.parents += [cls.by_id(db_conn, t_id)]
71         for name in ('conditions', 'enables', 'disables'):
72             table = f'todo_{name}'
73             assert isinstance(todo.id_, int)
74             for cond_id in db_conn.column_where(table, 'condition',
75                                                 'todo', todo.id_):
76                 target = getattr(todo, name)
77                 target += [Condition.by_id(db_conn, cond_id)]
78         return todo
79
80     @classmethod
81     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
82         """Collect all Todos for Day of date."""
83         todos = []
84         for id_ in db_conn.column_where('todos', 'id', 'day', date):
85             todos += [cls.by_id(db_conn, id_)]
86         return todos
87
88     @staticmethod
89     def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
90                          cond: Condition, date: str) -> list[Todo]:
91         """Collect all Todos of day that [name] condition."""
92         assert isinstance(cond.id_, int)
93         x_ablers = []
94         table = f'todo_{name}'
95         for id_ in db_conn.column_where(table, 'todo', 'condition', cond.id_):
96             todo = Todo.by_id(db_conn, id_)
97             if todo.date == date:
98                 x_ablers += [todo]
99         return x_ablers
100
101     @classmethod
102     def enablers_for_at(cls, db_conn: DatabaseConnection,
103                         condition: Condition, date: str) -> list[Todo]:
104         """Collect all Todos of day that enable condition."""
105         return cls._x_ablers_for_at(db_conn, 'enables', condition, date)
106
107     @classmethod
108     def disablers_for_at(cls, db_conn: DatabaseConnection,
109                          condition: Condition, date: str) -> list[Todo]:
110         """Collect all Todos of day that disable condition."""
111         return cls._x_ablers_for_at(db_conn, 'disables', condition, date)
112
113     @property
114     def is_doable(self) -> bool:
115         """Decide whether .is_done settable based on children, Conditions."""
116         for child in self.children:
117             if not child.is_done:
118                 return False
119         for condition in self.conditions:
120             if not condition.is_active:
121                 return False
122         return True
123
124     @property
125     def process_id(self) -> int | str | None:
126         """Return ID of tasked Process."""
127         return self.process.id_
128
129     @property
130     def unsatisfied_dependencies(self) -> list[int]:
131         """Return Process IDs of .process.explicit_steps not in .children."""
132         unsatisfied = [s.step_process_id for s in self.process.explicit_steps
133                        if s.parent_step_id is None]
134         for child_process_id in [c.process.id_ for c in self.children]:
135             if child_process_id in unsatisfied:
136                 unsatisfied.remove(child_process_id)
137         return unsatisfied
138
139     @property
140     def is_done(self) -> bool:
141         """Wrapper around self._is_done so we can control its setter."""
142         return self._is_done
143
144     @is_done.setter
145     def is_done(self, value: bool) -> None:
146         if value != self.is_done and not self.is_doable:
147             raise BadFormatException('cannot change doneness of undoable Todo')
148         if self._is_done != value:
149             self._is_done = value
150             if value is True:
151                 for condition in self.enables:
152                     condition.is_active = True
153                 for condition in self.disables:
154                     condition.is_active = False
155
156     def adopt_from(self, todos: list[Todo]) -> None:
157         """As far as possible, fill unsatisfied dependencies from todos."""
158         for process_id in self.unsatisfied_dependencies:
159             for todo in [t for t in todos if t.process.id_ == process_id
160                          and t not in self.children]:
161                 self.add_child(todo)
162                 break
163
164     def make_missing_children(self, db_conn: DatabaseConnection) -> None:
165         """Fill unsatisfied dependencies with new Todos."""
166         for process_id in self.unsatisfied_dependencies:
167             process = Process.by_id(db_conn, process_id)
168             todo = self.__class__(None, process, False, self.date)
169             todo.save(db_conn)
170             self.add_child(todo)
171
172     def get_step_tree(self, seen_todos: set[int],
173                       seen_conditions: set[int]) -> TodoStepsNode:
174         """Return tree of depended-on Todos and Conditions."""
175
176         def make_node(step: Todo | Condition) -> TodoStepsNode:
177             assert isinstance(step.id_, int)
178             is_todo = isinstance(step, Todo)
179             children = []
180             if is_todo:
181                 assert isinstance(step, Todo)
182                 seen = step.id_ in seen_todos
183                 seen_todos.add(step.id_)
184                 potentially_enabled = set()
185                 for child in step.children:
186                     for condition in child.enables:
187                         potentially_enabled.add(condition.id_)
188                     children += [make_node(child)]
189                 for condition in [c for c in step.conditions
190                                   if (not c.is_active)
191                                   and (c.id_ not in potentially_enabled)]:
192                     children += [make_node(condition)]
193             else:
194                 seen = step.id_ in seen_conditions
195                 seen_conditions.add(step.id_)
196             return TodoStepsNode(step, is_todo, children, seen, False)
197
198         node = make_node(self)
199         return node
200
201     def get_undone_steps_tree(self) -> TodoStepsNode:
202         """Return tree of depended-on undone Todos and Conditions."""
203
204         def walk_tree(node: TodoStepsNode) -> None:
205             if isinstance(node.item, Todo) and node.item.is_done:
206                 node.hide = True
207             for child in node.children:
208                 walk_tree(child)
209
210         seen_todos: set[int] = set()
211         seen_conditions: set[int] = set()
212         step_tree = self.get_step_tree(seen_todos, seen_conditions)
213         walk_tree(step_tree)
214         return step_tree
215
216     def get_done_steps_tree(self) -> list[TodoStepsNode]:
217         """Return tree of depended-on done Todos."""
218
219         def make_nodes(node: TodoStepsNode) -> list[TodoStepsNode]:
220             children: list[TodoStepsNode] = []
221             if not isinstance(node.item, Todo):
222                 return children
223             for child in node.children:
224                 children += make_nodes(child)
225             if node.item.is_done:
226                 node.children = children
227                 return [node]
228             return children
229
230         seen_todos: set[int] = set()
231         seen_conditions: set[int] = set()
232         step_tree = self.get_step_tree(seen_todos, seen_conditions)
233         nodes = make_nodes(step_tree)
234         return nodes
235
236     def add_child(self, child: Todo) -> None:
237         """Add child to self.children, avoid recursion, update parenthoods."""
238
239         def walk_steps(node: Todo) -> None:
240             if node.id_ == self.id_:
241                 raise BadFormatException('bad child choice causes recursion')
242             for child in node.children:
243                 walk_steps(child)
244
245         if self.id_ is None:
246             raise HandledException('Can only add children to saved Todos.')
247         if child.id_ is None:
248             raise HandledException('Can only add saved children to Todos.')
249         if child in self.children:
250             raise BadFormatException('cannot adopt same child twice')
251         walk_steps(child)
252         self.children += [child]
253         child.parents += [self]
254
255     def remove_child(self, child: Todo) -> None:
256         """Remove child from self.children, update counter relations."""
257         if child not in self.children:
258             raise HandledException('Cannot remove un-parented child.')
259         self.children.remove(child)
260         child.parents.remove(self)
261
262     def remove(self, db_conn: DatabaseConnection) -> None:
263         """Remove from DB, including relations."""
264         children_to_remove = self.children[:]
265         parents_to_remove = self.parents[:]
266         for child in children_to_remove:
267             self.remove_child(child)
268         for parent in parents_to_remove:
269             parent.remove_child(self)
270         super().remove(db_conn)