From: Christian Heller Date: Fri, 15 Nov 2024 02:10:47 +0000 (+0100) Subject: Move query and query result/video data into sqlite DB. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/static/%7B%7Bprefix%7D%7D/%7B%7Bprefix%7D%7D/ledger?a=commitdiff_plain;h=0fe4b2b64a6ef15c6f80e8540d9716229412a575;p=ytplom Move query and query result/video data into sqlite DB. --- diff --git a/templates/queries.tmpl b/templates/queries.tmpl index 0d888b0..6ba91cb 100644 --- a/templates/queries.tmpl +++ b/templates/queries.tmpl @@ -9,14 +9,12 @@ - {% for query in queries %} - - + {% endfor %}
retrieved atDLs query
{{query.retrieved_at[:19]}}{{query.downloads}}{{query.text}}{{query.text}}
diff --git a/templates/results.tmpl b/templates/results.tmpl index 8b88362..124704a 100644 --- a/templates/results.tmpl +++ b/templates/results.tmpl @@ -2,20 +2,19 @@

playlist · videos · queries

-

query: {{query_text}}

+

query: {{query}}

{% for video in videos %} +{{video.duration}} {% endfor %} diff --git a/templates/video_about.tmpl b/templates/video_about.tmpl index 290e76d..0bee6a4 100644 --- a/templates/video_about.tmpl +++ b/templates/video_about.tmpl @@ -4,24 +4,24 @@

playlist · videos · queries

