+ def _load_from_db(self, id_: int | str) -> list[object]:
+ db_found: list[object] = []
+ for row in self.db_conn.row_where(self.checked_class.table_name,
+ 'id', id_):
+ db_found += [self.checked_class.from_table_row(self.db_conn,
+ row)]
+ return db_found
+
+ def _change_obj(self, obj: object) -> str:
+ attr_name: str = self.checked_class.to_save_simples[-1]
+ attr = getattr(obj, attr_name)
+ new_attr: str | int | float | bool
+ if isinstance(attr, (int, float)):
+ new_attr = attr + 1
+ elif isinstance(attr, str):
+ new_attr = attr + '_'
+ elif isinstance(attr, bool):
+ new_attr = not attr
+ setattr(obj, attr_name, new_attr)
+ return attr_name
+
+ def check_identity_with_cache_and_db(self, content: list[Any]) -> None:
+ """Test both cache and DB equal content."""
+ expected_cache = {}
+ for item in content:
+ expected_cache[item.id_] = item
+ self.assertEqual(self.checked_class.get_cache(), expected_cache)
+ hashes_content = [hash(x) for x in content]
+ db_found: list[Any] = []
+ for item in content:
+ assert isinstance(item.id_, type(self.default_ids[0]))
+ db_found += self._load_from_db(item.id_)
+ hashes_db_found = [hash(x) for x in db_found]
+ self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
+
+ def check_by_date_range_with_limits(self,
+ date_col: str,
+ set_id_field: bool = True
+ ) -> None:
+ """Test .by_date_range_with_limits."""
+ # pylint: disable=too-many-locals
+ f = self.checked_class.by_date_range_with_limits
+ # check illegal ranges
+ legal_range = ('yesterday', 'tomorrow')
+ for i in [0, 1]:
+ for bad_date in ['foo', '2024-02-30', '2024-01-01 12:00:00']:
+ date_range = list(legal_range[:])
+ date_range[i] = bad_date
+ with self.assertRaises(HandledException):
+ f(self.db_conn, date_range, date_col)
+ # check empty, translation of 'yesterday' and 'tomorrow'
+ items, start, end = f(self.db_conn, legal_range, date_col)
+ self.assertEqual(items, [])
+ yesterday = datetime.now() + timedelta(days=-1)
+ tomorrow = datetime.now() + timedelta(days=+1)
+ self.assertEqual(start, yesterday.strftime(DATE_FORMAT))
+ self.assertEqual(end, tomorrow.strftime(DATE_FORMAT))
+ # prepare dated items for non-empty results
+ kwargs_with_date = self.default_init_kwargs.copy()
+ if set_id_field:
+ kwargs_with_date['id_'] = None
+ objs = []
+ dates = ['2024-01-01', '2024-01-02', '2024-01-04']
+ for date in ['2024-01-01', '2024-01-02', '2024-01-04']:
+ kwargs_with_date['date'] = date
+ obj = self.checked_class(**kwargs_with_date)
+ objs += [obj]
+ # check ranges still empty before saving
+ date_range = [dates[0], dates[-1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check all objs displayed within closed interval
+ for obj in objs:
+ obj.save(self.db_conn)
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], objs)
+ # check that only displayed what exists within interval
+ date_range = ['2023-12-20', '2024-01-03']
+ expected = [objs[0], objs[1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ date_range = ['2024-01-03', '2024-01-30']
+ expected = [objs[2]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ # check that inverted interval displays nothing
+ date_range = [dates[-1], dates[0]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check that "today" is interpreted, and single-element interval
+ today_date = datetime.now().strftime(DATE_FORMAT)
+ kwargs_with_date['date'] = today_date
+ obj_today = self.checked_class(**kwargs_with_date)
+ obj_today.save(self.db_conn)
+ date_range = ['today', 'today']
+ items, start, end = f(self.db_conn, date_range, date_col)
+ self.assertEqual(start, today_date)
+ self.assertEqual(start, end)
+ self.assertEqual(items, [obj_today])
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_saving_versioned_attributes(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test storage and initialization of versioned attributes."""
+
+ def retrieve_attr_vals(attr: VersionedAttribute) -> list[object]:
+ attr_vals_saved: list[object] = []
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
+ owner.id_):
+ attr_vals_saved += [row[2]]
+ return attr_vals_saved
+
+ attr.set(to_set[0])
+ # check that without attr.save() no rows in DB
+ rows = self.db_conn.row_where(attr.table_name, 'parent', owner.id_)
+ self.assertEqual([], rows)
+ # fail saving attributes on non-saved owner
+ with self.assertRaises(NotFoundException):
+ attr.save(self.db_conn)
+ # check owner.save() created entries as expected in attr table
+ owner.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check changing attr val without save affects owner in memory …
+ attr.set(to_set[1])
+ cmp_attr = getattr(owner, attr_name)
+ self.assertEqual(to_set, list(cmp_attr.history.values()))
+ self.assertEqual(cmp_attr.history, attr.history)
+ # … but does not yet affect DB
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check individual attr.save also stores new val to DB
+ attr.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual(to_set, attr_vals_saved)
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_saving_and_caching(self) -> None:
+ """Test effects of .cache() and .save()."""
+ id1 = self.default_ids[0]
+ # check failure to cache without ID (if None-ID input possible)
+ if isinstance(id1, int):
+ obj0 = self._make_from_defaults(None)
+ with self.assertRaises(HandledException):
+ obj0.cache()
+ # check mere object init itself doesn't even store in cache
+ obj1 = self._make_from_defaults(id1)
+ self.assertEqual(self.checked_class.get_cache(), {})
+ # check .cache() fills cache, but not DB
+ obj1.cache()
+ self.assertEqual(self.checked_class.get_cache(), {id1: obj1})
+ found_in_db = self._load_from_db(id1)
+ self.assertEqual(found_in_db, [])
+ # check .save() sets ID (for int IDs), updates cache, and fills DB
+ # (expect ID to be set to id1, despite obj1 already having that as ID:
+ # it's generated by cursor.lastrowid on the DB table, and with obj1
+ # not written there, obj2 should get it first!)
+ id_input = None if isinstance(id1, int) else id1
+ obj2 = self._make_from_defaults(id_input)
+ obj2.save(self.db_conn)
+ self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
+ # NB: we'll only compare hashes because obj2 itself disappears on
+ # .from_table_row-trioggered database reload
+ obj2_hash = hash(obj2)
+ found_in_db += self._load_from_db(id1)
+ self.assertEqual([hash(o) for o in found_in_db], [obj2_hash])
+ # check we cannot overwrite obj2 with obj1 despite its same ID,
+ # since it has disappeared now
+ with self.assertRaises(HandledException):
+ obj1.save(self.db_conn)
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_by_id(self) -> None:
+ """Test .by_id()."""
+ id1, id2, _ = self.default_ids
+ # check failure if not yet saved
+ obj1 = self._make_from_defaults(id1)
+ with self.assertRaises(NotFoundException):
+ self.checked_class.by_id(self.db_conn, id1)
+ # check identity of cached and retrieved
+ obj1.cache()
+ self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1))
+ # check identity of saved and retrieved
+ obj2 = self._make_from_defaults(id2)
+ obj2.save(self.db_conn)
+ self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_by_id_or_create(self) -> None:
+ """Test .by_id_or_create."""
+ # check .by_id_or_create fails if wrong class
+ if not self.checked_class.can_create_by_id:
+ with self.assertRaises(HandledException):
+ self.checked_class.by_id_or_create(self.db_conn, None)
+ return
+ # check ID input of None creates, on saving, ID=1,2,… for int IDs
+ if isinstance(self.default_ids[0], int):
+ for n in range(2):
+ item = self.checked_class.by_id_or_create(self.db_conn, None)
+ self.assertEqual(item.id_, None)
+ item.save(self.db_conn)
+ self.assertEqual(item.id_, n+1)
+ # check .by_id_or_create acts like normal instantiation (sans saving)
+ id_ = self.default_ids[2]
+ item = self.checked_class.by_id_or_create(self.db_conn, id_)
+ self.assertEqual(item.id_, id_)
+ with self.assertRaises(NotFoundException):
+ self.checked_class.by_id(self.db_conn, item.id_)
+ self.assertEqual(self.checked_class(item.id_), item)
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_from_table_row(self) -> None:
+ """Test .from_table_row() properly reads in class directly from DB."""
+ id_ = self.default_ids[0]
+ obj = self._make_from_defaults(id_)
+ obj.save(self.db_conn)
+ assert isinstance(obj.id_, type(id_))
+ for row in self.db_conn.row_where(self.checked_class.table_name,
+ 'id', obj.id_):
+ # check .from_table_row reproduces state saved, no matter if obj
+ # later changed (with caching even)
+ # NB: we'll only compare hashes because obj itself disappears on
+ # .from_table_row-triggered database reload
+ hash_original = hash(obj)
+ attr_name = self._change_obj(obj)
+ obj.cache()
+ to_cmp = getattr(obj, attr_name)
+ retrieved = self.checked_class.from_table_row(self.db_conn, row)
+ self.assertNotEqual(to_cmp, getattr(retrieved, attr_name))
+ self.assertEqual(hash_original, hash(retrieved))
+ # check cache contains what .from_table_row just produced
+ self.assertEqual({retrieved.id_: retrieved},
+ self.checked_class.get_cache())
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_history_from_row(self,
+ owner: Any,
+ _: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """"Test VersionedAttribute.history_from_row() knows its DB rows."""
+ attr.set(to_set[0])
+ attr.set(to_set[1])
+ owner.save(self.db_conn)
+ # make empty VersionedAttribute, fill from rows, compare to owner's
+ for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
+ loaded_attr = VersionedAttribute(owner, attr.table_name, default)
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
+ owner.id_):
+ loaded_attr.history_from_row(row)
+ self.assertEqual(len(attr.history.keys()),
+ len(loaded_attr.history.keys()))
+ for timestamp, value in attr.history.items():
+ self.assertEqual(value, loaded_attr.history[timestamp])
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_all(self) -> None:
+ """Test .all() and its relation to cache and savings."""
+ id1, id2, id3 = self.default_ids
+ item1 = self._make_from_defaults(id1)
+ item2 = self._make_from_defaults(id2)
+ item3 = self._make_from_defaults(id3)
+ # check .all() returns empty list on un-cached items
+ self.assertEqual(self.checked_class.all(self.db_conn), [])
+ # check that all() shows only cached/saved items
+ item1.cache()
+ item3.save(self.db_conn)
+ self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
+ sorted([item1, item3]))
+ item2.save(self.db_conn)
+ self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
+ sorted([item1, item2, item3]))
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_singularity(self) -> None:
+ """Test pointers made for single object keep pointing to it."""
+ id1 = self.default_ids[0]
+ obj = self._make_from_defaults(id1)
+ obj.save(self.db_conn)
+ # change object, expect retrieved through .by_id to carry change
+ attr_name = self._change_obj(obj)
+ new_attr = getattr(obj, attr_name)
+ retrieved = self.checked_class.by_id(self.db_conn, id1)
+ self.assertEqual(new_attr, getattr(retrieved, attr_name))
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_singularity(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test singularity of VersionedAttributes on saving."""
+ owner.save(self.db_conn)
+ # change obj, expect retrieved through .by_id to carry change
+ attr.set(to_set[0])
+ retrieved = self.checked_class.by_id(self.db_conn, owner.id_)
+ attr_retrieved = getattr(retrieved, attr_name)
+ self.assertEqual(attr.history, attr_retrieved.history)
+
+ @TestCaseAugmented._run_if_checked_class
+ def test_remove(self) -> None:
+ """Test .remove() effects on DB and cache."""
+ id_ = self.default_ids[0]
+ obj = self._make_from_defaults(id_)
+ # check removal only works after saving
+ with self.assertRaises(HandledException):
+ obj.remove(self.db_conn)
+ obj.save(self.db_conn)
+ obj.remove(self.db_conn)
+ # check access to obj fails after removal
+ with self.assertRaises(HandledException):
+ print(obj.id_)
+ # check DB and cache now empty
+ self.check_identity_with_cache_and_db([])
+