home · contact · privacy
Refactor code: rework type aliases and split the monolithic do_POST/do_GET handlers into dedicated helper methods.
authorChristian Heller <c.heller@plomlompom.de>
Sun, 10 Nov 2024 15:23:51 +0000 (16:23 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Sun, 10 Nov 2024 15:23:51 +0000 (16:23 +0100)
ytplom.py

index bfe09ae470fb65bb019af64f799c14e8e722f7ae..0d55a26fd6b0f606e5c973a5faa2959f2ff79d76 100755 (executable)
--- a/ytplom.py
+++ b/ytplom.py
@@ -16,13 +16,17 @@ from jinja2 import Template
 from yt_dlp import YoutubeDL  # type: ignore
 import googleapiclient.discovery  # type: ignore
 
-Query: TypeAlias = dict[str, str | int | list[dict[str, str]]]
-Result: TypeAlias = dict[str, str]
-QuotaLog: TypeAlias = dict[str, int]
-Headers: TypeAlias = list[tuple[str, str]]
-HttpPayload: TypeAlias = dict[str, list[str]]
+DatetimeStr: TypeAlias = str
+QuotaCost: TypeAlias = int
 VideoId: TypeAlias = str
-PathStr: TypeAlias = str
+FilePathStr: TypeAlias = str
+QueryId = str
+Result: TypeAlias = dict[str, str | bool]
+QueryData: TypeAlias = dict[QueryId, str | int | list[Result]]
+QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
+Header: TypeAlias = tuple[str, str]
+DownloadsDB = dict[VideoId, FilePathStr]
+TemplateContext = dict[str, int | str | QueryData | list[QueryData]]
 
 API_KEY = environ.get('GOOGLE_API_KEY')
 HTTP_PORT = 8083
@@ -36,11 +40,11 @@ NAME_DIR_TEMP = 'temp'
 NAME_TEMPLATE_INDEX = 'index.tmpl'
 NAME_TEMPLATE_RESULTS = 'results.tmpl'
 
-PATH_DIR_TEMP: PathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
+PATH_DIR_TEMP: FilePathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
 EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
                  PATH_DIR_REQUESTS_CACHE]
-PATH_TEMPLATE_INDEX: PathStr = path_join(PATH_DIR_TEMPLATES,
-                                         NAME_TEMPLATE_INDEX)
+PATH_TEMPLATE_INDEX: FilePathStr = path_join(PATH_DIR_TEMPLATES,
+                                             NAME_TEMPLATE_INDEX)
 TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
 YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v='
 YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
@@ -49,8 +53,8 @@ YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS,
                           'temp': NAME_DIR_TEMP},
                 'format': YT_DOWNLOAD_FORMAT}
 
-QUOTA_COST_YOUTUBE_SEARCH: int = 100
-QUOTA_COST_YOUTUBE_DETAILS: int = 1
+QUOTA_COST_YOUTUBE_SEARCH: QuotaCost = 100
+QUOTA_COST_YOUTUBE_DETAILS: QuotaCost = 1
 
 to_download: list[VideoId] = []
 
@@ -107,7 +111,7 @@ def read_quota_log() -> QuotaLog:
     return ret
 
 
-def update_quota_log(now: str, cost: int) -> None:
+def update_quota_log(now: DatetimeStr, cost: QuotaCost) -> None:
     """Update quota log from read_quota_log, add cost to now's row."""
     quota_log = read_quota_log()
     quota_log[now] = quota_log.get(now, 0) + cost
@@ -132,7 +136,7 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def _send_http(self,
                    content: bytes = b'',
-                   headers: Optional[Headers] = None,
+                   headers: Optional[list[Header]] = None,
                    code: int = 200
                    ) -> None:
         headers = headers if headers else []
@@ -145,164 +149,185 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_POST(self) -> None:  # pylint:disable=invalid-name
         """Send requests to YouTube API and cache them."""
-        length = int(self.headers['content-length'])
-        postvars: HttpPayload = parse_qs(self.rfile.read(length).decode())
-        query_txt: str = postvars['query'][0]
-        youtube = googleapiclient.discovery.build('youtube', 'v3',
-                                                  developerKey=API_KEY)
-        now = datetime.now().strftime(TIMESTAMP_FMT)
 
