home · contact · privacy
Minor fixes.
[plomtask] / plomtask / todos.py
index 348dbddcbb96338ba1f1e10da50e094e36b92f63..ff6cdcb1e96144743f828ca5fea14072988be10a 100644 (file)
@@ -1,15 +1,25 @@
 """Actionables."""
 from __future__ import annotations
 """Actionables."""
 from __future__ import annotations
+from dataclasses import dataclass
 from typing import Any
 from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.processes import Process
 from typing import Any
 from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.processes import Process
-from plomtask.conditions import Condition
+from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
 from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
-class Todo(BaseModel):
+@dataclass
+class TodoStepsNode:
+    """Collects what's useful to know for Todo/Condition tree display."""
+    item: Todo | Condition
+    is_todo: bool
+    children: list[TodoStepsNode]
+    seen: bool
+
+
+class Todo(BaseModel, ConditionsRelations):
     """Individual actionable."""
 
     # pylint: disable=too-many-instance-attributes
     """Individual actionable."""
 
     # pylint: disable=too-many-instance-attributes
@@ -26,12 +36,12 @@ class Todo(BaseModel):
         self.children: list[Todo] = []
         self.parents: list[Todo] = []
         self.conditions: list[Condition] = []
         self.children: list[Todo] = []
         self.parents: list[Todo] = []
         self.conditions: list[Condition] = []
-        self.fulfills: list[Condition] = []
-        self.undoes: list[Condition] = []
+        self.enables: list[Condition] = []
+        self.disables: list[Condition] = []
         if not self.id_:
             self.conditions = process.conditions[:]
         if not self.id_:
             self.conditions = process.conditions[:]
-            self.fulfills = process.fulfills[:]
-            self.undoes = process.undoes[:]
+            self.enables = process.enables[:]
+            self.disables = process.disables[:]
 
     @classmethod
     def from_table_row(cls, db_conn: DatabaseConnection,
 
     @classmethod
     def from_table_row(cls, db_conn: DatabaseConnection,
@@ -47,30 +57,24 @@ class Todo(BaseModel):
         return todo
 
     @classmethod
         return todo
 
     @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int | None) -> Todo:
+    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> Todo:
         """Get Todo of .id_=id_ and children (from DB cache if possible)."""
         """Get Todo of .id_=id_ and children (from DB cache if possible)."""
-        if id_:
-            todo, from_cache = super()._by_id(db_conn, id_)
-        else:
-            todo, from_cache = None, False
+        todo, from_cache = super()._by_id(db_conn, id_)
         if todo is None:
             raise NotFoundException(f'Todo of ID not found: {id_}')
         if not from_cache:
         if todo is None:
             raise NotFoundException(f'Todo of ID not found: {id_}')
         if not from_cache:
-            for row in db_conn.exec('SELECT child FROM todo_children '
-                                    'WHERE parent = ?', (id_,)):
-                todo.children += [cls.by_id(db_conn, row[0])]
-            for row in db_conn.exec('SELECT parent FROM todo_children '
-                                    'WHERE child = ?', (id_,)):
-                todo.parents += [cls.by_id(db_conn, row[0])]
-            for row in db_conn.exec('SELECT condition FROM todo_conditions '
-                                    'WHERE todo = ?', (id_,)):
-                todo.conditions += [Condition.by_id(db_conn, row[0])]
-            for row in db_conn.exec('SELECT condition FROM todo_fulfills '
-                                    'WHERE todo = ?', (id_,)):
-                todo.fulfills += [Condition.by_id(db_conn, row[0])]
-            for row in db_conn.exec('SELECT condition FROM todo_undoes '
-                                    'WHERE todo = ?', (id_,)):
-                todo.undoes += [Condition.by_id(db_conn, row[0])]
+            for t_id in db_conn.column_where('todo_children', 'child',
+                                             'parent', id_):
+                todo.children += [cls.by_id(db_conn, t_id)]
+            for t_id in db_conn.column_where('todo_children', 'parent',
+                                             'child', id_):
+                todo.parents += [cls.by_id(db_conn, t_id)]
+            for name in ('conditions', 'enables', 'disables'):
+                table = f'todo_{name}'
+                for cond_id in db_conn.column_where(table, 'condition',
+                                                    'todo', todo.id_):
+                    target = getattr(todo, name)
+                    target += [Condition.by_id(db_conn, cond_id)]
         assert isinstance(todo, Todo)
         return todo
 
         assert isinstance(todo, Todo)
         return todo
 
@@ -78,33 +82,34 @@ class Todo(BaseModel):
     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
         """Collect all Todos for Day of date."""
         todos = []
     def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
         """Collect all Todos for Day of date."""
         todos = []
-        for row in db_conn.exec('SELECT id FROM todos WHERE day = ?', (date,)):
-            todos += [cls.by_id(db_conn, row[0])]
+        for id_ in db_conn.column_where('todos', 'id', 'day', date):
+            todos += [cls.by_id(db_conn, id_)]
         return todos
 
         return todos
 
+    @staticmethod
+    def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
+                         cond: Condition, date: str) -> list[Todo]:
+        """Collect all Todos of day that [name] condition."""
+        assert isinstance(cond.id_, int)
+        x_ablers = []
+        table = f'todo_{name}'
+        for id_ in db_conn.column_where(table, 'todo', 'condition', cond.id_):
+            todo = Todo.by_id(db_conn, id_)
+            if todo.date == date:
+                x_ablers += [todo]
+        return x_ablers
+
     @classmethod
     @classmethod
-    def enablers_for_at(cls, db_conn: DatabaseConnection, condition: Condition,
-                        date: str) -> list[Todo]:
+    def enablers_for_at(cls, db_conn: DatabaseConnection,
+                        condition: Condition, date: str) -> list[Todo]:
         """Collect all Todos of day that enable condition."""
         """Collect all Todos of day that enable condition."""
-        enablers = []
-        for row in db_conn.exec('SELECT todo FROM todo_fulfills '
-                                'WHERE condition = ?', (condition.id_,)):
-            todo = cls.by_id(db_conn, row[0])
-            if todo.date == date:
-                enablers += [todo]
-        return enablers
+        return cls._x_ablers_for_at(db_conn, 'enables', condition, date)
 
     @classmethod
     def disablers_for_at(cls, db_conn: DatabaseConnection,
                          condition: Condition, date: str) -> list[Todo]:
         """Collect all Todos of day that disable condition."""
 
     @classmethod
     def disablers_for_at(cls, db_conn: DatabaseConnection,
                          condition: Condition, date: str) -> list[Todo]:
         """Collect all Todos of day that disable condition."""
-        disablers = []
-        for row in db_conn.exec('SELECT todo FROM todo_undoes '
-                                'WHERE condition = ?', (condition.id_,)):
-            todo = cls.by_id(db_conn, row[0])
-            if todo.date == date:
-                disablers += [todo]
-        return disablers
+        return cls._x_ablers_for_at(db_conn, 'disables', condition, date)
 
     @property
     def is_doable(self) -> bool:
 
     @property
     def is_doable(self) -> bool:
@@ -134,36 +139,50 @@ class Todo(BaseModel):
         if self._is_done != value:
             self._is_done = value
             if value is True:
         if self._is_done != value:
             self._is_done = value
             if value is True:
-                for condition in self.fulfills:
+                for condition in self.enables:
                     condition.is_active = True
                     condition.is_active = True
-                for condition in self.undoes:
+                for condition in self.disables:
                     condition.is_active = False
 
                     condition.is_active = False
 
