1 """Attributes whose values are recorded as a timestamped history."""
2 from datetime import datetime
4 from sqlite3 import Row
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import (HandledException, BadFormatException,
10 TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
13 class VersionedAttribute:
14 """Attributes whose values are recorded as a timestamped history."""
17 parent: Any, table_name: str, default: str | float) -> None:
19 self.table_name = table_name
20 self.default = default
21 self.history: dict[str, str | float] = {}
23 def __hash__(self) -> int:
24 history_tuples = tuple((k, v) for k, v in self.history.items())
25 hashable = (self.parent.id_, self.table_name, self.default,
30 def _newest_timestamp(self) -> str:
31 """Return most recent timestamp."""
32 return sorted(self.history.keys())[-1]
35 def newest(self) -> str | float:
36 """Return most recent value, or self.default if self.history empty."""
37 if 0 == len(self.history):
39 return self.history[self._newest_timestamp]
41 def reset_timestamp(self, old_str: str, new_str: str) -> None:
42 """Rename self.history key (timestamp) old to new.
44 Chronological sequence of keys must be preserved, i.e. cannot move
45 key before earlier or after later timestamp.
48 new = datetime.strptime(new_str, TIMESTAMP_FMT)
49 old = datetime.strptime(old_str, TIMESTAMP_FMT)
50 except ValueError as exc:
51 raise BadFormatException('Timestamp of illegal format.') from exc
52 timestamps = list(self.history.keys())
53 if old_str not in timestamps:
54 raise HandledException(f'Timestamp {old} not found in history.')
55 sorted_timestamps = sorted([datetime.strptime(t, TIMESTAMP_FMT)
57 expected_position = sorted_timestamps.index(old)
58 sorted_timestamps.remove(old)
59 sorted_timestamps += [new]
60 sorted_timestamps.sort()
61 if sorted_timestamps.index(new) != expected_position:
62 raise HandledException('Timestamp not respecting chronology.')
63 value = self.history[old_str]
64 del self.history[old_str]
65 self.history[new_str] = value
67 def set(self, value: str | float) -> None:
68 """Add to self.history if and only if not same value as newest one.
70 Note that we wait one micro-second, as timestamp comparison to check
71 most recent elements only goes up to that precision.
73 Also note that we don't check against .newest because that may make us
74 compare value against .default even if not set. We want to be able to
75 explicitly set .default as the first element.
78 if 0 == len(self.history) \
79 or value != self.history[self._newest_timestamp]:
80 self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value
82 def history_from_row(self, row: Row) -> None:
83 """Extend self.history from expected table row format."""
84 self.history[row[1]] = row[2]
86 def at(self, queried_time: str) -> str | float:
87 """Retrieve value of timestamp nearest queried_time from the past."""
88 if len(queried_time) == 10:
89 queried_time += ' 23:59:59.999'
90 sorted_timestamps = sorted(self.history.keys())
91 if 0 == len(sorted_timestamps):
93 selected_timestamp = sorted_timestamps[0]
94 for timestamp in sorted_timestamps[1:]:
95 if timestamp > queried_time:
97 selected_timestamp = timestamp
98 return self.history[selected_timestamp]
100 def save(self, db_conn: DatabaseConnection) -> None:
101 """Save as self.history entries, but first wipe old ones."""
102 if self.parent.id_ is None:
103 raise NotFoundException('cannot save attribute to parent if no ID')
104 db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
106 for item in self.history.items()])
108 def remove(self, db_conn: DatabaseConnection) -> None:
109 """Remove from DB."""
110 db_conn.delete_where(self.table_name, 'parent', self.parent.id_)