home · contact · privacy
Refactor Process/ProcessStep setting and saving.
[plomtask] / tests / processes.py
index bda62750442501445fae1883f9a8a2b280aa18f8..960fd707eee4345cb4350c065d9dcc974e94405f 100644 (file)
@@ -3,6 +3,7 @@ from unittest import TestCase
 from typing import Any
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.processes import Process, ProcessStep
 from typing import Any
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.processes import Process, ProcessStep
+from plomtask.conditions import Condition
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
@@ -24,74 +25,104 @@ class TestsSansDB(TestCase):
 class TestsWithDB(TestCaseWithDB):
     """Mdule tests not requiring DB setup."""
 
 class TestsWithDB(TestCaseWithDB):
     """Mdule tests not requiring DB setup."""
 
+    def setUp(self) -> None:
+        super().setUp()
+        self.proc1 = Process(None)
+        self.proc1.save(self.db_conn)
+        self.proc2 = Process(None)
+        self.proc2.save(self.db_conn)
+        self.proc3 = Process(None)
+        self.proc3.save(self.db_conn)
+
     def test_Process_ids(self) -> None:
     def test_Process_ids(self) -> None:
-        """Test Process.save_without_steps() re Process.id_."""
-        p = Process(None)
-        p.save_without_steps(self.db_conn)
-        self.assertEqual(p.id_,
+        """Test Process.save() re Process.id_."""
+        self.assertEqual(self.proc1.id_,
                          Process.by_id(self.db_conn, 1, create=False).id_)
                          Process.by_id(self.db_conn, 1, create=False).id_)
-        p = Process(None)
-        p.save_without_steps(self.db_conn)
-        self.assertEqual(p.id_,
+        self.assertEqual(self.proc2.id_,
                          Process.by_id(self.db_conn, 2, create=False).id_)
                          Process.by_id(self.db_conn, 2, create=False).id_)
-        p = Process(5)
-        p.save_without_steps(self.db_conn)
-        self.assertEqual(p.id_,
+        proc5 = Process(5)
+        proc5.save(self.db_conn)
+        self.assertEqual(proc5.id_,
                          Process.by_id(self.db_conn, 5, create=False).id_)
 
     def test_Process_steps(self) -> None:
         """Test addition, nesting, and non-recursion of ProcessSteps"""
                          Process.by_id(self.db_conn, 5, create=False).id_)
 
     def test_Process_steps(self) -> None:
         """Test addition, nesting, and non-recursion of ProcessSteps"""
-        p_1 = Process(1)
-        p_1.save_without_steps(self.db_conn)
-        assert p_1.id_ is not None
-        p_2 = Process(2)
-        p_2.save_without_steps(self.db_conn)
-        assert p_2.id_ is not None
-        p_3 = Process(3)
-        p_3.save_without_steps(self.db_conn)
-        assert p_3.id_ is not None
-        p_1.add_step(self.db_conn, None, p_2.id_, None)
+        def add_step(proc: Process,
+                     steps_proc: list[tuple[int | None, int, int | None]],
+                     step_tuple: tuple[int | None, int, int | None],
+                     expected_id: int) -> None:
+            steps_proc += [step_tuple]
+            proc.set_steps(self.db_conn, steps_proc)
+            steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
+        assert self.proc2.id_ is not None
+        assert self.proc3.id_ is not None
+        steps_proc1: list[tuple[int | None, int, int | None]] = []
+        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
         p_1_dict: dict[int, dict[str, Any]] = {1: {
         p_1_dict: dict[int, dict[str, Any]] = {1: {
-            'process': p_2, 'parent_id': None,
+            'process': self.proc2, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }}
             'is_explicit': True, 'steps': {}, 'seen': False
         }}
-        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
-        s_b = p_1.add_step(self.db_conn, None, p_3.id_, None)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
+        step_2 = self.proc1.explicit_steps[-1]
         p_1_dict[2] = {
         p_1_dict[2] = {
-            'process': p_3, 'parent_id': None,
+            'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }
             'is_explicit': True, 'steps': {}, 'seen': False
         }
-        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
-        s_c = p_2.add_step(self.db_conn, None, p_3.id_, None)
-        assert s_c.id_ is not None
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        steps_proc2: list[tuple[int | None, int, int | None]] = []
+        add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3)
         p_1_dict[1]['steps'] = {3: {
         p_1_dict[1]['steps'] = {3: {
-            'process': p_3, 'parent_id': None,
+            'process': self.proc3, 'parent_id': None,
             'is_explicit': False, 'steps': {}, 'seen': False
         }}
             'is_explicit': False, 'steps': {}, 'seen': False
         }}
-        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
-        p_1.add_step(self.db_conn, None, p_2.id_, s_b.id_)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_),
+                 4)
         p_1_dict[2]['steps'][4] = {
         p_1_dict[2]['steps'][4] = {
-            'process': p_2, 'parent_id': s_b.id_, 'seen': False,
+            'process': self.proc2, 'parent_id': step_2.id_, 'seen': False,
             'is_explicit': True, 'steps': {3: {
             'is_explicit': True, 'steps': {3: {
-                'process': p_3, 'parent_id': None,
+                'process': self.proc3, 'parent_id': None,
                 'is_explicit': False, 'steps': {}, 'seen': True
                 }}}
                 'is_explicit': False, 'steps': {}, 'seen': True
                 }}}
-        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
-        p_1.add_step(self.db_conn, None, p_3.id_, 999)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5)
         p_1_dict[5] = {
         p_1_dict[5] = {
-            'process': p_3, 'parent_id': None,
+            'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }
             'is_explicit': True, 'steps': {}, 'seen': False
         }
-        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
-        p_1.add_step(self.db_conn, None, p_3.id_, 3)
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6)
         p_1_dict[6] = {
         p_1_dict[6] = {
-            'process': p_3, 'parent_id': None,
+            'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }
             'is_explicit': True, 'steps': {}, 'seen': False
         }
-        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
-        self.assertEqual(p_1.used_as_step_by(self.db_conn), [])
-        self.assertEqual(p_2.used_as_step_by(self.db_conn), [p_1])
-        self.assertEqual(p_3.used_as_step_by(self.db_conn), [p_1, p_2])
+        self.assertEqual(self.proc1.get_steps(self.db_conn, None),
+                         p_1_dict)
+        self.assertEqual(self.proc1.used_as_step_by(self.db_conn),
+                         [])
+        self.assertEqual(self.proc2.used_as_step_by(self.db_conn),
+                         [self.proc1])
+        self.assertEqual(self.proc3.used_as_step_by(self.db_conn),
+                         [self.proc1, self.proc2])
+
+    def test_Process_conditions(self) -> None:
+        """Test setting Process.conditions/fulfills/undoes."""
+        for target in ('conditions', 'fulfills', 'undoes'):
+            c1 = Condition(None, False)
+            c1.save(self.db_conn)
+            assert c1.id_ is not None
+            c2 = Condition(None, False)
+            c2.save(self.db_conn)
+            assert c2.id_ is not None
+            self.proc1.set_conditions(self.db_conn, [], target)
+            self.assertEqual(getattr(self.proc1, target), [])
+            self.proc1.set_conditions(self.db_conn, [c1.id_], target)
+            self.assertEqual(getattr(self.proc1, target), [c1])
+            self.proc1.set_conditions(self.db_conn, [c2.id_], target)
+            self.assertEqual(getattr(self.proc1, target), [c2])
+            self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target)
+            self.assertEqual(getattr(self.proc1, target), [c1, c2])
 
     def test_Process_by_id(self) -> None:
         """Test Process.by_id()."""
 
     def test_Process_by_id(self) -> None:
         """Test Process.by_id()."""
@@ -99,32 +130,21 @@ class TestsWithDB(TestCaseWithDB):
             Process.by_id(self.db_conn, None, create=False)
         with self.assertRaises(NotFoundException):
             Process.by_id(self.db_conn, 0, create=False)
             Process.by_id(self.db_conn, None, create=False)
         with self.assertRaises(NotFoundException):
             Process.by_id(self.db_conn, 0, create=False)
-        with self.assertRaises(NotFoundException):
-            Process.by_id(self.db_conn, 1, create=False)
-        self.assertNotEqual(Process(1).id_,
+        self.assertNotEqual(self.proc1.id_,
                             Process.by_id(self.db_conn, None, create=True).id_)
                             Process.by_id(self.db_conn, None, create=True).id_)
-        self.assertEqual(Process(1).id_,
-                         Process.by_id(self.db_conn, 1, create=True).id_)
         self.assertEqual(Process(2).id_,
                          Process.by_id(self.db_conn, 2, create=True).id_)
 
     def test_Process_all(self) -> None:
         """Test Process.all()."""
         self.assertEqual(Process(2).id_,
                          Process.by_id(self.db_conn, 2, create=True).id_)
 
     def test_Process_all(self) -> None:
         """Test Process.all()."""
-        p_1 = Process(None)
-        p_1.save_without_steps(self.db_conn)
-        p_2 = Process(None)
-        p_2.save_without_steps(self.db_conn)
-        self.assertEqual({p_1.id_, p_2.id_},
+        self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_},
                          set(p.id_ for p in Process.all(self.db_conn)))
 
     def test_ProcessStep_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
                          set(p.id_ for p in Process.all(self.db_conn)))
 
     def test_ProcessStep_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
-        p_1 = Process(None)
-        p_1.save_without_steps(self.db_conn)
-        p_2 = Process(None)
-        p_2.save_without_steps(self.db_conn)
-        assert p_2.id_ is not None
-        step = p_1.add_step(self.db_conn, None, p_2.id_, None)
+        assert self.proc2.id_ is not None
+        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+        step = self.proc1.explicit_steps[-1]
         assert step.id_ is not None
         step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
         step.parent_step_id = 99
         assert step.id_ is not None
         step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
         step.parent_step_id = 99
@@ -132,22 +152,16 @@ class TestsWithDB(TestCaseWithDB):
 
     def test_Process_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
 
     def test_Process_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
-        p_1 = Process(None)
-        p_1.save_without_steps(self.db_conn)
-        p_2 = Process(None)
-        p_2.save_without_steps(self.db_conn)
-        assert p_2.id_ is not None
-        p_1.add_step(self.db_conn, None, p_2.id_, None)
-        p_retrieved = Process.by_id(self.db_conn, p_1.id_)
-        self.assertEqual(p_1.explicit_steps, p_retrieved.explicit_steps)
+        assert self.proc2.id_ is not None
+        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+        p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
+        self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
 
     def test_Process_versioned_attributes_singularity(self) -> None:
         """Test behavior of VersionedAttributes on saving (with .title)."""
 
     def test_Process_versioned_attributes_singularity(self) -> None:
         """Test behavior of VersionedAttributes on saving (with .title)."""
-        p = Process(None)
-        p.save_without_steps(self.db_conn)
-        p.title.set('named')
-        p_loaded = Process.by_id(self.db_conn, p.id_)
-        self.assertEqual(p.title.history, p_loaded.title.history)
+        self.proc1.title.set('named')
+        p_loaded = Process.by_id(self.db_conn, self.proc1.id_)
+        self.assertEqual(self.proc1.title.history, p_loaded.title.history)
 
 
 class TestsWithServer(TestCaseWithServer):
 
 
 class TestsWithServer(TestCaseWithServer):
@@ -170,6 +184,18 @@ class TestsWithServer(TestCaseWithServer):
         form_data = {'description': '', 'effort': 1.0}
         self.check_post(form_data, '/process?id=', 400)
         self.assertEqual(1, len(Process.all(self.db_conn)))
         form_data = {'description': '', 'effort': 1.0}
         self.check_post(form_data, '/process?id=', 400)
         self.assertEqual(1, len(Process.all(self.db_conn)))
+        form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0,
+                     'condition': []}
+        self.check_post(form_data, '/process?id=', 302, '/')
+        form_data['condition'] = [1]
+        self.check_post(form_data, '/process?id=', 404)
+        form_data_cond = {'title': 'foo', 'description': 'foo'}
+        self.check_post(form_data_cond, '/condition', 302, '/')
+        self.check_post(form_data, '/process?id=', 302, '/')
+        form_data['undoes'] = [1]
+        self.check_post(form_data, '/process?id=', 302, '/')
+        form_data['fulfills'] = [1]
+        self.check_post(form_data, '/process?id=', 302, '/')
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""