X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=blobdiff_plain;f=tests%2Futils.py;h=25cc9ba1e79d663ec692570f6f4c1fce4eaaf911;hb=c021152e6566c8374170de916c69d6b5c816cd54;hp=f76fe33c93fc65d68aa07b7ca04aa0b98c762072;hpb=e3bfd84f9061d5f03ec5f5764f75e4137505ea45;p=plomtask diff --git a/tests/utils.py b/tests/utils.py index f76fe33..25cc9ba 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,12 +1,13 @@ """Shared test utilities.""" +from __future__ import annotations from unittest import TestCase +from typing import Mapping, Any, Callable from threading import Thread from http.client import HTTPConnection from json import loads as json_loads from urllib.parse import urlencode from uuid import uuid4 from os import remove as remove_file -from typing import Mapping, Any from plomtask.db import DatabaseFile, DatabaseConnection from plomtask.http import TaskHandler, TaskServer from plomtask.processes import Process, ProcessStep @@ -61,19 +62,36 @@ class TestCaseWithDB(TestCase): self.db_conn.close() remove_file(self.db_file.path) - def test_saving_and_caching(self) -> None: - """Test storage and initialization of instances and attributes.""" - if not hasattr(self, 'checked_class'): - return - self.check_saving_and_caching(id_=1, **self.default_init_kwargs) - obj = self.checked_class(None, **self.default_init_kwargs) - obj.save(self.db_conn) - self.assertEqual(obj.id_, 2) - for k, v in self.test_versioneds.items(): - self.check_saving_of_versioned(k, v) + @staticmethod + def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]: + def wrapper(self: TestCaseWithDB) -> None: + if hasattr(self, 'checked_class'): + f(self) + return wrapper - def check_storage(self, content: list[Any]) -> None: - """Test cache and DB equal content.""" + def _load_from_db(self, id_: int | str) -> list[object]: + db_found: list[object] = [] + for row in self.db_conn.row_where(self.checked_class.table_name, + 'id', id_): + db_found += [self.checked_class.from_table_row(self.db_conn, + row)] + return db_found + + def _change_obj(self, obj: object) -> str: + attr_name: str = self.checked_class.to_save[-1] + attr = getattr(obj, attr_name) + new_attr: str | int | float | bool + if isinstance(attr, (int, float)): + new_attr = attr + 1 + elif isinstance(attr, str): + new_attr = attr + '_' + elif isinstance(attr, bool): + new_attr = not attr + setattr(obj, attr_name, new_attr) + return attr_name + + def check_identity_with_cache_and_db(self, content: list[Any]) -> None: + """Test both cache and DB equal content.""" expected_cache = {} for item in content: expected_cache[item.id_] = item @@ -82,75 +100,124 @@ class TestCaseWithDB(TestCase): db_found: list[Any] = [] for item in content: assert isinstance(item.id_, type(self.default_ids[0])) - for row in self.db_conn.row_where(self.checked_class.table_name, - 'id', item.id_): - db_found += [self.checked_class.from_table_row(self.db_conn, - row)] + db_found += self._load_from_db(item.id_) hashes_db_found = [hash(x) for x in db_found] self.assertEqual(sorted(hashes_content), sorted(hashes_db_found)) - def check_saving_and_caching(self, **kwargs: Any) -> None: - """Test instance.save in its core without relations.""" - obj = self.checked_class(**kwargs) # pylint: disable=not-callable - # check object init itself doesn't store anything yet - self.check_storage([]) - # check saving sets core attributes properly - obj.save(self.db_conn) - for key, value in kwargs.items(): - self.assertEqual(getattr(obj, key), value) - # check saving stored properly in cache and DB - self.check_storage([obj]) - - def check_saving_of_versioned(self, attr_name: str, type_: type) -> None: - """Test owner's versioned attributes.""" - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - owner.save(self.db_conn) - retrieved = owner.__class__.by_id(self.db_conn, owner.id_) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) - - def check_by_id(self) -> None: - """Test .by_id(), including creation.""" + @_within_checked_class + def test_saving_versioned(self) -> None: + """Test storage and initialization of versioned attributes.""" + def retrieve_attr_vals() -> list[object]: + attr_vals_saved: list[object] = [] + assert hasattr(retrieved, 'id_') + for row in self.db_conn.row_where(attr.table_name, 'parent', + retrieved.id_): + attr_vals_saved += [row[2]] + return attr_vals_saved + for attr_name, type_ in self.test_versioneds.items(): + # fail saving attributes on non-saved owner + owner = self.checked_class(None, **self.default_init_kwargs) + vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] + attr = getattr(owner, attr_name) + attr.set(vals[0]) + attr.set(vals[1]) + with self.assertRaises(NotFoundException): + attr.save(self.db_conn) + owner.save(self.db_conn) + # check stored attribute is as expected + retrieved = self._load_from_db(owner.id_)[0] + attr = getattr(retrieved, attr_name) + self.assertEqual(sorted(attr.history.values()), vals) + # check owner.save() created entries in attr table + attr_vals_saved = retrieve_attr_vals() + self.assertEqual(vals, attr_vals_saved) + # check setting new val to attr inconsequential to DB without save + attr.set(vals[0]) + attr_vals_saved = retrieve_attr_vals() + self.assertEqual(vals, attr_vals_saved) + # check save finally adds new val + attr.save(self.db_conn) + attr_vals_saved = retrieve_attr_vals() + self.assertEqual(vals + [vals[0]], attr_vals_saved) + + @_within_checked_class + def test_saving_and_caching(self) -> None: + """Test effects of .cache() and .save().""" + id1 = self.default_ids[0] + # check failure to cache without ID (if None-ID input possible) + if isinstance(id1, int): + obj0 = self.checked_class(None, **self.default_init_kwargs) + with self.assertRaises(HandledException): + obj0.cache() + # check mere object init itself doesn't even store in cache + obj1 = self.checked_class(id1, **self.default_init_kwargs) + self.assertEqual(self.checked_class.get_cache(), {}) + # check .cache() fills cache, but not DB + obj1.cache() + self.assertEqual(self.checked_class.get_cache(), {id1: obj1}) + db_found = self._load_from_db(id1) + self.assertEqual(db_found, []) + # check .save() sets ID (for int IDs), updates cache, and fills DB + # (expect ID to be set to id1, despite obj1 already having that as ID: + # it's generated by cursor.lastrowid on the DB table, and with obj1 + # not written there, obj2 should get it first!) + id_input = None if isinstance(id1, int) else id1 + obj2 = self.checked_class(id_input, **self.default_init_kwargs) + obj2.save(self.db_conn) + obj2_hash = hash(obj2) + self.assertEqual(self.checked_class.get_cache(), {id1: obj2}) + db_found += self._load_from_db(id1) + self.assertEqual([hash(o) for o in db_found], [obj2_hash]) + # check we cannot overwrite obj2 with obj1 despite its same ID, + # since it has disappeared now + with self.assertRaises(HandledException): + obj1.save(self.db_conn) + + @_within_checked_class + def test_by_id(self) -> None: + """Test .by_id().""" + id1, id2, _ = self.default_ids # check failure if not yet saved - id1, id2 = self.default_ids[0], self.default_ids[1] - obj = self.checked_class(id1) # pylint: disable=not-callable + obj1 = self.checked_class(id1, **self.default_init_kwargs) with self.assertRaises(NotFoundException): self.checked_class.by_id(self.db_conn, id1) + # check identity of cached and retrieved + obj1.cache() + self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1)) # check identity of saved and retrieved - obj.save(self.db_conn) - self.assertEqual(obj, self.checked_class.by_id(self.db_conn, id1)) - # check create=True acts like normal instantiation (sans saving) - by_id_created = self.checked_class.by_id(self.db_conn, id2, - create=True) - # pylint: disable=not-callable - self.assertEqual(self.checked_class(id2), by_id_created) - self.check_storage([obj]) + obj2 = self.checked_class(id2, **self.default_init_kwargs) + obj2.save(self.db_conn) + self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2)) + + @_within_checked_class + def test_by_id_or_create(self) -> None: + """Test .by_id_or_create.""" + # check .by_id_or_create acts like normal instantiation (sans saving) + id_ = self.default_ids[0] + if not self.checked_class.can_create_by_id: + with self.assertRaises(HandledException): + self.checked_class.by_id_or_create(self.db_conn, id_) + # check .by_id_or_create fails if wrong class + else: + by_id_created = self.checked_class.by_id_or_create(self.db_conn, + id_) + with self.assertRaises(NotFoundException): + self.checked_class.by_id(self.db_conn, id_) + self.assertEqual(self.checked_class(id_), by_id_created) + @_within_checked_class def test_from_table_row(self) -> None: - """Test .from_table_row() properly reads in class from DB.""" - if not hasattr(self, 'checked_class'): - return + """Test .from_table_row() properly reads in class directly from DB.""" id_ = self.default_ids[0] obj = self.checked_class(id_, **self.default_init_kwargs) obj.save(self.db_conn) - assert isinstance(obj.id_, type(self.default_ids[0])) + assert isinstance(obj.id_, type(id_)) for row in self.db_conn.row_where(self.checked_class.table_name, 'id', obj.id_): # check .from_table_row reproduces state saved, no matter if obj # later changed (with caching even) hash_original = hash(obj) - attr_name = self.checked_class.to_save[-1] - attr = getattr(obj, attr_name) - if isinstance(attr, (int, float)): - setattr(obj, attr_name, attr + 1) - elif isinstance(attr, str): - setattr(obj, attr_name, attr + "_") - elif isinstance(attr, bool): - setattr(obj, attr_name, not attr) + attr_name = self._change_obj(obj) obj.cache() to_cmp = getattr(obj, attr_name) retrieved = self.checked_class.from_table_row(self.db_conn, row) @@ -159,68 +226,77 @@ class TestCaseWithDB(TestCase): # check cache contains what .from_table_row just produced self.assertEqual({retrieved.id_: retrieved}, self.checked_class.get_cache()) + # check .from_table_row also reads versioned attributes from DB + for attr_name, type_ in self.test_versioneds.items(): + owner = self.checked_class(None) + vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] + attr = getattr(owner, attr_name) + attr.set(vals[0]) + attr.set(vals[1]) + owner.save(self.db_conn) + for row in self.db_conn.row_where(owner.table_name, 'id', + owner.id_): + retrieved = owner.__class__.from_table_row(self.db_conn, row) + attr = getattr(retrieved, attr_name) + self.assertEqual(sorted(attr.history.values()), vals) - def check_versioned_from_table_row(self, attr_name: str, - type_: type) -> None: - """Test .from_table_row() reads versioned attributes from DB.""" - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - owner.save(self.db_conn) - for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_): - retrieved = owner.__class__.from_table_row(self.db_conn, row) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) - - def check_all(self) -> tuple[Any, Any, Any]: - """Test .all().""" - # pylint: disable=not-callable - item1 = self.checked_class(self.default_ids[0]) - item2 = self.checked_class(self.default_ids[1]) - item3 = self.checked_class(self.default_ids[2]) - # check pre-save .all() returns empty list + @_within_checked_class + def test_all(self) -> None: + """Test .all() and its relation to cache and savings.""" + id_1, id_2, id_3 = self.default_ids + item1 = self.checked_class(id_1, **self.default_init_kwargs) + item2 = self.checked_class(id_2, **self.default_init_kwargs) + item3 = self.checked_class(id_3, **self.default_init_kwargs) + # check .all() returns empty list on un-cached items self.assertEqual(self.checked_class.all(self.db_conn), []) - # check that all() shows all saved, but no unsaved items - item1.save(self.db_conn) + # check that all() shows only cached/saved items + item1.cache() item3.save(self.db_conn) self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item3])) item2.save(self.db_conn) self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item2, item3])) - return item1, item2, item3 - def check_singularity(self, defaulting_field: str, - non_default_value: Any, *args: Any) -> None: + @_within_checked_class + def test_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" id1 = self.default_ids[0] - obj = self.checked_class(id1, *args) # pylint: disable=not-callable + obj = self.checked_class(id1, **self.default_init_kwargs) obj.save(self.db_conn) - setattr(obj, defaulting_field, non_default_value) + # change object, expect retrieved through .by_id to carry change + attr_name = self._change_obj(obj) + new_attr = getattr(obj, attr_name) retrieved = self.checked_class.by_id(self.db_conn, id1) - self.assertEqual(non_default_value, - getattr(retrieved, defaulting_field)) + self.assertEqual(new_attr, getattr(retrieved, attr_name)) - def check_versioned_singularity(self) -> None: + @_within_checked_class + def test_versioned_singularity_title(self) -> None: """Test singularity of VersionedAttributes on saving (with .title).""" - obj = self.checked_class(None) # pylint: disable=not-callable - obj.save(self.db_conn) - assert isinstance(obj.id_, int) - obj.title.set('named') - retrieved = self.checked_class.by_id(self.db_conn, obj.id_) - self.assertEqual(obj.title.history, retrieved.title.history) + if 'title' in self.test_versioneds: + obj = self.checked_class(None) + obj.save(self.db_conn) + assert isinstance(obj.id_, int) + # change obj, expect retrieved through .by_id to carry change + obj.title.set('named') + retrieved = self.checked_class.by_id(self.db_conn, obj.id_) + self.assertEqual(obj.title.history, retrieved.title.history) - def check_remove(self, *args: Any) -> None: + @_within_checked_class + def test_remove(self) -> None: """Test .remove() effects on DB and cache.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, *args) # pylint: disable=not-callable + obj = self.checked_class(id_, **self.default_init_kwargs) + # check removal only works after saving with self.assertRaises(HandledException): obj.remove(self.db_conn) obj.save(self.db_conn) obj.remove(self.db_conn) - self.check_storage([]) + # check access to obj fails after removal + with self.assertRaises(HandledException): + print(obj.id_) + # check DB and cache now empty + self.check_identity_with_cache_and_db([]) class TestCaseWithServer(TestCaseWithDB):