class DatabaseConnection:
"""Wrapped sqlite3.Connection."""
- def __init__(self) -> None:
- if not path_exists(PATH_DB):
- with sql_connect(PATH_DB) as conn:
+ def __init__(self, path: PathStr = PATH_DB) -> None:
+ self._path = path
+ if not path_exists(self._path):
+ with sql_connect(self._path) as conn:
conn.executescript(SCRIPT_INIT_DB)
- self._conn = sql_connect(PATH_DB)
+ self._conn = sql_connect(self._path)
def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
"""Wrapper around sqlite3.Connection.execute."""
_table_name: str
_cols: tuple[str, ...]
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, self.__class__):
+ return False
+ for attr_name in self._cols:
+ if getattr(self, attr_name) != getattr(other, attr_name):
+ return False
+ return True
+
@classmethod
def _from_table_row(cls, row: Row) -> Self:
kwargs = {}
def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
self.rel_path = rel_path
self.yt_id = yt_id
-
- def remove(self, conn: DatabaseConnection) -> None:
- """Remove self from database by self.rel_path as identifier."""
- sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?')
- conn.exec(SqlText(sql), (self.rel_path,))
+ self.missing = False
class QuotaLog(DbData):
class Player:
"""MPV representation with some additional features."""
+ _idx: int
def __init__(self) -> None:
self.last_update = PlayerUpdateId('')
+ self._load_filenames()
+ self._mpv: Optional[MPV] = None
+
+ def _load_filenames(self) -> None:
self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
if isfile(e.path)
and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
shuffle(self._filenames)
- self._idx: int = 0
- self._mpv: Optional[MPV] = None
+ self._idx = 0
@property
def _mpv_available(self) -> bool:
self._mpv.playlist_append(path)
self._mpv.playlist_play_index(self._idx)
+ @_if_mpv_available
+ def _kill_mpv(self) -> None:
+ assert self._mpv is not None
+ self._mpv.terminate()
+ self._mpv = None
+
@property
def current_filename(self) -> Optional[PathStr]:
"""Return what we assume is the name of the currently playing file."""
def toggle_run(self) -> None:
"""Toggle player running."""
if self._mpv_available:
- assert self._mpv is not None
- self._mpv.terminate()
- self._mpv = None
+ self._kill_mpv()
else:
self._start_mpv()
self._signal_update()
else:
self._mpv.playlist_play_index(max_idx)
+ def reload(self) -> None:
+ """Close MPV, re-read (and re-shuffle) filenames, then re-start MPV."""
+ self._kill_mpv()
+ self._load_filenames()
+ self._start_mpv()
+ self._signal_update()
+
class DownloadsDb:
"""Collections downloading-related stuff."""
files_via_db = VideoFile.get_all(conn)
old_cwd = getcwd()
chdir(PATH_DOWNLOADS)
- for file in files_via_db:
- if not isfile(path_join(file.rel_path)):
- print(f'SYNC: no file {file.rel_path} found, removing entry.')
- file.remove(conn)
paths = [file.rel_path for file in files_via_db]
for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
if path not in paths:
file = VideoFile(path, yt_id)
print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
file.save(conn)
- chdir(old_cwd)
self._files = VideoFile.get_all(conn)
+ for file in self._files:
+ if not isfile(path_join(file.rel_path)):
+ file.missing = True
+ chdir(old_cwd)
conn.commit_close()
@staticmethod
before_ext = splitext(before_ext)[0]
return YoutubeId(before_ext.split('[')[-1].split(']')[0])
+ @property
+ def missing(self) -> list[PathStr]:
+ """Return relative paths of files known but not in PATH_DOWNLOADS."""
+ self._sync_db()
+ return [f.rel_path for f in self._files if f.missing]
+
@property
def ids_to_paths(self) -> DownloadsIndex:
"""Return mapping YoutubeIds:paths of files downloaded to them."""
video_id = self._to_download.pop(0)
with YoutubeDL(YT_DL_PARAMS) as ydl:
ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
+ self._sync_db()
def download_loop(self) -> None:
"""Keep iterating through download queue for new download tasks."""
class Server(HTTPServer):
"""Extension of HTTPServer providing for Player and DownloadsDb."""
- def __init__(self,
- player: Player,
- downloads_db: DownloadsDb,
- *args, **kwargs
- ) -> None:
+ def __init__(self, downloads_db: DownloadsDb, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
- self.player = player
+ self.player = Player()
self.downloads = downloads_db
self.server.player.next()
elif 'stop' in commands:
self.server.player.toggle_run()
- sleep(0.5) # avoid reload happening before current_file update
+ elif 'reload' in commands:
+ self.server.player.reload()
+ sleep(0.5) # avoid redir happening before current_file update
self._send_http(headers=[('Location', '/')], code=302)
def _post_query(self, query_txt: QueryText) -> None:
self._send_videos_index()
elif 'video_about' == page_name:
self._send_video_about(YoutubeId(toks_url[2]))
+ elif 'missing.json' == page_name:
+ self._send_missing_json()
elif 'query' == page_name:
self._send_query_page(QueryId(toks_url[2]))
elif 'queries' == page_name:
videos.sort(key=lambda t: t[1])
self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
+ def _send_missing_json(self) -> None:
+ self._send_http(
+ bytes(json_dumps(self.server.downloads.missing), 'utf8'),
+ headers=[('Content-type', 'application/json')])
+
def _send_last_playlist_update(self) -> None:
payload: dict[str, PlayerUpdateId] = {
'last_update': self.server.player.last_update}