home · contact · privacy
Improve code.
author Christian Heller <c.heller@plomlompom.de>
Sat, 9 Nov 2024 07:43:31 +0000 (08:43 +0100)
committer Christian Heller <c.heller@plomlompom.de>
Sat, 9 Nov 2024 07:43:31 +0000 (08:43 +0100)
ytplom.py

index 4bb369da76f2bc94c9000dce34c47c1d7b8e3b91..bfe09ae470fb65bb019af64f799c14e8e722f7ae 100755 (executable)
--- a/ytplom.py
+++ b/ytplom.py
@@ -1,8 +1,9 @@
 #!/usr/bin/env python3
 """Minimalistic download-focused YouTube interface."""
+from typing import TypeAlias, Optional
 from os import environ, makedirs, scandir, remove as os_remove
-from os.path import (isdir, exists as path_exists, join as path_join, splitext,
-                     basename)
+from os.path import (isdir, isfile, exists as path_exists, join as path_join,
+                     splitext, basename)
 from time import sleep
 from json import load as json_load, dump as json_dump
 from datetime import datetime, timedelta
@@ -15,9 +16,17 @@ from jinja2 import Template
 from yt_dlp import YoutubeDL  # type: ignore
 import googleapiclient.discovery  # type: ignore
 
-API_KEY = environ.get('GOOGLE_API_KEY')
+Query: TypeAlias = dict[str, str | int | list[dict[str, str]]]
+Result: TypeAlias = dict[str, str]
+QuotaLog: TypeAlias = dict[str, int]
+Headers: TypeAlias = list[tuple[str, str]]
+HttpPayload: TypeAlias = dict[str, list[str]]
+VideoId: TypeAlias = str
+PathStr: TypeAlias = str
 
+API_KEY = environ.get('GOOGLE_API_KEY')
 HTTP_PORT = 8083
+
 PATH_QUOTA_LOG = 'quota_log.json'
 PATH_DIR_DOWNLOADS = 'downloads'
 PATH_DIR_THUMBNAILS = 'thumbnails'
@@ -27,20 +36,26 @@ NAME_DIR_TEMP = 'temp'
 NAME_TEMPLATE_INDEX = 'index.tmpl'
 NAME_TEMPLATE_RESULTS = 'results.tmpl'
 
-PATH_DIR_TEMP = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
+PATH_DIR_TEMP: PathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
 EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
                  PATH_DIR_REQUESTS_CACHE]
-PATH_TEMPLATE_INDEX = path_join(PATH_DIR_TEMPLATES, NAME_TEMPLATE_INDEX)
+PATH_TEMPLATE_INDEX: PathStr = path_join(PATH_DIR_TEMPLATES,
+                                         NAME_TEMPLATE_INDEX)
 TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
 YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v='
+YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
+        '/best[height<=1080][width<=1920]'
+YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS,
+                          'temp': NAME_DIR_TEMP},
+                'format': YT_DOWNLOAD_FORMAT}
 
-QUOTA_COST_YOUTUBE_SEARCH = 100
-QUOTA_COST_YOUTUBE_DETAILS = 1
+QUOTA_COST_YOUTUBE_SEARCH: int = 100
+QUOTA_COST_YOUTUBE_DETAILS: int = 1
 
-to_download: list[str] = []
+to_download: list[VideoId] = []
 
 
-def ensure_expected_dirs_and_files():
+def ensure_expected_dirs_and_files() -> None:
     """Ensure existance of all dirs and files we need for proper operation."""
     for dir_name in EXPECTED_DIRS:
         if not path_exists(dir_name):
@@ -60,14 +75,14 @@ def ensure_expected_dirs_and_files():
             raise e
 
 
-def clean_unfinished_downloads():
+def clean_unfinished_downloads() -> None:
     """Empty temp directory of unfinished downloads."""
