home · contact · privacy
Fix some ProcessStepping bugs.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 30 May 2024 10:00:58 +0000 (12:00 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 30 May 2024 10:00:58 +0000 (12:00 +0200)
plomtask/http.py
plomtask/processes.py
tests/processes.py

index 944f79a274f32d2dbb616a5f26eb91e5f6a2efd1..fd206032bd302de252c05747e3457199c03770fb 100644 (file)
@@ -379,9 +379,6 @@ class TaskHandler(BaseHTTPRequestHandler):
             if step_id not in self.form_data.get_all_int('steps'):
                 raise BadFormatException('trying to keep unknown step')
         for step_id in self.form_data.get_all_int('steps'):
             if step_id not in self.form_data.get_all_int('steps'):
                 raise BadFormatException('trying to keep unknown step')
         for step_id in self.form_data.get_all_int('steps'):
-            for step_process_id in self.form_data.get_all_int(
-                    f'new_step_to_{step_id}'):
-                steps += [(None, step_process_id, step_id)]
             if step_id not in self.form_data.get_all_int('keep_step'):
                 continue
             step_process_id = self.form_data.get_int(
             if step_id not in self.form_data.get_all_int('keep_step'):
                 continue
             step_process_id = self.form_data.get_int(
@@ -389,8 +386,13 @@ class TaskHandler(BaseHTTPRequestHandler):
             parent_id = self.form_data.get_int_or_none(
                     f'step_{step_id}_parent_id')
             steps += [(step_id, step_process_id, parent_id)]
             parent_id = self.form_data.get_int_or_none(
                     f'step_{step_id}_parent_id')
             steps += [(step_id, step_process_id, parent_id)]
+        for step_id in self.form_data.get_all_int('steps'):
+            for step_process_id in self.form_data.get_all_int(
+                    f'new_step_to_{step_id}'):
+                steps += [(None, step_process_id, step_id)]
         for step_process_id in self.form_data.get_all_int('new_top_step'):
             steps += [(None, step_process_id, None)]
         for step_process_id in self.form_data.get_all_int('new_top_step'):
             steps += [(None, step_process_id, None)]
+        process.uncache()
         process.set_steps(self.conn, steps)
         process.save(self.conn)
         return f'/process?id={process.id_}'
         process.set_steps(self.conn, steps)
         process.save(self.conn)
         return f'/process?id={process.id_}'
index 684dec81dde47e6bd409ffaa52edc3a251246e8d..027e97517a98c269cdedb39601143442931ba868 100644 (file)
@@ -124,7 +124,6 @@ class Process(BaseModel[int], ConditionsRelations):
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
-
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
index 127a3f680d4a674030b1be83756f12921a78d4e5..7d1d0f13776d06afe490e097c0b475d927283a05 100644 (file)
@@ -95,32 +95,49 @@ class TestsWithDB(TestCaseWithDB):
         assert isinstance(p2.id_, int)
         assert isinstance(p3.id_, int)
         steps_p1: list[tuple[int | None, int, int | None]] = []
         assert isinstance(p2.id_, int)
         assert isinstance(p3.id_, int)
         steps_p1: list[tuple[int | None, int, int | None]] = []
+        # add step of process p2 as first (top-level) step to p1
         add_step(p1, steps_p1, (None, p2.id_, None), 1)
         p1_dict: dict[int, ProcessStepsNode] = {}
         p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         add_step(p1, steps_p1, (None, p2.id_, None), 1)
         p1_dict: dict[int, ProcessStepsNode] = {}
         p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as second (top-level) step to p1
         add_step(p1, steps_p1, (None, p3.id_, None), 2)
         step_2 = p1.explicit_steps[-1]
         p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         add_step(p1, steps_p1, (None, p3.id_, None), 2)
         step_2 = p1.explicit_steps[-1]
         p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as first (top-level) step to p2,
+        # expect it as implicit sub-step of p1's second (p3) step
         steps_p2: list[tuple[int | None, int, int | None]] = []
         add_step(p2, steps_p2, (None, p3.id_, None), 3)
         p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         steps_p2: list[tuple[int | None, int, int | None]] = []
         add_step(p2, steps_p2, (None, p3.id_, None), 3)
         p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p2 as explicit sub-step to p1's first sub-step
         add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4)
         step_3 = ProcessStepsNode(p3, None, False, {}, True)
         p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True,
                                                {3: step_3}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4)
         step_3 = ProcessStepsNode(p3, None, False, {}, True)
         p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True,
                                                {3: step_3}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as explicit sub-step to non-existing p1
+        # sub-step (of id=999), expect it to become another p1 top-level step
         add_step(p1, steps_p1, (None, p3.id_, 999), 5)
         p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         add_step(p1, steps_p1, (None, p3.id_, 999), 5)
         p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+        # add step of process p3 as explicit sub-step to p1's implicit p3
+        # sub-step, expect it to become another p1 top-level step
         add_step(p1, steps_p1, (None, p3.id_, 3), 6)
         p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         self.assertEqual(p1.used_as_step_by(self.db_conn), [])
         self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
         self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
         add_step(p1, steps_p1, (None, p3.id_, 3), 6)
         p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False)
         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
         self.assertEqual(p1.used_as_step_by(self.db_conn), [])
         self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
         self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
