"""Attributes whose values are recorded as a timestamped history."""
from datetime import datetime
from typing import Any
+from sqlite3 import Row
+from time import sleep
from plomtask.db import DatabaseConnection
+TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
+
class VersionedAttribute:
"""Attributes whose values are recorded as a timestamped history."""
return self.history[self._newest_timestamp]
def set(self, value: str | float) -> None:
- """Add to self.history if and only if not same value as newest one."""
+ """Add to self.history if and only if not same value as newest one.
+
+ Note that we wait one micro-second, as timestamp comparison to check
+ most recent elements only goes up to that precision.
+
+ Also note that we don't check against .newest because that may make us
+ compare value against .default even if not set. We want to be able to
+ explicitly set .default as the first element.
+ """
+ sleep(0.00001)
if 0 == len(self.history) \
or value != self.history[self._newest_timestamp]:
- self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
+ self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value
+
+ def history_from_row(self, row: Row) -> None:
+ """Extend self.history from expected table row format."""
+ self.history[row[1]] = row[2]
def at(self, queried_time: str) -> str | float:
"""Retrieve value of timestamp nearest queried_time from the past."""
def save(self, db_conn: DatabaseConnection) -> None:
"""Save as self.history entries, but first wipe old ones."""
- db_conn.exec(f'DELETE FROM {self.table_name} WHERE parent = ?',
- (self.parent.id_,))
- for timestamp, value in self.history.items():
- db_conn.exec(f'INSERT INTO {self.table_name} VALUES (?, ?, ?)',
- (self.parent.id_, timestamp, value))
+ db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
+ [[item[0], item[1]]
+ for item in self.history.items()])