home · contact · privacy
Minor template improvements.
[plomtask] / tests / utils.py
index 61dbb36b949ee3fdfd5a38dcada155a5ab18a924..6f44f611b487f139536822b6340fae5f5ddcf5d5 100644 (file)
@@ -15,10 +15,37 @@ from plomtask.todos import Todo
 from plomtask.exceptions import NotFoundException, HandledException
 
 
+class TestCaseSansDB(TestCase):
+    """Tests requiring no DB setup."""
+    checked_class: Any
+    do_id_test: bool = False
+    default_init_args: list[Any] = []
+    versioned_defaults_to_test: dict[str, str | float] = {}
+
+    def test_id_setting(self) -> None:
+        """Test .id_ being set and its legal range being enforced."""
+        if not self.do_id_test:
+            return
+        with self.assertRaises(HandledException):
+            self.checked_class(0, *self.default_init_args)
+        obj = self.checked_class(5, *self.default_init_args)
+        self.assertEqual(obj.id_, 5)
+
+    def test_versioned_defaults(self) -> None:
+        """Test defaults of VersionedAttributes."""
+        if len(self.versioned_defaults_to_test) == 0:
+            return
+        obj = self.checked_class(1, *self.default_init_args)
+        for k, v in self.versioned_defaults_to_test.items():
+            self.assertEqual(getattr(obj, k).newest, v)
+
+
 class TestCaseWithDB(TestCase):
     """Module tests not requiring DB setup."""
     checked_class: Any
-    default_ids: tuple[int | str, int | str, int | str]
+    default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
+    default_init_kwargs: dict[str, Any] = {}
+    test_versioneds: dict[str, type] = {}
 
     def setUp(self) -> None:
         Condition.empty_cache()
@@ -27,14 +54,24 @@ class TestCaseWithDB(TestCase):
         ProcessStep.empty_cache()
         Todo.empty_cache()
         timestamp = datetime.now().timestamp()
-        self.db_file = DatabaseFile(f'test_db:{timestamp}')
-        self.db_file.remake()
+        self.db_file = DatabaseFile.create_at(f'test_db:{timestamp}')
         self.db_conn = DatabaseConnection(self.db_file)
 
     def tearDown(self) -> None:
         self.db_conn.close()
         remove_file(self.db_file.path)
 
+    def test_saving_and_caching(self) -> None:
+        """Test storage and initialization of instances and attributes."""
+        if not hasattr(self, 'checked_class'):
+            return
+        self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
+        obj = self.checked_class(None, **self.default_init_kwargs)
+        obj.save(self.db_conn)
+        self.assertEqual(obj.id_, 2)
+        for k, v in self.test_versioneds.items():
+            self.check_saving_of_versioned(k, v)
+
     def check_storage(self, content: list[Any]) -> None:
         """Test cache and DB equal content."""
         expected_cache = {}
@@ -43,14 +80,14 @@ class TestCaseWithDB(TestCase):
         self.assertEqual(self.checked_class.get_cache(), expected_cache)
         db_found: list[Any] = []
         for item in content:
-            assert isinstance(item.id_, (str, int))
+            assert isinstance(item.id_, type(self.default_ids[0]))
             for row in self.db_conn.row_where(self.checked_class.table_name,
                                               'id', item.id_):
                 db_found += [self.checked_class.from_table_row(self.db_conn,
                                                                row)]
         self.assertEqual(sorted(content), sorted(db_found))
 
-    def check_saving_and_caching(self, **kwargs: Any) -> Any:
+    def check_saving_and_caching(self, **kwargs: Any) -> None:
         """Test instance.save in its core without relations."""
         obj = self.checked_class(**kwargs)  # pylint: disable=not-callable
         # check object init itself doesn't store anything yet
@@ -62,6 +99,19 @@ class TestCaseWithDB(TestCase):
         for key, value in kwargs.items():
             self.assertEqual(getattr(obj, key), value)
 