+        # add step of process p2 as explicit sub-step to p1's second (p3)
+        # top-level step
+        add_step(p1, steps_p1, (None, p3.id_, 2), 7)
+        p1_dict[2].steps[7] = ProcessStepsNode(p3, 2, True, {}, False)
+        # import pprint
+        # pprint.pp(p1.get_steps(self.db_conn, None))
+        # pprint.pp(p1_dict)
+        self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
 
     def test_Process_conditions(self) -> None:
         """Test setting Process.conditions/enables/disables."""
 
     def test_Process_conditions(self) -> None:
         """Test setting Process.conditions/enables/disables."""
@@ -245,9 +262,11 @@ class TestsWithServer(TestCaseWithServer):
 
     def test_do_POST_process_steps(self) -> None:
         """Test behavior of ProcessStep posting."""
 
     def test_do_POST_process_steps(self) -> None:
         """Test behavior of ProcessStep posting."""
+        # pylint: disable=too-many-statements
         form_data_1 = self.post_process(1)
         self.post_process(2)
         self.post_process(3)
         form_data_1 = self.post_process(1)
         self.post_process(2)
         self.post_process(3)
+        # post first (top-level) step of process 2 to process 1
         form_data_1['new_top_step'] = [2]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         form_data_1['new_top_step'] = [2]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
@@ -256,12 +275,15 @@ class TestsWithServer(TestCaseWithServer):
         self.assertEqual(retrieved_step.step_process_id, 2)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
         self.assertEqual(retrieved_step.step_process_id, 2)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
+        # post empty steps list to process, expect clean slate, and old step to
+        # completely disappear
         form_data_1['new_top_step'] = []
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
         with self.assertRaises(NotFoundException):
             ProcessStep.by_id(self.db_conn, retrieved_step.id_)
         form_data_1['new_top_step'] = []
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
         with self.assertRaises(NotFoundException):
             ProcessStep.by_id(self.db_conn, retrieved_step.id_)
+        # post new first (top_level) step of process 3 to process 1
         form_data_1['new_top_step'] = [3]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         form_data_1['new_top_step'] = [3]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
@@ -269,17 +291,21 @@ class TestsWithServer(TestCaseWithServer):
         self.assertEqual(retrieved_step.step_process_id, 3)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
         self.assertEqual(retrieved_step.step_process_id, 3)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
+        # post to process steps list without keeps, expect clean slate
         form_data_1['new_top_step'] = []
         form_data_1['steps'] = [retrieved_step.id_]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
         form_data_1['new_top_step'] = []
         form_data_1['steps'] = [retrieved_step.id_]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
