home · contact · privacy
Slightly improve and re-organize Condition tests.
[plomtask] / plomtask / versioned_attributes.py
index b3442d7df1030b72b2990a21e12ffa12302aa9d1..8861c9834ff3924d6459ced5cb9c69629424bb45 100644 (file)
@@ -4,6 +4,8 @@ from typing import Any
 from sqlite3 import Row
 from time import sleep
 from plomtask.db import DatabaseConnection
+from plomtask.exceptions import (HandledException, BadFormatException,
+                                 NotFoundException)
 
 TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
 
@@ -18,6 +20,12 @@ class VersionedAttribute:
         self.default = default
         self.history: dict[str, str | float] = {}
 
+    def __hash__(self) -> int:
+        history_tuples = tuple((k, v) for k, v in self.history.items())
+        hashable = (self.parent.id_, self.table_name, self.default,
+                    history_tuples)
+        return hash(hashable)
+
     @property
     def _newest_timestamp(self) -> str:
         """Return most recent timestamp."""
@@ -30,6 +38,32 @@ class VersionedAttribute:
             return self.default
         return self.history[self._newest_timestamp]
 
+    def reset_timestamp(self, old_str: str, new_str: str) -> None:
+        """Rename self.history key (timestamp) old to new.
+
+        Chronological sequence of keys must be preserved, i.e. cannot move
+        key before earlier or after later timestamp.
+        """
+        try:
+            new = datetime.strptime(new_str, TIMESTAMP_FMT)
+            old = datetime.strptime(old_str, TIMESTAMP_FMT)
+        except ValueError as exc:
+            raise BadFormatException('Timestamp of illegal format.') from exc
+        timestamps = list(self.history.keys())
+        if old_str not in timestamps:
+            raise HandledException(f'Timestamp {old} not found in history.')
+        sorted_timestamps = sorted([datetime.strptime(t, TIMESTAMP_FMT)
+                                    for t in timestamps])
+        expected_position = sorted_timestamps.index(old)
+        sorted_timestamps.remove(old)
+        sorted_timestamps += [new]
+        sorted_timestamps.sort()
+        if sorted_timestamps.index(new) != expected_position:
+            raise HandledException('Timestamp not respecting chronology.')
+        value = self.history[old_str]
+        del self.history[old_str]
+        self.history[new_str] = value
+
     def set(self, value: str | float) -> None:
         """Add to self.history if and only if not same value as newest one.
 
@@ -65,6 +99,8 @@ class VersionedAttribute:
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Save as self.history entries, but first wipe old ones."""
+        if self.parent.id_ is None:
+            raise NotFoundException('cannot save attribute to parent if no ID')
         db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
                                   [[item[0], item[1]]
                                    for item in self.history.items()])