"""Shared test utilities."""
+# pylint: disable=too-many-lines
from __future__ import annotations
from unittest import TestCase
from typing import Mapping, Any, Callable
from http.client import HTTPConnection
from datetime import datetime, timedelta
from time import sleep
-from json import loads as json_loads
+from json import loads as json_loads, dumps as json_dumps
from urllib.parse import urlencode
from uuid import uuid4
from os import remove as remove_file
+from pprint import pprint
from plomtask.db import DatabaseFile, DatabaseConnection
from plomtask.http import TaskHandler, TaskServer
from plomtask.processes import Process, ProcessStep
VERSIONED_VALS: dict[str,
list[str] | list[float]] = {'str': ['A', 'B'],
'float': [0.3, 1.1]}
+VALID_TRUES = {True, 'True', 'true', '1', 'on'}
class TestCaseAugmented(TestCase):
default_init_kwargs: dict[str, Any] = {}
@staticmethod
- def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]:
- def wrapper(self: TestCase) -> None:
- if hasattr(self, 'checked_class'):
- f(self)
- return wrapper
-
- @classmethod
- def _run_on_versioned_attributes(cls,
- f: Callable[..., None]
+ def _run_on_versioned_attributes(f: Callable[..., None]
) -> Callable[..., None]:
- @cls._run_if_checked_class
def wrapper(self: TestCase) -> None:
assert isinstance(self, TestCaseAugmented)
for attr_name in self.checked_class.to_save_versioned():
f(self, owner, attr_name, attr, default, to_set)
return wrapper
+ @classmethod
+ def _run_if_sans_db(cls, f: Callable[..., None]) -> Callable[..., None]:
+ def wrapper(self: TestCaseSansDB) -> None:
+ if issubclass(cls, TestCaseSansDB):
+ f(self)
+ return wrapper
+
+ @classmethod
+ def _run_if_with_db_but_not_server(cls,
+ f: Callable[..., None]
+ ) -> Callable[..., None]:
+ def wrapper(self: TestCaseWithDB) -> None:
+ if issubclass(cls, TestCaseWithDB) and\
+ not issubclass(cls, TestCaseWithServer):
+ f(self)
+ return wrapper
+
@classmethod
def _make_from_defaults(cls, id_: float | str | None) -> Any:
return cls.checked_class(id_, **cls.default_init_kwargs)
legal_ids: list[str] | list[int] = [1, 5]
illegal_ids: list[str] | list[int] = [0]
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_sans_db
def test_id_validation(self) -> None:
"""Test .id_ validation/setting."""
for id_ in self.illegal_ids:
obj = self._make_from_defaults(id_)
self.assertEqual(obj.id_, id_)
+ @TestCaseAugmented._run_if_sans_db
@TestCaseAugmented._run_on_versioned_attributes
def test_versioned_set(self,
_: Any,
__: str,
attr: VersionedAttribute,
default: str | float,
- to_set: list[str | float]
+ to_set: list[str] | list[float]
) -> None:
"""Test VersionedAttribute.set() behaves as expected."""
attr.set(default)
attr.set(to_set[1])
self.assertEqual(timesorted_vals, expected)
+ @TestCaseAugmented._run_if_sans_db
@TestCaseAugmented._run_on_versioned_attributes
def test_versioned_newest(self,
_: Any,
__: str,
attr: VersionedAttribute,
default: str | float,
- to_set: list[str | float]
+ to_set: list[str] | list[float]
) -> None:
"""Test VersionedAttribute.newest."""
# check .newest on empty history returns .default
attr.set(default)
self.assertEqual(attr.newest, default)
+ @TestCaseAugmented._run_if_sans_db
@TestCaseAugmented._run_on_versioned_attributes
def test_versioned_at(self,
_: Any,
__: str,
attr: VersionedAttribute,
default: str | float,
- to_set: list[str | float]
+ to_set: list[str] | list[float]
) -> None:
"""Test .at() returns values nearest to queried time, or default."""
# check .at() return default on empty history
class TestCaseWithDB(TestCaseAugmented):
"""Module tests not requiring DB setup."""
- default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
+ default_ids: tuple[int, int, int] | tuple[str, str, str] = (1, 2, 3)
def setUp(self) -> None:
Condition.empty_cache()
tomorrow = datetime.now() + timedelta(days=+1)
self.assertEqual(start, yesterday.strftime(DATE_FORMAT))
self.assertEqual(end, tomorrow.strftime(DATE_FORMAT))
- # make dated items for non-empty results
+ # prepare dated items for non-empty results
kwargs_with_date = self.default_init_kwargs.copy()
if set_id_field:
kwargs_with_date['id_'] = None
self.assertEqual(start, end)
self.assertEqual(items, [obj_today])
+ @TestCaseAugmented._run_if_with_db_but_not_server
@TestCaseAugmented._run_on_versioned_attributes
def test_saving_versioned_attributes(self,
owner: Any,
attr_name: str,
attr: VersionedAttribute,
_: str | float,
- to_set: list[str | float]
+ to_set: list[str] | list[float]
) -> None:
"""Test storage and initialization of versioned attributes."""
attr_vals_saved = retrieve_attr_vals(attr)
self.assertEqual(to_set, attr_vals_saved)
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_saving_and_caching(self) -> None:
"""Test effects of .cache() and .save()."""
id1 = self.default_ids[0]
obj2.save(self.db_conn)
self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
# NB: we'll only compare hashes because obj2 itself disappears on
- # .from_table_row-trioggered database reload
+ # .from_table_row-triggered database reload
obj2_hash = hash(obj2)
found_in_db += self._load_from_db(id1)
self.assertEqual([hash(o) for o in found_in_db], [obj2_hash])
with self.assertRaises(HandledException):
obj1.save(self.db_conn)
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_by_id(self) -> None:
"""Test .by_id()."""
id1, id2, _ = self.default_ids
obj2.save(self.db_conn)
self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_by_id_or_create(self) -> None:
"""Test .by_id_or_create."""
# check .by_id_or_create fails if wrong class
self.checked_class.by_id(self.db_conn, item.id_)
self.assertEqual(self.checked_class(item.id_), item)
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class directly from DB."""
id_ = self.default_ids[0]
self.assertEqual({retrieved.id_: retrieved},
self.checked_class.get_cache())
+ @TestCaseAugmented._run_if_with_db_but_not_server
@TestCaseAugmented._run_on_versioned_attributes
def test_versioned_history_from_row(self,
owner: Any,
_: str,
attr: VersionedAttribute,
default: str | float,
- to_set: list[str | float]
+ to_set: list[str] | list[float]
) -> None:
""""Test VersionedAttribute.history_from_row() knows its DB rows."""
attr.set(to_set[0])
for timestamp, value in attr.history.items():
self.assertEqual(value, loaded_attr.history[timestamp])
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_all(self) -> None:
"""Test .all() and its relation to cache and savings."""
id1, id2, id3 = self.default_ids
self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
sorted([item1, item2, item3]))
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
id1 = self.default_ids[0]
retrieved = self.checked_class.by_id(self.db_conn, id1)
self.assertEqual(new_attr, getattr(retrieved, attr_name))
+ @TestCaseAugmented._run_if_with_db_but_not_server
@TestCaseAugmented._run_on_versioned_attributes
def test_versioned_singularity(self,
owner: Any,
attr_name: str,
attr: VersionedAttribute,
_: str | float,
- to_set: list[str | float]
+ to_set: list[str] | list[float]
) -> None:
"""Test singularity of VersionedAttributes on saving."""
owner.save(self.db_conn)
attr_retrieved = getattr(retrieved, attr_name)
self.assertEqual(attr.history, attr_retrieved.history)
- @TestCaseAugmented._run_if_checked_class
+ @TestCaseAugmented._run_if_with_db_but_not_server
def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
id_ = self.default_ids[0]
self.check_identity_with_cache_and_db([])
-class TestCaseWithServer(TestCaseWithDB):
- """Module tests against our HTTP server/handler (and database)."""
+class Expected:
+ """Builder of (JSON-like) dict to compare against responses of test server.
- def setUp(self) -> None:
- super().setUp()
- self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler)
- self.server_thread = Thread(target=self.httpd.serve_forever)
- self.server_thread.daemon = True
- self.server_thread.start()
- self.conn = HTTPConnection(str(self.httpd.server_address[0]),
- self.httpd.server_address[1])
- self.httpd.set_json_mode()
+ Collects all items and relations we expect expressed in the server's JSON
+ responses and puts them into the proper json.dumps-friendly dict structure,
+ accessibla via .as_dict, to compare them in TestsWithServer.check_json_get.
- def tearDown(self) -> None:
- self.httpd.shutdown()
- self.httpd.server_close()
- self.server_thread.join()
- super().tearDown()
+ On its own provides for .as_dict output only {"_library": …}, initialized
+ from .__init__ and to be directly manipulated via the .lib* methods.
+ Further structures of the expected response may be added and kept
+ up-to-date by subclassing .__init__, .recalc, and .d.
- @staticmethod
- def as_id_list(items: list[dict[str, object]]) -> list[int | str]:
- """Return list of only 'id' fields of items."""
- id_list = []
- for item in items:
- assert isinstance(item['id'], (int, str))
- id_list += [item['id']]
- return id_list
+ NB: Lots of expectations towards server behavior will be made explicit here
+ (or in the subclasses) rather than in the actual TestCase methods' code.
+ """
+ _default_dict: dict[str, Any]
+ _forced: dict[str, Any]
+ _fields: dict[str, Any]
+ _on_empty_make_temp: tuple[str, str]
+
+ def __init__(self,
+ todos: list[dict[str, Any]] | None = None,
+ procs: list[dict[str, Any]] | None = None,
+ procsteps: list[dict[str, Any]] | None = None,
+ conds: list[dict[str, Any]] | None = None,
+ days: list[dict[str, Any]] | None = None
+ ) -> None:
+ # pylint: disable=too-many-arguments
+ for name in ['_default_dict', '_fields', '_forced']:
+ if not hasattr(self, name):
+ setattr(self, name, {})
+ self._lib = {}
+ for title, items in [('Todo', todos),
+ ('Process', procs),
+ ('ProcessStep', procsteps),
+ ('Condition', conds),
+ ('Day', days)]:
+ if items:
+ self._lib[title] = self._as_refs(items)
+ for k, v in self._default_dict.items():
+ if k not in self._fields:
+ self._fields[k] = v
+
+ def recalc(self) -> None:
+ """Update internal dictionary by subclass-specific rules."""
+ todos = self.lib_all('Todo')
+ for todo in todos:
+ todo['parents'] = []
+ for todo in todos:
+ for child_id in todo['children']:
+ self.lib_get('Todo', child_id)['parents'] += [todo['id']]
+ todo['children'].sort()
+ procsteps = self.lib_all('ProcessStep')
+ procs = self.lib_all('Process')
+ for proc in procs:
+ proc['explicit_steps'] = [s['id'] for s in procsteps
+ if s['owner_id'] == proc['id']]
+
+ @property
+ def as_dict(self) -> dict[str, Any]:
+ """Return dict to compare against test server JSON responses."""
+ make_temp = False
+ if hasattr(self, '_on_empty_make_temp'):
+ category, dicter = getattr(self, '_on_empty_make_temp')
+ id_ = self._fields[category.lower()]
+ make_temp = not bool(self.lib_get(category, id_))
+ if make_temp:
+ self.lib_set(category, [getattr(self, dicter)(id_)])
+ self.recalc()
+ d = {'_library': self._lib}
+ for k, v in self._fields.items():
+ # we expect everything sortable to be sorted
+ if isinstance(v, list) and k not in self._forced:
+ # NB: if we don't test for v being list, sorted() on an empty
+ # dict may return an empty list
+ try:
+ v = sorted(v)
+ except TypeError:
+ pass
+ d[k] = v
+ for k, v in self._forced.items():
+ d[k] = v
+ if make_temp:
+ json = json_dumps(d)
+ id_ = id_ if id_ is not None else '?'
+ self.lib_del(category, id_)
+ d = json_loads(json)
+ return d
+
+ def lib_get(self, category: str, id_: str | int) -> dict[str, Any]:
+ """From library, return item of category and id_, or empty dict."""
+ str_id = str(id_)
+ if category in self._lib and str_id in self._lib[category]:
+ return self._lib[category][str_id]
+ return {}
+
+ def lib_all(self, category: str) -> list[dict[str, Any]]:
+ """From library, return items of category, or [] if none."""
+ if category in self._lib:
+ return list(self._lib[category].values())
+ return []
+
+ def lib_set(self, category: str, items: list[dict[str, object]]) -> None:
+ """Update library for category with items."""
+ if category not in self._lib:
+ self._lib[category] = {}
+ for k, v in self._as_refs(items).items():
+ self._lib[category][k] = v
+
+ def lib_del(self, category: str, id_: str | int) -> None:
+ """Remove category element of id_ from library."""
+ del self._lib[category][str(id_)]
+ if 0 == len(self._lib[category]):
+ del self._lib[category]
+
+ def lib_wipe(self, category: str) -> None:
+ """Remove category from library."""
+ if category in self._lib:
+ del self._lib[category]
+
+ def set(self, field_name: str, value: object) -> None:
+ """Set top-level .as_dict field."""
+ self._fields[field_name] = value
+
+ def force(self, field_name: str, value: object) -> None:
+ """Set ._forced field to ensure value in .as_dict."""
+ self._forced[field_name] = value
+
+ def unforce(self, field_name: str) -> None:
+ """Unset ._forced field."""
+ del self._forced[field_name]
@staticmethod
- def as_refs(items: list[dict[str, object]]
- ) -> dict[str, dict[str, object]]:
+ def _as_refs(items: list[dict[str, object]]
+ ) -> dict[str, dict[str, object]]:
"""Return dictionary of items by their 'id' fields."""
refs = {}
for item in items:
- refs[str(item['id'])] = item
+ id_ = str(item['id']) if item['id'] is not None else '?'
+ refs[id_] = item
return refs
+ @staticmethod
+ def as_ids(items: list[dict[str, Any]]) -> list[int] | list[str]:
+ """Return list of only 'id' fields of items."""
+ return [item['id'] for item in items]
+
+ @staticmethod
+ def day_as_dict(date: str, comment: str = '') -> dict[str, object]:
+ """Return JSON of Day to expect."""
+ return {'id': date, 'comment': comment, 'todos': []}
+
+ def set_day_from_post(self, date: str, d: dict[str, Any]) -> None:
+ """Set Day of date in library based on POST dict d."""
+ day = self.day_as_dict(date)
+ for k, v in d.items():
+ if 'day_comment' == k:
+ day['comment'] = v
+ elif 'new_todo' == k:
+ next_id = 1
+ for todo in self.lib_all('Todo'):
+ if next_id <= todo['id']:
+ next_id = todo['id'] + 1
+ for proc_id in sorted([id_ for id_ in v if id_]):
+ todo = self.todo_as_dict(next_id, proc_id, date)
+ self.lib_set('Todo', [todo])
+ next_id += 1
+ elif 'done' == k:
+ for todo_id in v:
+ self.lib_get('Todo', todo_id)['is_done'] = True
+ elif 'todo_id' == k:
+ for i, todo_id in enumerate(v):
+ t = self.lib_get('Todo', todo_id)
+ if 'comment' in d:
+ t['comment'] = d['comment'][i]
+ if 'effort' in d:
+ effort = d['effort'][i] if d['effort'][i] else None
+ t['effort'] = effort
+ self.lib_set('Day', [day])
+
@staticmethod
def cond_as_dict(id_: int = 1,
is_active: bool = False,
- titles: None | list[str] = None,
- descriptions: None | list[str] = None
+ title: None | str = None,
+ description: None | str = None,
) -> dict[str, object]:
"""Return JSON of Condition to expect."""
+ versioned: dict[str, dict[str, object]]
+ versioned = {'title': {}, 'description': {}}
+ if title is not None:
+ versioned['title']['0'] = title
+ if description is not None:
+ versioned['description']['0'] = description
+ return {'id': id_, 'is_active': is_active, '_versioned': versioned}
+
+ def set_cond_from_post(self, id_: int, d: dict[str, Any]) -> None:
+ """Set Condition of id_ in library based on POST dict d."""
+ if d == {'delete': ''}:
+ self.lib_del('Condition', id_)
+ return
+ cond = self.lib_get('Condition', id_)
+ if cond:
+ if 'is_active' in d:
+ cond['is_active'] = d['is_active']
+ for category in ['title', 'description']:
+ history = cond['_versioned'][category]
+ if len(history) > 0:
+ last_i = sorted([int(k) for k in history.keys()])[-1]
+ if d[category] != history[str(last_i)]:
+ history[str(last_i + 1)] = d[category]
+ else:
+ history['0'] = d[category]
+ else:
+ cond = self.cond_as_dict(id_, **d)
+ self.lib_set('Condition', [cond])
+
+ @staticmethod
+ def todo_as_dict(id_: int = 1,
+ process_id: int = 1,
+ date: str = '2024-01-01',
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None,
+ calendarize: bool = False,
+ comment: str = '',
+ is_done: bool = False,
+ effort: float | None = None,
+ children: list[int] | None = None,
+ parents: list[int] | None = None,
+ ) -> dict[str, object]:
+ """Return JSON of Todo to expect."""
+ # pylint: disable=too-many-arguments
d = {'id': id_,
- 'is_active': is_active,
- '_versioned': {
- 'title': {},
- 'description': {}}}
- titles = titles if titles else []
- descriptions = descriptions if descriptions else []
- assert isinstance(d['_versioned'], dict)
- for i, title in enumerate(titles):
- d['_versioned']['title'][i] = title
- for i, description in enumerate(descriptions):
- d['_versioned']['description'][i] = description
+ 'date': date,
+ 'process_id': process_id,
+ 'is_done': is_done,
+ 'calendarize': calendarize,
+ 'comment': comment,
+ 'children': children if children else [],
+ 'parents': parents if parents else [],
+ 'effort': effort,
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'blockers': blockers if blockers else [],
+ 'enables': enables if enables else []}
return d
+ def set_todo_from_post(self, id_: int, d: dict[str, Any]) -> None:
+ """Set Todo of id_ in library based on POST dict d."""
+ corrected_kwargs: dict[str, Any] = {'children': []}
+ for k, v in d.items():
+ if k.startswith('step_filler_to_'):
+ continue
+ if 'adopt' == k:
+ new_children = v if isinstance(v, list) else [v]
+ corrected_kwargs['children'] += new_children
+ continue
+ if k in {'is_done', 'calendarize'}:
+ v = v in VALID_TRUES
+ corrected_kwargs[k] = v
+ todo = self.lib_get('Todo', id_)
+ if todo:
+ for k, v in corrected_kwargs.items():
+ todo[k] = v
+ else:
+ todo = self.todo_as_dict(id_, **corrected_kwargs)
+ self.lib_set('Todo', [todo])
+
+ @staticmethod
+ def procstep_as_dict(id_: int,
+ owner_id: int,
+ step_process_id: int,
+ parent_step_id: int | None = None
+ ) -> dict[str, object]:
+ """Return JSON of ProcessStep to expect."""
+ return {'id': id_,
+ 'owner_id': owner_id,
+ 'step_process_id': step_process_id,
+ 'parent_step_id': parent_step_id}
+
@staticmethod
def proc_as_dict(id_: int = 1,
- title: str = 'A',
- description: str = '',
- effort: float = 1.0,
+ title: None | str = None,
+ description: None | str = None,
+ effort: None | float = None,
conditions: None | list[int] = None,
disables: None | list[int] = None,
blockers: None | list[int] = None,
- enables: None | list[int] = None
+ enables: None | list[int] = None,
+ explicit_steps: None | list[int] = None
) -> dict[str, object]:
"""Return JSON of Process to expect."""
# pylint: disable=too-many-arguments
+ versioned: dict[str, dict[str, object]]
+ versioned = {'title': {}, 'description': {}, 'effort': {}}
+ if title is not None:
+ versioned['title']['0'] = title
+ if description is not None:
+ versioned['description']['0'] = description
+ if effort is not None:
+ versioned['effort']['0'] = effort
d = {'id': id_,
'calendarize': False,
'suppressed_steps': [],
- 'explicit_steps': [],
- '_versioned': {
- 'title': {0: title},
- 'description': {0: description},
- 'effort': {0: effort}},
+ 'explicit_steps': explicit_steps if explicit_steps else [],
+ '_versioned': versioned,
'conditions': conditions if conditions else [],
'disables': disables if disables else [],
'enables': enables if enables else [],
'blockers': blockers if blockers else []}
return d
+ def set_proc_from_post(self, id_: int, d: dict[str, Any]) -> None:
+ """Set Process of id_ in library based on POST dict d."""
+ proc = self.lib_get('Process', id_)
+ if proc:
+ for category in ['title', 'description', 'effort']:
+ history = proc['_versioned'][category]
+ if len(history) > 0:
+ last_i = sorted([int(k) for k in history.keys()])[-1]
+ if d[category] != history[str(last_i)]:
+ history[str(last_i + 1)] = d[category]
+ else:
+ history['0'] = d[category]
+ else:
+ proc = self.proc_as_dict(id_,
+ d['title'], d['description'], d['effort'])
+ ignore = {'title', 'description', 'effort', 'new_top_step', 'step_of',
+ 'kept_steps'}
+ for k, v in d.items():
+ if k in ignore\
+ or k.startswith('step_') or k.startswith('new_step_to'):
+ continue
+ if k in {'calendarize'}:
+ v = v in VALID_TRUES
+ elif k in {'suppressed_steps', 'explicit_steps', 'conditions',
+ 'disables', 'enables', 'blockers'}:
+ if not isinstance(v, list):
+ v = [v]
+ proc[k] = v
+ self.lib_set('Process', [proc])
+
+
+class TestCaseWithServer(TestCaseWithDB):
+ """Module tests against our HTTP server/handler (and database)."""
+
+ def setUp(self) -> None:
+ super().setUp()
+ self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler)
+ self.server_thread = Thread(target=self.httpd.serve_forever)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+ self.conn = HTTPConnection(str(self.httpd.server_address[0]),
+ self.httpd.server_address[1])
+ self.httpd.render_mode = 'json'
+
+ def tearDown(self) -> None:
+ self.httpd.shutdown()
+ self.httpd.server_close()
+ self.server_thread.join()
+ super().tearDown()
+
+ def post_exp_cond(self,
+ exps: list[Expected],
+ payload: dict[str, object],
+ id_: int = 1,
+ post_to_id: bool = True,
+ redir_to_id: bool = True
+ ) -> None:
+ """POST /condition(s), appropriately update Expecteds."""
+ # pylint: disable=too-many-arguments
+ target = f'/condition?id={id_}' if post_to_id else '/condition'
+ redir = f'/condition?id={id_}' if redir_to_id else '/conditions'
+ self.check_post(payload, target, redir=redir)
+ for exp in exps:
+ exp.set_cond_from_post(id_, payload)
+
+ def post_exp_day(self,
+ exps: list[Expected],
+ payload: dict[str, Any],
+ date: str = '2024-01-01'
+ ) -> None:
+ """POST /day, appropriately update Expecteds."""
+ if 'make_type' not in payload:
+ payload['make_type'] = 'empty'
+ if 'day_comment' not in payload:
+ payload['day_comment'] = ''
+ target = f'/day?date={date}'
+ redir_to = f'{target}&make_type={payload["make_type"]}'
+ self.check_post(payload, target, 302, redir_to)
+ for exp in exps:
+ exp.set_day_from_post(date, payload)
+
+ def post_exp_process(self,
+ exps: list[Expected],
+ payload: dict[str, Any],
+ id_: int,
+ ) -> dict[str, object]:
+ """POST /process, appropriately update Expecteds."""
+ if 'title' not in payload:
+ payload['title'] = 'foo'
+ if 'description' not in payload:
+ payload['description'] = 'foo'
+ if 'effort' not in payload:
+ payload['effort'] = 1.1
+ self.check_post(payload, f'/process?id={id_}',
+ redir=f'/process?id={id_}')
+ for exp in exps:
+ exp.set_proc_from_post(id_, payload)
+ return payload
+
+ def check_filter(self, exp: Expected, category: str, key: str,
+ val: str, list_ids: list[int]) -> None:
+ """Check GET /{category}?{key}={val} sorts to list_ids."""
+ # pylint: disable=too-many-arguments
+ exp.set(key, val)
+ exp.force(category, list_ids)
+ self.check_json_get(f'/{category}?{key}={val}', exp)
+
def check_redirect(self, target: str) -> None:
"""Check that self.conn answers with a 302 redirect to target."""
response = self.conn.getresponse()
else:
self.assertEqual(self.conn.getresponse().status, expected_code)
- def check_get_defaults(self, path: str) -> None:
+ def check_get_defaults(self,
+ path: str,
+ default_id: str = '1',
+ id_name: str = 'id'
+ ) -> None:
"""Some standard model paths to test."""
- self.check_get(path, 200)
- self.check_get(f'{path}?id=', 200)
- self.check_get(f'{path}?id=foo', 400)
- self.check_get(f'/{path}?id=0', 500)
- self.check_get(f'{path}?id=1', 200)
-
- def post_process(self, id_: int = 1,
- form_data: dict[str, Any] | None = None
- ) -> dict[str, Any]:
- """POST basic Process."""
- if not form_data:
- form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, f'/process?id={id_}',
- redir=f'/process?id={id_}')
- return form_data
+ nonexist_status = 200 if self.checked_class.can_create_by_id else 404
+ self.check_get(path, nonexist_status)
+ self.check_get(f'{path}?{id_name}=', 400)
+ self.check_get(f'{path}?{id_name}=foo', 400)
+ self.check_get(f'/{path}?{id_name}=0', 400)
+ self.check_get(f'{path}?{id_name}={default_id}', nonexist_status)
- def check_json_get(self, path: str, expected: dict[str, object]) -> None:
+ def check_json_get(self, path: str, expected: Expected) -> None:
"""Compare JSON on GET path with expected.
To simplify comparison of VersionedAttribute histories, transforms
- timestamp keys of VersionedAttribute history keys into integers
- counting chronologically forward from 0.
+ timestamp keys of VersionedAttribute history keys into (strings of)
+ integers counting chronologically forward from 0.
"""
def rewrite_history_keys_in(item: Any) -> Any:
if isinstance(item, dict):
if '_versioned' in item.keys():
- for k in item['_versioned']:
- vals = item['_versioned'][k].values()
+ for category in item['_versioned']:
+ vals = item['_versioned'][category].values()
history = {}
for i, val in enumerate(vals):
- history[i] = val
- item['_versioned'][k] = history
- for k in list(item.keys()):
- rewrite_history_keys_in(item[k])
+ history[str(i)] = val
+ item['_versioned'][category] = history
+ for category in list(item.keys()):
+ rewrite_history_keys_in(item[category])
elif isinstance(item, list):
item[:] = [rewrite_history_keys_in(i) for i in item]
return item
+ def walk_diffs(path: str, cmp1: object, cmp2: object) -> None:
+ # pylint: disable=too-many-branches
+ def warn(intro: str, val: object) -> None:
+ if isinstance(val, (str, int, float)):
+ print(intro, val)
+ else:
+ print(intro)
+ pprint(val)
+ if cmp1 != cmp2:
+ if isinstance(cmp1, dict) and isinstance(cmp2, dict):
+ for k, v in cmp1.items():
+ if k not in cmp2:
+ warn(f'DIFF {path}: retrieved lacks {k}', v)
+ elif v != cmp2[k]:
+ walk_diffs(f'{path}:{k}', v, cmp2[k])
+ for k in [k for k in cmp2.keys() if k not in cmp1]:
+ warn(f'DIFF {path}: expected lacks retrieved\'s {k}',
+ cmp2[k])
+ elif isinstance(cmp1, list) and isinstance(cmp2, list):
+ for i, v1 in enumerate(cmp1):
+ if i >= len(cmp2):
+ warn(f'DIFF {path}[{i}] retrieved misses:', v1)
+ elif v1 != cmp2[i]:
+ walk_diffs(f'{path}[{i}]', v1, cmp2[i])
+ if len(cmp2) > len(cmp1):
+ for i, v2 in enumerate(cmp2[len(cmp1):]):
+ warn(f'DIFF {path}[{len(cmp1)+i}] misses:', v2)
+ else:
+ warn(f'DIFF {path} – for expected:', cmp1)
+ warn('… and for retrieved:', cmp2)
+
self.conn.request('GET', path)
response = self.conn.getresponse()
self.assertEqual(response.status, 200)
retrieved = json_loads(response.read().decode())
rewrite_history_keys_in(retrieved)
- self.assertEqual(expected, retrieved)
+ cmp = expected.as_dict
+ try:
+ self.assertEqual(cmp, retrieved)
+ except AssertionError as e:
+ print('EXPECTED:')
+ pprint(cmp)
+ print('RETRIEVED:')
+ pprint(retrieved)
+ walk_diffs('', cmp, retrieved)
+ raise e