home · contact · privacy
Hide already-seen descendants of implicit ProcessSteps.
[plomtask] / tests / processes.py
index e8967f70273e2345df966d599d50fdbeb4069e39..d6a9899a4910a5d60fbd91c0cacf65d5edaa3ec9 100644 (file)
@@ -1,6 +1,6 @@
 """Test Processes module."""
 from unittest import TestCase
-from urllib.parse import urlencode
+from typing import Any
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.processes import Process
 from plomtask.exceptions import NotFoundException, BadFormatException
@@ -15,28 +15,91 @@ class TestsSansDB(TestCase):
         self.assertEqual(Process(None).description.newest, '')
         self.assertEqual(Process(None).effort.newest, 1.0)
 
+    def test_Process_legal_ID(self) -> None:
+        """Test Process cannot be instantiated with id_=0."""
+        with self.assertRaises(BadFormatException):
+            Process(0)
+
 
 class TestsWithDB(TestCaseWithDB):
     """Mdule tests not requiring DB setup."""
 
-    def test_Process_save(self) -> None:
-        """Test Process.save()."""
-        p_saved = Process(None)
-        p_saved.save(self.db_conn)
-        self.assertEqual(p_saved.id_,
+    def test_Process_ids(self) -> None:
+        """Test Process.save_without_steps() re Process.id_."""
+        p = Process(None)
+        p.save_without_steps(self.db_conn)
+        self.assertEqual(p.id_,
                          Process.by_id(self.db_conn, 1, create=False).id_)
-        with self.assertRaises(BadFormatException):
-            p_saved = Process(0)
-        p_saved = Process(5)
-        p_saved.save(self.db_conn)
-        self.assertEqual(p_saved.id_,
+        p = Process(None)
+        p.save_without_steps(self.db_conn)
+        self.assertEqual(p.id_,
+                         Process.by_id(self.db_conn, 2, create=False).id_)
+        p = Process(5)
+        p.save_without_steps(self.db_conn)
+        self.assertEqual(p.id_,
                          Process.by_id(self.db_conn, 5, create=False).id_)
-        p_saved.title.set('named')
-        p_loaded = Process.by_id(self.db_conn, p_saved.id_)
-        self.assertNotEqual(p_saved.title.history, p_loaded.title.history)
-        p_saved.save(self.db_conn)
-        p_loaded = Process.by_id(self.db_conn, p_saved.id_)
-        self.assertEqual(p_saved.title.history, p_loaded.title.history)
+
+    def test_Process_versioned_attributes(self) -> None:
+        """Test behavior of VersionedAttributes on saving (with .title)."""
+        p = Process(None)
+        p.save_without_steps(self.db_conn)
+        p.title.set('named')
+        p_loaded = Process.by_id(self.db_conn, p.id_)
+        self.assertNotEqual(p.title.history, p_loaded.title.history)
+        p.save_without_steps(self.db_conn)
+        p_loaded = Process.by_id(self.db_conn, p.id_)
+        self.assertEqual(p.title.history, p_loaded.title.history)
+
+    def test_Process_steps(self) -> None:
+        """Test addition, nesting, and non-recursion of ProcessSteps"""
+        p_1 = Process(1)
+        p_1.save_without_steps(self.db_conn)
+        assert p_1.id_ is not None
+        p_2 = Process(2)
+        p_2.save_without_steps(self.db_conn)
+        assert p_2.id_ is not None
+        p_3 = Process(3)
+        p_3.save_without_steps(self.db_conn)
+        assert p_3.id_ is not None
+        p_1.add_step(self.db_conn, None, p_2.id_, None)
+        p_1_dict: dict[int, dict[str, Any]] = {1: {
+            'process': p_2, 'parent_id': None,
+            'is_explicit': True, 'steps': {}, 'seen': False
+        }}
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        s_b = p_1.add_step(self.db_conn, None, p_3.id_, None)
+        p_1_dict[2] = {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': True, 'steps': {}, 'seen': False
+        }
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        s_c = p_2.add_step(self.db_conn, None, p_3.id_, None)
+        assert s_c.id_ is not None
+        p_1_dict[1]['steps'] = {3: {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': False, 'steps': {}, 'seen': False
+        }}
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        p_1.add_step(self.db_conn, None, p_2.id_, s_b.id_)
+        p_1_dict[2]['steps'][4] = {
+            'process': p_2, 'parent_id': s_b.id_, 'seen': False,
+            'is_explicit': True, 'steps': {3: {
+                'process': p_3, 'parent_id': None,
+                'is_explicit': False, 'steps': {}, 'seen': True
+                }}}
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        p_1.add_step(self.db_conn, None, p_3.id_, 999)
+        p_1_dict[5] = {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': True, 'steps': {}, 'seen': False
+        }
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
+        p_1.add_step(self.db_conn, None, p_3.id_, 3)
+        p_1_dict[6] = {
+            'process': p_3, 'parent_id': None,
+            'is_explicit': True, 'steps': {}, 'seen': False
+        }
+        self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict)
 
     def test_Process_by_id(self) -> None:
         """Test Process.by_id()."""
@@ -56,9 +119,9 @@ class TestsWithDB(TestCaseWithDB):
     def test_Process_all(self) -> None:
         """Test Process.all()."""
         p_1 = Process(None)
-        p_1.save(self.db_conn)
+        p_1.save_without_steps(self.db_conn)
         p_2 = Process(None)
-        p_2.save(self.db_conn)
+        p_2.save_without_steps(self.db_conn)
         self.assertEqual({p_1.id_, p_2.id_},
                          set(p.id_ for p in Process.all(self.db_conn)))
 
@@ -68,36 +131,26 @@ class TestsWithServer(TestCaseWithServer):
 
     def test_do_POST_process(self) -> None:
         """Test POST /process and its effect on the database."""
-        def post_data_to_expect(form_data: dict[str, object],
-                                to_: str, expect: int) -> None:
-            encoded_form_data = urlencode(form_data).encode('utf-8')
-            headers = {'Content-Type': 'application/x-www-form-urlencoded',
-                       'Content-Length': str(len(encoded_form_data))}
-            self.conn.request('POST', to_,
-                              body=encoded_form_data, headers=headers)
-            self.assertEqual(self.conn.getresponse().status, expect)
-        form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
-        post_data_to_expect(form_data, '/process?id=FOO', 400)
+        self.assertEqual(0, len(Process.all(self.db_conn)))
+        form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
+        self.check_post(form_data, '/process?id=', 302, '/')
+        self.assertEqual(1, len(Process.all(self.db_conn)))
+        self.check_post(form_data, '/process?id=FOO', 400)
         form_data['effort'] = 'foo'
-        post_data_to_expect(form_data, '/process?id=', 400)
-        form_data['effort'] = None
-        post_data_to_expect(form_data, '/process?id=', 400)
-        form_data = {'title': None, 'description': 1, 'effort': 1.0}
-        post_data_to_expect(form_data, '/process?id=', 302)
-        retrieved = Process.by_id(self.db_conn, 1)
-        self.assertEqual(retrieved.title.newest, 'None')
-        self.assertEqual([p.id_ for p in Process.all(self.db_conn)],
-                         [retrieved.id_])
+        self.check_post(form_data, '/process?id=', 400)
+        self.check_post({}, '/process?id=', 400)
+        form_data = {'title': '', 'description': ''}
+        self.check_post(form_data, '/process?id=', 400)
+        form_data = {'title': '', 'effort': 1.1}
+        self.check_post(form_data, '/process?id=', 400)
+        form_data = {'description': '', 'effort': 1.0}
+        self.check_post(form_data, '/process?id=', 400)
+        self.assertEqual(1, len(Process.all(self.db_conn)))
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""
-        self.conn.request('GET', '/process')
-        self.assertEqual(self.conn.getresponse().status, 200)
-        self.conn.request('GET', '/process?id=')
-        self.assertEqual(self.conn.getresponse().status, 200)
-        self.conn.request('GET', '/process?id=0')
-        self.assertEqual(self.conn.getresponse().status, 400)
-        self.conn.request('GET', '/process?id=FOO')
-        self.assertEqual(self.conn.getresponse().status, 400)
-        self.conn.request('GET', '/processes')
-        self.assertEqual(self.conn.getresponse().status, 200)
+        self.check_get('/process', 200)
+        self.check_get('/process?id=', 200)
+        self.check_get('/process?id=0', 400)
+        self.check_get('/process?id=FOO', 400)
+        self.check_get('/processes', 200)