home · contact · privacy
Extend tests.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
@dataclass
class TodoStepsNode:
    """Collects what's useful to know for Todo/Condition tree display.

    One node stands for one step (a Todo or a Condition) in the tree that
    Todo.get_step_tree builds; fields are filled positionally there.
    """
    # the Todo or Condition this node stands for
    item: Todo | Condition
    # True if item is a Todo, False if it is a Condition
    is_todo: bool
    # nodes of the steps item depends on (always empty for Conditions)
    children: list[TodoStepsNode]
    # whether item's ID was already in the matching seen-set when the node
    # was made, i.e. the same item was met before during tree building
    seen: bool
    # starts False; Todo.get_undone_steps_tree sets it on nodes of done Todos
    hide: bool
21
22
class Todo(BaseModel[int], ConditionsRelations):
    """Actionable: one Process tasked for one date, plus its relations.

    Relations are child and parent Todos, and the Conditions it depends
    on (.conditions), enables (.enables), and disables (.disables).
    """
    # pylint: disable=too-many-instance-attributes
    table_name = 'todos'
    to_save = ['process_id', 'is_done', 'date']

    def __init__(self, id_: int | None, process: Process,
                 is_done: bool, date: str) -> None:
        super().__init__(id_)
        self.process = process
        self._is_done = is_done
        self.date = date
        self.children: list[Todo] = []
        self.parents: list[Todo] = []
        # Without a (truthy) ID, start from copies of the Process's
        # Condition lists; otherwise start empty, to be filled by whoever
        # constructs us (e.g. .from_table_row).
        inherit = not self.id_
        self.conditions: list[Condition] = (
            list(process.conditions) if inherit else [])
        self.enables: list[Condition] = (
            list(process.enables) if inherit else [])
        self.disables: list[Condition] = (
            list(process.disables) if inherit else [])

    @classmethod
    def from_table_row(cls, db_conn: DatabaseConnection,
                       row: Row | list[Any]) -> Todo:
        """Build Todo from DB row, resolving its Process and relations.

        Raises NotFoundException if the row's Process ID (row[1]) is 0.
        """
        process_id = row[1]
        if process_id == 0:
            raise NotFoundException('calling Todo of '
                                    'unsaved Process')
        # Swap the Process ID for the actual Process before delegating.
        fields = list(row)
        fields[1] = Process.by_id(db_conn, process_id)
        todo = super().from_table_row(db_conn, fields)
        assert isinstance(todo.id_, int)
        todo_id = todo.id_
        child_ids = db_conn.column_where('todo_children', 'child',
                                         'parent', todo_id)
        for child_id in child_ids:
            # pylint: disable=no-member
            todo.children.append(cls.by_id(db_conn, child_id))
        parent_ids = db_conn.column_where('todo_children', 'parent',
                                          'child', todo_id)
        for parent_id in parent_ids:
            # pylint: disable=no-member
            todo.parents.append(cls.by_id(db_conn, parent_id))
        # One relation table per Condition list: todo_conditions etc.
        for name in ('conditions', 'enables', 'disables'):
            collected = getattr(todo, name)
            for cond_id in db_conn.column_where(f'todo_{name}', 'condition',
                                                'todo', todo_id):
                collected.append(Condition.by_id(db_conn, cond_id))
        return todo

    @classmethod
    def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
        """Return all Todos whose 'day' column in 'todos' equals date."""
        return [cls.by_id(db_conn, id_)
                for id_ in db_conn.column_where('todos', 'id', 'day', date)]

    @staticmethod
    def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
                         cond: Condition, date: str) -> list[Todo]:
        """Return Todos of date related to cond via table todo_[name]."""
        assert isinstance(cond.id_, int)
        todo_ids = db_conn.column_where(f'todo_{name}', 'todo',
                                        'condition', cond.id_)
        candidates = (Todo.by_id(db_conn, id_) for id_ in todo_ids)
        return [todo for todo in candidates if todo.date == date]

    @classmethod
    def enablers_for_at(cls, db_conn: DatabaseConnection,
                        condition: Condition, date: str) -> list[Todo]:
        """Return the Todos of date that enable condition."""
        return cls._x_ablers_for_at(db_conn, name='enables',
                                    cond=condition, date=date)

    @classmethod
    def disablers_for_at(cls, db_conn: DatabaseConnection,
                         condition: Condition, date: str) -> list[Todo]:
        """Return the Todos of date that disable condition."""
        return cls._x_ablers_for_at(db_conn, name='disables',
                                    cond=condition, date=date)

    @property
    def is_doable(self) -> bool:
        """Tell if all children are done and all .conditions active."""
        children_done = all(child.is_done for child in self.children)
        return children_done and all(cond.is_active
                                     for cond in self.conditions)

    @property
    def process_id(self) -> int | str | None:
        """Return ID of the Process this Todo tasks."""
        return self.process.id_

    @property
    def unsatisfied_dependencies(self) -> list[int]:
        """Return Process IDs of top-level explicit steps lacking a child.

        Starts from .process.explicit_steps without parent_step_id, then
        strikes one entry per child whose Process ID matches.
        """
        missing = [step.step_process_id
                   for step in self.process.explicit_steps
                   if step.parent_step_id is None]
        for child in self.children:
            child_process_id = child.process.id_
            if child_process_id in missing:
                missing.remove(child_process_id)
        return missing

    @property
    def is_done(self) -> bool:
        """Expose ._is_done read-only, so that writes pass the setter."""
        return self._is_done

    @is_done.setter
    def is_done(self, value: bool) -> None:
        # Re-setting the current value is a no-op, even if not doable.
        if value == self.is_done:
            return
        if not self.is_doable:
            raise BadFormatException('cannot change doneness of undoable Todo')
        self._is_done = value
        # Only becoming done touches Conditions; un-doing reverts nothing.
        if value is True:
            for condition in self.enables:
                condition.is_active = True
            for condition in self.disables:
                condition.is_active = False

    def adopt_from(self, todos: list[Todo]) -> None:
        """Adopt from todos, per unsatisfied dependency, one fitting Todo.

        Fitting means: of the wanted Process, and not yet among .children.
        """
        for process_id in self.unsatisfied_dependencies:
            adoptee = next((todo for todo in todos
                            if todo.process.id_ == process_id
                            and todo not in self.children), None)
            if adoptee is not None:
                self.add_child(adoptee)

    def make_missing_children(self, db_conn: DatabaseConnection) -> None:
        """Create, save, and adopt a new Todo per unsatisfied dependency."""
        for process_id in self.unsatisfied_dependencies:
            new_child = self.__class__(None,
                                       Process.by_id(db_conn, process_id),
                                       False, self.date)
            # add_child only accepts saved Todos, so save first
            new_child.save(db_conn)
            self.add_child(new_child)

    def get_step_tree(self, seen_todos: set[int],
                      seen_conditions: set[int]) -> TodoStepsNode:
        """Return tree of Todos and Conditions that self depends on.

        IDs of met Todos/Conditions get added to seen_todos and
        seen_conditions; nodes for IDs already in there are marked .seen.
        """

        def make_node(step: Todo | Condition) -> TodoStepsNode:
            assert isinstance(step.id_, int)
            if not isinstance(step, Todo):
                # Conditions are leaves.
                met_before = step.id_ in seen_conditions
                seen_conditions.add(step.id_)
                return TodoStepsNode(step, False, [], met_before, False)
            met_before = step.id_ in seen_todos
            seen_todos.add(step.id_)
            sub_nodes: list[TodoStepsNode] = []
            potentially_enabled = set()
            for child in step.children:
                potentially_enabled.update(cond.id_
                                           for cond in child.enables)
                sub_nodes.append(make_node(child))
            # List only Conditions that are inactive and that no child
            # could enable.
            for condition in step.conditions:
                if condition.is_active:
                    continue
                if condition.id_ in potentially_enabled:
                    continue
                sub_nodes.append(make_node(condition))
            return TodoStepsNode(step, True, sub_nodes, met_before, False)

        return make_node(self)

    def get_undone_steps_tree(self) -> TodoStepsNode:
        """Return full step tree, with nodes of done Todos marked .hide."""
        step_tree = self.get_step_tree(set(), set())
        # Visiting order is irrelevant for setting flags, so use a stack.
        to_visit = [step_tree]
        while to_visit:
            node = to_visit.pop()
            if isinstance(node.item, Todo) and node.item.is_done:
                node.hide = True
            to_visit.extend(node.children)
        return step_tree

    def get_done_steps_tree(self) -> list[TodoStepsNode]:
        """Return step tree reduced to nodes of done Todos, as root list.

        Undone Todos get replaced by their done descendants' nodes,
        Condition nodes get dropped.
        """

        def done_nodes(node: TodoStepsNode) -> list[TodoStepsNode]:
            if not isinstance(node.item, Todo):
                return []
            done_below = [found for child in node.children
                          for found in done_nodes(child)]
            if not node.item.is_done:
                return done_below
            node.children = done_below
            return [node]

        return done_nodes(self.get_step_tree(set(), set()))

    def add_child(self, child: Todo) -> None:
        """Append child to .children and self to child.parents.

        Raises HandledException if self or child has no ID, and
        BadFormatException if child is already adopted or if self is
        found in the tree below (and including) child.
        """

        def ensure_self_not_below(todo: Todo) -> None:
            if todo.id_ == self.id_:
                raise BadFormatException('bad child choice causes recursion')
            for descendant in todo.children:
                ensure_self_not_below(descendant)

        if self.id_ is None:
            raise HandledException('Can only add children to saved Todos.')
        if child.id_ is None:
            raise HandledException('Can only add saved children to Todos.')
        if child in self.children:
            raise BadFormatException('cannot adopt same child twice')
        ensure_self_not_below(child)
        self.children.append(child)
        child.parents.append(self)

    def remove_child(self, child: Todo) -> None:
        """Drop child from .children, and self from child.parents.

        Raises HandledException if child is not among .children.
        """
        if child not in self.children:
            raise HandledException('Cannot remove un-parented child.')
        self.children.remove(child)
        child.parents.remove(self)

    def save(self, db_conn: DatabaseConnection) -> None:
        """Write self and all relation tables to DB via db_conn.

        Raises NotFoundException if .process has no ID.
        """
        if self.process.id_ is None:
            raise NotFoundException('Process of Todo without ID (not saved?)')
        self.save_core(db_conn)
        assert isinstance(self.id_, int)
        # (table, column holding our own ID, items on the other side)
        relations: tuple[tuple[str, str, list[Todo] | list[Condition]],
                         ...] = (
            ('todo_children', 'child', self.parents),
            ('todo_children', 'parent', self.children),
            ('todo_conditions', 'todo', self.conditions),
            ('todo_enables', 'todo', self.enables),
            ('todo_disables', 'todo', self.disables),
        )
        for table, own_column, others in relations:
            db_conn.rewrite_relations(table, own_column, self.id_,
                                      [[other.id_] for other in others])

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Unlink from children and parents, then delete from DB."""
        assert isinstance(self.id_, int)
        # Snapshot both lists up front, as remove_child mutates them.
        former_children = list(self.children)
        former_parents = list(self.parents)
        for child in former_children:
            self.remove_child(child)
        for parent in former_parents:
            parent.remove_child(self)
        for table, column in (('todo_children', 'parent'),
                              ('todo_children', 'child'),
                              ('todo_conditions', 'todo'),
                              ('todo_enables', 'todo'),
                              ('todo_disables', 'todo')):
            db_conn.delete_where(table, column, self.id_)
        super().remove(db_conn)