home · contact · privacy
Add basic sorting features to Condition, Process table views.
[plomtask] / plomtask / todos.py
1 """Actionables."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any
5 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection, BaseModel
7 from plomtask.processes import Process
8 from plomtask.versioned_attributes import VersionedAttribute
9 from plomtask.conditions import Condition, ConditionsRelations
10 from plomtask.exceptions import (NotFoundException, BadFormatException,
11                                  HandledException)
12
13
14 @dataclass
15 class TodoNode:
16     """Collects what's useful to know for Todo/Condition tree display."""
17     todo: Todo
18     seen: bool
19     children: list[TodoNode]
20
21
22 class Todo(BaseModel[int], ConditionsRelations):
23     """Individual actionable."""
24     # pylint: disable=too-many-instance-attributes
25     table_name = 'todos'
26     to_save = ['process_id', 'is_done', 'date', 'comment']
27     to_save_relations = [('todo_conditions', 'todo', 'conditions'),
28                          ('todo_enables', 'todo', 'enables'),
29                          ('todo_disables', 'todo', 'disables'),
30                          ('todo_children', 'parent', 'children'),
31                          ('todo_children', 'child', 'parents')]
32
33     # pylint: disable=too-many-arguments
34     def __init__(self, id_: int | None, process: Process,
35                  is_done: bool, date: str, comment: str = '') -> None:
36         super().__init__(id_)
37         if process.id_ is None:
38             raise NotFoundException('Process of Todo without ID (not saved?)')
39         self.process = process
40         self._is_done = is_done
41         self.date = date
42         self.comment = comment
43         self.children: list[Todo] = []
44         self.parents: list[Todo] = []
45         self.conditions: list[Condition] = []
46         self.enables: list[Condition] = []
47         self.disables: list[Condition] = []
48         if not self.id_:
49             self.conditions = self.process.conditions[:]
50             self.enables = self.process.enables[:]
51             self.disables = self.process.disables[:]
52
53     @classmethod
54     def from_table_row(cls, db_conn: DatabaseConnection,
55                        row: Row | list[Any]) -> Todo:
56         """Make from DB row, with dependencies."""
57         if row[1] == 0:
58             raise NotFoundException('calling Todo of '
59                                     'unsaved Process')
60         row_as_list = list(row)
61         row_as_list[1] = Process.by_id(db_conn, row[1])
62         todo = super().from_table_row(db_conn, row_as_list)
63         assert isinstance(todo.id_, int)
64         for t_id in db_conn.column_where('todo_children', 'child',
65                                          'parent', todo.id_):
66             # pylint: disable=no-member
67             todo.children += [cls.by_id(db_conn, t_id)]
68         for t_id in db_conn.column_where('todo_children', 'parent',
69                                          'child', todo.id_):
70             # pylint: disable=no-member
71             todo.parents += [cls.by_id(db_conn, t_id)]
72         for name in ('conditions', 'enables', 'disables'):
73             table = f'todo_{name}'
74             assert isinstance(todo.id_, int)
75             for cond_id in db_conn.column_where(table, 'condition',
76                                                 'todo', todo.id_):
77                 target = getattr(todo, name)
78                 target += [Condition.by_id(db_conn, cond_id)]
79         return todo
80
81     @classmethod
82     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
83         """Collect all Todos for Day of date."""
84         todos = []
85         for id_ in db_conn.column_where('todos', 'id', 'day', date):
86             todos += [cls.by_id(db_conn, id_)]
87         return todos
88
89     @property
90     def is_doable(self) -> bool:
91         """Decide whether .is_done settable based on children, Conditions."""
92         for child in self.children:
93             if not child.is_done:
94                 return False
95         for condition in self.conditions:
96             if not condition.is_active:
97                 return False
98         return True
99
100     @property
101     def process_id(self) -> int | str | None:
102         """Needed for super().save to save Processes as attributes."""
103         return self.process.id_
104
105     @property
106     def unsatisfied_dependencies(self) -> list[int]:
107         """Return Process IDs of .process.explicit_steps not in .children."""
108         unsatisfied = [s.step_process_id for s in self.process.explicit_steps
109                        if s.parent_step_id is None]
110         for child_process_id in [c.process.id_ for c in self.children]:
111             if child_process_id in unsatisfied:
112                 unsatisfied.remove(child_process_id)
113         return unsatisfied
114
115     @property
116     def is_done(self) -> bool:
117         """Wrapper around self._is_done so we can control its setter."""
118         return self._is_done
119
120     @is_done.setter
121     def is_done(self, value: bool) -> None:
122         if value != self.is_done and not self.is_doable:
123             raise BadFormatException('cannot change doneness of undoable Todo')
124         if self._is_done != value:
125             self._is_done = value
126             if value is True:
127                 for condition in self.enables:
128                     condition.is_active = True
129                 for condition in self.disables:
130                     condition.is_active = False
131
132     @property
133     def title(self) -> VersionedAttribute:
134         """Shortcut to .process.title."""
135         return self.process.title
136
137     def adopt_from(self, todos: list[Todo]) -> bool:
138         """As far as possible, fill unsatisfied dependencies from todos."""
139         adopted = False
140         for process_id in self.unsatisfied_dependencies:
141             for todo in [t for t in todos if t.process.id_ == process_id
142                          and t not in self.children]:
143                 self.add_child(todo)
144                 adopted = True
145                 break
146         return adopted
147
148     def make_missing_children(self, db_conn: DatabaseConnection) -> None:
149         """Fill unsatisfied dependencies with new Todos."""
150         for process_id in self.unsatisfied_dependencies:
151             process = Process.by_id(db_conn, process_id)
152             todo = self.__class__(None, process, False, self.date)
153             todo.save(db_conn)
154             self.add_child(todo)
155
156     def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
157         """Return tree of depended-on Todos."""
158
159         def make_node(todo: Todo) -> TodoNode:
160             children = []
161             seen = todo.id_ in seen_todos
162             assert isinstance(todo.id_, int)
163             seen_todos.add(todo.id_)
164             for child in todo.children:
165                 children += [make_node(child)]
166             return TodoNode(todo, seen, children)
167
168         return make_node(self)
169
170     def add_child(self, child: Todo) -> None:
171         """Add child to self.children, avoid recursion, update parenthoods."""
172
173         def walk_steps(node: Todo) -> None:
174             if node.id_ == self.id_:
175                 raise BadFormatException('bad child choice causes recursion')
176             for child in node.children:
177                 walk_steps(child)
178
179         if self.id_ is None:
180             raise HandledException('Can only add children to saved Todos.')
181         if child.id_ is None:
182             raise HandledException('Can only add saved children to Todos.')
183         if child in self.children:
184             raise BadFormatException('cannot adopt same child twice')
185         walk_steps(child)
186         self.children += [child]
187         child.parents += [self]
188
189     def remove_child(self, child: Todo) -> None:
190         """Remove child from self.children, update counter relations."""
191         if child not in self.children:
192             raise HandledException('Cannot remove un-parented child.')
193         self.children.remove(child)
194         child.parents.remove(self)
195
196     def remove(self, db_conn: DatabaseConnection) -> None:
197         """Remove from DB, including relations."""
198         children_to_remove = self.children[:]
199         parents_to_remove = self.parents[:]
200         for child in children_to_remove:
201             self.remove_child(child)
202         for parent in parents_to_remove:
203             parent.remove_child(self)
204         super().remove(db_conn)