home · contact · privacy
Simplify ProcessStepsNode JSON serialization.
authorChristian Heller <c.heller@plomlompom.de>
Mon, 5 Aug 2024 01:52:36 +0000 (03:52 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Mon, 5 Aug 2024 01:52:36 +0000 (03:52 +0200)
plomtask/http.py
plomtask/misc.py [new file with mode: 0644]
plomtask/processes.py
plomtask/todos.py

index 36a5d78cf608e31e128e5367056173da1305c55b..475f87fd17bd629804d3582a91e1aa88fc436b3f 100644 (file)
@@ -16,7 +16,8 @@ from plomtask.exceptions import (HandledException, BadFormatException,
 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
 from plomtask.conditions import Condition
-from plomtask.todos import Todo, TodoOrProcStepNode, DictableNode
+from plomtask.todos import Todo, TodoOrProcStepNode
+from plomtask.misc import DictableNode
 
 TEMPLATES_DIR = 'templates'
 
@@ -385,12 +386,12 @@ class TaskHandler(BaseHTTPRequestHandler):
                                steps_nodes: list[TodoOrProcStepNode]) -> int:
             for process_step_node in process_step_nodes:
                 node_id += 1
-                node = TodoOrProcStepNode(node_id, None,
-                                          process_step_node.process, [])
+                proc = Process.by_id(self.conn,
+                                     process_step_node.step.step_process_id)
+                node = TodoOrProcStepNode(node_id, None, proc, [])
                 steps_nodes += [node]
                 node_id = walk_process_steps(
-                        node_id, list(process_step_node.steps.values()),
-                        node.children)
+                        node_id, process_step_node.steps, node.children)
             return node_id
 
         def walk_todo_steps(node_id: int, todos: list[Todo],
@@ -428,8 +429,8 @@ class TaskHandler(BaseHTTPRequestHandler):
         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
         process_tree = todo.process.get_steps(self.conn, None)
         steps_todo_to_process: list[TodoOrProcStepNode] = []
-        last_node_id = walk_process_steps(
-                0, list(process_tree.values()), steps_todo_to_process)
+        last_node_id = walk_process_steps(0, process_tree,
+                                          steps_todo_to_process)
         for steps_node in steps_todo_to_process:
             steps_node.fillable = True
         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
@@ -518,7 +519,8 @@ class TaskHandler(BaseHTTPRequestHandler):
             preset_top_step = process_id
         return {'process': process, 'is_new': process.id_ is None,
                 'preset_top_step': preset_top_step,
-                'steps': process.get_steps(self.conn), 'owners': owners,
+                'steps': process.get_steps(self.conn),
+                'owners': owners,
                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
                 'process_candidates': Process.all(self.conn),
                 'condition_candidates': Condition.all(self.conn)}
diff --git a/plomtask/misc.py b/plomtask/misc.py
new file mode 100644 (file)
index 0000000..fa79bf5
--- /dev/null
@@ -0,0 +1,33 @@
+"""What doesn't fit elsewhere so far."""
+from typing import Any
+
+
+class DictableNode:
+    """Template for display chain nodes providing .as_dict_and_refs."""
+    # pylint: disable=too-few-public-methods
+    _to_dict: list[str] = []
+
+    def __init__(self, *args: Any) -> None:
+        for i, arg in enumerate(args):
+            setattr(self, self._to_dict[i], arg)
+
+    @property
+    def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
+        """Return self as json.dumps-ready dict, list of referenced objects."""
+        d = {}
+        refs = []
+        for name in self._to_dict:
+            attr = getattr(self, name)
+            if hasattr(attr, 'id_'):
+                d[name] = attr.id_
+                continue
+            if isinstance(attr, list):
+                d[name] = []
+                for item in attr:
+                    item_d, item_refs = item.as_dict_and_refs
+                    d[name] += [item_d]
+                    for item_ref in [r for r in item_refs if r not in refs]:
+                        refs += [item_ref]
+                continue
+            d[name] = attr
+        return d, refs
index 9870ab3c572517d498e631d479d9996b949fb26f..b68ffd8ecb3f2df8e25c46b9558879cb9dbea5fb 100644 (file)
@@ -1,8 +1,8 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from dataclasses import dataclass
 from typing import Set, Any
 from sqlite3 import Row
+from plomtask.misc import DictableNode
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.versioned_attributes import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
@@ -10,15 +10,15 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
-@dataclass
-class ProcessStepsNode:
+class ProcessStepsNode(DictableNode):
     """Collects what's useful to know for ProcessSteps tree display."""
-    process: Process
-    parent_id: int | None
+    # pylint: disable=too-few-public-methods
+    step: ProcessStep
     is_explicit: bool
-    steps: dict[int, ProcessStepsNode]
+    steps: list[ProcessStepsNode]
     seen: bool = False
     is_suppressed: bool = False
+    _to_dict = ['step', 'is_explicit', 'steps', 'seen', 'is_suppressed']
 
 
 class Process(BaseModel[int], ConditionsRelations):
@@ -87,7 +87,7 @@ class Process(BaseModel[int], ConditionsRelations):
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
-                  Process | None = None) -> dict[int, ProcessStepsNode]:
+                  Process | None = None) -> list[ProcessStepsNode]:
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
         def make_node(step: ProcessStep, suppressed: bool) -> ProcessStepsNode:
@@ -95,26 +95,26 @@ class Process(BaseModel[int], ConditionsRelations):
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
-            step_steps = {}
+            step_steps = []
             if not suppressed:
                 step_steps = process.get_steps(db_conn, external_owner)
