home · contact · privacy
Enable server to alternatively output response ctx as JSON, for debugging and testing...
[plomtask] / tests / utils.py
index bb37270ca68afde8e00df09a19892011fd371a29..15a53ae0ddc0b78835b5baacea15f43d3a81cba0 100644 (file)
@@ -3,7 +3,7 @@ from unittest import TestCase
 from threading import Thread
 from http.client import HTTPConnection
 from urllib.parse import urlencode
-from datetime import datetime
+from uuid import uuid4
 from os import remove as remove_file
 from typing import Mapping, Any
 from plomtask.db import DatabaseFile, DatabaseConnection
@@ -18,18 +18,25 @@ from plomtask.exceptions import NotFoundException, HandledException
 class TestCaseSansDB(TestCase):
     """Tests requiring no DB setup."""
     checked_class: Any
+    do_id_test: bool = False
+    default_init_args: list[Any] = []
+    versioned_defaults_to_test: dict[str, str | float] = {}
 
-    def check_id_setting(self, *args: Any) -> None:
+    def test_id_setting(self) -> None:
         """Test .id_ being set and its legal range being enforced."""
+        if not self.do_id_test:
+            return
         with self.assertRaises(HandledException):
-            self.checked_class(0, *args)
-        obj = self.checked_class(5, *args)
+            self.checked_class(0, *self.default_init_args)
+        obj = self.checked_class(5, *self.default_init_args)
         self.assertEqual(obj.id_, 5)
 
-    def check_versioned_defaults(self, attrs: dict[str, Any]) -> None:
+    def test_versioned_defaults(self) -> None:
         """Test defaults of VersionedAttributes."""
-        obj = self.checked_class(None)
-        for k, v in attrs.items():
+        if len(self.versioned_defaults_to_test) == 0:
+            return
+        obj = self.checked_class(1, *self.default_init_args)
+        for k, v in self.versioned_defaults_to_test.items():
             self.assertEqual(getattr(obj, k).newest, v)
 
 
@@ -37,6 +44,8 @@ class TestCaseWithDB(TestCase):
     """Module tests not requiring DB setup."""
     checked_class: Any
     default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
+    default_init_kwargs: dict[str, Any] = {}
+    test_versioneds: dict[str, type] = {}
 
     def setUp(self) -> None:
         Condition.empty_cache()
@@ -44,21 +53,31 @@ class TestCaseWithDB(TestCase):
         Process.empty_cache()
         ProcessStep.empty_cache()
         Todo.empty_cache()
-        timestamp = datetime.now().timestamp()
-        self.db_file = DatabaseFile(f'test_db:{timestamp}')
-        self.db_file.remake()
+        self.db_file = DatabaseFile.create_at(f'test_db:{uuid4()}')
         self.db_conn = DatabaseConnection(self.db_file)
 
     def tearDown(self) -> None:
         self.db_conn.close()
         remove_file(self.db_file.path)
 
+    def test_saving_and_caching(self) -> None:
+        """Test storage and initialization of instances and attributes."""
+        if not hasattr(self, 'checked_class'):
+            return
+        self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
+        obj = self.checked_class(None, **self.default_init_kwargs)
+        obj.save(self.db_conn)
+        self.assertEqual(obj.id_, 2)
+        for k, v in self.test_versioneds.items():
+            self.check_saving_of_versioned(k, v)
+
     def check_storage(self, content: list[Any]) -> None:
         """Test cache and DB equal content."""
         expected_cache = {}
         for item in content:
             expected_cache[item.id_] = item
         self.assertEqual(self.checked_class.get_cache(), expected_cache)
+        hashes_content = [hash(x) for x in content]
         db_found: list[Any] = []
         for item in content:
             assert isinstance(item.id_, type(self.default_ids[0]))
@@ -66,19 +85,20 @@ class TestCaseWithDB(TestCase):
                                               'id', item.id_):
                 db_found += [self.checked_class.from_table_row(self.db_conn,
                                                                row)]
-        self.assertEqual(sorted(content), sorted(db_found))
+        hashes_db_found = [hash(x) for x in db_found]
+        self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
 
-    def check_saving_and_caching(self, **kwargs: Any) -> Any:
+    def check_saving_and_caching(self, **kwargs: Any) -> None:
         """Test instance.save in its core without relations."""
         obj = self.checked_class(**kwargs)  # pylint: disable=not-callable
         # check object init itself doesn't store anything yet
         self.check_storage([])
-        # check saving stores in cache and DB
+        # check saving sets core attributes properly
         obj.save(self.db_conn)
-        self.check_storage([obj])
-        # check core attributes set properly (and not unset by saving)
         for key, value in kwargs.items():
             self.assertEqual(getattr(obj, key), value)
+        # check saving stored properly in cache and DB
+        self.check_storage([obj])
 
     def check_saving_of_versioned(self, attr_name: str, type_: type) -> None:
         """Test owner's versioned attributes."""
@@ -88,7 +108,6 @@ class TestCaseWithDB(TestCase):
         attr.set(vals[0])
         attr.set(vals[1])
         owner.save(self.db_conn)
-        owner.uncache()
         retrieved = owner.__class__.by_id(self.db_conn, owner.id_)
         attr = getattr(retrieved, attr_name)
         self.assertEqual(sorted(attr.history.values()), vals)
@@ -118,9 +137,11 @@ class TestCaseWithDB(TestCase):
         assert isinstance(obj.id_, type(self.default_ids[0]))
         for row in self.db_conn.row_where(self.checked_class.table_name,
                                           'id', obj.id_):
+            hash_original = hash(obj)
             retrieved = self.checked_class.from_table_row(self.db_conn, row)
-            self.assertEqual(obj, retrieved)
-            self.assertEqual({obj.id_: obj}, self.checked_class.get_cache())
+            self.assertEqual(hash_original, hash(retrieved))
+            self.assertEqual({retrieved.id_: retrieved},
+                             self.checked_class.get_cache())
 
     def check_versioned_from_table_row(self, attr_name: str,
                                        type_: type) -> None:
@@ -196,6 +217,7 @@ class TestCaseWithServer(TestCaseWithDB):
         self.server_thread.start()
         self.conn = HTTPConnection(str(self.httpd.server_address[0]),
                                    self.httpd.server_address[1])
+        self.httpd.set_json_mode()
 
     def tearDown(self) -> None:
         self.httpd.shutdown()
@@ -243,5 +265,6 @@ class TestCaseWithServer(TestCaseWithDB):
         """POST basic Process."""
         if not form_data:
             form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
-        self.check_post(form_data, '/process?id=', 302, f'/process?id={id_}')
+        self.check_post(form_data, f'/process?id={id_}', 302,
+                        f'/process?id={id_}')
         return form_data