From: Christian Heller Date: Mon, 11 Nov 2024 07:37:30 +0000 (+0100) Subject: Refactor and add single video data view, only allow downloading from here. X-Git-Url: https://plomlompom.com/repos/%7B%7Bprefix%7D%7D/static/%7B%7Bdb.prefix%7D%7D/tasks?a=commitdiff_plain;h=956771c40bdf82cb1b7f80d3c38541f633051cf9;p=ytplom Refactor and add single video data view, only allow downloading from here. --- diff --git a/templates/index.tmpl b/templates/index.tmpl index 346b543..b34319d 100644 --- a/templates/index.tmpl +++ b/templates/index.tmpl @@ -1,6 +1,7 @@ +

queries · videos

quota: {{quota_count}}/100000

diff --git a/templates/results.tmpl b/templates/results.tmpl index c6cbec7..6b7ac2b 100644 --- a/templates/results.tmpl +++ b/templates/results.tmpl @@ -1,21 +1,21 @@ -

quota: {{quota_count}}/100000 · index
-query: {{query["text"]}}

+

queries · videos

+

query: {{query_text}}

-{% for result in query["results"] %} +{% for video in videos %} +{{video.definition}}
+{{video.duration}}
+{% if video.available %}[loaded]{% endif %} {% endfor %} diff --git a/templates/videos.tmpl b/templates/videos.tmpl index f1cac55..757e566 100644 --- a/templates/videos.tmpl +++ b/templates/videos.tmpl @@ -1,10 +1,11 @@ +

queries · videos

downloaded videos:

