home · contact · privacy
Clean up do_POST_process code. master
authorChristian Heller <c.heller@plomlompom.de>
Fri, 9 Aug 2024 16:15:04 +0000 (18:15 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 9 Aug 2024 16:15:04 +0000 (18:15 +0200)
plomtask/http.py
plomtask/processes.py

index c2e1f5deea8abbaef4208e8370595319ef69a237..51b35cb9722d60e50ba1eb679960a1c0c06725bd 100644 (file)
@@ -645,7 +645,6 @@ class TaskHandler(BaseHTTPRequestHandler):
         cond_rels = [self._form.get_all_int(name) for name in
                      ['conditions', 'blockers', 'enables', 'disables']]
         effort_or_not = self._form.get_str('effort')
         cond_rels = [self._form.get_all_int(name) for name in
                      ['conditions', 'blockers', 'enables', 'disables']]
         effort_or_not = self._form.get_str('effort')
-        #
         if effort_or_not is not None:
             if effort_or_not == '':
                 to_update['effort'] = None
         if effort_or_not is not None:
             if effort_or_not == '':
                 to_update['effort'] = None
@@ -655,7 +654,6 @@ class TaskHandler(BaseHTTPRequestHandler):
                 except ValueError as e:
                     msg = 'cannot float form field value for key: effort'
                     raise BadFormatException(msg) from e
                 except ValueError as e:
                     msg = 'cannot float form field value for key: effort'
                     raise BadFormatException(msg) from e
-        todo.set_condition_relations(self._conn, *cond_rels)
         for filler in [f for f in step_fillers if f != 'ignore']:
             target_id: int
             to_int = filler
         for filler in [f for f in step_fillers if f != 'ignore']:
             target_id: int
             to_int = filler
@@ -673,6 +671,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                 to_make['full'] += [target_id]
             else:
                 adopted_child_ids += [target_id]
                 to_make['full'] += [target_id]
             else:
                 adopted_child_ids += [target_id]
+        #
+        todo.set_condition_relations(self._conn, *cond_rels)
         to_remove = []
         for child in todo.children:
             if child.id_ and (child.id_ not in adopted_child_ids):
         to_remove = []
         for child in todo.children:
             if child.id_ and (child.id_ not in adopted_child_ids):
@@ -713,6 +713,16 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_POST_process(self, process: Process) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
         # pylint: disable=too-many-locals
     def do_POST_process(self, process: Process) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
         # pylint: disable=too-many-locals
+
+        def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
+            l_ids, title = [], ''
+            for id_or_title in l_id_or_title:
+                try:
+                    l_ids += [int(id_or_title)]
+                except ValueError:
+                    title = id_or_title
+            return title, l_ids
+
         versioned = {'title': self._form.get_str_or_fail('title'),
                      'description': self._form.get_str_or_fail('description'),
                      'effort': self._form.get_float_or_fail('effort')}
         versioned = {'title': self._form.get_str_or_fail('title'),
                      'description': self._form.get_str_or_fail('description'),
                      'effort': self._form.get_float_or_fail('effort')}
@@ -720,55 +730,36 @@ class TaskHandler(BaseHTTPRequestHandler):
                      in ['conditions', 'blockers', 'enables', 'disables']]
         calendarize = self._form.get_bool_or_none('calendarize')
         step_of = self._form.get_all_str('step_of')
                      in ['conditions', 'blockers', 'enables', 'disables']]
         calendarize = self._form.get_bool_or_none('calendarize')
         step_of = self._form.get_all_str('step_of')
-        suppresses = self._form.get_all_int('suppresses')
+        suppressions = self._form.get_all_int('suppresses')
         kept_steps = self._form.get_all_int('kept_steps')
         kept_steps = self._form.get_all_int('kept_steps')
