From: Christian Heller Date: Wed, 1 May 2024 22:29:11 +0000 (+0200) Subject: Use higher resolution for VersionedAttribute.history timestamps, avoid conflicts... X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bdb.prefix%7D%7D/error?a=commitdiff_plain;h=908debd6dc3a56a4e39617e35d479f5683017433;p=plomtask Use higher resolution for VersionedAttribute.history timestamps, avoid conflicts by waiting that resolution for each new .set(). --- diff --git a/plomtask/misc.py b/plomtask/misc.py index 5759c0d..2c46c1c 100644 --- a/plomtask/misc.py +++ b/plomtask/misc.py @@ -3,6 +3,9 @@ from datetime import datetime from typing import Any from sqlite3 import Row from plomtask.db import DatabaseConnection +from time import sleep + +TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' class VersionedAttribute: @@ -28,10 +31,19 @@ class VersionedAttribute: return self.history[self._newest_timestamp] def set(self, value: str | float) -> None: - """Add to self.history if and only if not same value as newest one.""" + """Add to self.history if and only if not same value as newest one. + + Note that we wait one micro-second, as timestamp comparison to check + most recent elements only goes up to that precision. + + Also note that we don't check against .newest because that may make us + compare value against .default even if not set. We want to be able to + explicitly set .default as the first element. + """ + sleep(0.00001) if 0 == len(self.history) \ or value != self.history[self._newest_timestamp]: - self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value + self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value def history_from_row(self, row: Row) -> None: """Extend self.history from expected table row format.""" diff --git a/tests/versioned_attributes.py b/tests/versioned_attributes.py new file mode 100644 index 0000000..df6fc46 --- /dev/null +++ b/tests/versioned_attributes.py @@ -0,0 +1,125 @@ +""""Test Versioned Attributes in the abstract.""" +from unittest import TestCase +from tests.utils import TestCaseWithDB +from time import sleep +from datetime import datetime +from plomtask.misc import VersionedAttribute, TIMESTAMP_FMT +from plomtask.db import BaseModel + +SQL_TEST_TABLE = ''' +CREATE TABLE versioned_tests ( + parent INTEGER NOT NULL, + timestamp TEXT NOT NULL, + value TEXT NOT NULL, + PRIMARY KEY (parent, timestamp) +); +''' +class TestParentType(BaseModel[int]): + pass + + +class TestsSansDB(TestCase): + """Tests not requiring DB setup.""" + + def test_VersionedAttribute_set(self) -> None: + """Test .set() behaves as expected.""" + # check value gets set even if already is the default + attr = VersionedAttribute(None, '', 'A') + attr.set('A') + self.assertEqual(list(attr.history.values()), ['A']) + # check same value does not get set twice in a row, + # and that not even its timestamp get updated + timestamp = list(attr.history.keys())[0] + attr.set('A') + self.assertEqual(list(attr.history.values()), ['A']) + self.assertEqual(list(attr.history.keys())[0], timestamp) + # check that different value _will_ be set/added + attr.set('B') + self.assertEqual(sorted(attr.history.values()), ['A', 'B']) + # check that a previously used value can be set if not most recent + attr.set('A') + self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B']) + # again check for same value not being set twice in a row, even for + # later items + attr.set('D') + self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B', 'D']) + attr.set('D') + self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B', 'D']) + + def test_VersionedAttribute_newest(self) -> None: + """Test .newest returns newest element, or default on empty.""" + attr = VersionedAttribute(None, '', 'A') + self.assertEqual(attr.newest, 'A') + attr.set('B') + self.assertEqual(attr.newest, 'B') + attr.set('C') + + def test_VersionedAttribute_at(self) -> None: + """Test .at() returns values nearest to queried time, or default.""" + # check .at() return default on empty history + attr = VersionedAttribute(None, '', 'A') + timestamp_A = datetime.now().strftime(TIMESTAMP_FMT) + self.assertEqual(attr.at(timestamp_A), 'A') + # check value exactly at timestamp returned + attr.set('B') + timestamp_B = list(attr.history.keys())[0] + self.assertEqual(attr.at(timestamp_B), 'B') + # check earliest value returned if exists, rather than default + self.assertEqual(attr.at(timestamp_A), 'B') + # check reverts to previous value for timestamps not indexed + sleep(0.00001) + timestamp_between = datetime.now().strftime(TIMESTAMP_FMT) + sleep(0.00001) + attr.set('C') + timestamp_C = sorted(attr.history.keys())[-1] + self.assertEqual(attr.at(timestamp_C), 'C') + self.assertEqual(attr.at(timestamp_between), 'B') + sleep(0.00001) + timestamp_after_C = datetime.now().strftime(TIMESTAMP_FMT) + self.assertEqual(attr.at(timestamp_after_C), 'C') + + +class TestsWithDB(TestCaseWithDB): + """Module tests requiring DB setup.""" + + def setUp(self) -> None: + super().setUp() + self.db_conn.exec(SQL_TEST_TABLE) + + def test_VersionedAttribute_save(self) -> None: + """Test .save() to write to DB.""" + test_parent = TestParentType(1) + attr = VersionedAttribute(test_parent, 'versioned_tests', 'A') + # check mere .set() calls do not by themselves reflect in the DB + attr.set('B') + self.assertEqual([], + self.db_conn.row_where('versioned_tests', 'parent', 1)) + # check .save() makes history appear in DB + attr.save(self.db_conn) + vals_found = [] + for row in self.db_conn.row_where('versioned_tests', 'parent', 1): + vals_found += [row[2]] + self.assertEqual(['B'], vals_found) + # check .save() also updates history in DB + attr.set('C') + attr.save(self.db_conn) + vals_found = [] + for row in self.db_conn.row_where('versioned_tests', 'parent', 1): + vals_found += [row[2]] + self.assertEqual(['B', 'C'], sorted(vals_found)) + + def test_VersionedAttribute_history_from_row(self) -> None: + """"Test .history_from_row() properly interprets DB rows.""" + test_parent = TestParentType(1) + attr = VersionedAttribute(test_parent, 'versioned_tests', 'A') + attr.set('B') + attr.set('C') + attr.save(self.db_conn) + loaded_attr = VersionedAttribute(test_parent, 'versioned_tests', 'A') + for row in self.db_conn.row_where('versioned_tests', 'parent', 1): + loaded_attr.history_from_row(row) + for timestamp, value in attr.history.items(): + self.assertEqual(value, loaded_attr.history[timestamp]) + self.assertEqual(len(attr.history.keys()), + len(loaded_attr.history.keys())) +