+ to_save_simples: list[str] = []
+ to_save_relations: list[tuple[str, str, str, int]] = []
+ versioned_defaults: dict[str, str | float] = {}
+ add_to_dict: list[str] = []
+ id_: None | BaseModelId
+ cache_: dict[BaseModelId, Self]
+ to_search: list[str] = []
+ can_create_by_id = False
+ _exists = True
+ sorters: dict[str, Callable[..., Any]] = {}
+
+ def __init__(self, id_: BaseModelId | None) -> None:
+ if isinstance(id_, int) and id_ < 1:
+ msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}'
+ raise BadFormatException(msg)
+ if isinstance(id_, str) and "" == id_:
+ msg = f'illegal {self.__class__.__name__} ID, must be non-empty'
+ raise BadFormatException(msg)
+ self.id_ = id_
+
+ def __hash__(self) -> int:
+ hashable = [self.id_] + [getattr(self, name)
+ for name in self.to_save_simples]
+ for definition in self.to_save_relations:
+ attr = getattr(self, definition[2])
+ hashable += [tuple(rel.id_ for rel in attr)]
+ for name in self.to_save_versioned():
+ hashable += [hash(getattr(self, name))]
+ return hash(tuple(hashable))
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, self.__class__):
+ return False
+ return hash(self) == hash(other)
+
+ def __lt__(self, other: Any) -> bool:
+ if not isinstance(other, self.__class__):
+ msg = 'cannot compare to object of different class'
+ raise HandledException(msg)
+ assert isinstance(self.id_, int)
+ assert isinstance(other.id_, int)
+ return self.id_ < other.id_
+
+ @classmethod
+ def to_save_versioned(cls) -> list[str]:
+ """Return keys of cls.versioned_defaults assuming we wanna save 'em."""
+ return list(cls.versioned_defaults.keys())
+
+ @property
+ def as_dict_and_refs(self) -> tuple[dict[str, object],
+ list[BaseModel[int] | BaseModel[str]]]:
+ """Return self as json.dumps-ready dict, list of referenced objects."""
+ d: dict[str, object] = {'id': self.id_}
+ refs: list[BaseModel[int] | BaseModel[str]] = []
+ for to_save in self.to_save_simples:
+ d[to_save] = getattr(self, to_save)
+ if len(self.to_save_versioned()) > 0:
+ d['_versioned'] = {}
+ for k in self.to_save_versioned():
+ attr = getattr(self, k)
+ assert isinstance(d['_versioned'], dict)
+ d['_versioned'][k] = attr.history
+ rels_to_collect = [rel[2] for rel in self.to_save_relations]
+ rels_to_collect += self.add_to_dict
+ for attr_name in rels_to_collect:
+ rel_list = []
+ for item in getattr(self, attr_name):
+ rel_list += [item.id_]
+ if item not in refs:
+ refs += [item]
+ d[attr_name] = rel_list
+ return d, refs
+
+ @classmethod
+ def name_lowercase(cls) -> str:
+ """Convenience method to return cls' name in lowercase."""
+ return cls.__name__.lower()
+
+ @classmethod
+ def sort_by(cls, seq: list[Any], sort_key: str, default: str = 'title'
+ ) -> str:
+ """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed).
+
+ Before cls.sorters[sort_key] is applied, seq is sorted by .id_, to
+ ensure predictability where parts of seq are of same sort value.
+ """
+ reverse = False
+ if len(sort_key) > 1 and '-' == sort_key[0]:
+ sort_key = sort_key[1:]
+ reverse = True
+ if sort_key not in cls.sorters:
+ sort_key = default
+ seq.sort(key=lambda x: x.id_, reverse=reverse)
+ sorter: Callable[..., Any] = cls.sorters[sort_key]
+ seq.sort(key=sorter, reverse=reverse)
+ if reverse:
+ sort_key = f'-{sort_key}'
+ return sort_key
+
+ # cache management
+ # (we primarily use the cache to ensure we work on the same object in
+ # memory no matter where and how we retrieve it, e.g. we don't want
+ # .by_id() calls to create a new object each time, but rather a pointer
+ # to the one already instantiated)
+
+ def __getattribute__(self, name: str) -> Any:
+ """Ensure fail if ._disappear() was called, except to check ._exists"""
+ if name != '_exists' and not super().__getattribute__('_exists'):
+ msg = f'Object for attribute does not exist: {name}'
+ raise HandledException(msg)
+ return super().__getattribute__(name)
+
+ def _disappear(self) -> None:
+ """Invalidate object, make future use raise exceptions."""
+ assert self.id_ is not None
+ if self._get_cached(self.id_):
+ self._uncache()
+ to_kill = list(self.__dict__.keys())
+ for attr in to_kill:
+ delattr(self, attr)
+ self._exists = False
+
+ @classmethod
+ def empty_cache(cls) -> None:
+ """Empty class's cache, and disappear all former inhabitants."""
+ # pylint: disable=protected-access
+ # (cause we remain within the class)
+ if hasattr(cls, 'cache_'):
+ to_disappear = list(cls.cache_.values())
+ for item in to_disappear:
+ item._disappear()
+ cls.cache_ = {}
+
+ @classmethod
+ def get_cache(cls: type[BaseModelInstance]
+ ) -> dict[Any, BaseModelInstance]:
+ """Get cache dictionary, create it if not yet existing."""
+ if not hasattr(cls, 'cache_'):
+ d: dict[Any, BaseModelInstance] = {}
+ cls.cache_ = d
+ return cls.cache_
+
+ @classmethod
+ def _get_cached(cls: type[BaseModelInstance],
+ id_: BaseModelId
+ ) -> BaseModelInstance | None:
+ """Get object of id_ from class's cache, or None if not found."""
+ cache = cls.get_cache()
+ if id_ in cache:
+ obj = cache[id_]
+ assert isinstance(obj, cls)
+ return obj
+ return None
+
+ def cache(self) -> None:
+ """Update object in class's cache.
+
+ Also calls ._disappear if cache holds older reference to object of same
+ ID, but different memory address, to avoid doing anything with
+ dangling leftovers.
+ """
+ if self.id_ is None:
+ raise HandledException('Cannot cache object without ID.')
+ cache = self.get_cache()
+ old_cached = self._get_cached(self.id_)
+ if old_cached and id(old_cached) != id(self):
+ # pylint: disable=protected-access
+ # (cause we remain within the class)
+ old_cached._disappear()
+ cache[self.id_] = self
+
+ def _uncache(self) -> None:
+ """Remove self from cache."""
+ if self.id_ is None:
+ raise HandledException('Cannot un-cache object without ID.')
+ cache = self.get_cache()
+ del cache[self.id_]
+
+ # object retrieval and generation