home · contact · privacy
Refactor object retrieval and creation.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
@dataclass
class TodoStepsNode:
    """Collects what's useful to know for Todo/Condition tree display.

    Nodes of this kind are what Todo.get_step_tree builds and returns, one
    per step it visits.
    """
    # the Todo or Condition that this node stands for
    item: Todo | Condition
    # whether .item is a Todo (rather than a Condition)
    is_todo: bool
    # nodes for the steps below .item; Todo.get_step_tree leaves this empty
    # on Condition nodes
    children: list[TodoStepsNode]
    # whether .item's ID was already in the matching "seen" set when
    # Todo.get_step_tree made this node
    seen: bool
20
21
22 class Todo(BaseModel[int], ConditionsRelations):
23     """Individual actionable."""
24     # pylint: disable=too-many-instance-attributes
25     table_name = 'todos'
26     to_save = ['process_id', 'is_done', 'date']
27
28     def __init__(self, id_: int | None, process: Process,
29                  is_done: bool, date: str) -> None:
30         super().__init__(id_)
31         self.process = process
32         self._is_done = is_done
33         self.date = date
34         self.children: list[Todo] = []
35         self.parents: list[Todo] = []
36         self.conditions: list[Condition] = []
37         self.enables: list[Condition] = []
38         self.disables: list[Condition] = []
39         if not self.id_:
40             self.conditions = self.process.conditions[:]
41             self.enables = self.process.enables[:]
42             self.disables = self.process.disables[:]
43
44     @classmethod
45     def from_table_row(cls, db_conn: DatabaseConnection,
46                        row: Row | list[Any]) -> Todo:
47         """Make from DB row, with dependencies."""
48         if row[1] == 0:
49             raise NotFoundException('calling Todo of '
50                                     'unsaved Process')
51         row_as_list = list(row)
52         row_as_list[1] = Process.by_id(db_conn, row[1])
53         todo = super().from_table_row(db_conn, row_as_list)
54         assert isinstance(todo.id_, int)
55         for t_id in db_conn.column_where('todo_children', 'child',
56                                          'parent', todo.id_):
57             # pylint: disable=no-member
58             todo.children += [cls.by_id(db_conn, t_id)]
59         for t_id in db_conn.column_where('todo_children', 'parent',
60                                          'child', todo.id_):
61             # pylint: disable=no-member
62             todo.parents += [cls.by_id(db_conn, t_id)]
63         for name in ('conditions', 'enables', 'disables'):
64             table = f'todo_{name}'
65             assert isinstance(todo.id_, int)
66             for cond_id in db_conn.column_where(table, 'condition',
67                                                 'todo', todo.id_):
68                 target = getattr(todo, name)
69                 target += [Condition.by_id(db_conn, cond_id)]
70         return todo
71
72     @classmethod
73     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
74         """Collect all Todos for Day of date."""
75         todos = []
76         for id_ in db_conn.column_where('todos', 'id', 'day', date):
77             todos += [cls.by_id(db_conn, id_)]
78         return todos
79
80     @staticmethod
81     def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
82                          cond: Condition, date: str) -> list[Todo]:
83         """Collect all Todos of day that [name] condition."""
84         assert isinstance(cond.id_, int)
85         x_ablers = []
86         table = f'todo_{name}'
87         for id_ in db_conn.column_where(table, 'todo', 'condition', cond.id_):
88             todo = Todo.by_id(db_conn, id_)
89             if todo.date == date:
90                 x_ablers += [todo]
91         return x_ablers
92
93     @classmethod
94     def enablers_for_at(cls, db_conn: DatabaseConnection,
95                         condition: Condition, date: str) -> list[Todo]:
96         """Collect all Todos of day that enable condition."""
97         return cls._x_ablers_for_at(db_conn, 'enables', condition, date)
98
99     @classmethod
100     def disablers_for_at(cls, db_conn: DatabaseConnection,
101                          condition: Condition, date: str) -> list[Todo]:
102         """Collect all Todos of day that disable condition."""
103         return cls._x_ablers_for_at(db_conn, 'disables', condition, date)
104
105     @property
106     def is_doable(self) -> bool:
107         """Decide whether .is_done settable based on children, Conditions."""
108         for child in self.children:
109             if not child.is_done:
110                 return False
111         for condition in self.conditions:
112             if not condition.is_active:
113                 return False
114         return True
115
116     @property
    def process_id(self) -> int | str | None:
        """Return ID of tasked Process, i.e. the .id_ of self.process.

        Listed in the class' to_save under this name.
        """
        return self.process.id_