-    for e in [e for e in scandir(PATH_DIR_TEMP) if e.is_file]:
+    for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
         print(f'removing unfinished download: {e.path}')
         os_remove(e.path)
 
 
-def run_server():
+def run_server() -> None:
     """Run HTTPServer on TaskHandler, handle KeyboardInterrupt as exit."""
     server = HTTPServer(('localhost', HTTP_PORT), TaskHandler)
     print(f'running at port {HTTP_PORT}')
@@ -79,7 +94,7 @@ def run_server():
     server.server_close()
 
 
-def read_quota_log():
+def read_quota_log() -> QuotaLog:
     """Return logged quota expenditures of past 24 hours."""
     with open(PATH_QUOTA_LOG, 'r', encoding='utf8') as f:
         log = json_load(f)
@@ -92,7 +107,7 @@ def read_quota_log():
     return ret
 
 
-def update_quota_log(now, cost):
+def update_quota_log(now: str, cost: int) -> None:
     """Update quota log from read_quota_log, add cost to now's row."""
     quota_log = read_quota_log()
     quota_log[now] = quota_log.get(now, 0) + cost
@@ -100,123 +115,140 @@ def update_quota_log(now, cost):
         json_dump(quota_log, f)
 
 
-def download_thread():
+def download_thread() -> None:
     """Keep iterating through to_download for IDs, download their videos."""
     while True:
         sleep(0.5)
         try:
-            video_id = to_download.pop(0)
+            video_id: VideoId = to_download.pop(0)
         except IndexError:
             continue
-        url = f'{YOUTUBE_URL_PREFIX}{video_id}'
-        fmt = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
-              '/best[height<=1080][width<=1920]'
-        params = {'paths': {'home': PATH_DIR_DOWNLOADS, 'temp': NAME_DIR_TEMP},
-                  'format': fmt}
-        with YoutubeDL(params) as ydl:
-            ydl.download([url])
+        with YoutubeDL(YT_DL_PARAMS) as ydl:
+            ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
 
 
 class TaskHandler(BaseHTTPRequestHandler):
     """Handler for GET and POST requests to our server."""
 
-    def _send_http(self, content=None, headers=None, code=200):
+    def _send_http(self,
+                   content: bytes = b'',
+                   headers: Optional[Headers] = None,
+                   code: int = 200
+                   ) -> None:
         headers = headers if headers else []
         self.send_response(code)
         for header_tuple in headers:
             self.send_header(header_tuple[0], header_tuple[1])
         self.end_headers()
-        if content is not None:
+        if content:
             self.wfile.write(content)
 
-    def do_POST(self):  # pylint:disable=invalid-name
+    def do_POST(self) -> None:  # pylint:disable=invalid-name
         """Send requests to YouTube API and cache them."""
         length = int(self.headers['content-length'])
-        postvars = parse_qs(self.rfile.read(length).decode())
-        query = postvars['query'][0]
+        postvars: HttpPayload = parse_qs(self.rfile.read(length).decode())
+        query_txt: str = postvars['query'][0]
         youtube = googleapiclient.discovery.build('youtube', 'v3',
                                                   developerKey=API_KEY)
         now = datetime.now().strftime(TIMESTAMP_FMT)
 
+        # collect videos matching query, first details per result
         update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
-        request = youtube.search().list(part='snippet', maxResults=25, q=query,
-                                        safeSearch='none', type='video')
-        response = request.execute()
-        to_save = {'text': query, 'retrieved_at': now, 'results': []}
-        ids_for_details = []
+        search_request = youtube.search().list(
+                part='snippet', maxResults=25, q=query_txt, safeSearch='none',
+                type='video')
+        response = search_request.execute()
+        results: list[Result] = []
+        ids_for_details: list[VideoId] = []
         for item in response['items']:
-            video_id = item['id']['videoId']
+            video_id: VideoId = item['id']['videoId']
             ids_for_details += [video_id]
