X-Git-Url: https://plomlompom.com/repos/%22https:/validator.w3.org/static/git-logo.png?a=blobdiff_plain;f=tests%2Futils.py;h=665436873c27af704a13827715d3c795e04e1fe1;hb=refs%2Fheads%2Fmaster;hp=f473c180ba565355a3b6ac0a03acb85d2fa307de;hpb=bdb93117ce0f2b08b7b70cf43ac086afa4689c0f;p=plomtask diff --git a/tests/utils.py b/tests/utils.py index f473c18..df3feea 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,62 +1,172 @@ """Shared test utilities.""" +# pylint: disable=too-many-lines from __future__ import annotations from unittest import TestCase from typing import Mapping, Any, Callable from threading import Thread from http.client import HTTPConnection -from json import loads as json_loads +from datetime import datetime, timedelta +from time import sleep +from json import loads as json_loads, dumps as json_dumps from urllib.parse import urlencode from uuid import uuid4 from os import remove as remove_file +from pprint import pprint from plomtask.db import DatabaseFile, DatabaseConnection from plomtask.http import TaskHandler, TaskServer from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition from plomtask.days import Day +from plomtask.dating import DATE_FORMAT from plomtask.todos import Todo +from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT from plomtask.exceptions import NotFoundException, HandledException -def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]: - def wrapper(self: TestCase) -> None: - if hasattr(self, 'checked_class'): - f(self) - return wrapper +VERSIONED_VALS: dict[str, + list[str] | list[float]] = {'str': ['A', 'B'], + 'float': [0.3, 1.1]} -class TestCaseSansDB(TestCase): - """Tests requiring no DB setup.""" +class TestCaseAugmented(TestCase): + """Tester core providing helpful basic internal decorators and methods.""" checked_class: Any - default_init_args: list[Any] = [] - versioned_defaults_to_test: dict[str, str | float] = {} - legal_ids = [1, 5] - illegal_ids = [0] + default_init_kwargs: dict[str, Any] = {} + + @staticmethod + def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]: + def wrapper(self: TestCase) -> None: + if hasattr(self, 'checked_class'): + f(self) + return wrapper + + @classmethod + def _run_on_versioned_attributes(cls, + f: Callable[..., None] + ) -> Callable[..., None]: + @cls._run_if_checked_class + def wrapper(self: TestCase) -> None: + assert isinstance(self, TestCaseAugmented) + for attr_name in self.checked_class.to_save_versioned(): + default = self.checked_class.versioned_defaults[attr_name] + owner = self.checked_class(None, **self.default_init_kwargs) + attr = getattr(owner, attr_name) + to_set = VERSIONED_VALS[attr.value_type_name] + f(self, owner, attr_name, attr, default, to_set) + return wrapper + + @classmethod + def _make_from_defaults(cls, id_: float | str | None) -> Any: + return cls.checked_class(id_, **cls.default_init_kwargs) + + +class TestCaseSansDB(TestCaseAugmented): + """Tests requiring no DB setup.""" + legal_ids: list[str] | list[int] = [1, 5] + illegal_ids: list[str] | list[int] = [0] - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_id_validation(self) -> None: """Test .id_ validation/setting.""" for id_ in self.illegal_ids: with self.assertRaises(HandledException): - self.checked_class(id_, *self.default_init_args) + self._make_from_defaults(id_) for id_ in self.legal_ids: - obj = self.checked_class(id_, *self.default_init_args) + obj = self._make_from_defaults(id_) self.assertEqual(obj.id_, id_) - @_within_checked_class - def test_versioned_defaults(self) -> None: - """Test defaults of VersionedAttributes.""" - id_ = self.legal_ids[0] - obj = self.checked_class(id_, *self.default_init_args) - for k, v in self.versioned_defaults_to_test.items(): - self.assertEqual(getattr(obj, k).newest, v) - - -class TestCaseWithDB(TestCase): + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_set(self, + _: Any, + __: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str] | list[float] + ) -> None: + """Test VersionedAttribute.set() behaves as expected.""" + attr.set(default) + self.assertEqual(list(attr.history.values()), [default]) + # check same value does not get set twice in a row, + # and that not even its timestamp get updated + timestamp = list(attr.history.keys())[0] + attr.set(default) + self.assertEqual(list(attr.history.values()), [default]) + self.assertEqual(list(attr.history.keys())[0], timestamp) + # check that different value _will_ be set/added + attr.set(to_set[0]) + timesorted_vals = [attr.history[t] for + t in sorted(attr.history.keys())] + expected = [default, to_set[0]] + self.assertEqual(timesorted_vals, expected) + # check that a previously used value can be set if not most recent + attr.set(default) + timesorted_vals = [attr.history[t] for + t in sorted(attr.history.keys())] + expected = [default, to_set[0], default] + self.assertEqual(timesorted_vals, expected) + # again check for same value not being set twice in a row, even for + # later items + attr.set(to_set[1]) + timesorted_vals = [attr.history[t] for + t in sorted(attr.history.keys())] + expected = [default, to_set[0], default, to_set[1]] + self.assertEqual(timesorted_vals, expected) + attr.set(to_set[1]) + self.assertEqual(timesorted_vals, expected) + + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_newest(self, + _: Any, + __: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str] | list[float] + ) -> None: + """Test VersionedAttribute.newest.""" + # check .newest on empty history returns .default + self.assertEqual(attr.newest, default) + # check newest element always returned + for v in [to_set[0], to_set[1]]: + attr.set(v) + self.assertEqual(attr.newest, v) + # check newest element returned even if also early value + attr.set(default) + self.assertEqual(attr.newest, default) + + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_at(self, + _: Any, + __: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str] | list[float] + ) -> None: + """Test .at() returns values nearest to queried time, or default.""" + # check .at() return default on empty history + timestamp_a = datetime.now().strftime(TIMESTAMP_FMT) + self.assertEqual(attr.at(timestamp_a), default) + # check value exactly at timestamp returned + attr.set(to_set[0]) + timestamp_b = list(attr.history.keys())[0] + self.assertEqual(attr.at(timestamp_b), to_set[0]) + # check earliest value returned if exists, rather than default + self.assertEqual(attr.at(timestamp_a), to_set[0]) + # check reverts to previous value for timestamps not indexed + sleep(0.00001) + timestamp_between = datetime.now().strftime(TIMESTAMP_FMT) + sleep(0.00001) + attr.set(to_set[1]) + timestamp_c = sorted(attr.history.keys())[-1] + self.assertEqual(attr.at(timestamp_c), to_set[1]) + self.assertEqual(attr.at(timestamp_between), to_set[0]) + sleep(0.00001) + timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT) + self.assertEqual(attr.at(timestamp_after_c), to_set[1]) + + +class TestCaseWithDB(TestCaseAugmented): """Module tests not requiring DB setup.""" - checked_class: Any - default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3) - default_init_kwargs: dict[str, Any] = {} - test_versioneds: dict[str, type] = {} + default_ids: tuple[int, int, int] | tuple[str, str, str] = (1, 2, 3) def setUp(self) -> None: Condition.empty_cache() @@ -80,7 +190,7 @@ class TestCaseWithDB(TestCase): return db_found def _change_obj(self, obj: object) -> str: - attr_name: str = self.checked_class.to_save[-1] + attr_name: str = self.checked_class.to_save_simples[-1] attr = getattr(obj, attr_name) new_attr: str | int | float | bool if isinstance(attr, (int, float)): @@ -106,92 +216,159 @@ class TestCaseWithDB(TestCase): hashes_db_found = [hash(x) for x in db_found] self.assertEqual(sorted(hashes_content), sorted(hashes_db_found)) - @_within_checked_class - def test_saving_versioned(self) -> None: + def check_by_date_range_with_limits(self, + date_col: str, + set_id_field: bool = True + ) -> None: + """Test .by_date_range_with_limits.""" + # pylint: disable=too-many-locals + f = self.checked_class.by_date_range_with_limits + # check illegal ranges + legal_range = ('yesterday', 'tomorrow') + for i in [0, 1]: + for bad_date in ['foo', '2024-02-30', '2024-01-01 12:00:00']: + date_range = list(legal_range[:]) + date_range[i] = bad_date + with self.assertRaises(HandledException): + f(self.db_conn, date_range, date_col) + # check empty, translation of 'yesterday' and 'tomorrow' + items, start, end = f(self.db_conn, legal_range, date_col) + self.assertEqual(items, []) + yesterday = datetime.now() + timedelta(days=-1) + tomorrow = datetime.now() + timedelta(days=+1) + self.assertEqual(start, yesterday.strftime(DATE_FORMAT)) + self.assertEqual(end, tomorrow.strftime(DATE_FORMAT)) + # prepare dated items for non-empty results + kwargs_with_date = self.default_init_kwargs.copy() + if set_id_field: + kwargs_with_date['id_'] = None + objs = [] + dates = ['2024-01-01', '2024-01-02', '2024-01-04'] + for date in ['2024-01-01', '2024-01-02', '2024-01-04']: + kwargs_with_date['date'] = date + obj = self.checked_class(**kwargs_with_date) + objs += [obj] + # check ranges still empty before saving + date_range = [dates[0], dates[-1]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], []) + # check all objs displayed within closed interval + for obj in objs: + obj.save(self.db_conn) + self.assertEqual(f(self.db_conn, date_range, date_col)[0], objs) + # check that only displayed what exists within interval + date_range = ['2023-12-20', '2024-01-03'] + expected = [objs[0], objs[1]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected) + date_range = ['2024-01-03', '2024-01-30'] + expected = [objs[2]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected) + # check that inverted interval displays nothing + date_range = [dates[-1], dates[0]] + self.assertEqual(f(self.db_conn, date_range, date_col)[0], []) + # check that "today" is interpreted, and single-element interval + today_date = datetime.now().strftime(DATE_FORMAT) + kwargs_with_date['date'] = today_date + obj_today = self.checked_class(**kwargs_with_date) + obj_today.save(self.db_conn) + date_range = ['today', 'today'] + items, start, end = f(self.db_conn, date_range, date_col) + self.assertEqual(start, today_date) + self.assertEqual(start, end) + self.assertEqual(items, [obj_today]) + + @TestCaseAugmented._run_on_versioned_attributes + def test_saving_versioned_attributes(self, + owner: Any, + attr_name: str, + attr: VersionedAttribute, + _: str | float, + to_set: list[str] | list[float] + ) -> None: """Test storage and initialization of versioned attributes.""" - def retrieve_attr_vals() -> list[object]: + + def retrieve_attr_vals(attr: VersionedAttribute) -> list[object]: attr_vals_saved: list[object] = [] - assert hasattr(retrieved, 'id_') for row in self.db_conn.row_where(attr.table_name, 'parent', - retrieved.id_): + owner.id_): attr_vals_saved += [row[2]] return attr_vals_saved - for attr_name, type_ in self.test_versioneds.items(): - # fail saving attributes on non-saved owner - owner = self.checked_class(None, **self.default_init_kwargs) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - with self.assertRaises(NotFoundException): - attr.save(self.db_conn) - owner.save(self.db_conn) - # check stored attribute is as expected - retrieved = self._load_from_db(owner.id_)[0] - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) - # check owner.save() created entries in attr table - attr_vals_saved = retrieve_attr_vals() - self.assertEqual(vals, attr_vals_saved) - # check setting new val to attr inconsequential to DB without save - attr.set(vals[0]) - attr_vals_saved = retrieve_attr_vals() - self.assertEqual(vals, attr_vals_saved) - # check save finally adds new val - attr.save(self.db_conn) - attr_vals_saved = retrieve_attr_vals() - self.assertEqual(vals + [vals[0]], attr_vals_saved) - @_within_checked_class + attr.set(to_set[0]) + # check that without attr.save() no rows in DB + rows = self.db_conn.row_where(attr.table_name, 'parent', owner.id_) + self.assertEqual([], rows) + # fail saving attributes on non-saved owner + with self.assertRaises(NotFoundException): + attr.save(self.db_conn) + # check owner.save() created entries as expected in attr table + owner.save(self.db_conn) + attr_vals_saved = retrieve_attr_vals(attr) + self.assertEqual([to_set[0]], attr_vals_saved) + # check changing attr val without save affects owner in memory … + attr.set(to_set[1]) + cmp_attr = getattr(owner, attr_name) + self.assertEqual(to_set, list(cmp_attr.history.values())) + self.assertEqual(cmp_attr.history, attr.history) + # … but does not yet affect DB + attr_vals_saved = retrieve_attr_vals(attr) + self.assertEqual([to_set[0]], attr_vals_saved) + # check individual attr.save also stores new val to DB + attr.save(self.db_conn) + attr_vals_saved = retrieve_attr_vals(attr) + self.assertEqual(to_set, attr_vals_saved) + + @TestCaseAugmented._run_if_checked_class def test_saving_and_caching(self) -> None: """Test effects of .cache() and .save().""" id1 = self.default_ids[0] # check failure to cache without ID (if None-ID input possible) if isinstance(id1, int): - obj0 = self.checked_class(None, **self.default_init_kwargs) + obj0 = self._make_from_defaults(None) with self.assertRaises(HandledException): obj0.cache() # check mere object init itself doesn't even store in cache - obj1 = self.checked_class(id1, **self.default_init_kwargs) + obj1 = self._make_from_defaults(id1) self.assertEqual(self.checked_class.get_cache(), {}) # check .cache() fills cache, but not DB obj1.cache() self.assertEqual(self.checked_class.get_cache(), {id1: obj1}) - db_found = self._load_from_db(id1) - self.assertEqual(db_found, []) + found_in_db = self._load_from_db(id1) + self.assertEqual(found_in_db, []) # check .save() sets ID (for int IDs), updates cache, and fills DB # (expect ID to be set to id1, despite obj1 already having that as ID: # it's generated by cursor.lastrowid on the DB table, and with obj1 # not written there, obj2 should get it first!) id_input = None if isinstance(id1, int) else id1 - obj2 = self.checked_class(id_input, **self.default_init_kwargs) + obj2 = self._make_from_defaults(id_input) obj2.save(self.db_conn) - obj2_hash = hash(obj2) self.assertEqual(self.checked_class.get_cache(), {id1: obj2}) - db_found += self._load_from_db(id1) - self.assertEqual([hash(o) for o in db_found], [obj2_hash]) + # NB: we'll only compare hashes because obj2 itself disappears on + # .from_table_row-triggered database reload + obj2_hash = hash(obj2) + found_in_db += self._load_from_db(id1) + self.assertEqual([hash(o) for o in found_in_db], [obj2_hash]) # check we cannot overwrite obj2 with obj1 despite its same ID, # since it has disappeared now with self.assertRaises(HandledException): obj1.save(self.db_conn) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_by_id(self) -> None: """Test .by_id().""" id1, id2, _ = self.default_ids # check failure if not yet saved - obj1 = self.checked_class(id1, **self.default_init_kwargs) + obj1 = self._make_from_defaults(id1) with self.assertRaises(NotFoundException): self.checked_class.by_id(self.db_conn, id1) # check identity of cached and retrieved obj1.cache() self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1)) # check identity of saved and retrieved - obj2 = self.checked_class(id2, **self.default_init_kwargs) + obj2 = self._make_from_defaults(id2) obj2.save(self.db_conn) self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2)) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_by_id_or_create(self) -> None: """Test .by_id_or_create.""" # check .by_id_or_create fails if wrong class @@ -214,17 +391,19 @@ class TestCaseWithDB(TestCase): self.checked_class.by_id(self.db_conn, item.id_) self.assertEqual(self.checked_class(item.id_), item) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_from_table_row(self) -> None: """Test .from_table_row() properly reads in class directly from DB.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, **self.default_init_kwargs) + obj = self._make_from_defaults(id_) obj.save(self.db_conn) assert isinstance(obj.id_, type(id_)) for row in self.db_conn.row_where(self.checked_class.table_name, 'id', obj.id_): # check .from_table_row reproduces state saved, no matter if obj # later changed (with caching even) + # NB: we'll only compare hashes because obj itself disappears on + # .from_table_row-triggered database reload hash_original = hash(obj) attr_name = self._change_obj(obj) obj.cache() @@ -235,27 +414,37 @@ class TestCaseWithDB(TestCase): # check cache contains what .from_table_row just produced self.assertEqual({retrieved.id_: retrieved}, self.checked_class.get_cache()) - # check .from_table_row also reads versioned attributes from DB - for attr_name, type_ in self.test_versioneds.items(): - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - owner.save(self.db_conn) - for row in self.db_conn.row_where(owner.table_name, 'id', + + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_history_from_row(self, + owner: Any, + _: str, + attr: VersionedAttribute, + default: str | float, + to_set: list[str] | list[float] + ) -> None: + """"Test VersionedAttribute.history_from_row() knows its DB rows.""" + attr.set(to_set[0]) + attr.set(to_set[1]) + owner.save(self.db_conn) + # make empty VersionedAttribute, fill from rows, compare to owner's + for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_): + loaded_attr = VersionedAttribute(owner, attr.table_name, default) + for row in self.db_conn.row_where(attr.table_name, 'parent', owner.id_): - retrieved = owner.__class__.from_table_row(self.db_conn, row) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) + loaded_attr.history_from_row(row) + self.assertEqual(len(attr.history.keys()), + len(loaded_attr.history.keys())) + for timestamp, value in attr.history.items(): + self.assertEqual(value, loaded_attr.history[timestamp]) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_all(self) -> None: """Test .all() and its relation to cache and savings.""" - id_1, id_2, id_3 = self.default_ids - item1 = self.checked_class(id_1, **self.default_init_kwargs) - item2 = self.checked_class(id_2, **self.default_init_kwargs) - item3 = self.checked_class(id_3, **self.default_init_kwargs) + id1, id2, id3 = self.default_ids + item1 = self._make_from_defaults(id1) + item2 = self._make_from_defaults(id2) + item3 = self._make_from_defaults(id3) # check .all() returns empty list on un-cached items self.assertEqual(self.checked_class.all(self.db_conn), []) # check that all() shows only cached/saved items @@ -267,11 +456,11 @@ class TestCaseWithDB(TestCase): self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item2, item3])) - @_within_checked_class + @TestCaseAugmented._run_if_checked_class def test_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" id1 = self.default_ids[0] - obj = self.checked_class(id1, **self.default_init_kwargs) + obj = self._make_from_defaults(id1) obj.save(self.db_conn) # change object, expect retrieved through .by_id to carry change attr_name = self._change_obj(obj) @@ -279,23 +468,27 @@ class TestCaseWithDB(TestCase): retrieved = self.checked_class.by_id(self.db_conn, id1) self.assertEqual(new_attr, getattr(retrieved, attr_name)) - @_within_checked_class - def test_versioned_singularity_title(self) -> None: - """Test singularity of VersionedAttributes on saving (with .title).""" - if 'title' in self.test_versioneds: - obj = self.checked_class(None) - obj.save(self.db_conn) - assert isinstance(obj.id_, int) - # change obj, expect retrieved through .by_id to carry change - obj.title.set('named') - retrieved = self.checked_class.by_id(self.db_conn, obj.id_) - self.assertEqual(obj.title.history, retrieved.title.history) - - @_within_checked_class + @TestCaseAugmented._run_on_versioned_attributes + def test_versioned_singularity(self, + owner: Any, + attr_name: str, + attr: VersionedAttribute, + _: str | float, + to_set: list[str] | list[float] + ) -> None: + """Test singularity of VersionedAttributes on saving.""" + owner.save(self.db_conn) + # change obj, expect retrieved through .by_id to carry change + attr.set(to_set[0]) + retrieved = self.checked_class.by_id(self.db_conn, owner.id_) + attr_retrieved = getattr(retrieved, attr_name) + self.assertEqual(attr.history, attr_retrieved.history) + + @TestCaseAugmented._run_if_checked_class def test_remove(self) -> None: """Test .remove() effects on DB and cache.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, **self.default_init_kwargs) + obj = self._make_from_defaults(id_) # check removal only works after saving with self.assertRaises(HandledException): obj.remove(self.db_conn) @@ -308,6 +501,346 @@ class TestCaseWithDB(TestCase): self.check_identity_with_cache_and_db([]) +class Expected: + """Builder of (JSON-like) dict to compare against responses of test server. + + Collects all items and relations we expect expressed in the server's JSON + responses and puts them into the proper json.dumps-friendly dict structure, + accessibla via .as_dict, to compare them in TestsWithServer.check_json_get. + + On its own provides for .as_dict output only {"_library": …}, initialized + from .__init__ and to be directly manipulated via the .lib* methods. + Further structures of the expected response may be added and kept + up-to-date by subclassing .__init__, .recalc, and .d. + + NB: Lots of expectations towards server behavior will be made explicit here + (or in the subclasses) rather than in the actual TestCase methods' code. + """ + _default_dict: dict[str, Any] + _forced: dict[str, Any] + _fields: dict[str, Any] + _on_empty_make_temp: tuple[str, str] + + def __init__(self, + todos: list[dict[str, Any]] | None = None, + procs: list[dict[str, Any]] | None = None, + procsteps: list[dict[str, Any]] | None = None, + conds: list[dict[str, Any]] | None = None, + days: list[dict[str, Any]] | None = None + ) -> None: + # pylint: disable=too-many-arguments + for name in ['_default_dict', '_fields', '_forced']: + if not hasattr(self, name): + setattr(self, name, {}) + self._lib = {} + for title, items in [('Todo', todos), + ('Process', procs), + ('ProcessStep', procsteps), + ('Condition', conds), + ('Day', days)]: + if items: + self._lib[title] = self._as_refs(items) + for k, v in self._default_dict.items(): + if k not in self._fields: + self._fields[k] = v + + def recalc(self) -> None: + """Update internal dictionary by subclass-specific rules.""" + todos = self.lib_all('Todo') + for todo in todos: + todo['parents'] = [] + for todo in todos: + for child_id in todo['children']: + self.lib_get('Todo', child_id)['parents'] += [todo['id']] + todo['children'].sort() + procsteps = self.lib_all('ProcessStep') + procs = self.lib_all('Process') + for proc in procs: + proc['explicit_steps'] = [s['id'] for s in procsteps + if s['owner_id'] == proc['id']] + + @property + def as_dict(self) -> dict[str, Any]: + """Return dict to compare against test server JSON responses.""" + make_temp = False + if hasattr(self, '_on_empty_make_temp'): + category, dicter = getattr(self, '_on_empty_make_temp') + id_ = self._fields[category.lower()] + make_temp = not bool(self.lib_get(category, id_)) + if make_temp: + f = getattr(self, dicter) + self.lib_set(category, [f(id_)]) + self.recalc() + d = {'_library': self._lib} + for k, v in self._fields.items(): + # we expect everything sortable to be sorted + if isinstance(v, list) and k not in self._forced: + # NB: if we don't test for v being list, sorted() on an empty + # dict may return an empty list + try: + v = sorted(v) + except TypeError: + pass + d[k] = v + for k, v in self._forced.items(): + d[k] = v + if make_temp: + json = json_dumps(d) + self.lib_del(category, id_) + d = json_loads(json) + return d + + def lib_get(self, category: str, id_: str | int) -> dict[str, Any]: + """From library, return item of category and id_, or empty dict.""" + str_id = str(id_) + if category in self._lib and str_id in self._lib[category]: + return self._lib[category][str_id] + return {} + + def lib_all(self, category: str) -> list[dict[str, Any]]: + """From library, return items of category, or [] if none.""" + if category in self._lib: + return list(self._lib[category].values()) + return [] + + def lib_set(self, category: str, items: list[dict[str, object]]) -> None: + """Update library for category with items.""" + if category not in self._lib: + self._lib[category] = {} + for k, v in self._as_refs(items).items(): + self._lib[category][k] = v + + def lib_del(self, category: str, id_: str | int) -> None: + """Remove category element of id_ from library.""" + del self._lib[category][str(id_)] + if 0 == len(self._lib[category]): + del self._lib[category] + + def lib_wipe(self, category: str) -> None: + """Remove category from library.""" + if category in self._lib: + del self._lib[category] + + def set(self, field_name: str, value: object) -> None: + """Set top-level .as_dict field.""" + self._fields[field_name] = value + + def force(self, field_name: str, value: object) -> None: + """Set ._forced field to ensure value in .as_dict.""" + self._forced[field_name] = value + + def unforce(self, field_name: str) -> None: + """Unset ._forced field.""" + del self._forced[field_name] + + @staticmethod + def _as_refs(items: list[dict[str, object]] + ) -> dict[str, dict[str, object]]: + """Return dictionary of items by their 'id' fields.""" + refs = {} + for item in items: + refs[str(item['id'])] = item + return refs + + @staticmethod + def as_ids(items: list[dict[str, Any]]) -> list[int] | list[str]: + """Return list of only 'id' fields of items.""" + return [item['id'] for item in items] + + @staticmethod + def day_as_dict(date: str, comment: str = '') -> dict[str, object]: + """Return JSON of Day to expect.""" + return {'id': date, 'comment': comment, 'todos': []} + + def set_day_from_post(self, date: str, d: dict[str, Any]) -> None: + """Set Day of date in library based on POST dict d.""" + day = self.day_as_dict(date) + for k, v in d.items(): + if 'day_comment' == k: + day['comment'] = v + elif 'new_todo' == k: + next_id = 1 + for todo in self.lib_all('Todo'): + if next_id <= todo['id']: + next_id = todo['id'] + 1 + for proc_id in sorted(v): + todo = self.todo_as_dict(next_id, proc_id, date) + self.lib_set('Todo', [todo]) + next_id += 1 + elif 'done' == k: + for todo_id in v: + self.lib_get('Todo', todo_id)['is_done'] = True + elif 'todo_id' == k: + for i, todo_id in enumerate(v): + t = self.lib_get('Todo', todo_id) + if 'comment' in d: + t['comment'] = d['comment'][i] + if 'effort' in d: + effort = d['effort'][i] if d['effort'][i] else None + t['effort'] = effort + self.lib_set('Day', [day]) + + @staticmethod + def cond_as_dict(id_: int = 1, + is_active: bool = False, + title: None | str = None, + description: None | str = None, + ) -> dict[str, object]: + """Return JSON of Condition to expect.""" + versioned: dict[str, dict[str, object]] + versioned = {'title': {}, 'description': {}} + if title is not None: + versioned['title']['0'] = title + if description is not None: + versioned['description']['0'] = description + return {'id': id_, 'is_active': is_active, '_versioned': versioned} + + def set_cond_from_post(self, id_: int, d: dict[str, Any]) -> None: + """Set Condition of id_ in library based on POST dict d.""" + if d == {'delete': ''}: + self.lib_del('Condition', id_) + return + cond = self.lib_get('Condition', id_) + if cond: + cond['is_active'] = d['is_active'] + for category in ['title', 'description']: + history = cond['_versioned'][category] + if len(history) > 0: + last_i = sorted([int(k) for k in history.keys()])[-1] + if d[category] != history[str(last_i)]: + history[str(last_i + 1)] = d[category] + else: + history['0'] = d[category] + else: + cond = self.cond_as_dict( + id_, d['is_active'], d['title'], d['description']) + self.lib_set('Condition', [cond]) + + @staticmethod + def todo_as_dict(id_: int = 1, + process_id: int = 1, + date: str = '2024-01-01', + conditions: None | list[int] = None, + disables: None | list[int] = None, + blockers: None | list[int] = None, + enables: None | list[int] = None, + calendarize: bool = False, + comment: str = '', + is_done: bool = False, + effort: float | None = None, + children: list[int] | None = None, + parents: list[int] | None = None, + ) -> dict[str, object]: + """Return JSON of Todo to expect.""" + # pylint: disable=too-many-arguments + d = {'id': id_, + 'date': date, + 'process_id': process_id, + 'is_done': is_done, + 'calendarize': calendarize, + 'comment': comment, + 'children': children if children else [], + 'parents': parents if parents else [], + 'effort': effort, + 'conditions': conditions if conditions else [], + 'disables': disables if disables else [], + 'blockers': blockers if blockers else [], + 'enables': enables if enables else []} + return d + + def set_todo_from_post(self, id_: int, d: dict[str, Any]) -> None: + """Set Todo of id_ in library based on POST dict d.""" + corrected_kwargs: dict[str, Any] = {} + for k, v in d.items(): + if k in {'adopt', 'step_filler'}: + if 'children' not in corrected_kwargs: + corrected_kwargs['children'] = [] + new_children = v if isinstance(v, list) else [v] + corrected_kwargs['children'] += new_children + continue + if 'done' == k: + k = 'is_done' + if k in {'is_done', 'calendarize'}: + v = True + corrected_kwargs[k] = v + todo = self.todo_as_dict(id_, **corrected_kwargs) + self.lib_set('Todo', [todo]) + + @staticmethod + def procstep_as_dict(id_: int, + owner_id: int, + step_process_id: int, + parent_step_id: int | None = None + ) -> dict[str, object]: + """Return JSON of ProcessStep to expect.""" + return {'id': id_, + 'owner_id': owner_id, + 'step_process_id': step_process_id, + 'parent_step_id': parent_step_id} + + @staticmethod + def proc_as_dict(id_: int = 1, + title: None | str = None, + description: None | str = None, + effort: None | float = None, + conditions: None | list[int] = None, + disables: None | list[int] = None, + blockers: None | list[int] = None, + enables: None | list[int] = None, + explicit_steps: None | list[int] = None + ) -> dict[str, object]: + """Return JSON of Process to expect.""" + # pylint: disable=too-many-arguments + versioned: dict[str, dict[str, object]] + versioned = {'title': {}, 'description': {}, 'effort': {}} + if title is not None: + versioned['title']['0'] = title + if description is not None: + versioned['description']['0'] = description + if effort is not None: + versioned['effort']['0'] = effort + d = {'id': id_, + 'calendarize': False, + 'suppressed_steps': [], + 'explicit_steps': explicit_steps if explicit_steps else [], + '_versioned': versioned, + 'conditions': conditions if conditions else [], + 'disables': disables if disables else [], + 'enables': enables if enables else [], + 'blockers': blockers if blockers else []} + return d + + def set_proc_from_post(self, id_: int, d: dict[str, Any]) -> None: + """Set Process of id_ in library based on POST dict d.""" + proc = self.lib_get('Process', id_) + if proc: + for category in ['title', 'description', 'effort']: + history = proc['_versioned'][category] + if len(history) > 0: + last_i = sorted([int(k) for k in history.keys()])[-1] + if d[category] != history[str(last_i)]: + history[str(last_i + 1)] = d[category] + else: + history['0'] = d[category] + else: + proc = self.proc_as_dict(id_, + d['title'], d['description'], d['effort']) + ignore = {'title', 'description', 'effort', 'new_top_step', 'step_of', + 'kept_steps'} + for k, v in d.items(): + if k in ignore\ + or k.startswith('step_') or k.startswith('new_step_to'): + continue + if k in {'calendarize'}: + v = True + elif k in {'suppressed_steps', 'explicit_steps', 'conditions', + 'disables', 'enables', 'blockers'}: + if not isinstance(v, list): + v = [v] + proc[k] = v + self.lib_set('Process', [proc]) + + class TestCaseWithServer(TestCaseWithDB): """Module tests against our HTTP server/handler (and database).""" @@ -319,7 +852,7 @@ class TestCaseWithServer(TestCaseWithDB): self.server_thread.start() self.conn = HTTPConnection(str(self.httpd.server_address[0]), self.httpd.server_address[1]) - self.httpd.set_json_mode() + self.httpd.render_mode = 'json' def tearDown(self) -> None: self.httpd.shutdown() @@ -327,6 +860,55 @@ class TestCaseWithServer(TestCaseWithDB): self.server_thread.join() super().tearDown() + def post_exp_cond(self, + exps: list[Expected], + id_: int, + payload: dict[str, object], + path_suffix: str = '', + redir_suffix: str = '' + ) -> None: + """POST /condition(s), appropriately update Expecteds.""" + # pylint: disable=too-many-arguments + path = f'/condition{path_suffix}' + redir = f'/condition{redir_suffix}' + self.check_post(payload, path, redir=redir) + for exp in exps: + exp.set_cond_from_post(id_, payload) + + def post_exp_day(self, + exps: list[Expected], + payload: dict[str, Any], + date: str = '2024-01-01' + ) -> None: + """POST /day, appropriately update Expecteds.""" + if 'make_type' not in payload: + payload['make_type'] = 'empty' + if 'day_comment' not in payload: + payload['day_comment'] = '' + target = f'/day?date={date}' + redir_to = f'{target}&make_type={payload["make_type"]}' + self.check_post(payload, target, 302, redir_to) + for exp in exps: + exp.set_day_from_post(date, payload) + + def post_exp_process(self, + exps: list[Expected], + payload: dict[str, Any], + id_: int, + ) -> dict[str, object]: + """POST /process, appropriately update Expecteds.""" + if 'title' not in payload: + payload['title'] = 'foo' + if 'description' not in payload: + payload['description'] = 'foo' + if 'effort' not in payload: + payload['effort'] = 1.1 + self.check_post(payload, f'/process?id={id_}', + redir=f'/process?id={id_}') + for exp in exps: + exp.set_proc_from_post(id_, payload) + return payload + def check_redirect(self, target: str) -> None: """Check that self.conn answers with a 302 redirect to target.""" response = self.conn.getresponse() @@ -339,7 +921,7 @@ class TestCaseWithServer(TestCaseWithDB): self.assertEqual(self.conn.getresponse().status, expected_code) def check_post(self, data: Mapping[str, object], target: str, - expected_code: int, redirect_location: str = '') -> None: + expected_code: int = 302, redir: str = '') -> None: """Check that POST of data to target yields expected_code.""" encoded_form_data = urlencode(data, doseq=True).encode('utf-8') headers = {'Content-Type': 'application/x-www-form-urlencoded', @@ -347,9 +929,8 @@ class TestCaseWithServer(TestCaseWithDB): self.conn.request('POST', target, body=encoded_form_data, headers=headers) if 302 == expected_code: - if redirect_location == '': - redirect_location = target - self.check_redirect(redirect_location) + redir = target if redir == '' else redir + self.check_redirect(redir) else: self.assertEqual(self.conn.getresponse().status, expected_code) @@ -361,40 +942,72 @@ class TestCaseWithServer(TestCaseWithDB): self.check_get(f'/{path}?id=0', 500) self.check_get(f'{path}?id=1', 200) - def post_process(self, id_: int = 1, - form_data: dict[str, Any] | None = None - ) -> dict[str, Any]: - """POST basic Process.""" - if not form_data: - form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} - self.check_post(form_data, f'/process?id={id_}', 302, - f'/process?id={id_}') - return form_data - - def check_json_get(self, path: str, expected: dict[str, object]) -> None: + def check_json_get(self, path: str, expected: Expected) -> None: """Compare JSON on GET path with expected. To simplify comparison of VersionedAttribute histories, transforms - timestamp keys of VersionedAttribute history keys into integers - counting chronologically forward from 0. + timestamp keys of VersionedAttribute history keys into (strings of) + integers counting chronologically forward from 0. """ + def rewrite_history_keys_in(item: Any) -> Any: if isinstance(item, dict): if '_versioned' in item.keys(): - for k in item['_versioned']: - vals = item['_versioned'][k].values() + for category in item['_versioned']: + vals = item['_versioned'][category].values() history = {} for i, val in enumerate(vals): - history[i] = val - item['_versioned'][k] = history - for k in list(item.keys()): - rewrite_history_keys_in(item[k]) + history[str(i)] = val + item['_versioned'][category] = history + for category in list(item.keys()): + rewrite_history_keys_in(item[category]) elif isinstance(item, list): item[:] = [rewrite_history_keys_in(i) for i in item] return item + + def walk_diffs(path: str, cmp1: object, cmp2: object) -> None: + # pylint: disable=too-many-branches + def warn(intro: str, val: object) -> None: + if isinstance(val, (str, int, float)): + print(intro, val) + else: + print(intro) + pprint(val) + if cmp1 != cmp2: + if isinstance(cmp1, dict) and isinstance(cmp2, dict): + for k, v in cmp1.items(): + if k not in cmp2: + warn(f'DIFF {path}: retrieved lacks {k}', v) + elif v != cmp2[k]: + walk_diffs(f'{path}:{k}', v, cmp2[k]) + for k in [k for k in cmp2.keys() if k not in cmp1]: + warn(f'DIFF {path}: expected lacks retrieved\'s {k}', + cmp2[k]) + elif isinstance(cmp1, list) and isinstance(cmp2, list): + for i, v1 in enumerate(cmp1): + if i >= len(cmp2): + warn(f'DIFF {path}[{i}] retrieved misses:', v1) + elif v1 != cmp2[i]: + walk_diffs(f'{path}[{i}]', v1, cmp2[i]) + if len(cmp2) > len(cmp1): + for i, v2 in enumerate(cmp2[len(cmp1):]): + warn(f'DIFF {path}[{len(cmp1)+i}] misses:', v2) + else: + warn(f'DIFF {path} – for expected:', cmp1) + warn('… and for retrieved:', cmp2) + self.conn.request('GET', path) response = self.conn.getresponse() self.assertEqual(response.status, 200) retrieved = json_loads(response.read().decode()) rewrite_history_keys_in(retrieved) - self.assertEqual(expected, retrieved) + cmp = expected.as_dict + try: + self.assertEqual(cmp, retrieved) + except AssertionError as e: + print('EXPECTED:') + pprint(cmp) + print('RETRIEVED:') + pprint(retrieved) + walk_diffs('', cmp, retrieved) + raise e