-        # collect videos matching query, first details per result
-        update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
-        search_request = youtube.search().list(
-                part='snippet', maxResults=25, q=query_txt, safeSearch='none',
-                type='video')
-        response = search_request.execute()
-        results: list[Result] = []
-        ids_for_details: list[VideoId] = []
-        for item in response['items']:
-            video_id: VideoId = item['id']['videoId']
-            ids_for_details += [video_id]
-            snippet: dict[str, str] = item['snippet']
-            result: Result = {'id': video_id,
-                              'title': snippet['title'],
-                              'description': snippet['description'],
-                              'published_at': snippet['publishedAt'],
-                              }
-            results += [result]
-            urlretrieve(item['snippet']['thumbnails']['default']['url'],
-                        path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
-
-        # collect more details for found videos
-        update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
-        videos_request = youtube.videos().list(id=','.join(ids_for_details),
-                                               part='content_details')
-        details = videos_request.execute()
-        for i, detailed in enumerate(details['items']):
-            results_item: Result = results[i]
-            assert results_item['id'] == detailed['id']
-            content_details: dict[str, str] = detailed['contentDetails']
-            results_item['duration'] = content_details['duration']
-            results_item['definition'] = content_details['definition']
-
-        # store query, its datetime, and its results at hash of query
-        md5sum: str = md5(query_txt.encode()).hexdigest()
-        path: PathStr = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json')
-        with open(path, 'w', encoding='utf8') as f:
-            json_dump({'text': query_txt,
-                       'retrieved_at': now,
-                       'results': results},
-                      f)
+        def store_at_filename_hashing_query(query_data: QueryData) -> QueryId:
+            md5sum = md5(query_txt.encode()).hexdigest()
+            with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
+                      'w', encoding='utf8') as f:
+                json_dump(query_data, f)
+            return md5sum
+
+        def collect_results(now: DatetimeStr, query_txt: str) -> list[Result]:
+            youtube = googleapiclient.discovery.build('youtube', 'v3',
+                                                      developerKey=API_KEY)
+            update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
+            search_request = youtube.search().list(
+                    q=query_txt,
+                    part='snippet',
+                    maxResults=25,
+                    safeSearch='none',
+                    type='video')
+            results = []
+            ids_to_detail: list[VideoId] = []
+            for item in search_request.execute()['items']:
+                video_id: VideoId = item['id']['videoId']
+                ids_to_detail += [video_id]
+                snippet: dict[str, str] = item['snippet']
+                result: Result = {'id': video_id,
+                                  'title': snippet['title'],
+                                  'description': snippet['description'],
+                                  'published_at': snippet['publishedAt'],
+                                  }
+                results += [result]
+                urlretrieve(item['snippet']['thumbnails']['default']['url'],
+                            path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
+            update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
+            videos_request = youtube.videos().list(id=','.join(ids_to_detail),
+                                                   part='content_details')
+            for i, detailed in enumerate(videos_request.execute()['items']):
+                results_item: Result = results[i]
+                assert results_item['id'] == detailed['id']
+                content_details: dict[str, str] = detailed['contentDetails']
+                results_item['duration'] = content_details['duration']
+                results_item['definition'] = content_details['definition']
+            return results
+
+        body_length = int(self.headers['content-length'])
+        postvars = parse_qs(self.rfile.read(body_length).decode())
+        query_txt = postvars['query'][0]
+        now = datetime.now().strftime(TIMESTAMP_FMT)
+        results = collect_results(now, query_txt)
+        md5sum = store_at_filename_hashing_query(
+                {'text': query_txt, 'retrieved_at': now, 'results': results})
         self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
 
     def do_GET(self) -> None:  # pylint:disable=invalid-name
         """Map GET requests to handlers for various paths."""
-        parsed_url = urlparse(self.path)
-        toks_url: list[str] = parsed_url.path.split('/')
-        page_name: str = toks_url[1]
-
-        # on /thumbnails requests, return directly with bytes of stored files
+        url = urlparse(self.path)
+        toks_url: list[str] = url.path.split('/')
+        page_name = toks_url[1]
         if 'thumbnails' == page_name:
-            filename: str = toks_url[2]
-            with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
-                img: bytes = f.read()
-            self._send_http(img, [('Content-type', 'image/jpg')])
-            return
-
-        # otherwise populate downloaded
-        downloaded: dict[VideoId, PathStr] = {}
-        for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
-            before_ext: str
-            before_ext, _ = splitext(e.path)
-            id_: VideoId = before_ext.split('[')[-1].split(']')[0]
-            downloaded[id_] = e.path
-
-        # on /dl, directly send video file if ID found, else add to to_download
+            self._send_thumbnail(toks_url[2])
         if 'dl' == page_name:
-            video_id: VideoId = toks_url[2]
-            if video_id in downloaded:
-                with open(downloaded[video_id], 'rb') as video_file:
-                    video: bytes = video_file.read()
-                self._send_http(content=video)
-                return
-            to_download.append(video_id)
-            params: HttpPayload = parse_qs(parsed_url.query)
-            dl_query_id: str = params.get('from_query', [''])[0]
-            redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
-            self._send_http(headers=[('Location', redir_path)], code=302)
-            return
-
-        # otherwise, start template context with always-to-show quota count
-        quota_count = 0
-        tmpl_ctx: dict[str, int | str | Query | list[Query]] = {}
-        tmpl_ctx['quota_count'] = quota_count
-        for amount in read_quota_log().values():
-            quota_count += amount
-        tmpl_name: str
-
-        # on /query, load cached query data, calc result attributes to show
+            self._send_or_download_video(toks_url[2], parse_qs(url.query))
         if 'query' == page_name:
-            tmpl_name = NAME_TEMPLATE_RESULTS
-            tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
-            query_id: str = toks_url[2]
-            tmpl_ctx['query_id'] = query_id
-            with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
-                      'r', encoding='utf8') as query_file:
-                query: dict = json_load(query_file)
-            for result in query['results']:
-                result['available'] = result['id'] in downloaded
-                date_dur, time_dur = result['duration'].split('T')
-                seconds = 0
-                date_dur = date_dur[1:]
-                for dur_char, len_seconds in (('Y', 60*60*24*365.25),
-                                              ('M', 60*60*24*30),
-                                              ('D', 60*60*24)):
-                    if dur_char in date_dur:
-                        dur_str, date_dur = date_dur.split(dur_char)
-                        seconds += int(dur_str) * int(len_seconds)
-                for dur_char, len_seconds in (('H', 60*60),
-                                              ('M', 60),
-                                              ('S', 1)):
-                    if dur_char in time_dur:
-                        dur_str, time_dur = time_dur.split(dur_char)
-                        seconds += int(dur_str) * len_seconds
-                seconds_str = str(seconds % 60)
-                minutes_str = str(seconds // 60)
-                hours_str = str(seconds // (60 * 60))
-                result['duration'] = ':'.join(
-                        [f'0{str_}' if len(str_) == 1 else str_
-                         for str_ in (hours_str, minutes_str, seconds_str)])
-                result['definition'] = result['definition'].upper()
-            tmpl_ctx['query'] = query
-
-        # on / or anything else, prepare listing of all queries
-        else:
-            tmpl_name = NAME_TEMPLATE_INDEX
-            queries: list[Query] = []
-            for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
-                         if isfile(f.path)]:
-                id_, _ = splitext(basename(file.path))
-                with open(file.path, 'r', encoding='utf8') as query_file:
-                    filed_query: Query = json_load(query_file)
-                filed_query['id'] = id_
-                assert isinstance(filed_query['results'], list)
-                for result in filed_query['results']:
-                    result['available'] = result['id'] in downloaded
-                filed_query['downloads'] = len(
-                        [result for result in query['results']
-                         if result['available']])
-                queries += [filed_query]
-            queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
-            tmpl_ctx['queries'] = queries
-
-        # render html from tmpl_name and tmpl_ctx
+            self._send_query_page(toks_url[2])
+        else:  # e.g. for /
+            self._send_queries_index_and_search()
+
+    def _send_rendered_template(self,
+                                tmpl_name: FilePathStr,
+                                tmpl_ctx: TemplateContext
+                                ) -> None:
+        """Send HTML rendered from tmpl_name and tmpl_ctx"""
         with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
                   'r', encoding='utf8'
                   ) as templ_file:
             tmpl = Template(str(templ_file.read()))