-        new_top_steps = self._form.get_all_str('new_top_step')
+        new_top_step_procs = self._form.get_all_str('new_top_step')
         new_steps_to = {}
         for step_id in kept_steps:
             name = f'new_step_to_{step_id}'
             new_steps_to[step_id] = self._form.get_all_int(name)
         new_steps_to = {}
         for step_id in kept_steps:
             name = f'new_step_to_{step_id}'
             new_steps_to[step_id] = self._form.get_all_int(name)
+        new_owner_title, owners_to_set = id_or_title(step_of)
+        new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
         #
         for k, v in versioned.items():
             getattr(process, k).set(v)
         #
         for k, v in versioned.items():
             getattr(process, k).set(v)
-        process.set_condition_relations(self._conn, *cond_rels)
         if calendarize is not None:
             process.calendarize = calendarize
         process.save(self._conn)
         assert isinstance(process.id_, int)
         if calendarize is not None:
             process.calendarize = calendarize
         process.save(self._conn)
         assert isinstance(process.id_, int)
-        # set relations to, and if non-existant yet: create, other Processes
-        # pylint: disable=fixme
-        # TODO: in what order to set owners, owneds, and possibly step
-        # suppressions can make the difference between recursion checks
-        # failing; should probably be handled class-internally to Process
-        # rather than here!
-        # 1. owners (upwards)
-        owners_to_set = []
-        new_owner_title = None
-        for owner_identifier in step_of:
-            try:
-                owners_to_set += [int(owner_identifier)]
-            except ValueError:
-                new_owner_title = owner_identifier
-        process.set_owners(self._conn, owners_to_set)
-        # 2. owneds (downwards)
-        new_step_title = None
-        steps: list[ProcessStep] = [ProcessStep.by_id(self._conn, step_id)
-                                    for step_id in kept_steps]
-        for step_id in kept_steps:
-            new_sub_steps = [
+        # set relations to Conditions and ProcessSteps / other Processes
+        process.set_condition_relations(self._conn, *cond_rels)
+        owned_steps = []
+        for step_id in kept_steps:  # collecting sub-steps
+            owned_steps += [ProcessStep.by_id(self._conn, step_id)]
+            owned_steps += [  # new sub-steps
                     ProcessStep(None, process.id_, step_process_id, step_id)
                     for step_process_id in new_steps_to[step_id]]
                     ProcessStep(None, process.id_, step_process_id, step_id)
                     for step_process_id in new_steps_to[step_id]]
-            steps += new_sub_steps
-        for step_id_or_new_title in new_top_steps:
-            try:
-                step_process_id = int(step_id_or_new_title)
-                step = ProcessStep(None, process.id_, step_process_id, None)
-                steps += [step]
-            except ValueError:
-                new_step_title = step_id_or_new_title
-        process.set_steps(self._conn, steps)
-        process.set_step_suppressions(self._conn, suppresses)
-        # encode titles for potentially newly created Processes up or down
+        for step_process_id in new_top_step_proc_ids:
+            owned_steps += [ProcessStep(None, process.id_, step_process_id,
+                                        None)]
+        process.set_step_relations(self._conn, owners_to_set, suppressions,
+                                   owned_steps)
+        # encode titles for potential newly-to-create Processes up or down
         params = f'id={process.id_}'
         if new_step_title:
             title_b64_encoded = b64encode(new_step_title.encode()).decode()
         params = f'id={process.id_}'
         if new_step_title:
             title_b64_encoded = b64encode(new_step_title.encode()).decode()
index 56dd282acce8147d24b502cc61c81862765e9359..23eb624353b656c08f38b3c1c1150c0f48f9a4d5 100644 (file)
@@ -129,16 +129,54 @@ class Process(BaseModel[int], ConditionsRelations):
             walk_steps(step_node)
         return step_nodes
 
             walk_steps(step_node)
         return step_nodes
 