+        # post to process empty steps list but keep, expect 400
         form_data_1['steps'] = []
         form_data_1['keep_step'] = [retrieved_step.id_]
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
         form_data_1['steps'] = []
         form_data_1['keep_step'] = [retrieved_step.id_]
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+        # post to process steps list with keep on non-created step, expect 400
         form_data_1['steps'] = [retrieved_step.id_]
         form_data_1['keep_step'] = [retrieved_step.id_]
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
         form_data_1['steps'] = [retrieved_step.id_]
         form_data_1['keep_step'] = [retrieved_step.id_]
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+        # post to process steps list with keep and process ID, expect 200
         form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2]
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
@@ -288,12 +314,69 @@ class TestsWithServer(TestCaseWithServer):
         self.assertEqual(retrieved_step.step_process_id, 2)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
         self.assertEqual(retrieved_step.step_process_id, 2)
         self.assertEqual(retrieved_step.owner_id, 1)
         self.assertEqual(retrieved_step.parent_step_id, None)
+        # post nonsensical new_top_step id and otherwise zero'd steps, expect
+        # 400 and preservation of previous state
         form_data_1['new_top_step'] = ['foo']
         form_data_1['steps'] = []
         form_data_1['keep_step'] = []
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(len(retrieved_process.explicit_steps), 1)
         form_data_1['new_top_step'] = ['foo']
         form_data_1['steps'] = []
         form_data_1['keep_step'] = []
         self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(len(retrieved_process.explicit_steps), 1)
+        retrieved_step = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step.step_process_id, 2)
+        self.assertEqual(retrieved_step.owner_id, 1)
+        self.assertEqual(retrieved_step.parent_step_id, None)
+        # post to process steps list with keep and process ID, expect 200
+        form_data_1['new_top_step'] = [3]
+        form_data_1['steps'] = [retrieved_step.id_]
+        form_data_1['keep_step'] = [retrieved_step.id_]
+        self.post_process(1, form_data_1)
+        retrieved_process = Process.by_id(self.db_conn, 1)
+        self.assertEqual(len(retrieved_process.explicit_steps), 2)
+        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step_0.step_process_id, 2)
+        self.assertEqual(retrieved_step_0.owner_id, 1)
+        self.assertEqual(retrieved_step_0.parent_step_id, None)
+        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        self.assertEqual(retrieved_step_1.step_process_id, 3)
+        self.assertEqual(retrieved_step_1.owner_id, 1)
+        self.assertEqual(retrieved_step_1.parent_step_id, None)
+        # post to process steps list with keeps etc., but trigger recursion
+        form_data_1['new_top_step'] = []
+        form_data_1['steps'] = [retrieved_step_0.id_, retrieved_step_1.id_]
+        form_data_1['keep_step'] = [retrieved_step_0.id_, retrieved_step_1.id_]
+        form_data_1[f'step_{retrieved_step_0.id_}_process_id'] = [2]
+        form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [1]
+        self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+        # check previous status preserved despite failed steps setting
+        retrieved_process = Process.by_id(self.db_conn, 1)
+        self.assertEqual(len(retrieved_process.explicit_steps), 2)
+        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step_0.step_process_id, 2)
+        self.assertEqual(retrieved_step_0.owner_id, 1)
+        self.assertEqual(retrieved_step_0.parent_step_id, None)
+        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        self.assertEqual(retrieved_step_1.step_process_id, 3)
+        self.assertEqual(retrieved_step_1.owner_id, 1)
+        self.assertEqual(retrieved_step_1.parent_step_id, None)
+        form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [3]
+        # post sub-step to step
+        form_data_1[f'new_step_to_{retrieved_step_1.id_}'] = [3]
+        self.post_process(1, form_data_1)
+        retrieved_process = Process.by_id(self.db_conn, 1)
+        self.assertEqual(len(retrieved_process.explicit_steps), 3)
+        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        self.assertEqual(retrieved_step_0.step_process_id, 2)
+        self.assertEqual(retrieved_step_0.owner_id, 1)
+        self.assertEqual(retrieved_step_0.parent_step_id, None)
+        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        self.assertEqual(retrieved_step_1.step_process_id, 3)
+        self.assertEqual(retrieved_step_1.owner_id, 1)
+        self.assertEqual(retrieved_step_1.parent_step_id, None)
+        retrieved_step_2 = retrieved_process.explicit_steps[2]
+        self.assertEqual(retrieved_step_2.step_process_id, 3)
+        self.assertEqual(retrieved_step_2.owner_id, 1)
+        self.assertEqual(retrieved_step_2.parent_step_id, retrieved_step_1.id_)
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""