2 from __future__ import annotations
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.processes import Process
7 from plomtask.conditions import Condition
8 from plomtask.exceptions import (NotFoundException, BadFormatException,
12 class Todo(BaseModel):
13 """Individual actionable."""
15 # pylint: disable=too-many-instance-attributes
18 to_save = ['process_id', 'is_done', 'date']
20 def __init__(self, id_: int | None, process: Process,
21 is_done: bool, date: str) -> None:
23 self.process = process
24 self._is_done = is_done
26 self.children: list[Todo] = []
27 self.parents: list[Todo] = []
28 self.conditions: list[Condition] = []
29 self.fulfills: list[Condition] = []
30 self.undoes: list[Condition] = []
32 self.conditions = process.conditions[:]
33 self.fulfills = process.fulfills[:]
34 self.undoes = process.undoes[:]
37 def from_table_row(cls, db_conn: DatabaseConnection,
38 row: Row | list[Any]) -> Todo:
39 """Make from DB row, write to DB cache."""
41 raise NotFoundException('calling Todo of '
43 row_as_list = list(row)
44 row_as_list[1] = Process.by_id(db_conn, row[1])
45 todo = super().from_table_row(db_conn, row_as_list)
46 assert isinstance(todo, Todo)
50 def by_id(cls, db_conn: DatabaseConnection, id_: int | None) -> Todo:
51 """Get Todo of .id_=id_ and children (from DB cache if possible)."""
53 todo, from_cache = super()._by_id(db_conn, id_)
55 todo, from_cache = None, False
57 raise NotFoundException(f'Todo of ID not found: {id_}')
59 for row in db_conn.exec('SELECT child FROM todo_children '
60 'WHERE parent = ?', (id_,)):
61 todo.children += [cls.by_id(db_conn, row[0])]
62 for row in db_conn.exec('SELECT parent FROM todo_children '
63 'WHERE child = ?', (id_,)):
64 todo.parents += [cls.by_id(db_conn, row[0])]
65 for row in db_conn.exec('SELECT condition FROM todo_conditions '
66 'WHERE todo = ?', (id_,)):
67 todo.conditions += [Condition.by_id(db_conn, row[0])]
68 for row in db_conn.exec('SELECT condition FROM todo_fulfills '
69 'WHERE todo = ?', (id_,)):
70 todo.fulfills += [Condition.by_id(db_conn, row[0])]
71 for row in db_conn.exec('SELECT condition FROM todo_undoes '
72 'WHERE todo = ?', (id_,)):
73 todo.undoes += [Condition.by_id(db_conn, row[0])]
74 assert isinstance(todo, Todo)
78 def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
79 """Collect all Todos for Day of date."""
81 for row in db_conn.exec('SELECT id FROM todos WHERE day = ?', (date,)):
82 todos += [cls.by_id(db_conn, row[0])]
86 def enablers_for_at(cls, db_conn: DatabaseConnection, condition: Condition,
87 date: str) -> list[Todo]:
88 """Collect all Todos of day that enable condition."""
90 for row in db_conn.exec('SELECT todo FROM todo_fulfills '
91 'WHERE condition = ?', (condition.id_,)):
92 todo = cls.by_id(db_conn, row[0])
98 def disablers_for_at(cls, db_conn: DatabaseConnection,
99 condition: Condition, date: str) -> list[Todo]:
100 """Collect all Todos of day that disable condition."""
102 for row in db_conn.exec('SELECT todo FROM todo_undoes '
103 'WHERE condition = ?', (condition.id_,)):
104 todo = cls.by_id(db_conn, row[0])
105 if todo.date == date:
110 def is_doable(self) -> bool:
111 """Decide whether .is_done settable based on children, Conditions."""
112 for child in self.children:
113 if not child.is_done:
115 for condition in self.conditions:
116 if not condition.is_active:
121 def process_id(self) -> int | str | None:
122 """Return ID of tasked Process."""
123 return self.process.id_
126 def is_done(self) -> bool:
127 """Wrapper around self._is_done so we can control its setter."""
131 def is_done(self, value: bool) -> None:
132 if value != self.is_done and not self.is_doable:
133 raise BadFormatException('cannot change doneness of undoable Todo')
134 if self._is_done != value:
135 self._is_done = value
137 for condition in self.fulfills:
138 condition.is_active = True
139 for condition in self.undoes:
140 condition.is_active = False
142 def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
143 """Set self.undoes to Conditions identified by ids."""
144 self.set_conditions(db_conn, ids, 'undoes')
146 def set_fulfills(self, db_conn: DatabaseConnection,
147 ids: list[int]) -> None:
148 """Set self.fulfills to Conditions identified by ids."""
149 self.set_conditions(db_conn, ids, 'fulfills')
151 def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
152 target: str = 'conditions') -> None:
153 """Set self.[target] to Conditions identified by ids."""
154 target_list = getattr(self, target)
155 while len(target_list) > 0:
158 target_list += [Condition.by_id(db_conn, id_)]
160 def add_child(self, child: Todo) -> None:
161 """Add child to self.children, guard against recursion"""
162 def walk_steps(node: Todo) -> None:
163 if node.id_ == self.id_:
164 raise BadFormatException('bad child choice causes recursion')
165 for child in node.children:
168 raise HandledException('Can only add children to saved Todos.')
169 if child.id_ is None:
170 raise HandledException('Can only add saved children to Todos.')
171 if child in self.children:
172 raise BadFormatException('cannot adopt same child twice')
174 self.children += [child]
175 child.parents += [self]
177 def save(self, db_conn: DatabaseConnection) -> None:
178 """Write self and children to DB and its cache."""
179 if self.process.id_ is None:
180 raise NotFoundException('Process of Todo without ID (not saved?)')
181 self.save_core(db_conn)
182 assert isinstance(self.id_, int)
183 db_conn.cached_todos[self.id_] = self
184 db_conn.rewrite_relations('todo_children', 'parent', self.id_,
185 [[c.id_] for c in self.children])
186 db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
187 [[c.id_] for c in self.conditions])
188 db_conn.rewrite_relations('todo_fulfills', 'todo', self.id_,
189 [[c.id_] for c in self.fulfills])
190 db_conn.rewrite_relations('todo_undoes', 'todo', self.id_,
191 [[c.id_] for c in self.undoes])