From: Christian Heller Date: Thu, 21 Nov 2024 02:08:15 +0000 (+0100) Subject: Modularize. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/static/%7B%7Bprefix%7D%7D/task?a=commitdiff_plain;h=55b9a2714239dbf10d45ff1972ac1cd8e5916c6f;p=ytplom Modularize. --- diff --git a/ytplom.py b/ytplom.py index 1d94ccc..9a204f6 100755 --- a/ytplom.py +++ b/ytplom.py @@ -1,765 +1,10 @@ #!/usr/bin/env python3 """Minimalistic download-focused YouTube interface.""" - -# included libs -from typing import TypeAlias, Optional, NewType, Callable, Self, Any -from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove -from os.path import (isdir, isfile, exists as path_exists, join as path_join, - splitext, basename) -from random import shuffle -from time import time, sleep -from datetime import datetime, timedelta -from json import dumps as json_dumps -from uuid import uuid4 from threading import Thread -from sqlite3 import connect as sql_connect, Cursor, Row -from http.server import HTTPServer, BaseHTTPRequestHandler -from urllib.parse import urlparse, parse_qs -from urllib.request import urlretrieve -from urllib.error import HTTPError -# non-included libs -from jinja2 import Template -from mpv import MPV # type: ignore -from yt_dlp import YoutubeDL # type: ignore -import googleapiclient.discovery # type: ignore - -# what we might want to manually define per environs -API_KEY = environ.get('GOOGLE_API_KEY') -HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084)) - -# type definitions for mypy -DatetimeStr = NewType('DatetimeStr', str) -QuotaCost = NewType('QuotaCost', int) -YoutubeId = NewType('YoutubeId', str) -PathStr = NewType('PathStr', str) -QueryId = NewType('QueryId', str) -QueryText = NewType('QueryText', str) -ProseText = NewType('ProseText', str) -SqlText = NewType('SqlText', str) -AmountDownloads = NewType('AmountDownloads', int) -PlayerUpdateId = NewType('PlayerUpdateId', str) -DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr] -TemplateContext: TypeAlias = dict[ - str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId - | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo'] - | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]] - | list[tuple[PathStr, PathStr]]] - -# local data reasonably expected to be in user home directory -PATH_HOME = PathStr(environ.get('HOME', '')) -PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom')) -PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails')) -PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql')) -PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads')) -PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp')) - -# template paths; might move outside PATH_WORKDIR in the future -PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates')) -NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl') -NAME_TEMPLATE_RESULTS = PathStr('results.tmpl') -NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl') -NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl') -NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl') -PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES, - NAME_TEMPLATE_QUERIES)) - -# yt_dlp config -YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ - '/best[height<=1080][width<=1920]' -YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS, - 'temp': PATH_TEMP}, - 'format': YT_DOWNLOAD_FORMAT} - -# Youtube API expectations -YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=') -THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/') -THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg') -QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100) -QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1) - -# local expectations -TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' -LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'} - -# tables to create database with -SCRIPT_INIT_DB = ''' -CREATE TABLE yt_queries ( - id TEXT PRIMARY KEY, - text TEXT NOT NULL, - retrieved_at TEXT NOT NULL -); -CREATE TABLE yt_videos ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - description TEXT NOT NULL, - published_at TEXT NOT NULL, - duration TEXT NOT NULL, - definition TEXT NOT NULL -); -CREATE TABLE yt_query_results ( - query_id TEXT NOT NULL, - video_id TEXT NOT NULL, - PRIMARY KEY (query_id, video_id), - FOREIGN KEY (query_id) REFERENCES yt_queries(id), - FOREIGN KEY (video_id) REFERENCES yt_videos(id) -); -CREATE TABLE quota_costs ( - id TEXT PRIMARY KEY, - timestamp TEXT NOT NULL, - cost INT NOT NULL -); -CREATE TABLE files ( - rel_path TEXT PRIMARY KEY, - yt_id TEXT NOT NULL DEFAULT "", - FOREIGN KEY (yt_id) REFERENCES yt_videos(id) -); -''' - - -class NotFoundException(BaseException): - """Call on DB fetches finding less than expected.""" - - -def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None: - """Ensure existance of expected_dirs _as_ directories.""" - for dir_name in expected_dirs: - if not path_exists(dir_name): - print(f'creating expected directory: {dir_name}') - makedirs(dir_name) - elif not isdir(dir_name): - msg = f'at expected directory path {dir_name} found non-directory' - raise Exception(msg) - - -class DatabaseConnection: - """Wrapped sqlite3.Connection.""" - - def __init__(self) -> None: - if not path_exists(PATH_DB): - with sql_connect(PATH_DB) as conn: - conn.executescript(SCRIPT_INIT_DB) - self._conn = sql_connect(PATH_DB) - - def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor: - """Wrapper around sqlite3.Connection.execute.""" - return self._conn.execute(sql, inputs) - - def commit_close(self) -> None: - """Run sqlite3.Connection.commit and .close.""" - self._conn.commit() - self._conn.close() - - -class DbData: - """Abstraction of common DB operation.""" - _table_name: str - _cols: tuple[str, ...] - - @classmethod - def _from_table_row(cls, row: Row) -> Self: - kwargs = {} - for i, col_name in enumerate(cls._cols): - kwargs[col_name] = row[i] - return cls(**kwargs) - - @classmethod - def get_one(cls, conn: DatabaseConnection, id_: str) -> Self: - """Return single entry of id_ from DB.""" - sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?') - row = conn.exec(sql, (id_,)).fetchone() - if not row: - raise NotFoundException - return cls._from_table_row(row) - - @classmethod - def get_all(cls, conn: DatabaseConnection) -> list[Self]: - """Return all entries from DB.""" - sql = SqlText(f'SELECT * FROM {cls._table_name}') - rows = conn.exec(sql).fetchall() - return [cls._from_table_row(row) for row in rows] - - def save(self, conn: DatabaseConnection) -> Cursor: - """Save entry to DB.""" - vals = [getattr(self, col_name) for col_name in self._cols] - q_marks = '(' + ','.join(['?'] * len(vals)) + ')' - sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}') - return conn.exec(sql, tuple(vals)) - - -class YoutubeQuery(DbData): - """Representation of YouTube query (without results).""" - _table_name = 'yt_queries' - _cols = ('id_', 'text', 'retrieved_at') - - def __init__(self, - id_: Optional[QueryId], - text: QueryText, - retrieved_at: DatetimeStr - ) -> None: - self.id_ = id_ if id_ else QueryId(str(uuid4())) - self.text = QueryText(text) - self.retrieved_at = retrieved_at - - @classmethod - def get_all_for_video(cls, - conn: DatabaseConnection, - video_id: YoutubeId - ) -> list[Self]: - """Return YoutubeQueries containing YoutubeVideo's ID in results.""" - sql = SqlText('SELECT query_id FROM ' - 'yt_query_results WHERE video_id = ?') - query_ids = conn.exec(sql, (video_id,)).fetchall() - return [cls.get_one(conn, query_id_tup[0]) - for query_id_tup in query_ids] - - -class YoutubeVideo(DbData): - """Representation of YouTube video metadata as provided by their API.""" - _table_name = 'yt_videos' - _cols = ('id_', 'title', 'description', 'published_at', 'duration', - 'definition') - - def __init__(self, - id_: YoutubeId, - title: ProseText = ProseText('?'), - description: ProseText = ProseText('?'), - published_at: DatetimeStr = DatetimeStr('?'), - duration: str = '?', - definition: str = '?' - ) -> None: - self.id_ = id_ - self.title = title - self.description = description - self.published_at = published_at - self.duration = duration - self.definition = definition - - def set_duration_from_yt_string(self, yt_string) -> None: - """Set .duration from the kind of format the YouTube API provides.""" - date_dur, time_dur = yt_string.split('T') - seconds = 0 - date_dur = date_dur[1:] - for dur_char, len_seconds in (('Y', 60*60*24*365.25), - ('M', 60*60*24*30), - ('D', 60*60*24)): - if dur_char in date_dur: - dur_str, date_dur = date_dur.split(dur_char) - seconds += int(dur_str) * int(len_seconds) - for dur_char, len_seconds in (('H', 60*60), - ('M', 60), - ('S', 1)): - if dur_char in time_dur: - dur_str, time_dur = time_dur.split(dur_char) - seconds += int(dur_str) * len_seconds - seconds_str = str(seconds % 60) - minutes_str = str(seconds // 60) - hours_str = str(seconds // (60 * 60)) - self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s - in (hours_str, minutes_str, seconds_str)]) - - @classmethod - def get_all_for_query(cls, - conn: DatabaseConnection, - query_id: QueryId - ) -> list[Self]: - """Return all videos for query of query_id.""" - sql = SqlText('SELECT video_id ' - 'FROM yt_query_results WHERE query_id = ?') - video_ids = conn.exec(sql, (query_id,)).fetchall() - return [cls.get_one(conn, video_id_tup[0]) - for video_id_tup in video_ids] - - def save_to_query(self, - conn: DatabaseConnection, - query_id: QueryId - ) -> None: - """Save inclusion of self in results to query of query_id.""" - conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'), - (query_id, self.id_)) - - -class VideoFile(DbData): - """Collects data about downloaded files.""" - _table_name = 'files' - _cols = ('rel_path', 'yt_id') - - def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None: - self.rel_path = rel_path - self.yt_id = yt_id - - def remove(self, conn: DatabaseConnection) -> None: - """Remove self from database by self.rel_path as identifier.""" - sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?') - conn.exec(SqlText(sql), (self.rel_path,)) - - -class QuotaLog(DbData): - """Collects API access quota costs.""" - _table_name = 'quota_costs' - _cols = ('id_', 'timestamp', 'cost') - - def __init__(self, - id_: Optional[str], - timestamp: DatetimeStr, - cost: QuotaCost - ) -> None: - self.id_ = id_ if id_ else str(uuid4()) - self.timestamp = timestamp - self.cost = cost - - @classmethod - def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None: - """Adds cost mapped to current datetime.""" - cls._remove_old(conn) - new = cls(None, - DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)), - QuotaCost(cost)) - new.save(conn) - - @classmethod - def current(cls, conn: DatabaseConnection) -> QuotaCost: - """Returns quota cost total for last 24 hours, purges old data.""" - cls._remove_old(conn) - quota_costs = cls.get_all(conn) - return QuotaCost(sum(c.cost for c in quota_costs)) - - @classmethod - def _remove_old(cls, conn: DatabaseConnection) -> None: - cutoff = datetime.now() - timedelta(days=1) - sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?') - conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),)) - - -class Player: - """MPV representation with some additional features.""" - - def __init__(self) -> None: - self.last_update = PlayerUpdateId('') - self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS) - if isfile(e.path) - and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS] - shuffle(self._filenames) - self._idx: int = 0 - self._mpv: Optional[MPV] = None - - @property - def _mpv_available(self) -> bool: - return bool(self._mpv and not self._mpv.core_shutdown) - - @staticmethod - def _if_mpv_available(f) -> Callable: - def wrapper(self): - return f(self) if self._mpv else None - return wrapper - - def _signal_update(self) -> None: - self.last_update = PlayerUpdateId(f'{self._idx}:{time()}') +from ytplom.misc import DownloadsDb, HTTP_PORT, Player, Server, TaskHandler - def _start_mpv(self) -> None: - self._mpv = MPV(input_default_bindings=True, - input_vo_keyboard=True, - config=True) - self._mpv.observe_property('pause', lambda a, b: self._signal_update()) - @self._mpv.event_callback('start-file') - def on_start_file(_) -> None: - assert self._mpv is not None - self._mpv.pause = False - self._idx = self._mpv.playlist_pos - self._signal_update() - - @self._mpv.event_callback('shutdown') - def on_shutdown(_) -> None: - self._mpv = None - self._signal_update() - - for path in self._filenames: - self._mpv.playlist_append(path) - self._mpv.playlist_play_index(self._idx) - - @property - def current_filename(self) -> Optional[PathStr]: - """Return what we assume is the name of the currently playing file.""" - if not self._filenames: - return None - return PathStr(basename(self._filenames[self._idx])) - - @property - def prev_files(self) -> list[PathStr]: - """List 'past' files of playlist.""" - return list(reversed(self._filenames[:self._idx])) - - @property - def next_files(self) -> list[PathStr]: - """List 'coming' files of playlist.""" - return self._filenames[self._idx + 1:] - - @property - def is_running(self) -> bool: - """Return if player is running/available.""" - return self._mpv_available - - @property - def is_paused(self) -> bool: - """Return if player is paused.""" - if self._mpv_available: - assert self._mpv is not None - return self._mpv.pause - return False - - def toggle_run(self) -> None: - """Toggle player running.""" - if self._mpv_available: - assert self._mpv is not None - self._mpv.terminate() - self._mpv = None - else: - self._start_mpv() - self._signal_update() - - @_if_mpv_available - def toggle_pause(self) -> None: - """Toggle player pausing.""" - assert self._mpv is not None - self._mpv.pause = not self._mpv.pause - self._signal_update() - - @_if_mpv_available - def prev(self) -> None: - """Move player to previous item in playlist.""" - assert self._mpv is not None - if self._mpv.playlist_pos > 0: - self._mpv.playlist_prev() - else: - self._mpv.playlist_play_index(0) - - @_if_mpv_available - def next(self) -> None: - """Move player to next item in playlist.""" - assert self._mpv is not None - max_idx: int = len(self._mpv.playlist_filenames) - 1 - if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1: - self._mpv.playlist_next() - else: - self._mpv.playlist_play_index(max_idx) - - -class DownloadsDb: - """Collections downloading-related stuff.""" - - def __init__(self) -> None: - self._to_download: list[YoutubeId] = [] - _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP]) - self._sync_db() - - def _sync_db(self): - conn = DatabaseConnection() - files_via_db = VideoFile.get_all(conn) - old_cwd = getcwd() - chdir(PATH_DOWNLOADS) - for file in files_via_db: - if not isfile(path_join(file.rel_path)): - print(f'SYNC: no file {file.rel_path} found, removing entry.') - file.remove(conn) - paths = [file.rel_path for file in files_via_db] - for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]: - if path not in paths: - yt_id = self._id_from_filename(path) - file = VideoFile(path, yt_id) - print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".') - file.save(conn) - chdir(old_cwd) - self._files = VideoFile.get_all(conn) - conn.commit_close() - - @staticmethod - def _id_from_filename(path: PathStr, - double_split: bool = False - ) -> YoutubeId: - before_ext = splitext(path)[0] - if double_split: - before_ext = splitext(before_ext)[0] - return YoutubeId(before_ext.split('[')[-1].split(']')[0]) - - @property - def ids_to_paths(self) -> DownloadsIndex: - """Return mapping YoutubeIds:paths of files downloaded to them.""" - self._sync_db() - return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path)) - for f in self._files} - - @property - def ids_unfinished(self) -> set[YoutubeId]: - """Return set of IDs of videos awaiting or currently in download.""" - in_temp_dir = [] - for path in [PathStr(e.path) for e - in scandir(PATH_TEMP) if isfile(e.path)]: - in_temp_dir += [self._id_from_filename(path)] - return set(self._to_download + in_temp_dir) - - def clean_unfinished(self) -> None: - """Empty temp directory of unfinished downloads.""" - for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]: - print(f'removing unfinished download: {e.path}') - os_remove(e.path) - - def queue_download(self, video_id: YoutubeId) -> None: - """Add video_id to download queue *if* not already processed.""" - pre_existing = self.ids_unfinished | set(self._to_download - + list(self.ids_to_paths)) - if video_id not in pre_existing: - self._to_download += [video_id] - - def _download_next(self) -> None: - if self._to_download: - video_id = self._to_download.pop(0) - with YoutubeDL(YT_DL_PARAMS) as ydl: - ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}']) - - def download_loop(self) -> None: - """Keep iterating through download queue for new download tasks.""" - while True: - sleep(0.5) - self._download_next() - - -class Server(HTTPServer): - """Extension of HTTPServer providing for Player and DownloadsDb.""" - - def __init__(self, - player: Player, - downloads_db: DownloadsDb, - *args, **kwargs - ) -> None: - super().__init__(*args, **kwargs) - self.player = player - self.downloads = downloads_db - - -class TaskHandler(BaseHTTPRequestHandler): - """Handler for GET and POST requests to our server.""" - server: Server - - def _send_http(self, - content: bytes = b'', - headers: Optional[list[tuple[str, str]]] = None, - code: int = 200 - ) -> None: - headers = headers if headers else [] - self.send_response(code) - for header_tuple in headers: - self.send_header(header_tuple[0], header_tuple[1]) - self.end_headers() - if content: - self.wfile.write(content) - - def do_POST(self) -> None: # pylint:disable=invalid-name - """Map POST requests to handlers for various paths.""" - url = urlparse(self.path) - toks_url: list[str] = url.path.split('/') - page_name = toks_url[1] - body_length = int(self.headers['content-length']) - postvars = parse_qs(self.rfile.read(body_length).decode()) - if 'playlist' == page_name: - self._post_player_command(list(postvars.keys())[0]) - elif 'queries' == page_name: - self._post_query(QueryText(postvars['query'][0])) - - def _post_player_command(self, commands: str) -> None: - if 'pause' in commands: - self.server.player.toggle_pause() - elif 'prev' in commands: - self.server.player.prev() - elif 'next' in commands: - self.server.player.next() - elif 'stop' in commands: - self.server.player.toggle_run() - sleep(0.5) # avoid reload happening before current_file update - self._send_http(headers=[('Location', '/')], code=302) - - def _post_query(self, query_txt: QueryText) -> None: - conn = DatabaseConnection() - - def collect_results(query_txt: QueryText) -> list[YoutubeVideo]: - youtube = googleapiclient.discovery.build('youtube', 'v3', - developerKey=API_KEY) - QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH) - search_request = youtube.search().list( - q=query_txt, - part='snippet', - maxResults=25, - safeSearch='none', - type='video') - results: list[YoutubeVideo] = [] - ids_to_detail: list[YoutubeId] = [] - for item in search_request.execute()['items']: - video_id: YoutubeId = item['id']['videoId'] - ids_to_detail += [video_id] - snippet = item['snippet'] - urlretrieve(snippet['thumbnails']['default']['url'], - path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) - results += [YoutubeVideo(id_=video_id, - title=snippet['title'], - description=snippet['description'], - published_at=snippet['publishedAt'])] - QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS) - ids_for_details = ','.join([r.id_ for r in results]) - videos_request = youtube.videos().list(id=ids_for_details, - part='content_details') - for i, detailed in enumerate(videos_request.execute()['items']): - result = results[i] - assert result.id_ == detailed['id'] - content_details: dict[str, str] = detailed['contentDetails'] - result.set_duration_from_yt_string(content_details['duration']) - result.definition = content_details['definition'].upper() - return results - - query_data = YoutubeQuery( - None, query_txt, - DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))) - query_data.save(conn) - for result in collect_results(query_txt): - result.save(conn) - result.save_to_query(conn, query_data.id_) - conn.commit_close() - self._send_http(headers=[('Location', f'/query/{query_data.id_}')], - code=302) - - def do_GET(self) -> None: # pylint:disable=invalid-name - """Map GET requests to handlers for various paths.""" - url = urlparse(self.path) - toks_url: list[str] = url.path.split('/') - page_name = toks_url[1] - try: - if 'thumbnails' == page_name: - self._send_thumbnail(PathStr(toks_url[2])) - elif 'dl' == page_name: - self._send_or_download_video(YoutubeId(toks_url[2])) - elif 'videos' == page_name: - self._send_videos_index() - elif 'video_about' == page_name: - self._send_video_about(YoutubeId(toks_url[2])) - elif 'query' == page_name: - self._send_query_page(QueryId(toks_url[2])) - elif 'queries' == page_name: - self._send_queries_index_and_search() - elif '_last_playlist_update.json' == page_name: - self._send_last_playlist_update() - else: # e.g. for / - self._send_playlist() - except NotFoundException: - self._send_http(b'not found', code=404) - - def _send_rendered_template(self, - tmpl_name: PathStr, - tmpl_ctx: TemplateContext - ) -> None: - with open(path_join(PATH_TEMPLATES, tmpl_name), - 'r', encoding='utf8' - ) as templ_file: - tmpl = Template(str(templ_file.read())) - html = tmpl.render(**tmpl_ctx) - self._send_http(bytes(html, 'utf8')) - - def _send_thumbnail(self, filename: PathStr) -> None: - _ensure_expected_dirs([PATH_THUMBNAILS]) - path_thumbnail = path_join(PATH_THUMBNAILS, filename) - if not path_exists(path_thumbnail): - video_id = splitext(filename)[0] - url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}' - try: - urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) - except HTTPError as e: - if 404 == e.code: - raise NotFoundException from e - raise e - with open(path_thumbnail, 'rb') as f: - img = f.read() - self._send_http(img, [('Content-type', 'image/jpg')]) - - def _send_or_download_video(self, video_id: YoutubeId) -> None: - if video_id in self.server.downloads.ids_to_paths: - with open(self.server.downloads.ids_to_paths[video_id], - 'rb') as video_file: - video = video_file.read() - self._send_http(content=video) - return - self.server.downloads.queue_download(video_id) - self._send_http(headers=[('Location', f'/video_about/{video_id}')], - code=302) - - def _send_query_page(self, query_id: QueryId) -> None: - conn = DatabaseConnection() - query = YoutubeQuery.get_one(conn, str(query_id)) - results = YoutubeVideo.get_all_for_query(conn, query_id) - conn.commit_close() - self._send_rendered_template( - NAME_TEMPLATE_RESULTS, - {'query': query.text, 'videos': results}) - - def _send_queries_index_and_search(self) -> None: - conn = DatabaseConnection() - quota_count = QuotaLog.current(conn) - queries_data = YoutubeQuery.get_all(conn) - conn.commit_close() - queries_data.sort(key=lambda q: q.retrieved_at, reverse=True) - self._send_rendered_template( - NAME_TEMPLATE_QUERIES, {'queries': queries_data, - 'quota_count': quota_count}) - - def _send_video_about(self, video_id: YoutubeId) -> None: - conn = DatabaseConnection() - linked_queries = YoutubeQuery.get_all_for_video(conn, video_id) - try: - video_data = YoutubeVideo.get_one(conn, video_id) - except NotFoundException: - video_data = YoutubeVideo(video_id) - conn.commit_close() - self._send_rendered_template( - NAME_TEMPLATE_VIDEO_ABOUT, - {'video_data': video_data, - 'is_temp': video_id in self.server.downloads.ids_unfinished, - 'file_path': self.server.downloads.ids_to_paths.get(video_id, - None), - 'youtube_prefix': YOUTUBE_URL_PREFIX, - 'queries': linked_queries}) - - def _send_videos_index(self) -> None: - videos = [(id_, PathStr(basename(path))) - for id_, path in self.server.downloads.ids_to_paths.items()] - videos.sort(key=lambda t: t[1]) - self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos}) - - def _send_last_playlist_update(self) -> None: - payload: dict[str, PlayerUpdateId] = { - 'last_update': self.server.player.last_update} - self._send_http(bytes(json_dumps(payload), 'utf8'), - headers=[('Content-type', 'application/json')]) - - def _send_playlist(self) -> None: - tuples: list[tuple[PathStr, PathStr]] = [] - i: int = 0 - while True: - prev, next_ = PathStr(''), PathStr('') - if len(self.server.player.prev_files) > i: - prev = PathStr(basename(self.server.player.prev_files[i])) - if len(self.server.player.next_files) > i: - next_ = PathStr(basename(self.server.player.next_files[i])) - if not prev + next_: - break - tuples += [(prev, next_)] - i += 1 - self._send_rendered_template( - NAME_TEMPLATE_PLAYLIST, - {'last_update': self.server.player.last_update, - 'running': self.server.player.is_running, - 'paused': self.server.player.is_paused, - 'current_title': self.server.player.current_filename, - 'tuples': tuples}) - - -def run(): - """Create DownloadsDb, Player, run server loop.""" +if __name__ == '__main__': downloads_db = DownloadsDb() downloads_db.clean_unfinished() Thread(target=downloads_db.download_loop, daemon=False).start() @@ -772,7 +17,3 @@ def run(): print('aborted due to keyboard interrupt; ' 'repeat to end download thread too') server.server_close() - - -if __name__ == '__main__': - run() diff --git a/ytplom/__init__.py b/ytplom/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ytplom/misc.py b/ytplom/misc.py new file mode 100644 index 0000000..639acdf --- /dev/null +++ b/ytplom/misc.py @@ -0,0 +1,756 @@ +"""Main ytplom lib.""" + +# included libs +from typing import TypeAlias, Optional, NewType, Callable, Self, Any +from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove +from os.path import (isdir, isfile, exists as path_exists, join as path_join, + splitext, basename) +from random import shuffle +from time import time, sleep +from datetime import datetime, timedelta +from json import dumps as json_dumps +from uuid import uuid4 +from sqlite3 import connect as sql_connect, Cursor, Row +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse, parse_qs +from urllib.request import urlretrieve +from urllib.error import HTTPError +# non-included libs +from jinja2 import Template +from mpv import MPV # type: ignore +from yt_dlp import YoutubeDL # type: ignore +import googleapiclient.discovery # type: ignore + +# what we might want to manually define per environs +API_KEY = environ.get('GOOGLE_API_KEY') +HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084)) + +# type definitions for mypy +DatetimeStr = NewType('DatetimeStr', str) +QuotaCost = NewType('QuotaCost', int) +YoutubeId = NewType('YoutubeId', str) +PathStr = NewType('PathStr', str) +QueryId = NewType('QueryId', str) +QueryText = NewType('QueryText', str) +ProseText = NewType('ProseText', str) +SqlText = NewType('SqlText', str) +AmountDownloads = NewType('AmountDownloads', int) +PlayerUpdateId = NewType('PlayerUpdateId', str) +DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr] +TemplateContext: TypeAlias = dict[ + str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId + | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo'] + | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]] + | list[tuple[PathStr, PathStr]]] + +# local data reasonably expected to be in user home directory +PATH_HOME = PathStr(environ.get('HOME', '')) +PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom')) +PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails')) +PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql')) +PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads')) +PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp')) + +# template paths; might move outside PATH_WORKDIR in the future +PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates')) +NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl') +NAME_TEMPLATE_RESULTS = PathStr('results.tmpl') +NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl') +NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl') +NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl') +PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES, + NAME_TEMPLATE_QUERIES)) + +# yt_dlp config +YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ + '/best[height<=1080][width<=1920]' +YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS, + 'temp': PATH_TEMP}, + 'format': YT_DOWNLOAD_FORMAT} + +# Youtube API expectations +YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=') +THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/') +THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg') +QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100) +QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1) + +# local expectations +TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' +LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'} + +# tables to create database with +SCRIPT_INIT_DB = ''' +CREATE TABLE yt_queries ( + id TEXT PRIMARY KEY, + text TEXT NOT NULL, + retrieved_at TEXT NOT NULL +); +CREATE TABLE yt_videos ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + description TEXT NOT NULL, + published_at TEXT NOT NULL, + duration TEXT NOT NULL, + definition TEXT NOT NULL +); +CREATE TABLE yt_query_results ( + query_id TEXT NOT NULL, + video_id TEXT NOT NULL, + PRIMARY KEY (query_id, video_id), + FOREIGN KEY (query_id) REFERENCES yt_queries(id), + FOREIGN KEY (video_id) REFERENCES yt_videos(id) +); +CREATE TABLE quota_costs ( + id TEXT PRIMARY KEY, + timestamp TEXT NOT NULL, + cost INT NOT NULL +); +CREATE TABLE files ( + rel_path TEXT PRIMARY KEY, + yt_id TEXT NOT NULL DEFAULT "", + FOREIGN KEY (yt_id) REFERENCES yt_videos(id) +); +''' + + +class NotFoundException(BaseException): + """Call on DB fetches finding less than expected.""" + + +def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None: + """Ensure existance of expected_dirs _as_ directories.""" + for dir_name in expected_dirs: + if not path_exists(dir_name): + print(f'creating expected directory: {dir_name}') + makedirs(dir_name) + elif not isdir(dir_name): + msg = f'at expected directory path {dir_name} found non-directory' + raise Exception(msg) + + +class DatabaseConnection: + """Wrapped sqlite3.Connection.""" + + def __init__(self) -> None: + if not path_exists(PATH_DB): + with sql_connect(PATH_DB) as conn: + conn.executescript(SCRIPT_INIT_DB) + self._conn = sql_connect(PATH_DB) + + def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor: + """Wrapper around sqlite3.Connection.execute.""" + return self._conn.execute(sql, inputs) + + def commit_close(self) -> None: + """Run sqlite3.Connection.commit and .close.""" + self._conn.commit() + self._conn.close() + + +class DbData: + """Abstraction of common DB operation.""" + _table_name: str + _cols: tuple[str, ...] + + @classmethod + def _from_table_row(cls, row: Row) -> Self: + kwargs = {} + for i, col_name in enumerate(cls._cols): + kwargs[col_name] = row[i] + return cls(**kwargs) + + @classmethod + def get_one(cls, conn: DatabaseConnection, id_: str) -> Self: + """Return single entry of id_ from DB.""" + sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?') + row = conn.exec(sql, (id_,)).fetchone() + if not row: + raise NotFoundException + return cls._from_table_row(row) + + @classmethod + def get_all(cls, conn: DatabaseConnection) -> list[Self]: + """Return all entries from DB.""" + sql = SqlText(f'SELECT * FROM {cls._table_name}') + rows = conn.exec(sql).fetchall() + return [cls._from_table_row(row) for row in rows] + + def save(self, conn: DatabaseConnection) -> Cursor: + """Save entry to DB.""" + vals = [getattr(self, col_name) for col_name in self._cols] + q_marks = '(' + ','.join(['?'] * len(vals)) + ')' + sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}') + return conn.exec(sql, tuple(vals)) + + +class YoutubeQuery(DbData): + """Representation of YouTube query (without results).""" + _table_name = 'yt_queries' + _cols = ('id_', 'text', 'retrieved_at') + + def __init__(self, + id_: Optional[QueryId], + text: QueryText, + retrieved_at: DatetimeStr + ) -> None: + self.id_ = id_ if id_ else QueryId(str(uuid4())) + self.text = QueryText(text) + self.retrieved_at = retrieved_at + + @classmethod + def get_all_for_video(cls, + conn: DatabaseConnection, + video_id: YoutubeId + ) -> list[Self]: + """Return YoutubeQueries containing YoutubeVideo's ID in results.""" + sql = SqlText('SELECT query_id FROM ' + 'yt_query_results WHERE video_id = ?') + query_ids = conn.exec(sql, (video_id,)).fetchall() + return [cls.get_one(conn, query_id_tup[0]) + for query_id_tup in query_ids] + + +class YoutubeVideo(DbData): + """Representation of YouTube video metadata as provided by their API.""" + _table_name = 'yt_videos' + _cols = ('id_', 'title', 'description', 'published_at', 'duration', + 'definition') + + def __init__(self, + id_: YoutubeId, + title: ProseText = ProseText('?'), + description: ProseText = ProseText('?'), + published_at: DatetimeStr = DatetimeStr('?'), + duration: str = '?', + definition: str = '?' + ) -> None: + self.id_ = id_ + self.title = title + self.description = description + self.published_at = published_at + self.duration = duration + self.definition = definition + + def set_duration_from_yt_string(self, yt_string) -> None: + """Set .duration from the kind of format the YouTube API provides.""" + date_dur, time_dur = yt_string.split('T') + seconds = 0 + date_dur = date_dur[1:] + for dur_char, len_seconds in (('Y', 60*60*24*365.25), + ('M', 60*60*24*30), + ('D', 60*60*24)): + if dur_char in date_dur: + dur_str, date_dur = date_dur.split(dur_char) + seconds += int(dur_str) * int(len_seconds) + for dur_char, len_seconds in (('H', 60*60), + ('M', 60), + ('S', 1)): + if dur_char in time_dur: + dur_str, time_dur = time_dur.split(dur_char) + seconds += int(dur_str) * len_seconds + seconds_str = str(seconds % 60) + minutes_str = str(seconds // 60) + hours_str = str(seconds // (60 * 60)) + self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s + in (hours_str, minutes_str, seconds_str)]) + + @classmethod + def get_all_for_query(cls, + conn: DatabaseConnection, + query_id: QueryId + ) -> list[Self]: + """Return all videos for query of query_id.""" + sql = SqlText('SELECT video_id ' + 'FROM yt_query_results WHERE query_id = ?') + video_ids = conn.exec(sql, (query_id,)).fetchall() + return [cls.get_one(conn, video_id_tup[0]) + for video_id_tup in video_ids] + + def save_to_query(self, + conn: DatabaseConnection, + query_id: QueryId + ) -> None: + """Save inclusion of self in results to query of query_id.""" + conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'), + (query_id, self.id_)) + + +class VideoFile(DbData): + """Collects data about downloaded files.""" + _table_name = 'files' + _cols = ('rel_path', 'yt_id') + + def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None: + self.rel_path = rel_path + self.yt_id = yt_id + + def remove(self, conn: DatabaseConnection) -> None: + """Remove self from database by self.rel_path as identifier.""" + sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?') + conn.exec(SqlText(sql), (self.rel_path,)) + + +class QuotaLog(DbData): + """Collects API access quota costs.""" + _table_name = 'quota_costs' + _cols = ('id_', 'timestamp', 'cost') + + def __init__(self, + id_: Optional[str], + timestamp: DatetimeStr, + cost: QuotaCost + ) -> None: + self.id_ = id_ if id_ else str(uuid4()) + self.timestamp = timestamp + self.cost = cost + + @classmethod + def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None: + """Adds cost mapped to current datetime.""" + cls._remove_old(conn) + new = cls(None, + DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)), + QuotaCost(cost)) + new.save(conn) + + @classmethod + def current(cls, conn: DatabaseConnection) -> QuotaCost: + """Returns quota cost total for last 24 hours, purges old data.""" + cls._remove_old(conn) + quota_costs = cls.get_all(conn) + return QuotaCost(sum(c.cost for c in quota_costs)) + + @classmethod + def _remove_old(cls, conn: DatabaseConnection) -> None: + cutoff = datetime.now() - timedelta(days=1) + sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?') + conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),)) + + +class Player: + """MPV representation with some additional features.""" + + def __init__(self) -> None: + self.last_update = PlayerUpdateId('') + self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS) + if isfile(e.path) + and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS] + shuffle(self._filenames) + self._idx: int = 0 + self._mpv: Optional[MPV] = None + + @property + def _mpv_available(self) -> bool: + return bool(self._mpv and not self._mpv.core_shutdown) + + @staticmethod + def _if_mpv_available(f) -> Callable: + def wrapper(self): + return f(self) if self._mpv else None + return wrapper + + def _signal_update(self) -> None: + self.last_update = PlayerUpdateId(f'{self._idx}:{time()}') + + def _start_mpv(self) -> None: + self._mpv = MPV(input_default_bindings=True, + input_vo_keyboard=True, + config=True) + self._mpv.observe_property('pause', lambda a, b: self._signal_update()) + + @self._mpv.event_callback('start-file') + def on_start_file(_) -> None: + assert self._mpv is not None + self._mpv.pause = False + self._idx = self._mpv.playlist_pos + self._signal_update() + + @self._mpv.event_callback('shutdown') + def on_shutdown(_) -> None: + self._mpv = None + self._signal_update() + + for path in self._filenames: + self._mpv.playlist_append(path) + self._mpv.playlist_play_index(self._idx) + + @property + def current_filename(self) -> Optional[PathStr]: + """Return what we assume is the name of the currently playing file.""" + if not self._filenames: + return None + return PathStr(basename(self._filenames[self._idx])) + + @property + def prev_files(self) -> list[PathStr]: + """List 'past' files of playlist.""" + return list(reversed(self._filenames[:self._idx])) + + @property + def next_files(self) -> list[PathStr]: + """List 'coming' files of playlist.""" + return self._filenames[self._idx + 1:] + + @property + def is_running(self) -> bool: + """Return if player is running/available.""" + return self._mpv_available + + @property + def is_paused(self) -> bool: + """Return if player is paused.""" + if self._mpv_available: + assert self._mpv is not None + return self._mpv.pause + return False + + def toggle_run(self) -> None: + """Toggle player running.""" + if self._mpv_available: + assert self._mpv is not None + self._mpv.terminate() + self._mpv = None + else: + self._start_mpv() + self._signal_update() + + @_if_mpv_available + def toggle_pause(self) -> None: + """Toggle player pausing.""" + assert self._mpv is not None + self._mpv.pause = not self._mpv.pause + self._signal_update() + + @_if_mpv_available + def prev(self) -> None: + """Move player to previous item in playlist.""" + assert self._mpv is not None + if self._mpv.playlist_pos > 0: + self._mpv.playlist_prev() + else: + self._mpv.playlist_play_index(0) + + @_if_mpv_available + def next(self) -> None: + """Move player to next item in playlist.""" + assert self._mpv is not None + max_idx: int = len(self._mpv.playlist_filenames) - 1 + if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1: + self._mpv.playlist_next() + else: + self._mpv.playlist_play_index(max_idx) + + +class DownloadsDb: + """Collections downloading-related stuff.""" + + def __init__(self) -> None: + self._to_download: list[YoutubeId] = [] + _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP]) + self._sync_db() + + def _sync_db(self): + conn = DatabaseConnection() + files_via_db = VideoFile.get_all(conn) + old_cwd = getcwd() + chdir(PATH_DOWNLOADS) + for file in files_via_db: + if not isfile(path_join(file.rel_path)): + print(f'SYNC: no file {file.rel_path} found, removing entry.') + file.remove(conn) + paths = [file.rel_path for file in files_via_db] + for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]: + if path not in paths: + yt_id = self._id_from_filename(path) + file = VideoFile(path, yt_id) + print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".') + file.save(conn) + chdir(old_cwd) + self._files = VideoFile.get_all(conn) + conn.commit_close() + + @staticmethod + def _id_from_filename(path: PathStr, + double_split: bool = False + ) -> YoutubeId: + before_ext = splitext(path)[0] + if double_split: + before_ext = splitext(before_ext)[0] + return YoutubeId(before_ext.split('[')[-1].split(']')[0]) + + @property + def ids_to_paths(self) -> DownloadsIndex: + """Return mapping YoutubeIds:paths of files downloaded to them.""" + self._sync_db() + return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path)) + for f in self._files} + + @property + def ids_unfinished(self) -> set[YoutubeId]: + """Return set of IDs of videos awaiting or currently in download.""" + in_temp_dir = [] + for path in [PathStr(e.path) for e + in scandir(PATH_TEMP) if isfile(e.path)]: + in_temp_dir += [self._id_from_filename(path)] + return set(self._to_download + in_temp_dir) + + def clean_unfinished(self) -> None: + """Empty temp directory of unfinished downloads.""" + for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]: + print(f'removing unfinished download: {e.path}') + os_remove(e.path) + + def queue_download(self, video_id: YoutubeId) -> None: + """Add video_id to download queue *if* not already processed.""" + pre_existing = self.ids_unfinished | set(self._to_download + + list(self.ids_to_paths)) + if video_id not in pre_existing: + self._to_download += [video_id] + + def _download_next(self) -> None: + if self._to_download: + video_id = self._to_download.pop(0) + with YoutubeDL(YT_DL_PARAMS) as ydl: + ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}']) + + def download_loop(self) -> None: + """Keep iterating through download queue for new download tasks.""" + while True: + sleep(0.5) + self._download_next() + + +class Server(HTTPServer): + """Extension of HTTPServer providing for Player and DownloadsDb.""" + + def __init__(self, + player: Player, + downloads_db: DownloadsDb, + *args, **kwargs + ) -> None: + super().__init__(*args, **kwargs) + self.player = player + self.downloads = downloads_db + + +class TaskHandler(BaseHTTPRequestHandler): + """Handler for GET and POST requests to our server.""" + server: Server + + def _send_http(self, + content: bytes = b'', + headers: Optional[list[tuple[str, str]]] = None, + code: int = 200 + ) -> None: + headers = headers if headers else [] + self.send_response(code) + for header_tuple in headers: + self.send_header(header_tuple[0], header_tuple[1]) + self.end_headers() + if content: + self.wfile.write(content) + + def do_POST(self) -> None: # pylint:disable=invalid-name + """Map POST requests to handlers for various paths.""" + url = urlparse(self.path) + toks_url: list[str] = url.path.split('/') + page_name = toks_url[1] + body_length = int(self.headers['content-length']) + postvars = parse_qs(self.rfile.read(body_length).decode()) + if 'playlist' == page_name: + self._post_player_command(list(postvars.keys())[0]) + elif 'queries' == page_name: + self._post_query(QueryText(postvars['query'][0])) + + def _post_player_command(self, commands: str) -> None: + if 'pause' in commands: + self.server.player.toggle_pause() + elif 'prev' in commands: + self.server.player.prev() + elif 'next' in commands: + self.server.player.next() + elif 'stop' in commands: + self.server.player.toggle_run() + sleep(0.5) # avoid reload happening before current_file update + self._send_http(headers=[('Location', '/')], code=302) + + def _post_query(self, query_txt: QueryText) -> None: + conn = DatabaseConnection() + + def collect_results(query_txt: QueryText) -> list[YoutubeVideo]: + youtube = googleapiclient.discovery.build('youtube', 'v3', + developerKey=API_KEY) + QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH) + search_request = youtube.search().list( + q=query_txt, + part='snippet', + maxResults=25, + safeSearch='none', + type='video') + results: list[YoutubeVideo] = [] + ids_to_detail: list[YoutubeId] = [] + for item in search_request.execute()['items']: + video_id: YoutubeId = item['id']['videoId'] + ids_to_detail += [video_id] + snippet = item['snippet'] + urlretrieve(snippet['thumbnails']['default']['url'], + path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) + results += [YoutubeVideo(id_=video_id, + title=snippet['title'], + description=snippet['description'], + published_at=snippet['publishedAt'])] + QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS) + ids_for_details = ','.join([r.id_ for r in results]) + videos_request = youtube.videos().list(id=ids_for_details, + part='content_details') + for i, detailed in enumerate(videos_request.execute()['items']): + result = results[i] + assert result.id_ == detailed['id'] + content_details: dict[str, str] = detailed['contentDetails'] + result.set_duration_from_yt_string(content_details['duration']) + result.definition = content_details['definition'].upper() + return results + + query_data = YoutubeQuery( + None, query_txt, + DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))) + query_data.save(conn) + for result in collect_results(query_txt): + result.save(conn) + result.save_to_query(conn, query_data.id_) + conn.commit_close() + self._send_http(headers=[('Location', f'/query/{query_data.id_}')], + code=302) + + def do_GET(self) -> None: # pylint:disable=invalid-name + """Map GET requests to handlers for various paths.""" + url = urlparse(self.path) + toks_url: list[str] = url.path.split('/') + page_name = toks_url[1] + try: + if 'thumbnails' == page_name: + self._send_thumbnail(PathStr(toks_url[2])) + elif 'dl' == page_name: + self._send_or_download_video(YoutubeId(toks_url[2])) + elif 'videos' == page_name: + self._send_videos_index() + elif 'video_about' == page_name: + self._send_video_about(YoutubeId(toks_url[2])) + elif 'query' == page_name: + self._send_query_page(QueryId(toks_url[2])) + elif 'queries' == page_name: + self._send_queries_index_and_search() + elif '_last_playlist_update.json' == page_name: + self._send_last_playlist_update() + else: # e.g. for / + self._send_playlist() + except NotFoundException: + self._send_http(b'not found', code=404) + + def _send_rendered_template(self, + tmpl_name: PathStr, + tmpl_ctx: TemplateContext + ) -> None: + with open(path_join(PATH_TEMPLATES, tmpl_name), + 'r', encoding='utf8' + ) as templ_file: + tmpl = Template(str(templ_file.read())) + html = tmpl.render(**tmpl_ctx) + self._send_http(bytes(html, 'utf8')) + + def _send_thumbnail(self, filename: PathStr) -> None: + _ensure_expected_dirs([PATH_THUMBNAILS]) + path_thumbnail = path_join(PATH_THUMBNAILS, filename) + if not path_exists(path_thumbnail): + video_id = splitext(filename)[0] + url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}' + try: + urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) + except HTTPError as e: + if 404 == e.code: + raise NotFoundException from e + raise e + with open(path_thumbnail, 'rb') as f: + img = f.read() + self._send_http(img, [('Content-type', 'image/jpg')]) + + def _send_or_download_video(self, video_id: YoutubeId) -> None: + if video_id in self.server.downloads.ids_to_paths: + with open(self.server.downloads.ids_to_paths[video_id], + 'rb') as video_file: + video = video_file.read() + self._send_http(content=video) + return + self.server.downloads.queue_download(video_id) + self._send_http(headers=[('Location', f'/video_about/{video_id}')], + code=302) + + def _send_query_page(self, query_id: QueryId) -> None: + conn = DatabaseConnection() + query = YoutubeQuery.get_one(conn, str(query_id)) + results = YoutubeVideo.get_all_for_query(conn, query_id) + conn.commit_close() + self._send_rendered_template( + NAME_TEMPLATE_RESULTS, + {'query': query.text, 'videos': results}) + + def _send_queries_index_and_search(self) -> None: + conn = DatabaseConnection() + quota_count = QuotaLog.current(conn) + queries_data = YoutubeQuery.get_all(conn) + conn.commit_close() + queries_data.sort(key=lambda q: q.retrieved_at, reverse=True) + self._send_rendered_template( + NAME_TEMPLATE_QUERIES, {'queries': queries_data, + 'quota_count': quota_count}) + + def _send_video_about(self, video_id: YoutubeId) -> None: + conn = DatabaseConnection() + linked_queries = YoutubeQuery.get_all_for_video(conn, video_id) + try: + video_data = YoutubeVideo.get_one(conn, video_id) + except NotFoundException: + video_data = YoutubeVideo(video_id) + conn.commit_close() + self._send_rendered_template( + NAME_TEMPLATE_VIDEO_ABOUT, + {'video_data': video_data, + 'is_temp': video_id in self.server.downloads.ids_unfinished, + 'file_path': self.server.downloads.ids_to_paths.get(video_id, + None), + 'youtube_prefix': YOUTUBE_URL_PREFIX, + 'queries': linked_queries}) + + def _send_videos_index(self) -> None: + videos = [(id_, PathStr(basename(path))) + for id_, path in self.server.downloads.ids_to_paths.items()] + videos.sort(key=lambda t: t[1]) + self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos}) + + def _send_last_playlist_update(self) -> None: + payload: dict[str, PlayerUpdateId] = { + 'last_update': self.server.player.last_update} + self._send_http(bytes(json_dumps(payload), 'utf8'), + headers=[('Content-type', 'application/json')]) + + def _send_playlist(self) -> None: + tuples: list[tuple[PathStr, PathStr]] = [] + i: int = 0 + while True: + prev, next_ = PathStr(''), PathStr('') + if len(self.server.player.prev_files) > i: + prev = PathStr(basename(self.server.player.prev_files[i])) + if len(self.server.player.next_files) > i: + next_ = PathStr(basename(self.server.player.next_files[i])) + if not prev + next_: + break + tuples += [(prev, next_)] + i += 1 + self._send_rendered_template( + NAME_TEMPLATE_PLAYLIST, + {'last_update': self.server.player.last_update, + 'running': self.server.player.is_running, + 'paused': self.server.player.is_paused, + 'current_title': self.server.player.current_filename, + 'tuples': tuples})