from typing import Mapping, Any, Callable
from threading import Thread
from http.client import HTTPConnection
+from datetime import datetime, timedelta
+from time import sleep
from json import loads as json_loads
from urllib.parse import urlencode
from uuid import uuid4
from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
from plomtask.days import Day
+from plomtask.dating import DATE_FORMAT
from plomtask.todos import Todo
+from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT
from plomtask.exceptions import NotFoundException, HandledException
-class TestCaseSansDB(TestCase):
- """Tests requiring no DB setup."""
+VERSIONED_VALS: dict[str,
+ list[str] | list[float]] = {'str': ['A', 'B'],
+ 'float': [0.3, 1.1]}
+
+
+class TestCaseAugmented(TestCase):
+ """Tester core providing helpful basic internal decorators and methods."""
checked_class: Any
- do_id_test: bool = False
- default_init_args: list[Any] = []
- versioned_defaults_to_test: dict[str, str | float] = {}
+ default_init_kwargs: dict[str, Any] = {}
- def test_id_setting(self) -> None:
- """Test .id_ being set and its legal range being enforced."""
- if not self.do_id_test:
- return
- with self.assertRaises(HandledException):
- self.checked_class(0, *self.default_init_args)
- obj = self.checked_class(5, *self.default_init_args)
- self.assertEqual(obj.id_, 5)
+ @staticmethod
+ def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]:
+ def wrapper(self: TestCase) -> None:
+ if hasattr(self, 'checked_class'):
+ f(self)
+ return wrapper
+
+ @classmethod
+ def _run_on_versioned_attributes(cls,
+ f: Callable[..., None]
+ ) -> Callable[..., None]:
+ @cls._run_if_checked_class
+ def wrapper(self: TestCase) -> None:
+ assert isinstance(self, TestCaseAugmented)
+ for attr_name in self.checked_class.to_save_versioned():
+ default = self.checked_class.versioned_defaults[attr_name]
+ owner = self.checked_class(None, **self.default_init_kwargs)
+ attr = getattr(owner, attr_name)
+ to_set = VERSIONED_VALS[attr.value_type_name]
+ f(self, owner, attr_name, attr, default, to_set)
+ return wrapper
+
+ @classmethod
+ def _make_from_defaults(cls, id_: float | str | None) -> Any:
+ return cls.checked_class(id_, **cls.default_init_kwargs)
- def test_versioned_defaults(self) -> None:
- """Test defaults of VersionedAttributes."""
- if len(self.versioned_defaults_to_test) == 0:
- return
- obj = self.checked_class(1, *self.default_init_args)
- for k, v in self.versioned_defaults_to_test.items():
- self.assertEqual(getattr(obj, k).newest, v)
+class TestCaseSansDB(TestCaseAugmented):
+ """Tests requiring no DB setup."""
+ legal_ids: list[str] | list[int] = [1, 5]
+ illegal_ids: list[str] | list[int] = [0]
-class TestCaseWithDB(TestCase):
+ @TestCaseAugmented._run_if_checked_class
+ def test_id_validation(self) -> None:
+ """Test .id_ validation/setting."""
+ for id_ in self.illegal_ids:
+ with self.assertRaises(HandledException):
+ self._make_from_defaults(id_)
+ for id_ in self.legal_ids:
+ obj = self._make_from_defaults(id_)
+ self.assertEqual(obj.id_, id_)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_set(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test VersionedAttribute.set() behaves as expected."""
+ attr.set(default)
+ self.assertEqual(list(attr.history.values()), [default])
+ # check same value does not get set twice in a row,
+ # and that not even its timestamp get updated
+ timestamp = list(attr.history.keys())[0]
+ attr.set(default)
+ self.assertEqual(list(attr.history.values()), [default])
+ self.assertEqual(list(attr.history.keys())[0], timestamp)
+ # check that different value _will_ be set/added
+ attr.set(to_set[0])
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0]]
+ self.assertEqual(timesorted_vals, expected)
+ # check that a previously used value can be set if not most recent
+ attr.set(default)
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0], default]
+ self.assertEqual(timesorted_vals, expected)
+ # again check for same value not being set twice in a row, even for
+ # later items
+ attr.set(to_set[1])
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0], default, to_set[1]]
+ self.assertEqual(timesorted_vals, expected)
+ attr.set(to_set[1])
+ self.assertEqual(timesorted_vals, expected)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_newest(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test VersionedAttribute.newest."""
+ # check .newest on empty history returns .default
+ self.assertEqual(attr.newest, default)
+ # check newest element always returned
+ for v in [to_set[0], to_set[1]]:
+ attr.set(v)
+ self.assertEqual(attr.newest, v)
+ # check newest element returned even if also early value
+ attr.set(default)
+ self.assertEqual(attr.newest, default)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_at(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test .at() returns values nearest to queried time, or default."""
+ # check .at() return default on empty history
+ timestamp_a = datetime.now().strftime(TIMESTAMP_FMT)
+ self.assertEqual(attr.at(timestamp_a), default)
+ # check value exactly at timestamp returned
+ attr.set(to_set[0])
+ timestamp_b = list(attr.history.keys())[0]
+ self.assertEqual(attr.at(timestamp_b), to_set[0])
+ # check earliest value returned if exists, rather than default
+ self.assertEqual(attr.at(timestamp_a), to_set[0])
+ # check reverts to previous value for timestamps not indexed
+ sleep(0.00001)
+ timestamp_between = datetime.now().strftime(TIMESTAMP_FMT)
+ sleep(0.00001)
+ attr.set(to_set[1])
+ timestamp_c = sorted(attr.history.keys())[-1]
+ self.assertEqual(attr.at(timestamp_c), to_set[1])
+ self.assertEqual(attr.at(timestamp_between), to_set[0])
+ sleep(0.00001)
+ timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT)
+ self.assertEqual(attr.at(timestamp_after_c), to_set[1])
+
+
+class TestCaseWithDB(TestCaseAugmented):
"""Module tests not requiring DB setup."""
- checked_class: Any
- default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
- default_init_kwargs: dict[str, Any] = {}
- test_versioneds: dict[str, type] = {}
+ default_ids: tuple[int, int, int] | tuple[str, str, str] = (1, 2, 3)
def setUp(self) -> None:
Condition.empty_cache()
self.db_conn.close()
remove_file(self.db_file.path)
- @staticmethod
- def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]:
- def wrapper(self: TestCaseWithDB) -> None:
- if hasattr(self, 'checked_class'):
- f(self)
- return wrapper
-
def _load_from_db(self, id_: int | str) -> list[object]:
db_found: list[object] = []
for row in self.db_conn.row_where(self.checked_class.table_name,
row)]
return db_found
- @_within_checked_class
- def test_saving_versioned(self) -> None:
- """Test storage and initialization of versioned attributes."""
- def retrieve_attr_vals() -> list[object]:
- attr_vals_saved: list[object] = []
- assert hasattr(retrieved, 'id_')
- for row in self.db_conn.row_where(attr.table_name, 'parent',
- retrieved.id_):
- attr_vals_saved += [row[2]]
- return attr_vals_saved
- for attr_name, type_ in self.test_versioneds.items():
- # fail saving attributes on non-saved owner
- owner = self.checked_class(None, **self.default_init_kwargs)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- with self.assertRaises(NotFoundException):
- attr.save(self.db_conn)
- owner.save(self.db_conn)
- # check stored attribute is as expected
- retrieved = self._load_from_db(owner.id_)[0]
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
- # check owner.save() created entries in attr table
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals, attr_vals_saved)
- # check setting new val to attr inconsequential to DB without save
- attr.set(vals[0])
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals, attr_vals_saved)
- # check save finally adds new val
- attr.save(self.db_conn)
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals + [vals[0]], attr_vals_saved)
+ def _change_obj(self, obj: object) -> str:
+ attr_name: str = self.checked_class.to_save_simples[-1]
+ attr = getattr(obj, attr_name)
+ new_attr: str | int | float | bool
+ if isinstance(attr, (int, float)):
+ new_attr = attr + 1
+ elif isinstance(attr, str):
+ new_attr = attr + '_'
+ elif isinstance(attr, bool):
+ new_attr = not attr
+ setattr(obj, attr_name, new_attr)
+ return attr_name
def check_identity_with_cache_and_db(self, content: list[Any]) -> None:
"""Test both cache and DB equal content."""
hashes_db_found = [hash(x) for x in db_found]
self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
- @_within_checked_class
+ def check_by_date_range_with_limits(self,
+ date_col: str,
+ set_id_field: bool = True
+ ) -> None:
+ """Test .by_date_range_with_limits."""
+ # pylint: disable=too-many-locals
+ f = self.checked_class.by_date_range_with_limits
+ # check illegal ranges
+ legal_range = ('yesterday', 'tomorrow')
+ for i in [0, 1]:
+ for bad_date in ['foo', '2024-02-30', '2024-01-01 12:00:00']:
+ date_range = list(legal_range[:])
+ date_range[i] = bad_date
+ with self.assertRaises(HandledException):
+ f(self.db_conn, date_range, date_col)
+ # check empty, translation of 'yesterday' and 'tomorrow'
+ items, start, end = f(self.db_conn, legal_range, date_col)
+ self.assertEqual(items, [])
+ yesterday = datetime.now() + timedelta(days=-1)
+ tomorrow = datetime.now() + timedelta(days=+1)
+ self.assertEqual(start, yesterday.strftime(DATE_FORMAT))
+ self.assertEqual(end, tomorrow.strftime(DATE_FORMAT))
+ # prepare dated items for non-empty results
+ kwargs_with_date = self.default_init_kwargs.copy()
+ if set_id_field:
+ kwargs_with_date['id_'] = None
+ objs = []
+ dates = ['2024-01-01', '2024-01-02', '2024-01-04']
+ for date in ['2024-01-01', '2024-01-02', '2024-01-04']:
+ kwargs_with_date['date'] = date
+ obj = self.checked_class(**kwargs_with_date)
+ objs += [obj]
+ # check ranges still empty before saving
+ date_range = [dates[0], dates[-1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check all objs displayed within closed interval
+ for obj in objs:
+ obj.save(self.db_conn)
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], objs)
+ # check that only displayed what exists within interval
+ date_range = ['2023-12-20', '2024-01-03']
+ expected = [objs[0], objs[1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ date_range = ['2024-01-03', '2024-01-30']
+ expected = [objs[2]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ # check that inverted interval displays nothing
+ date_range = [dates[-1], dates[0]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check that "today" is interpreted, and single-element interval
+ today_date = datetime.now().strftime(DATE_FORMAT)
+ kwargs_with_date['date'] = today_date
+ obj_today = self.checked_class(**kwargs_with_date)
+ obj_today.save(self.db_conn)
+ date_range = ['today', 'today']
+ items, start, end = f(self.db_conn, date_range, date_col)
+ self.assertEqual(start, today_date)
+ self.assertEqual(start, end)
+ self.assertEqual(items, [obj_today])
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_saving_versioned_attributes(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test storage and initialization of versioned attributes."""
+
+ def retrieve_attr_vals(attr: VersionedAttribute) -> list[object]:
+ attr_vals_saved: list[object] = []
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
+ owner.id_):
+ attr_vals_saved += [row[2]]
+ return attr_vals_saved
+
+ attr.set(to_set[0])
+ # check that without attr.save() no rows in DB
+ rows = self.db_conn.row_where(attr.table_name, 'parent', owner.id_)
+ self.assertEqual([], rows)
+ # fail saving attributes on non-saved owner
+ with self.assertRaises(NotFoundException):
+ attr.save(self.db_conn)
+ # check owner.save() created entries as expected in attr table
+ owner.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check changing attr val without save affects owner in memory …
+ attr.set(to_set[1])
+ cmp_attr = getattr(owner, attr_name)
+ self.assertEqual(to_set, list(cmp_attr.history.values()))
+ self.assertEqual(cmp_attr.history, attr.history)
+ # … but does not yet affect DB
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check individual attr.save also stores new val to DB
+ attr.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual(to_set, attr_vals_saved)
+
+ @TestCaseAugmented._run_if_checked_class
def test_saving_and_caching(self) -> None:
"""Test effects of .cache() and .save()."""
id1 = self.default_ids[0]
# check failure to cache without ID (if None-ID input possible)
if isinstance(id1, int):
- obj0 = self.checked_class(None, **self.default_init_kwargs)
+ obj0 = self._make_from_defaults(None)
with self.assertRaises(HandledException):
obj0.cache()
# check mere object init itself doesn't even store in cache
- obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ obj1 = self._make_from_defaults(id1)
self.assertEqual(self.checked_class.get_cache(), {})
# check .cache() fills cache, but not DB
obj1.cache()
self.assertEqual(self.checked_class.get_cache(), {id1: obj1})
- db_found = self._load_from_db(id1)
- self.assertEqual(db_found, [])
+ found_in_db = self._load_from_db(id1)
+ self.assertEqual(found_in_db, [])
# check .save() sets ID (for int IDs), updates cache, and fills DB
# (expect ID to be set to id1, despite obj1 already having that as ID:
# it's generated by cursor.lastrowid on the DB table, and with obj1
# not written there, obj2 should get it first!)
id_input = None if isinstance(id1, int) else id1
- obj2 = self.checked_class(id_input, **self.default_init_kwargs)
+ obj2 = self._make_from_defaults(id_input)
obj2.save(self.db_conn)
- obj2_hash = hash(obj2)
self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
- db_found += self._load_from_db(id1)
- self.assertEqual([hash(o) for o in db_found], [obj2_hash])
+ # NB: we'll only compare hashes because obj2 itself disappears on
+ # .from_table_row-trioggered database reload
+ obj2_hash = hash(obj2)
+ found_in_db += self._load_from_db(id1)
+ self.assertEqual([hash(o) for o in found_in_db], [obj2_hash])
# check we cannot overwrite obj2 with obj1 despite its same ID,
# since it has disappeared now
with self.assertRaises(HandledException):
obj1.save(self.db_conn)
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_by_id(self) -> None:
"""Test .by_id()."""
id1, id2, _ = self.default_ids
# check failure if not yet saved
- obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ obj1 = self._make_from_defaults(id1)
with self.assertRaises(NotFoundException):
self.checked_class.by_id(self.db_conn, id1)
# check identity of cached and retrieved
obj1.cache()
self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1))
# check identity of saved and retrieved
- obj2 = self.checked_class(id2, **self.default_init_kwargs)
+ obj2 = self._make_from_defaults(id2)
obj2.save(self.db_conn)
self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_by_id_or_create(self) -> None:
"""Test .by_id_or_create."""
- # check .by_id_or_create acts like normal instantiation (sans saving)
- id_ = self.default_ids[0]
+ # check .by_id_or_create fails if wrong class
if not self.checked_class.can_create_by_id:
with self.assertRaises(HandledException):
- self.checked_class.by_id_or_create(self.db_conn, id_)
- # check .by_id_or_create fails if wrong class
- else:
- by_id_created = self.checked_class.by_id_or_create(self.db_conn,
- id_)
- with self.assertRaises(NotFoundException):
- self.checked_class.by_id(self.db_conn, id_)
- self.assertEqual(self.checked_class(id_), by_id_created)
+ self.checked_class.by_id_or_create(self.db_conn, None)
+ return
+ # check ID input of None creates, on saving, ID=1,2,… for int IDs
+ if isinstance(self.default_ids[0], int):
+ for n in range(2):
+ item = self.checked_class.by_id_or_create(self.db_conn, None)
+ self.assertEqual(item.id_, None)
+ item.save(self.db_conn)
+ self.assertEqual(item.id_, n+1)
+ # check .by_id_or_create acts like normal instantiation (sans saving)
+ id_ = self.default_ids[2]
+ item = self.checked_class.by_id_or_create(self.db_conn, id_)
+ self.assertEqual(item.id_, id_)
+ with self.assertRaises(NotFoundException):
+ self.checked_class.by_id(self.db_conn, item.id_)
+ self.assertEqual(self.checked_class(item.id_), item)
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class directly from DB."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id_)
obj.save(self.db_conn)
- assert isinstance(obj.id_, type(self.default_ids[0]))
+ assert isinstance(obj.id_, type(id_))
for row in self.db_conn.row_where(self.checked_class.table_name,
'id', obj.id_):
# check .from_table_row reproduces state saved, no matter if obj
# later changed (with caching even)
+ # NB: we'll only compare hashes because obj itself disappears on
+ # .from_table_row-triggered database reload
hash_original = hash(obj)
- attr_name = self.checked_class.to_save[-1]
- attr = getattr(obj, attr_name)
- if isinstance(attr, (int, float)):
- setattr(obj, attr_name, attr + 1)
- elif isinstance(attr, str):
- setattr(obj, attr_name, attr + "_")
- elif isinstance(attr, bool):
- setattr(obj, attr_name, not attr)
+ attr_name = self._change_obj(obj)
obj.cache()
to_cmp = getattr(obj, attr_name)
retrieved = self.checked_class.from_table_row(self.db_conn, row)
self.assertEqual({retrieved.id_: retrieved},
self.checked_class.get_cache())
- def check_versioned_from_table_row(self, attr_name: str,
- type_: type) -> None:
- """Test .from_table_row() reads versioned attributes from DB."""
- owner = self.checked_class(None)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_history_from_row(self,
+ owner: Any,
+ _: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """"Test VersionedAttribute.history_from_row() knows its DB rows."""
+ attr.set(to_set[0])
+ attr.set(to_set[1])
owner.save(self.db_conn)
+ # make empty VersionedAttribute, fill from rows, compare to owner's
for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
- retrieved = owner.__class__.from_table_row(self.db_conn, row)
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
-
- @_within_checked_class
+ loaded_attr = VersionedAttribute(owner, attr.table_name, default)
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
+ owner.id_):
+ loaded_attr.history_from_row(row)
+ self.assertEqual(len(attr.history.keys()),
+ len(loaded_attr.history.keys()))
+ for timestamp, value in attr.history.items():
+ self.assertEqual(value, loaded_attr.history[timestamp])
+
+ @TestCaseAugmented._run_if_checked_class
def test_all(self) -> None:
"""Test .all() and its relation to cache and savings."""
- id_1, id_2, id_3 = self.default_ids
- item1 = self.checked_class(id_1, **self.default_init_kwargs)
- item2 = self.checked_class(id_2, **self.default_init_kwargs)
- item3 = self.checked_class(id_3, **self.default_init_kwargs)
+ id1, id2, id3 = self.default_ids
+ item1 = self._make_from_defaults(id1)
+ item2 = self._make_from_defaults(id2)
+ item3 = self._make_from_defaults(id3)
# check .all() returns empty list on un-cached items
self.assertEqual(self.checked_class.all(self.db_conn), [])
# check that all() shows only cached/saved items
self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
sorted([item1, item2, item3]))
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
id1 = self.default_ids[0]
- obj = self.checked_class(id1, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id1)
obj.save(self.db_conn)
- attr_name = self.checked_class.to_save[-1]
- attr = getattr(obj, attr_name)
- new_attr: str | int | float | bool
- if isinstance(attr, (int, float)):
- new_attr = attr + 1
- elif isinstance(attr, str):
- new_attr = attr + '_'
- elif isinstance(attr, bool):
- new_attr = not attr
- setattr(obj, attr_name, new_attr)
+ # change object, expect retrieved through .by_id to carry change
+ attr_name = self._change_obj(obj)
+ new_attr = getattr(obj, attr_name)
retrieved = self.checked_class.by_id(self.db_conn, id1)
self.assertEqual(new_attr, getattr(retrieved, attr_name))
- def check_versioned_singularity(self) -> None:
- """Test singularity of VersionedAttributes on saving (with .title)."""
- obj = self.checked_class(None) # pylint: disable=not-callable
- obj.save(self.db_conn)
- assert isinstance(obj.id_, int)
- obj.title.set('named')
- retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
- self.assertEqual(obj.title.history, retrieved.title.history)
-
- def check_remove(self, *args: Any) -> None:
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_singularity(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test singularity of VersionedAttributes on saving."""
+ owner.save(self.db_conn)
+ # change obj, expect retrieved through .by_id to carry change
+ attr.set(to_set[0])
+ retrieved = self.checked_class.by_id(self.db_conn, owner.id_)
+ attr_retrieved = getattr(retrieved, attr_name)
+ self.assertEqual(attr.history, attr_retrieved.history)
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, *args) # pylint: disable=not-callable
+ obj = self._make_from_defaults(id_)
+ # check removal only works after saving
with self.assertRaises(HandledException):
obj.remove(self.db_conn)
obj.save(self.db_conn)
obj.remove(self.db_conn)
+ # check access to obj fails after removal
+ with self.assertRaises(HandledException):
+ print(obj.id_)
+ # check DB and cache now empty
self.check_identity_with_cache_and_db([])
self.server_thread.start()
self.conn = HTTPConnection(str(self.httpd.server_address[0]),
self.httpd.server_address[1])
- self.httpd.set_json_mode()
+ self.httpd.render_mode = 'json'
def tearDown(self) -> None:
self.httpd.shutdown()
self.server_thread.join()
super().tearDown()
+ @staticmethod
+ def as_id_list(items: list[dict[str, object]]) -> list[int | str]:
+ """Return list of only 'id' fields of items."""
+ # NB: To tighten the mypy test, consider to, instead of returning
+ # list[str | int], returnlist[int] | list[str]. But since so far to me
+ # the only way to make that work seems to be to repaclement of the
+ # currently active last line with complexity of the out-commented code
+ # block beneath, I currently opt for the status quo.
+ id_list = []
+ for item in items:
+ assert isinstance(item['id'], (int, str))
+ id_list += [item['id']]
+ return id_list
+ # if id_list:
+ # if isinstance(id_list[0], int):
+ # for id_ in id_list:
+ # assert isinstance(id_, int)
+ # l_int: list[int] = [id_ for id_ in id_list
+ # if isinstance(id_, int)]
+ # return l_int
+ # for id_ in id_list:
+ # assert isinstance(id_, str)
+ # l_str: list[str] = [id_ for id_ in id_list
+ # if isinstance(id_, str)]
+ # return l_str
+ # return []
+
+ @staticmethod
+ def as_refs(items: list[dict[str, object]]
+ ) -> dict[str, dict[str, object]]:
+ """Return dictionary of items by their 'id' fields."""
+ refs = {}
+ for item in items:
+ refs[str(item['id'])] = item
+ return refs
+
+ @staticmethod
+ def cond_as_dict(id_: int = 1,
+ is_active: bool = False,
+ titles: None | list[str] = None,
+ descriptions: None | list[str] = None
+ ) -> dict[str, object]:
+ """Return JSON of Condition to expect."""
+ d = {'id': id_,
+ 'is_active': is_active,
+ '_versioned': {
+ 'title': {},
+ 'description': {}}}
+ titles = titles if titles else []
+ descriptions = descriptions if descriptions else []
+ assert isinstance(d['_versioned'], dict)
+ for i, title in enumerate(titles):
+ d['_versioned']['title'][i] = title
+ for i, description in enumerate(descriptions):
+ d['_versioned']['description'][i] = description
+ return d
+
+ @staticmethod
+ def procstep_as_dict(id_: int,
+ owner_id: int,
+ step_process_id: int,
+ parent_step_id: int | None = None
+ ) -> dict[str, object]:
+ """Return JSON of Process to expect."""
+ return {'id': id_,
+ 'owner_id': owner_id,
+ 'step_process_id': step_process_id,
+ 'parent_step_id': parent_step_id}
+
+ @staticmethod
+ def todo_as_dict(id_: int = 1,
+ process_id: int = 1,
+ date: str = '2024-01-01',
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None,
+ calendarize: bool = False,
+ comment: str = '',
+ is_done: bool = False,
+ effort: float | None = None,
+ children: list[int] | None = None,
+ parents: list[int] | None = None,
+ ) -> dict[str, object]:
+ """Return JSON of Todo to expect."""
+ # pylint: disable=too-many-arguments
+ d = {'id': id_,
+ 'date': date,
+ 'process_id': process_id,
+ 'is_done': is_done,
+ 'calendarize': calendarize,
+ 'comment': comment,
+ 'children': children if children else [],
+ 'parents': parents if parents else [],
+ 'effort': effort,
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'blockers': blockers if blockers else [],
+ 'enables': enables if enables else []}
+ return d
+
+ @staticmethod
+ def proc_as_dict(id_: int = 1,
+ title: str = 'A',
+ description: str = '',
+ effort: float = 1.0,
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None,
+ explicit_steps: None | list[int] = None
+ ) -> dict[str, object]:
+ """Return JSON of Process to expect."""
+ # pylint: disable=too-many-arguments
+ d = {'id': id_,
+ 'calendarize': False,
+ 'suppressed_steps': [],
+ 'explicit_steps': explicit_steps if explicit_steps else [],
+ '_versioned': {
+ 'title': {0: title},
+ 'description': {0: description},
+ 'effort': {0: effort}},
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'enables': enables if enables else [],
+ 'blockers': blockers if blockers else []}
+ return d
+
def check_redirect(self, target: str) -> None:
"""Check that self.conn answers with a 302 redirect to target."""
response = self.conn.getresponse()
self.assertEqual(self.conn.getresponse().status, expected_code)
def check_post(self, data: Mapping[str, object], target: str,
- expected_code: int, redirect_location: str = '') -> None:
+ expected_code: int = 302, redir: str = '') -> None:
"""Check that POST of data to target yields expected_code."""
encoded_form_data = urlencode(data, doseq=True).encode('utf-8')
headers = {'Content-Type': 'application/x-www-form-urlencoded',
self.conn.request('POST', target,
body=encoded_form_data, headers=headers)
if 302 == expected_code:
- if redirect_location == '':
- redirect_location = target
- self.check_redirect(redirect_location)
+ redir = target if redir == '' else redir
+ self.check_redirect(redir)
else:
self.assertEqual(self.conn.getresponse().status, expected_code)
"""POST basic Process."""
if not form_data:
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, f'/process?id={id_}', 302,
- f'/process?id={id_}')
+ self.check_post(form_data, f'/process?id={id_}',
+ redir=f'/process?id={id_}')
return form_data
def check_json_get(self, path: str, expected: dict[str, object]) -> None:
timestamp keys of VersionedAttribute history keys into integers
counting chronologically forward from 0.
"""
+
def rewrite_history_keys_in(item: Any) -> Any:
if isinstance(item, dict):
if '_versioned' in item.keys():
elif isinstance(item, list):
item[:] = [rewrite_history_keys_in(i) for i in item]
return item
+
self.conn.request('GET', path)
response = self.conn.getresponse()
self.assertEqual(response.status, 200)
retrieved = json_loads(response.read().decode())
rewrite_history_keys_in(retrieved)
+ # import pprint
+ # pprint.pprint(expected)
+ # pprint.pprint(retrieved)
self.assertEqual(expected, retrieved)