home · contact · privacy
Refactor Todo adoption code.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
@dataclass
class TodoStepsNode:
    """Collects what's useful to know for Todo/Condition tree display.

    One node of the tree that Todo.get_step_tree builds.
    """
    # The Todo or Condition this node represents.
    item: Todo | Condition
    # True if .item is a Todo, False if it is a Condition.
    is_todo: bool
    # Nodes below this one; get_step_tree leaves this empty for Conditions.
    children: list[TodoStepsNode]
    # Whether .item's ID was already in the caller-provided "seen" set when
    # this node was created.
    seen: bool
20
21
22 class Todo(BaseModel, ConditionsRelations):
23     """Individual actionable."""
24
25     # pylint: disable=too-many-instance-attributes
26
27     table_name = 'todos'
28     to_save = ['process_id', 'is_done', 'date']
29
30     def __init__(self, id_: int | None, process: Process,
31                  is_done: bool, date: str) -> None:
32         self.set_int_id(id_)
33         self.process = process
34         self._is_done = is_done
35         self.date = date
36         self.children: list[Todo] = []
37         self.parents: list[Todo] = []
38         self.conditions: list[Condition] = []
39         self.enables: list[Condition] = []
40         self.disables: list[Condition] = []
41         if not self.id_:
42             self.conditions = self.process.conditions[:]
43             self.enables = self.process.enables[:]
44             self.disables = self.process.disables[:]
45
46     @classmethod
47     def from_table_row(cls, db_conn: DatabaseConnection,
48                        row: Row | list[Any]) -> Todo:
49         """Make from DB row, write to DB cache."""
50         if row[1] == 0:
51             raise NotFoundException('calling Todo of '
52                                     'unsaved Process')
53         row_as_list = list(row)
54         row_as_list[1] = Process.by_id(db_conn, row[1])
55         todo = super().from_table_row(db_conn, row_as_list)
56         assert isinstance(todo, Todo)
57         return todo
58
59     @classmethod
60     def by_id(cls, db_conn: DatabaseConnection, id_: int) -> Todo:
61         """Get Todo of .id_=id_ and children (from DB cache if possible)."""
62         todo, from_cache = super()._by_id(db_conn, id_)
63         if todo is None:
64             raise NotFoundException(f'Todo of ID not found: {id_}')
65         if not from_cache:
66             for t_id in db_conn.column_where('todo_children', 'child',
67                                              'parent', id_):
68                 todo.children += [cls.by_id(db_conn, t_id)]
69             for t_id in db_conn.column_where('todo_children', 'parent',
70                                              'child', id_):
71                 todo.parents += [cls.by_id(db_conn, t_id)]
72             for name in ('conditions', 'enables', 'disables'):
73                 table = f'todo_{name}'
74                 for cond_id in db_conn.column_where(table, 'condition',
75                                                     'todo', todo.id_):
76                     target = getattr(todo, name)
77                     target += [Condition.by_id(db_conn, cond_id)]
78         assert isinstance(todo, Todo)
79         return todo
80
81     @classmethod
82     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
83         """Collect all Todos for Day of date."""
84         todos = []
85         for id_ in db_conn.column_where('todos', 'id', 'day', date):
86             todos += [cls.by_id(db_conn, id_)]
87         return todos
88
89     @staticmethod
90     def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
91                          cond: Condition, date: str) -> list[Todo]:
92         """Collect all Todos of day that [name] condition."""
93         assert isinstance(cond.id_, int)
94         x_ablers = []
95         table = f'todo_{name}'
96         for id_ in db_conn.column_where(table, 'todo', 'condition', cond.id_):
97             todo = Todo.by_id(db_conn, id_)
98             if todo.date == date:
99                 x_ablers += [todo]
100         return x_ablers
101
102     @classmethod
103     def enablers_for_at(cls, db_conn: DatabaseConnection,
104                         condition: Condition, date: str) -> list[Todo]:
105         """Collect all Todos of day that enable condition."""
106         return cls._x_ablers_for_at(db_conn, 'enables', condition, date)
107
108     @classmethod
109     def disablers_for_at(cls, db_conn: DatabaseConnection,
110                          condition: Condition, date: str) -> list[Todo]:
111         """Collect all Todos of day that disable condition."""
112         return cls._x_ablers_for_at(db_conn, 'disables', condition, date)
113
114     @property
115     def is_doable(self) -> bool:
116         """Decide whether .is_done settable based on children, Conditions."""
117         for child in self.children:
118             if not child.is_done:
119                 return False
120         for condition in self.conditions:
121             if not condition.is_active:
122                 return False
123         return True
124
125     @property
126     def process_id(self) -> int | str | None:
127         """Return ID of tasked Process."""
128         return self.process.id_
129
130     @property
131     def unsatisfied_dependencies(self) -> list[int]:
132         """Return Process IDs of .process.explicit_steps not in .children."""
133         child_process_ids = {c.process.id_ for c in self.children}
134         unsatisfied: list[int] = []
135         for process_id in [s.step_process_id
136                            for s in self.process.explicit_steps]:
137             if process_id not in child_process_ids:
138                 unsatisfied += [process_id]
139         return unsatisfied
140
141     @property
142     def is_done(self) -> bool:
143         """Wrapper around self._is_done so we can control its setter."""
144         return self._is_done
145
146     @is_done.setter
147     def is_done(self, value: bool) -> None:
148         if value != self.is_done and not self.is_doable:
149             raise BadFormatException('cannot change doneness of undoable Todo')
150         if self._is_done != value:
151             self._is_done = value
152             if value is True:
153                 for condition in self.enables:
154                     condition.is_active = True
155                 for condition in self.disables:
156                     condition.is_active = False
157
158     def adopt_from(self, todos: list[Todo]) -> None:
159         """As far as possible, fill unsatisfied dependencies from todos."""
160         for process_id in self.unsatisfied_dependencies:
161             for todo in [t for t in todos if t.process.id_ == process_id]:
162                 self.add_child(todo)
163                 break
164
165     def get_step_tree(self, seen_todos: set[int],
166                       seen_conditions: set[int]) -> TodoStepsNode:
167         """Return tree of depended-on Todos and Conditions."""
168
169         def make_node(step: Todo | Condition) -> TodoStepsNode:
170             assert isinstance(step.id_, int)
171             is_todo = isinstance(step, Todo)
172             children = []
173             if is_todo:
174                 assert isinstance(step, Todo)
175                 seen = step.id_ in seen_todos
176                 seen_todos.add(step.id_)
177                 potentially_enabled = set()
178                 for child in step.children:
179                     for condition in child.enables:
180                         potentially_enabled.add(condition)
181                     children += [make_node(child)]
182                 for condition in [c for c in step.conditions
183                                   if (not c.is_active)
184                                   and (c not in potentially_enabled)]:
185                     children += [make_node(condition)]
186             else:
187                 assert isinstance(step, Condition)
188                 seen = step.id_ in seen_conditions
189                 seen_conditions.add(step.id_)
190             return TodoStepsNode(step, is_todo, children, seen)
191
192         node = make_node(self)
193         return node
194
195     def add_child(self, child: Todo) -> None:
196         """Add child to self.children, avoid recursion, update parenthoods."""
197
198         def walk_steps(node: Todo) -> None:
199             if node.id_ == self.id_:
200                 raise BadFormatException('bad child choice causes recursion')
201             for child in node.children:
202                 walk_steps(child)
203
204         if self.id_ is None:
205             raise HandledException('Can only add children to saved Todos.')
206         if child.id_ is None:
207             raise HandledException('Can only add saved children to Todos.')
208         if child in self.children:
209             raise BadFormatException('cannot adopt same child twice')
210         walk_steps(child)
211         self.children += [child]
212         child.parents += [self]
213
214     def remove_child(self, child: Todo) -> None:
215         """Remove child from self.children, update counter relations."""
216         if child not in self.children:
217             raise HandledException('Cannot remove un-parented child.')
218         self.children.remove(child)
219         child.parents.remove(self)
220
221     def save(self, db_conn: DatabaseConnection) -> None:
222         """Write self and children to DB and its cache."""
223         if self.process.id_ is None:
224             raise NotFoundException('Process of Todo without ID (not saved?)')
225         self.save_core(db_conn)
226         assert isinstance(self.id_, int)
227         db_conn.cached_todos[self.id_] = self
228         db_conn.rewrite_relations('todo_children', 'parent', self.id_,
229                                   [[c.id_] for c in self.children])
230         db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
231                                   [[c.id_] for c in self.conditions])
232         db_conn.rewrite_relations('todo_enables', 'todo', self.id_,
233                                   [[c.id_] for c in self.enables])
234         db_conn.rewrite_relations('todo_disables', 'todo', self.id_,
235                                   [[c.id_] for c in self.disables])