From: Christian Heller Date: Sat, 9 Nov 2024 07:43:31 +0000 (+0100) Subject: Improve code. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/static/pick_tasks?a=commitdiff_plain;h=79b9f21922f5ee3bd41f3eb10d0c13772d0944e0;p=ytplom Improve code. --- diff --git a/ytplom.py b/ytplom.py index 4bb369d..bfe09ae 100755 --- a/ytplom.py +++ b/ytplom.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 """Minimalistic download-focused YouTube interface.""" +from typing import TypeAlias, Optional from os import environ, makedirs, scandir, remove as os_remove -from os.path import (isdir, exists as path_exists, join as path_join, splitext, - basename) +from os.path import (isdir, isfile, exists as path_exists, join as path_join, + splitext, basename) from time import sleep from json import load as json_load, dump as json_dump from datetime import datetime, timedelta @@ -15,9 +16,17 @@ from jinja2 import Template from yt_dlp import YoutubeDL # type: ignore import googleapiclient.discovery # type: ignore -API_KEY = environ.get('GOOGLE_API_KEY') +Query: TypeAlias = dict[str, str | int | list[dict[str, str]]] +Result: TypeAlias = dict[str, str] +QuotaLog: TypeAlias = dict[str, int] +Headers: TypeAlias = list[tuple[str, str]] +HttpPayload: TypeAlias = dict[str, list[str]] +VideoId: TypeAlias = str +PathStr: TypeAlias = str +API_KEY = environ.get('GOOGLE_API_KEY') HTTP_PORT = 8083 + PATH_QUOTA_LOG = 'quota_log.json' PATH_DIR_DOWNLOADS = 'downloads' PATH_DIR_THUMBNAILS = 'thumbnails' @@ -27,20 +36,26 @@ NAME_DIR_TEMP = 'temp' NAME_TEMPLATE_INDEX = 'index.tmpl' NAME_TEMPLATE_RESULTS = 'results.tmpl' -PATH_DIR_TEMP = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP) +PATH_DIR_TEMP: PathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP) EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS, PATH_DIR_REQUESTS_CACHE] -PATH_TEMPLATE_INDEX = path_join(PATH_DIR_TEMPLATES, NAME_TEMPLATE_INDEX) +PATH_TEMPLATE_INDEX: PathStr = path_join(PATH_DIR_TEMPLATES, + NAME_TEMPLATE_INDEX) TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v=' +YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ + '/best[height<=1080][width<=1920]' +YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS, + 'temp': NAME_DIR_TEMP}, + 'format': YT_DOWNLOAD_FORMAT} -QUOTA_COST_YOUTUBE_SEARCH = 100 -QUOTA_COST_YOUTUBE_DETAILS = 1 +QUOTA_COST_YOUTUBE_SEARCH: int = 100 +QUOTA_COST_YOUTUBE_DETAILS: int = 1 -to_download: list[str] = [] +to_download: list[VideoId] = [] -def ensure_expected_dirs_and_files(): +def ensure_expected_dirs_and_files() -> None: """Ensure existance of all dirs and files we need for proper operation.""" for dir_name in EXPECTED_DIRS: if not path_exists(dir_name): @@ -60,14 +75,14 @@ def ensure_expected_dirs_and_files(): raise e -def clean_unfinished_downloads(): +def clean_unfinished_downloads() -> None: """Empty temp directory of unfinished downloads.""" - for e in [e for e in scandir(PATH_DIR_TEMP) if e.is_file]: + for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]: print(f'removing unfinished download: {e.path}') os_remove(e.path) -def run_server(): +def run_server() -> None: """Run HTTPServer on TaskHandler, handle KeyboardInterrupt as exit.""" server = HTTPServer(('localhost', HTTP_PORT), TaskHandler) print(f'running at port {HTTP_PORT}') @@ -79,7 +94,7 @@ def run_server(): server.server_close() -def read_quota_log(): +def read_quota_log() -> QuotaLog: """Return logged quota expenditures of past 24 hours.""" with open(PATH_QUOTA_LOG, 'r', encoding='utf8') as f: log = json_load(f) @@ -92,7 +107,7 @@ def read_quota_log(): return ret -def update_quota_log(now, cost): +def update_quota_log(now: str, cost: int) -> None: """Update quota log from read_quota_log, add cost to now's row.""" quota_log = read_quota_log() quota_log[now] = quota_log.get(now, 0) + cost @@ -100,123 +115,140 @@ def update_quota_log(now, cost): json_dump(quota_log, f) -def download_thread(): +def download_thread() -> None: """Keep iterating through to_download for IDs, download their videos.""" while True: sleep(0.5) try: - video_id = to_download.pop(0) + video_id: VideoId = to_download.pop(0) except IndexError: continue - url = f'{YOUTUBE_URL_PREFIX}{video_id}' - fmt = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ - '/best[height<=1080][width<=1920]' - params = {'paths': {'home': PATH_DIR_DOWNLOADS, 'temp': NAME_DIR_TEMP}, - 'format': fmt} - with YoutubeDL(params) as ydl: - ydl.download([url]) + with YoutubeDL(YT_DL_PARAMS) as ydl: + ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}']) class TaskHandler(BaseHTTPRequestHandler): """Handler for GET and POST requests to our server.""" - def _send_http(self, content=None, headers=None, code=200): + def _send_http(self, + content: bytes = b'', + headers: Optional[Headers] = None, + code: int = 200 + ) -> None: headers = headers if headers else [] self.send_response(code) for header_tuple in headers: self.send_header(header_tuple[0], header_tuple[1]) self.end_headers() - if content is not None: + if content: self.wfile.write(content) - def do_POST(self): # pylint:disable=invalid-name + def do_POST(self) -> None: # pylint:disable=invalid-name """Send requests to YouTube API and cache them.""" length = int(self.headers['content-length']) - postvars = parse_qs(self.rfile.read(length).decode()) - query = postvars['query'][0] + postvars: HttpPayload = parse_qs(self.rfile.read(length).decode()) + query_txt: str = postvars['query'][0] youtube = googleapiclient.discovery.build('youtube', 'v3', developerKey=API_KEY) now = datetime.now().strftime(TIMESTAMP_FMT) + # collect videos matching query, first details per result update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH) - request = youtube.search().list(part='snippet', maxResults=25, q=query, - safeSearch='none', type='video') - response = request.execute() - to_save = {'text': query, 'retrieved_at': now, 'results': []} - ids_for_details = [] + search_request = youtube.search().list( + part='snippet', maxResults=25, q=query_txt, safeSearch='none', + type='video') + response = search_request.execute() + results: list[Result] = [] + ids_for_details: list[VideoId] = [] for item in response['items']: - video_id = item['id']['videoId'] + video_id: VideoId = item['id']['videoId'] ids_for_details += [video_id] - snippet = item['snippet'] - to_save['results'] += [{'id': video_id, - 'title': snippet['title'], - 'description': snippet['description'], - 'published_at': snippet['publishedAt'], - }] - thumbnail_url = item['snippet']['thumbnails']['default']['url'] - store_at = path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg') - urlretrieve(thumbnail_url, store_at) + snippet: dict[str, str] = item['snippet'] + result: Result = {'id': video_id, + 'title': snippet['title'], + 'description': snippet['description'], + 'published_at': snippet['publishedAt'], + } + results += [result] + urlretrieve(item['snippet']['thumbnails']['default']['url'], + path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')) + # collect more details for found videos update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS) - request = youtube.videos().list(id=','.join(ids_for_details), - part='content_details') - details = request.execute() + videos_request = youtube.videos().list(id=','.join(ids_for_details), + part='content_details') + details = videos_request.execute() for i, detailed in enumerate(details['items']): - item = to_save['results'][i] - assert item['id'] == detailed['id'] - item['duration'] = detailed['contentDetails']['duration'] - item['definition'] = detailed['contentDetails']['definition'] + results_item: Result = results[i] + assert results_item['id'] == detailed['id'] + content_details: dict[str, str] = detailed['contentDetails'] + results_item['duration'] = content_details['duration'] + results_item['definition'] = content_details['definition'] - md5sum = md5(query.encode()).hexdigest() - path = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json') + # store query, its datetime, and its results at hash of query + md5sum: str = md5(query_txt.encode()).hexdigest() + path: PathStr = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json') with open(path, 'w', encoding='utf8') as f: - json_dump(to_save, f) + json_dump({'text': query_txt, + 'retrieved_at': now, + 'results': results}, + f) self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302) - def do_GET(self): # pylint:disable=invalid-name + def do_GET(self) -> None: # pylint:disable=invalid-name """Map GET requests to handlers for various paths.""" parsed_url = urlparse(self.path) - toks_url = parsed_url.path.split('/') - page = toks_url[1] + toks_url: list[str] = parsed_url.path.split('/') + page_name: str = toks_url[1] - if 'thumbnails' == page: - filename = toks_url[2] + # on /thumbnails requests, return directly with bytes of stored files + if 'thumbnails' == page_name: + filename: str = toks_url[2] with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f: - img = f.read() + img: bytes = f.read() self._send_http(img, [('Content-type', 'image/jpg')]) return - downloaded = {} - for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if e.is_file]: + # otherwise populate downloaded + downloaded: dict[VideoId, PathStr] = {} + for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]: + before_ext: str before_ext, _ = splitext(e.path) - id_ = before_ext.split('[')[-1].split(']')[0] + id_: VideoId = before_ext.split('[')[-1].split(']')[0] downloaded[id_] = e.path - if 'dl' == page: - video_id = toks_url[2] + # on /dl, directly send video file if ID found, else add to to_download + if 'dl' == page_name: + video_id: VideoId = toks_url[2] if video_id in downloaded: - with open(downloaded[video_id], 'rb') as f: - video = f.read() + with open(downloaded[video_id], 'rb') as video_file: + video: bytes = video_file.read() self._send_http(content=video) return to_download.append(video_id) - params = parse_qs(parsed_url.query) - query_id = params.get('from_query', [''])[0] - redir_path = f'/query/{query_id}' if query_id else '/' + params: HttpPayload = parse_qs(parsed_url.query) + dl_query_id: str = params.get('from_query', [''])[0] + redir_path = f'/query/{dl_query_id}' if dl_query_id else '/' self._send_http(headers=[('Location', redir_path)], code=302) return - kwargs = {'quota_count': 0} + # otherwise, start template context with always-to-show quota count + quota_count = 0 + tmpl_ctx: dict[str, int | str | Query | list[Query]] = {} + tmpl_ctx['quota_count'] = quota_count for amount in read_quota_log().values(): - kwargs['quota_count'] += amount - if 'query' == page: + quota_count += amount + tmpl_name: str + + # on /query, load cached query data, calc result attributes to show + if 'query' == page_name: tmpl_name = NAME_TEMPLATE_RESULTS - kwargs['youtube_prefix'] = YOUTUBE_URL_PREFIX - query_id = toks_url[2] - kwargs['query_id'] = query_id - path = path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json') - with open(path, 'r', encoding='utf8') as f: - query = json_load(f) + tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX + query_id: str = toks_url[2] + tmpl_ctx['query_id'] = query_id + with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'), + 'r', encoding='utf8') as query_file: + query: dict = json_load(query_file) for result in query['results']: result['available'] = result['id'] in downloaded date_dur, time_dur = result['duration'].split('T') @@ -227,7 +259,7 @@ class TaskHandler(BaseHTTPRequestHandler): ('D', 60*60*24)): if dur_char in date_dur: dur_str, date_dur = date_dur.split(dur_char) - seconds += int(dur_str) * len_seconds + seconds += int(dur_str) * int(len_seconds) for dur_char, len_seconds in (('H', 60*60), ('M', 60), ('S', 1)): @@ -241,32 +273,38 @@ class TaskHandler(BaseHTTPRequestHandler): [f'0{str_}' if len(str_) == 1 else str_ for str_ in (hours_str, minutes_str, seconds_str)]) result['definition'] = result['definition'].upper() - kwargs['query'] = query + tmpl_ctx['query'] = query + + # on / or anything else, prepare listing of all queries else: tmpl_name = NAME_TEMPLATE_INDEX - queries = [] + queries: list[Query] = [] for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE) - if f.is_file]: + if isfile(f.path)]: id_, _ = splitext(basename(file.path)) - with open(file.path, 'r', encoding='utf8') as f: - query = json_load(f) - query['id'] = id_ - for result in query['results']: + with open(file.path, 'r', encoding='utf8') as query_file: + filed_query: Query = json_load(query_file) + filed_query['id'] = id_ + assert isinstance(filed_query['results'], list) + for result in filed_query['results']: result['available'] = result['id'] in downloaded - query['downloads'] = len([result for result in query['results'] - if result['available']]) - queries += [query] + filed_query['downloads'] = len( + [result for result in query['results'] + if result['available']]) + queries += [filed_query] queries.sort(key=lambda q: q['retrieved_at'], reverse=True) - kwargs['queries'] = queries - path = path_join(PATH_DIR_TEMPLATES, tmpl_name) - with open(path, 'r', encoding='utf8') as f: - tmpl = Template(f.read()) - html = tmpl.render(**kwargs) + tmpl_ctx['queries'] = queries + + # render html from tmpl_name and tmpl_ctx + with open(path_join(PATH_DIR_TEMPLATES, tmpl_name), + 'r', encoding='utf8' + ) as templ_file: + tmpl = Template(str(templ_file.read())) + html: str = tmpl.render(**tmpl_ctx) self._send_http(bytes(html, 'utf8')) if __name__ == '__main__': - to_download = [] ensure_expected_dirs_and_files() clean_unfinished_downloads() Thread(target=download_thread, daemon=False).start()