home · contact · privacy
Extend and refactor tests.
[plomtask] / plomtask / processes.py
index 36158996595dccfad678730eb29a50c422c26b9d..23eb624353b656c08f38b3c1c1150c0f48f9a4d5 100644 (file)
@@ -1,8 +1,8 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from dataclasses import dataclass
 from typing import Set, Any
 from sqlite3 import Row
 from typing import Set, Any
 from sqlite3 import Row
+from plomtask.misc import DictableNode
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.versioned_attributes import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.versioned_attributes import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
@@ -10,23 +10,24 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
                                  HandledException)
 
 
-@dataclass
-class ProcessStepsNode:
+class ProcessStepsNode(DictableNode):
     """Collects what's useful to know for ProcessSteps tree display."""
     """Collects what's useful to know for ProcessSteps tree display."""
+    # pylint: disable=too-few-public-methods
+    step: ProcessStep
     process: Process
     process: Process
-    parent_id: int | None
     is_explicit: bool
     is_explicit: bool
-    steps: dict[int, ProcessStepsNode]
+    steps: list[ProcessStepsNode]
     seen: bool = False
     is_suppressed: bool = False
     seen: bool = False
     is_suppressed: bool = False
+    _to_dict = ['step', 'process', 'is_explicit', 'steps', 'seen',
+                'is_suppressed']
 
 
 class Process(BaseModel[int], ConditionsRelations):
     """Template for, and metadata for, Todos, and their arrangements."""
     # pylint: disable=too-many-instance-attributes
     table_name = 'processes'
 
 
 class Process(BaseModel[int], ConditionsRelations):
     """Template for, and metadata for, Todos, and their arrangements."""
     # pylint: disable=too-many-instance-attributes
     table_name = 'processes'
-    to_save = ['calendarize']
-    to_save_versioned = ['title', 'description', 'effort']
+    to_save_simples = ['calendarize']
     to_save_relations = [('process_conditions', 'process', 'conditions', 0),
                          ('process_blockers', 'process', 'blockers', 0),
                          ('process_enables', 'process', 'enables', 0),
     to_save_relations = [('process_conditions', 'process', 'conditions', 0),
                          ('process_blockers', 'process', 'blockers', 0),
                          ('process_enables', 'process', 'enables', 0),
@@ -34,15 +35,21 @@ class Process(BaseModel[int], ConditionsRelations):
                          ('process_step_suppressions', 'process',
                           'suppressed_steps', 0)]
     add_to_dict = ['explicit_steps']
                          ('process_step_suppressions', 'process',
                           'suppressed_steps', 0)]
     add_to_dict = ['explicit_steps']
+    versioned_defaults = {'title': 'UNNAMED', 'description': '', 'effort': 1.0}
     to_search = ['title.newest', 'description.newest']
     can_create_by_id = True
     to_search = ['title.newest', 'description.newest']
     can_create_by_id = True
+    sorters = {'steps': lambda p: len(p.explicit_steps),
+               'owners': lambda p: p.n_owners,
+               'effort': lambda p: p.effort.newest,
+               'title': lambda p: p.title.newest}
 
     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
         BaseModel.__init__(self, id_)
         ConditionsRelations.__init__(self)
 
     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
         BaseModel.__init__(self, id_)
         ConditionsRelations.__init__(self)
-        self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
-        self.description = VersionedAttribute(self, 'process_descriptions', '')
-        self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
+        for name in ['title', 'description', 'effort']:
+            attr = VersionedAttribute(self, f'process_{name}s',
+                                      self.versioned_defaults[name])
+            setattr(self, name, attr)
         self.explicit_steps: list[ProcessStep] = []
         self.suppressed_steps: list[ProcessStep] = []
         self.calendarize = calendarize
         self.explicit_steps: list[ProcessStep] = []
         self.suppressed_steps: list[ProcessStep] = []
         self.calendarize = calendarize
@@ -82,7 +89,7 @@ class Process(BaseModel[int], ConditionsRelations):
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
-                  Process | None = None) -> dict[int, ProcessStepsNode]:
+                  Process | None = None) -> list[ProcessStepsNode]:
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
         def make_node(step: ProcessStep, suppressed: bool) -> ProcessStepsNode:
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
         def make_node(step: ProcessStep, suppressed: bool) -> ProcessStepsNode:
@@ -90,26 +97,26 @@ class Process(BaseModel[int], ConditionsRelations):
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
-            step_steps = {}
+            step_steps = []
             if not suppressed:
                 step_steps = process.get_steps(db_conn, external_owner)
             if not suppressed:
                 step_steps = process.get_steps(db_conn, external_owner)
-            return ProcessStepsNode(process, step.parent_step_id,
-                                    is_explicit, step_steps, False, suppressed)
+            return ProcessStepsNode(step, process, is_explicit, step_steps,
+                                    False, suppressed)
 
 
-        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
-            node.seen = node_id in seen_step_ids
-            seen_step_ids.add(node_id)
+        def walk_steps(node: ProcessStepsNode) -> None:
+            node.seen = node.step.id_ in seen_step_ids
+            assert isinstance(node.step.id_, int)
+            seen_step_ids.add(node.step.id_)
             if node.is_suppressed:
                 return
             explicit_children = [s for s in self.explicit_steps
             if node.is_suppressed:
                 return
             explicit_children = [s for s in self.explicit_steps
-                                 if s.parent_step_id == node_id]
+                                 if s.parent_step_id == node.step.id_]
             for child in explicit_children:
             for child in explicit_children:
