SqlText = NewType('SqlText', str)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
-DownloadsDb: TypeAlias = dict[VideoId, PathStr]
+DownloadsIndex: TypeAlias = dict[VideoId, PathStr]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | VideoId
| QueryText | QuotaCost | 'VideoData' | list['VideoData']
);
'''
-to_download: list[VideoId] = []
-
class DatabaseConnection:
"""Wrapped sqlite3.Connection."""
self._mpv.playlist_play_index(max_idx)
-class PlayerServer(HTTPServer):
- """Extension of HTTPServer providing for .player inclusion."""
+class DownloadsDb:
+ """Collections downloading-related stuff."""
+
+ def __init__(self) -> None:
+ self._to_download: list[VideoId] = []
- def __init__(self, *args, **kwargs) -> None:
+ @staticmethod
+ def _id_from_filename(path: PathStr,
+ double_split: bool = False
+ ) -> VideoId:
+ before_ext = splitext(path)[0]
+ if double_split:
+ before_ext = splitext(before_ext)[0]
+ return VideoId(before_ext.split('[')[-1].split(']')[0])
+
+ @property
+ def ids_to_paths(self) -> DownloadsIndex:
+ """Return mapping of VideoIds to paths of files downloaded to them."""
+ ids_to_paths = {}
+ for path in [PathStr(e.path) for e
+ in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
+ ids_to_paths[self._id_from_filename(path)] = PathStr(path)
+ return ids_to_paths
+
+ @property
+ def ids_unfinished(self) -> set[VideoId]:
+ """Return set of IDs of videos awaiting or currently in download."""
+ in_temp_dir = []
+ for path in [PathStr(e.path) for e
+ in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
+ in_temp_dir += [self._id_from_filename(path)]
+ return set(self._to_download + in_temp_dir)
+
+ def clean_unfinished(self) -> None:
+ """Empty temp directory of unfinished downloads."""
+ for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
+ print(f'removing unfinished download: {e.path}')
+ os_remove(e.path)
+
+ def queue_download(self, video_id: VideoId) -> None:
+ """Add video_id to download queue *if* not already processed."""
+ pre_existing = self.ids_unfinished | set(self._to_download
+ + list(self.ids_to_paths))
+ if video_id not in pre_existing:
+ self._to_download += [video_id]
+
+ def _download_next(self) -> None:
+ if self._to_download:
+ video_id = self._to_download.pop(0)
+ with YoutubeDL(YT_DL_PARAMS) as ydl:
+ ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
+
+ def download_loop(self) -> None:
+ """Keep iterating through download queue for new download tasks."""
+ while True:
+ sleep(0.5)
+ self._download_next()
+
+
+class Server(HTTPServer):
+ """Extension of HTTPServer providing for Player and DownloadsDb."""
+
+ def __init__(self,
+ player: Player,
+ downloads_db: DownloadsDb,
+ *args, **kwargs
+ ) -> None:
super().__init__(*args, **kwargs)
- self.player = Player()
+ self.player = player
+ self.downloads = downloads_db
def ensure_expected_dirs_and_files() -> None:
raise Exception(msg)
-def clean_unfinished_downloads() -> None:
- """Empty temp directory of unfinished downloads."""
- for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
- print(f'removing unfinished download: {e.path}')
- os_remove(e.path)
-
-
-def run_server() -> None:
- """Run PlayerServer on TaskHandler, handle KeyboardInterrupt as exit."""
- server = PlayerServer(('localhost', HTTP_PORT), TaskHandler)
- print(f'running at port {HTTP_PORT}')
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- print('aborted due to keyboard interrupt; '
- 'repeat to end download thread too')
- server.server_close()
-
-
-def download_thread() -> None:
- """Keep iterating through to_download for IDs, download their videos."""
- while True:
- sleep(0.5)
- try:
- video_id: VideoId = to_download.pop(0)
- except IndexError:
- continue
- with YoutubeDL(YT_DL_PARAMS) as ydl:
- ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
-
-
class TaskHandler(BaseHTTPRequestHandler):
"""Handler for GET and POST requests to our server."""
- server: PlayerServer
+ server: Server
def _send_http(self,
content: bytes = b'',
html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
- def _make_downloads_db(self) -> tuple[DownloadsDb, list[VideoId]]:
-
- def id_from_filename(path: PathStr, double_split: bool = False
- ) -> VideoId:
- before_ext = splitext(path)[0]
- if double_split:
- before_ext = splitext(before_ext)[0]
- return VideoId(before_ext.split('[')[-1].split(']')[0])
-
- downloads_db = {}
- for path in [PathStr(e.path) for e
- in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
- downloads_db[id_from_filename(path)] = PathStr(path)
- unfinished = []
- for path in [PathStr(e.path) for e
- in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
- unfinished += [id_from_filename(path)]
- return downloads_db, unfinished
-
def _send_thumbnail(self, filename: PathStr) -> None:
path_thumbnail = path_join(PATH_DIR_THUMBNAILS, filename)
if not path_exists(path_thumbnail):
self._send_http(img, [('Content-type', 'image/jpg')])
def _send_or_download_video(self, video_id: VideoId) -> None:
- downloads_db, unfinished = self._make_downloads_db()
- if video_id in downloads_db:
- with open(downloads_db[video_id], 'rb') as video_file:
+ if video_id in self.server.downloads.ids_to_paths:
+ with open(self.server.downloads.ids_to_paths[video_id],
+ 'rb') as video_file:
video = video_file.read()
self._send_http(content=video)
return
- if video_id not in to_download + unfinished:
- to_download.append(video_id)
+ self.server.downloads.queue_download(video_id)
self._send_http(headers=[('Location', f'/video_about/{video_id}')],
code=302)
except NotFoundException:
video_data = VideoData(video_id)
conn.commit_close()
- downloads_db, unfinished = self._make_downloads_db()
self._send_rendered_template(
NAME_TEMPLATE_VIDEO_ABOUT,
{'video_data': video_data,
- 'is_temp': video_id in to_download + unfinished,
- 'file_path': downloads_db.get(video_id, None),
+ 'is_temp': video_id in self.server.downloads.ids_unfinished,
+ 'file_path': self.server.downloads.ids_to_paths.get(video_id,
+ None),
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
def _send_videos_index(self) -> None:
videos = [(id_, PathStr(basename(path)))
- for id_, path in self._make_downloads_db()[0].items()]
+ for id_, path in self.server.downloads.ids_to_paths.items()]
videos.sort(key=lambda t: t[1])
self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
'tuples': tuples})
-if __name__ == '__main__':
+def run():
+ """Create DownloadsDb, Player, run server loop."""
+ downloads_db = DownloadsDb()
+ downloads_db.clean_unfinished()
ensure_expected_dirs_and_files()
- clean_unfinished_downloads()
- Thread(target=download_thread, daemon=False).start()
- run_server()
+ Thread(target=downloads_db.download_loop, daemon=False).start()
+ server = Server(Player(), downloads_db, ('localhost', HTTP_PORT),
+ TaskHandler)
+ print(f'running at port {HTTP_PORT}')
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ print('aborted due to keyboard interrupt; '
+ 'repeat to end download thread too')
+ server.server_close()
+
+
+if __name__ == '__main__':
+ run()