home · contact · privacy
Slightly improve and re-organize Condition tests.
[plomtask] / plomtask / versioned_attributes.py
1 """Attributes whose values are recorded as a timestamped history."""
2 from datetime import datetime
3 from typing import Any
4 from sqlite3 import Row
5 from time import sleep
6 from plomtask.db import DatabaseConnection
7 from plomtask.exceptions import HandledException, BadFormatException
8
# strftime/strptime format of the timestamp strings used as history keys;
# %f is the zero-padded, six-digit microsecond field.
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
10
11
12 class VersionedAttribute:
13     """Attributes whose values are recorded as a timestamped history."""
14
15     def __init__(self,
16                  parent: Any, table_name: str, default: str | float) -> None:
17         self.parent = parent
18         self.table_name = table_name
19         self.default = default
20         self.history: dict[str, str | float] = {}
21
22     def __hash__(self) -> int:
23         history_tuples = tuple((k, v) for k, v in self.history.items())
24         hashable = (self.parent.id_, self.table_name, self.default,
25                     history_tuples)
26         return hash(hashable)
27
28     @property
29     def as_dict(self) -> dict[str, object]:
30         """Return self as (json.dumps-coompatible) dict."""
31         d = {'parent_process_id': self.parent.id_,
32              'table_name': self.table_name,
33              'history': self.history}
34         return d
35
36     @property
37     def _newest_timestamp(self) -> str:
38         """Return most recent timestamp."""
39         return sorted(self.history.keys())[-1]
40
41     @property
42     def newest(self) -> str | float:
43         """Return most recent value, or self.default if self.history empty."""
44         if 0 == len(self.history):
45             return self.default
46         return self.history[self._newest_timestamp]
47
48     def reset_timestamp(self, old_str: str, new_str: str) -> None:
49         """Rename self.history key (timestamp) old to new.
50
51         Chronological sequence of keys must be preserved, i.e. cannot move
52         key before earlier or after later timestamp.
53         """
54         try:
55             new = datetime.strptime(new_str, TIMESTAMP_FMT)
56             old = datetime.strptime(old_str, TIMESTAMP_FMT)
57         except ValueError as exc:
58             raise BadFormatException('Timestamp of illegal format.') from exc
59         timestamps = list(self.history.keys())
60         if old_str not in timestamps:
61             raise HandledException(f'Timestamp {old} not found in history.')
62         sorted_timestamps = sorted([datetime.strptime(t, TIMESTAMP_FMT)
63                                     for t in timestamps])
64         expected_position = sorted_timestamps.index(old)
65         sorted_timestamps.remove(old)
66         sorted_timestamps += [new]
67         sorted_timestamps.sort()
68         if sorted_timestamps.index(new) != expected_position:
69             raise HandledException('Timestamp not respecting chronology.')
70         value = self.history[old_str]
71         del self.history[old_str]
72         self.history[new_str] = value
73
74     def set(self, value: str | float) -> None:
75         """Add to self.history if and only if not same value as newest one.
76
77         Note that we wait one micro-second, as timestamp comparison to check
78         most recent elements only goes up to that precision.
79
80         Also note that we don't check against .newest because that may make us
81         compare value against .default even if not set. We want to be able to
82         explicitly set .default as the first element.
83         """
84         sleep(0.00001)
85         if 0 == len(self.history) \
86                 or value != self.history[self._newest_timestamp]:
87             self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value
88
89     def history_from_row(self, row: Row) -> None:
90         """Extend self.history from expected table row format."""
91         self.history[row[1]] = row[2]
92
93     def at(self, queried_time: str) -> str | float:
94         """Retrieve value of timestamp nearest queried_time from the past."""
95         if len(queried_time) == 10:
96             queried_time += ' 23:59:59.999'
97         sorted_timestamps = sorted(self.history.keys())
98         if 0 == len(sorted_timestamps):
99             return self.default
100         selected_timestamp = sorted_timestamps[0]
101         for timestamp in sorted_timestamps[1:]:
102             if timestamp > queried_time:
103                 break
104             selected_timestamp = timestamp
105         return self.history[selected_timestamp]
106
107     def save(self, db_conn: DatabaseConnection) -> None:
108         """Save as self.history entries, but first wipe old ones."""
109         db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
110                                   [[item[0], item[1]]
111                                    for item in self.history.items()])
112
113     def remove(self, db_conn: DatabaseConnection) -> None:
114         """Remove from DB."""
115         db_conn.delete_where(self.table_name, 'parent', self.parent.id_)