#!/usr/bin/env python3
"""Minimalistic download-focused YouTube interface."""
-
-# included libs
-from typing import TypeAlias, Optional, NewType, Callable, Self, Any
-from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
-from os.path import (isdir, isfile, exists as path_exists, join as path_join,
- splitext, basename)
-from random import shuffle
-from time import time, sleep
-from datetime import datetime, timedelta
-from json import dumps as json_dumps
-from uuid import uuid4
from threading import Thread
-from sqlite3 import connect as sql_connect, Cursor, Row
-from http.server import HTTPServer, BaseHTTPRequestHandler
-from urllib.parse import urlparse, parse_qs
-from urllib.request import urlretrieve
-from urllib.error import HTTPError
-# non-included libs
-from jinja2 import Template
-from mpv import MPV # type: ignore
-from yt_dlp import YoutubeDL # type: ignore
-import googleapiclient.discovery # type: ignore
-
-# what we might want to manually define per environs
-API_KEY = environ.get('GOOGLE_API_KEY')
-HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
-
-# type definitions for mypy
-DatetimeStr = NewType('DatetimeStr', str)
-QuotaCost = NewType('QuotaCost', int)
-YoutubeId = NewType('YoutubeId', str)
-PathStr = NewType('PathStr', str)
-QueryId = NewType('QueryId', str)
-QueryText = NewType('QueryText', str)
-ProseText = NewType('ProseText', str)
-SqlText = NewType('SqlText', str)
-AmountDownloads = NewType('AmountDownloads', int)
-PlayerUpdateId = NewType('PlayerUpdateId', str)
-DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
-TemplateContext: TypeAlias = dict[
- str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
- | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
- | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]]
- | list[tuple[PathStr, PathStr]]]
-
-# local data reasonably expected to be in user home directory
-PATH_HOME = PathStr(environ.get('HOME', ''))
-PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom'))
-PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails'))
-PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql'))
-PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads'))
-PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp'))
-
-# template paths; might move outside PATH_WORKDIR in the future
-PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
-NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
-NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
-NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
-PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES,
- NAME_TEMPLATE_QUERIES))
-
-# yt_dlp config
-YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
- '/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
- 'temp': PATH_TEMP},
- 'format': YT_DOWNLOAD_FORMAT}
-
-# Youtube API expectations
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
-QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
-QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
-
-# local expectations
-TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
-LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
-
-# tables to create database with
-SCRIPT_INIT_DB = '''
-CREATE TABLE yt_queries (
- id TEXT PRIMARY KEY,
- text TEXT NOT NULL,
- retrieved_at TEXT NOT NULL
-);
-CREATE TABLE yt_videos (
- id TEXT PRIMARY KEY,
- title TEXT NOT NULL,
- description TEXT NOT NULL,
- published_at TEXT NOT NULL,
- duration TEXT NOT NULL,
- definition TEXT NOT NULL
-);
-CREATE TABLE yt_query_results (
- query_id TEXT NOT NULL,
- video_id TEXT NOT NULL,
- PRIMARY KEY (query_id, video_id),
- FOREIGN KEY (query_id) REFERENCES yt_queries(id),
- FOREIGN KEY (video_id) REFERENCES yt_videos(id)
-);
-CREATE TABLE quota_costs (
- id TEXT PRIMARY KEY,
- timestamp TEXT NOT NULL,
- cost INT NOT NULL
-);
-CREATE TABLE files (
- rel_path TEXT PRIMARY KEY,
- yt_id TEXT NOT NULL DEFAULT "",
- FOREIGN KEY (yt_id) REFERENCES yt_videos(id)
-);
-'''
-
-
-class NotFoundException(BaseException):
- """Call on DB fetches finding less than expected."""
-
-
-def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
- """Ensure existance of expected_dirs _as_ directories."""
- for dir_name in expected_dirs:
- if not path_exists(dir_name):
- print(f'creating expected directory: {dir_name}')
- makedirs(dir_name)
- elif not isdir(dir_name):
- msg = f'at expected directory path {dir_name} found non-directory'
- raise Exception(msg)
-
-
-class DatabaseConnection:
- """Wrapped sqlite3.Connection."""
-
- def __init__(self) -> None:
- if not path_exists(PATH_DB):
- with sql_connect(PATH_DB) as conn:
- conn.executescript(SCRIPT_INIT_DB)
- self._conn = sql_connect(PATH_DB)
-
- def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
- """Wrapper around sqlite3.Connection.execute."""
- return self._conn.execute(sql, inputs)
-
- def commit_close(self) -> None:
- """Run sqlite3.Connection.commit and .close."""
- self._conn.commit()
- self._conn.close()
-
-
-class DbData:
- """Abstraction of common DB operation."""
- _table_name: str
- _cols: tuple[str, ...]
-
- @classmethod
- def _from_table_row(cls, row: Row) -> Self:
- kwargs = {}
- for i, col_name in enumerate(cls._cols):
- kwargs[col_name] = row[i]
- return cls(**kwargs)
-
- @classmethod
- def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
- """Return single entry of id_ from DB."""
- sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
- row = conn.exec(sql, (id_,)).fetchone()
- if not row:
- raise NotFoundException
- return cls._from_table_row(row)
-
- @classmethod
- def get_all(cls, conn: DatabaseConnection) -> list[Self]:
- """Return all entries from DB."""
- sql = SqlText(f'SELECT * FROM {cls._table_name}')
- rows = conn.exec(sql).fetchall()
- return [cls._from_table_row(row) for row in rows]
-
- def save(self, conn: DatabaseConnection) -> Cursor:
- """Save entry to DB."""
- vals = [getattr(self, col_name) for col_name in self._cols]
- q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
- sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
- return conn.exec(sql, tuple(vals))
-
-
-class YoutubeQuery(DbData):
- """Representation of YouTube query (without results)."""
- _table_name = 'yt_queries'
- _cols = ('id_', 'text', 'retrieved_at')
-
- def __init__(self,
- id_: Optional[QueryId],
- text: QueryText,
- retrieved_at: DatetimeStr
- ) -> None:
- self.id_ = id_ if id_ else QueryId(str(uuid4()))
- self.text = QueryText(text)
- self.retrieved_at = retrieved_at
-
- @classmethod
- def get_all_for_video(cls,
- conn: DatabaseConnection,
- video_id: YoutubeId
- ) -> list[Self]:
- """Return YoutubeQueries containing YoutubeVideo's ID in results."""
- sql = SqlText('SELECT query_id FROM '
- 'yt_query_results WHERE video_id = ?')
- query_ids = conn.exec(sql, (video_id,)).fetchall()
- return [cls.get_one(conn, query_id_tup[0])
- for query_id_tup in query_ids]
-
-
-class YoutubeVideo(DbData):
- """Representation of YouTube video metadata as provided by their API."""
- _table_name = 'yt_videos'
- _cols = ('id_', 'title', 'description', 'published_at', 'duration',
- 'definition')
-
- def __init__(self,
- id_: YoutubeId,
- title: ProseText = ProseText('?'),
- description: ProseText = ProseText('?'),
- published_at: DatetimeStr = DatetimeStr('?'),
- duration: str = '?',
- definition: str = '?'
- ) -> None:
- self.id_ = id_
- self.title = title
- self.description = description
- self.published_at = published_at
- self.duration = duration
- self.definition = definition
-
- def set_duration_from_yt_string(self, yt_string) -> None:
- """Set .duration from the kind of format the YouTube API provides."""
- date_dur, time_dur = yt_string.split('T')
- seconds = 0
- date_dur = date_dur[1:]
- for dur_char, len_seconds in (('Y', 60*60*24*365.25),
- ('M', 60*60*24*30),
- ('D', 60*60*24)):
- if dur_char in date_dur:
- dur_str, date_dur = date_dur.split(dur_char)
- seconds += int(dur_str) * int(len_seconds)
- for dur_char, len_seconds in (('H', 60*60),
- ('M', 60),
- ('S', 1)):
- if dur_char in time_dur:
- dur_str, time_dur = time_dur.split(dur_char)
- seconds += int(dur_str) * len_seconds
- seconds_str = str(seconds % 60)
- minutes_str = str(seconds // 60)
- hours_str = str(seconds // (60 * 60))
- self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
- in (hours_str, minutes_str, seconds_str)])
-
- @classmethod
- def get_all_for_query(cls,
- conn: DatabaseConnection,
- query_id: QueryId
- ) -> list[Self]:
- """Return all videos for query of query_id."""
- sql = SqlText('SELECT video_id '
- 'FROM yt_query_results WHERE query_id = ?')
- video_ids = conn.exec(sql, (query_id,)).fetchall()
- return [cls.get_one(conn, video_id_tup[0])
- for video_id_tup in video_ids]
-
- def save_to_query(self,
- conn: DatabaseConnection,
- query_id: QueryId
- ) -> None:
- """Save inclusion of self in results to query of query_id."""
- conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
- (query_id, self.id_))
-
-
-class VideoFile(DbData):
- """Collects data about downloaded files."""
- _table_name = 'files'
- _cols = ('rel_path', 'yt_id')
-
- def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
- self.rel_path = rel_path
- self.yt_id = yt_id
-
- def remove(self, conn: DatabaseConnection) -> None:
- """Remove self from database by self.rel_path as identifier."""
- sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?')
- conn.exec(SqlText(sql), (self.rel_path,))
-
-
-class QuotaLog(DbData):
- """Collects API access quota costs."""
- _table_name = 'quota_costs'
- _cols = ('id_', 'timestamp', 'cost')
-
- def __init__(self,
- id_: Optional[str],
- timestamp: DatetimeStr,
- cost: QuotaCost
- ) -> None:
- self.id_ = id_ if id_ else str(uuid4())
- self.timestamp = timestamp
- self.cost = cost
-
- @classmethod
- def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
- """Adds cost mapped to current datetime."""
- cls._remove_old(conn)
- new = cls(None,
- DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
- QuotaCost(cost))
- new.save(conn)
-
- @classmethod
- def current(cls, conn: DatabaseConnection) -> QuotaCost:
- """Returns quota cost total for last 24 hours, purges old data."""
- cls._remove_old(conn)
- quota_costs = cls.get_all(conn)
- return QuotaCost(sum(c.cost for c in quota_costs))
-
- @classmethod
- def _remove_old(cls, conn: DatabaseConnection) -> None:
- cutoff = datetime.now() - timedelta(days=1)
- sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
- conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
-
-
-class Player:
- """MPV representation with some additional features."""
-
- def __init__(self) -> None:
- self.last_update = PlayerUpdateId('')
- self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
- if isfile(e.path)
- and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
- shuffle(self._filenames)
- self._idx: int = 0
- self._mpv: Optional[MPV] = None
-
- @property
- def _mpv_available(self) -> bool:
- return bool(self._mpv and not self._mpv.core_shutdown)
-
- @staticmethod
- def _if_mpv_available(f) -> Callable:
- def wrapper(self):
- return f(self) if self._mpv else None
- return wrapper
-
- def _signal_update(self) -> None:
- self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+from ytplom.misc import DownloadsDb, HTTP_PORT, Player, Server, TaskHandler
- def _start_mpv(self) -> None:
- self._mpv = MPV(input_default_bindings=True,
- input_vo_keyboard=True,
- config=True)
- self._mpv.observe_property('pause', lambda a, b: self._signal_update())
- @self._mpv.event_callback('start-file')
- def on_start_file(_) -> None:
- assert self._mpv is not None
- self._mpv.pause = False
- self._idx = self._mpv.playlist_pos
- self._signal_update()
-
- @self._mpv.event_callback('shutdown')
- def on_shutdown(_) -> None:
- self._mpv = None
- self._signal_update()
-
- for path in self._filenames:
- self._mpv.playlist_append(path)
- self._mpv.playlist_play_index(self._idx)
-
- @property
- def current_filename(self) -> Optional[PathStr]:
- """Return what we assume is the name of the currently playing file."""
- if not self._filenames:
- return None
- return PathStr(basename(self._filenames[self._idx]))
-
- @property
- def prev_files(self) -> list[PathStr]:
- """List 'past' files of playlist."""
- return list(reversed(self._filenames[:self._idx]))
-
- @property
- def next_files(self) -> list[PathStr]:
- """List 'coming' files of playlist."""
- return self._filenames[self._idx + 1:]
-
- @property
- def is_running(self) -> bool:
- """Return if player is running/available."""
- return self._mpv_available
-
- @property
- def is_paused(self) -> bool:
- """Return if player is paused."""
- if self._mpv_available:
- assert self._mpv is not None
- return self._mpv.pause
- return False
-
- def toggle_run(self) -> None:
- """Toggle player running."""
- if self._mpv_available:
- assert self._mpv is not None
- self._mpv.terminate()
- self._mpv = None
- else:
- self._start_mpv()
- self._signal_update()
-
- @_if_mpv_available
- def toggle_pause(self) -> None:
- """Toggle player pausing."""
- assert self._mpv is not None
- self._mpv.pause = not self._mpv.pause
- self._signal_update()
-
- @_if_mpv_available
- def prev(self) -> None:
- """Move player to previous item in playlist."""
- assert self._mpv is not None
- if self._mpv.playlist_pos > 0:
- self._mpv.playlist_prev()
- else:
- self._mpv.playlist_play_index(0)
-
- @_if_mpv_available
- def next(self) -> None:
- """Move player to next item in playlist."""
- assert self._mpv is not None
- max_idx: int = len(self._mpv.playlist_filenames) - 1
- if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
- self._mpv.playlist_next()
- else:
- self._mpv.playlist_play_index(max_idx)
-
-
-class DownloadsDb:
- """Collections downloading-related stuff."""
-
- def __init__(self) -> None:
- self._to_download: list[YoutubeId] = []
- _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
- self._sync_db()
-
- def _sync_db(self):
- conn = DatabaseConnection()
- files_via_db = VideoFile.get_all(conn)
- old_cwd = getcwd()
- chdir(PATH_DOWNLOADS)
- for file in files_via_db:
- if not isfile(path_join(file.rel_path)):
- print(f'SYNC: no file {file.rel_path} found, removing entry.')
- file.remove(conn)
- paths = [file.rel_path for file in files_via_db]
- for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
- if path not in paths:
- yt_id = self._id_from_filename(path)
- file = VideoFile(path, yt_id)
- print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
- file.save(conn)
- chdir(old_cwd)
- self._files = VideoFile.get_all(conn)
- conn.commit_close()
-
- @staticmethod
- def _id_from_filename(path: PathStr,
- double_split: bool = False
- ) -> YoutubeId:
- before_ext = splitext(path)[0]
- if double_split:
- before_ext = splitext(before_ext)[0]
- return YoutubeId(before_ext.split('[')[-1].split(']')[0])
-
- @property
- def ids_to_paths(self) -> DownloadsIndex:
- """Return mapping YoutubeIds:paths of files downloaded to them."""
- self._sync_db()
- return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path))
- for f in self._files}
-
- @property
- def ids_unfinished(self) -> set[YoutubeId]:
- """Return set of IDs of videos awaiting or currently in download."""
- in_temp_dir = []
- for path in [PathStr(e.path) for e
- in scandir(PATH_TEMP) if isfile(e.path)]:
- in_temp_dir += [self._id_from_filename(path)]
- return set(self._to_download + in_temp_dir)
-
- def clean_unfinished(self) -> None:
- """Empty temp directory of unfinished downloads."""
- for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
- print(f'removing unfinished download: {e.path}')
- os_remove(e.path)
-
- def queue_download(self, video_id: YoutubeId) -> None:
- """Add video_id to download queue *if* not already processed."""
- pre_existing = self.ids_unfinished | set(self._to_download
- + list(self.ids_to_paths))
- if video_id not in pre_existing:
- self._to_download += [video_id]
-
- def _download_next(self) -> None:
- if self._to_download:
- video_id = self._to_download.pop(0)
- with YoutubeDL(YT_DL_PARAMS) as ydl:
- ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
-
- def download_loop(self) -> None:
- """Keep iterating through download queue for new download tasks."""
- while True:
- sleep(0.5)
- self._download_next()
-
-
-class Server(HTTPServer):
- """Extension of HTTPServer providing for Player and DownloadsDb."""
-
- def __init__(self,
- player: Player,
- downloads_db: DownloadsDb,
- *args, **kwargs
- ) -> None:
- super().__init__(*args, **kwargs)
- self.player = player
- self.downloads = downloads_db
-
-
-class TaskHandler(BaseHTTPRequestHandler):
- """Handler for GET and POST requests to our server."""
- server: Server
-
- def _send_http(self,
- content: bytes = b'',
- headers: Optional[list[tuple[str, str]]] = None,
- code: int = 200
- ) -> None:
- headers = headers if headers else []
- self.send_response(code)
- for header_tuple in headers:
- self.send_header(header_tuple[0], header_tuple[1])
- self.end_headers()
- if content:
- self.wfile.write(content)
-
- def do_POST(self) -> None: # pylint:disable=invalid-name
- """Map POST requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url: list[str] = url.path.split('/')
- page_name = toks_url[1]
- body_length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(body_length).decode())
- if 'playlist' == page_name:
- self._post_player_command(list(postvars.keys())[0])
- elif 'queries' == page_name:
- self._post_query(QueryText(postvars['query'][0]))
-
- def _post_player_command(self, commands: str) -> None:
- if 'pause' in commands:
- self.server.player.toggle_pause()
- elif 'prev' in commands:
- self.server.player.prev()
- elif 'next' in commands:
- self.server.player.next()
- elif 'stop' in commands:
- self.server.player.toggle_run()
- sleep(0.5) # avoid reload happening before current_file update
- self._send_http(headers=[('Location', '/')], code=302)
-
- def _post_query(self, query_txt: QueryText) -> None:
- conn = DatabaseConnection()
-
- def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
- youtube = googleapiclient.discovery.build('youtube', 'v3',
- developerKey=API_KEY)
- QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
- search_request = youtube.search().list(
- q=query_txt,
- part='snippet',
- maxResults=25,
- safeSearch='none',
- type='video')
- results: list[YoutubeVideo] = []
- ids_to_detail: list[YoutubeId] = []
- for item in search_request.execute()['items']:
- video_id: YoutubeId = item['id']['videoId']
- ids_to_detail += [video_id]
- snippet = item['snippet']
- urlretrieve(snippet['thumbnails']['default']['url'],
- path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
- results += [YoutubeVideo(id_=video_id,
- title=snippet['title'],
- description=snippet['description'],
- published_at=snippet['publishedAt'])]
- QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
- ids_for_details = ','.join([r.id_ for r in results])
- videos_request = youtube.videos().list(id=ids_for_details,
- part='content_details')
- for i, detailed in enumerate(videos_request.execute()['items']):
- result = results[i]
- assert result.id_ == detailed['id']
- content_details: dict[str, str] = detailed['contentDetails']
- result.set_duration_from_yt_string(content_details['duration'])
- result.definition = content_details['definition'].upper()
- return results
-
- query_data = YoutubeQuery(
- None, query_txt,
- DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
- query_data.save(conn)
- for result in collect_results(query_txt):
- result.save(conn)
- result.save_to_query(conn, query_data.id_)
- conn.commit_close()
- self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
- code=302)
-
- def do_GET(self) -> None: # pylint:disable=invalid-name
- """Map GET requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url: list[str] = url.path.split('/')
- page_name = toks_url[1]
- try:
- if 'thumbnails' == page_name:
- self._send_thumbnail(PathStr(toks_url[2]))
- elif 'dl' == page_name:
- self._send_or_download_video(YoutubeId(toks_url[2]))
- elif 'videos' == page_name:
- self._send_videos_index()
- elif 'video_about' == page_name:
- self._send_video_about(YoutubeId(toks_url[2]))
- elif 'query' == page_name:
- self._send_query_page(QueryId(toks_url[2]))
- elif 'queries' == page_name:
- self._send_queries_index_and_search()
- elif '_last_playlist_update.json' == page_name:
- self._send_last_playlist_update()
- else: # e.g. for /
- self._send_playlist()
- except NotFoundException:
- self._send_http(b'not found', code=404)
-
- def _send_rendered_template(self,
- tmpl_name: PathStr,
- tmpl_ctx: TemplateContext
- ) -> None:
- with open(path_join(PATH_TEMPLATES, tmpl_name),
- 'r', encoding='utf8'
- ) as templ_file:
- tmpl = Template(str(templ_file.read()))
- html = tmpl.render(**tmpl_ctx)
- self._send_http(bytes(html, 'utf8'))
-
- def _send_thumbnail(self, filename: PathStr) -> None:
- _ensure_expected_dirs([PATH_THUMBNAILS])
- path_thumbnail = path_join(PATH_THUMBNAILS, filename)
- if not path_exists(path_thumbnail):
- video_id = splitext(filename)[0]
- url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
- try:
- urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
- except HTTPError as e:
- if 404 == e.code:
- raise NotFoundException from e
- raise e
- with open(path_thumbnail, 'rb') as f:
- img = f.read()
- self._send_http(img, [('Content-type', 'image/jpg')])
-
- def _send_or_download_video(self, video_id: YoutubeId) -> None:
- if video_id in self.server.downloads.ids_to_paths:
- with open(self.server.downloads.ids_to_paths[video_id],
- 'rb') as video_file:
- video = video_file.read()
- self._send_http(content=video)
- return
- self.server.downloads.queue_download(video_id)
- self._send_http(headers=[('Location', f'/video_about/{video_id}')],
- code=302)
-
- def _send_query_page(self, query_id: QueryId) -> None:
- conn = DatabaseConnection()
- query = YoutubeQuery.get_one(conn, str(query_id))
- results = YoutubeVideo.get_all_for_query(conn, query_id)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_RESULTS,
- {'query': query.text, 'videos': results})
-
- def _send_queries_index_and_search(self) -> None:
- conn = DatabaseConnection()
- quota_count = QuotaLog.current(conn)
- queries_data = YoutubeQuery.get_all(conn)
- conn.commit_close()
- queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
- self._send_rendered_template(
- NAME_TEMPLATE_QUERIES, {'queries': queries_data,
- 'quota_count': quota_count})
-
- def _send_video_about(self, video_id: YoutubeId) -> None:
- conn = DatabaseConnection()
- linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
- try:
- video_data = YoutubeVideo.get_one(conn, video_id)
- except NotFoundException:
- video_data = YoutubeVideo(video_id)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_VIDEO_ABOUT,
- {'video_data': video_data,
- 'is_temp': video_id in self.server.downloads.ids_unfinished,
- 'file_path': self.server.downloads.ids_to_paths.get(video_id,
- None),
- 'youtube_prefix': YOUTUBE_URL_PREFIX,
- 'queries': linked_queries})
-
- def _send_videos_index(self) -> None:
- videos = [(id_, PathStr(basename(path)))
- for id_, path in self.server.downloads.ids_to_paths.items()]
- videos.sort(key=lambda t: t[1])
- self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
-
- def _send_last_playlist_update(self) -> None:
- payload: dict[str, PlayerUpdateId] = {
- 'last_update': self.server.player.last_update}
- self._send_http(bytes(json_dumps(payload), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_playlist(self) -> None:
- tuples: list[tuple[PathStr, PathStr]] = []
- i: int = 0
- while True:
- prev, next_ = PathStr(''), PathStr('')
- if len(self.server.player.prev_files) > i:
- prev = PathStr(basename(self.server.player.prev_files[i]))
- if len(self.server.player.next_files) > i:
- next_ = PathStr(basename(self.server.player.next_files[i]))
- if not prev + next_:
- break
- tuples += [(prev, next_)]
- i += 1
- self._send_rendered_template(
- NAME_TEMPLATE_PLAYLIST,
- {'last_update': self.server.player.last_update,
- 'running': self.server.player.is_running,
- 'paused': self.server.player.is_paused,
- 'current_title': self.server.player.current_filename,
- 'tuples': tuples})
-
-
-def run():
- """Create DownloadsDb, Player, run server loop."""
+if __name__ == '__main__':
downloads_db = DownloadsDb()
downloads_db.clean_unfinished()
Thread(target=downloads_db.download_loop, daemon=False).start()
print('aborted due to keyboard interrupt; '
'repeat to end download thread too')
server.server_close()
-
-
-if __name__ == '__main__':
- run()
--- /dev/null
+"""Main ytplom lib."""
+
+# included libs
+from typing import TypeAlias, Optional, NewType, Callable, Self, Any
+from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
+from os.path import (isdir, isfile, exists as path_exists, join as path_join,
+ splitext, basename)
+from random import shuffle
+from time import time, sleep
+from datetime import datetime, timedelta
+from json import dumps as json_dumps
+from uuid import uuid4
+from sqlite3 import connect as sql_connect, Cursor, Row
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from urllib.parse import urlparse, parse_qs
+from urllib.request import urlretrieve
+from urllib.error import HTTPError
+# non-included libs
+from jinja2 import Template
+from mpv import MPV # type: ignore
+from yt_dlp import YoutubeDL # type: ignore
+import googleapiclient.discovery # type: ignore
+
+# what we might want to manually define per environs
+API_KEY = environ.get('GOOGLE_API_KEY')
+HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
+
+# type definitions for mypy
+DatetimeStr = NewType('DatetimeStr', str)
+QuotaCost = NewType('QuotaCost', int)
+YoutubeId = NewType('YoutubeId', str)
+PathStr = NewType('PathStr', str)
+QueryId = NewType('QueryId', str)
+QueryText = NewType('QueryText', str)
+ProseText = NewType('ProseText', str)
+SqlText = NewType('SqlText', str)
+AmountDownloads = NewType('AmountDownloads', int)
+PlayerUpdateId = NewType('PlayerUpdateId', str)
+DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
+TemplateContext: TypeAlias = dict[
+ str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
+ | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
+ | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]]
+ | list[tuple[PathStr, PathStr]]]
+
+# local data reasonably expected to be in user home directory
+PATH_HOME = PathStr(environ.get('HOME', ''))
+PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom'))
+PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails'))
+PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql'))
+PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads'))
+PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp'))
+
+# template paths; might move outside PATH_WORKDIR in the future
+PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates'))
+NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
+NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
+NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
+NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
+NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
+PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES,
+ NAME_TEMPLATE_QUERIES))
+
+# yt_dlp config
+YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
+ '/best[height<=1080][width<=1920]'
+YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
+ 'temp': PATH_TEMP},
+ 'format': YT_DOWNLOAD_FORMAT}
+
+# Youtube API expectations
+YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
+THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
+THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
+QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
+QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
+
+# local expectations
+TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
+LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
+
+# tables to create database with
+SCRIPT_INIT_DB = '''
+CREATE TABLE yt_queries (
+ id TEXT PRIMARY KEY,
+ text TEXT NOT NULL,
+ retrieved_at TEXT NOT NULL
+);
+CREATE TABLE yt_videos (
+ id TEXT PRIMARY KEY,
+ title TEXT NOT NULL,
+ description TEXT NOT NULL,
+ published_at TEXT NOT NULL,
+ duration TEXT NOT NULL,
+ definition TEXT NOT NULL
+);
+CREATE TABLE yt_query_results (
+ query_id TEXT NOT NULL,
+ video_id TEXT NOT NULL,
+ PRIMARY KEY (query_id, video_id),
+ FOREIGN KEY (query_id) REFERENCES yt_queries(id),
+ FOREIGN KEY (video_id) REFERENCES yt_videos(id)
+);
+CREATE TABLE quota_costs (
+ id TEXT PRIMARY KEY,
+ timestamp TEXT NOT NULL,
+ cost INT NOT NULL
+);
+CREATE TABLE files (
+ rel_path TEXT PRIMARY KEY,
+ yt_id TEXT NOT NULL DEFAULT "",
+ FOREIGN KEY (yt_id) REFERENCES yt_videos(id)
+);
+'''
+
+
+class NotFoundException(BaseException):
+ """Call on DB fetches finding less than expected."""
+
+
+def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
+ """Ensure existance of expected_dirs _as_ directories."""
+ for dir_name in expected_dirs:
+ if not path_exists(dir_name):
+ print(f'creating expected directory: {dir_name}')
+ makedirs(dir_name)
+ elif not isdir(dir_name):
+ msg = f'at expected directory path {dir_name} found non-directory'
+ raise Exception(msg)
+
+
+class DatabaseConnection:
+ """Wrapped sqlite3.Connection."""
+
+ def __init__(self) -> None:
+ if not path_exists(PATH_DB):
+ with sql_connect(PATH_DB) as conn:
+ conn.executescript(SCRIPT_INIT_DB)
+ self._conn = sql_connect(PATH_DB)
+
+ def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
+ """Wrapper around sqlite3.Connection.execute."""
+ return self._conn.execute(sql, inputs)
+
+ def commit_close(self) -> None:
+ """Run sqlite3.Connection.commit and .close."""
+ self._conn.commit()
+ self._conn.close()
+
+
+class DbData:
+ """Abstraction of common DB operation."""
+ _table_name: str
+ _cols: tuple[str, ...]
+
+ @classmethod
+ def _from_table_row(cls, row: Row) -> Self:
+ kwargs = {}
+ for i, col_name in enumerate(cls._cols):
+ kwargs[col_name] = row[i]
+ return cls(**kwargs)
+
+ @classmethod
+ def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
+ """Return single entry of id_ from DB."""
+ sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
+ row = conn.exec(sql, (id_,)).fetchone()
+ if not row:
+ raise NotFoundException
+ return cls._from_table_row(row)
+
+ @classmethod
+ def get_all(cls, conn: DatabaseConnection) -> list[Self]:
+ """Return all entries from DB."""
+ sql = SqlText(f'SELECT * FROM {cls._table_name}')
+ rows = conn.exec(sql).fetchall()
+ return [cls._from_table_row(row) for row in rows]
+
+ def save(self, conn: DatabaseConnection) -> Cursor:
+ """Save entry to DB."""
+ vals = [getattr(self, col_name) for col_name in self._cols]
+ q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
+ sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
+ return conn.exec(sql, tuple(vals))
+
+
+class YoutubeQuery(DbData):
+ """Representation of YouTube query (without results)."""
+ _table_name = 'yt_queries'
+ _cols = ('id_', 'text', 'retrieved_at')
+
+ def __init__(self,
+ id_: Optional[QueryId],
+ text: QueryText,
+ retrieved_at: DatetimeStr
+ ) -> None:
+ self.id_ = id_ if id_ else QueryId(str(uuid4()))
+ self.text = QueryText(text)
+ self.retrieved_at = retrieved_at
+
+ @classmethod
+ def get_all_for_video(cls,
+ conn: DatabaseConnection,
+ video_id: YoutubeId
+ ) -> list[Self]:
+ """Return YoutubeQueries containing YoutubeVideo's ID in results."""
+ sql = SqlText('SELECT query_id FROM '
+ 'yt_query_results WHERE video_id = ?')
+ query_ids = conn.exec(sql, (video_id,)).fetchall()
+ return [cls.get_one(conn, query_id_tup[0])
+ for query_id_tup in query_ids]
+
+
+class YoutubeVideo(DbData):
+ """Representation of YouTube video metadata as provided by their API."""
+ _table_name = 'yt_videos'
+ _cols = ('id_', 'title', 'description', 'published_at', 'duration',
+ 'definition')
+
+ def __init__(self,
+ id_: YoutubeId,
+ title: ProseText = ProseText('?'),
+ description: ProseText = ProseText('?'),
+ published_at: DatetimeStr = DatetimeStr('?'),
+ duration: str = '?',
+ definition: str = '?'
+ ) -> None:
+ self.id_ = id_
+ self.title = title
+ self.description = description
+ self.published_at = published_at
+ self.duration = duration
+ self.definition = definition
+
+ def set_duration_from_yt_string(self, yt_string) -> None:
+ """Set .duration from the kind of format the YouTube API provides."""
+ date_dur, time_dur = yt_string.split('T')
+ seconds = 0
+ date_dur = date_dur[1:]
+ for dur_char, len_seconds in (('Y', 60*60*24*365.25),
+ ('M', 60*60*24*30),
+ ('D', 60*60*24)):
+ if dur_char in date_dur:
+ dur_str, date_dur = date_dur.split(dur_char)
+ seconds += int(dur_str) * int(len_seconds)
+ for dur_char, len_seconds in (('H', 60*60),
+ ('M', 60),
+ ('S', 1)):
+ if dur_char in time_dur:
+ dur_str, time_dur = time_dur.split(dur_char)
+ seconds += int(dur_str) * len_seconds
+ seconds_str = str(seconds % 60)
+ minutes_str = str(seconds // 60)
+ hours_str = str(seconds // (60 * 60))
+ self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
+ in (hours_str, minutes_str, seconds_str)])
+
+ @classmethod
+ def get_all_for_query(cls,
+ conn: DatabaseConnection,
+ query_id: QueryId
+ ) -> list[Self]:
+ """Return all videos for query of query_id."""
+ sql = SqlText('SELECT video_id '
+ 'FROM yt_query_results WHERE query_id = ?')
+ video_ids = conn.exec(sql, (query_id,)).fetchall()
+ return [cls.get_one(conn, video_id_tup[0])
+ for video_id_tup in video_ids]
+
+ def save_to_query(self,
+ conn: DatabaseConnection,
+ query_id: QueryId
+ ) -> None:
+ """Save inclusion of self in results to query of query_id."""
+ conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
+ (query_id, self.id_))
+
+
+class VideoFile(DbData):
+ """Collects data about downloaded files."""
+ _table_name = 'files'
+ _cols = ('rel_path', 'yt_id')
+
+ def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
+ self.rel_path = rel_path
+ self.yt_id = yt_id
+
+ def remove(self, conn: DatabaseConnection) -> None:
+ """Remove self from database by self.rel_path as identifier."""
+ sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?')
+ conn.exec(SqlText(sql), (self.rel_path,))
+
+
+class QuotaLog(DbData):
+ """Collects API access quota costs."""
+ _table_name = 'quota_costs'
+ _cols = ('id_', 'timestamp', 'cost')
+
+ def __init__(self,
+ id_: Optional[str],
+ timestamp: DatetimeStr,
+ cost: QuotaCost
+ ) -> None:
+ self.id_ = id_ if id_ else str(uuid4())
+ self.timestamp = timestamp
+ self.cost = cost
+
+ @classmethod
+ def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
+ """Adds cost mapped to current datetime."""
+ cls._remove_old(conn)
+ new = cls(None,
+ DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
+ QuotaCost(cost))
+ new.save(conn)
+
+ @classmethod
+ def current(cls, conn: DatabaseConnection) -> QuotaCost:
+ """Returns quota cost total for last 24 hours, purges old data."""
+ cls._remove_old(conn)
+ quota_costs = cls.get_all(conn)
+ return QuotaCost(sum(c.cost for c in quota_costs))
+
+ @classmethod
+ def _remove_old(cls, conn: DatabaseConnection) -> None:
+ cutoff = datetime.now() - timedelta(days=1)
+ sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
+ conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
+
+
+class Player:
+ """MPV representation with some additional features."""
+
+ def __init__(self) -> None:
+ self.last_update = PlayerUpdateId('')
+ self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
+ if isfile(e.path)
+ and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+ shuffle(self._filenames)
+ self._idx: int = 0
+ self._mpv: Optional[MPV] = None
+
+ @property
+ def _mpv_available(self) -> bool:
+ return bool(self._mpv and not self._mpv.core_shutdown)
+
+ @staticmethod
+ def _if_mpv_available(f) -> Callable:
+ def wrapper(self):
+ return f(self) if self._mpv else None
+ return wrapper
+
+ def _signal_update(self) -> None:
+ self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+
+ def _start_mpv(self) -> None:
+ self._mpv = MPV(input_default_bindings=True,
+ input_vo_keyboard=True,
+ config=True)
+ self._mpv.observe_property('pause', lambda a, b: self._signal_update())
+
+ @self._mpv.event_callback('start-file')
+ def on_start_file(_) -> None:
+ assert self._mpv is not None
+ self._mpv.pause = False
+ self._idx = self._mpv.playlist_pos
+ self._signal_update()
+
+ @self._mpv.event_callback('shutdown')
+ def on_shutdown(_) -> None:
+ self._mpv = None
+ self._signal_update()
+
+ for path in self._filenames:
+ self._mpv.playlist_append(path)
+ self._mpv.playlist_play_index(self._idx)
+
+ @property
+ def current_filename(self) -> Optional[PathStr]:
+ """Return what we assume is the name of the currently playing file."""
+ if not self._filenames:
+ return None
+ return PathStr(basename(self._filenames[self._idx]))
+
+ @property
+ def prev_files(self) -> list[PathStr]:
+ """List 'past' files of playlist."""
+ return list(reversed(self._filenames[:self._idx]))
+
+ @property
+ def next_files(self) -> list[PathStr]:
+ """List 'coming' files of playlist."""
+ return self._filenames[self._idx + 1:]
+
+ @property
+ def is_running(self) -> bool:
+ """Return if player is running/available."""
+ return self._mpv_available
+
+ @property
+ def is_paused(self) -> bool:
+ """Return if player is paused."""
+ if self._mpv_available:
+ assert self._mpv is not None
+ return self._mpv.pause
+ return False
+
+ def toggle_run(self) -> None:
+ """Toggle player running."""
+ if self._mpv_available:
+ assert self._mpv is not None
+ self._mpv.terminate()
+ self._mpv = None
+ else:
+ self._start_mpv()
+ self._signal_update()
+
+ @_if_mpv_available
+ def toggle_pause(self) -> None:
+ """Toggle player pausing."""
+ assert self._mpv is not None
+ self._mpv.pause = not self._mpv.pause
+ self._signal_update()
+
+ @_if_mpv_available
+ def prev(self) -> None:
+ """Move player to previous item in playlist."""
+ assert self._mpv is not None
+ if self._mpv.playlist_pos > 0:
+ self._mpv.playlist_prev()
+ else:
+ self._mpv.playlist_play_index(0)
+
+ @_if_mpv_available
+ def next(self) -> None:
+ """Move player to next item in playlist."""
+ assert self._mpv is not None
+ max_idx: int = len(self._mpv.playlist_filenames) - 1
+ if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
+ self._mpv.playlist_next()
+ else:
+ self._mpv.playlist_play_index(max_idx)
+
+
+class DownloadsDb:
+ """Collections downloading-related stuff."""
+
+ def __init__(self) -> None:
+ self._to_download: list[YoutubeId] = []
+ _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
+ self._sync_db()
+
+ def _sync_db(self):
+ conn = DatabaseConnection()
+ files_via_db = VideoFile.get_all(conn)
+ old_cwd = getcwd()
+ chdir(PATH_DOWNLOADS)
+ for file in files_via_db:
+ if not isfile(path_join(file.rel_path)):
+ print(f'SYNC: no file {file.rel_path} found, removing entry.')
+ file.remove(conn)
+ paths = [file.rel_path for file in files_via_db]
+ for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
+ if path not in paths:
+ yt_id = self._id_from_filename(path)
+ file = VideoFile(path, yt_id)
+ print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+ file.save(conn)
+ chdir(old_cwd)
+ self._files = VideoFile.get_all(conn)
+ conn.commit_close()
+
+ @staticmethod
+ def _id_from_filename(path: PathStr,
+ double_split: bool = False
+ ) -> YoutubeId:
+ before_ext = splitext(path)[0]
+ if double_split:
+ before_ext = splitext(before_ext)[0]
+ return YoutubeId(before_ext.split('[')[-1].split(']')[0])
+
+ @property
+ def ids_to_paths(self) -> DownloadsIndex:
+ """Return mapping YoutubeIds:paths of files downloaded to them."""
+ self._sync_db()
+ return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path))
+ for f in self._files}
+
+ @property
+ def ids_unfinished(self) -> set[YoutubeId]:
+ """Return set of IDs of videos awaiting or currently in download."""
+ in_temp_dir = []
+ for path in [PathStr(e.path) for e
+ in scandir(PATH_TEMP) if isfile(e.path)]:
+ in_temp_dir += [self._id_from_filename(path)]
+ return set(self._to_download + in_temp_dir)
+
+ def clean_unfinished(self) -> None:
+ """Empty temp directory of unfinished downloads."""
+ for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
+ print(f'removing unfinished download: {e.path}')
+ os_remove(e.path)
+
+ def queue_download(self, video_id: YoutubeId) -> None:
+ """Add video_id to download queue *if* not already processed."""
+ pre_existing = self.ids_unfinished | set(self._to_download
+ + list(self.ids_to_paths))
+ if video_id not in pre_existing:
+ self._to_download += [video_id]
+
+ def _download_next(self) -> None:
+ if self._to_download:
+ video_id = self._to_download.pop(0)
+ with YoutubeDL(YT_DL_PARAMS) as ydl:
+ ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
+
+ def download_loop(self) -> None:
+ """Keep iterating through download queue for new download tasks."""
+ while True:
+ sleep(0.5)
+ self._download_next()
+
+
+class Server(HTTPServer):
+ """Extension of HTTPServer providing for Player and DownloadsDb."""
+
+ def __init__(self,
+ player: Player,
+ downloads_db: DownloadsDb,
+ *args, **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.player = player
+ self.downloads = downloads_db
+
+
+class TaskHandler(BaseHTTPRequestHandler):
+ """Handler for GET and POST requests to our server."""
+ server: Server
+
+ def _send_http(self,
+ content: bytes = b'',
+ headers: Optional[list[tuple[str, str]]] = None,
+ code: int = 200
+ ) -> None:
+ headers = headers if headers else []
+ self.send_response(code)
+ for header_tuple in headers:
+ self.send_header(header_tuple[0], header_tuple[1])
+ self.end_headers()
+ if content:
+ self.wfile.write(content)
+
+ def do_POST(self) -> None: # pylint:disable=invalid-name
+ """Map POST requests to handlers for various paths."""
+ url = urlparse(self.path)
+ toks_url: list[str] = url.path.split('/')
+ page_name = toks_url[1]
+ body_length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(body_length).decode())
+ if 'playlist' == page_name:
+ self._post_player_command(list(postvars.keys())[0])
+ elif 'queries' == page_name:
+ self._post_query(QueryText(postvars['query'][0]))
+
+ def _post_player_command(self, commands: str) -> None:
+ if 'pause' in commands:
+ self.server.player.toggle_pause()
+ elif 'prev' in commands:
+ self.server.player.prev()
+ elif 'next' in commands:
+ self.server.player.next()
+ elif 'stop' in commands:
+ self.server.player.toggle_run()
+ sleep(0.5) # avoid reload happening before current_file update
+ self._send_http(headers=[('Location', '/')], code=302)
+
+ def _post_query(self, query_txt: QueryText) -> None:
+ conn = DatabaseConnection()
+
+ def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
+ youtube = googleapiclient.discovery.build('youtube', 'v3',
+ developerKey=API_KEY)
+ QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
+ search_request = youtube.search().list(
+ q=query_txt,
+ part='snippet',
+ maxResults=25,
+ safeSearch='none',
+ type='video')
+ results: list[YoutubeVideo] = []
+ ids_to_detail: list[YoutubeId] = []
+ for item in search_request.execute()['items']:
+ video_id: YoutubeId = item['id']['videoId']
+ ids_to_detail += [video_id]
+ snippet = item['snippet']
+ urlretrieve(snippet['thumbnails']['default']['url'],
+ path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+ results += [YoutubeVideo(id_=video_id,
+ title=snippet['title'],
+ description=snippet['description'],
+ published_at=snippet['publishedAt'])]
+ QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
+ ids_for_details = ','.join([r.id_ for r in results])
+ videos_request = youtube.videos().list(id=ids_for_details,
+ part='content_details')
+ for i, detailed in enumerate(videos_request.execute()['items']):
+ result = results[i]
+ assert result.id_ == detailed['id']
+ content_details: dict[str, str] = detailed['contentDetails']
+ result.set_duration_from_yt_string(content_details['duration'])
+ result.definition = content_details['definition'].upper()
+ return results
+
+ query_data = YoutubeQuery(
+ None, query_txt,
+ DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
+ query_data.save(conn)
+ for result in collect_results(query_txt):
+ result.save(conn)
+ result.save_to_query(conn, query_data.id_)
+ conn.commit_close()
+ self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
+ code=302)
+
+ def do_GET(self) -> None: # pylint:disable=invalid-name
+ """Map GET requests to handlers for various paths."""
+ url = urlparse(self.path)
+ toks_url: list[str] = url.path.split('/')
+ page_name = toks_url[1]
+ try:
+ if 'thumbnails' == page_name:
+ self._send_thumbnail(PathStr(toks_url[2]))
+ elif 'dl' == page_name:
+ self._send_or_download_video(YoutubeId(toks_url[2]))
+ elif 'videos' == page_name:
+ self._send_videos_index()
+ elif 'video_about' == page_name:
+ self._send_video_about(YoutubeId(toks_url[2]))
+ elif 'query' == page_name:
+ self._send_query_page(QueryId(toks_url[2]))
+ elif 'queries' == page_name:
+ self._send_queries_index_and_search()
+ elif '_last_playlist_update.json' == page_name:
+ self._send_last_playlist_update()
+ else: # e.g. for /
+ self._send_playlist()
+ except NotFoundException:
+ self._send_http(b'not found', code=404)
+
+ def _send_rendered_template(self,
+ tmpl_name: PathStr,
+ tmpl_ctx: TemplateContext
+ ) -> None:
+ with open(path_join(PATH_TEMPLATES, tmpl_name),
+ 'r', encoding='utf8'
+ ) as templ_file:
+ tmpl = Template(str(templ_file.read()))
+ html = tmpl.render(**tmpl_ctx)
+ self._send_http(bytes(html, 'utf8'))
+
+ def _send_thumbnail(self, filename: PathStr) -> None:
+ _ensure_expected_dirs([PATH_THUMBNAILS])
+ path_thumbnail = path_join(PATH_THUMBNAILS, filename)
+ if not path_exists(path_thumbnail):
+ video_id = splitext(filename)[0]
+ url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
+ try:
+ urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+ except HTTPError as e:
+ if 404 == e.code:
+ raise NotFoundException from e
+ raise e
+ with open(path_thumbnail, 'rb') as f:
+ img = f.read()
+ self._send_http(img, [('Content-type', 'image/jpg')])
+
+ def _send_or_download_video(self, video_id: YoutubeId) -> None:
+ if video_id in self.server.downloads.ids_to_paths:
+ with open(self.server.downloads.ids_to_paths[video_id],
+ 'rb') as video_file:
+ video = video_file.read()
+ self._send_http(content=video)
+ return
+ self.server.downloads.queue_download(video_id)
+ self._send_http(headers=[('Location', f'/video_about/{video_id}')],
+ code=302)
+
+ def _send_query_page(self, query_id: QueryId) -> None:
+ conn = DatabaseConnection()
+ query = YoutubeQuery.get_one(conn, str(query_id))
+ results = YoutubeVideo.get_all_for_query(conn, query_id)
+ conn.commit_close()
+ self._send_rendered_template(
+ NAME_TEMPLATE_RESULTS,
+ {'query': query.text, 'videos': results})
+
+ def _send_queries_index_and_search(self) -> None:
+ conn = DatabaseConnection()
+ quota_count = QuotaLog.current(conn)
+ queries_data = YoutubeQuery.get_all(conn)
+ conn.commit_close()
+ queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
+ self._send_rendered_template(
+ NAME_TEMPLATE_QUERIES, {'queries': queries_data,
+ 'quota_count': quota_count})
+
+ def _send_video_about(self, video_id: YoutubeId) -> None:
+ conn = DatabaseConnection()
+ linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
+ try:
+ video_data = YoutubeVideo.get_one(conn, video_id)
+ except NotFoundException:
+ video_data = YoutubeVideo(video_id)
+ conn.commit_close()
+ self._send_rendered_template(
+ NAME_TEMPLATE_VIDEO_ABOUT,
+ {'video_data': video_data,
+ 'is_temp': video_id in self.server.downloads.ids_unfinished,
+ 'file_path': self.server.downloads.ids_to_paths.get(video_id,
+ None),
+ 'youtube_prefix': YOUTUBE_URL_PREFIX,
+ 'queries': linked_queries})
+
+ def _send_videos_index(self) -> None:
+ videos = [(id_, PathStr(basename(path)))
+ for id_, path in self.server.downloads.ids_to_paths.items()]
+ videos.sort(key=lambda t: t[1])
+ self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
+
+ def _send_last_playlist_update(self) -> None:
+ payload: dict[str, PlayerUpdateId] = {
+ 'last_update': self.server.player.last_update}
+ self._send_http(bytes(json_dumps(payload), 'utf8'),
+ headers=[('Content-type', 'application/json')])
+
+ def _send_playlist(self) -> None:
+ tuples: list[tuple[PathStr, PathStr]] = []
+ i: int = 0
+ while True:
+ prev, next_ = PathStr(''), PathStr('')
+ if len(self.server.player.prev_files) > i:
+ prev = PathStr(basename(self.server.player.prev_files[i]))
+ if len(self.server.player.next_files) > i:
+ next_ = PathStr(basename(self.server.player.next_files[i]))
+ if not prev + next_:
+ break
+ tuples += [(prev, next_)]
+ i += 1
+ self._send_rendered_template(
+ NAME_TEMPLATE_PLAYLIST,
+ {'last_update': self.server.player.last_update,
+ 'running': self.server.player.is_running,
+ 'paused': self.server.player.is_paused,
+ 'current_title': self.server.player.current_filename,
+ 'tuples': tuples})