1 """Attributes whose values are recorded as a timestamped history."""
from datetime import datetime
from sqlite3 import Row
from time import sleep
from typing import Any

from plomtask.db import DatabaseConnection
from plomtask.exceptions import (HandledException, BadFormatException,
                                 NotFoundException)
# strftime/strptime pattern of the timestamp strings used as keys of
# VersionedAttribute.history; the trailing %f keeps microsecond precision.
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
class VersionedAttribute:
    """Attribute whose successive values are kept as a timestamped history.

    Values live in self.history, a dict mapping timestamp strings (format
    TIMESTAMP_FMT) to the value recorded at that moment. The methods below
    sort and compare these keys as plain strings, i.e. they rely on the
    string order of the keys being their chronological order.
    """

    def __init__(self,
                 parent: Any, table_name: str, default: str | float) -> None:
        """Set up an attribute with empty history.

        parent: owning object; its .id_ is read for hashing and DB access.
        table_name: name of DB table the history is saved to.
        default: value reported as long as the history is empty; its type
            also determines .value_type_name.
        """
        self.parent = parent
        self.table_name = table_name
        self._default = default
        self.history: dict[str, str | float] = {}
        # NB: For tighter mypy testing, we might prefer self.history to be
        # dict[str, float] | dict[str, str] instead, but my current coding
        # knowledge only manages to make that work by adding much further
        # complexity, so let's leave it at that for now …

    def __hash__(self) -> int:
        """Hash over parent ID, table name, default, and whole history.

        As the history is part of the hash, the result changes whenever an
        entry is added or renamed.
        """
        history_tuples = tuple(self.history.items())
        hashable = (self.parent.id_, self.table_name, self._default,
                    history_tuples)
        return hash(hashable)

    @property
    def _newest_timestamp(self) -> str:
        """Return most recent timestamp.

        Raises IndexError on empty self.history, so callers check first.
        """
        return sorted(self.history)[-1]

    @property
    def value_type_name(self) -> str:
        """Return string of name of attribute value type."""
        return type(self._default).__name__

    @property
    def newest(self) -> str | float:
        """Return most recent value, or self._default if self.history empty."""
        if not self.history:
            return self._default
        return self.history[self._newest_timestamp]

    def reset_timestamp(self, old_str: str, new_str: str) -> None:
        """Rename self.history key (timestamp) old to new.

        Chronological sequence of keys must be preserved, i.e. cannot move
        key before earlier or after later timestamp, nor onto another
        existing timestamp.

        Raises BadFormatException if either string does not parse with
        TIMESTAMP_FMT, and HandledException if old_str is no key of
        self.history or if the renaming would violate the above.
        """
        try:
            new = datetime.strptime(new_str, TIMESTAMP_FMT)
            old = datetime.strptime(old_str, TIMESTAMP_FMT)
        except ValueError as exc:
            raise BadFormatException('Timestamp of illegal format.') from exc
        if old_str not in self.history:
            raise HandledException(f'Timestamp {old} not found in history.')
        # Moving onto the next later key would pass the position check below
        # (list.index finds the first of two equal entries) and then silently
        # overwrite that other entry, so reject any collision explicitly.
        if new_str != old_str and new_str in self.history:
            raise HandledException('Timestamp not respecting chronology.')
        sorted_timestamps = sorted(datetime.strptime(t, TIMESTAMP_FMT)
                                   for t in self.history)
        # The renamed entry must end up at the very position the old one had.
        expected_position = sorted_timestamps.index(old)
        sorted_timestamps.remove(old)
        sorted_timestamps.append(new)
        sorted_timestamps.sort()
        if sorted_timestamps.index(new) != expected_position:
            raise HandledException('Timestamp not respecting chronology.')
        self.history[new_str] = self.history.pop(old_str)

    def set(self, value: str | float) -> None:
        """Add to self.history if and only if not same value as newest one.

        Note that we wait one micro-second before stamping, as timestamp
        comparison to check most recent elements only goes up to that
        precision.

        Also note that we don't check against .newest because that may make us
        compare value against .default even if not set. We want to be able to
        explicitly set .default as the first element.
        """
        if self.history and value == self.history[self._newest_timestamp]:
            return
        # Keep the new key apart from one written immediately before; an
        # identical key would overwrite that earlier entry.
        sleep(0.000001)
        self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value

    def history_from_row(self, row: Row) -> None:
        """Extend self.history from expected table row format.

        Reads the timestamp from row[1] and the value from row[2].
        """
        self.history[row[1]] = row[2]

    def at(self, queried_time: str) -> str | float:
        """Retrieve value of timestamp nearest queried_time from the past.

        A queried_time of just ten characters is taken as a date and extended
        to the last microsecond of that day. Returns self._default if
        self.history is empty. If queried_time lies before all of
        self.history, the earliest recorded value is returned.
        """
        if len(queried_time) == 10:
            # Pad to the full six digits of %f: keys are compared as strings,
            # so a shorter fraction such as '.999' would sort before (and
            # thereby exclude) every key within the day's last millisecond.
            queried_time += ' 23:59:59.999999'
        sorted_timestamps = sorted(self.history)
        if not sorted_timestamps:
            return self._default
        selected_timestamp = sorted_timestamps[0]
        for timestamp in sorted_timestamps[1:]:
            if timestamp > queried_time:
                break
            selected_timestamp = timestamp
        return self.history[selected_timestamp]

    def save(self, db_conn: DatabaseConnection) -> None:
        """Save as self.history entries, but first wipe old ones.

        Raises NotFoundException if the parent has no ID yet.
        """
        if self.parent.id_ is None:
            raise NotFoundException('cannot save attribute to parent if no ID')
        # NOTE(review): rows are passed as [timestamp, value], mirroring the
        # columns .history_from_row reads at row[1] and row[2]; this presumes
        # rewrite_relations itself prepends the parent ID – confirm there.
        db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
                                  [[item[0], item[1]]
                                   for item in self.history.items()])

    def remove(self, db_conn: DatabaseConnection) -> None:
        """Remove from DB."""
        db_conn.delete_where(self.table_name, 'parent', self.parent.id_)