+ """Check file exists, and is of proper DB version and schema."""
+ if not isfile(self.path):
+ raise NotFoundException
+ if self.user_version != EXPECTED_DB_VERSION:
+ raise UnmigratedDbException()
+ self._validate_schema()
+
+ @staticmethod
+ def _available_migrations() -> list[str]:
+ """Validate migrations directory and return sorted entries."""
+ msg_too_big = 'Migration directory points beyond expected DB version.'
+ msg_bad_entry = 'Migration directory contains unexpected entry: '
+ msg_missing = 'Migration directory misses migration of number: '
+ migrations = {}
+ for entry in listdir(MIGRATIONS_DIR):
+ if entry == FILENAME_DB_SCHEMA:
+ continue
+ toks = entry.split('_', 1)
+ if len(toks) < 2:
+ raise HandledException(msg_bad_entry + entry)
+ try:
+ i = int(toks[0])
+ except ValueError as e:
+ raise HandledException(msg_bad_entry + entry) from e
+ if i > EXPECTED_DB_VERSION:
+ raise HandledException(msg_too_big)
+ migrations[i] = toks[1]
+ migrations_list = []
+ for i in range(EXPECTED_DB_VERSION + 1):
+ if i not in migrations:
+ raise HandledException(msg_missing + str(i))
+ migrations_list += [f'{i}_{migrations[i]}']
+ return migrations_list
+
+ @staticmethod
+ def get_version_of_db(path: str) -> int:
+ """Get DB user_version, fail if outside expected range."""
+ sql_for_db_version = 'PRAGMA user_version'
+ with sql_connect(path) as conn:
+ db_version = list(conn.execute(sql_for_db_version))[0][0]
+ if db_version > EXPECTED_DB_VERSION:
+ msg = f'Wrong DB version, expected '\
+ f'{EXPECTED_DB_VERSION}, got unknown {db_version}.'
+ raise HandledException(msg)
+ assert isinstance(db_version, int)
+ return db_version
+
+ @property
+ def user_version(self) -> int:
+ """Get DB user_version."""
+ return self.__class__.get_version_of_db(self.path)