home · contact · privacy
Re-organize downloads management in a queue-driven command loop.
author Christian Heller <c.heller@plomlompom.de>
Fri, 14 Nov 2025 08:11:27 +0000 (09:11 +0100)
committer Christian Heller <c.heller@plomlompom.de>
Fri, 14 Nov 2025 08:11:27 +0000 (09:11 +0100)
src/ytplom/http.py
src/ytplom/misc.py

index a99ada40477c0cdb882bf8121bc88ae8dec614b0..c7fe804839ff0678bd1dbec2817714a7a003a1c0 100644 (file)
@@ -72,6 +72,8 @@ class Server(ThreadingMixIn, PlomHttpServer):
         self.downloads = DownloadsManager()
         self.downloads.clean_unfinished()
         self.downloads.start_thread()
+        VideoFile.call_forget\
+            = lambda yt_id: self.downloads.q.put(f'forget_{yt_id}')
 
 
 class _ReqMap(PlomQueryMap):
@@ -246,7 +248,7 @@ class _TaskHandler(PlomHttpHandler):
                                .joinpath(PAGE_NAMES['file'])
                                .joinpath(file_data.digest.b64))
         except NotFoundException:
-            self.server.downloads.queue_download(video_id)
+            self.server.downloads.q.put(f'queue_{video_id}')
             self._redirect(Path('/')
                            .joinpath(PAGE_NAMES['yt_result'])
                            .joinpath(video_id))
@@ -315,11 +317,8 @@ class _TaskHandler(PlomHttpHandler):
                     update = self.server.downloads.last_update_for(conn, yt_id)
                     if last_updates['download'] < update['time']:
                         last_updates['download'] = update['time']
-                        if update['status'] == 'present':
-                            file = VideoFile.get_by_yt_id(conn, yt_id)
-                            update['path'] = str(file.rel_path)
-                            update['digest'] = file.digest.b64
-                        payload['download'] = update
+                        payload['download'] = {k: update[k] for k in update
+                                               if k != 'time'}
             if not payload:
                 sleep(_EVENTS_UPDATE_INTERVAL_S)
 
index 92cd728c9ac2c34a01eea39cf48c9c62abefe486..760d0b485dea1f9d1994caa0836d3a389efce2fe 100644 (file)
@@ -1,10 +1,9 @@
 """Main ytplom lib."""
 
 # included libs
-from typing import Generator, NewType, Optional, Self
+from typing import Callable, Generator, NewType, Optional, Self
 from os import chdir, environ
 from random import shuffle
-from time import sleep
 from datetime import datetime, timedelta
 from decimal import Decimal
 from json import dumps as json_dumps, loads as json_loads
@@ -12,7 +11,7 @@ from urllib.request import urlretrieve
 from uuid import uuid4
 from pathlib import Path
 from threading import Thread
-from queue import Queue
+from queue import SimpleQueue
 # non-included libs
 from ffmpeg import probe as ffprobe  # type: ignore
 import googleapiclient.discovery  # type: ignore
@@ -359,6 +358,8 @@ class VideoFile(DbData):
     rel_path: Path
     digest: Hash
     tags: TagSet
+    # class methods
+    call_forget: Callable
     # class attributes
     last_update_cutoff: DatetimeStr
     tags_prefilter_needed: TagSet
@@ -519,6 +520,7 @@ class VideoFile(DbData):
         """Remove actual file from local filesystem."""
         print(f'SYNC: removing from filesystem: {self.rel_path}')
         self.full_path.unlink()
+        self.__class__.call_forget(self.yt_id)
 
     def purge(self, conn) -> None:
         """Remove self from database, and in filesystem if no other owners."""
@@ -612,7 +614,7 @@ class Player:
     def __init__(self) -> None:
         self._mpv: Optional[MPV] = None
         self._monitoring_kill: bool = False
-        self._kill_queue: Queue = Queue()
+        self._kill_queue: SimpleQueue = SimpleQueue()
         self.playlist: list[VideoFile] = []
         self.speed = -1.0
         self.duration = -1
@@ -833,24 +835,24 @@ class Player:
 
 class DownloadsManager:
     """Manages downloading and downloads access."""
-    _last_updates: dict[YoutubeId, dict[str, str]]
 
     def __init__(self) -> None:
+        self._downloaded: list[YoutubeId] = []
+        self._downloading: Optional[YoutubeId] = None
         self._to_download: list[YoutubeId] = []
+        self._status: str = ''
         ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
-        self._last_updates: dict[YoutubeId, dict[str, str]] = {}
-        self._sync_db()
-
-    def _sync_db(self):
+        self._timestamps: dict[YoutubeId, DatetimeStr] = {}
         with DbConn() as conn:
             VideoFile.purge_deleteds(conn)
             conn.commit()
-            known_paths = [file.rel_path for file in VideoFile.get_all(conn)]
+            known_files = VideoFile.get_all(conn)
         old_cwd = Path.cwd()
         chdir(PATH_DOWNLOADS)
