From: Christian Heller Date: Sun, 10 Nov 2024 15:23:51 +0000 (+0100) Subject: Refactor code. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/static/%7B%7Bprefix%7D%7D/%7B%7Bdb.prefix%7D%7D/add_task?a=commitdiff_plain;h=deba5588e20efef6accfc6adeda328d9a45a2424;p=ytplom Refactor code. --- diff --git a/ytplom.py b/ytplom.py index bfe09ae..0d55a26 100755 --- a/ytplom.py +++ b/ytplom.py @@ -16,13 +16,17 @@ from jinja2 import Template from yt_dlp import YoutubeDL # type: ignore import googleapiclient.discovery # type: ignore -Query: TypeAlias = dict[str, str | int | list[dict[str, str]]] -Result: TypeAlias = dict[str, str] -QuotaLog: TypeAlias = dict[str, int] -Headers: TypeAlias = list[tuple[str, str]] -HttpPayload: TypeAlias = dict[str, list[str]] +DatetimeStr: TypeAlias = str +QuotaCost: TypeAlias = int VideoId: TypeAlias = str -PathStr: TypeAlias = str +FilePathStr: TypeAlias = str +QueryId = str +Result: TypeAlias = dict[str, str | bool] +QueryData: TypeAlias = dict[QueryId, str | int | list[Result]] +QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost] +Header: TypeAlias = tuple[str, str] +DownloadsDB = dict[VideoId, FilePathStr] +TemplateContext = dict[str, int | str | QueryData | list[QueryData]] API_KEY = environ.get('GOOGLE_API_KEY') HTTP_PORT = 8083 @@ -36,11 +40,11 @@ NAME_DIR_TEMP = 'temp' NAME_TEMPLATE_INDEX = 'index.tmpl' NAME_TEMPLATE_RESULTS = 'results.tmpl' -PATH_DIR_TEMP: PathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP) +PATH_DIR_TEMP: FilePathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP) EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS, PATH_DIR_REQUESTS_CACHE] -PATH_TEMPLATE_INDEX: PathStr = path_join(PATH_DIR_TEMPLATES, - NAME_TEMPLATE_INDEX) +PATH_TEMPLATE_INDEX: FilePathStr = path_join(PATH_DIR_TEMPLATES, + NAME_TEMPLATE_INDEX) TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v=' YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ @@ -49,8 +53,8 @@ YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS, 'temp': NAME_DIR_TEMP}, 'format': YT_DOWNLOAD_FORMAT} -QUOTA_COST_YOUTUBE_SEARCH: int = 100 -QUOTA_COST_YOUTUBE_DETAILS: int = 1 +QUOTA_COST_YOUTUBE_SEARCH: QuotaCost = 100 +QUOTA_COST_YOUTUBE_DETAILS: QuotaCost = 1 to_download: list[VideoId] = [] @@ -107,7 +111,7 @@ def read_quota_log() -> QuotaLog: return ret -def update_quota_log(now: str, cost: int) -> None: +def update_quota_log(now: DatetimeStr, cost: QuotaCost) -> None: """Update quota log from read_quota_log, add cost to now's row.""" quota_log = read_quota_log() quota_log[now] = quota_log.get(now, 0) + cost @@ -132,7 +136,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _send_http(self, content: bytes = b'', - headers: Optional[Headers] = None, + headers: Optional[list[Header]] = None, code: int = 200 ) -> None: headers = headers if headers else [] @@ -145,164 +149,185 @@ class TaskHandler(BaseHTTPRequestHandler): def do_POST(self) -> None: # pylint:disable=invalid-name """Send requests to YouTube API and cache them.""" - length = int(self.headers['content-length']) - postvars: HttpPayload = parse_qs(self.rfile.read(length).decode()) - query_txt: str = postvars['query'][0] - youtube = googleapiclient.discovery.build('youtube', 'v3', - developerKey=API_KEY) - now = datetime.now().strftime(TIMESTAMP_FMT) - # collect videos matching query, first details per result - update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH) - search_request = youtube.search().list( - part='snippet', maxResults=25, q=query_txt, safeSearch='none', - type='video') - response = search_request.execute() - results: list[Result] = [] - ids_for_details: list[VideoId] = [] - for item in response['items']: - video_id: VideoId = item['id']['videoId'] - ids_for_details += [video_id] - snippet: dict[str, str] = item['snippet'] - result: Result = {'id': video_id, - 'title': snippet['title'], - 'description': snippet['description'], - 'published_at': snippet['publishedAt'], - } - results += [result] - urlretrieve(item['snippet']['thumbnails']['default']['url'], - path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')) - - # collect more details for found videos - update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS) - videos_request = youtube.videos().list(id=','.join(ids_for_details), - part='content_details') - details = videos_request.execute() - for i, detailed in enumerate(details['items']): - results_item: Result = results[i] - assert results_item['id'] == detailed['id'] - content_details: dict[str, str] = detailed['contentDetails'] - results_item['duration'] = content_details['duration'] - results_item['definition'] = content_details['definition'] - - # store query, its datetime, and its results at hash of query - md5sum: str = md5(query_txt.encode()).hexdigest() - path: PathStr = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json') - with open(path, 'w', encoding='utf8') as f: - json_dump({'text': query_txt, - 'retrieved_at': now, - 'results': results}, - f) + def store_at_filename_hashing_query(query_data: QueryData) -> QueryId: + md5sum = md5(query_txt.encode()).hexdigest() + with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'), + 'w', encoding='utf8') as f: + json_dump(query_data, f) + return md5sum + + def collect_results(now: DatetimeStr, query_txt: str) -> list[Result]: + youtube = googleapiclient.discovery.build('youtube', 'v3', + developerKey=API_KEY) + update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH) + search_request = youtube.search().list( + q=query_txt, + part='snippet', + maxResults=25, + safeSearch='none', + type='video') + results = [] + ids_to_detail: list[VideoId] = [] + for item in search_request.execute()['items']: + video_id: VideoId = item['id']['videoId'] + ids_to_detail += [video_id] + snippet: dict[str, str] = item['snippet'] + result: Result = {'id': video_id, + 'title': snippet['title'], + 'description': snippet['description'], + 'published_at': snippet['publishedAt'], + } + results += [result] + urlretrieve(item['snippet']['thumbnails']['default']['url'], + path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')) + update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS) + videos_request = youtube.videos().list(id=','.join(ids_to_detail), + part='content_details') + for i, detailed in enumerate(videos_request.execute()['items']): + results_item: Result = results[i] + assert results_item['id'] == detailed['id'] + content_details: dict[str, str] = detailed['contentDetails'] + results_item['duration'] = content_details['duration'] + results_item['definition'] = content_details['definition'] + return results + + body_length = int(self.headers['content-length']) + postvars = parse_qs(self.rfile.read(body_length).decode()) + query_txt = postvars['query'][0] + now = datetime.now().strftime(TIMESTAMP_FMT) + results = collect_results(now, query_txt) + md5sum = store_at_filename_hashing_query( + {'text': query_txt, 'retrieved_at': now, 'results': results}) self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302) def do_GET(self) -> None: # pylint:disable=invalid-name """Map GET requests to handlers for various paths.""" - parsed_url = urlparse(self.path) - toks_url: list[str] = parsed_url.path.split('/') - page_name: str = toks_url[1] - - # on /thumbnails requests, return directly with bytes of stored files + url = urlparse(self.path) + toks_url: list[str] = url.path.split('/') + page_name = toks_url[1] if 'thumbnails' == page_name: - filename: str = toks_url[2] - with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f: - img: bytes = f.read() - self._send_http(img, [('Content-type', 'image/jpg')]) - return - - # otherwise populate downloaded - downloaded: dict[VideoId, PathStr] = {} - for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]: - before_ext: str - before_ext, _ = splitext(e.path) - id_: VideoId = before_ext.split('[')[-1].split(']')[0] - downloaded[id_] = e.path - - # on /dl, directly send video file if ID found, else add to to_download + self._send_thumbnail(toks_url[2]) if 'dl' == page_name: - video_id: VideoId = toks_url[2] - if video_id in downloaded: - with open(downloaded[video_id], 'rb') as video_file: - video: bytes = video_file.read() - self._send_http(content=video) - return - to_download.append(video_id) - params: HttpPayload = parse_qs(parsed_url.query) - dl_query_id: str = params.get('from_query', [''])[0] - redir_path = f'/query/{dl_query_id}' if dl_query_id else '/' - self._send_http(headers=[('Location', redir_path)], code=302) - return - - # otherwise, start template context with always-to-show quota count - quota_count = 0 - tmpl_ctx: dict[str, int | str | Query | list[Query]] = {} - tmpl_ctx['quota_count'] = quota_count - for amount in read_quota_log().values(): - quota_count += amount - tmpl_name: str - - # on /query, load cached query data, calc result attributes to show + self._send_or_download_video(toks_url[2], parse_qs(url.query)) if 'query' == page_name: - tmpl_name = NAME_TEMPLATE_RESULTS - tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX - query_id: str = toks_url[2] - tmpl_ctx['query_id'] = query_id - with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'), - 'r', encoding='utf8') as query_file: - query: dict = json_load(query_file) - for result in query['results']: - result['available'] = result['id'] in downloaded - date_dur, time_dur = result['duration'].split('T') - seconds = 0 - date_dur = date_dur[1:] - for dur_char, len_seconds in (('Y', 60*60*24*365.25), - ('M', 60*60*24*30), - ('D', 60*60*24)): - if dur_char in date_dur: - dur_str, date_dur = date_dur.split(dur_char) - seconds += int(dur_str) * int(len_seconds) - for dur_char, len_seconds in (('H', 60*60), - ('M', 60), - ('S', 1)): - if dur_char in time_dur: - dur_str, time_dur = time_dur.split(dur_char) - seconds += int(dur_str) * len_seconds - seconds_str = str(seconds % 60) - minutes_str = str(seconds // 60) - hours_str = str(seconds // (60 * 60)) - result['duration'] = ':'.join( - [f'0{str_}' if len(str_) == 1 else str_ - for str_ in (hours_str, minutes_str, seconds_str)]) - result['definition'] = result['definition'].upper() - tmpl_ctx['query'] = query - - # on / or anything else, prepare listing of all queries - else: - tmpl_name = NAME_TEMPLATE_INDEX - queries: list[Query] = [] - for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE) - if isfile(f.path)]: - id_, _ = splitext(basename(file.path)) - with open(file.path, 'r', encoding='utf8') as query_file: - filed_query: Query = json_load(query_file) - filed_query['id'] = id_ - assert isinstance(filed_query['results'], list) - for result in filed_query['results']: - result['available'] = result['id'] in downloaded - filed_query['downloads'] = len( - [result for result in query['results'] - if result['available']]) - queries += [filed_query] - queries.sort(key=lambda q: q['retrieved_at'], reverse=True) - tmpl_ctx['queries'] = queries - - # render html from tmpl_name and tmpl_ctx + self._send_query_page(toks_url[2]) + else: # e.g. for / + self._send_queries_index_and_search() + + def _send_rendered_template(self, + tmpl_name: FilePathStr, + tmpl_ctx: TemplateContext + ) -> None: + """Send HTML rendered from tmpl_name and tmpl_ctx""" with open(path_join(PATH_DIR_TEMPLATES, tmpl_name), 'r', encoding='utf8' ) as templ_file: tmpl = Template(str(templ_file.read())) - html: str = tmpl.render(**tmpl_ctx) + html= tmpl.render(**tmpl_ctx) self._send_http(bytes(html, 'utf8')) + @staticmethod + def _make_template_context() -> TemplateContext: + """Create template context dictionary with current quota count.""" + quota_count = 0 + tmpl_ctx: TemplateContext = {'quota_count': quota_count} + for amount in read_quota_log().values(): + quota_count += amount + return tmpl_ctx + + def _make_downloads_db(self) -> DownloadsDB: + """Create dictionary of downloads mapping video IDs to file paths.""" + downloads_db = {} + for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]: + before_ext, _ = splitext(e.path) + id_: VideoId = before_ext.split('[')[-1].split(']')[0] + downloads_db[id_] = e.path + return downloads_db + + def _send_thumbnail(self, filename: FilePathStr) -> None: + """Send thumbnail file.""" + with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f: + img = f.read() + self._send_http(img, [('Content-type', 'image/jpg')]) + + def _send_or_download_video(self, + video_id: VideoId, + params: dict[str, list[str]] + ) -> None: + """If in storage, send video of video_id, otherwise download.""" + downloads_db = self._make_downloads_db() + if video_id in downloads_db: + with open(downloads_db[video_id], 'rb') as video_file: + video = video_file.read() + self._send_http(content=video) + return + to_download.append(video_id) + dl_query_id: QueryId = params.get('from_query', [''])[0] + redir_path = f'/query/{dl_query_id}' if dl_query_id else '/' + self._send_http(headers=[('Location', redir_path)], code=302) + + def _send_query_page(self, query_id: QueryId) -> None: + """Load cached query+result data, calculate further attribute, send.""" + + def reformat_duration(duration_str: str): + date_dur, time_dur = duration_str.split('T') + seconds: int = 0 + date_dur = date_dur[1:] + for dur_char, len_seconds in (('Y', 60*60*24*365.25), + ('M', 60*60*24*30), + ('D', 60*60*24)): + if dur_char in date_dur: + dur_str, date_dur = date_dur.split(dur_char) + seconds += int(dur_str) * int(len_seconds) + for dur_char, len_seconds in (('H', 60*60), + ('M', 60), + ('S', 1)): + if dur_char in time_dur: + dur_str, time_dur = time_dur.split(dur_char) + seconds += int(dur_str) * len_seconds + seconds_str = str(seconds % 60) + minutes_str = str(seconds // 60) + hours_str = str(seconds // (60 * 60)) + return ':'.join([f'0{s}' if len(s) == 1 else s + for s in (hours_str, minutes_str, seconds_str)]) + + downloads_db = self._make_downloads_db() + tmpl_ctx = self._make_template_context() + tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX + tmpl_ctx['query_id'] = query_id + with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'), + 'r', encoding='utf8') as query_file: + query = json_load(query_file) + for result in query['results']: + result['available'] = result['id'] in downloads_db + result['duration'] = reformat_duration(result['duration']) + result['definition'] = result['definition'].upper() + tmpl_ctx['query'] = query + self._send_rendered_template(NAME_TEMPLATE_RESULTS, tmpl_ctx) + + def _send_queries_index_and_search(self) -> None: + """Send listing of cached queries, search form for doing new ones.""" + downloads_db = self._make_downloads_db() + tmpl_ctx = self._make_template_context() + queries: list[QueryData] = [] + for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE) + if isfile(f.path)]: + id_, _ = splitext(basename(file.path)) + with open(file.path, 'r', encoding='utf8') as query_file: + filed_query: QueryData = json_load(query_file) + filed_query['id'] = id_ + assert isinstance(filed_query['results'], list) + for result in filed_query['results']: + result['available'] = result['id'] in downloads_db + filed_query['downloads'] = len( + [result for result in filed_query['results'] + if result['available']]) + queries += [filed_query] + queries.sort(key=lambda q: q['retrieved_at'], reverse=True) + tmpl_ctx['queries'] = queries + self._send_rendered_template(NAME_TEMPLATE_INDEX, tmpl_ctx) + if __name__ == '__main__': ensure_expected_dirs_and_files()