diff --git a/ytplom.py b/ytplom.py index b6d45de..320872c 100755 --- a/ytplom.py +++ b/ytplom.py @@ -21,13 +21,19 @@ QuotaCost = NewType('QuotaCost', int) VideoId = NewType('VideoId', str) FilePathStr = NewType('FilePathStr', str) QueryId = NewType('QueryId', str) -Result: TypeAlias = dict[str, str | bool] -QueryData: TypeAlias = dict[str, str | int | list[Result]] -QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost] +QueryText = NewType('QueryText', str) +AmountDownloads = NewType('AmountDownloads', int) +Result: TypeAlias = dict[str, str] Header: TypeAlias = tuple[str, str] +VideoData: TypeAlias = dict[str, str | bool] +QueryData: TypeAlias = dict[str, QueryId | QueryText | DatetimeStr + | AmountDownloads | list[Result]] +QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost] DownloadsDB = dict[VideoId, FilePathStr] -TemplateContext = dict[str, int | str | QueryData | list[QueryData] | - list[tuple[VideoId, FilePathStr]]] +TemplateContext = dict[str, str | QuotaCost | QueryData | VideoData | + list[QueryData] | list[VideoData] | + list[tuple[VideoId, FilePathStr]] | + list[tuple[QueryId, QueryText]]] API_KEY = environ.get('GOOGLE_API_KEY') HTTP_PORT = 8083 @@ -41,6 +47,7 @@ NAME_DIR_TEMP = FilePathStr('temp') NAME_TEMPLATE_INDEX = FilePathStr('index.tmpl') NAME_TEMPLATE_RESULTS = FilePathStr('results.tmpl') NAME_TEMPLATE_VIDEOS = FilePathStr('videos.tmpl') +NAME_TEMPLATE_VIDEO_ABOUT = FilePathStr('video_about.tmpl') PATH_DIR_TEMP = FilePathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)) EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS, @@ -152,14 +159,9 @@ class TaskHandler(BaseHTTPRequestHandler): def do_POST(self) -> None: # pylint:disable=invalid-name """Send requests to YouTube API and cache them.""" - def store_at_filename_hashing_query(query_data: QueryData) -> QueryId: - md5sum = md5(query_txt.encode()).hexdigest() - with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'), - 'w', encoding='utf8') as f: - json_dump(query_data, f) - return QueryId(md5sum) - - def collect_results(now: DatetimeStr, query_txt: str) -> list[Result]: + def collect_results(now: DatetimeStr, + query_txt: QueryText + ) -> list[Result]: youtube = googleapiclient.discovery.build('youtube', 'v3', developerKey=API_KEY) update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH) @@ -196,11 +198,15 @@ class TaskHandler(BaseHTTPRequestHandler): body_length = int(self.headers['content-length']) postvars = parse_qs(self.rfile.read(body_length).decode()) - query_txt = postvars['query'][0] + query_txt = QueryText(postvars['query'][0]) now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)) results = collect_results(now, query_txt) - md5sum = store_at_filename_hashing_query( - {'text': query_txt, 'retrieved_at': now, 'results': results}) + md5sum = md5(str(query_txt).encode()).hexdigest() + with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'), + 'w', encoding='utf8') as f: + json_dump({'text': query_txt, + 'retrieved_at': now, + 'results': results}, f) self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302) def do_GET(self) -> None: # pylint:disable=invalid-name @@ -211,10 +217,11 @@ class TaskHandler(BaseHTTPRequestHandler): if 'thumbnails' == page_name: self._send_thumbnail(FilePathStr(toks_url[2])) elif 'dl' == page_name: - self._send_or_download_video(VideoId(toks_url[2]), - parse_qs(url.query)) + self._send_or_download_video(VideoId(toks_url[2])) elif 'videos' == page_name: self._send_videos_index() + elif 'video_about' == page_name: + self._send_video_about(VideoId(toks_url[2])) elif 'query' == page_name: self._send_query_page(QueryId(toks_url[2])) else: # e.g. for / @@ -224,7 +231,6 @@ class TaskHandler(BaseHTTPRequestHandler): tmpl_name: FilePathStr, tmpl_ctx: TemplateContext ) -> None: - """Send HTML rendered from tmpl_name and tmpl_ctx""" with open(path_join(PATH_DIR_TEMPLATES, tmpl_name), 'r', encoding='utf8' ) as templ_file: @@ -232,20 +238,7 @@ class TaskHandler(BaseHTTPRequestHandler): html = tmpl.render(**tmpl_ctx) self._send_http(bytes(html, 'utf8')) - @staticmethod - def _make_template_context(with_quota_count: bool = True - ) -> TemplateContext: - """Create template context dictionary with current quota count.""" - tmpl_ctx: TemplateContext = {} - if with_quota_count: - quota_count = 0 - tmpl_ctx['quota_count'] = quota_count - for amount in read_quota_log().values(): - quota_count += amount - return tmpl_ctx - def _make_downloads_db(self) -> DownloadsDB: - """Create dictionary of downloads mapping video IDs to file paths.""" downloads_db = {} for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]: before_ext = splitext(e.path)[0] @@ -253,30 +246,27 @@ class TaskHandler(BaseHTTPRequestHandler): downloads_db[id_] = FilePathStr(e.path) return downloads_db - def _send_thumbnail(self, filename: FilePathStr) -> None: - """Send thumbnail file.""" - with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f: - img = f.read() - self._send_http(img, [('Content-type', 'image/jpg')]) - - def _send_or_download_video(self, - video_id: VideoId, - params: dict[str, list[str]] - ) -> None: - """If in storage, send video of video_id, otherwise download.""" - downloads_db = self._make_downloads_db() - if video_id in downloads_db: - with open(downloads_db[video_id], 'rb') as video_file: - video = video_file.read() - self._send_http(content=video) - return - to_download.append(video_id) - dl_query_id = params.get('from_query', [''])[0] - redir_path = f'/query/{dl_query_id}' if dl_query_id else '/' - self._send_http(headers=[('Location', redir_path)], code=302) - - def _send_query_page(self, query_id: QueryId) -> None: - """Load cached query+result data, calculate further attribute, send.""" + def _harvest_queries(self) -> list[tuple[QueryId, list[Result], + QueryText, DatetimeStr]]: + queries_data = [] + for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE) + if isfile(f.path)]: + with open(file.path, 'r', encoding='utf8') as query_file: + filed_query: QueryData = json_load(query_file) + assert isinstance(filed_query['results'], list) + assert type(filed_query['text']) is QueryText + assert type(filed_query['retrieved_at']) is DatetimeStr + id_ = QueryId(splitext(basename(file.path))[0]) + results_list = filed_query['results'] + query_text = filed_query['text'] + retrieved_at = filed_query['retrieved_at'] + queries_data += [(id_, results_list, query_text, retrieved_at)] + return queries_data + + def _result_to_video_data(self, + result: Result, + downloads_db: DownloadsDB + ) -> VideoData: def reformat_duration(duration_str: str): date_dur, time_dur = duration_str.split('T') @@ -299,52 +289,78 @@ class TaskHandler(BaseHTTPRequestHandler): hours_str = str(seconds // (60 * 60)) return ':'.join([f'0{s}' if len(s) == 1 else s for s in (hours_str, minutes_str, seconds_str)]) + assert isinstance(result['duration'], str) + return { + 'id': result['id'], + 'available': result['id'] in downloads_db, + 'duration': reformat_duration(result['duration']), + 'title': result['title'], + 'definition': result['definition'] + } + + def _send_thumbnail(self, filename: FilePathStr) -> None: + with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f: + img = f.read() + self._send_http(img, [('Content-type', 'image/jpg')]) + + def _send_or_download_video(self, video_id: VideoId) -> None: + downloads_db = self._make_downloads_db() + if video_id in downloads_db: + with open(downloads_db[video_id], 'rb') as video_file: + video = video_file.read() + self._send_http(content=video) + return + to_download.append(video_id) + self._send_http(headers=[('Location', f'/video_about/{video_id}')], + code=302) + def _send_query_page(self, query_id: QueryId) -> None: downloads_db = self._make_downloads_db() - tmpl_ctx = self._make_template_context() - tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX - tmpl_ctx['query_id'] = query_id with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'), 'r', encoding='utf8') as query_file: query = json_load(query_file) - for result in query['results']: - result['available'] = result['id'] in downloads_db - result['duration'] = reformat_duration(result['duration']) - result['definition'] = result['definition'].upper() - tmpl_ctx['query'] = query - self._send_rendered_template(NAME_TEMPLATE_RESULTS, tmpl_ctx) + self._send_rendered_template( + NAME_TEMPLATE_RESULTS, + {'query': query['text'], + 'videos': [self._result_to_video_data(result, downloads_db) + for result in query['results']]}) def _send_queries_index_and_search(self) -> None: - """Send listing of cached queries, search form for doing new ones.""" downloads_db = self._make_downloads_db() - tmpl_ctx = self._make_template_context() - queries: list[QueryData] = [] - for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE) - if isfile(f.path)]: - id_ = splitext(basename(file.path))[0] - with open(file.path, 'r', encoding='utf8') as query_file: - filed_query: QueryData = json_load(query_file) - filed_query['id'] = id_ - assert isinstance(filed_query['results'], list) - for result in filed_query['results']: - result['available'] = result['id'] in downloads_db - filed_query['downloads'] = len( - [result for result in filed_query['results'] - if result['available']]) - queries += [filed_query] - queries.sort(key=lambda q: q['retrieved_at'], reverse=True) - tmpl_ctx['queries'] = queries - self._send_rendered_template(NAME_TEMPLATE_INDEX, tmpl_ctx) + queries_data: list[QueryData] = [] + for id_, results, query_text, timestamp in self._harvest_queries(): + queries_data += [ + {'id': id_, 'text': query_text, 'retrieved_at': timestamp, + 'downloads': AmountDownloads(len([ + r for r in results if r['id'] in downloads_db]))}] + queries_data.sort(key=lambda q: q['retrieved_at'], reverse=True) + self._send_rendered_template(NAME_TEMPLATE_INDEX, {'queries': + queries_data}) + + def _send_video_about(self, video_id: VideoId) -> None: + linked_queries: list[tuple[QueryId, QueryText]] = [] + first_result: Optional[Result] = None + for id_, results, query_text, _ in self._harvest_queries(): + for result in results: + if video_id == result['id']: + linked_queries += [(id_, query_text)] + first_result = first_result or result + if not first_result: + self._send_http(b'nothing found', code=404) + return + self._send_rendered_template( + NAME_TEMPLATE_VIDEO_ABOUT, + {'video_id': video_id, + 'youtube_prefix': YOUTUBE_URL_PREFIX, + 'queries': linked_queries, + 'video_data': self._result_to_video_data( + first_result, self._make_downloads_db())}) def _send_videos_index(self) -> None: - """Send listing of downloaded videos, linked to their /dl pages.""" - downloads_db = self._make_downloads_db() - tmpl_ctx = self._make_template_context(with_quota_count=False) videos = [(id_, FilePathStr(basename(path))) - for id_, path in downloads_db.items()] + for id_, path in self._make_downloads_db().items()] videos.sort(key=lambda t: t[1]) - tmpl_ctx['videos'] = videos - self._send_rendered_template(NAME_TEMPLATE_VIDEOS, tmpl_ctx) + self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos}) if __name__ == '__main__':
- + -{{result.definition}}
-{{result.duration}}
-{% if result.available %}[loaded]{% else %}[LOAD]{% endif %}
-{{result.title}} · {{result.description}} +{{video.title}} · {{video.description}}