-                assert isinstance(child.id_, int)
-                node.steps[child.id_] = make_node(child, False)
-            for id_, step in node.steps.items():
-                walk_steps(id_, step)
+                node.steps += [make_node(child, False)]
+            for step in node.steps:
+                walk_steps(step)
 
 
-        steps: dict[int, ProcessStepsNode] = {}
+        step_nodes: list[ProcessStepsNode] = []
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
@@ -117,21 +124,59 @@ class Process(BaseModel[int], ConditionsRelations):
                      if s.parent_step_id is None]:
             assert isinstance(step.id_, int)
             new_node = make_node(step, step in external_owner.suppressed_steps)
                      if s.parent_step_id is None]:
             assert isinstance(step.id_, int)
             new_node = make_node(step, step in external_owner.suppressed_steps)
-            steps[step.id_] = new_node
-        for step_id, step_node in steps.items():
-            walk_steps(step_id, step_node)
-        return steps
+            step_nodes += [new_node]
+        for step_node in step_nodes:
+            walk_steps(step_node)
+        return step_nodes
+
+    def set_step_relations(self,
+                           db_conn: DatabaseConnection,
+                           owners: list[int],
+                           suppressions: list[int],
+                           owned_steps: list[ProcessStep]
+                           ) -> None:
+        """Set step owners, suppressions, and owned steps."""
+        self._set_owners(db_conn, owners)
+        self._set_step_suppressions(db_conn, suppressions)
+        self.set_steps(db_conn, owned_steps)
 
 
-    def set_step_suppressions(self, db_conn: DatabaseConnection,
-                              step_ids: list[int]) -> None:
+    def _set_step_suppressions(self,
+                               db_conn: DatabaseConnection,
+                               step_ids: list[int]
+                               ) -> None:
         """Set self.suppressed_steps from step_ids."""
         assert isinstance(self.id_, int)
         db_conn.delete_where('process_step_suppressions', 'process', self.id_)
         self.suppressed_steps = [ProcessStep.by_id(db_conn, s)
                                  for s in step_ids]
 
         """Set self.suppressed_steps from step_ids."""
         assert isinstance(self.id_, int)
         db_conn.delete_where('process_step_suppressions', 'process', self.id_)
         self.suppressed_steps = [ProcessStep.by_id(db_conn, s)
                                  for s in step_ids]
 
-    def set_steps(self, db_conn: DatabaseConnection,
-                  steps: list[ProcessStep]) -> None:
+    def _set_owners(self,
+                    db_conn: DatabaseConnection,
+                    owner_ids: list[int]
+                    ) -> None:
+        """Re-set owners to those identified in owner_ids."""
+        owners_old = self.used_as_step_by(db_conn)
+        losers = [o for o in owners_old if o.id_ not in owner_ids]
+        owners_old_ids = [o.id_ for o in owners_old]
+        winners = [Process.by_id(db_conn, id_) for id_ in owner_ids
+                   if id_ not in owners_old_ids]
+        steps_to_remove = []
+        for loser in losers:
+            steps_to_remove += [s for s in loser.explicit_steps
+                                if s.step_process_id == self.id_]
+        for step in steps_to_remove:
+            step.remove(db_conn)
+        for winner in winners:
+            assert isinstance(winner.id_, int)
+            assert isinstance(self.id_, int)
+            new_step = ProcessStep(None, winner.id_, self.id_, None)
+            new_explicit_steps = winner.explicit_steps + [new_step]
+            winner.set_steps(db_conn, new_explicit_steps)
+
+    def set_steps(self,
+                  db_conn: DatabaseConnection,
+                  steps: list[ProcessStep]
+                  ) -> None:
         """Set self.explicit_steps in bulk.
 
         Checks against recursion, and turns into top-level steps any of
         """Set self.explicit_steps in bulk.
 
         Checks against recursion, and turns into top-level steps any of
@@ -159,27 +204,6 @@ class Process(BaseModel[int], ConditionsRelations):
             walk_steps(step)
             step.save(db_conn)
 
             walk_steps(step)
             step.save(db_conn)
 
-    def set_owners(self, db_conn: DatabaseConnection,
-                   owner_ids: list[int]) -> None:
-        """Re-set owners to those identified in owner_ids."""
-        owners_old = self.used_as_step_by(db_conn)
-        losers = [o for o in owners_old if o.id_ not in owner_ids]
-        owners_old_ids = [o.id_ for o in owners_old]
-        winners = [Process.by_id(db_conn, id_) for id_ in owner_ids
-                   if id_ not in owners_old_ids]
-        steps_to_remove = []
-        for loser in losers:
-            steps_to_remove += [s for s in loser.explicit_steps
-                                if s.step_process_id == self.id_]
-        for step in steps_to_remove:
-            step.remove(db_conn)
-        for winner in winners:
-            assert isinstance(winner.id_, int)
-            assert isinstance(self.id_, int)
-            new_step = ProcessStep(None, winner.id_, self.id_, None)
-            new_explicit_steps = winner.explicit_steps + [new_step]
-            winner.set_steps(db_conn, new_explicit_steps)
-
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
         super().save(db_conn)
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
         super().save(db_conn)
@@ -206,7 +230,7 @@ class Process(BaseModel[int], ConditionsRelations):
 class ProcessStep(BaseModel[int]):
     """Sub-unit of Processes."""
     table_name = 'process_steps'
 class ProcessStep(BaseModel[int]):
     """Sub-unit of Processes."""
     table_name = 'process_steps'
-    to_save = ['owner_id', 'step_process_id', 'parent_step_id']
+    to_save_simples = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None: