from typing import Any
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.misc import VersionedAttribute
+from plomtask.versioned_attributes import VersionedAttribute
from plomtask.exceptions import HandledException
+++ /dev/null
-"""Attributes whose values are recorded as a timestamped history."""
-from datetime import datetime
-from typing import Any
-from sqlite3 import Row
-from time import sleep
-from plomtask.db import DatabaseConnection
-
-TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
-
-
-class VersionedAttribute:
- """Attributes whose values are recorded as a timestamped history."""
-
- def __init__(self,
- parent: Any, table_name: str, default: str | float) -> None:
- self.parent = parent
- self.table_name = table_name
- self.default = default
- self.history: dict[str, str | float] = {}
-
- @property
- def _newest_timestamp(self) -> str:
- """Return most recent timestamp."""
- return sorted(self.history.keys())[-1]
-
- @property
- def newest(self) -> str | float:
- """Return most recent value, or self.default if self.history empty."""
- if 0 == len(self.history):
- return self.default
- return self.history[self._newest_timestamp]
-
- def set(self, value: str | float) -> None:
- """Add to self.history if and only if not same value as newest one.
-
- Note that we wait one micro-second, as timestamp comparison to check
- most recent elements only goes up to that precision.
-
- Also note that we don't check against .newest because that may make us
- compare value against .default even if not set. We want to be able to
- explicitly set .default as the first element.
- """
- sleep(0.00001)
- if 0 == len(self.history) \
- or value != self.history[self._newest_timestamp]:
- self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value
-
- def history_from_row(self, row: Row) -> None:
- """Extend self.history from expected table row format."""
- self.history[row[1]] = row[2]
-
- def at(self, queried_time: str) -> str | float:
- """Retrieve value of timestamp nearest queried_time from the past."""
- sorted_timestamps = sorted(self.history.keys())
- if 0 == len(sorted_timestamps):
- return self.default
- selected_timestamp = sorted_timestamps[0]
- for timestamp in sorted_timestamps[1:]:
- if timestamp > queried_time:
- break
- selected_timestamp = timestamp
- return self.history[selected_timestamp]
-
- def save(self, db_conn: DatabaseConnection) -> None:
- """Save as self.history entries, but first wipe old ones."""
- db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
- [[item[0], item[1]]
- for item in self.history.items()])
from typing import Set, Any
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.misc import VersionedAttribute
+from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
--- /dev/null
+"""Attributes whose values are recorded as a timestamped history."""
+from datetime import datetime
+from typing import Any
+from sqlite3 import Row
+from time import sleep
+from plomtask.db import DatabaseConnection
+
+TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
+
+
+class VersionedAttribute:
+ """Attributes whose values are recorded as a timestamped history."""
+
+ def __init__(self,
+ parent: Any, table_name: str, default: str | float) -> None:
+ self.parent = parent
+ self.table_name = table_name
+ self.default = default
+ self.history: dict[str, str | float] = {}
+
+ @property
+ def _newest_timestamp(self) -> str:
+ """Return most recent timestamp."""
+ return sorted(self.history.keys())[-1]
+
+ @property
+ def newest(self) -> str | float:
+ """Return most recent value, or self.default if self.history empty."""
+ if 0 == len(self.history):
+ return self.default
+ return self.history[self._newest_timestamp]
+
+ def set(self, value: str | float) -> None:
+ """Add to self.history if and only if not same value as newest one.
+
+ Note that we wait one micro-second, as timestamp comparison to check
+ most recent elements only goes up to that precision.
+
+ Also note that we don't check against .newest because that may make us
+ compare value against .default even if not set. We want to be able to
+ explicitly set .default as the first element.
+ """
+ sleep(0.00001)
+ if 0 == len(self.history) \
+ or value != self.history[self._newest_timestamp]:
+ self.history[datetime.now().strftime(TIMESTAMP_FMT)] = value
+
+ def history_from_row(self, row: Row) -> None:
+ """Extend self.history from expected table row format."""
+ self.history[row[1]] = row[2]
+
+ def at(self, queried_time: str) -> str | float:
+ """Retrieve value of timestamp nearest queried_time from the past."""
+ sorted_timestamps = sorted(self.history.keys())
+ if 0 == len(sorted_timestamps):
+ return self.default
+ selected_timestamp = sorted_timestamps[0]
+ for timestamp in sorted_timestamps[1:]:
+ if timestamp > queried_time:
+ break
+ selected_timestamp = timestamp
+ return self.history[selected_timestamp]
+
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Save as self.history entries, but first wipe old ones."""
+ db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
+ [[item[0], item[1]]
+ for item in self.history.items()])
from time import sleep
from datetime import datetime
from tests.utils import TestCaseWithDB
-from plomtask.misc import VersionedAttribute, TIMESTAMP_FMT
+from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT
from plomtask.db import BaseModel
SQL_TEST_TABLE = '''