home · contact · privacy
Simplify code with namedtuples and dataclasses.
[plomtask] / tests / processes.py
index 8a7f91d4c561a34878af14b7e0796dcaa38ecf26..9413822291a87439aafda0cd39459c93b3905661 100644 (file)
@@ -1,8 +1,9 @@
 """Test Processes module."""
 from unittest import TestCase
 from tests.utils import TestCaseWithDB, TestCaseWithServer
-from plomtask.processes import Process
-from plomtask.exceptions import NotFoundException, BadFormatException
+from plomtask.processes import Process, ProcessStep, ProcessStepsNode
+from plomtask.conditions import Condition
+from plomtask.exceptions import NotFoundException, HandledException
 
 
 class TestsSansDB(TestCase):
@@ -14,28 +15,98 @@ class TestsSansDB(TestCase):
         self.assertEqual(Process(None).description.newest, '')
         self.assertEqual(Process(None).effort.newest, 1.0)
 
+    def test_Process_legal_ID(self) -> None:
+        """Test Process cannot be instantiated with id_=0."""
+        with self.assertRaises(HandledException):
+            Process(0)
+
 
 class TestsWithDB(TestCaseWithDB):
     """Mdule tests not requiring DB setup."""
 
-    def test_Process_save(self) -> None:
-        """Test Process.save()."""
-        p_saved = Process(None)
-        p_saved.save(self.db_conn)
-        self.assertEqual(p_saved.id_,
+    def setUp(self) -> None:
+        super().setUp()
+        self.proc1 = Process(None)
+        self.proc1.save(self.db_conn)
+        self.proc2 = Process(None)
+        self.proc2.save(self.db_conn)
+        self.proc3 = Process(None)
+        self.proc3.save(self.db_conn)
+
+    def test_Process_ids(self) -> None:
+        """Test Process.save() re Process.id_."""
+        self.assertEqual(self.proc1.id_,
                          Process.by_id(self.db_conn, 1, create=False).id_)
-        with self.assertRaises(BadFormatException):
-            p_saved = Process(0)
-        p_saved = Process(5)
-        p_saved.save(self.db_conn)
-        self.assertEqual(p_saved.id_,
+        self.assertEqual(self.proc2.id_,
+                         Process.by_id(self.db_conn, 2, create=False).id_)
+        proc5 = Process(5)
+        proc5.save(self.db_conn)
+        self.assertEqual(proc5.id_,
                          Process.by_id(self.db_conn, 5, create=False).id_)
-        p_saved.title.set('named')
-        p_loaded = Process.by_id(self.db_conn, p_saved.id_)
-        self.assertNotEqual(p_saved.title.history, p_loaded.title.history)
-        p_saved.save(self.db_conn)
-        p_loaded = Process.by_id(self.db_conn, p_saved.id_)
-        self.assertEqual(p_saved.title.history, p_loaded.title.history)
+
+    def test_Process_steps(self) -> None:
+        """Test addition, nesting, and non-recursion of ProcessSteps"""
+        def add_step(proc: Process,
+                     steps_proc: list[tuple[int | None, int, int | None]],
+                     step_tuple: tuple[int | None, int, int | None],
+                     expected_id: int) -> None:
+            steps_proc += [step_tuple]
+            proc.set_steps(self.db_conn, steps_proc)
+            steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
+        assert isinstance(self.proc2.id_, int)
+        assert isinstance(self.proc3.id_, int)
+        steps_proc1: list[tuple[int | None, int, int | None]] = []
+        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
+        p_1_dict: dict[int, ProcessStepsNode] = {}
+        p_1_dict[1] = ProcessStepsNode(self.proc2, None, True, {}, False)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
+        step_2 = self.proc1.explicit_steps[-1]
+        assert isinstance(step_2.id_, int)
+        p_1_dict[2] = ProcessStepsNode(self.proc3, None, True, {}, False)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        steps_proc2: list[tuple[int | None, int, int | None]] = []
+        add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3)
+        p_1_dict[1].steps[3] = ProcessStepsNode(self.proc3, None,
+                                                False, {}, False)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_),
+                 4)
+        step_3 = ProcessStepsNode(self.proc3, None, False, {}, True)
+        p_1_dict[2].steps[4] = ProcessStepsNode(self.proc2, step_2.id_, True,
+                                                {3: step_3}, False)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5)
+        p_1_dict[5] = ProcessStepsNode(self.proc3, None, True, {}, False)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6)
+        p_1_dict[6] = ProcessStepsNode(self.proc3, None, True, {}, False)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None),
+                         p_1_dict)
+        self.assertEqual(self.proc1.used_as_step_by(self.db_conn),
+                         [])
+        self.assertEqual(self.proc2.used_as_step_by(self.db_conn),
+                         [self.proc1])
+        self.assertEqual(self.proc3.used_as_step_by(self.db_conn),
+                         [self.proc1, self.proc2])
+
+    def test_Process_conditions(self) -> None:
+        """Test setting Process.conditions/enables/disables."""
+        for target in ('conditions', 'enables', 'disables'):
+            c1 = Condition(None, False)
+            c1.save(self.db_conn)
+            assert isinstance(c1.id_, int)
+            c2 = Condition(None, False)
+            c2.save(self.db_conn)
+            assert isinstance(c2.id_, int)
+            self.proc1.set_conditions(self.db_conn, [], target)
+            self.assertEqual(getattr(self.proc1, target), [])
+            self.proc1.set_conditions(self.db_conn, [c1.id_], target)
+            self.assertEqual(getattr(self.proc1, target), [c1])
+            self.proc1.set_conditions(self.db_conn, [c2.id_], target)
+            self.assertEqual(getattr(self.proc1, target), [c2])
+            self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target)
+            self.assertEqual(getattr(self.proc1, target), [c1, c2])
 
     def test_Process_by_id(self) -> None:
         """Test Process.by_id()."""