-            snippet = item['snippet']
-            to_save['results'] += [{'id': video_id,
-                                    'title': snippet['title'],
-                                    'description': snippet['description'],
-                                    'published_at': snippet['publishedAt'],
-                                    }]
-            thumbnail_url = item['snippet']['thumbnails']['default']['url']
-            store_at = path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')
-            urlretrieve(thumbnail_url, store_at)
+            snippet: dict[str, str] = item['snippet']
+            result: Result = {'id': video_id,
+                              'title': snippet['title'],
+                              'description': snippet['description'],
+                              'published_at': snippet['publishedAt'],
+                              }
+            results += [result]
+            urlretrieve(item['snippet']['thumbnails']['default']['url'],
+                        path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
 
+        # collect more details for found videos
         update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
-        request = youtube.videos().list(id=','.join(ids_for_details),
-                                        part='content_details')
-        details = request.execute()
+        videos_request = youtube.videos().list(id=','.join(ids_for_details),
+                                               part='content_details')
+        details = videos_request.execute()
         for i, detailed in enumerate(details['items']):
-            item = to_save['results'][i]
-            assert item['id'] == detailed['id']
-            item['duration'] = detailed['contentDetails']['duration']
-            item['definition'] = detailed['contentDetails']['definition']
+            results_item: Result = results[i]
+            assert results_item['id'] == detailed['id']
+            content_details: dict[str, str] = detailed['contentDetails']
+            results_item['duration'] = content_details['duration']
+            results_item['definition'] = content_details['definition']
 
-        md5sum = md5(query.encode()).hexdigest()
-        path = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json')
+        # store query, its datetime, and its results at hash of query
+        md5sum: str = md5(query_txt.encode()).hexdigest()
+        path: PathStr = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json')
         with open(path, 'w', encoding='utf8') as f:
-            json_dump(to_save, f)
+            json_dump({'text': query_txt,
+                       'retrieved_at': now,
+                       'results': results},
+                      f)
         self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
 
-    def do_GET(self):  # pylint:disable=invalid-name
+    def do_GET(self) -> None:  # pylint:disable=invalid-name
         """Map GET requests to handlers for various paths."""
         parsed_url = urlparse(self.path)
-        toks_url = parsed_url.path.split('/')
-        page = toks_url[1]
+        toks_url: list[str] = parsed_url.path.split('/')
+        page_name: str = toks_url[1]
 
-        if 'thumbnails' == page:
-            filename = toks_url[2]
+        # on /thumbnails requests, return directly with bytes of stored files
+        if 'thumbnails' == page_name:
+            filename: str = toks_url[2]
             with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
-                img = f.read()
+                img: bytes = f.read()
             self._send_http(img, [('Content-type', 'image/jpg')])
             return
 
-        downloaded = {}
-        for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if e.is_file]:
+        # otherwise populate downloaded
+        downloaded: dict[VideoId, PathStr] = {}
+        for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
+            before_ext: str
             before_ext, _ = splitext(e.path)
-            id_ = before_ext.split('[')[-1].split(']')[0]
+            id_: VideoId = before_ext.split('[')[-1].split(']')[0]
             downloaded[id_] = e.path
 
-        if 'dl' == page:
-            video_id = toks_url[2]
+        # on /dl, directly send video file if ID found, else add to to_download
+        if 'dl' == page_name:
+            video_id: VideoId = toks_url[2]
             if video_id in downloaded:
-                with open(downloaded[video_id], 'rb') as f:
-                    video = f.read()
+                with open(downloaded[video_id], 'rb') as video_file:
+                    video: bytes = video_file.read()
                 self._send_http(content=video)
                 return
             to_download.append(video_id)
-            params = parse_qs(parsed_url.query)
-            query_id = params.get('from_query', [''])[0]
-            redir_path = f'/query/{query_id}' if query_id else '/'
+            params: HttpPayload = parse_qs(parsed_url.query)
+            dl_query_id: str = params.get('from_query', [''])[0]
+            redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
             self._send_http(headers=[('Location', redir_path)], code=302)
             return
 