-    def set_step_suppressions(self, db_conn: DatabaseConnection,
-                              step_ids: list[int]) -> None:
+    def set_step_relations(self,
+                           db_conn: DatabaseConnection,
+                           owners: list[int],
+                           suppressions: list[int],
+                           owned_steps: list[ProcessStep]
+                           ) -> None:
+        """Set step owners, suppressions, and owned steps."""
+        self._set_owners(db_conn, owners)
+        self._set_step_suppressions(db_conn, suppressions)
+        self.set_steps(db_conn, owned_steps)
+
+    def _set_step_suppressions(self,
+                               db_conn: DatabaseConnection,
+                               step_ids: list[int]
+                               ) -> None:
         """Set self.suppressed_steps from step_ids."""
         assert isinstance(self.id_, int)
         db_conn.delete_where('process_step_suppressions', 'process', self.id_)
         self.suppressed_steps = [ProcessStep.by_id(db_conn, s)
                                  for s in step_ids]
 
         """Set self.suppressed_steps from step_ids."""
         assert isinstance(self.id_, int)
         db_conn.delete_where('process_step_suppressions', 'process', self.id_)
         self.suppressed_steps = [ProcessStep.by_id(db_conn, s)
                                  for s in step_ids]
 
-    def set_steps(self, db_conn: DatabaseConnection,
-                  steps: list[ProcessStep]) -> None:
+    def _set_owners(self,
+                    db_conn: DatabaseConnection,
+                    owner_ids: list[int]
+                    ) -> None:
+        """Re-set owners to those identified in owner_ids."""
+        owners_old = self.used_as_step_by(db_conn)
+        losers = [o for o in owners_old if o.id_ not in owner_ids]
+        owners_old_ids = [o.id_ for o in owners_old]
+        winners = [Process.by_id(db_conn, id_) for id_ in owner_ids
+                   if id_ not in owners_old_ids]
+        steps_to_remove = []
+        for loser in losers:
+            steps_to_remove += [s for s in loser.explicit_steps
+                                if s.step_process_id == self.id_]
+        for step in steps_to_remove:
+            step.remove(db_conn)
+        for winner in winners:
+            assert isinstance(winner.id_, int)
+            assert isinstance(self.id_, int)
+            new_step = ProcessStep(None, winner.id_, self.id_, None)
+            new_explicit_steps = winner.explicit_steps + [new_step]
+            winner.set_steps(db_conn, new_explicit_steps)
+
+    def set_steps(self,
+                  db_conn: DatabaseConnection,
+                  steps: list[ProcessStep]
+                  ) -> None:
         """Set self.explicit_steps in bulk.
 
         Checks against recursion, and turns into top-level steps any of
         """Set self.explicit_steps in bulk.
 
         Checks against recursion, and turns into top-level steps any of
@@ -166,27 +204,6 @@ class Process(BaseModel[int], ConditionsRelations):
             walk_steps(step)
             step.save(db_conn)
 
             walk_steps(step)
             step.save(db_conn)
 
-    def set_owners(self, db_conn: DatabaseConnection,
-                   owner_ids: list[int]) -> None:
-        """Re-set owners to those identified in owner_ids."""
-        owners_old = self.used_as_step_by(db_conn)
-        losers = [o for o in owners_old if o.id_ not in owner_ids]
-        owners_old_ids = [o.id_ for o in owners_old]
-        winners = [Process.by_id(db_conn, id_) for id_ in owner_ids
-                   if id_ not in owners_old_ids]
-        steps_to_remove = []
-        for loser in losers:
-            steps_to_remove += [s for s in loser.explicit_steps
-                                if s.step_process_id == self.id_]
-        for step in steps_to_remove:
-            step.remove(db_conn)
-        for winner in winners:
-            assert isinstance(winner.id_, int)
-            assert isinstance(self.id_, int)
-            new_step = ProcessStep(None, winner.id_, self.id_, None)
-            new_explicit_steps = winner.explicit_steps + [new_step]
-            winner.set_steps(db_conn, new_explicit_steps)
-
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
         super().save(db_conn)
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
         super().save(db_conn)