#!/usr/bin/env python3
"""Minimalistic download-focused YouTube interface."""
-from typing import TypeAlias, Optional, NewType, Callable
+from typing import TypeAlias, Optional, NewType, Callable, Self, Any
from os import environ, makedirs, scandir, remove as os_remove
from os.path import (isdir, isfile, exists as path_exists, join as path_join,
splitext, basename)
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from urllib.request import urlretrieve
-from hashlib import md5
+from sqlite3 import connect as sql_connect, Cursor, Row
from jinja2 import Template
from mpv import MPV # type: ignore
from yt_dlp import YoutubeDL # type: ignore
import googleapiclient.discovery # type: ignore
+API_KEY = environ.get('GOOGLE_API_KEY')
+HTTP_PORT = 8084
+
DatetimeStr = NewType('DatetimeStr', str)
QuotaCost = NewType('QuotaCost', int)
VideoId = NewType('VideoId', str)
PathStr = NewType('PathStr', str)
-QueryId = NewType('QueryId', str)
+QueryId = NewType('QueryId', int)
QueryText = NewType('QueryText', str)
+ProseText = NewType('ProseText', str)
+SqlText = NewType('SqlText', str)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
-Result: TypeAlias = dict[str, str]
-Header: TypeAlias = tuple[str, str]
-VideoData: TypeAlias = dict[str, str | bool]
-QueryData: TypeAlias = dict[str, QueryId | QueryText | DatetimeStr
- | AmountDownloads | list[Result]]
QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
-DownloadsDB = dict[VideoId, PathStr]
-TemplateContext = dict[str, None | bool | PlayerUpdateId | PathStr | VideoId
- | QuotaCost | QueryData
- | VideoData | list[QueryData] | list[VideoData]
- | list[tuple[VideoId, PathStr]]
- | list[tuple[QueryId, QueryText]]
- | list[tuple[PathStr, PathStr]]]
+DownloadsDb: TypeAlias = dict[VideoId, PathStr]
+TemplateContext: TypeAlias = dict[
+ str, None | bool | PlayerUpdateId | PathStr | VideoId | QueryText
+ | QuotaCost | 'VideoData' | list['VideoData'] | list['QueryData']
+ | list[tuple[VideoId, PathStr]] | list[tuple[PathStr, PathStr]]]
+
+
+class NotFoundException(BaseException):
+ """Call on DB fetches finding less than expected."""
-API_KEY = environ.get('GOOGLE_API_KEY')
-HTTP_PORT = 8083
PATH_QUOTA_LOG = PathStr('quota_log.json')
PATH_DIR_DOWNLOADS = PathStr('downloads')
PATH_DIR_THUMBNAILS = PathStr('thumbnails')
PATH_DIR_REQUESTS_CACHE = PathStr('cache_googleapi')
PATH_DIR_TEMPLATES = PathStr('templates')
+PATH_DB = PathStr('db.sql')
NAME_DIR_TEMP = PathStr('temp')
-NAME_TEMPLATE_INDEX = PathStr('queries.tmpl')
+NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
PATH_DIR_TEMP = PathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP))
EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
PATH_DIR_REQUESTS_CACHE]
-PATH_TEMPLATE_INDEX = PathStr(path_join(PATH_DIR_TEMPLATES,
- NAME_TEMPLATE_INDEX))
+PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_DIR_TEMPLATES,
+ NAME_TEMPLATE_QUERIES))
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
+SCRIPT_INIT_DB = '''
+CREATE TABLE yt_queries (
+ id INTEGER PRIMARY KEY,
+ text TEXT NOT NULL,
+ retrieved_at TEXT NOT NULL
+);
+CREATE TABLE yt_videos (
+ id TEXT PRIMARY KEY,
+ title TEXT NOT NULL,
+ description TEXT NOT NULL,
+ published_at TEXT NOT NULL,
+ duration TEXT NOT NULL,
+ definition TEXT NOT NULL
+);
+CREATE TABLE yt_query_results (
+ query_id INTEGER NOT NULL,
+ video_id TEXT NOT NULL,
+ PRIMARY KEY (query_id, video_id),
+ FOREIGN KEY (query_id) REFERENCES yt_queries(id),
+ FOREIGN KEY (video_id) REFERENCES yt_videos(id)
+);
+'''
+
to_download: list[VideoId] = []
+class DatabaseConnection:
+ """Wrapped sqlite3.Connection."""
+
+ def __init__(self) -> None:
+ if not path_exists(PATH_DB):
+ with sql_connect(PATH_DB) as conn:
+ conn.executescript(SCRIPT_INIT_DB)
+ self._conn = sql_connect(PATH_DB)
+
+ def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
+ """Wrapper around sqlite3.Connection.execute."""
+ return self._conn.execute(sql, inputs)
+
+ def commit_close(self) -> None:
+ """Run sqlite3.Connection.commit and .close."""
+ self._conn.commit()
+ self._conn.close()
+
+
+class DbData:
+ """Abstraction of common DB operation."""
+ _table_name: str
+ _cols: tuple[str, ...]
+
+ @classmethod
+ def _from_table_row(cls, row: Row) -> Self:
+ kwargs = {}
+ for i, col_name in enumerate(cls._cols):
+ kwargs[col_name] = row[i]
+ return cls(**kwargs)
+
+ @classmethod
+ def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
+ """Return single entry of id_ from DB."""
+ sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
+ row = conn.exec(sql, (id_,)).fetchone()
+ if not row:
+ raise NotFoundException
+ return cls._from_table_row(row)
+
+ @classmethod
+ def get_all(cls, conn: DatabaseConnection) -> list[Self]:
+ """Return all entries from DB."""
+ sql = SqlText(f'SELECT * FROM {cls._table_name}')
+ rows = conn.exec(sql).fetchall()
+ return [cls._from_table_row(row) for row in rows]
+
+ def save(self, conn: DatabaseConnection) -> Cursor:
+ """Save entry to DB."""
+ vals = [getattr(self, col_name) for col_name in self._cols]
+ q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
+ sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
+ return conn.exec(sql, tuple(vals))
+
+
+class QueryData(DbData):
+ """Representation of YouTube query (without results)."""
+ _table_name = 'yt_queries'
+ _cols = ('id_', 'text', 'retrieved_at')
+
+ def __init__(self,
+ id_: Optional[QueryId],
+ text: QueryText,
+ retrieved_at: DatetimeStr
+ ) -> None:
+ self.id_ = id_
+ self.text = QueryText(text)
+ self.retrieved_at = retrieved_at
+
+ def save(self, conn: DatabaseConnection) -> Cursor:
+ """DbData.save but assign .id_ from Cursor's lastrowid if yet unset."""
+ cursor = super().save(conn)
+ if self.id_ is None:
+ assert cursor.lastrowid is not None
+ self.id_ = QueryId(cursor.lastrowid)
+ return cursor
+
+ @classmethod
+ def get_all_for_video(cls,
+ conn: DatabaseConnection,
+ video_id: VideoId
+ ) -> list[Self]:
+ """Return all QueryData that got VideoData of video_id as result."""
+ sql = SqlText('SELECT query_id FROM '
+ 'yt_query_results WHERE video_id = ?')
+ query_ids = conn.exec(sql, (video_id,)).fetchall()
+ return [cls.get_one(conn, query_id_tup[0])
+ for query_id_tup in query_ids]
+
+
+class VideoData(DbData):
+ """Representation of YouTube video metadata as provided by their API."""
+ _table_name = 'yt_videos'
+ _cols = ('id_', 'title', 'description', 'published_at', 'duration',
+ 'definition')
+
+ def __init__(self,
+ id_: VideoId,
+ title: ProseText,
+ description: ProseText,
+ published_at: DatetimeStr,
+ duration: str = '',
+ definition: str = ''
+ ) -> None:
+ self.id_ = id_
+ self.title = title
+ self.description = description
+ self.published_at = published_at
+ self.duration = duration
+ self.definition = definition
+
+ def set_duration_from_yt_string(self, yt_string) -> None:
+ """Set .duration from the kind of format the YouTube API provides."""
+ date_dur, time_dur = yt_string.split('T')
+ seconds = 0
+ date_dur = date_dur[1:]
+ for dur_char, len_seconds in (('Y', 60*60*24*365.25),
+ ('M', 60*60*24*30),
+ ('D', 60*60*24)):
+ if dur_char in date_dur:
+ dur_str, date_dur = date_dur.split(dur_char)
+ seconds += int(dur_str) * int(len_seconds)
+ for dur_char, len_seconds in (('H', 60*60),
+ ('M', 60),
+ ('S', 1)):
+ if dur_char in time_dur:
+ dur_str, time_dur = time_dur.split(dur_char)
+ seconds += int(dur_str) * len_seconds
+ seconds_str = str(seconds % 60)
+ minutes_str = str(seconds // 60)
+ hours_str = str(seconds // (60 * 60))
+ self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
+ in (hours_str, minutes_str, seconds_str)])
+
+ @classmethod
+ def get_all_for_query(cls,
+ conn: DatabaseConnection,
+ query_id: QueryId
+ ) -> list[Self]:
+ """Return all videos for query of query_id."""
+ sql = SqlText('SELECT video_id'
+ 'FROM yt_query_results WHERE query_id = ?')
+ video_ids = conn.exec(sql, (query_id,)).fetchall()
+ return [cls.get_one(conn, video_id_tup[0])
+ for video_id_tup in video_ids]
+
+ def save_to_query(self,
+ conn: DatabaseConnection,
+ query_id: QueryId
+ ) -> None:
+ """Save inclusion of self in results to query of query_id."""
+ conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
+ (query_id, self.id_))
+
+
class Player:
"""MPV representation with some additional features."""
def _send_http(self,
content: bytes = b'',
- headers: Optional[list[Header]] = None,
+ headers: Optional[list[tuple[str, str]]] = None,
code: int = 200
) -> None:
headers = headers if headers else []
body_length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(body_length).decode())
if 'playlist' == page_name:
- self._post_player_command(list(postvars.keys()))
+ self._post_player_command(list(postvars.keys())[0])
elif 'queries' == page_name:
self._post_query(QueryText(postvars['query'][0]))
- def _post_player_command(self, commands: list[str]) -> None:
- # print("DEBUG commands", commands)
+ def _post_player_command(self, commands: str) -> None:
if 'pause' in commands:
self.server.player.toggle_pause()
elif 'prev' in commands:
def _post_query(self, query_txt: QueryText) -> None:
- def collect_results(now: DatetimeStr,
- query_txt: QueryText
- ) -> list[Result]:
+ def collect_results(now, query_txt: QueryText) -> list[VideoData]:
youtube = googleapiclient.discovery.build('youtube', 'v3',
developerKey=API_KEY)
update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
maxResults=25,
safeSearch='none',
type='video')
- results = []
+ results: list[VideoData] = []
ids_to_detail: list[VideoId] = []
for item in search_request.execute()['items']:
video_id: VideoId = item['id']['videoId']
ids_to_detail += [video_id]
- snippet: dict[str, str] = item['snippet']
- result: Result = {'id': video_id,
- 'title': snippet['title'],
- 'description': snippet['description'],
- 'published_at': snippet['publishedAt'],
- }
- results += [result]
- urlretrieve(item['snippet']['thumbnails']['default']['url'],
+ snippet = item['snippet']
+ urlretrieve(snippet['thumbnails']['default']['url'],
path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
+ results += [VideoData(id_=video_id,
+ title=snippet['title'],
+ description=snippet['description'],
+ published_at=snippet['publishedAt'])]
update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
- videos_request = youtube.videos().list(id=','.join(ids_to_detail),
+ ids_for_details = ','.join([r.id_ for r in results])
+ videos_request = youtube.videos().list(id=ids_for_details,
part='content_details')
for i, detailed in enumerate(videos_request.execute()['items']):
- results_item: Result = results[i]
- assert results_item['id'] == detailed['id']
+ result = results[i]
+ assert result.id_ == detailed['id']
content_details: dict[str, str] = detailed['contentDetails']
- results_item['duration'] = content_details['duration']
- results_item['definition'] = content_details['definition']
+ result.set_duration_from_yt_string(content_details['duration'])
+ result.definition = content_details['definition'].upper()
return results
now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
- results = collect_results(now, query_txt)
- md5sum = md5(str(query_txt).encode()).hexdigest()
- with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
- 'w', encoding='utf8') as f:
- json_dump({'text': query_txt,
- 'retrieved_at': now,
- 'results': results}, f)
- self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
+ query_data = QueryData(None, query_txt, now)
+ conn = DatabaseConnection()
+ query_data.save(conn)
+ for result in collect_results(now, query_txt):
+ result.save(conn)
+ assert query_data.id_ is not None
+ result.save_to_query(conn, query_data.id_)
+ conn.commit_close()
+ self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
+ code=302)
def do_GET(self) -> None: # pylint:disable=invalid-name
"""Map GET requests to handlers for various paths."""
url = urlparse(self.path)
toks_url: list[str] = url.path.split('/')
page_name = toks_url[1]
- if 'thumbnails' == page_name:
- self._send_thumbnail(PathStr(toks_url[2]))
- elif 'dl' == page_name:
- self._send_or_download_video(VideoId(toks_url[2]))
- elif 'videos' == page_name:
- self._send_videos_index()
- elif 'video_about' == page_name:
- self._send_video_about(VideoId(toks_url[2]))
- elif 'query' == page_name:
- self._send_query_page(QueryId(toks_url[2]))
- elif 'queries' == page_name:
- self._send_queries_index_and_search()
- elif '_last_playlist_update.json' == page_name:
- self._send_last_playlist_update()
- else: # e.g. for /
- self._send_playlist()
+ try:
+ if 'thumbnails' == page_name:
+ self._send_thumbnail(PathStr(toks_url[2]))
+ elif 'dl' == page_name:
+ self._send_or_download_video(VideoId(toks_url[2]))
+ elif 'videos' == page_name:
+ self._send_videos_index()
+ elif 'video_about' == page_name:
+ self._send_video_about(VideoId(toks_url[2]))
+ elif 'query' == page_name:
+ self._send_query_page(QueryId(int(toks_url[2])))
+ elif 'queries' == page_name:
+ self._send_queries_index_and_search()
+ elif '_last_playlist_update.json' == page_name:
+ self._send_last_playlist_update()
+ else: # e.g. for /
+ self._send_playlist()
+ except NotFoundException:
+ self._send_http(b'not found', code=404)
def _send_rendered_template(self,
tmpl_name: PathStr,
html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
- def _make_downloads_db(self) -> DownloadsDB:
+ def _make_downloads_db(self) -> DownloadsDb:
downloads_db = {}
for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
before_ext = splitext(e.path)[0]
downloads_db[id_] = PathStr(e.path)
return downloads_db
- def _harvest_queries(self) -> list[tuple[QueryId, list[Result],
- QueryText, DatetimeStr]]:
- queries_data = []
- for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
- if isfile(f.path)]:
- with open(file.path, 'r', encoding='utf8') as query_file:
- filed_query: QueryData = json_load(query_file)
- assert isinstance(filed_query['results'], list)
- assert isinstance(filed_query['text'], str)
- assert isinstance(filed_query['retrieved_at'], str)
- id_ = QueryId(splitext(basename(file.path))[0])
- results_list = filed_query['results']
- query_text = QueryText(filed_query['text'])
- retrieved_at = DatetimeStr(filed_query['retrieved_at'])
- queries_data += [(id_, results_list, query_text, retrieved_at)]
- return queries_data
-
- def _result_to_video_data(self,
- result: Result,
- downloads_db: DownloadsDB
- ) -> VideoData:
-
- def reformat_duration(duration_str: str):
- date_dur, time_dur = duration_str.split('T')
- seconds = 0
- date_dur = date_dur[1:]
- for dur_char, len_seconds in (('Y', 60*60*24*365.25),
- ('M', 60*60*24*30),
- ('D', 60*60*24)):
- if dur_char in date_dur:
- dur_str, date_dur = date_dur.split(dur_char)
- seconds += int(dur_str) * int(len_seconds)
- for dur_char, len_seconds in (('H', 60*60),
- ('M', 60),
- ('S', 1)):
- if dur_char in time_dur:
- dur_str, time_dur = time_dur.split(dur_char)
- seconds += int(dur_str) * len_seconds
- seconds_str = str(seconds % 60)
- minutes_str = str(seconds // 60)
- hours_str = str(seconds // (60 * 60))
- return ':'.join([f'0{s}' if len(s) == 1 else s
- for s in (hours_str, minutes_str, seconds_str)])
- assert isinstance(result['duration'], str)
- return {
- 'id': result['id'],
- 'available': result['id'] in downloads_db,
- 'duration': reformat_duration(result['duration']),
- 'title': result['title'],
- 'definition': result['definition'].upper()
- }
-
def _send_thumbnail(self, filename: PathStr) -> None:
with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
img = f.read()
code=302)
def _send_query_page(self, query_id: QueryId) -> None:
- downloads_db = self._make_downloads_db()
- with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
- 'r', encoding='utf8') as query_file:
- query = json_load(query_file)
+ conn = DatabaseConnection()
+ query = QueryData.get_one(conn, str(query_id))
+ results = VideoData.get_all_for_query(conn, query_id)
+ conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_RESULTS,
- {'query': query['text'],
- 'videos': [self._result_to_video_data(result, downloads_db)
- for result in query['results']]})
+ {'query': query.text, 'videos': results})
def _send_queries_index_and_search(self) -> None:
- downloads_db = self._make_downloads_db()
- queries_data: list[QueryData] = []
- for id_, results, query_text, timestamp in self._harvest_queries():
- queries_data += [
- {'id': id_, 'text': query_text, 'retrieved_at': timestamp,
- 'downloads': AmountDownloads(len([
- r for r in results if r['id'] in downloads_db]))}]
- queries_data.sort(key=lambda q: q['retrieved_at'], reverse=True)
- self._send_rendered_template(NAME_TEMPLATE_INDEX, {'queries':
- queries_data})
+ quota_count = QuotaCost(sum(read_quota_log().values()))
+ conn = DatabaseConnection()
+ queries_data = QueryData.get_all(conn)
+ conn.commit_close()
+ queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
+ self._send_rendered_template(
+ NAME_TEMPLATE_QUERIES, {'queries': queries_data,
+ 'quota_count': quota_count})
def _send_video_about(self, video_id: VideoId) -> None:
- linked_queries: list[tuple[QueryId, QueryText]] = []
- first_result: Optional[Result] = None
- for id_, results, query_text, _ in self._harvest_queries():
- for result in results:
- if video_id == result['id']:
- linked_queries += [(id_, query_text)]
- first_result = first_result or result
- if not first_result:
- self._send_http(b'nothing found', code=404)
- return
+ downloads_db = self._make_downloads_db()
+ conn = DatabaseConnection()
+ linked_queries = QueryData.get_all_for_video(conn, video_id)
+ video_data = VideoData.get_one(conn, video_id)
+ conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_VIDEO_ABOUT,
- {'video_id': video_id,
+ {'video_data': video_data,
+ 'available': video_data.id_ in downloads_db,
'youtube_prefix': YOUTUBE_URL_PREFIX,
- 'queries': linked_queries,
- 'video_data': self._result_to_video_data(
- first_result, self._make_downloads_db())})
+ 'queries': linked_queries})
def _send_videos_index(self) -> None:
videos = [(id_, PathStr(basename(path)))
self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
def _send_last_playlist_update(self) -> None:
- payload: dict[str, str] = {
+ payload: dict[str, PlayerUpdateId] = {
'last_update': self.server.player.last_update}
self._send_http(bytes(json_dumps(payload), 'utf8'),
headers=[('Content-type', 'application/json')])