home · contact · privacy
Simplify Todo steps tree calculation/display.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process
8 from plomtask.conditions import Condition, ConditionsRelations
9 from plomtask.exceptions import (NotFoundException, BadFormatException,
10                                  HandledException)
11
12
13 @dataclass
14 class TodoNode:
15     """Collects what's useful to know for Todo/Condition tree display."""
16     todo: Todo
17     seen: bool
18     children: list[TodoNode]
19
20
21 class Todo(BaseModel[int], ConditionsRelations):
22     """Individual actionable."""
23     # pylint: disable=too-many-instance-attributes
24     table_name = 'todos'
25     to_save = ['process_id', 'is_done', 'date']
26     to_save_relations = [('todo_conditions', 'todo', 'conditions'),
27                          ('todo_enables', 'todo', 'enables'),
28                          ('todo_disables', 'todo', 'disables'),
29                          ('todo_children', 'parent', 'children'),
30                          ('todo_children', 'child', 'parents')]
31
32     def __init__(self, id_: int | None, process: Process,
33                  is_done: bool, date: str) -> None:
34         super().__init__(id_)
35         if process.id_ is None:
36             raise NotFoundException('Process of Todo without ID (not saved?)')
37         self.process = process
38         self._is_done = is_done
39         self.date = date
40         self.children: list[Todo] = []
41         self.parents: list[Todo] = []
42         self.conditions: list[Condition] = []
43         self.enables: list[Condition] = []
44         self.disables: list[Condition] = []
45         if not self.id_:
46             self.conditions = self.process.conditions[:]
47             self.enables = self.process.enables[:]
48             self.disables = self.process.disables[:]
49
50     @classmethod
51     def from_table_row(cls, db_conn: DatabaseConnection,
52                        row: Row | list[Any]) -> Todo:
53         """Make from DB row, with dependencies."""
54         if row[1] == 0:
55             raise NotFoundException('calling Todo of '
56                                     'unsaved Process')
57         row_as_list = list(row)
58         row_as_list[1] = Process.by_id(db_conn, row[1])
59         todo = super().from_table_row(db_conn, row_as_list)
60         assert isinstance(todo.id_, int)
61         for t_id in db_conn.column_where('todo_children', 'child',
62                                          'parent', todo.id_):
63             # pylint: disable=no-member
64             todo.children += [cls.by_id(db_conn, t_id)]
65         for t_id in db_conn.column_where('todo_children', 'parent',
66                                          'child', todo.id_):
67             # pylint: disable=no-member
68             todo.parents += [cls.by_id(db_conn, t_id)]
69         for name in ('conditions', 'enables', 'disables'):
70             table = f'todo_{name}'
71             assert isinstance(todo.id_, int)
72             for cond_id in db_conn.column_where(table, 'condition',
73                                                 'todo', todo.id_):
74                 target = getattr(todo, name)
75                 target += [Condition.by_id(db_conn, cond_id)]
76         return todo
77
78     @classmethod
79     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
80         """Collect all Todos for Day of date."""
81         todos = []
82         for id_ in db_conn.column_where('todos', 'id', 'day', date):
83             todos += [cls.by_id(db_conn, id_)]
84         return todos
85
86     @property
87     def is_doable(self) -> bool:
88         """Decide whether .is_done settable based on children, Conditions."""
89         for child in self.children:
90             if not child.is_done:
91                 return False
92         for condition in self.conditions:
93             if not condition.is_active:
94                 return False
95         return True
96
97     @property
98     def process_id(self) -> int | str | None:
99         """Needed for super().save to save Processes as attributes."""
100         return self.process.id_
101
102     @property
103     def unsatisfied_dependencies(self) -> list[int]:
104         """Return Process IDs of .process.explicit_steps not in .children."""
105         unsatisfied = [s.step_process_id for s in self.process.explicit_steps
106                        if s.parent_step_id is None]
107         for child_process_id in [c.process.id_ for c in self.children]:
108             if child_process_id in unsatisfied:
109                 unsatisfied.remove(child_process_id)
110         return unsatisfied
111
112     @property
113     def is_done(self) -> bool:
114         """Wrapper around self._is_done so we can control its setter."""
115         return self._is_done
116
117     @is_done.setter
118     def is_done(self, value: bool) -> None:
119         if value != self.is_done and not self.is_doable:
120             raise BadFormatException('cannot change doneness of undoable Todo')
121         if self._is_done != value:
122             self._is_done = value
123             if value is True:
124                 for condition in self.enables:
125                     condition.is_active = True
126                 for condition in self.disables:
127                     condition.is_active = False
128
129     def adopt_from(self, todos: list[Todo]) -> None:
130         """As far as possible, fill unsatisfied dependencies from todos."""
131         for process_id in self.unsatisfied_dependencies:
132             for todo in [t for t in todos if t.process.id_ == process_id
133                          and t not in self.children]:
134                 self.add_child(todo)
135                 break
136
137     def make_missing_children(self, db_conn: DatabaseConnection) -> None:
138         """Fill unsatisfied dependencies with new Todos."""
139         for process_id in self.unsatisfied_dependencies:
140             process = Process.by_id(db_conn, process_id)
141             todo = self.__class__(None, process, False, self.date)
142             todo.save(db_conn)
143             self.add_child(todo)
144
145     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
146         """Return tree of depended-on Todos."""
147
148         def make_node(todo: Todo) -> TodoNode:
149             children = []
150             seen = todo.id_ in seen_todos
151             assert isinstance(todo.id_, int)
152             seen_todos.add(todo.id_)
153             for child in todo.children:
154                 children += [make_node(child)]
155             return TodoNode(todo, seen, children)
156
157         return make_node(self)
158
159     def add_child(self, child: Todo) -> None:
160         """Add child to self.children, avoid recursion, update parenthoods."""
161
162         def walk_steps(node: Todo) -> None:
163             if node.id_ == self.id_:
164                 raise BadFormatException('bad child choice causes recursion')
165             for child in node.children:
166                 walk_steps(child)
167
168         if self.id_ is None:
169             raise HandledException('Can only add children to saved Todos.')
170         if child.id_ is None:
171             raise HandledException('Can only add saved children to Todos.')
172         if child in self.children:
173             raise BadFormatException('cannot adopt same child twice')
174         walk_steps(child)
175         self.children += [child]
176         child.parents += [self]
177
178     def remove_child(self, child: Todo) -> None:
179         """Remove child from self.children, update counter relations."""
180         if child not in self.children:
181             raise HandledException('Cannot remove un-parented child.')
182         self.children.remove(child)
183         child.parents.remove(self)
184
185     def remove(self, db_conn: DatabaseConnection) -> None:
186         """Remove from DB, including relations."""
187         children_to_remove = self.children[:]
188         parents_to_remove = self.parents[:]
189         for child in children_to_remove:
190             self.remove_child(child)
191         for parent in parents_to_remove:
192             parent.remove_child(self)
193         super().remove(db_conn)