- + {{video.definition}}
-{{video.duration}}
-{% if video.available %}[loaded]{% endif %}
-{{video.title}} · {{video.description}} +{{video.title}} · {{video.description}}
- + - + diff --git a/ytplom.py b/ytplom.py index a86bf5c..361b898 100755 --- a/ytplom.py +++ b/ytplom.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """Minimalistic download-focused YouTube interface.""" -from typing import TypeAlias, Optional, NewType, Callable +from typing import TypeAlias, Optional, NewType, Callable, Self, Any from os import environ, makedirs, scandir, remove as os_remove from os.path import (isdir, isfile, exists as path_exists, join as path_join, splitext, basename) @@ -12,44 +12,45 @@ from threading import Thread from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs from urllib.request import urlretrieve -from hashlib import md5 +from sqlite3 import connect as sql_connect, Cursor, Row from jinja2 import Template from mpv import MPV # type: ignore from yt_dlp import YoutubeDL # type: ignore import googleapiclient.discovery # type: ignore +API_KEY = environ.get('GOOGLE_API_KEY') +HTTP_PORT = 8084 + DatetimeStr = NewType('DatetimeStr', str) QuotaCost = NewType('QuotaCost', int) VideoId = NewType('VideoId', str) PathStr = NewType('PathStr', str) -QueryId = NewType('QueryId', str) +QueryId = NewType('QueryId', int) QueryText = NewType('QueryText', str) +ProseText = NewType('ProseText', str) +SqlText = NewType('SqlText', str) AmountDownloads = NewType('AmountDownloads', int) PlayerUpdateId = NewType('PlayerUpdateId', str) -Result: TypeAlias = dict[str, str] -Header: TypeAlias = tuple[str, str] -VideoData: TypeAlias = dict[str, str | bool] -QueryData: TypeAlias = dict[str, QueryId | QueryText | DatetimeStr - | AmountDownloads | list[Result]] QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost] -DownloadsDB = dict[VideoId, PathStr] -TemplateContext = dict[str, None | bool | PlayerUpdateId | PathStr | VideoId - | QuotaCost | QueryData - | VideoData | list[QueryData] | list[VideoData] - | list[tuple[VideoId, PathStr]] - | list[tuple[QueryId, QueryText]] - | list[tuple[PathStr, PathStr]]] +DownloadsDb: TypeAlias = dict[VideoId, PathStr] +TemplateContext: TypeAlias = dict[ + str, None | bool | PlayerUpdateId | PathStr | VideoId | QueryText + | QuotaCost | 'VideoData' | list['VideoData'] | list['QueryData'] + | list[tuple[VideoId, PathStr]] | list[tuple[PathStr, PathStr]]] + + +class NotFoundException(BaseException): + """Call on DB fetches finding less than expected.""" -API_KEY = environ.get('GOOGLE_API_KEY') -HTTP_PORT = 8083 PATH_QUOTA_LOG = PathStr('quota_log.json') PATH_DIR_DOWNLOADS = PathStr('downloads') PATH_DIR_THUMBNAILS = PathStr('thumbnails') PATH_DIR_REQUESTS_CACHE = PathStr('cache_googleapi') PATH_DIR_TEMPLATES = PathStr('templates') +PATH_DB = PathStr('db.sql') NAME_DIR_TEMP = PathStr('temp') -NAME_TEMPLATE_INDEX = PathStr('queries.tmpl') +NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl') NAME_TEMPLATE_RESULTS = PathStr('results.tmpl') NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl') NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl') @@ -58,8 +59,8 @@ NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl') PATH_DIR_TEMP = PathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)) EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS, PATH_DIR_REQUESTS_CACHE] -PATH_TEMPLATE_INDEX = PathStr(path_join(PATH_DIR_TEMPLATES, - NAME_TEMPLATE_INDEX)) +PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_DIR_TEMPLATES, + NAME_TEMPLATE_QUERIES)) TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=') YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ @@ -73,9 +74,187 @@ QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1) LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'} +SCRIPT_INIT_DB = ''' +CREATE TABLE yt_queries ( + id INTEGER PRIMARY KEY, + text TEXT NOT NULL, + retrieved_at TEXT NOT NULL +); +CREATE TABLE yt_videos ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + description TEXT NOT NULL, + published_at TEXT NOT NULL, + duration TEXT NOT NULL, + definition TEXT NOT NULL +); +CREATE TABLE yt_query_results ( + query_id INTEGER NOT NULL, + video_id TEXT NOT NULL, + PRIMARY KEY (query_id, video_id), + FOREIGN KEY (query_id) REFERENCES yt_queries(id), + FOREIGN KEY (video_id) REFERENCES yt_videos(id) +); +''' + to_download: list[VideoId] = [] +class DatabaseConnection: + """Wrapped sqlite3.Connection.""" + + def __init__(self) -> None: + if not path_exists(PATH_DB): + with sql_connect(PATH_DB) as conn: + conn.executescript(SCRIPT_INIT_DB) + self._conn = sql_connect(PATH_DB) + + def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor: + """Wrapper around sqlite3.Connection.execute.""" + return self._conn.execute(sql, inputs) + + def commit_close(self) -> None: + """Run sqlite3.Connection.commit and .close.""" + self._conn.commit() + self._conn.close() + + +class DbData: + """Abstraction of common DB operation.""" + _table_name: str + _cols: tuple[str, ...] + + @classmethod + def _from_table_row(cls, row: Row) -> Self: + kwargs = {} + for i, col_name in enumerate(cls._cols): + kwargs[col_name] = row[i] + return cls(**kwargs) + + @classmethod + def get_one(cls, conn: DatabaseConnection, id_: str) -> Self: + """Return single entry of id_ from DB.""" + sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?') + row = conn.exec(sql, (id_,)).fetchone() + if not row: + raise NotFoundException + return cls._from_table_row(row) + + @classmethod + def get_all(cls, conn: DatabaseConnection) -> list[Self]: + """Return all entries from DB.""" + sql = SqlText(f'SELECT * FROM {cls._table_name}') + rows = conn.exec(sql).fetchall() + return [cls._from_table_row(row) for row in rows] + + def save(self, conn: DatabaseConnection) -> Cursor: + """Save entry to DB.""" + vals = [getattr(self, col_name) for col_name in self._cols] + q_marks = '(' + ','.join(['?'] * len(vals)) + ')' + sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}') + return conn.exec(sql, tuple(vals)) + + +class QueryData(DbData): + """Representation of YouTube query (without results).""" + _table_name = 'yt_queries' + _cols = ('id_', 'text', 'retrieved_at') + + def __init__(self, + id_: Optional[QueryId], + text: QueryText, + retrieved_at: DatetimeStr + ) -> None: + self.id_ = id_ + self.text = QueryText(text) + self.retrieved_at = retrieved_at + + def save(self, conn: DatabaseConnection) -> Cursor: + """DbData.save but assign .id_ from Cursor's lastrowid if yet unset.""" + cursor = super().save(conn) + if self.id_ is None: + assert cursor.lastrowid is not None + self.id_ = QueryId(cursor.lastrowid) + return cursor + + @classmethod + def get_all_for_video(cls, + conn: DatabaseConnection, + video_id: VideoId + ) -> list[Self]: + """Return all QueryData that got VideoData of video_id as result.""" + sql = SqlText('SELECT query_id FROM ' + 'yt_query_results WHERE video_id = ?') + query_ids = conn.exec(sql, (video_id,)).fetchall() + return [cls.get_one(conn, query_id_tup[0]) + for query_id_tup in query_ids] + + +class VideoData(DbData): + """Representation of YouTube video metadata as provided by their API.""" + _table_name = 'yt_videos' + _cols = ('id_', 'title', 'description', 'published_at', 'duration', + 'definition') + + def __init__(self, + id_: VideoId, + title: ProseText, + description: ProseText, + published_at: DatetimeStr, + duration: str = '', + definition: str = '' + ) -> None: + self.id_ = id_ + self.title = title + self.description = description + self.published_at = published_at + self.duration = duration + self.definition = definition + + def set_duration_from_yt_string(self, yt_string) -> None: + """Set .duration from the kind of format the YouTube API provides.""" + date_dur, time_dur = yt_string.split('T') + seconds = 0 + date_dur = date_dur[1:] + for dur_char, len_seconds in (('Y', 60*60*24*365.25), + ('M', 60*60*24*30), + ('D', 60*60*24)): + if dur_char in date_dur: + dur_str, date_dur = date_dur.split(dur_char) + seconds += int(dur_str) * int(len_seconds) + for dur_char, len_seconds in (('H', 60*60), + ('M', 60), + ('S', 1)): + if dur_char in time_dur: + dur_str, time_dur = time_dur.split(dur_char) + seconds += int(dur_str) * len_seconds + seconds_str = str(seconds % 60) + minutes_str = str(seconds // 60) + hours_str = str(seconds // (60 * 60)) + self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s + in (hours_str, minutes_str, seconds_str)]) + + @classmethod + def get_all_for_query(cls, + conn: DatabaseConnection, + query_id: QueryId + ) -> list[Self]: + """Return all videos for query of query_id.""" + sql = SqlText('SELECT video_id' + 'FROM yt_query_results WHERE query_id = ?') + video_ids = conn.exec(sql, (query_id,)).fetchall() + return [cls.get_one(conn, video_id_tup[0]) + for video_id_tup in video_ids] + + def save_to_query(self, + conn: DatabaseConnection, + query_id: QueryId + ) -> None: + """Save inclusion of self in results to query of query_id.""" + conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'), + (query_id, self.id_)) + + class Player: """MPV representation with some additional features.""" @@ -276,7 +455,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _send_http(self, content: bytes = b'', - headers: Optional[list[Header]] = None, + headers: Optional[list[tuple[str, str]]] = None, code: int = 200 ) -> None: headers = headers if headers else [] @@ -295,12 +474,11 @@ class TaskHandler(BaseHTTPRequestHandler): body_length = int(self.headers['content-length']) postvars = parse_qs(self.rfile.read(body_length).decode()) if 'playlist' == page_name: - self._post_player_command(list(postvars.keys())) + self._post_player_command(list(postvars.keys())[0]) elif 'queries' == page_name: self._post_query(QueryText(postvars['query'][0])) - def _post_player_command(self, commands: list[str]) -> None: - # print("DEBUG commands", commands) + def _post_player_command(self, commands: str) -> None: if 'pause' in commands: self.server.player.toggle_pause() elif 'prev' in commands: @@ -314,9 +492,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _post_query(self, query_txt: QueryText) -> None: - def collect_results(now: DatetimeStr, - query_txt: QueryText - ) -> list[Result]: + def collect_results(now, query_txt: QueryText) -> list[VideoData]: youtube = googleapiclient.discovery.build('youtube', 'v3', developerKey=API_KEY) update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH) @@ -326,62 +502,66 @@ class TaskHandler(BaseHTTPRequestHandler): maxResults=25, safeSearch='none', type='video') - results = [] + results: list[VideoData] = [] ids_to_detail: list[VideoId] = [] for item in search_request.execute()['items']: video_id: VideoId = item['id']['videoId'] ids_to_detail += [video_id] - snippet: dict[str, str] = item['snippet'] - result: Result = {'id': video_id, - 'title': snippet['title'], - 'description': snippet['description'], - 'published_at': snippet['publishedAt'], - } - results += [result] - urlretrieve(item['snippet']['thumbnails']['default']['url'], + snippet = item['snippet'] + urlretrieve(snippet['thumbnails']['default']['url'], path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')) + results += [VideoData(id_=video_id, + title=snippet['title'], + description=snippet['description'], + published_at=snippet['publishedAt'])] update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS) - videos_request = youtube.videos().list(id=','.join(ids_to_detail), + ids_for_details = ','.join([r.id_ for r in results]) + videos_request = youtube.videos().list(id=ids_for_details, part='content_details') for i, detailed in enumerate(videos_request.execute()['items']): - results_item: Result = results[i] - assert results_item['id'] == detailed['id'] + result = results[i] + assert result.id_ == detailed['id'] content_details: dict[str, str] = detailed['contentDetails'] - results_item['duration'] = content_details['duration'] - results_item['definition'] = content_details['definition'] + result.set_duration_from_yt_string(content_details['duration']) + result.definition = content_details['definition'].upper() return results now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)) - results = collect_results(now, query_txt) - md5sum = md5(str(query_txt).encode()).hexdigest() - with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'), - 'w', encoding='utf8') as f: - json_dump({'text': query_txt, - 'retrieved_at': now, - 'results': results}, f) - self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302) + query_data = QueryData(None, query_txt, now) + conn = DatabaseConnection() + query_data.save(conn) + for result in collect_results(now, query_txt): + result.save(conn) + assert query_data.id_ is not None + result.save_to_query(conn, query_data.id_) + conn.commit_close() + self._send_http(headers=[('Location', f'/query/{query_data.id_}')], + code=302) def do_GET(self) -> None: # pylint:disable=invalid-name """Map GET requests to handlers for various paths.""" url = urlparse(self.path) toks_url: list[str] = url.path.split('/') page_name = toks_url[1] - if 'thumbnails' == page_name: - self._send_thumbnail(PathStr(toks_url[2])) - elif 'dl' == page_name: - self._send_or_download_video(VideoId(toks_url[2])) - elif 'videos' == page_name: - self._send_videos_index() - elif 'video_about' == page_name: - self._send_video_about(VideoId(toks_url[2])) - elif 'query' == page_name: - self._send_query_page(QueryId(toks_url[2])) - elif 'queries' == page_name: - self._send_queries_index_and_search() - elif '_last_playlist_update.json' == page_name: - self._send_last_playlist_update() - else: # e.g. for / - self._send_playlist() + try: + if 'thumbnails' == page_name: + self._send_thumbnail(PathStr(toks_url[2])) + elif 'dl' == page_name: + self._send_or_download_video(VideoId(toks_url[2])) + elif 'videos' == page_name: + self._send_videos_index() + elif 'video_about' == page_name: + self._send_video_about(VideoId(toks_url[2])) + elif 'query' == page_name: + self._send_query_page(QueryId(int(toks_url[2]))) + elif 'queries' == page_name: + self._send_queries_index_and_search() + elif '_last_playlist_update.json' == page_name: + self._send_last_playlist_update() + else: # e.g. for / + self._send_playlist() + except NotFoundException: + self._send_http(b'not found', code=404) def _send_rendered_template(self, tmpl_name: PathStr, @@ -394,7 +574,7 @@ class TaskHandler(BaseHTTPRequestHandler): html = tmpl.render(**tmpl_ctx) self._send_http(bytes(html, 'utf8')) - def _make_downloads_db(self) -> DownloadsDB: + def _make_downloads_db(self) -> DownloadsDb: downloads_db = {} for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]: before_ext = splitext(e.path)[0] @@ -402,58 +582,6 @@ class TaskHandler(BaseHTTPRequestHandler): downloads_db[id_] = PathStr(e.path) return downloads_db - def _harvest_queries(self) -> list[tuple[QueryId, list[Result], - QueryText, DatetimeStr]]: - queries_data = [] - for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE) - if isfile(f.path)]: - with open(file.path, 'r', encoding='utf8') as query_file: - filed_query: QueryData = json_load(query_file) - assert isinstance(filed_query['results'], list) - assert isinstance(filed_query['text'], str) - assert isinstance(filed_query['retrieved_at'], str) - id_ = QueryId(splitext(basename(file.path))[0]) - results_list = filed_query['results'] - query_text = QueryText(filed_query['text']) - retrieved_at = DatetimeStr(filed_query['retrieved_at']) - queries_data += [(id_, results_list, query_text, retrieved_at)] - return queries_data - - def _result_to_video_data(self, - result: Result, - downloads_db: DownloadsDB - ) -> VideoData: - - def reformat_duration(duration_str: str): - date_dur, time_dur = duration_str.split('T') - seconds = 0 - date_dur = date_dur[1:] - for dur_char, len_seconds in (('Y', 60*60*24*365.25), - ('M', 60*60*24*30), - ('D', 60*60*24)): - if dur_char in date_dur: - dur_str, date_dur = date_dur.split(dur_char) - seconds += int(dur_str) * int(len_seconds) - for dur_char, len_seconds in (('H', 60*60), - ('M', 60), - ('S', 1)): - if dur_char in time_dur: - dur_str, time_dur = time_dur.split(dur_char) - seconds += int(dur_str) * len_seconds - seconds_str = str(seconds % 60) - minutes_str = str(seconds // 60) - hours_str = str(seconds // (60 * 60)) - return ':'.join([f'0{s}' if len(s) == 1 else s - for s in (hours_str, minutes_str, seconds_str)]) - assert isinstance(result['duration'], str) - return { - 'id': result['id'], - 'available': result['id'] in downloads_db, - 'duration': reformat_duration(result['duration']), - 'title': result['title'], - 'definition': result['definition'].upper() - } - def _send_thumbnail(self, filename: PathStr) -> None: with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f: img = f.read() @@ -471,46 +599,36 @@ class TaskHandler(BaseHTTPRequestHandler): code=302) def _send_query_page(self, query_id: QueryId) -> None: - downloads_db = self._make_downloads_db() - with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'), - 'r', encoding='utf8') as query_file: - query = json_load(query_file) + conn = DatabaseConnection() + query = QueryData.get_one(conn, str(query_id)) + results = VideoData.get_all_for_query(conn, query_id) + conn.commit_close() self._send_rendered_template( NAME_TEMPLATE_RESULTS, - {'query': query['text'], - 'videos': [self._result_to_video_data(result, downloads_db) - for result in query['results']]}) + {'query': query.text, 'videos': results}) def _send_queries_index_and_search(self) -> None: - downloads_db = self._make_downloads_db() - queries_data: list[QueryData] = [] - for id_, results, query_text, timestamp in self._harvest_queries(): - queries_data += [ - {'id': id_, 'text': query_text, 'retrieved_at': timestamp, - 'downloads': AmountDownloads(len([ - r for r in results if r['id'] in downloads_db]))}] - queries_data.sort(key=lambda q: q['retrieved_at'], reverse=True) - self._send_rendered_template(NAME_TEMPLATE_INDEX, {'queries': - queries_data}) + quota_count = QuotaCost(sum(read_quota_log().values())) + conn = DatabaseConnection() + queries_data = QueryData.get_all(conn) + conn.commit_close() + queries_data.sort(key=lambda q: q.retrieved_at, reverse=True) + self._send_rendered_template( + NAME_TEMPLATE_QUERIES, {'queries': queries_data, + 'quota_count': quota_count}) def _send_video_about(self, video_id: VideoId) -> None: - linked_queries: list[tuple[QueryId, QueryText]] = [] - first_result: Optional[Result] = None - for id_, results, query_text, _ in self._harvest_queries(): - for result in results: - if video_id == result['id']: - linked_queries += [(id_, query_text)] - first_result = first_result or result - if not first_result: - self._send_http(b'nothing found', code=404) - return + downloads_db = self._make_downloads_db() + conn = DatabaseConnection() + linked_queries = QueryData.get_all_for_video(conn, video_id) + video_data = VideoData.get_one(conn, video_id) + conn.commit_close() self._send_rendered_template( NAME_TEMPLATE_VIDEO_ABOUT, - {'video_id': video_id, + {'video_data': video_data, + 'available': video_data.id_ in downloads_db, 'youtube_prefix': YOUTUBE_URL_PREFIX, - 'queries': linked_queries, - 'video_data': self._result_to_video_data( - first_result, self._make_downloads_db())}) + 'queries': linked_queries}) def _send_videos_index(self) -> None: videos = [(id_, PathStr(basename(path))) @@ -519,7 +637,7 @@ class TaskHandler(BaseHTTPRequestHandler): self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos}) def _send_last_playlist_update(self) -> None: - payload: dict[str, str] = { + payload: dict[str, PlayerUpdateId] = { 'last_update': self.server.player.last_update} self._send_http(bytes(json_dumps(payload), 'utf8'), headers=[('Content-type', 'application/json')])
title:{{video_data.title}}
thumbnail:
thumbnail:
description:{{video_data.description}}
duration:{{video_data.duration}}
definition:{{video_data.definition}}
YouTube ID:{{video_id}}
YouTube ID:{{video_data.id_}}
actions:
linked queries: