+ @staticmethod
+ def _id_from_filename(path: PathStr,
+ double_split: bool = False
+ ) -> VideoId:
+ before_ext = splitext(path)[0]
+ if double_split:
+ before_ext = splitext(before_ext)[0]
+ return VideoId(before_ext.split('[')[-1].split(']')[0])
+
+ @property
+ def ids_to_paths(self) -> DownloadsIndex:
+ """Return mapping of VideoIds to paths of files downloaded to them."""
+ ids_to_paths = {}
+ for path in [PathStr(e.path) for e
+ in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
+ ids_to_paths[self._id_from_filename(path)] = PathStr(path)
+ return ids_to_paths
+
+ @property
+ def ids_unfinished(self) -> set[VideoId]:
+ """Return set of IDs of videos awaiting or currently in download."""
+ in_temp_dir = []
+ for path in [PathStr(e.path) for e
+ in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
+ in_temp_dir += [self._id_from_filename(path)]
+ return set(self._to_download + in_temp_dir)
+
+ def clean_unfinished(self) -> None:
+ """Empty temp directory of unfinished downloads."""
+ for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
+ print(f'removing unfinished download: {e.path}')
+ os_remove(e.path)
+
+ def queue_download(self, video_id: VideoId) -> None:
+ """Add video_id to download queue *if* not already processed."""
+ pre_existing = self.ids_unfinished | set(self._to_download
+ + list(self.ids_to_paths))
+ if video_id not in pre_existing:
+ self._to_download += [video_id]
+
+ def _download_next(self) -> None:
+ if self._to_download:
+ video_id = self._to_download.pop(0)
+ with YoutubeDL(YT_DL_PARAMS) as ydl:
+ ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
+
+ def download_loop(self) -> None:
+ """Keep iterating through download queue for new download tasks."""
+ while True:
+ sleep(0.5)
+ self._download_next()
+
+
+class Server(HTTPServer):
+ """Extension of HTTPServer providing for Player and DownloadsDb."""
+
+ def __init__(self,
+ player: Player,
+ downloads_db: DownloadsDb,
+ *args, **kwargs
+ ) -> None: