X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=blobdiff_plain;f=tests%2Futils.py;h=d6c5b20ac7882281d4958e99e3dbcd6a35de708b;hb=9c62e1b0e5c30ed3fd7a49828749db195bc3e557;hp=fb7e22746a96482e04153adff90a4bfa086bd58c;hpb=5104eb7b33c386b6df7508405917408855e1468c;p=plomtask diff --git a/tests/utils.py b/tests/utils.py index fb7e227..d6c5b20 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,8 +2,9 @@ from unittest import TestCase from threading import Thread from http.client import HTTPConnection +from json import loads as json_loads from urllib.parse import urlencode -from datetime import datetime +from uuid import uuid4 from os import remove as remove_file from typing import Mapping, Any from plomtask.db import DatabaseFile, DatabaseConnection @@ -18,18 +19,25 @@ from plomtask.exceptions import NotFoundException, HandledException class TestCaseSansDB(TestCase): """Tests requiring no DB setup.""" checked_class: Any + do_id_test: bool = False + default_init_args: list[Any] = [] + versioned_defaults_to_test: dict[str, str | float] = {} - def check_id_setting(self, *args: Any) -> None: + def test_id_setting(self) -> None: """Test .id_ being set and its legal range being enforced.""" + if not self.do_id_test: + return with self.assertRaises(HandledException): - self.checked_class(0, *args) - obj = self.checked_class(5, *args) + self.checked_class(0, *self.default_init_args) + obj = self.checked_class(5, *self.default_init_args) self.assertEqual(obj.id_, 5) - def check_versioned_defaults(self, attrs: dict[str, Any]) -> None: + def test_versioned_defaults(self) -> None: """Test defaults of VersionedAttributes.""" - obj = self.checked_class(None) - for k, v in attrs.items(): + if len(self.versioned_defaults_to_test) == 0: + return + obj = self.checked_class(1, *self.default_init_args) + for k, v in self.versioned_defaults_to_test.items(): self.assertEqual(getattr(obj, k).newest, v) @@ -37,6 +45,8 @@ class TestCaseWithDB(TestCase): """Module tests not requiring DB setup.""" checked_class: Any default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3) + default_init_kwargs: dict[str, Any] = {} + test_versioneds: dict[str, type] = {} def setUp(self) -> None: Condition.empty_cache() @@ -44,21 +54,31 @@ class TestCaseWithDB(TestCase): Process.empty_cache() ProcessStep.empty_cache() Todo.empty_cache() - timestamp = datetime.now().timestamp() - self.db_file = DatabaseFile(f'test_db:{timestamp}') - self.db_file.remake() + self.db_file = DatabaseFile.create_at(f'test_db:{uuid4()}') self.db_conn = DatabaseConnection(self.db_file) def tearDown(self) -> None: self.db_conn.close() remove_file(self.db_file.path) + def test_saving_and_caching(self) -> None: + """Test storage and initialization of instances and attributes.""" + if not hasattr(self, 'checked_class'): + return + self.check_saving_and_caching(id_=1, **self.default_init_kwargs) + obj = self.checked_class(None, **self.default_init_kwargs) + obj.save(self.db_conn) + self.assertEqual(obj.id_, 2) + for k, v in self.test_versioneds.items(): + self.check_saving_of_versioned(k, v) + def check_storage(self, content: list[Any]) -> None: """Test cache and DB equal content.""" expected_cache = {} for item in content: expected_cache[item.id_] = item self.assertEqual(self.checked_class.get_cache(), expected_cache) + hashes_content = [hash(x) for x in content] db_found: list[Any] = [] for item in content: assert isinstance(item.id_, type(self.default_ids[0])) @@ -66,19 +86,20 @@ class TestCaseWithDB(TestCase): 'id', item.id_): db_found += [self.checked_class.from_table_row(self.db_conn, row)] - self.assertEqual(sorted(content), sorted(db_found)) + hashes_db_found = [hash(x) for x in db_found] + self.assertEqual(sorted(hashes_content), sorted(hashes_db_found)) def check_saving_and_caching(self, **kwargs: Any) -> None: """Test instance.save in its core without relations.""" obj = self.checked_class(**kwargs) # pylint: disable=not-callable # check object init itself doesn't store anything yet self.check_storage([]) - # check saving stores in cache and DB + # check saving sets core attributes properly obj.save(self.db_conn) - self.check_storage([obj]) - # check core attributes set properly (and not unset by saving) for key, value in kwargs.items(): self.assertEqual(getattr(obj, key), value) + # check saving stored properly in cache and DB + self.check_storage([obj]) def check_saving_of_versioned(self, attr_name: str, type_: type) -> None: """Test owner's versioned attributes.""" @@ -88,7 +109,6 @@ class TestCaseWithDB(TestCase): attr.set(vals[0]) attr.set(vals[1]) owner.save(self.db_conn) - owner.uncache() retrieved = owner.__class__.by_id(self.db_conn, owner.id_) attr = getattr(retrieved, attr_name) self.assertEqual(sorted(attr.history.values()), vals) @@ -118,9 +138,11 @@ class TestCaseWithDB(TestCase): assert isinstance(obj.id_, type(self.default_ids[0])) for row in self.db_conn.row_where(self.checked_class.table_name, 'id', obj.id_): + hash_original = hash(obj) retrieved = self.checked_class.from_table_row(self.db_conn, row) - self.assertEqual(obj, retrieved) - self.assertEqual({obj.id_: obj}, self.checked_class.get_cache()) + self.assertEqual(hash_original, hash(retrieved)) + self.assertEqual({retrieved.id_: retrieved}, + self.checked_class.get_cache()) def check_versioned_from_table_row(self, attr_name: str, type_: type) -> None: @@ -196,6 +218,7 @@ class TestCaseWithServer(TestCaseWithDB): self.server_thread.start() self.conn = HTTPConnection(str(self.httpd.server_address[0]), self.httpd.server_address[1]) + self.httpd.set_json_mode() def tearDown(self) -> None: self.httpd.shutdown() @@ -243,5 +266,34 @@ class TestCaseWithServer(TestCaseWithDB): """POST basic Process.""" if not form_data: form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} - self.check_post(form_data, '/process?id=', 302, f'/process?id={id_}') + self.check_post(form_data, f'/process?id={id_}', 302, + f'/process?id={id_}') return form_data + + def check_json_get(self, path: str, expected: dict[str, object]) -> None: + """Compare JSON on GET path with expected. + + To simplify comparison of VersionedAttribute histories, transforms + timestamp keys of VersionedAttribute history keys into integers + counting chronologically forward from 0. + """ + def rewrite_history_keys_in(item: Any) -> Any: + if isinstance(item, dict): + if '_versioned' in item.keys(): + for k in item['_versioned']: + vals = item['_versioned'][k].values() + history = {} + for i, val in enumerate(vals): + history[i] = val + item['_versioned'][k] = history + for k in list(item.keys()): + rewrite_history_keys_in(item[k]) + elif isinstance(item, list): + item[:] = [rewrite_history_keys_in(i) for i in item] + return item + self.conn.request('GET', path) + response = self.conn.getresponse() + self.assertEqual(response.status, 200) + retrieved = json_loads(response.read().decode()) + rewrite_history_keys_in(retrieved) + self.assertEqual(expected, retrieved)