--- /dev/null
+"""Collect directly HTTP-related elements."""
+from datetime import datetime
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from json import dumps as json_dumps
+from pathlib import Path
+from time import sleep
+from typing import NewType, Optional, TypeAlias
+from urllib.parse import urlparse, parse_qs
+from urllib.request import urlretrieve
+from urllib.error import HTTPError
+from jinja2 import ( # type: ignore
+ Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
+import googleapiclient.discovery # type: ignore
+from ytplom.misc import (
+ B64Str, DatetimeStr, FilesWithIndex, FlagsInt, FlagName,
+ NotFoundException, PlayerUpdateId, QueryId, QueryText, QuotaCost,
+ UrlStr, YoutubeId,
+ FILE_FLAGS, PATH_APP_DATA, PATH_CACHE, TIMESTAMP_FMT,
+ YOUTUBE_URL_PREFIX,
+ ensure_expected_dirs,
+ Config, DbConnection, DownloadsManager, Player, QuotaLog, VideoFile,
+ YoutubeQuery, YoutubeVideo
+)
+
+PageNames: TypeAlias = dict[str, Path]
+ParamsStr = NewType('ParamsStr', str)
+TemplateContext: TypeAlias = dict[
+ str,
+ None | bool
+ | FilesWithIndex | PageNames | ParamsStr | Path | PlayerUpdateId
+ | QueryText | QuotaCost | UrlStr | 'VideoFile' | YoutubeId
+ | 'YoutubeVideo' | list[FlagName] | list['VideoFile']
+ | list['YoutubeVideo'] | list['YoutubeQuery']
+]
+
+# API expectations
+PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails')
+THUMBNAIL_URL_PREFIX = UrlStr('https://i.ytimg.com/vi/')
+THUMBNAIL_URL_SUFFIX = UrlStr('/default.jpg')
+QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
+QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
+
+# template paths
+PATH_TEMPLATES = PATH_APP_DATA.joinpath('templates')
+NAME_TEMPLATE_QUERIES = Path('yt_queries.tmpl')
+NAME_TEMPLATE_RESULTS = Path('yt_results.tmpl')
+NAME_TEMPLATE_FILES = Path('files.tmpl')
+NAME_TEMPLATE_FILE_DATA = Path('file_data.tmpl')
+NAME_TEMPLATE_YT_VIDEO = Path('yt_result.tmpl')
+NAME_TEMPLATE_PLAYLIST = Path('playlist.tmpl')
+
+# page names
+PAGE_NAMES: PageNames = {
+ 'download': Path('dl'),
+ 'file': Path('file'),
+ 'files': Path('files'),
+ 'last_update': Path('last_playlist_update.json'),
+ 'missing': Path('missing.json'),
+ 'playlist': Path('playlist'),
+ 'thumbnails': Path('thumbnails'),
+ 'yt_result': Path('yt_result'),
+ 'yt_query': Path('yt_query'),
+ 'yt_queries': Path('yt_queries')
+}
+
+
+class Server(HTTPServer):
+ """Extension of HTTPServer providing for Player and DownloadsManager."""
+
+ def __init__(self, config: Config, *args, **kwargs) -> None:
+ super().__init__((config.host, config.port), _TaskHandler,
+ *args, **kwargs)
+ self.config = config
+ self.jinja = JinjaEnv(loader=JinjaFSLoader(PATH_TEMPLATES))
+ self.player = Player()
+ self.downloads = DownloadsManager()
+ self.downloads.clean_unfinished()
+ self.downloads.start_thread()
+
+
+class _TaskHandler(BaseHTTPRequestHandler):
+ """Handler for GET and POST requests to our server."""
+ server: Server
+
+ def _send_http(self,
+ content: bytes = b'',
+ headers: Optional[list[tuple[str, str]]] = None,
+ code: int = 200
+ ) -> None:
+ headers = headers if headers else []
+ self.send_response(code)
+ for header_tuple in headers:
+ self.send_header(header_tuple[0], header_tuple[1])
+ self.end_headers()
+ if content:
+ self.wfile.write(content)
+
+ def _redirect(self, target: Path) -> None:
+ self._send_http(headers=[('Location', str(target))], code=302)
+
+ def do_POST(self) -> None: # pylint:disable=invalid-name
+ """Map POST requests to handlers for various paths."""
+ url = urlparse(self.path)
+ toks_url = Path(url.path).parts
+ page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
+ body_length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(body_length).decode())
+ if PAGE_NAMES['playlist'] == page_name:
+ self._receive_player_command(list(postvars.keys())[0])
+ if PAGE_NAMES['files'] == page_name:
+ self._receive_files_command(list(postvars.keys())[0])
+ elif PAGE_NAMES['file'] == page_name:
+ self._receive_video_flag(B64Str(toks_url[2]),
+ [FlagName(k) for k in postvars])
+ elif PAGE_NAMES['yt_queries'] == page_name:
+ self._receive_yt_query(QueryText(postvars['query'][0]))
+
+ def _receive_player_command(self, command: str) -> None:
+ if 'pause' == command:
+ self.server.player.toggle_pause()
+ elif 'prev' == command:
+ self.server.player.prev()
+ elif 'next' == command:
+ self.server.player.next()
+ elif 'stop' == command:
+ self.server.player.toggle_run()
+ elif 'reload' == command:
+ self.server.player.reload()
+ elif command.startswith('jump_'):
+ self.server.player.jump_to(int(command.split('_')[1]))
+ elif command.startswith('up'):
+ self.server.player.move_entry(int(command.split('_')[1]))
+ elif command.startswith('down_'):
+ self.server.player.move_entry(int(command.split('_')[1]), False)
+ sleep(0.5) # avoid redir happening before current_file update
+ self._redirect(Path('/'))
+
+ def _receive_files_command(self, command: str) -> None:
+ if command.startswith('play_'):
+ conn = DbConnection()
+ file = VideoFile.get_by_b64(conn, B64Str(command.split('_', 1)[1]))
+ conn.commit_close()
+ self.server.player.inject_and_play(file)
+ self._redirect(Path('/'))
+
+ def _receive_video_flag(self,
+ rel_path_b64: B64Str,
+ flag_names: list[FlagName]
+ ) -> None:
+ conn = DbConnection()
+ file = VideoFile.get_by_b64(conn, rel_path_b64)
+ flags = FlagsInt(0)
+ for flag_name in flag_names:
+ flags = FlagsInt(file.flags | FILE_FLAGS[flag_name])
+ file.flags = flags
+ file.save(conn)
+ conn.commit_close()
+ file.ensure_absence_if_deleted()
+ self._redirect(Path('/')
+ .joinpath(PAGE_NAMES['file'])
+ .joinpath(rel_path_b64))
+
+ def _receive_yt_query(self, query_txt: QueryText) -> None:
+ conn = DbConnection()
+
+ def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
+ ensure_expected_dirs([PATH_THUMBNAILS])
+ youtube = googleapiclient.discovery.build(
+ 'youtube', 'v3', developerKey=self.server.config.api_key)
+ QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
+ search_request = youtube.search().list(
+ q=query_txt,
+ part='snippet',
+ maxResults=25,
+ safeSearch='none',
+ type='video')
+ results: list[YoutubeVideo] = []
+ ids_to_detail: list[YoutubeId] = []
+ for item in search_request.execute()['items']:
+ video_id: YoutubeId = item['id']['videoId']
+ ids_to_detail += [video_id]
+ snippet = item['snippet']
+ urlretrieve(snippet['thumbnails']['default']['url'],
+ PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
+ results += [YoutubeVideo(id_=video_id,
+ title=snippet['title'],
+ description=snippet['description'],
+ published_at=snippet['publishedAt'])]
+ QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
+ ids_for_details = ','.join([r.id_ for r in results])
+ videos_request = youtube.videos().list(id=ids_for_details,
+ part='content_details')
+ unfinished_streams: list[YoutubeId] = []
+ for i, detailed in enumerate(videos_request.execute()['items']):
+ result = results[i]
+ assert result.id_ == detailed['id']
+ content_details: dict[str, str] = detailed['contentDetails']
+ if 'P0D' == content_details['duration']:
+ unfinished_streams += [result.id_]
+ continue
+ result.set_duration_from_yt_string(content_details['duration'])
+ result.definition = content_details['definition'].upper()
+ return [r for r in results if r.id_ not in unfinished_streams]
+
+ query_data = YoutubeQuery(
+ None, query_txt,
+ DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
+ query_data.save(conn)
+ for result in collect_results(query_txt):
+ result.save(conn)
+ result.save_to_query(conn, query_data.id_)
+ conn.commit_close()
+ self._redirect(Path('/')
+ .joinpath(PAGE_NAMES['yt_query'])
+ .joinpath(query_data.id_))
+
+ def do_GET(self) -> None: # pylint:disable=invalid-name
+ """Map GET requests to handlers for various paths."""
+ url = urlparse(self.path)
+ toks_url = Path(url.path).parts
+ page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
+ try:
+ if PAGE_NAMES['thumbnails'] == page_name:
+ self._send_thumbnail(Path(toks_url[2]))
+ elif PAGE_NAMES['download'] == page_name:
+ self._send_or_download_video(YoutubeId(toks_url[2]))
+ elif PAGE_NAMES['files'] == page_name:
+ params = parse_qs(url.query)
+ filter_ = ParamsStr(params.get('filter', [''])[0])
+ show_absent = params.get('show_absent', [False])[0]
+ self._send_files_index(filter_, bool(show_absent))
+ elif PAGE_NAMES['file'] == page_name:
+ self._send_file_data(B64Str(toks_url[2]))
+ elif PAGE_NAMES['yt_result'] == page_name:
+ self._send_yt_result(YoutubeId(toks_url[2]))
+ elif PAGE_NAMES['missing'] == page_name:
+ self._send_missing_json()
+ elif PAGE_NAMES['yt_query'] == page_name:
+ self._send_yt_query_page(QueryId(toks_url[2]))
+ elif PAGE_NAMES['yt_queries'] == page_name:
+ self._send_yt_queries_index_and_search()
+ elif PAGE_NAMES['last_update'] == page_name:
+ self._send_last_playlist_update()
+ else: # e.g. for /
+ self._send_playlist()
+ except NotFoundException as e:
+ self._send_http(bytes(str(e), 'utf8'), code=404)
+
+ def _send_rendered_template(self,
+ tmpl_name: Path,
+ tmpl_ctx: TemplateContext
+ ) -> None:
+ tmpl = self.server.jinja.get_template(str(tmpl_name))
+ tmpl_ctx['page_names'] = PAGE_NAMES
+ html = tmpl.render(**tmpl_ctx)
+ self._send_http(bytes(html, 'utf8'))
+
+ def _send_thumbnail(self, filename: Path) -> None:
+ ensure_expected_dirs([PATH_THUMBNAILS])
+ path_thumbnail = PATH_THUMBNAILS.joinpath(filename)
+ if not path_thumbnail.exists():
+ video_id = filename.stem
+ url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
+ try:
+ urlretrieve(url, PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
+ except HTTPError as e:
+ if 404 == e.code:
+ raise NotFoundException from e
+ raise e
+ with path_thumbnail.open('rb') as f:
+ img = f.read()
+ self._send_http(img, [('Content-type', 'image/jpg')])
+
+ def _send_or_download_video(self, video_id: YoutubeId) -> None:
+ conn = DbConnection()
+ try:
+ file_data = VideoFile.get_by_yt_id(conn, video_id)
+ except NotFoundException:
+ conn.commit_close()
+ self.server.downloads.queue_download(video_id)
+ self._redirect(Path('/')
+ .joinpath(PAGE_NAMES['yt_result'])
+ .joinpath(video_id))
+ return
+ conn.commit_close()
+ with file_data.full_path.open('rb') as video_file:
+ video = video_file.read()
+ self._send_http(content=video)
+
+ def _send_yt_query_page(self, query_id: QueryId) -> None:
+ conn = DbConnection()
+ query = YoutubeQuery.get_one(conn, str(query_id))
+ results = YoutubeVideo.get_all_for_query(conn, query_id)
+ conn.commit_close()
+ self._send_rendered_template(
+ NAME_TEMPLATE_RESULTS,
+ {'query': query.text, 'videos': results})
+
+ def _send_yt_queries_index_and_search(self) -> None:
+ conn = DbConnection()
+ quota_count = QuotaLog.current(conn)
+ queries_data = YoutubeQuery.get_all(conn)
+ conn.commit_close()
+ queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
+ self._send_rendered_template(
+ NAME_TEMPLATE_QUERIES, {'queries': queries_data,
+ 'quota_count': quota_count})
+
+ def _send_yt_result(self, video_id: YoutubeId) -> None:
+ conn = DbConnection()
+ linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
+ try:
+ video_data = YoutubeVideo.get_one(conn, video_id)
+ except NotFoundException:
+ video_data = YoutubeVideo(video_id)
+ try:
+ file_path = VideoFile.get_by_yt_id(conn, video_id).full_path
+ except NotFoundException:
+ file_path = None
+ conn.commit_close()
+ self._send_rendered_template(
+ NAME_TEMPLATE_YT_VIDEO,
+ {'video_data': video_data,
+ 'is_temp': video_id in self.server.downloads.ids_unfinished,
+ 'file_path': file_path,
+ 'youtube_prefix': YOUTUBE_URL_PREFIX,
+ 'queries': linked_queries})
+
+ def _send_file_data(self, rel_path_b64: B64Str) -> None:
+ conn = DbConnection()
+ file = VideoFile.get_by_b64(conn, rel_path_b64)
+ conn.commit_close()
+ self._send_rendered_template(
+ NAME_TEMPLATE_FILE_DATA,
+ {'file': file, 'flag_names': list(FILE_FLAGS)})
+
+ def _send_files_index(self, filter_: ParamsStr, show_absent: bool) -> None:
+ conn = DbConnection()
+ files = [f for f in VideoFile.get_all(conn)
+ if filter_.lower() in str(f.rel_path).lower()
+ and (show_absent or f.present)]
+ conn.commit_close()
+ files.sort(key=lambda t: t.rel_path)
+ self._send_rendered_template(
+ NAME_TEMPLATE_FILES,
+ {'files': files, 'filter': filter_,
+ 'show_absent': show_absent})
+
+ def _send_missing_json(self) -> None:
+ conn = DbConnection()
+ missing = [f.rel_path for f in VideoFile.get_all(conn) if f.missing]
+ conn.commit_close()
+ self._send_http(bytes(json_dumps(missing), 'utf8'),
+ headers=[('Content-type', 'application/json')])
+
+ def _send_last_playlist_update(self) -> None:
+ payload: dict[str, PlayerUpdateId] = {
+ 'last_update': self.server.player.last_update}
+ self._send_http(bytes(json_dumps(payload), 'utf8'),
+ headers=[('Content-type', 'application/json')])
+
+ def _send_playlist(self) -> None:
+ if self.server.player.empty:
+ self.server.player.load_files()
+ self._send_rendered_template(
+ NAME_TEMPLATE_PLAYLIST,
+ {'last_update': self.server.player.last_update,
+ 'running': self.server.player.is_running,
+ 'paused': self.server.player.is_paused,
+ 'current_video': self.server.player.current_file,
+ 'prev_files_w_idx': self.server.player.prev_files_w_idx,
+ 'next_files_w_idx': self.server.player.next_files_w_idx})
from random import shuffle
from time import time, sleep
from datetime import datetime, timedelta
-from json import dumps as json_dumps, loads as json_loads
+from json import loads as json_loads
from uuid import uuid4
from pathlib import Path
from sqlite3 import connect as sql_connect, Cursor, Row
-from http.server import HTTPServer, BaseHTTPRequestHandler
-from urllib.parse import urlparse, parse_qs
-from urllib.request import urlretrieve
-from urllib.error import HTTPError
from threading import Thread
from queue import Queue
# non-included libs
-from jinja2 import ( # type: ignore
- Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
from mpv import MPV # type: ignore
from yt_dlp import YoutubeDL # type: ignore
-import googleapiclient.discovery # type: ignore
# default configuration
DEFAULTS = {
DatetimeStr = NewType('DatetimeStr', str)
QuotaCost = NewType('QuotaCost', int)
YoutubeId = NewType('YoutubeId', str)
-PathStr = NewType('PathStr', str)
QueryId = NewType('QueryId', str)
QueryText = NewType('QueryText', str)
ProseText = NewType('ProseText', str)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
B64Str = NewType('B64Str', str)
-ParamsStr = NewType('ParamsStr', str)
UrlStr = NewType('UrlStr', str)
-PageNames: TypeAlias = dict[str, Path]
-DownloadsIndex: TypeAlias = dict[YoutubeId, Path]
FilesWithIndex: TypeAlias = list[tuple[int, 'VideoFile']]
-TemplateContext: TypeAlias = dict[
- str,
- None | bool
- | FilesWithIndex | PageNames | ParamsStr | Path | PlayerUpdateId
- | QueryText | QuotaCost | UrlStr | 'VideoFile' | YoutubeId
- | 'YoutubeVideo' | list[FlagName] | list['VideoFile']
- | list['YoutubeVideo'] | list['YoutubeQuery']
-]
# major expected directories
PATH_APP_DATA = Path.home().joinpath('.local/share/ytplom')
PATH_DOWNLOADS = Path.home().joinpath('ytplom_downloads')
PATH_DB = PATH_APP_DATA.joinpath('db.sql')
PATH_TEMP = PATH_CACHE.joinpath('temp')
-PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails')
PATH_CONFFILE = Path.home().joinpath('.config/ytplom/config.json')
-# template paths
-PATH_TEMPLATES = PATH_APP_DATA.joinpath('templates')
-NAME_TEMPLATE_QUERIES = Path('yt_queries.tmpl')
-NAME_TEMPLATE_RESULTS = Path('yt_results.tmpl')
-NAME_TEMPLATE_FILES = Path('files.tmpl')
-NAME_TEMPLATE_FILE_DATA = Path('file_data.tmpl')
-NAME_TEMPLATE_YT_VIDEO = Path('yt_result.tmpl')
-NAME_TEMPLATE_PLAYLIST = Path('playlist.tmpl')
-
-# page names
-PAGE_NAMES = {
- 'download': Path('dl'),
- 'file': Path('file'),
- 'files': Path('files'),
- 'last_update': Path('last_playlist_update.json'),
- 'missing': Path('missing.json'),
- 'playlist': Path('playlist'),
- 'thumbnails': Path('thumbnails'),
- 'yt_result': Path('yt_result'),
- 'yt_query': Path('yt_query'),
- 'yt_queries': Path('yt_queries')
-}
-
# yt_dlp config
YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
'/best[height<=1080][width<=1920]'
# Youtube API expectations
YOUTUBE_URL_PREFIX = UrlStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = UrlStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = UrlStr('/default.jpg')
-QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
-QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
# database stuff
EXPECTED_DB_VERSION = 1
"""Raise in any other case where we know what's happening."""
-def _ensure_expected_dirs(expected_dirs: list[Path]) -> None:
+def ensure_expected_dirs(expected_dirs: list[Path]) -> None:
"""Ensure existance of expected_dirs _as_ directories."""
for dir_path in [p for p in expected_dirs if not p.is_dir()]:
if dir_path.exists():
return self._flags
@flags.setter
- def flags(self, flags: FlagsInt) -> None:
+ def flags(self, flags: list[FlagsInt]) -> None:
+ # self._flags = FlagsInt(0)
+ # for flag in flags:
+ # self._flags = self._flags | flag
self._renew_last_update()
- self._flags = flags
def is_flag_set(self, flag_name: FlagName) -> bool:
"""Return if flag of flag_name is set in flags field."""
def __init__(self) -> None:
self._to_download: list[YoutubeId] = []
- _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
+ ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
self._sync_db()
def _sync_db(self):
sleep(0.5)
self._download_next()
Thread(target=loop, daemon=False).start()
-
-
-class Server(HTTPServer):
- """Extension of HTTPServer providing for Player and DownloadsManager."""
-
- def __init__(self, config: Config, *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
- self.config = config
- self.jinja = JinjaEnv(loader=JinjaFSLoader(PATH_TEMPLATES))
- self.player = Player()
- self.downloads = DownloadsManager()
- self.downloads.clean_unfinished()
- self.downloads.start_thread()
-
-
-class TaskHandler(BaseHTTPRequestHandler):
- """Handler for GET and POST requests to our server."""
- server: Server
-
- def _send_http(self,
- content: bytes = b'',
- headers: Optional[list[tuple[str, str]]] = None,
- code: int = 200
- ) -> None:
- headers = headers if headers else []
- self.send_response(code)
- for header_tuple in headers:
- self.send_header(header_tuple[0], header_tuple[1])
- self.end_headers()
- if content:
- self.wfile.write(content)
-
- def _redirect(self, target: Path) -> None:
- self._send_http(headers=[('Location', str(target))], code=302)
-
- def do_POST(self) -> None: # pylint:disable=invalid-name
- """Map POST requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url = Path(url.path).parts
- page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
- body_length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(body_length).decode())
- if PAGE_NAMES['playlist'] == page_name:
- self._receive_player_command(list(postvars.keys())[0])
- if PAGE_NAMES['files'] == page_name:
- self._receive_files_command(list(postvars.keys())[0])
- elif PAGE_NAMES['file'] == page_name:
- self._receive_video_flag(B64Str(toks_url[2]),
- [FlagName(k) for k in postvars])
- elif PAGE_NAMES['yt_queries'] == page_name:
- self._receive_yt_query(QueryText(postvars['query'][0]))
-
- def _receive_player_command(self, command: str) -> None:
- if 'pause' == command:
- self.server.player.toggle_pause()
- elif 'prev' == command:
- self.server.player.prev()
- elif 'next' == command:
- self.server.player.next()
- elif 'stop' == command:
- self.server.player.toggle_run()
- elif 'reload' == command:
- self.server.player.reload()
- elif command.startswith('jump_'):
- self.server.player.jump_to(int(command.split('_')[1]))
- elif command.startswith('up'):
- self.server.player.move_entry(int(command.split('_')[1]))
- elif command.startswith('down_'):
- self.server.player.move_entry(int(command.split('_')[1]), False)
- sleep(0.5) # avoid redir happening before current_file update
- self._redirect(Path('/'))
-
- def _receive_files_command(self, command: str) -> None:
- if command.startswith('play_'):
- conn = DbConnection()
- file = VideoFile.get_by_b64(conn, B64Str(command.split('_', 1)[1]))
- conn.commit_close()
- self.server.player.inject_and_play(file)
- self._redirect(Path('/'))
-
- def _receive_video_flag(self,
- rel_path_b64: B64Str,
- flag_names: list[FlagName]
- ) -> None:
- conn = DbConnection()
- file = VideoFile.get_by_b64(conn, rel_path_b64)
- flags = FlagsInt(0)
- for flag_name in flag_names:
- flags = FlagsInt(file.flags | FILE_FLAGS[flag_name])
- file.flags = flags
- file.save(conn)
- conn.commit_close()
- file.ensure_absence_if_deleted()
- self._redirect(Path('/')
- .joinpath(PAGE_NAMES['file'])
- .joinpath(rel_path_b64))
-
- def _receive_yt_query(self, query_txt: QueryText) -> None:
- conn = DbConnection()
-
- def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
- _ensure_expected_dirs([PATH_THUMBNAILS])
- youtube = googleapiclient.discovery.build(
- 'youtube', 'v3', developerKey=self.server.config.api_key)
- QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
- search_request = youtube.search().list(
- q=query_txt,
- part='snippet',
- maxResults=25,
- safeSearch='none',
- type='video')
- results: list[YoutubeVideo] = []
- ids_to_detail: list[YoutubeId] = []
- for item in search_request.execute()['items']:
- video_id: YoutubeId = item['id']['videoId']
- ids_to_detail += [video_id]
- snippet = item['snippet']
- urlretrieve(snippet['thumbnails']['default']['url'],
- PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
- results += [YoutubeVideo(id_=video_id,
- title=snippet['title'],
- description=snippet['description'],
- published_at=snippet['publishedAt'])]
- QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
- ids_for_details = ','.join([r.id_ for r in results])
- videos_request = youtube.videos().list(id=ids_for_details,
- part='content_details')
- unfinished_streams: list[YoutubeId] = []
- for i, detailed in enumerate(videos_request.execute()['items']):
- result = results[i]
- assert result.id_ == detailed['id']
- content_details: dict[str, str] = detailed['contentDetails']
- if 'P0D' == content_details['duration']:
- unfinished_streams += [result.id_]
- continue
- result.set_duration_from_yt_string(content_details['duration'])
- result.definition = content_details['definition'].upper()
- return [r for r in results if r.id_ not in unfinished_streams]
-
- query_data = YoutubeQuery(
- None, query_txt,
- DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
- query_data.save(conn)
- for result in collect_results(query_txt):
- result.save(conn)
- result.save_to_query(conn, query_data.id_)
- conn.commit_close()
- self._redirect(Path('/')
- .joinpath(PAGE_NAMES['yt_query'])
- .joinpath(query_data.id_))
-
- def do_GET(self) -> None: # pylint:disable=invalid-name
- """Map GET requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url = Path(url.path).parts
- page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
- try:
- if PAGE_NAMES['thumbnails'] == page_name:
- self._send_thumbnail(Path(toks_url[2]))
- elif PAGE_NAMES['download'] == page_name:
- self._send_or_download_video(YoutubeId(toks_url[2]))
- elif PAGE_NAMES['files'] == page_name:
- params = parse_qs(url.query)
- filter_ = ParamsStr(params.get('filter', [''])[0])
- show_absent = params.get('show_absent', [False])[0]
- self._send_files_index(filter_, bool(show_absent))
- elif PAGE_NAMES['file'] == page_name:
- self._send_file_data(B64Str(toks_url[2]))
- elif PAGE_NAMES['yt_result'] == page_name:
- self._send_yt_result(YoutubeId(toks_url[2]))
- elif PAGE_NAMES['missing'] == page_name:
- self._send_missing_json()
- elif PAGE_NAMES['yt_query'] == page_name:
- self._send_yt_query_page(QueryId(toks_url[2]))
- elif PAGE_NAMES['yt_queries'] == page_name:
- self._send_yt_queries_index_and_search()
- elif PAGE_NAMES['last_update'] == page_name:
- self._send_last_playlist_update()
- else: # e.g. for /
- self._send_playlist()
- except NotFoundException as e:
- self._send_http(bytes(str(e), 'utf8'), code=404)
-
- def _send_rendered_template(self,
- tmpl_name: Path,
- tmpl_ctx: TemplateContext
- ) -> None:
- tmpl = self.server.jinja.get_template(str(tmpl_name))
- tmpl_ctx['page_names'] = PAGE_NAMES
- html = tmpl.render(**tmpl_ctx)
- self._send_http(bytes(html, 'utf8'))
-
- def _send_thumbnail(self, filename: Path) -> None:
- _ensure_expected_dirs([PATH_THUMBNAILS])
- path_thumbnail = PATH_THUMBNAILS.joinpath(filename)
- if not path_thumbnail.exists():
- video_id = filename.stem
- url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
- try:
- urlretrieve(url, PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
- except HTTPError as e:
- if 404 == e.code:
- raise NotFoundException from e
- raise e
- with path_thumbnail.open('rb') as f:
- img = f.read()
- self._send_http(img, [('Content-type', 'image/jpg')])
-
- def _send_or_download_video(self, video_id: YoutubeId) -> None:
- conn = DbConnection()
- try:
- file_data = VideoFile.get_by_yt_id(conn, video_id)
- except NotFoundException:
- conn.commit_close()
- self.server.downloads.queue_download(video_id)
- self._redirect(Path('/')
- .joinpath(PAGE_NAMES['yt_result'])
- .joinpath(video_id))
- return
- conn.commit_close()
- with file_data.full_path.open('rb') as video_file:
- video = video_file.read()
- self._send_http(content=video)
-
- def _send_yt_query_page(self, query_id: QueryId) -> None:
- conn = DbConnection()
- query = YoutubeQuery.get_one(conn, str(query_id))
- results = YoutubeVideo.get_all_for_query(conn, query_id)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_RESULTS,
- {'query': query.text, 'videos': results})
-
- def _send_yt_queries_index_and_search(self) -> None:
- conn = DbConnection()
- quota_count = QuotaLog.current(conn)
- queries_data = YoutubeQuery.get_all(conn)
- conn.commit_close()
- queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
- self._send_rendered_template(
- NAME_TEMPLATE_QUERIES, {'queries': queries_data,
- 'quota_count': quota_count})
-
- def _send_yt_result(self, video_id: YoutubeId) -> None:
- conn = DbConnection()
- linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
- try:
- video_data = YoutubeVideo.get_one(conn, video_id)
- except NotFoundException:
- video_data = YoutubeVideo(video_id)
- try:
- file_path = VideoFile.get_by_yt_id(conn, video_id).full_path
- except NotFoundException:
- file_path = None
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_YT_VIDEO,
- {'video_data': video_data,
- 'is_temp': video_id in self.server.downloads.ids_unfinished,
- 'file_path': file_path,
- 'youtube_prefix': YOUTUBE_URL_PREFIX,
- 'queries': linked_queries})
-
- def _send_file_data(self, rel_path_b64: B64Str) -> None:
- conn = DbConnection()
- file = VideoFile.get_by_b64(conn, rel_path_b64)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_FILE_DATA,
- {'file': file, 'flag_names': list(FILE_FLAGS)})
-
- def _send_files_index(self, filter_: ParamsStr, show_absent: bool) -> None:
- conn = DbConnection()
- files = [f for f in VideoFile.get_all(conn)
- if filter_.lower() in str(f.rel_path).lower()
- and (show_absent or f.present)]
- conn.commit_close()
- files.sort(key=lambda t: t.rel_path)
- self._send_rendered_template(
- NAME_TEMPLATE_FILES,
- {'files': files, 'filter': filter_,
- 'show_absent': show_absent})
-
- def _send_missing_json(self) -> None:
- conn = DbConnection()
- missing = [f.rel_path for f in VideoFile.get_all(conn) if f.missing]
- conn.commit_close()
- self._send_http(bytes(json_dumps(missing), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_last_playlist_update(self) -> None:
- payload: dict[str, PlayerUpdateId] = {
- 'last_update': self.server.player.last_update}
- self._send_http(bytes(json_dumps(payload), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_playlist(self) -> None:
- if self.server.player.empty:
- self.server.player.load_files()
- self._send_rendered_template(
- NAME_TEMPLATE_PLAYLIST,
- {'last_update': self.server.player.last_update,
- 'running': self.server.player.is_running,
- 'paused': self.server.player.is_paused,
- 'current_video': self.server.player.current_file,
- 'prev_files_w_idx': self.server.player.prev_files_w_idx,
- 'next_files_w_idx': self.server.player.next_files_w_idx})