-        kwargs = {'quota_count': 0}
+        # otherwise, start template context with always-to-show quota count
+        quota_count = 0
+        tmpl_ctx: dict[str, int | str | Query | list[Query]] = {}
+        tmpl_ctx['quota_count'] = quota_count
         for amount in read_quota_log().values():
-            kwargs['quota_count'] += amount
-        if 'query' == page:
+            quota_count += amount
+        tmpl_name: str
+
+        # on /query, load cached query data, calc result attributes to show
+        if 'query' == page_name:
             tmpl_name = NAME_TEMPLATE_RESULTS
-            kwargs['youtube_prefix'] = YOUTUBE_URL_PREFIX
-            query_id = toks_url[2]
-            kwargs['query_id'] = query_id
-            path = path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json')
-            with open(path, 'r', encoding='utf8') as f:
-                query = json_load(f)
+            tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
+            query_id: str = toks_url[2]
+            tmpl_ctx['query_id'] = query_id
+            with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
+                      'r', encoding='utf8') as query_file:
+                query: dict = json_load(query_file)
             for result in query['results']:
                 result['available'] = result['id'] in downloaded
                 date_dur, time_dur = result['duration'].split('T')
@@ -227,7 +259,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                                               ('D', 60*60*24)):
                     if dur_char in date_dur:
                         dur_str, date_dur = date_dur.split(dur_char)
-                        seconds += int(dur_str) * len_seconds
+                        seconds += int(dur_str) * int(len_seconds)
                 for dur_char, len_seconds in (('H', 60*60),
                                               ('M', 60),
                                               ('S', 1)):
@@ -241,32 +273,38 @@ class TaskHandler(BaseHTTPRequestHandler):
                         [f'0{str_}' if len(str_) == 1 else str_
                          for str_ in (hours_str, minutes_str, seconds_str)])
                 result['definition'] = result['definition'].upper()
-            kwargs['query'] = query
+            tmpl_ctx['query'] = query
+
+        # on / or anything else, prepare listing of all queries
         else:
             tmpl_name = NAME_TEMPLATE_INDEX
-            queries = []
+            queries: list[Query] = []
             for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
-                         if f.is_file]:
+                         if isfile(f.path)]:
                 id_, _ = splitext(basename(file.path))
-                with open(file.path, 'r', encoding='utf8') as f:
-                    query = json_load(f)
-                query['id'] = id_
-                for result in query['results']:
+                with open(file.path, 'r', encoding='utf8') as query_file:
+                    filed_query: Query = json_load(query_file)
+                filed_query['id'] = id_
+                assert isinstance(filed_query['results'], list)
+                for result in filed_query['results']:
                     result['available'] = result['id'] in downloaded
-                query['downloads'] = len([result for result in query['results']
-                                          if result['available']])
-                queries += [query]
+                filed_query['downloads'] = len(
+                        [result for result in query['results']
+                         if result['available']])
+                queries += [filed_query]
             queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
-            kwargs['queries'] = queries
-        path = path_join(PATH_DIR_TEMPLATES, tmpl_name)
-        with open(path, 'r', encoding='utf8') as f:
-            tmpl = Template(f.read())
-        html = tmpl.render(**kwargs)
+            tmpl_ctx['queries'] = queries
+
+        # render html from tmpl_name and tmpl_ctx
+        with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
+                  'r', encoding='utf8'
+                  ) as templ_file:
+            tmpl = Template(str(templ_file.read()))
+        html: str = tmpl.render(**tmpl_ctx)
         self._send_http(bytes(html, 'utf8'))
 
 
 if __name__ == '__main__':
-    to_download = []
     ensure_expected_dirs_and_files()
     clean_unfinished_downloads()
     Thread(target=download_thread, daemon=False).start()