is_todo: bool
children: list[TodoStepsNode]
seen: bool
+ hide: bool
-class Todo(BaseModel, ConditionsRelations):
+class Todo(BaseModel[int], ConditionsRelations):
"""Individual actionable."""
-
# pylint: disable=too-many-instance-attributes
-
table_name = 'todos'
to_save = ['process_id', 'is_done', 'date']
def __init__(self, id_: int | None, process: Process,
is_done: bool, date: str) -> None:
- self.set_int_id(id_)
+ super().__init__(id_)
self.process = process
self._is_done = is_done
self.date = date
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Todo:
- """Make from DB row, write to DB cache."""
+ """Make from DB row, with dependencies."""
if row[1] == 0:
raise NotFoundException('calling Todo of '
'unsaved Process')
row_as_list = list(row)
row_as_list[1] = Process.by_id(db_conn, row[1])
todo = super().from_table_row(db_conn, row_as_list)
- assert isinstance(todo, Todo)
- return todo
-
- @classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: int) -> Todo:
- """Get Todo of .id_=id_ and children (from DB cache if possible)."""
- todo, from_cache = super()._by_id(db_conn, id_)
- if todo is None:
- raise NotFoundException(f'Todo of ID not found: {id_}')
- if not from_cache:
- for t_id in db_conn.column_where('todo_children', 'child',
- 'parent', id_):
- todo.children += [cls.by_id(db_conn, t_id)]
- for t_id in db_conn.column_where('todo_children', 'parent',
- 'child', id_):
- todo.parents += [cls.by_id(db_conn, t_id)]
- for name in ('conditions', 'enables', 'disables'):
- table = f'todo_{name}'
- for cond_id in db_conn.column_where(table, 'condition',
- 'todo', todo.id_):
- target = getattr(todo, name)
- target += [Condition.by_id(db_conn, cond_id)]
- assert isinstance(todo, Todo)
+ assert isinstance(todo.id_, int)
+ for t_id in db_conn.column_where('todo_children', 'child',
+ 'parent', todo.id_):
+ # pylint: disable=no-member
+ todo.children += [cls.by_id(db_conn, t_id)]
+ for t_id in db_conn.column_where('todo_children', 'parent',
+ 'child', todo.id_):
+ # pylint: disable=no-member
+ todo.parents += [cls.by_id(db_conn, t_id)]
+ for name in ('conditions', 'enables', 'disables'):
+ table = f'todo_{name}'
+ assert isinstance(todo.id_, int)
+ for cond_id in db_conn.column_where(table, 'condition',
+ 'todo', todo.id_):
+ target = getattr(todo, name)
+ target += [Condition.by_id(db_conn, cond_id)]
return todo
@classmethod
@property
def unsatisfied_dependencies(self) -> list[int]:
"""Return Process IDs of .process.explicit_steps not in .children."""
- child_process_ids = {c.process.id_ for c in self.children}
- unsatisfied: list[int] = []
- for process_id in [s.step_process_id
- for s in self.process.explicit_steps]:
- if process_id not in child_process_ids:
- unsatisfied += [process_id]
+ unsatisfied = [s.step_process_id for s in self.process.explicit_steps
+ if s.parent_step_id is None]
+ for child_process_id in [c.process.id_ for c in self.children]:
+ if child_process_id in unsatisfied:
+ unsatisfied.remove(child_process_id)
return unsatisfied
@property
def adopt_from(self, todos: list[Todo]) -> None:
"""As far as possible, fill unsatisfied dependencies from todos."""
for process_id in self.unsatisfied_dependencies:
- for todo in [t for t in todos if t.process.id_ == process_id]:
+ for todo in [t for t in todos if t.process.id_ == process_id
+ and t not in self.children]:
self.add_child(todo)
break
+ def make_missing_children(self, db_conn: DatabaseConnection) -> None:
+ """Fill unsatisfied dependencies with new Todos."""
+ for process_id in self.unsatisfied_dependencies:
+ process = Process.by_id(db_conn, process_id)
+ todo = self.__class__(None, process, False, self.date)
+ todo.save(db_conn)
+ self.add_child(todo)
+
def get_step_tree(self, seen_todos: set[int],
seen_conditions: set[int]) -> TodoStepsNode:
"""Return tree of depended-on Todos and Conditions."""
potentially_enabled = set()
for child in step.children:
for condition in child.enables:
- potentially_enabled.add(condition)
+ potentially_enabled.add(condition.id_)
children += [make_node(child)]
for condition in [c for c in step.conditions
if (not c.is_active)
- and (c not in potentially_enabled)]:
+ and (c.id_ not in potentially_enabled)]:
children += [make_node(condition)]
else:
- assert isinstance(step, Condition)
seen = step.id_ in seen_conditions
seen_conditions.add(step.id_)
- return TodoStepsNode(step, is_todo, children, seen)
+ return TodoStepsNode(step, is_todo, children, seen, False)
node = make_node(self)
return node
+ def get_undone_steps_tree(self) -> TodoStepsNode:
+ """Return tree of depended-on undone Todos and Conditions."""
+
+ def walk_tree(node: TodoStepsNode) -> None:
+ if isinstance(node.item, Todo) and node.item.is_done:
+ node.hide = True
+ for child in node.children:
+ walk_tree(child)
+
+ seen_todos: set[int] = set()
+ seen_conditions: set[int] = set()
+ step_tree = self.get_step_tree(seen_todos, seen_conditions)
+ walk_tree(step_tree)
+ return step_tree
+
+ def get_done_steps_tree(self) -> list[TodoStepsNode]:
+ """Return tree of depended-on done Todos."""
+
+ def make_nodes(node: TodoStepsNode) -> list[TodoStepsNode]:
+ children: list[TodoStepsNode] = []
+ if not isinstance(node.item, Todo):
+ return children
+ for child in node.children:
+ children += make_nodes(child)
+ if node.item.is_done:
+ node.children = children
+ return [node]
+ return children
+
+ seen_todos: set[int] = set()
+ seen_conditions: set[int] = set()
+ step_tree = self.get_step_tree(seen_todos, seen_conditions)
+ nodes = make_nodes(step_tree)
+ return nodes
+
def add_child(self, child: Todo) -> None:
"""Add child to self.children, avoid recursion, update parenthoods."""
raise NotFoundException('Process of Todo without ID (not saved?)')
self.save_core(db_conn)
assert isinstance(self.id_, int)
- db_conn.cached_todos[self.id_] = self
+ db_conn.rewrite_relations('todo_children', 'child', self.id_,
+ [[p.id_] for p in self.parents])
db_conn.rewrite_relations('todo_children', 'parent', self.id_,
[[c.id_] for c in self.children])
db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
[[c.id_] for c in self.enables])
db_conn.rewrite_relations('todo_disables', 'todo', self.id_,
[[c.id_] for c in self.disables])
+
+ def remove(self, db_conn: DatabaseConnection) -> None:
+ """Remove from DB, including relations."""
+ assert isinstance(self.id_, int)
+ children_to_remove = self.children[:]
+ parents_to_remove = self.parents[:]
+ for child in children_to_remove:
+ self.remove_child(child)
+ for parent in parents_to_remove:
+ parent.remove_child(self)
+ db_conn.delete_where('todo_children', 'parent', self.id_)
+ db_conn.delete_where('todo_children', 'child', self.id_)
+ db_conn.delete_where('todo_conditions', 'todo', self.id_)
+ db_conn.delete_where('todo_enables', 'todo', self.id_)
+ db_conn.delete_where('todo_disables', 'todo', self.id_)
+ super().remove(db_conn)