X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/decks/%7B%7Bdeck_id%7D%7D/cards/%7B%7Bcard_id%7D%7D/form?a=blobdiff_plain;ds=sidebyside;f=tests%2Futils.py;h=665436873c27af704a13827715d3c795e04e1fe1;hb=HEAD;hp=55c948a409dbf1b2c43f775c0d39106ba3ed510d;hpb=25b71c6f0b10db05907128daf50c6e543e514c35;p=plomtask diff --git a/tests/utils.py b/tests/utils.py index 55c948a..8008033 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -4,6 +4,8 @@ from unittest import TestCase from typing import Mapping, Any, Callable from threading import Thread from http.client import HTTPConnection +from datetime import datetime, timedelta +from time import sleep from json import loads as json_loads from urllib.parse import urlencode from uuid import uuid4 @@ -13,41 +15,156 @@ from plomtask.http import TaskHandler, TaskServer from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition from plomtask.days import Day +from plomtask.dating import DATE_FORMAT from plomtask.todos import Todo +from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT from plomtask.exceptions import NotFoundException, HandledException -class TestCaseSansDB(TestCase): - """Tests requiring no DB setup.""" +VERSIONED_VALS: dict[str, + list[str] | list[float]] = {'str': ['A', 'B'], + 'float': [0.3, 1.1]} + + +class TestCaseAugmented(TestCase): + """Tester core providing helpful basic internal decorators and methods.""" checked_class: Any - do_id_test: bool = False - default_init_args: list[Any] = [] - versioned_defaults_to_test: dict[str, str | float] = {} + default_init_kwargs: dict[str, Any] = {} - def test_id_setting(self) -> None: - """Test .id_ being set and its legal range being enforced.""" - if not self.do_id_test: - return - with self.assertRaises(HandledException): - self.checked_class(0, *self.default_init_args) - obj = self.checked_class(5, *self.default_init_args) - self.assertEqual(obj.id_, 5) + @staticmethod + def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]: + def wrapper(self: TestCase) -> None: + if hasattr(self, 'checked_class'): + f(self) + return wrapper - def test_versioned_defaults(self) -> None: - """Test defaults of VersionedAttributes.""" - if len(self.versioned_defaults_to_test) == 0: - return - obj = self.checked_class(1, *self.default_init_args) - for k, v in self.versioned_defaults_to_test.items(): - self.assertEqual(getattr(obj, k).newest, v) + @classmethod + def _run_on_versioned_attributes(cls, + f: Callable[..., None] + ) -> Callable[..., None]: + @cls._run_if_checked_class + def wrapper(self: TestCase) -> None: + assert isinstance(self, TestCaseAugmented) + for attr_name in self.checked_class.to_save_versioned(): + default = self.checked_class.versioned_defaults[attr_name] + owner = self.checked_class(None, **self.default_init_kwargs) + attr = getattr(owner, attr_name) + to_set = VERSIONED_VALS[attr.value_type_name] + f(self, owner, attr_name, attr, default, to_set) + return wrapper + + @classmethod + def _make_from_defaults(cls, id_: float | str | None) -> Any: + return cls.checked_class(id_, **cls.default_init_kwargs) + + +class TestCaseSansDB(TestCaseAugmented): + """Tests requiring no DB setup.""" + legal_ids: list[str] | list[int] = [1, 5] + illegal_ids: list[str] | list[int] = [0] + + @TestCaseAugmented._run_if_checked_class + def test_id_validation(self) -> None: + """Test .id_ validation/setting.""" + for id_ in self.illegal_ids: + with self.assertRaises(HandledException): + self._make_from_defaults(id_) + for id_ in self.legal_ids: + obj = self._make_from_defaults(id_) + self.assertEqual(obj.id_, id_) + + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_set(self, + _: Any, + __: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str | float] + ) -> None: + """Test VersionedAttribute.set() behaves as expected.""" + attr.set(default) + self.assertEqual(list(attr.history.values()), [default]) + # check same value does not get set twice in a row, + # and that not even its timestamp get updated + timestamp = list(attr.history.keys())[0] + attr.set(default) + self.assertEqual(list(attr.history.values()), [default]) + self.assertEqual(list(attr.history.keys())[0], timestamp) + # check that different value _will_ be set/added + attr.set(to_set[0]) + timesorted_vals = [attr.history[t] for + t in sorted(attr.history.keys())] + expected = [default, to_set[0]] + self.assertEqual(timesorted_vals, expected) + # check that a previously used value can be set if not most recent + attr.set(default) + timesorted_vals = [attr.history[t] for + t in sorted(attr.history.keys())] + expected = [default, to_set[0], default] + self.assertEqual(timesorted_vals, expected) + # again check for same value not being set twice in a row, even for + # later items + attr.set(to_set[1]) + timesorted_vals = [attr.history[t] for + t in sorted(attr.history.keys())] + expected = [default, to_set[0], default, to_set[1]] + self.assertEqual(timesorted_vals, expected) + attr.set(to_set[1]) + self.assertEqual(timesorted_vals, expected) + + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_newest(self, + _: Any, + __: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str | float] + ) -> None: + """Test VersionedAttribute.newest.""" + # check .newest on empty history returns .default + self.assertEqual(attr.newest, default) + # check newest element always returned + for v in [to_set[0], to_set[1]]: + attr.set(v) + self.assertEqual(attr.newest, v) + # check newest element returned even if also early value + attr.set(default) + self.assertEqual(attr.newest, default) + + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_at(self, + _: Any, + __: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str | float] + ) -> None: + """Test .at() returns values nearest to queried time, or default.""" + # check .at() return default on empty history + timestamp_a = datetime.now().strftime(TIMESTAMP_FMT) + self.assertEqual(attr.at(timestamp_a), default) + # check value exactly at timestamp returned + attr.set(to_set[0]) + timestamp_b = list(attr.history.keys())[0] + self.assertEqual(attr.at(timestamp_b), to_set[0]) + # check earliest value returned if exists, rather than default + self.assertEqual(attr.at(timestamp_a), to_set[0]) + # check reverts to previous value for timestamps not indexed + sleep(0.00001) + timestamp_between = datetime.now().strftime(TIMESTAMP_FMT) + sleep(0.00001) + attr.set(to_set[1]) + timestamp_c = sorted(attr.history.keys())[-1] + self.assertEqual(attr.at(timestamp_c), to_set[1]) + self.assertEqual(attr.at(timestamp_between), to_set[0]) + sleep(0.00001) + timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT) + self.assertEqual(attr.at(timestamp_after_c), to_set[1]) -class TestCaseWithDB(TestCase): +class TestCaseWithDB(TestCaseAugmented): """Module tests not requiring DB setup.""" - checked_class: Any default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3) - default_init_kwargs: dict[str, Any] = {} - test_versioneds: dict[str, type] = {} def setUp(self) -> None: Condition.empty_cache() @@ -62,30 +179,26 @@ class TestCaseWithDB(TestCase): self.db_conn.close() remove_file(self.db_file.path) - @staticmethod - def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]: - def wrapper(self: TestCaseWithDB) -> None: - if hasattr(self, 'checked_class'): - f(self) - return wrapper + def _load_from_db(self, id_: int | str) -> list[object]: + db_found: list[object] = [] + for row in self.db_conn.row_where(self.checked_class.table_name, + 'id', id_): + db_found += [self.checked_class.from_table_row(self.db_conn, + row)] + return db_found - @_within_checked_class - def test_saving_and_caching(self) -> None: - """Test storage and initialization of instances and attributes.""" - self.check_saving_and_caching(id_=1, **self.default_init_kwargs) - obj = self.checked_class(None, **self.default_init_kwargs) - obj.save(self.db_conn) - self.assertEqual(obj.id_, 2) - for attr_name, type_ in self.test_versioneds.items(): - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - owner.save(self.db_conn) - retrieved = owner.__class__.by_id(self.db_conn, owner.id_) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) + def _change_obj(self, obj: object) -> str: + attr_name: str = self.checked_class.to_save_simples[-1] + attr = getattr(obj, attr_name) + new_attr: str | int | float | bool + if isinstance(attr, (int, float)): + new_attr = attr + 1 + elif isinstance(attr, str): + new_attr = attr + '_' + elif isinstance(attr, bool): + new_attr = not attr + setattr(obj, attr_name, new_attr) + return attr_name def check_identity_with_cache_and_db(self, content: list[Any]) -> None: """Test both cache and DB equal content.""" @@ -97,79 +210,200 @@ class TestCaseWithDB(TestCase): db_found: list[Any] = [] for item in content: assert isinstance(item.id_, type(self.default_ids[0])) - for row in self.db_conn.row_where(self.checked_class.table_name, - 'id', item.id_): - db_found += [self.checked_class.from_table_row(self.db_conn, - row)] + db_found += self._load_from_db(item.id_) hashes_db_found = [hash(x) for x in db_found] self.assertEqual(sorted(hashes_content), sorted(hashes_db_found)) - def check_saving_and_caching(self, **kwargs: Any) -> None: - """Test instance.save in its core without relations.""" - obj = self.checked_class(**kwargs) # pylint: disable=not-callable - # check object init itself doesn't store anything yet - self.check_identity_with_cache_and_db([]) - # check saving sets core attributes properly - obj.save(self.db_conn) - for key, value in kwargs.items(): - self.assertEqual(getattr(obj, key), value) - # check saving stored properly in cache and DB - self.check_identity_with_cache_and_db([obj]) + def check_by_date_range_with_limits(self, + date_col: str, + set_id_field: bool = True + ) -> None: + """Test .by_date_range_with_limits.""" + # pylint: disable=too-many-locals + f = self.checked_class.by_date_range_with_limits + # check illegal ranges + legal_range = ('yesterday', 'tomorrow') + for i in [0, 1]: + for bad_date in ['foo', '2024-02-30', '2024-01-01 12:00:00']: + date_range = list(legal_range[:]) + date_range[i] = bad_date + with self.assertRaises(HandledException): + f(self.db_conn, date_range, date_col) + # check empty, translation of 'yesterday' and 'tomorrow' + items, start, end = f(self.db_conn, legal_range, date_col) + self.assertEqual(items, []) + yesterday = datetime.now() + timedelta(days=-1) + tomorrow = datetime.now() + timedelta(days=+1) + self.assertEqual(start, yesterday.strftime(DATE_FORMAT)) + self.assertEqual(end, tomorrow.strftime(DATE_FORMAT)) + # prepare dated items for non-empty results + kwargs_with_date = self.default_init_kwargs.copy() + if set_id_field: + kwargs_with_date['id_'] = None + objs = [] + dates = ['2024-01-01', '2024-01-02', '2024-01-04'] + for date in ['2024-01-01', '2024-01-02', '2024-01-04']: + kwargs_with_date['date'] = date + obj = self.checked_class(**kwargs_with_date) + objs += [obj] + # check ranges still empty before saving + date_range = [dates[0], dates[-1]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], []) + # check all objs displayed within closed interval + for obj in objs: + obj.save(self.db_conn) + self.assertEqual(f(self.db_conn, date_range, date_col)[0], objs) + # check that only displayed what exists within interval + date_range = ['2023-12-20', '2024-01-03'] + expected = [objs[0], objs[1]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected) + date_range = ['2024-01-03', '2024-01-30'] + expected = [objs[2]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected) + # check that inverted interval displays nothing + date_range = [dates[-1], dates[0]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], []) + # check that "today" is interpreted, and single-element interval + today_date = datetime.now().strftime(DATE_FORMAT) + kwargs_with_date['date'] = today_date + obj_today = self.checked_class(**kwargs_with_date) + obj_today.save(self.db_conn) + date_range = ['today', 'today'] + items, start, end = f(self.db_conn, date_range, date_col) + self.assertEqual(start, today_date) + self.assertEqual(start, end) + self.assertEqual(items, [obj_today]) + + @TestCaseAugmented._run_on_versioned_attributes + def test_saving_versioned_attributes(self, + owner: Any, + attr_name: str, + attr: VersionedAttribute, + _: str | float, + to_set: list[str | float] + ) -> None: + """Test storage and initialization of versioned attributes.""" - @_within_checked_class + def retrieve_attr_vals(attr: VersionedAttribute) -> list[object]: + attr_vals_saved: list[object] = [] + for row in self.db_conn.row_where(attr.table_name, 'parent', + owner.id_): + attr_vals_saved += [row[2]] + return attr_vals_saved + + attr.set(to_set[0]) + # check that without attr.save() no rows in DB + rows = self.db_conn.row_where(attr.table_name, 'parent', owner.id_) + self.assertEqual([], rows) + # fail saving attributes on non-saved owner + with self.assertRaises(NotFoundException): + attr.save(self.db_conn) + # check owner.save() created entries as expected in attr table + owner.save(self.db_conn) + attr_vals_saved = retrieve_attr_vals(attr) + self.assertEqual([to_set[0]], attr_vals_saved) + # check changing attr val without save affects owner in memory … + attr.set(to_set[1]) + cmp_attr = getattr(owner, attr_name) + self.assertEqual(to_set, list(cmp_attr.history.values())) + self.assertEqual(cmp_attr.history, attr.history) + # … but does not yet affect DB + attr_vals_saved = retrieve_attr_vals(attr) + self.assertEqual([to_set[0]], attr_vals_saved) + # check individual attr.save also stores new val to DB + attr.save(self.db_conn) + attr_vals_saved = retrieve_attr_vals(attr) + self.assertEqual(to_set, attr_vals_saved) + + @TestCaseAugmented._run_if_checked_class + def test_saving_and_caching(self) -> None: + """Test effects of .cache() and .save().""" + id1 = self.default_ids[0] + # check failure to cache without ID (if None-ID input possible) + if isinstance(id1, int): + obj0 = self._make_from_defaults(None) + with self.assertRaises(HandledException): + obj0.cache() + # check mere object init itself doesn't even store in cache + obj1 = self._make_from_defaults(id1) + self.assertEqual(self.checked_class.get_cache(), {}) + # check .cache() fills cache, but not DB + obj1.cache() + self.assertEqual(self.checked_class.get_cache(), {id1: obj1}) + found_in_db = self._load_from_db(id1) + self.assertEqual(found_in_db, []) + # check .save() sets ID (for int IDs), updates cache, and fills DB + # (expect ID to be set to id1, despite obj1 already having that as ID: + # it's generated by cursor.lastrowid on the DB table, and with obj1 + # not written there, obj2 should get it first!) + id_input = None if isinstance(id1, int) else id1 + obj2 = self._make_from_defaults(id_input) + obj2.save(self.db_conn) + self.assertEqual(self.checked_class.get_cache(), {id1: obj2}) + # NB: we'll only compare hashes because obj2 itself disappears on + # .from_table_row-trioggered database reload + obj2_hash = hash(obj2) + found_in_db += self._load_from_db(id1) + self.assertEqual([hash(o) for o in found_in_db], [obj2_hash]) + # check we cannot overwrite obj2 with obj1 despite its same ID, + # since it has disappeared now + with self.assertRaises(HandledException): + obj1.save(self.db_conn) + + @TestCaseAugmented._run_if_checked_class def test_by_id(self) -> None: """Test .by_id().""" id1, id2, _ = self.default_ids # check failure if not yet saved - obj1 = self.checked_class(id1, **self.default_init_kwargs) + obj1 = self._make_from_defaults(id1) with self.assertRaises(NotFoundException): self.checked_class.by_id(self.db_conn, id1) # check identity of cached and retrieved obj1.cache() self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1)) # check identity of saved and retrieved - obj2 = self.checked_class(id2, **self.default_init_kwargs) + obj2 = self._make_from_defaults(id2) obj2.save(self.db_conn) self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2)) - # obj1.save(self.db_conn) - # self.check_identity_with_cache_and_db([obj1, obj2]) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_by_id_or_create(self) -> None: """Test .by_id_or_create.""" - # check .by_id_or_create acts like normal instantiation (sans saving) - id_ = self.default_ids[0] + # check .by_id_or_create fails if wrong class if not self.checked_class.can_create_by_id: with self.assertRaises(HandledException): - self.checked_class.by_id_or_create(self.db_conn, id_) - # check .by_id_or_create fails if wrong class - else: - by_id_created = self.checked_class.by_id_or_create(self.db_conn, - id_) - with self.assertRaises(NotFoundException): - self.checked_class.by_id(self.db_conn, id_) - self.assertEqual(self.checked_class(id_), by_id_created) + self.checked_class.by_id_or_create(self.db_conn, None) + return + # check ID input of None creates, on saving, ID=1,2,… for int IDs + if isinstance(self.default_ids[0], int): + for n in range(2): + item = self.checked_class.by_id_or_create(self.db_conn, None) + self.assertEqual(item.id_, None) + item.save(self.db_conn) + self.assertEqual(item.id_, n+1) + # check .by_id_or_create acts like normal instantiation (sans saving) + id_ = self.default_ids[2] + item = self.checked_class.by_id_or_create(self.db_conn, id_) + self.assertEqual(item.id_, id_) + with self.assertRaises(NotFoundException): + self.checked_class.by_id(self.db_conn, item.id_) + self.assertEqual(self.checked_class(item.id_), item) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_from_table_row(self) -> None: """Test .from_table_row() properly reads in class directly from DB.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, **self.default_init_kwargs) + obj = self._make_from_defaults(id_) obj.save(self.db_conn) - assert isinstance(obj.id_, type(self.default_ids[0])) + assert isinstance(obj.id_, type(id_)) for row in self.db_conn.row_where(self.checked_class.table_name, 'id', obj.id_): # check .from_table_row reproduces state saved, no matter if obj # later changed (with caching even) + # NB: we'll only compare hashes because obj itself disappears on + # .from_table_row-triggered database reload hash_original = hash(obj) - attr_name = self.checked_class.to_save[-1] - attr = getattr(obj, attr_name) - if isinstance(attr, (int, float)): - setattr(obj, attr_name, attr + 1) - elif isinstance(attr, str): - setattr(obj, attr_name, attr + "_") - elif isinstance(attr, bool): - setattr(obj, attr_name, not attr) + attr_name = self._change_obj(obj) obj.cache() to_cmp = getattr(obj, attr_name) retrieved = self.checked_class.from_table_row(self.db_conn, row) @@ -179,27 +413,36 @@ class TestCaseWithDB(TestCase): self.assertEqual({retrieved.id_: retrieved}, self.checked_class.get_cache()) - def check_versioned_from_table_row(self, attr_name: str, - type_: type) -> None: - """Test .from_table_row() reads versioned attributes from DB.""" - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_history_from_row(self, + owner: Any, + _: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str | float] + ) -> None: + """"Test VersionedAttribute.history_from_row() knows its DB rows.""" + attr.set(to_set[0]) + attr.set(to_set[1]) owner.save(self.db_conn) + # make empty VersionedAttribute, fill from rows, compare to owner's for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_): - retrieved = owner.__class__.from_table_row(self.db_conn, row) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) + loaded_attr = VersionedAttribute(owner, attr.table_name, default) + for row in self.db_conn.row_where(attr.table_name, 'parent', + owner.id_): + loaded_attr.history_from_row(row) + self.assertEqual(len(attr.history.keys()), + len(loaded_attr.history.keys())) + for timestamp, value in attr.history.items(): + self.assertEqual(value, loaded_attr.history[timestamp]) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_all(self) -> None: """Test .all() and its relation to cache and savings.""" - id_1, id_2, id_3 = self.default_ids - item1 = self.checked_class(id_1, **self.default_init_kwargs) - item2 = self.checked_class(id_2, **self.default_init_kwargs) - item3 = self.checked_class(id_3, **self.default_init_kwargs) + id1, id2, id3 = self.default_ids + item1 = self._make_from_defaults(id1) + item2 = self._make_from_defaults(id2) + item3 = self._make_from_defaults(id3) # check .all() returns empty list on un-cached items self.assertEqual(self.checked_class.all(self.db_conn), []) # check that all() shows only cached/saved items @@ -211,42 +454,48 @@ class TestCaseWithDB(TestCase): self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item2, item3])) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" id1 = self.default_ids[0] - obj = self.checked_class(id1, **self.default_init_kwargs) + obj = self._make_from_defaults(id1) obj.save(self.db_conn) - attr_name = self.checked_class.to_save[-1] - attr = getattr(obj, attr_name) - new_attr: str | int | float | bool - if isinstance(attr, (int, float)): - new_attr = attr + 1 - elif isinstance(attr, str): - new_attr = attr + '_' - elif isinstance(attr, bool): - new_attr = not attr - setattr(obj, attr_name, new_attr) + # change object, expect retrieved through .by_id to carry change + attr_name = self._change_obj(obj) + new_attr = getattr(obj, attr_name) retrieved = self.checked_class.by_id(self.db_conn, id1) self.assertEqual(new_attr, getattr(retrieved, attr_name)) - def check_versioned_singularity(self) -> None: - """Test singularity of VersionedAttributes on saving (with .title).""" - obj = self.checked_class(None) # pylint: disable=not-callable - obj.save(self.db_conn) - assert isinstance(obj.id_, int) - obj.title.set('named') - retrieved = self.checked_class.by_id(self.db_conn, obj.id_) - self.assertEqual(obj.title.history, retrieved.title.history) + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_singularity(self, + owner: Any, + attr_name: str, + attr: VersionedAttribute, + _: str | float, + to_set: list[str | float] + ) -> None: + """Test singularity of VersionedAttributes on saving.""" + owner.save(self.db_conn) + # change obj, expect retrieved through .by_id to carry change + attr.set(to_set[0]) + retrieved = self.checked_class.by_id(self.db_conn, owner.id_) + attr_retrieved = getattr(retrieved, attr_name) + self.assertEqual(attr.history, attr_retrieved.history) - def check_remove(self, *args: Any) -> None: + @TestCaseAugmented._run_if_checked_class + def test_remove(self) -> None: """Test .remove() effects on DB and cache.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, *args) # pylint: disable=not-callable + obj = self._make_from_defaults(id_) + # check removal only works after saving with self.assertRaises(HandledException): obj.remove(self.db_conn) obj.save(self.db_conn) obj.remove(self.db_conn) + # check access to obj fails after removal + with self.assertRaises(HandledException): + print(obj.id_) + # check DB and cache now empty self.check_identity_with_cache_and_db([]) @@ -269,6 +518,71 @@ class TestCaseWithServer(TestCaseWithDB): self.server_thread.join() super().tearDown() + @staticmethod + def as_id_list(items: list[dict[str, object]]) -> list[int | str]: + """Return list of only 'id' fields of items.""" + id_list = [] + for item in items: + assert isinstance(item['id'], (int, str)) + id_list += [item['id']] + return id_list + + @staticmethod + def as_refs(items: list[dict[str, object]] + ) -> dict[str, dict[str, object]]: + """Return dictionary of items by their 'id' fields.""" + refs = {} + for item in items: + refs[str(item['id'])] = item + return refs + + @staticmethod + def cond_as_dict(id_: int = 1, + is_active: bool = False, + titles: None | list[str] = None, + descriptions: None | list[str] = None + ) -> dict[str, object]: + """Return JSON of Condition to expect.""" + d = {'id': id_, + 'is_active': is_active, + '_versioned': { + 'title': {}, + 'description': {}}} + titles = titles if titles else [] + descriptions = descriptions if descriptions else [] + assert isinstance(d['_versioned'], dict) + for i, title in enumerate(titles): + d['_versioned']['title'][i] = title + for i, description in enumerate(descriptions): + d['_versioned']['description'][i] = description + return d + + @staticmethod + def proc_as_dict(id_: int = 1, + title: str = 'A', + description: str = '', + effort: float = 1.0, + conditions: None | list[int] = None, + disables: None | list[int] = None, + blockers: None | list[int] = None, + enables: None | list[int] = None + ) -> dict[str, object]: + """Return JSON of Process to expect.""" + # pylint: disable=too-many-arguments + d = {'id': id_, + 'calendarize': False, + 'suppressed_steps': [], + 'explicit_steps': [], + '_versioned': { + 'title': {0: title}, + 'description': {0: description}, + 'effort': {0: effort}}, + 'conditions': conditions if conditions else [], + 'disables': disables if disables else [], + 'enables': enables if enables else [], + 'blockers': blockers if blockers else []} + return d + def check_redirect(self, target: str) -> None: """Check that self.conn answers with a 302 redirect to target.""" response = self.conn.getresponse() @@ -281,7 +595,7 @@ class TestCaseWithServer(TestCaseWithDB): self.assertEqual(self.conn.getresponse().status, expected_code) def check_post(self, data: Mapping[str, object], target: str, - expected_code: int, redirect_location: str = '') -> None: + expected_code: int = 302, redir: str = '') -> None: """Check that POST of data to target yields expected_code.""" encoded_form_data = urlencode(data, doseq=True).encode('utf-8') headers = {'Content-Type': 'application/x-www-form-urlencoded', @@ -289,9 +603,8 @@ class TestCaseWithServer(TestCaseWithDB): self.conn.request('POST', target, body=encoded_form_data, headers=headers) if 302 == expected_code: - if redirect_location == '': - redirect_location = target - self.check_redirect(redirect_location) + redir = target if redir == '' else redir + self.check_redirect(redir) else: self.assertEqual(self.conn.getresponse().status, expected_code) @@ -309,8 +622,8 @@ class TestCaseWithServer(TestCaseWithDB): """POST basic Process.""" if not form_data: form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} - self.check_post(form_data, f'/process?id={id_}', 302, - f'/process?id={id_}') + self.check_post(form_data, f'/process?id={id_}', + redir=f'/process?id={id_}') return form_data def check_json_get(self, path: str, expected: dict[str, object]) -> None: @@ -320,6 +633,7 @@ class TestCaseWithServer(TestCaseWithDB): timestamp keys of VersionedAttribute history keys into integers counting chronologically forward from 0. """ + def rewrite_history_keys_in(item: Any) -> Any: if isinstance(item, dict): if '_versioned' in item.keys(): @@ -334,6 +648,7 @@ class TestCaseWithServer(TestCaseWithDB): elif isinstance(item, list): item[:] = [rewrite_history_keys_in(i) for i in item] return item + self.conn.request('GET', path) response = self.conn.getresponse() self.assertEqual(response.status, 200)