home · contact · privacy
Fix some ProcessStepping bugs.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 30 May 2024 10:00:58 +0000 (12:00 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 30 May 2024 10:00:58 +0000 (12:00 +0200)
plomtask/http.py
plomtask/processes.py
tests/processes.py

index 944f79a274f32d2dbb616a5f26eb91e5f6a2efd1..fd206032bd302de252c05747e3457199c03770fb 100644 (file)
@@ -379,9 +379,6 @@ class TaskHandler(BaseHTTPRequestHandler):
             if step_id not in self.form_data.get_all_int('steps'):
                 raise BadFormatException('trying to keep unknown step')
         for step_id in self.form_data.get_all_int('steps'):
-            for step_process_id in self.form_data.get_all_int(
-                    f'new_step_to_{step_id}'):
-                steps += [(None, step_process_id, step_id)]
             if step_id not in self.form_data.get_all_int('keep_step'):
                 continue
             step_process_id = self.form_data.get_int(
@@ -389,8 +386,13 @@ class TaskHandler(BaseHTTPRequestHandler):
             parent_id = self.form_data.get_int_or_none(
                     f'step_{step_id}_parent_id')
             steps += [(step_id, step_process_id, parent_id)]
+        for step_id in self.form_data.get_all_int('steps'):
+            for step_process_id in self.form_data.get_all_int(
+                    f'new_step_to_{step_id}'):
+                steps += [(None, step_process_id, step_id)]
         for step_process_id in self.form_data.get_all_int('new_top_step'):
             steps += [(None, step_process_id, None)]
+        process.uncache()
         process.set_steps(self.conn, steps)
         process.save(self.conn)
         return f'/process?id={process.id_}'
index 684dec81dde47e6bd409ffaa52edc3a251246e8d..027e97517a98c269cdedb39601143442931ba868 100644 (file)
@@ -124,7 +124,6 @@ class Process(BaseModel[int], ConditionsRelations):
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
-
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
index 127a3f680d4a674030b1be83756f12921a78d4e5..7d1d0f13776d06afe490e097c0b475d927283a05 100644 (file)
@@ -95,32 +95,49 @@ class TestsWithDB(TestCaseWithDB):
         assert isinstance(p2.id_, int)
         assert isinstance(p3.id_, int)
         steps_p1: list[tuple[int | None, int, int | None]] = []
+        # add step of process p2 as first (top-level) step to p1
         add_step(p1, steps_p1, (None, p2.id_, None), 1)
         p1_dict: dict[int, ProcessStepsNode] = {}
         p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as second (top-level) step to p1
         add_step(p1, steps_p1, (None, p3.id_, None), 2)
         step_2 = p1.explicit_steps[-1]
         p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as first (top-level) step to p2,
+        # expect it as implicit sub-step of p1's second (p3) step
         steps_p2: list[tuple[int | None, int, int | None]] = []
         add_step(p2, steps_p2, (None, p3.id_, None), 3)
         p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p2 as explicit sub-step to p1's first sub-step
         add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4)
         step_3 = ProcessStepsNode(p3, None, False, {}, True)
         p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True,
                                                {3: step_3}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as explicit sub-step to non-existing p1
+        # sub-step (of id=999), expect it to become another p1 top-level step
         add_step(p1, steps_p1, (None, p3.id_, 999), 5)
         p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as explicit sub-step to p1's implicit p3
+        # sub-step, expect it to become another p1 top-level step
         add_step(p1, steps_p1, (None, p3.id_, 3), 6)
         p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         self.assertEqual(p1.used_as_step_by(self.db_conn), [])
         self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
         self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
+        # add step of process p2 as explicit sub-step to p1's second (p3)
+        # top-level step
+        add_step(p1, steps_p1, (None, p3.id_, 2), 7)
+        p1_dict[2].steps[7] = ProcessStepsNode(p3, 2, True, {}, False)
+        # import pprint
+        # pprint.pp(p1.get_steps(self.db_conn, None))
+        # pprint.pp(p1_dict)
+        self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
 
     def test_Process_conditions(self) -> None:
         """Test setting Process.conditions/enables/disables."""
@@ -245,9 +262,11 @@ class TestsWithServer(TestCaseWithServer):
 
     def test_do_POST_process_steps(self) -> None:
         """Test behavior of ProcessStep posting."""
+        # pylint: disable=too-many-statements
         form_data_1 = self.post_process(1)
         self.post_process(2)
         self.post_process(3)
+        # post first (top-level) step of process 2 to process 1
         form_data_1['new_top_step'] = [2]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
@@ -256,12 +275,15 @@ class TestsWithServer(TestCaseWithServer):
         self.assertEqual(retrieved_step.step_process_id, 2)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
+        # post empty steps list to process, expect clean slate, and old step to
+        # completely disappear
         form_data_1['new_top_step'] = []
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
         with self.assertRaises(NotFoundException):
             ProcessStep.by_id(self.db_conn, retrieved_step.id_)
+        # post new first (top_level) step of process 3 to process 1
         form_data_1['new_top_step'] = [3]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
@@ -269,17 +291,21 @@ class TestsWithServer(TestCaseWithServer):
         self.assertEqual(retrieved_step.step_process_id, 3)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
+        # post to process steps list without keeps, expect clean slate
         form_data_1['new_top_step'] = []
         form_data_1['steps'] = [retrieved_step.id_]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
+        # post to process empty steps list but keep, expect 400
         form_data_1['steps'] = []
         form_data_1['keep_step'] = [retrieved_step.id_]
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+        # post to process steps list with keep on non-created step, expect 400
         form_data_1['steps'] = [retrieved_step.id_]
         form_data_1['keep_step'] = [retrieved_step.id_]
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+        # post to process steps list with keep and process ID, expect 200
         form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
@@ -288,12 +314,69 @@ class TestsWithServer(TestCaseWithServer):
         self.assertEqual(retrieved_step.step_process_id, 2)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
+        # post nonsensical new_top_step id and otherwise zero'd steps, expect
+        # 400 and preservation of previous state
         form_data_1['new_top_step'] = ['foo']
         form_data_1['steps'] = []
         form_data_1['keep_step'] = []
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(len(retrieved_process.explicit_steps), 1)
+        retrieved_step = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step.step_process_id, 2)
+        self.assertEqual(retrieved_step.owner_id, 1)
+        self.assertEqual(retrieved_step.parent_step_id, None)
+        # post to process steps list with keep and process ID, expect 200
+        form_data_1['new_top_step'] = [3]
+        form_data_1['steps'] = [retrieved_step.id_]
+        form_data_1['keep_step'] = [retrieved_step.id_]
+        self.post_process(1, form_data_1)
+        retrieved_process = Process.by_id(self.db_conn, 1)
+        self.assertEqual(len(retrieved_process.explicit_steps), 2)
+        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step_0.step_process_id, 2)
+        self.assertEqual(retrieved_step_0.owner_id, 1)
+        self.assertEqual(retrieved_step_0.parent_step_id, None)
+        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        self.assertEqual(retrieved_step_1.step_process_id, 3)
+        self.assertEqual(retrieved_step_1.owner_id, 1)
+        self.assertEqual(retrieved_step_1.parent_step_id, None)
+        # post to process steps list with keeps etc., but trigger recursion
+        form_data_1['new_top_step'] = []
+        form_data_1['steps'] = [retrieved_step_0.id_, retrieved_step_1.id_]
+        form_data_1['keep_step'] = [retrieved_step_0.id_, retrieved_step_1.id_]
+        form_data_1[f'step_{retrieved_step_0.id_}_process_id'] = [2]
+        form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [1]
+        self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+        # check previous status preserved despite failed steps setting
+        retrieved_process = Process.by_id(self.db_conn, 1)
+        self.assertEqual(len(retrieved_process.explicit_steps), 2)
+        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step_0.step_process_id, 2)
+        self.assertEqual(retrieved_step_0.owner_id, 1)
+        self.assertEqual(retrieved_step_0.parent_step_id, None)
+        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        self.assertEqual(retrieved_step_1.step_process_id, 3)
+        self.assertEqual(retrieved_step_1.owner_id, 1)
+        self.assertEqual(retrieved_step_1.parent_step_id, None)
+        form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [3]
+        # post sub-step to step
+        form_data_1[f'new_step_to_{retrieved_step_1.id_}'] = [3]
+        self.post_process(1, form_data_1)
+        retrieved_process = Process.by_id(self.db_conn, 1)
+        self.assertEqual(len(retrieved_process.explicit_steps), 3)
+        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step_0.step_process_id, 2)
+        self.assertEqual(retrieved_step_0.owner_id, 1)
+        self.assertEqual(retrieved_step_0.parent_step_id, None)
+        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        self.assertEqual(retrieved_step_1.step_process_id, 3)
+        self.assertEqual(retrieved_step_1.owner_id, 1)
+        self.assertEqual(retrieved_step_1.parent_step_id, None)
+        retrieved_step_2 = retrieved_process.explicit_steps[2]
+        self.assertEqual(retrieved_step_2.step_process_id, 3)
+        self.assertEqual(retrieved_step_2.owner_id, 1)
+        self.assertEqual(retrieved_step_2.parent_step_id, retrieved_step_1.id_)
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""