- if not path.is_file():
- raise HandledException(f'no DB file at {path}')
- if expected_version >= 0:
- user_version = self._get_user_version()
- if user_version != expected_version:
- raise HandledException(
- f'wrong DB version {user_version} (!= {expected_version})')
+ if not self._path.is_file():
+ raise HandledException(f'no DB file at {self._path}')
+
+ if version_to_validate < 0:
+ return
+ # ensure version
+ if (user_version := self._get_user_version()) != version_to_validate:
+ raise HandledException(
+ f'wrong DB version {user_version} (!= {version_to_validate})')
+
+ # ensure schema
+ with sql_connect(self._path) as conn:
+ schema_rows = [
+ r[0] for r in
+ conn.execute('SELECT sql FROM sqlite_master ORDER BY sql')
+ if r[0]]
+ schema_rows_normed = []
+ indent = ' '
+ for row in schema_rows:
+ row_normed = []
+ for subrow in row.split('\n'):
+ subrow = subrow.rstrip()
+ in_parentheses = 0
+ split_at = []
+ for i, c in enumerate(subrow):
+ if '(' == c:
+ in_parentheses += 1
+ elif ')' == c:
+ in_parentheses -= 1
+ elif ',' == c and 0 == in_parentheses:
+ split_at += [i + 1]
+ prev_split = 0
+ for i in split_at:
+ segment = subrow[prev_split:i].strip()
+ if len(segment) > 0:
+ row_normed += [f'{indent}{segment}']
+ prev_split = i
+ segment = subrow[prev_split:].strip()
+ if len(segment) > 0:
+ row_normed += [f'{indent}{segment}']
+ row_normed[0] = row_normed[0].lstrip()
+ row_normed[-1] = row_normed[-1].lstrip()
+ if row_normed[-1] != ')' and row_normed[-3][-1] != ',':
+ row_normed[-3] = row_normed[-3] + ','
+ row_normed[-2:] = [indent + row_normed[-1][:-1]] + [')']
+ schema_rows_normed += ['\n'.join(row_normed)]
+ retrieved_schema = ';\n'.join(schema_rows_normed) + ';'
+ with open(_PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
+ stored_schema = f.read().rstrip()
+ if stored_schema != retrieved_schema:
+ diff_msg = Differ().compare(retrieved_schema.splitlines(),
+ stored_schema.splitlines())
+ raise HandledException('DB has wrong tables schema. Diff:\n'
+ + '\n'.join(diff_msg))