#!/usr/bin/env python3
"""Minimalistic download-focused YouTube interface."""
-from typing import TypeAlias, Optional, NewType
+from typing import TypeAlias, Optional, NewType, Callable
from os import environ, makedirs, scandir, remove as os_remove
from os.path import (isdir, isfile, exists as path_exists, join as path_join,
splitext, basename)
-from time import sleep
-from json import load as json_load, dump as json_dump
+from random import shuffle
+from time import time, sleep
+from json import load as json_load, dump as json_dump, dumps as json_dumps
from datetime import datetime, timedelta
from threading import Thread
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.request import urlretrieve
from hashlib import md5
from jinja2 import Template
+from mpv import MPV # type: ignore
from yt_dlp import YoutubeDL # type: ignore
import googleapiclient.discovery # type: ignore
QueryId = NewType('QueryId', str)
QueryText = NewType('QueryText', str)
AmountDownloads = NewType('AmountDownloads', int)
+PlayerUpdateId = NewType('PlayerUpdateId', str)
Result: TypeAlias = dict[str, str]
Header: TypeAlias = tuple[str, str]
VideoData: TypeAlias = dict[str, str | bool]
| AmountDownloads | list[Result]]
QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
DownloadsDB = dict[VideoId, PathStr]
-TemplateContext = dict[str, PathStr | VideoId | QuotaCost | QueryData
+TemplateContext = dict[str, None | bool | PlayerUpdateId | PathStr | VideoId
+ | QuotaCost | QueryData
| VideoData | list[QueryData] | list[VideoData]
| list[tuple[VideoId, PathStr]]
- | list[tuple[QueryId, QueryText]]]
+ | list[tuple[QueryId, QueryText]]
+ | list[tuple[PathStr, PathStr]]]
API_KEY = environ.get('GOOGLE_API_KEY')
HTTP_PORT = 8083
NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
+NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
PATH_DIR_TEMP = PathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP))
EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
+LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
+
to_download: list[VideoId] = []
+class Player:
+ """MPV representation with some additional features."""
+
+ def __init__(self) -> None:
+ self.last_update = PlayerUpdateId('')
+ self._filenames = [PathStr(e.path) for e in scandir(PATH_DIR_DOWNLOADS)
+ if isfile(e.path)
+ and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+ shuffle(self._filenames)
+ self._idx: int = 0
+ self._mpv: Optional[MPV] = None
+
+ @property
+ def _mpv_available(self) -> bool:
+ return bool(self._mpv and not self._mpv.core_shutdown)
+
+ @staticmethod
+ def _if_mpv_available(f) -> Callable:
+ def wrapper(self):
+ return f(self) if self._mpv else None
+ return wrapper
+
+ def _signal_update(self) -> None:
+ self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+
+ def _start_mpv(self) -> None:
+ self._mpv = MPV(input_default_bindings=True,
+ input_vo_keyboard=True,
+ config=True)
+ self._mpv.observe_property('pause', lambda a, b: self._signal_update())
+
+ @self._mpv.event_callback('start-file')
+ def on_start_file(_) -> None:
+ assert self._mpv is not None
+ self._mpv.pause = False
+ self._idx = self._mpv.playlist_pos
+ self._signal_update()
+
+ @self._mpv.event_callback('shutdown')
+ def on_shutdown(_) -> None:
+ self._mpv = None
+ self._signal_update()
+
+ for path in self._filenames:
+ self._mpv.playlist_append(path)
+ self._mpv.playlist_play_index(self._idx)
+
+ @property
+ def current_filename(self) -> Optional[PathStr]:
+ """Return what we assume is the name of the currently playing file."""
+ if not self._filenames:
+ return None
+ return PathStr(basename(self._filenames[self._idx]))
+
+ @property
+ def prev_files(self) -> list[PathStr]:
+ """List 'past' files of playlist."""
+ return list(reversed(self._filenames[:self._idx]))
+
+ @property
+ def next_files(self) -> list[PathStr]:
+ """List 'coming' files of playlist."""
+ return self._filenames[self._idx + 1:]
+
+ @property
+ def is_running(self) -> bool:
+ """Return if player is running/available."""
+ return self._mpv_available
+
+ @property
+ def is_paused(self) -> bool:
+ """Return if player is paused."""
+ if self._mpv_available:
+ assert self._mpv is not None
+ return self._mpv.pause
+ return False
+
+ def toggle_run(self) -> None:
+ """Toggle player running."""
+ if self._mpv_available:
+ assert self._mpv is not None
+ self._mpv.terminate()
+ self._mpv = None
+ else:
+ self._start_mpv()
+ self._signal_update()
+
+ @_if_mpv_available
+ def toggle_pause(self) -> None:
+ """Toggle player pausing."""
+ assert self._mpv is not None
+ self._mpv.pause = not self._mpv.pause
+ self._signal_update()
+
+ @_if_mpv_available
+ def prev(self) -> None:
+ """Move player to previous item in playlist."""
+ assert self._mpv is not None
+ if self._mpv.playlist_pos > 0:
+ self._mpv.playlist_prev()
+ else:
+ self._mpv.playlist_play_index(0)
+
+ @_if_mpv_available
+ def next(self) -> None:
+ """Move player to next item in playlist."""
+ assert self._mpv is not None
+ max_idx: int = len(self._mpv.playlist_filenames) - 1
+ if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
+ self._mpv.playlist_next()
+ else:
+ self._mpv.playlist_play_index(max_idx)
+
+
+class PlayerServer(HTTPServer):
+ """Extension of HTTPServer providing for .player inclusion."""
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self.player = Player()
+
+
def ensure_expected_dirs_and_files() -> None:
"""Ensure existance of all dirs and files we need for proper operation."""
for dir_name in EXPECTED_DIRS:
def run_server() -> None:
- """Run HTTPServer on TaskHandler, handle KeyboardInterrupt as exit."""
- server = HTTPServer(('localhost', HTTP_PORT), TaskHandler)
+ """Run PlayerServer on TaskHandler, handle KeyboardInterrupt as exit."""
+ server = PlayerServer(('localhost', HTTP_PORT), TaskHandler)
print(f'running at port {HTTP_PORT}')
try:
server.serve_forever()
log = json_load(f)
ret = {}
now = datetime.now()
- for time, amount in log.items():
- then = datetime.strptime(time, TIMESTAMP_FMT)
+ for timestamp, amount in log.items():
+ then = datetime.strptime(timestamp, TIMESTAMP_FMT)
if then >= now - timedelta(days=1):
- ret[time] = amount
+ ret[timestamp] = amount
return ret
class TaskHandler(BaseHTTPRequestHandler):
"""Handler for GET and POST requests to our server."""
+ server: PlayerServer
def _send_http(self,
content: bytes = b'',
self.wfile.write(content)
def do_POST(self) -> None: # pylint:disable=invalid-name
- """Send requests to YouTube API and cache them."""
+ """Map POST requests to handlers for various paths."""
+ url = urlparse(self.path)
+ toks_url: list[str] = url.path.split('/')
+ page_name = toks_url[1]
+ body_length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(body_length).decode())
+ if 'playlist' == page_name:
+ self._post_player_command(list(postvars.keys()))
+ elif 'queries' == page_name:
+ self._post_query(QueryText(postvars['query'][0]))
+
+ def _post_player_command(self, commands: list[str]) -> None:
+ # print("DEBUG commands", commands)
+ if 'pause' in commands:
+ self.server.player.toggle_pause()
+ elif 'prev' in commands:
+ self.server.player.prev()
+ elif 'next' in commands:
+ self.server.player.next()
+ elif 'stop' in commands:
+ self.server.player.toggle_run()
+ sleep(0.5) # avoid reload happening before current_file update
+ self._send_http(headers=[('Location', '/')], code=302)
+
+ def _post_query(self, query_txt: QueryText) -> None:
def collect_results(now: DatetimeStr,
query_txt: QueryText
results_item['definition'] = content_details['definition']
return results
- body_length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(body_length).decode())
- query_txt = QueryText(postvars['query'][0])
now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
results = collect_results(now, query_txt)
md5sum = md5(str(query_txt).encode()).hexdigest()
self._send_video_about(VideoId(toks_url[2]))
elif 'query' == page_name:
self._send_query_page(QueryId(toks_url[2]))
- else: # e.g. for /
+ elif 'queries' == page_name:
self._send_queries_index_and_search()
+ elif '_last_playlist_update.json' == page_name:
+ self._send_last_playlist_update()
+ else: # e.g. for /
+ self._send_playlist()
def _send_rendered_template(self,
tmpl_name: PathStr,
videos.sort(key=lambda t: t[1])
self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
+ def _send_last_playlist_update(self) -> None:
+ payload: dict[str, str] = {
+ 'last_update': self.server.player.last_update}
+ self._send_http(bytes(json_dumps(payload), 'utf8'),
+ headers=[('Content-type', 'application/json')])
+
+ def _send_playlist(self) -> None:
+ tuples: list[tuple[PathStr, PathStr]] = []
+ i: int = 0
+ while True:
+ prev, next_ = PathStr(''), PathStr('')
+ if len(self.server.player.prev_files) > i:
+ prev = PathStr(basename(self.server.player.prev_files[i]))
+ if len(self.server.player.next_files) > i:
+ next_ = PathStr(basename(self.server.player.next_files[i]))
+ if not prev + next_:
+ break
+ tuples += [(prev, next_)]
+ i += 1
+ self._send_rendered_template(
+ NAME_TEMPLATE_PLAYLIST,
+ {'last_update': self.server.player.last_update,
+ 'running': self.server.player.is_running,
+ 'paused': self.server.player.is_paused,
+ 'current_title': self.server.player.current_filename,
+ 'tuples': tuples})
+
if __name__ == '__main__':
ensure_expected_dirs_and_files()