From: Christian Heller Date: Fri, 14 Nov 2025 08:11:27 +0000 (+0100) Subject: Re-organize downloads management in a queue-driven command loop. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/todos?a=commitdiff_plain;h=36c05a545570934033ef06dc5b1fcf18a97fb989;p=ytplom Re-organize downloads management in a queue-driven command loop. --- diff --git a/src/ytplom/http.py b/src/ytplom/http.py index a99ada4..c7fe804 100644 --- a/src/ytplom/http.py +++ b/src/ytplom/http.py @@ -72,6 +72,8 @@ class Server(ThreadingMixIn, PlomHttpServer): self.downloads = DownloadsManager() self.downloads.clean_unfinished() self.downloads.start_thread() + VideoFile.call_forget\ + = lambda yt_id: self.downloads.q.put(f'forget_{yt_id}') class _ReqMap(PlomQueryMap): @@ -246,7 +248,7 @@ class _TaskHandler(PlomHttpHandler): .joinpath(PAGE_NAMES['file']) .joinpath(file_data.digest.b64)) except NotFoundException: - self.server.downloads.queue_download(video_id) + self.server.downloads.q.put(f'queue_{video_id}') self._redirect(Path('/') .joinpath(PAGE_NAMES['yt_result']) .joinpath(video_id)) @@ -315,11 +317,8 @@ class _TaskHandler(PlomHttpHandler): update = self.server.downloads.last_update_for(conn, yt_id) if last_updates['download'] < update['time']: last_updates['download'] = update['time'] - if update['status'] == 'present': - file = VideoFile.get_by_yt_id(conn, yt_id) - update['path'] = str(file.rel_path) - update['digest'] = file.digest.b64 - payload['download'] = update + payload['download'] = {k: update[k] for k in update + if k != 'time'} if not payload: sleep(_EVENTS_UPDATE_INTERVAL_S) diff --git a/src/ytplom/misc.py b/src/ytplom/misc.py index 92cd728..760d0b4 100644 --- a/src/ytplom/misc.py +++ b/src/ytplom/misc.py @@ -1,10 +1,9 @@ """Main ytplom lib.""" # included libs -from typing import Generator, NewType, Optional, Self +from typing import Callable, Generator, NewType, Optional, Self from os import chdir, environ from random import shuffle -from time import sleep from datetime import datetime, timedelta from decimal import Decimal from json import dumps as json_dumps, loads as json_loads @@ -12,7 +11,7 @@ from urllib.request import urlretrieve from uuid import uuid4 from pathlib import Path from threading import Thread -from queue import Queue +from queue import SimpleQueue # non-included libs from ffmpeg import probe as ffprobe # type: ignore import googleapiclient.discovery # type: ignore @@ -359,6 +358,8 @@ class VideoFile(DbData): rel_path: Path digest: Hash tags: TagSet + # class methods + call_forget: Callable # class attributes last_update_cutoff: DatetimeStr tags_prefilter_needed: TagSet @@ -519,6 +520,7 @@ class VideoFile(DbData): """Remove actual file from local filesystem.""" print(f'SYNC: removing from filesystem: {self.rel_path}') self.full_path.unlink() + self.__class__.call_forget(self.yt_id) def purge(self, conn) -> None: """Remove self from database, and in filesystem if no other owners.""" @@ -612,7 +614,7 @@ class Player: def __init__(self) -> None: self._mpv: Optional[MPV] = None self._monitoring_kill: bool = False - self._kill_queue: Queue = Queue() + self._kill_queue: SimpleQueue = SimpleQueue() self.playlist: list[VideoFile] = [] self.speed = -1.0 self.duration = -1 @@ -833,24 +835,24 @@ class Player: class DownloadsManager: """Manages downloading and downloads access.""" - _last_updates: dict[YoutubeId, dict[str, str]] def __init__(self) -> None: + self._downloaded: list[YoutubeId] = [] + self._downloading: Optional[YoutubeId] = None self._to_download: list[YoutubeId] = [] + self._status: str = '' ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP]) - self._last_updates: dict[YoutubeId, dict[str, str]] = {} - self._sync_db() - - def _sync_db(self): + self._timestamps: dict[YoutubeId, DatetimeStr] = {} with DbConn() as conn: VideoFile.purge_deleteds(conn) conn.commit() - known_paths = [file.rel_path for file in VideoFile.get_all(conn)] + known_files = VideoFile.get_all(conn) old_cwd = Path.cwd() chdir(PATH_DOWNLOADS) + self.q: SimpleQueue = SimpleQueue() for path in [p for p in Path('.').iterdir() if p.is_file()]: yt_id = self._id_from_filename(path) - if path not in known_paths: + if yt_id not in [f.yt_id for f in known_files]: print(f'SYNC: new file {path}, saving to YT ID "{yt_id}".') file = VideoFile(digest=None, rel_path=path, @@ -859,35 +861,27 @@ class DownloadsManager: with DbConn() as conn: file.save(conn) conn.commit() - with DbConn() as conn: - for path in [p for p in Path('.').iterdir() if p.is_file()]: - yt_id = self._id_from_filename(path) - if (yt_id not in self._last_updates - or 'present' != self._last_updates[yt_id]['status']): - self._update_status(yt_id, 'present') - self._files = VideoFile.get_all(conn) + self._downloaded += [yt_id] chdir(old_cwd) - def last_update_for(self, - conn: DbConn, - yt_id: YoutubeId + def last_update_for(self, conn: DbConn, yt_id: YoutubeId ) -> dict[str, str]: - """Retrieve ._last_updates[yt_id] but reset to 'absent' if needed.""" - if yt_id in self._last_updates: - if self._last_updates[yt_id]['status'] != 'present': - return self._last_updates[yt_id] - try: - file = VideoFile.get_by_yt_id(conn, yt_id) - if not file.present: - self._update_status(yt_id, 'absent') - except NotFoundException: - self._update_status(yt_id, 'absent') - else: - self._update_status(yt_id, 'absent') - return self._last_updates[yt_id] - - def _update_status(self, yt_id: YoutubeId, status: str) -> None: - self._last_updates[yt_id] = {'status': status, 'time': _now_string()} + 'For yt_id construct update with timestamp, status, optional fields.' + update = { + 'time': self._timestamps.get(yt_id, '2000-01-01'), + ## 'status': ('present' if yt_id in self._downloaded + self._inherited + 'status': ('present' if yt_id in self._downloaded + else ('queued' if yt_id in self._to_download + else (self._status if yt_id == self._downloading + else 'absent')))} + if update['status'] == 'present': + file = VideoFile.get_by_yt_id(conn, yt_id) + update['path'] = str(file.rel_path) + update['digest'] = file.digest.b64 + return update + + def _update_timestamp(self, yt_id: YoutubeId) -> None: + self._timestamps[yt_id] = _now_string() @staticmethod def _id_from_filename(path: Path) -> YoutubeId: @@ -899,64 +893,99 @@ class DownloadsManager: print(f'removing unfinished download: {path}') path.unlink() - def queue_download(self, video_id: YoutubeId) -> None: - """Add video_id to download queue *if* not already processed.""" - self._sync_db() - if video_id in self._to_download: + def _queue_download(self, yt_id: YoutubeId) -> None: + if yt_id == self._downloading or yt_id in self._downloaded: return - for path in ([p for p in PATH_TEMP.iterdir() if p.is_file()] - + [f.full_path for f in self._files]): - if self._id_from_filename(path) == video_id: - return - self._to_download += [video_id] - self._update_status(video_id, 'queued') + self._to_download += [yt_id] + if not self._downloading: + self.q.put('download_next') + + def _forget_file(self, yt_id: YoutubeId) -> None: + if yt_id in self._downloaded: + self._downloaded.remove(yt_id) + + def _savefile(self, arg: str) -> YoutubeId: + filename = Path(arg) + yt_id = self._id_from_filename(filename) + file = VideoFile(digest=None, + rel_path=filename, + yt_id=yt_id, + tags_str=VideoFile.tags_default.joined) + with DbConn() as conn: + file.save(conn) + conn.commit() + self._downloaded += [yt_id] + self._downloading = None + return yt_id def _download_next(self) -> None: - if self._to_download: - downloaded_before: int = 0 - sizes: dict[str, list[int]] = {} - - def hook(d) -> None: - nonlocal downloaded_before - if d['status'] in {'downloading', 'finished'}: - downloaded_i = d[TOK_LOADED] - downloaded_mb = (downloaded_i + downloaded_before) / MEGA - size = sizes[d['info_dict'][TOK_FO_ID]] - if (not size[0]) and TOK_FRAG_C in d and TOK_FRAG_I in d: - progress = min(d[TOK_FRAG_I] + 1, d[TOK_FRAG_C] - ) / d[TOK_FRAG_C] - size[1] = d[TOK_LOADED] / progress - guess_total_mb = sum(t[1] for t in sizes.values()) / MEGA - msg = f'{int(100 * downloaded_mb/guess_total_mb)}%: '\ - f'{downloaded_mb:5.1f}/{guess_total_mb:.1f}' - self._update_status(video_id, f'downloading: {msg} MB') - if d['status'] == 'finished': - downloaded_before += downloaded_i - - video_id = self._to_download.pop(0) - url = f'{YOUTUBE_URL_PREFIX}{video_id}' - with YoutubeDL(YT_DL_PARAMS | {'progress_hooks': [hook]}) as ydl: - self._update_status(video_id, 'preparing download') - try: - info = ydl.sanitize_info(ydl.extract_info(url, - download=False)) - key_formats = 'requested_formats' - if key_formats not in info: - raise YoutubeDLError(f'no "{key_formats}" in info') - for f in info[key_formats]: - sizes[f[TOK_FO_ID]] = [False, 1] - if TOK_FS_AP in f: - sizes[f[TOK_FO_ID]] = [True, f[TOK_FS_AP]] - ydl.download(url) - except YoutubeDLError as e: - self._update_status(video_id, 'ERROR') - raise e - self._sync_db() + if not self._to_download: + return + downloaded_before: int = 0 + sizes: dict[str, list[int]] = {} + self._downloading = yt_id = self._to_download.pop(0) + filename: Optional[str] = None + + def hook(d) -> None: + nonlocal downloaded_before + nonlocal filename + if d['status'] in {'downloading', 'finished'}: + downloaded_i = d[TOK_LOADED] + downloaded_mb = (downloaded_i + downloaded_before) / MEGA + size = sizes[d['info_dict'][TOK_FO_ID]] + if (not size[0]) and TOK_FRAG_C in d and TOK_FRAG_I in d: + progress = min(d[TOK_FRAG_I] + 1, d[TOK_FRAG_C] + ) / d[TOK_FRAG_C] + size[1] = d[TOK_LOADED] / progress + guess_total_mb = sum(t[1] for t in sizes.values()) / MEGA + msg = f'{int(100 * downloaded_mb/guess_total_mb)}%: '\ + f'{downloaded_mb:5.1f}/{guess_total_mb:.1f}' + self._status = f'downloading: {msg} MB' + self._update_timestamp(yt_id) + if d['status'] == 'finished': + downloaded_before += downloaded_i + filename = Path(d["info_dict"]["filename"]).name + + url = f'{YOUTUBE_URL_PREFIX}{self._downloading}' + self._status = 'preparing download' + with YoutubeDL(YT_DL_PARAMS | {'progress_hooks': [hook]}) as ydl: + self._update_timestamp(yt_id) + try: + self._status = 'extracting download info' + info_dirty = ydl.extract_info(url, download=False) + info = ydl.sanitize_info(info_dirty) + key_formats = 'requested_formats' + if key_formats not in info: + raise YoutubeDLError(f'no "{key_formats}" in info') + for f in info[key_formats]: + sizes[f[TOK_FO_ID]] = [False, 1] + if TOK_FS_AP in f: + sizes[f[TOK_FO_ID]] = [True, f[TOK_FS_AP]] + ydl.download(url) + except YoutubeDLError as e: + self._update_timestamp(yt_id) + self._status = 'ERROR' + raise e + self.q.put(f'savefile_{filename}') + self.q.put('download_next') def start_thread(self) -> None: - """Keep iterating through download queue for new download tasks.""" - def loop(): + 'Collect, enact commands sent through .q.' + def loop() -> None: while True: - sleep(0.5) - self._download_next() + command = self.q.get() + if command == 'download_next': + Thread(target=self._download_next, daemon=False).start() + continue + command, arg = command.split('_', maxsplit=1) + if command == 'savefile': + yt_id = self._savefile(arg) + else: + yt_id = arg + if command == 'forget': + self._forget_file(yt_id) + elif command == 'queue': + self._queue_download(yt_id) + self._update_timestamp(yt_id) + Thread(target=loop, daemon=False).start()