+    def check_saving_of_versioned(self, attr_name: str, type_: type) -> None:
+        """Test owner's versioned attributes."""
+        owner = self.checked_class(None)
+        vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
+        attr = getattr(owner, attr_name)
+        attr.set(vals[0])
+        attr.set(vals[1])
+        owner.save(self.db_conn)
+        owner.uncache()
+        retrieved = owner.__class__.by_id(self.db_conn, owner.id_)
+        attr = getattr(retrieved, attr_name)
+        self.assertEqual(sorted(attr.history.values()), vals)
+
     def check_by_id(self) -> None:
         """Test .by_id(), including creation."""
         # check failure if not yet saved
@@ -79,18 +129,32 @@ class TestCaseWithDB(TestCase):
         self.assertEqual(self.checked_class(id2), by_id_created)
         self.check_storage([obj])
 
-    def check_from_table_row(self) -> None:
+    def check_from_table_row(self, *args: Any) -> None:
         """Test .from_table_row() properly reads in class from DB"""
         id_ = self.default_ids[0]
-        obj = self.checked_class(id_)  # pylint: disable=not-callable
+        obj = self.checked_class(id_, *args)  # pylint: disable=not-callable
         obj.save(self.db_conn)
-        assert isinstance(obj.id_, (str, int))
+        assert isinstance(obj.id_, type(self.default_ids[0]))
         for row in self.db_conn.row_where(self.checked_class.table_name,
                                           'id', obj.id_):
             retrieved = self.checked_class.from_table_row(self.db_conn, row)
             self.assertEqual(obj, retrieved)
             self.assertEqual({obj.id_: obj}, self.checked_class.get_cache())
 
+    def check_versioned_from_table_row(self, attr_name: str,
+                                       type_: type) -> None:
+        """Test .from_table_row() reads versioned attributes from DB."""
+        owner = self.checked_class(None)
+        vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
+        attr = getattr(owner, attr_name)
+        attr.set(vals[0])
+        attr.set(vals[1])
+        owner.save(self.db_conn)
+        for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
+            retrieved = owner.__class__.from_table_row(self.db_conn, row)
+            attr = getattr(retrieved, attr_name)
+            self.assertEqual(sorted(attr.history.values()), vals)
+
     def check_all(self) -> tuple[Any, Any, Any]:
         """Test .all()."""
         # pylint: disable=not-callable
@@ -110,20 +174,29 @@ class TestCaseWithDB(TestCase):
         return item1, item2, item3
 
     def check_singularity(self, defaulting_field: str,
-                          non_default_value: Any) -> None:
+                          non_default_value: Any, *args: Any) -> None:
         """Test pointers made for single object keep pointing to it."""
         id1 = self.default_ids[0]
-        obj = self.checked_class(id1)  # pylint: disable=not-callable
+        obj = self.checked_class(id1, *args)  # pylint: disable=not-callable
         obj.save(self.db_conn)
         setattr(obj, defaulting_field, non_default_value)
         retrieved = self.checked_class.by_id(self.db_conn, id1)
         self.assertEqual(non_default_value,
                          getattr(retrieved, defaulting_field))
 
-    def check_remove(self) -> None:
+    def check_versioned_singularity(self) -> None:
+        """Test singularity of VersionedAttributes on saving (with .title)."""
+        obj = self.checked_class(None)  # pylint: disable=not-callable
+        obj.save(self.db_conn)
+        assert isinstance(obj.id_, int)
+        obj.title.set('named')
+        retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
+        self.assertEqual(obj.title.history, retrieved.title.history)
+
+    def check_remove(self, *args: Any) -> None:
         """Test .remove() effects on DB and cache."""
         id_ = self.default_ids[0]
-        obj = self.checked_class(id_)  # pylint: disable=not-callable
+        obj = self.checked_class(id_, *args)  # pylint: disable=not-callable
         with self.assertRaises(HandledException):
             obj.remove(self.db_conn)
         obj.save(self.db_conn)
@@ -189,5 +262,6 @@ class TestCaseWithServer(TestCaseWithDB):
         """POST basic Process."""
         if not form_data:
             form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
-        self.check_post(form_data, '/process?id=', 302, f'/process?id={id_}')
+        self.check_post(form_data, f'/process?id={id_}', 302,
+                        f'/process?id={id_}')
         return form_data