-    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
-        """Set self.undoes to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'undoes')
-
-    def set_fulfills(self, db_conn: DatabaseConnection,
-                     ids: list[int]) -> None:
-        """Set self.fulfills to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'fulfills')
-
-    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
-                       target: str = 'conditions') -> None:
-        """Set self.[target] to Conditions identified by ids."""
-        target_list = getattr(self, target)
-        while len(target_list) > 0:
-            target_list.pop()
-        for id_ in ids:
-            target_list += [Condition.by_id(db_conn, id_)]
+    def get_step_tree(self, seen_todos: set[int],
+                      seen_conditions: set[int]) -> TodoStepsNode:
+        """Return tree of depended-on Todos and Conditions."""
+
+        def make_node(step: Todo | Condition) -> TodoStepsNode:
+            assert isinstance(step.id_, int)
+            is_todo = isinstance(step, Todo)
+            children = []
+            if is_todo:
+                assert isinstance(step, Todo)
+                seen = step.id_ in seen_todos
+                seen_todos.add(step.id_)
+                potentially_enabled = set()
+                for child in step.children:
+                    for condition in child.enables:
+                        potentially_enabled.add(condition)
+                    children += [make_node(child)]
+                for condition in [c for c in step.conditions
+                                  if (not c.is_active)
+                                  and (c not in potentially_enabled)]:
+                    children += [make_node(condition)]
+            else:
+                assert isinstance(step, Condition)
+                seen = step.id_ in seen_conditions
+                seen_conditions.add(step.id_)
+            return TodoStepsNode(step, is_todo, children, seen)
+
+        node = make_node(self)
+        return node
 
     def add_child(self, child: Todo) -> None:
 
     def add_child(self, child: Todo) -> None:
-        """Add child to self.children, guard against recursion"""
+        """Add child to self.children, avoid recursion, update parenthoods."""
+
         def walk_steps(node: Todo) -> None:
             if node.id_ == self.id_:
                 raise BadFormatException('bad child choice causes recursion')
             for child in node.children:
                 walk_steps(child)
         def walk_steps(node: Todo) -> None:
             if node.id_ == self.id_:
                 raise BadFormatException('bad child choice causes recursion')
             for child in node.children:
                 walk_steps(child)
+
         if self.id_ is None:
             raise HandledException('Can only add children to saved Todos.')
         if child.id_ is None:
         if self.id_ is None:
             raise HandledException('Can only add children to saved Todos.')
         if child.id_ is None:
@@ -174,6 +193,13 @@ class Todo(BaseModel):
         self.children += [child]
         child.parents += [self]
 
         self.children += [child]
         child.parents += [self]
 
+    def remove_child(self, child: Todo) -> None:
+        """Remove child from self.children, update counter relations."""
+        if child not in self.children:
+            raise HandledException('Cannot remove un-parented child.')
+        self.children.remove(child)
+        child.parents.remove(self)
+
     def save(self, db_conn: DatabaseConnection) -> None:
         """Write self and children to DB and its cache."""
         if self.process.id_ is None:
     def save(self, db_conn: DatabaseConnection) -> None:
         """Write self and children to DB and its cache."""
         if self.process.id_ is None:
@@ -185,7 +211,7 @@ class Todo(BaseModel):
                                   [[c.id_] for c in self.children])
         db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
                                   [[c.id_] for c in self.conditions])
                                   [[c.id_] for c in self.children])
         db_conn.rewrite_relations('todo_conditions', 'todo', self.id_,
                                   [[c.id_] for c in self.conditions])
-        db_conn.rewrite_relations('todo_fulfills', 'todo', self.id_,
-                                  [[c.id_] for c in self.fulfills])
-        db_conn.rewrite_relations('todo_undoes', 'todo', self.id_,
-                                  [[c.id_] for c in self.undoes])
+        db_conn.rewrite_relations('todo_enables', 'todo', self.id_,
+                                  [[c.id_] for c in self.enables])
+        db_conn.rewrite_relations('todo_disables', 'todo', self.id_,
+                                  [[c.id_] for c in self.disables])