from base64 import urlsafe_b64decode, urlsafe_b64encode
from hashlib import file_digest
from pathlib import Path
-from sqlite3 import connect as sql_connect, Cursor as DbCursor, Row
+from sqlite3 import connect as sql_connect, Cursor as SqlCursor, Row as SqlRow
from typing import Any, Literal, NewType, Self
from ytplom.primitives import (
HandledException, NotFoundException, PATH_APP_DATA)
return False
def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()
- ) -> DbCursor:
+ ) -> SqlCursor:
"""Wrapper around sqlite3.Connection.execute, building '?' if inputs"""
if len(inputs) > 0:
q_marks = ('?' if len(inputs) == 1
return self._conn.execute(SqlText(f'{sql} {q_marks}'), inputs)
return self._conn.execute(sql)
- def exec_script(self, sql: SqlText) -> DbCursor:
+ def exec_script(self, sql: SqlText) -> None:
"""Wrapper around sqlite3.Connection.executescript."""
- return self._conn.executescript(sql)
+ self._conn.executescript(sql)
def commit(self) -> None:
"""Commit changes (i.e. DbData.save() calls) to database."""
return str(getattr(self, self._str_field))
@classmethod
- def _from_table_row(cls, row: Row) -> Self:
+ def _from_table_row(cls, row: SqlRow) -> Self:
kwargs = {}
for i, col_name in enumerate(cls._cols):
kwargs[col_name] = row[i]
rows = conn.exec(sql).fetchall()
return [cls._from_table_row(row) for row in rows]
- def save(self, conn: DbConn) -> DbCursor:
+ def save(self, conn: DbConn) -> None:
"""Save entry to DB."""
vals = []
for val in [getattr(self, col_name) for col_name in self._cols]:
elif isinstance(val, Hash):
val = val.bytes
vals += [val]
- return conn.exec(SqlText(f'REPLACE INTO {self._table_name} VALUES'),
- tuple(vals))
+ conn.exec(SqlText(f'REPLACE INTO {self._table_name} VALUES'),
+ tuple(vals))
from mpv import MPV # type: ignore
from yt_dlp import YoutubeDL # type: ignore
# ourselves
-from ytplom.db import DbConn, DbCursor, DbData, Hash, SqlText
+from ytplom.db import DbConn, DbData, Hash, SqlText
from ytplom.primitives import HandledException, NotFoundException
def _renew_last_update(self):
self.last_update = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
- def save(self, conn: DbConn) -> DbCursor:
+ def save(self, conn: DbConn) -> None:
"""Extend super().save by new .last_update if sufficient changes."""
if hash(self) != self._hash_on_last_update:
self._renew_last_update()
@classmethod
def _remove_old(cls, conn: DbConn) -> None:
cutoff = datetime.now() - timedelta(days=1)
- sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
+ sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp <')
conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))