120
121     @property
122     def unsatisfied_dependencies(self) -> list[int]:
123         """Return Process IDs of .process.explicit_steps not in .children."""
124         unsatisfied = [s.step_process_id for s in self.process.explicit_steps
125                        if s.parent_step_id is None]
126         for child_process_id in [c.process.id_ for c in self.children]:
127             if child_process_id in unsatisfied:
128                 unsatisfied.remove(child_process_id)
129         return unsatisfied
130
131     @property
    def is_done(self) -> bool:
        """Return whether Todo is marked done.

        Wrapper around self._is_done so we can control its setter, which
        refuses changes while the Todo is not .is_doable.
        """
        return self._is_done
135
136     @is_done.setter
137     def is_done(self, value: bool) -> None:
138         if value != self.is_done and not self.is_doable:
139             raise BadFormatException('cannot change doneness of undoable Todo')
140         if self._is_done != value:
141             self._is_done = value
142             if value is True:
143                 for condition in self.enables:
144                     condition.is_active = True
145                 for condition in self.disables:
146                     condition.is_active = False
147
148     def adopt_from(self, todos: list[Todo]) -> None:
149         """As far as possible, fill unsatisfied dependencies from todos."""
150         for process_id in self.unsatisfied_dependencies:
151             for todo in [t for t in todos if t.process.id_ == process_id
152                          and t not in self.children]:
153                 self.add_child(todo)
154                 break
155
156     def make_missing_children(self, db_conn: DatabaseConnection) -> None:
157         """Fill unsatisfied dependencies with new Todos."""
158         for process_id in self.unsatisfied_dependencies:
159             process = Process.by_id(db_conn, process_id)
160             todo = self.__class__(None, process, False, self.date)
161             todo.save(db_conn)
162             self.add_child(todo)
163
164     def get_step_tree(self, seen_todos: set[int],
165                       seen_conditions: set[int]) -> TodoStepsNode:
166         """Return tree of depended-on Todos and Conditions."""
167
168         def make_node(step: Todo | Condition) -> TodoStepsNode:
169             assert isinstance(step.id_, int)
170             is_todo = isinstance(step, Todo)
171             children = []
172             if is_todo:
173                 assert isinstance(step, Todo)
174                 seen = step.id_ in seen_todos
175                 seen_todos.add(step.id_)
176                 potentially_enabled = set()
177                 for child in step.children:
178                     for condition in child.enables:
179                         potentially_enabled.add(condition)
180                     children += [make_node(child)]
181                 for condition in [c for c in step.conditions
182                                   if (not c.is_active)
183                                   and (c not in potentially_enabled)]:
184                     children += [make_node(condition)]
185             else:
186                 seen = step.id_ in seen_conditions
187                 seen_conditions.add(step.id_)
188             return TodoStepsNode(step, is_todo, children, seen)
189
190         node = make_node(self)
191         return node
192
193     def add_child(self, child: Todo) -> None:
194         """Add child to self.children, avoid recursion, update parenthoods."""
195
196         def walk_steps(node: Todo) -> None:
197             if node.id_ == self.id_:
198                 raise BadFormatException('bad child choice causes recursion')
199             for child in node.children:
200                 walk_steps(child)
201
202         if self.id_ is None:
203             raise HandledException('Can only add children to saved Todos.')
204         if child.id_ is None:
205             raise HandledException('Can only add saved children to Todos.')
206         if child in self.children:
207             raise BadFormatException('cannot adopt same child twice')
208         walk_steps(child)
209         self.children += [child]
210         child.parents += [self]
211
212     def remove_child(self, child: Todo) -> None:
213         """Remove child from self.children, update counter relations."""
214         if child not in self.children:
215             raise HandledException('Cannot remove un-parented child.')
216         self.children.remove(child)
217         child.parents.remove(self)
218
219     def save(self, db_conn: DatabaseConnection) -> None:
220         """Write self and children to DB and its cache."""
221         if self.process.id_ is None:
222             raise NotFoundException('Process of Todo without ID (not saved?)')
223         self.save_core(db_conn)
224         assert isinstance(self.id_, int)
225         db_conn.rewrite_relations('todo_children', 'parent', self.id_,
226                                   [[c.id_] for c in self.children])
227         db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
228                                   [[c.id_] for c in self.conditions])
229         db_conn.rewrite_relations('todo_enables', 'todo', self.id_,
230                                   [[c.id_] for c in self.enables])
231         db_conn.rewrite_relations('todo_disables', 'todo', self.id_,
232                                   [[c.id_] for c in self.disables])