-        html: str = tmpl.render(**tmpl_ctx)
+        html= tmpl.render(**tmpl_ctx)
         self._send_http(bytes(html, 'utf8'))
 
+    @staticmethod
+    def _make_template_context() -> TemplateContext:
+        """Create template context dictionary with current quota count."""
+        quota_count = 0
+        tmpl_ctx: TemplateContext = {'quota_count': quota_count}
+        for amount in read_quota_log().values():
+            quota_count += amount
+        return tmpl_ctx
+
+    def _make_downloads_db(self) -> DownloadsDB:
+        """Create dictionary of downloads mapping video IDs to file paths."""
+        downloads_db = {}
+        for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
+            before_ext, _ = splitext(e.path)
+            id_: VideoId = before_ext.split('[')[-1].split(']')[0]
+            downloads_db[id_] = e.path
+        return downloads_db
+
+    def _send_thumbnail(self, filename: FilePathStr) -> None:
+        """Send thumbnail file."""
+        with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
+            img = f.read()
+        self._send_http(img, [('Content-type', 'image/jpg')])
+
+    def _send_or_download_video(self,
+                                video_id: VideoId,
+                                params: dict[str, list[str]]
+                                ) -> None:
+        """If in storage, send video of video_id, otherwise download."""
+        downloads_db = self._make_downloads_db()
+        if video_id in downloads_db:
+            with open(downloads_db[video_id], 'rb') as video_file:
+                video = video_file.read()
+            self._send_http(content=video)
+            return
+        to_download.append(video_id)
+        dl_query_id: QueryId = params.get('from_query', [''])[0]
+        redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
+        self._send_http(headers=[('Location', redir_path)], code=302)
+
+    def _send_query_page(self, query_id: QueryId) -> None:
+        """Load cached query+result data, calculate further attribute, send."""
+
+        def reformat_duration(duration_str: str):
+            date_dur, time_dur = duration_str.split('T')
+            seconds: int = 0
+            date_dur = date_dur[1:]
+            for dur_char, len_seconds in (('Y', 60*60*24*365.25),
+                                          ('M', 60*60*24*30),
+                                          ('D', 60*60*24)):
+                if dur_char in date_dur:
+                    dur_str, date_dur = date_dur.split(dur_char)
+                    seconds += int(dur_str) * int(len_seconds)
+            for dur_char, len_seconds in (('H', 60*60),
+                                          ('M', 60),
+                                          ('S', 1)):
+                if dur_char in time_dur:
+                    dur_str, time_dur = time_dur.split(dur_char)
+                    seconds += int(dur_str) * len_seconds
+            seconds_str = str(seconds % 60)
+            minutes_str = str(seconds // 60)
+            hours_str = str(seconds // (60 * 60))
+            return ':'.join([f'0{s}' if len(s) == 1 else s
+                             for s in (hours_str, minutes_str, seconds_str)])
+
+        downloads_db = self._make_downloads_db()
+        tmpl_ctx = self._make_template_context()
+        tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
+        tmpl_ctx['query_id'] = query_id
+        with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
+                  'r', encoding='utf8') as query_file:
+            query = json_load(query_file)
+        for result in query['results']:
+            result['available'] = result['id'] in downloads_db
+            result['duration'] = reformat_duration(result['duration'])
+            result['definition'] = result['definition'].upper()
+        tmpl_ctx['query'] = query
+        self._send_rendered_template(NAME_TEMPLATE_RESULTS, tmpl_ctx)
+
+    def _send_queries_index_and_search(self) -> None:
+        """Send listing of cached queries, search form for doing new ones."""
+        downloads_db = self._make_downloads_db()
+        tmpl_ctx = self._make_template_context()
+        queries: list[QueryData] = []
+        for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
+                     if isfile(f.path)]:
+            id_, _ = splitext(basename(file.path))
+            with open(file.path, 'r', encoding='utf8') as query_file:
+                filed_query: QueryData = json_load(query_file)
+            filed_query['id'] = id_
+            assert isinstance(filed_query['results'], list)
+            for result in filed_query['results']:
+                result['available'] = result['id'] in downloads_db
+            filed_query['downloads'] = len(
+                    [result for result in filed_query['results']
+                     if result['available']])
+            queries += [filed_query]
+        queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
+        tmpl_ctx['queries'] = queries
+        self._send_rendered_template(NAME_TEMPLATE_INDEX, tmpl_ctx)
+
 
 if __name__ == '__main__':
     ensure_expected_dirs_and_files()