home · contact · privacy
Refactor Todo adoption code.
[plomtask] / plomtask / todos.py
index ebe35ac56bb78af42a10644c9209b34778527c0e..ed78ca947b359286456e7dcc4c664fcb32961c12 100644 (file)
@@ -1,15 +1,25 @@
 """Actionables."""
 from __future__ import annotations
 """Actionables."""
 from __future__ import annotations
+from dataclasses import dataclass
 from typing import Any
 from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.processes import Process
 from typing import Any
 from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.processes import Process
-from plomtask.conditions import Condition
+from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
 from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
-class Todo(BaseModel):
+@dataclass
+class TodoStepsNode:
+    """Collects what's useful to know for Todo/Condition tree display."""
+    item: Todo | Condition
+    is_todo: bool
+    children: list[TodoStepsNode]
+    seen: bool
+
+
+class Todo(BaseModel, ConditionsRelations):
     """Individual actionable."""
 
     # pylint: disable=too-many-instance-attributes
     """Individual actionable."""
 
     # pylint: disable=too-many-instance-attributes
@@ -29,9 +39,9 @@ class Todo(BaseModel):
         self.enables: list[Condition] = []
         self.disables: list[Condition] = []
         if not self.id_:
         self.enables: list[Condition] = []
         self.disables: list[Condition] = []
         if not self.id_:
-            self.conditions = process.conditions[:]
-            self.enables = process.enables[:]
-            self.disables = process.disables[:]
+            self.conditions = self.process.conditions[:]
+            self.enables = self.process.enables[:]
+            self.disables = self.process.disables[:]
 
     @classmethod
     def from_table_row(cls, db_conn: DatabaseConnection,
 
     @classmethod
     def from_table_row(cls, db_conn: DatabaseConnection,
@@ -117,6 +127,17 @@ class Todo(BaseModel):
         """Return ID of tasked Process."""
         return self.process.id_
 
         """Return ID of tasked Process."""
         return self.process.id_
 
+    @property
+    def unsatisfied_dependencies(self) -> list[int]:
+        """Return Process IDs of .process.explicit_steps not in .children."""
+        child_process_ids = {c.process.id_ for c in self.children}
+        unsatisfied: list[int] = []
+        for process_id in [s.step_process_id
+                           for s in self.process.explicit_steps]:
+            if process_id not in child_process_ids:
+                unsatisfied += [process_id]
+        return unsatisfied
+
     @property
     def is_done(self) -> bool:
         """Wrapper around self._is_done so we can control its setter."""
     @property
     def is_done(self) -> bool:
         """Wrapper around self._is_done so we can control its setter."""
@@ -134,32 +155,52 @@ class Todo(BaseModel):
                 for condition in self.disables:
                     condition.is_active = False
 
                 for condition in self.disables:
                     condition.is_active = False
 
-    def set_disables(self, db_conn: DatabaseConnection,
-                     ids: list[int]) -> None:
-        """Set self.disables to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'disables')
-
-    def set_enables(self, db_conn: DatabaseConnection,
-                    ids: list[int]) -> None:
-        """Set self.enables to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'enables')
-
-    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
-                       target: str = 'conditions') -> None:
-        """Set self.[target] to Conditions identified by ids."""
-        target_list = getattr(self, target)
-        while len(target_list) > 0:
-            target_list.pop()
-        for id_ in ids:
-            target_list += [Condition.by_id(db_conn, id_)]
+    def adopt_from(self, todos: list[Todo]) -> None:
+        """As far as possible, fill unsatisfied dependencies from todos."""
+        for process_id in self.unsatisfied_dependencies:
+            for todo in [t for t in todos if t.process.id_ == process_id]:
+                self.add_child(todo)
+                break
+
+    def get_step_tree(self, seen_todos: set[int],
+                      seen_conditions: set[int]) -> TodoStepsNode:
+        """Return tree of depended-on Todos and Conditions."""
+
+        def make_node(step: Todo | Condition) -> TodoStepsNode:
+            assert isinstance(step.id_, int)
+            is_todo = isinstance(step, Todo)
+            children = []
+            if is_todo:
+                assert isinstance(step, Todo)
+                seen = step.id_ in seen_todos
+                seen_todos.add(step.id_)
+                potentially_enabled = set()
+                for child in step.children:
+                    for condition in child.enables:
+                        potentially_enabled.add(condition)
+                    children += [make_node(child)]
+                for condition in [c for c in step.conditions
+                                  if (not c.is_active)
+                                  and (c not in potentially_enabled)]:
+                    children += [make_node(condition)]
+            else:
+                assert isinstance(step, Condition)
+                seen = step.id_ in seen_conditions
+                seen_conditions.add(step.id_)
+            return TodoStepsNode(step, is_todo, children, seen)
+
+        node = make_node(self)
+        return node
 
     def add_child(self, child: Todo) -> None:
 
     def add_child(self, child: Todo) -> None:
-        """Add child to self.children, guard against recursion"""
+        """Add child to self.children, avoid recursion, update parenthoods."""
+
         def walk_steps(node: Todo) -> None:
             if node.id_ == self.id_:
                 raise BadFormatException('bad child choice causes recursion')
             for child in node.children:
                 walk_steps(child)
         def walk_steps(node: Todo) -> None:
             if node.id_ == self.id_:
                 raise BadFormatException('bad child choice causes recursion')
             for child in node.children:
                 walk_steps(child)
+
         if self.id_ is None:
             raise HandledException('Can only add children to saved Todos.')
         if child.id_ is None:
         if self.id_ is None:
             raise HandledException('Can only add children to saved Todos.')
         if child.id_ is None:
@@ -170,6 +211,13 @@ class Todo(BaseModel):
         self.children += [child]
         child.parents += [self]
 
         self.children += [child]
         child.parents += [self]
 
+    def remove_child(self, child: Todo) -> None:
+        """Remove child from self.children, update counter relations."""
+        if child not in self.children:
+            raise HandledException('Cannot remove un-parented child.')
+        self.children.remove(child)
+        child.parents.remove(self)
+
     def save(self, db_conn: DatabaseConnection) -> None:
         """Write self and children to DB and its cache."""
         if self.process.id_ is None:
     def save(self, db_conn: DatabaseConnection) -> None:
         """Write self and children to DB and its cache."""
         if self.process.id_ is None: