home · contact · privacy
Replace ProcessChildren with more flexible ProcessStep infrastructure.
[plomtask] / tests / processes.py
index 02f664477fa5147b0396538e9a63263dca9ce73a..ac519c8fc1b1b75700a8442cfe55132b43dadda6 100644 (file)
@@ -1,5 +1,6 @@
 """Test Processes module."""
 from unittest import TestCase
 """Test Processes module."""
 from unittest import TestCase
+from typing import Any
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.processes import Process
 from plomtask.exceptions import NotFoundException, BadFormatException
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.processes import Process
 from plomtask.exceptions import NotFoundException, BadFormatException
@@ -14,41 +15,91 @@ class TestsSansDB(TestCase):
         self.assertEqual(Process(None).description.newest, '')
         self.assertEqual(Process(None).effort.newest, 1.0)
 
         self.assertEqual(Process(None).description.newest, '')
         self.assertEqual(Process(None).effort.newest, 1.0)
 
+    def test_Process_legal_ID(self) -> None:
+        """Test Process cannot be instantiated with id_=0."""
+        with self.assertRaises(BadFormatException):
+            Process(0)
+
 
 class TestsWithDB(TestCaseWithDB):
     """Mdule tests not requiring DB setup."""
 
 
 class TestsWithDB(TestCaseWithDB):
     """Mdule tests not requiring DB setup."""
 
-    def test_Process_save(self) -> None:
-        """Test Process.save()."""
-        p_saved = Process(None)
-        p_saved.save(self.db_conn)
-        self.assertEqual(p_saved.id_,
+    def test_Process_ids(self) -> None:
+        """Test Process.save_without_steps() re Process.id_."""
+        p = Process(None)
+        p.save_without_steps(self.db_conn)
+        self.assertEqual(p.id_,
                          Process.by_id(self.db_conn, 1, create=False).id_)
                          Process.by_id(self.db_conn, 1, create=False).id_)
-        with self.assertRaises(BadFormatException):
-            p_saved = Process(0)
-        p_saved = Process(5)
-        p_saved.save(self.db_conn)
-        self.assertEqual(p_saved.id_,
+        p = Process(None)
+        p.save_without_steps(self.db_conn)
+        self.assertEqual(p.id_,
+                         Process.by_id(self.db_conn, 2, create=False).id_)
+        p = Process(5)
+        p.save_without_steps(self.db_conn)
+        self.assertEqual(p.id_,
                          Process.by_id(self.db_conn, 5, create=False).id_)
                          Process.by_id(self.db_conn, 5, create=False).id_)
-        p_saved.title.set('named')
-        p_loaded = Process.by_id(self.db_conn, p_saved.id_)
-        self.assertNotEqual(p_saved.title.history, p_loaded.title.history)
-        p_saved.save(self.db_conn)
-        p_loaded = Process.by_id(self.db_conn, p_saved.id_)
-        self.assertEqual(p_saved.title.history, p_loaded.title.history)
-        p_9 = Process(9)
-        p_9.child_ids = [4]
-        with self.assertRaises(NotFoundException):
-            p_9.save(self.db_conn)
-        p_9.child_ids = [5]
-        p_9.save(self.db_conn)
-        p_5 = Process.by_id(self.db_conn, 5)
-        p_5.child_ids = [1]
-        p_5.save(self.db_conn)
-        p_1 = Process.by_id(self.db_conn, 1)
-        p_1.child_ids = [9]
-        with self.assertRaises(BadFormatException):
-            p_1.save(self.db_conn)
+
+    def test_Process_versioned_attributes(self) -> None:
+        """Test behavior of VersionedAttributes on saving (with .title)."""
+        p = Process(None)
+        p.save_without_steps(self.db_conn)
+        p.title.set('named')
+        p_loaded = Process.by_id(self.db_conn, p.id_)
+        self.assertNotEqual(p.title.history, p_loaded.title.history)
+        p.save_without_steps(self.db_conn)
+        p_loaded = Process.by_id(self.db_conn, p.id_)
+        self.assertEqual(p.title.history, p_loaded.title.history)
+
+    def test_Process_steps(self) -> None:
+        """Test addition, nesting, and non-recursion of ProcessSteps"""
+        p_1 = Process(1)
+        p_1.save_without_steps(self.db_conn)
+        assert p_1.id_ is not None
+        p_2 = Process(2)
+        p_2.save_without_steps(self.db_conn)
+        assert p_2.id_ is not None
+        p_3 = Process(3)
+        p_3.save_without_steps(self.db_conn)
+        assert p_3.id_ is not None
+        p_1.add_step(self.db_conn, None, p_2.id_, None)
+        p_1_dict: dict[int, dict[str, Any]] = {1: {
+            'process': p_2, 'parent_id': None,
+            'is_explicit': True, 'steps': {}
+        }}
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        s_b = p_1.add_step(self.db_conn, None, p_3.id_, None)
+        p_1_dict[2] = {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': True, 'steps': {}
+        }
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        s_c = p_2.add_step(self.db_conn, None, p_3.id_, None)
+        assert s_c.id_ is not None
+        p_1_dict[1]['steps'] = {3: {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': False, 'steps': {}
+        }}
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        p_1.add_step(self.db_conn, None, p_2.id_, s_b.id_)
+        p_1_dict[2]['steps'][4] = {
+            'process': p_2, 'parent_id': s_b.id_,
+            'is_explicit': True, 'steps': {3: {
+                'process': p_3, 'parent_id': None,
+                'is_explicit': False, 'steps': {}
+                }}}
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        p_1.add_step(self.db_conn, None, p_3.id_, 999)
+        p_1_dict[5] = {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': True, 'steps': {}
+        }
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        p_1.add_step(self.db_conn, None, p_3.id_, 3)
+        p_1_dict[6] = {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': True, 'steps': {}
+        }
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
 
     def test_Process_by_id(self) -> None:
         """Test Process.by_id()."""
 
     def test_Process_by_id(self) -> None:
         """Test Process.by_id()."""
@@ -68,9 +119,9 @@ class TestsWithDB(TestCaseWithDB):
     def test_Process_all(self) -> None:
         """Test Process.all()."""
         p_1 = Process(None)
     def test_Process_all(self) -> None:
         """Test Process.all()."""
         p_1 = Process(None)
-        p_1.save(self.db_conn)
+        p_1.save_without_steps(self.db_conn)
         p_2 = Process(None)
         p_2 = Process(None)
-        p_2.save(self.db_conn)
+        p_2.save_without_steps(self.db_conn)
         self.assertEqual({p_1.id_, p_2.id_},
                          set(p.id_ for p in Process.all(self.db_conn)))
 
         self.assertEqual({p_1.id_, p_2.id_},
                          set(p.id_ for p in Process.all(self.db_conn)))
 
@@ -80,8 +131,10 @@ class TestsWithServer(TestCaseWithServer):
 
     def test_do_POST_process(self) -> None:
         """Test POST /process and its effect on the database."""
 
     def test_do_POST_process(self) -> None:
         """Test POST /process and its effect on the database."""
+        self.assertEqual(0, len(Process.all(self.db_conn)))
         form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
         self.check_post(form_data, '/process?id=', 302, '/')
         form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
         self.check_post(form_data, '/process?id=', 302, '/')
+        self.assertEqual(1, len(Process.all(self.db_conn)))
         self.check_post(form_data, '/process?id=FOO', 400)
         form_data['effort'] = 'foo'
         self.check_post(form_data, '/process?id=', 400)
         self.check_post(form_data, '/process?id=FOO', 400)
         form_data['effort'] = 'foo'
         self.check_post(form_data, '/process?id=', 400)
@@ -92,30 +145,7 @@ class TestsWithServer(TestCaseWithServer):
         self.check_post(form_data, '/process?id=', 400)
         form_data = {'description': '', 'effort': 1.0}
         self.check_post(form_data, '/process?id=', 400)
         self.check_post(form_data, '/process?id=', 400)
         form_data = {'description': '', 'effort': 1.0}
         self.check_post(form_data, '/process?id=', 400)
-        form_data = {'title': '', 'description': '',
-                     'effort': 1.1, 'children': [1]}
-        self.check_post(form_data, '/process?id=', 302, '/')
-        form_data['children'] = 1.1
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = 'a'
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = [1.2]
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = ['b']
-        self.check_post(form_data, '/process?id=', 400)
-        form_data['children'] = [2]
-        self.check_post(form_data, '/process?id=1', 400)
-        retrieved_1 = Process.by_id(self.db_conn, 1)
-        self.assertEqual(retrieved_1.title.newest, 'foo')
-        self.assertEqual(retrieved_1.child_ids, [])
-        retrieved_2 = Process.by_id(self.db_conn, 2)
-        self.assertEqual(retrieved_2.child_ids, [1])
-        form_data = {'title': 'bar', 'description': 'bar', 'effort': 1.1}
-        self.check_post(form_data, '/process?id=1', 302, '/')
-        retrieved_1 = Process.by_id(self.db_conn, 1)
-        self.assertEqual(retrieved_1.title.newest, 'bar')
-        self.assertEqual([p.id_ for p in Process.all(self.db_conn)],
-                         [retrieved_1.id_, retrieved_2.id_])
+        self.assertEqual(1, len(Process.all(self.db_conn)))
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""