+        self.q: SimpleQueue = SimpleQueue()
         for path in [p for p in Path('.').iterdir() if p.is_file()]:
             yt_id = self._id_from_filename(path)
-            if path not in known_paths:
+            if yt_id not in [f.yt_id for f in known_files]:
                 print(f'SYNC: new file {path}, saving to YT ID "{yt_id}".')
                 file = VideoFile(digest=None,
                                  rel_path=path,
@@ -859,35 +861,27 @@ class DownloadsManager:
                 with DbConn() as conn:
                     file.save(conn)
                     conn.commit()
-        with DbConn() as conn:
-            for path in [p for p in Path('.').iterdir() if p.is_file()]:
-                yt_id = self._id_from_filename(path)
-                if (yt_id not in self._last_updates
-                        or 'present' != self._last_updates[yt_id]['status']):
-                    self._update_status(yt_id, 'present')
-            self._files = VideoFile.get_all(conn)
+            self._downloaded += [yt_id]
         chdir(old_cwd)
 
-    def last_update_for(self,
-                        conn: DbConn,
-                        yt_id: YoutubeId
+    def last_update_for(self, conn: DbConn, yt_id: YoutubeId
                         ) -> dict[str, str]:
-        """Retrieve ._last_updates[yt_id] but reset to 'absent' if needed."""
-        if yt_id in self._last_updates:
-            if self._last_updates[yt_id]['status'] != 'present':
-                return self._last_updates[yt_id]
-            try:
-                file = VideoFile.get_by_yt_id(conn, yt_id)
-                if not file.present:
-                    self._update_status(yt_id, 'absent')
-            except NotFoundException:
-                self._update_status(yt_id, 'absent')
-        else:
-            self._update_status(yt_id, 'absent')
-        return self._last_updates[yt_id]
-
-    def _update_status(self, yt_id: YoutubeId, status: str) -> None:
-        self._last_updates[yt_id] = {'status': status, 'time': _now_string()}
+        'For yt_id construct update with timestamp, status, optional fields.'
+        update = {
+            'time': self._timestamps.get(yt_id, '2000-01-01'),
+            ## 'status': ('present' if yt_id in self._downloaded + self._inherited
+            'status': ('present' if yt_id in self._downloaded
+                       else ('queued' if yt_id in self._to_download
+                             else (self._status if yt_id == self._downloading
+                                   else 'absent')))}
+        if update['status'] == 'present':
+            file = VideoFile.get_by_yt_id(conn, yt_id)
+            update['path'] = str(file.rel_path)
+            update['digest'] = file.digest.b64
+        return update
+
+    def _update_timestamp(self, yt_id: YoutubeId) -> None:
+        self._timestamps[yt_id] = _now_string()
 
     @staticmethod
     def _id_from_filename(path: Path) -> YoutubeId:
@@ -899,64 +893,99 @@ class DownloadsManager:
             print(f'removing unfinished download: {path}')
             path.unlink()
 
-    def queue_download(self, video_id: YoutubeId) -> None:
-        """Add video_id to download queue *if* not already processed."""
-        self._sync_db()
-        if video_id in self._to_download:
+    def _queue_download(self, yt_id: YoutubeId) -> None:
+        if yt_id == self._downloading or yt_id in self._downloaded:
             return
-        for path in ([p for p in PATH_TEMP.iterdir() if p.is_file()]
-                     + [f.full_path for f in self._files]):
-            if self._id_from_filename(path) == video_id:
-                return
-        self._to_download += [video_id]
-        self._update_status(video_id, 'queued')
+        self._to_download += [yt_id]
+        if not self._downloading:
+            self.q.put('download_next')
+
+    def _forget_file(self, yt_id: YoutubeId) -> None:
+        if yt_id in self._downloaded:
+            self._downloaded.remove(yt_id)
+
+    def _savefile(self, arg: str) -> YoutubeId:
+        filename = Path(arg)
+        yt_id = self._id_from_filename(filename)
+        file = VideoFile(digest=None,
+                         rel_path=filename,
+                         yt_id=yt_id,
+                         tags_str=VideoFile.tags_default.joined)
+        with DbConn() as conn:
+            file.save(conn)
+            conn.commit()
+        self._downloaded += [yt_id]
+        self._downloading = None
+        return yt_id
 
     def _download_next(self) -> None:
-        if self._to_download:
-            downloaded_before: int = 0
-            sizes: dict[str, list[int]] = {}
-
-            def hook(d) -> None:
-                nonlocal downloaded_before
-                if d['status'] in {'downloading', 'finished'}:
-                    downloaded_i = d[TOK_LOADED]
-                    downloaded_mb = (downloaded_i + downloaded_before) / MEGA
-                    size = sizes[d['info_dict'][TOK_FO_ID]]
-                    if (not size[0]) and TOK_FRAG_C in d and TOK_FRAG_I in d:
-                        progress = min(d[TOK_FRAG_I] + 1, d[TOK_FRAG_C]
-                                       ) / d[TOK_FRAG_C]
-                        size[1] = d[TOK_LOADED] / progress
-                    guess_total_mb = sum(t[1] for t in sizes.values()) / MEGA
-                    msg = f'{int(100 * downloaded_mb/guess_total_mb)}%: '\
-                          f'{downloaded_mb:5.1f}/{guess_total_mb:.1f}'
-                    self._update_status(video_id, f'downloading: {msg} MB')
-                    if d['status'] == 'finished':
-                        downloaded_before += downloaded_i
-
-            video_id = self._to_download.pop(0)
-            url = f'{YOUTUBE_URL_PREFIX}{video_id}'
-            with YoutubeDL(YT_DL_PARAMS | {'progress_hooks': [hook]}) as ydl:
-                self._update_status(video_id, 'preparing download')
-                try:
-                    info = ydl.sanitize_info(ydl.extract_info(url,
-                                                              download=False))
-                    key_formats = 'requested_formats'
-                    if key_formats not in info:
-                        raise YoutubeDLError(f'no "{key_formats}" in info')
-                    for f in info[key_formats]:
-                        sizes[f[TOK_FO_ID]] = [False, 1]
-                        if TOK_FS_AP in f:
-                            sizes[f[TOK_FO_ID]] = [True, f[TOK_FS_AP]]
-                    ydl.download(url)
-                except YoutubeDLError as e:
-                    self._update_status(video_id, 'ERROR')
-                    raise e
-            self._sync_db()
+        if not self._to_download:
+            return
+        downloaded_before: int = 0
+        sizes: dict[str, list[int]] = {}
+        self._downloading = yt_id = self._to_download.pop(0)
+        filename: Optional[str] = None
+
+        def hook(d) -> None:
+            nonlocal downloaded_before
+            nonlocal filename
+            if d['status'] in {'downloading', 'finished'}:
+                downloaded_i = d[TOK_LOADED]
+                downloaded_mb = (downloaded_i + downloaded_before) / MEGA
+                size = sizes[d['info_dict'][TOK_FO_ID]]
+                if (not size[0]) and TOK_FRAG_C in d and TOK_FRAG_I in d:
+                    progress = min(d[TOK_FRAG_I] + 1, d[TOK_FRAG_C]
+                                   ) / d[TOK_FRAG_C]
+                    size[1] = d[TOK_LOADED] / progress
+                guess_total_mb = sum(t[1] for t in sizes.values()) / MEGA
+                msg = f'{int(100 * downloaded_mb/guess_total_mb)}%: '\
+                      f'{downloaded_mb:5.1f}/{guess_total_mb:.1f}'
+                self._status = f'downloading: {msg} MB'
+                self._update_timestamp(yt_id)
+                if d['status'] == 'finished':
+                    downloaded_before += downloaded_i
+                    filename = Path(d["info_dict"]["filename"]).name
+
+        url = f'{YOUTUBE_URL_PREFIX}{self._downloading}'
+        self._status = 'preparing download'
+        with YoutubeDL(YT_DL_PARAMS | {'progress_hooks': [hook]}) as ydl:
+            self._update_timestamp(yt_id)
+            try:
+                self._status = 'extracting download info'
+                info_dirty = ydl.extract_info(url, download=False)
+                info = ydl.sanitize_info(info_dirty)
+                key_formats = 'requested_formats'
+                if key_formats not in info:
+                    raise YoutubeDLError(f'no "{key_formats}" in info')
+                for f in info[key_formats]:
+                    sizes[f[TOK_FO_ID]] = [False, 1]
+                    if TOK_FS_AP in f:
+                        sizes[f[TOK_FO_ID]] = [True, f[TOK_FS_AP]]
+                ydl.download(url)
+            except YoutubeDLError as e:
+                self._update_timestamp(yt_id)
+                self._status = 'ERROR'
+                raise e
+        self.q.put(f'savefile_{filename}')
+        self.q.put('download_next')
 
     def start_thread(self) -> None:
-        """Keep iterating through download queue for new download tasks."""
-        def loop():
+        'Collect, enact commands sent through .q.'
+        def loop() -> None:
             while True:
-                sleep(0.5)
-                self._download_next()
+                command = self.q.get()
+                if command == 'download_next':
+                    Thread(target=self._download_next, daemon=False).start()
+                    continue
+                command, arg = command.split('_', maxsplit=1)
+                if command == 'savefile':
+                    yt_id = self._savefile(arg)
+                else:
+                    yt_id = arg
+                    if command == 'forget':
+                        self._forget_file(yt_id)
+                    elif command == 'queue':
+                        self._queue_download(yt_id)
+                self._update_timestamp(yt_id)
+
         Thread(target=loop, daemon=False).start()