#!/usr/bin/env python3
"""Minimalistic download-focused YouTube interface."""
+from typing import TypeAlias, Optional
from os import environ, makedirs, scandir, remove as os_remove
-from os.path import (isdir, exists as path_exists, join as path_join, splitext,
- basename)
+from os.path import (isdir, isfile, exists as path_exists, join as path_join,
+ splitext, basename)
from time import sleep
from json import load as json_load, dump as json_dump
from datetime import datetime, timedelta
from yt_dlp import YoutubeDL # type: ignore
import googleapiclient.discovery # type: ignore
-API_KEY = environ.get('GOOGLE_API_KEY')
+Query: TypeAlias = dict[str, str | int | list[dict[str, str]]]
+Result: TypeAlias = dict[str, str]
+QuotaLog: TypeAlias = dict[str, int]
+Headers: TypeAlias = list[tuple[str, str]]
+HttpPayload: TypeAlias = dict[str, list[str]]
+VideoId: TypeAlias = str
+PathStr: TypeAlias = str
+API_KEY = environ.get('GOOGLE_API_KEY')
HTTP_PORT = 8083
+
PATH_QUOTA_LOG = 'quota_log.json'
PATH_DIR_DOWNLOADS = 'downloads'
PATH_DIR_THUMBNAILS = 'thumbnails'
NAME_TEMPLATE_INDEX = 'index.tmpl'
NAME_TEMPLATE_RESULTS = 'results.tmpl'
-PATH_DIR_TEMP = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
+PATH_DIR_TEMP: PathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
PATH_DIR_REQUESTS_CACHE]
-PATH_TEMPLATE_INDEX = path_join(PATH_DIR_TEMPLATES, NAME_TEMPLATE_INDEX)
+PATH_TEMPLATE_INDEX: PathStr = path_join(PATH_DIR_TEMPLATES,
+ NAME_TEMPLATE_INDEX)
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v='
+YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
+ '/best[height<=1080][width<=1920]'
+YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS,
+ 'temp': NAME_DIR_TEMP},
+ 'format': YT_DOWNLOAD_FORMAT}
-QUOTA_COST_YOUTUBE_SEARCH = 100
-QUOTA_COST_YOUTUBE_DETAILS = 1
+QUOTA_COST_YOUTUBE_SEARCH: int = 100
+QUOTA_COST_YOUTUBE_DETAILS: int = 1
-to_download: list[str] = []
+to_download: list[VideoId] = []
-def ensure_expected_dirs_and_files():
+def ensure_expected_dirs_and_files() -> None:
"""Ensure existance of all dirs and files we need for proper operation."""
for dir_name in EXPECTED_DIRS:
if not path_exists(dir_name):
raise e
-def clean_unfinished_downloads():
+def clean_unfinished_downloads() -> None:
"""Empty temp directory of unfinished downloads."""
- for e in [e for e in scandir(PATH_DIR_TEMP) if e.is_file]:
+ for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
print(f'removing unfinished download: {e.path}')
os_remove(e.path)
-def run_server():
+def run_server() -> None:
"""Run HTTPServer on TaskHandler, handle KeyboardInterrupt as exit."""
server = HTTPServer(('localhost', HTTP_PORT), TaskHandler)
print(f'running at port {HTTP_PORT}')
server.server_close()
-def read_quota_log():
+def read_quota_log() -> QuotaLog:
"""Return logged quota expenditures of past 24 hours."""
with open(PATH_QUOTA_LOG, 'r', encoding='utf8') as f:
log = json_load(f)
return ret
-def update_quota_log(now, cost):
+def update_quota_log(now: str, cost: int) -> None:
"""Update quota log from read_quota_log, add cost to now's row."""
quota_log = read_quota_log()
quota_log[now] = quota_log.get(now, 0) + cost
json_dump(quota_log, f)
-def download_thread():
+def download_thread() -> None:
"""Keep iterating through to_download for IDs, download their videos."""
while True:
sleep(0.5)
try:
- video_id = to_download.pop(0)
+ video_id: VideoId = to_download.pop(0)
except IndexError:
continue
- url = f'{YOUTUBE_URL_PREFIX}{video_id}'
- fmt = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
- '/best[height<=1080][width<=1920]'
- params = {'paths': {'home': PATH_DIR_DOWNLOADS, 'temp': NAME_DIR_TEMP},
- 'format': fmt}
- with YoutubeDL(params) as ydl:
- ydl.download([url])
+ with YoutubeDL(YT_DL_PARAMS) as ydl:
+ ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
class TaskHandler(BaseHTTPRequestHandler):
"""Handler for GET and POST requests to our server."""
- def _send_http(self, content=None, headers=None, code=200):
+ def _send_http(self,
+ content: bytes = b'',
+ headers: Optional[Headers] = None,
+ code: int = 200
+ ) -> None:
headers = headers if headers else []
self.send_response(code)
for header_tuple in headers:
self.send_header(header_tuple[0], header_tuple[1])
self.end_headers()
- if content is not None:
+ if content:
self.wfile.write(content)
- def do_POST(self): # pylint:disable=invalid-name
+ def do_POST(self) -> None: # pylint:disable=invalid-name
"""Send requests to YouTube API and cache them."""
length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(length).decode())
- query = postvars['query'][0]
+ postvars: HttpPayload = parse_qs(self.rfile.read(length).decode())
+ query_txt: str = postvars['query'][0]
youtube = googleapiclient.discovery.build('youtube', 'v3',
developerKey=API_KEY)
now = datetime.now().strftime(TIMESTAMP_FMT)
+        # collect videos matching query, with first per-result details
update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
- request = youtube.search().list(part='snippet', maxResults=25, q=query,
- safeSearch='none', type='video')
- response = request.execute()
- to_save = {'text': query, 'retrieved_at': now, 'results': []}
- ids_for_details = []
+ search_request = youtube.search().list(
+ part='snippet', maxResults=25, q=query_txt, safeSearch='none',
+ type='video')
+ response = search_request.execute()
+ results: list[Result] = []
+ ids_for_details: list[VideoId] = []
for item in response['items']:
- video_id = item['id']['videoId']
+ video_id: VideoId = item['id']['videoId']
ids_for_details += [video_id]
- snippet = item['snippet']
- to_save['results'] += [{'id': video_id,
- 'title': snippet['title'],
- 'description': snippet['description'],
- 'published_at': snippet['publishedAt'],
- }]
- thumbnail_url = item['snippet']['thumbnails']['default']['url']
- store_at = path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')
- urlretrieve(thumbnail_url, store_at)
+ snippet: dict[str, str] = item['snippet']
+ result: Result = {'id': video_id,
+ 'title': snippet['title'],
+ 'description': snippet['description'],
+ 'published_at': snippet['publishedAt'],
+ }
+ results += [result]
+ urlretrieve(item['snippet']['thumbnails']['default']['url'],
+ path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
+ # collect more details for found videos
update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
- request = youtube.videos().list(id=','.join(ids_for_details),
- part='content_details')
- details = request.execute()
+ videos_request = youtube.videos().list(id=','.join(ids_for_details),
+ part='content_details')
+ details = videos_request.execute()
for i, detailed in enumerate(details['items']):
- item = to_save['results'][i]
- assert item['id'] == detailed['id']
- item['duration'] = detailed['contentDetails']['duration']
- item['definition'] = detailed['contentDetails']['definition']
+ results_item: Result = results[i]
+ assert results_item['id'] == detailed['id']
+ content_details: dict[str, str] = detailed['contentDetails']
+ results_item['duration'] = content_details['duration']
+ results_item['definition'] = content_details['definition']
- md5sum = md5(query.encode()).hexdigest()
- path = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json')
+ # store query, its datetime, and its results at hash of query
+ md5sum: str = md5(query_txt.encode()).hexdigest()
+ path: PathStr = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json')
with open(path, 'w', encoding='utf8') as f:
- json_dump(to_save, f)
+ json_dump({'text': query_txt,
+ 'retrieved_at': now,
+ 'results': results},
+ f)
self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
- def do_GET(self): # pylint:disable=invalid-name
+ def do_GET(self) -> None: # pylint:disable=invalid-name
"""Map GET requests to handlers for various paths."""
parsed_url = urlparse(self.path)
- toks_url = parsed_url.path.split('/')
- page = toks_url[1]
+ toks_url: list[str] = parsed_url.path.split('/')
+ page_name: str = toks_url[1]
- if 'thumbnails' == page:
- filename = toks_url[2]
+ # on /thumbnails requests, return directly with bytes of stored files
+ if 'thumbnails' == page_name:
+ filename: str = toks_url[2]
with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
- img = f.read()
+ img: bytes = f.read()
self._send_http(img, [('Content-type', 'image/jpg')])
return
- downloaded = {}
- for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if e.is_file]:
+ # otherwise populate downloaded
+ downloaded: dict[VideoId, PathStr] = {}
+ for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
+ before_ext: str
before_ext, _ = splitext(e.path)
- id_ = before_ext.split('[')[-1].split(']')[0]
+ id_: VideoId = before_ext.split('[')[-1].split(']')[0]
downloaded[id_] = e.path
- if 'dl' == page:
- video_id = toks_url[2]
+ # on /dl, directly send video file if ID found, else add to to_download
+ if 'dl' == page_name:
+ video_id: VideoId = toks_url[2]
if video_id in downloaded:
- with open(downloaded[video_id], 'rb') as f:
- video = f.read()
+ with open(downloaded[video_id], 'rb') as video_file:
+ video: bytes = video_file.read()
self._send_http(content=video)
return
to_download.append(video_id)
- params = parse_qs(parsed_url.query)
- query_id = params.get('from_query', [''])[0]
- redir_path = f'/query/{query_id}' if query_id else '/'
+ params: HttpPayload = parse_qs(parsed_url.query)
+ dl_query_id: str = params.get('from_query', [''])[0]
+ redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
self._send_http(headers=[('Location', redir_path)], code=302)
return
- kwargs = {'quota_count': 0}
+ # otherwise, start template context with always-to-show quota count
+ quota_count = 0
+ tmpl_ctx: dict[str, int | str | Query | list[Query]] = {}
+ tmpl_ctx['quota_count'] = quota_count
for amount in read_quota_log().values():
- kwargs['quota_count'] += amount
- if 'query' == page:
+ quota_count += amount
+ tmpl_name: str
+
+ # on /query, load cached query data, calc result attributes to show
+ if 'query' == page_name:
tmpl_name = NAME_TEMPLATE_RESULTS
- kwargs['youtube_prefix'] = YOUTUBE_URL_PREFIX
- query_id = toks_url[2]
- kwargs['query_id'] = query_id
- path = path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json')
- with open(path, 'r', encoding='utf8') as f:
- query = json_load(f)
+ tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
+ query_id: str = toks_url[2]
+ tmpl_ctx['query_id'] = query_id
+ with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
+ 'r', encoding='utf8') as query_file:
+ query: dict = json_load(query_file)
for result in query['results']:
result['available'] = result['id'] in downloaded
date_dur, time_dur = result['duration'].split('T')
('D', 60*60*24)):
if dur_char in date_dur:
dur_str, date_dur = date_dur.split(dur_char)
- seconds += int(dur_str) * len_seconds
+ seconds += int(dur_str) * int(len_seconds)
for dur_char, len_seconds in (('H', 60*60),
('M', 60),
('S', 1)):
[f'0{str_}' if len(str_) == 1 else str_
for str_ in (hours_str, minutes_str, seconds_str)])
result['definition'] = result['definition'].upper()
- kwargs['query'] = query
+ tmpl_ctx['query'] = query
+
+ # on / or anything else, prepare listing of all queries
else:
tmpl_name = NAME_TEMPLATE_INDEX
- queries = []
+ queries: list[Query] = []
for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
- if f.is_file]:
+ if isfile(f.path)]:
id_, _ = splitext(basename(file.path))
- with open(file.path, 'r', encoding='utf8') as f:
- query = json_load(f)
- query['id'] = id_
- for result in query['results']:
+ with open(file.path, 'r', encoding='utf8') as query_file:
+ filed_query: Query = json_load(query_file)
+ filed_query['id'] = id_
+ assert isinstance(filed_query['results'], list)
+ for result in filed_query['results']:
result['available'] = result['id'] in downloaded
- query['downloads'] = len([result for result in query['results']
- if result['available']])
- queries += [query]
+ filed_query['downloads'] = len(
+ [result for result in query['results']
+ if result['available']])
+ queries += [filed_query]
queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
- kwargs['queries'] = queries
- path = path_join(PATH_DIR_TEMPLATES, tmpl_name)
- with open(path, 'r', encoding='utf8') as f:
- tmpl = Template(f.read())
- html = tmpl.render(**kwargs)
+ tmpl_ctx['queries'] = queries
+
+ # render html from tmpl_name and tmpl_ctx
+ with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
+ 'r', encoding='utf8'
+ ) as templ_file:
+ tmpl = Template(str(templ_file.read()))
+ html: str = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
if __name__ == '__main__':
- to_download = []
ensure_expected_dirs_and_files()
clean_unfinished_downloads()
Thread(target=download_thread, daemon=False).start()