-            return ProcessStepsNode(process, step.parent_step_id,
-                                    is_explicit, step_steps, False, suppressed)
+            return ProcessStepsNode(step, is_explicit, step_steps, False,
+                                    suppressed)
 
-        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
-            node.seen = node_id in seen_step_ids
-            seen_step_ids.add(node_id)
+        def walk_steps(node: ProcessStepsNode) -> None:
+            node.seen = node.step.id_ in seen_step_ids
+            assert isinstance(node.step.id_, int)
+            seen_step_ids.add(node.step.id_)
             if node.is_suppressed:
                 return
             explicit_children = [s for s in self.explicit_steps
-                                 if s.parent_step_id == node_id]
+                                 if s.parent_step_id == node.step.id_]
             for child in explicit_children:
-                assert isinstance(child.id_, int)
-                node.steps[child.id_] = make_node(child, False)
-            for id_, step in node.steps.items():
-                walk_steps(id_, step)
+                node.steps += [make_node(child, False)]
+            for step in node.steps:
+                walk_steps(step)
 
-        steps: dict[int, ProcessStepsNode] = {}
+        step_nodes: list[ProcessStepsNode] = []
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
@@ -122,10 +122,10 @@ class Process(BaseModel[int], ConditionsRelations):
                      if s.parent_step_id is None]:
             assert isinstance(step.id_, int)
             new_node = make_node(step, step in external_owner.suppressed_steps)
-            steps[step.id_] = new_node
-        for step_id, step_node in steps.items():
-            walk_steps(step_id, step_node)
-        return steps
+            step_nodes += [new_node]
+        for step_node in step_nodes:
+            walk_steps(step_node)
+        return step_nodes
 
     def set_step_suppressions(self, db_conn: DatabaseConnection,
                               step_ids: list[int]) -> None:
index 5782df0ec7e181cccfb21e36e00c2f61b3ffd060..32911078369ada1e85c6a96fade5d3b58661e82a 100644 (file)
@@ -2,6 +2,7 @@
 from __future__ import annotations
 from typing import Any, Set
 from sqlite3 import Row
+from plomtask.misc import DictableNode
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.processes import Process, ProcessStepsNode
 from plomtask.versioned_attributes import VersionedAttribute
@@ -11,37 +12,6 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
 from plomtask.dating import valid_date
 
 
-class DictableNode:
-    """Template for TodoNode, TodoOrStepsNode providing .as_dict_and_refs."""
-    # pylint: disable=too-few-public-methods
-    _to_dict: list[str] = []
-
-    def __init__(self, *args: Any) -> None:
-        for i, arg in enumerate(args):
-            setattr(self, self._to_dict[i], arg)
-
-    @property
-    def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
-        """Return self as json.dumps-ready dict, list of referenced objects."""
-        d = {}
-        refs = []
-        for name in self._to_dict:
-            attr = getattr(self, name)
-            if hasattr(attr, 'id_'):
-                d[name] = attr.id_
-                continue
-            if isinstance(attr, list):
-                d[name] = []
-                for item in attr:
-                    item_d, item_refs = item.as_dict_and_refs
-                    d[name] += [item_d]
-                    for item_ref in [r for r in item_refs if r not in refs]:
-                        refs += [item_ref]
-                continue
-            d[name] = attr
-        return d, refs
-
-
 class TodoNode(DictableNode):
     """Collects what's useful to know for Todo/Condition tree display."""
     # pylint: disable=too-few-public-methods
@@ -120,31 +90,30 @@ class Todo(BaseModel[int], ConditionsRelations):
     def ensure_children(self, db_conn: DatabaseConnection) -> None:
         """Ensure Todo children (create or adopt) demanded by Process chain."""
 
-        def key_order_func(n: ProcessStepsNode) -> int:
-            assert isinstance(n.process.id_, int)
-            return n.process.id_
-
         def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
             adoptables = [t for t in Todo.by_date(db_conn, parent.date)
                           if (t not in parent.children)
                           and (t != parent)
-                          and step_node.process == t.process]
+                          and step_node.step.step_process_id == t.process.id_]
             satisfier = None
             for adoptable in adoptables:
                 satisfier = adoptable
                 break
             if not satisfier:
-                satisfier = Todo(None, step_node.process, False, parent.date)
+                proc = Process.by_id(db_conn, step_node.step.step_process_id)
+                satisfier = Todo(None, proc, False, parent.date)
                 satisfier.save(db_conn)
-            sub_step_nodes = list(step_node.steps.values())
-            sub_step_nodes.sort(key=key_order_func)
+            sub_step_nodes = sorted(step_node.steps,
+                                    key=lambda s: s.step.step_process_id)
             for sub_node in sub_step_nodes:
                 if sub_node.is_suppressed:
                     continue
                 n_slots = len([n for n in sub_step_nodes
-                               if n.process == sub_node.process])
+                               if n.step.step_process_id
+                               == sub_node.step.step_process_id])
                 filled_slots = len([t for t in satisfier.children
-                                    if t.process == sub_node.process])
+                                    if t.process.id_
+                                    == sub_node.step.step_process_id_])
                 # if we did not newly create satisfier, it may already fill
                 # some step dependencies, so only fill what remains open
                 if n_slots - filled_slots > 0:
@@ -154,7 +123,7 @@ class Todo(BaseModel[int], ConditionsRelations):
 
         process = Process.by_id(db_conn, self.process_id)
         steps_tree = process.get_steps(db_conn)
-        for step_node in steps_tree.values():
+        for step_node in steps_tree:
             if step_node.is_suppressed:
                 continue
             self.add_child(walk_steps(self, step_node))