home · contact · privacy
Fix Process retrieval/display/saving bugs.
[plomtask] / plomtask / processes.py
index 0e8846d3f8591342d87c860f757ec9c2c8c43d07..6249d48445a3488a8f8c4bdd5748b63a5a727ae5 100644 (file)
@@ -1,13 +1,24 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from typing import Any, Set
+from dataclasses import dataclass
+from typing import Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
-from plomtask.conditions import Condition
+from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
-class Process(BaseModel):
+@dataclass
+class ProcessStepsNode:
+    """Collects what's useful to know for ProcessSteps tree display."""
+    process: Process
+    parent_id: int | None
+    is_explicit: bool
+    steps: dict[int, ProcessStepsNode]
+    seen: bool
+
+
+class Process(BaseModel, ConditionsRelations):
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
@@ -41,27 +52,29 @@ class Process(BaseModel):
               create: bool = False) -> Process:
         """Collect Process, its VersionedAttributes, and its child IDs."""
         process = None
               create: bool = False) -> Process:
         """Collect Process, its VersionedAttributes, and its child IDs."""
         process = None
+        from_cache = False
         if id_:
         if id_:
-            process, _ = super()._by_id(db_conn, id_)
-        if not process:
-            if not create:
-                raise NotFoundException(f'Process not found of id: {id_}')
-            process = Process(id_)
-        if isinstance(process.id_, int):
-            for name in ('title', 'description', 'effort'):
-                table = f'process_{name}s'
-                for row in db_conn.row_where(table, 'parent', process.id_):
-                    getattr(process, name).history_from_row(row)
-            for row in db_conn.row_where('process_steps', 'owner',
-                                         process.id_):
-                step = ProcessStep.from_table_row(db_conn, row)
-                process.explicit_steps += [step]
-            for name in ('conditions', 'enables', 'disables'):
-                table = f'process_{name}'
-                for cond_id in db_conn.column_where(table, 'condition',
-                                                    'process', process.id_):
-                    target = getattr(process, name)
-                    target += [Condition.by_id(db_conn, cond_id)]
+            process, from_cache = super()._by_id(db_conn, id_)
+        if not from_cache:
+            if not process:
+                if not create:
+                    raise NotFoundException(f'Process not found of id: {id_}')
+                process = Process(id_)
+            if isinstance(process.id_, int):
+                for name in ('title', 'description', 'effort'):
+                    table = f'process_{name}s'
+                    for row in db_conn.row_where(table, 'parent', process.id_):
+                        getattr(process, name).history_from_row(row)
+                for row in db_conn.row_where('process_steps', 'owner',
+                                             process.id_):
+                    step = ProcessStep.from_table_row(db_conn, row)
+                    process.explicit_steps += [step]
+                for name in ('conditions', 'enables', 'disables'):
+                    table = f'process_{name}'
+                    for c_id in db_conn.column_where(table, 'condition',
+                                                     'process', process.id_):
+                        target = getattr(process, name)
+                        target += [Condition.by_id(db_conn, c_id)]
         assert isinstance(process, Process)
         return process
 
         assert isinstance(process, Process)
         return process
 
@@ -76,29 +89,30 @@ class Process(BaseModel):
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
-                  Process | None = None) -> dict[int, dict[str, object]]:
+                  Process | None = None) -> dict[int, ProcessStepsNode]:
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
-        def make_node(step: ProcessStep) -> dict[str, object]:
+        def make_node(step: ProcessStep) -> ProcessStepsNode:
             is_explicit = False
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             step_steps = process.get_steps(db_conn, external_owner)
             is_explicit = False
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             step_steps = process.get_steps(db_conn, external_owner)
-            return {'process': process, 'parent_id': step.parent_step_id,
-                    'is_explicit': is_explicit, 'steps': step_steps}
+            return ProcessStepsNode(process, step.parent_step_id,
+                                    is_explicit, step_steps, False)
 
 
-        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
+        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
             explicit_children = [s for s in self.explicit_steps
                                  if s.parent_step_id == node_id]
             for child in explicit_children:
             explicit_children = [s for s in self.explicit_steps
                                  if s.parent_step_id == node_id]
             for child in explicit_children:
-                node['steps'][child.id_] = make_node(child)
-            node['seen'] = node_id in seen_step_ids
+                assert isinstance(child.id_, int)
+                node.steps[child.id_] = make_node(child)
+            node.seen = node_id in seen_step_ids
             seen_step_ids.add(node_id)
             seen_step_ids.add(node_id)
-            for id_, step in node['steps'].items():
+            for id_, step in node.steps.items():
                 walk_steps(id_, step)
 
                 walk_steps(id_, step)
 
-        steps: dict[int, dict[str, object]] = {}
+        steps: dict[int, ProcessStepsNode] = {}
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
@@ -110,25 +124,6 @@ class Process(BaseModel):
             walk_steps(step_id, step_node)
         return steps
 
             walk_steps(step_id, step_node)
         return steps
 
-    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
-                       trgt: str = 'conditions') -> None:
-        """Set self.[target] to Conditions identified by ids."""
-        trgt_list = getattr(self, trgt)
-        while len(trgt_list) > 0:
-            trgt_list.pop()
-        for id_ in ids:
-            trgt_list += [Condition.by_id(db_conn, id_)]
-
-    def set_enables(self, db_conn: DatabaseConnection,
-                    ids: list[int]) -> None:
-        """Set self.enables to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'enables')
-
-    def set_disables(self, db_conn: DatabaseConnection,
-                     ids: list[int]) -> None:
-        """Set self.disables to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'disables')
-
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
@@ -143,12 +138,14 @@ class Process(BaseModel):
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
+
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
             step_process = self.by_id(db_conn, node.step_process_id)
             for step in step_process.explicit_steps:
                 walk_steps(step)
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
             step_process = self.by_id(db_conn, node.step_process_id)
             for step in step_process.explicit_steps:
                 walk_steps(step)
+
         if parent_step_id is not None:
             try:
                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
         if parent_step_id is not None:
             try:
                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)