from sqlite3 import Row
from time import sleep
from plomtask.db import DatabaseConnection
-from plomtask.exceptions import HandledException, BadFormatException
+from plomtask.exceptions import (HandledException, BadFormatException,
+ NotFoundException)
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
parent: Any, table_name: str, default: str | float) -> None:
self.parent = parent
self.table_name = table_name
- self.default = default
+ self._default = default
self.history: dict[str, str | float] = {}
def __hash__(self) -> int:
history_tuples = tuple((k, v) for k, v in self.history.items())
- hashable = (self.parent.id_, self.table_name, self.default,
+ hashable = (self.parent.id_, self.table_name, self._default,
history_tuples)
return hash(hashable)
- @property
- def as_dict(self) -> dict[str, object]:
- """Return self as (json.dumps-coompatible) dict."""
- d = {'parent_id': self.parent.id_, 'history': self.history}
- return d
-
@property
def _newest_timestamp(self) -> str:
"""Return most recent timestamp."""
return sorted(self.history.keys())[-1]
+ @property
+ def value_type_name(self) -> str:
+ """Return string of name of attribute value type."""
+ return type(self._default).__name__
+
@property
def newest(self) -> str | float:
- """Return most recent value, or self.default if self.history empty."""
+ """Return most recent value, or self._default if self.history empty."""
if 0 == len(self.history):
- return self.default
+ return self._default
return self.history[self._newest_timestamp]
def reset_timestamp(self, old_str: str, new_str: str) -> None:
queried_time += ' 23:59:59.999'
sorted_timestamps = sorted(self.history.keys())
if 0 == len(sorted_timestamps):
- return self.default
+ return self._default
selected_timestamp = sorted_timestamps[0]
for timestamp in sorted_timestamps[1:]:
if timestamp > queried_time:
def save(self, db_conn: DatabaseConnection) -> None:
"""Save as self.history entries, but first wipe old ones."""
+ if self.parent.id_ is None:
+ raise NotFoundException('cannot save attribute to parent if no ID')
db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
[[item[0], item[1]]
for item in self.history.items()])