@@ -43,44 +114,54 @@ class TestsWithDB(TestCaseWithDB):
             Process.by_id(self.db_conn, None, create=False)
         with self.assertRaises(NotFoundException):
             Process.by_id(self.db_conn, 0, create=False)
-        with self.assertRaises(NotFoundException):
-            Process.by_id(self.db_conn, 1, create=False)
-        self.assertNotEqual(Process(1).id_,
+        self.assertNotEqual(self.proc1.id_,
                             Process.by_id(self.db_conn, None, create=True).id_)
-        self.assertEqual(Process(1).id_,
-                         Process.by_id(self.db_conn, 1, create=True).id_)
         self.assertEqual(Process(2).id_,
                          Process.by_id(self.db_conn, 2, create=True).id_)
 
     def test_Process_all(self) -> None:
         """Test Process.all()."""
-        p_1 = Process(None)
-        p_1.save(self.db_conn)
-        p_2 = Process(None)
-        p_2.save(self.db_conn)
-        self.assertEqual({p_1.id_, p_2.id_},
+        self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_},
                          set(p.id_ for p in Process.all(self.db_conn)))
 
+    def test_ProcessStep_singularity(self) -> None:
+        """Test pointers made for single object keep pointing to it."""
+        assert isinstance(self.proc2.id_, int)
+        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+        step = self.proc1.explicit_steps[-1]
+        assert isinstance(step.id_, int)
+        step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
+        step.parent_step_id = 99
+        self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id)
+
+    def test_Process_singularity(self) -> None:
+        """Test pointers made for single object keep pointing to it."""
+        assert isinstance(self.proc1.id_, int)
+        assert isinstance(self.proc2.id_, int)
+        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+        p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
+        self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
+
+    def test_Process_versioned_attributes_singularity(self) -> None:
+        """Test behavior of VersionedAttributes on saving (with .title)."""
+        assert isinstance(self.proc1.id_, int)
+        self.proc1.title.set('named')
+        p_loaded = Process.by_id(self.db_conn, self.proc1.id_)
+        self.assertEqual(self.proc1.title.history, p_loaded.title.history)
+
 
 class TestsWithServer(TestCaseWithServer):
     """Module tests against our HTTP server/handler (and database)."""
 
     def test_do_POST_process(self) -> None:
         """Test POST /process and its effect on the database."""
-        form_data = {'title': 'foo', 'description': 'foo',
-                     'effort': 1.1, 'children': [1, 2]}
+        self.assertEqual(0, len(Process.all(self.db_conn)))
+        form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
+        self.check_post(form_data, '/process?id=', 302, '/')
+        self.assertEqual(1, len(Process.all(self.db_conn)))
         self.check_post(form_data, '/process?id=FOO', 400)
         form_data['effort'] = 'foo'
         self.check_post(form_data, '/process?id=', 400)
-        form_data['effort'] = 1.1
-        form_data['children'] = 1.1
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = 'a'
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = [1, 1.2]
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = [1, 'b']
-        self.check_post(form_data, '/process?id=', 400)
         self.check_post({}, '/process?id=', 400)
         form_data = {'title': '', 'description': ''}
         self.check_post(form_data, '/process?id=', 400)
@@ -88,31 +169,24 @@ class TestsWithServer(TestCaseWithServer):
         self.check_post(form_data, '/process?id=', 400)
         form_data = {'description': '', 'effort': 1.0}
         self.check_post(form_data, '/process?id=', 400)
-        form_data = {'title': '', 'description': '',
-                     'effort': 1.1, 'children': [1, 2]}
+        self.assertEqual(1, len(Process.all(self.db_conn)))
+        form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0,
+                     'condition': []}
+        self.check_post(form_data, '/process?id=', 302, '/')
+        form_data['condition'] = [1]
+        self.check_post(form_data, '/process?id=', 404)
+        form_data_cond = {'title': 'foo', 'description': 'foo'}
+        self.check_post(form_data_cond, '/condition', 302, '/')
         self.check_post(form_data, '/process?id=', 302, '/')
-        retrieved_1 = Process.by_id(self.db_conn, 1)
-        self.assertEqual(retrieved_1.title.newest, '')
-        self.assertEqual(retrieved_1.child_ids, [1, 2])
-        form_data['children'] = []
+        form_data['disables'] = [1]
         self.check_post(form_data, '/process?id=', 302, '/')
-        retrieved_2 = Process.by_id(self.db_conn, 2)
-        self.assertEqual(retrieved_2.child_ids, [])
-        del form_data['children']
+        form_data['enables'] = [1]
         self.check_post(form_data, '/process?id=', 302, '/')
-        retrieved_3 = Process.by_id(self.db_conn, 3)
-        self.assertEqual(retrieved_2.child_ids, [])
-        self.check_post(form_data, '/process?id=1', 302, '/')
-        self.assertEqual([p.id_ for p in Process.all(self.db_conn)],
-                         [retrieved_1.id_, retrieved_2.id_, retrieved_3.id_])
-        retrieved_1 = Process.by_id(self.db_conn, 1)
-        self.assertEqual(retrieved_1.child_ids, [])
-        self.check_post(form_data, '/process', 302, '/')
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""
         self.check_get('/process', 200)
         self.check_get('/process?id=', 200)
-        self.check_get('/process?id=0', 400)
+        self.check_get('/process?id=0', 500)
         self.check_get('/process?id=FOO', 400)
         self.check_get('/processes', 200)