home · contact · privacy
Refactor and add single video data view, only allow downloading from here.
authorChristian Heller <c.heller@plomlompom.de>
Mon, 11 Nov 2024 07:37:30 +0000 (08:37 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Mon, 11 Nov 2024 07:37:30 +0000 (08:37 +0100)
templates/index.tmpl
templates/results.tmpl
templates/videos.tmpl
ytplom.py

index 346b5434c6afe0445f9835e6db3fcfe46a373dcf..b34319d02e0afe7cb400190f4745c2a843e1c673 100644 (file)
@@ -1,6 +1,7 @@
 <html>
 <meta charset="UTF-8">
 <body>
+<p>queries · <a href="/videos">videos</a></p>
 <p>quota: {{quota_count}}/100000</p>
 <form action="" method="POST" />
 <input name="query" />
index c6cbec74f9804dabfb034ebc40117ac6067cf97d..6b7ac2bdf9a3235abbd108e89eba679065a97fa9 100644 (file)
@@ -1,21 +1,21 @@
 <html>
 <meta charset="UTF-8">
 <body>
-<p>quota: {{quota_count}}/100000 · <a href="/">index</a><br />
-query: {{query["text"]}}</p>
+<p><a href="/">queries</a> · <a href="/videos">videos</a></p>
+<p>query: {{query_text}}</p>
 <table>
-{% for result in query["results"] %}
+{% for video in videos %}
 <tr>
 <td>
-<a href="{{youtube_prefix}}{{result.id}}"><img src="/thumbnails/{{result.id}}.jpg" /></a>
+<a href="/video_about/{{video.id}}"><img src="/thumbnails/{{video.id}}.jpg" /></a>
 </td>
 <td>
-{{result.definition}}<br />
-{{result.duration}}<br />
-<a href="/dl/{{result.id}}?from_query={{query_id}}">{% if result.available %}[loaded]{% else %}[LOAD]{% endif %}</a></td>
+{{video.definition}}<br />
+{{video.duration}}<br />
+{% if video.available %}<a href="/dl/{{video.id}}">[loaded]{% endif %}</a></td>
 </td>
 <td>
-<b>{{result.title}}</b> · {{result.description}}
+<b><a href="/video_about/{{video.id}}">{{video.title}}</a></b> · {{video.description}}
 </td>
 </tr>
 {% endfor %}
index f1cac55ffba45d02882c244f1ff4523fa6764b87..757e566deed3694365dc3ec8e415ff4a3a29fe61 100644 (file)
@@ -1,10 +1,11 @@
 <html>
 <meta charset="UTF-8">
 <body>
+<p><a href="/">queries</a> · videos</p>
 <p>downloaded videos:</p>
 <ul>
 {% for video_id, path in videos %}
-<li><a href="/dl/{{video_id}}">{{ path }}</a>
+<li><a href="/video_about/{{video_id}}">{{ path }}</a>
 {% endfor %}
 </ul>
 </body>
index b6d45defce5d62e30e36be8d248eb47c288deaa0..320872c655258ca93e7ded25f896b7d1524d634e 100755 (executable)
--- a/ytplom.py
+++ b/ytplom.py
@@ -21,13 +21,19 @@ QuotaCost = NewType('QuotaCost', int)
 VideoId = NewType('VideoId', str)
 FilePathStr = NewType('FilePathStr', str)
 QueryId = NewType('QueryId', str)
-Result: TypeAlias = dict[str, str | bool]
-QueryData: TypeAlias = dict[str, str | int | list[Result]]
-QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
+QueryText = NewType('QueryText', str)
+AmountDownloads = NewType('AmountDownloads', int)
+Result: TypeAlias = dict[str, str]
 Header: TypeAlias = tuple[str, str]
+VideoData: TypeAlias = dict[str, str | bool]
+QueryData: TypeAlias = dict[str, QueryId | QueryText | DatetimeStr
+                            | AmountDownloads | list[Result]]
+QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
 DownloadsDB = dict[VideoId, FilePathStr]
-TemplateContext = dict[str, int | str | QueryData | list[QueryData] |
-                       list[tuple[VideoId, FilePathStr]]]
+TemplateContext = dict[str, str | QuotaCost | QueryData | VideoData |
+                       list[QueryData] | list[VideoData] |
+                       list[tuple[VideoId, FilePathStr]] |
+                       list[tuple[QueryId, QueryText]]]
 
 API_KEY = environ.get('GOOGLE_API_KEY')
 HTTP_PORT = 8083
@@ -41,6 +47,7 @@ NAME_DIR_TEMP = FilePathStr('temp')
 NAME_TEMPLATE_INDEX = FilePathStr('index.tmpl')
 NAME_TEMPLATE_RESULTS = FilePathStr('results.tmpl')
 NAME_TEMPLATE_VIDEOS = FilePathStr('videos.tmpl')
+NAME_TEMPLATE_VIDEO_ABOUT = FilePathStr('video_about.tmpl')
 
 PATH_DIR_TEMP = FilePathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP))
 EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
@@ -152,14 +159,9 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_POST(self) -> None:  # pylint:disable=invalid-name
         """Send requests to YouTube API and cache them."""
 
-        def store_at_filename_hashing_query(query_data: QueryData) -> QueryId:
-            md5sum = md5(query_txt.encode()).hexdigest()
-            with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
-                      'w', encoding='utf8') as f:
-                json_dump(query_data, f)
-            return QueryId(md5sum)
-
-        def collect_results(now: DatetimeStr, query_txt: str) -> list[Result]:
+        def collect_results(now: DatetimeStr,
+                            query_txt: QueryText
+                            ) -> list[Result]:
             youtube = googleapiclient.discovery.build('youtube', 'v3',
                                                       developerKey=API_KEY)
             update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
@@ -196,11 +198,15 @@ class TaskHandler(BaseHTTPRequestHandler):
 
         body_length = int(self.headers['content-length'])
         postvars = parse_qs(self.rfile.read(body_length).decode())
-        query_txt = postvars['query'][0]
+        query_txt = QueryText(postvars['query'][0])
         now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
         results = collect_results(now, query_txt)
-        md5sum = store_at_filename_hashing_query(
-                {'text': query_txt, 'retrieved_at': now, 'results': results})
+        md5sum = md5(str(query_txt).encode()).hexdigest()
+        with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
+                  'w', encoding='utf8') as f:
+            json_dump({'text': query_txt,
+                       'retrieved_at': now,
+                       'results': results}, f)
         self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
 
     def do_GET(self) -> None:  # pylint:disable=invalid-name
@@ -211,10 +217,11 @@ class TaskHandler(BaseHTTPRequestHandler):
         if 'thumbnails' == page_name:
             self._send_thumbnail(FilePathStr(toks_url[2]))
         elif 'dl' == page_name:
-            self._send_or_download_video(VideoId(toks_url[2]),
-                                         parse_qs(url.query))
+            self._send_or_download_video(VideoId(toks_url[2]))
         elif 'videos' == page_name:
             self._send_videos_index()
+        elif 'video_about' == page_name:
+            self._send_video_about(VideoId(toks_url[2]))
         elif 'query' == page_name:
             self._send_query_page(QueryId(toks_url[2]))
         else:  # e.g. for /
@@ -224,7 +231,6 @@ class TaskHandler(BaseHTTPRequestHandler):
                                 tmpl_name: FilePathStr,
                                 tmpl_ctx: TemplateContext
                                 ) -> None:
-        """Send HTML rendered from tmpl_name and tmpl_ctx"""
         with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
                   'r', encoding='utf8'
                   ) as templ_file:
@@ -232,20 +238,7 @@ class TaskHandler(BaseHTTPRequestHandler):
         html = tmpl.render(**tmpl_ctx)
         self._send_http(bytes(html, 'utf8'))
 
-    @staticmethod
-    def _make_template_context(with_quota_count: bool = True
-                               ) -> TemplateContext:
-        """Create template context dictionary with current quota count."""
-        tmpl_ctx: TemplateContext = {}
-        if with_quota_count:
-            quota_count = 0
-            tmpl_ctx['quota_count'] = quota_count
-            for amount in read_quota_log().values():
-                quota_count += amount
-        return tmpl_ctx
-
     def _make_downloads_db(self) -> DownloadsDB:
-        """Create dictionary of downloads mapping video IDs to file paths."""
         downloads_db = {}
         for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
             before_ext = splitext(e.path)[0]
@@ -253,30 +246,27 @@ class TaskHandler(BaseHTTPRequestHandler):
             downloads_db[id_] = FilePathStr(e.path)
         return downloads_db
 
-    def _send_thumbnail(self, filename: FilePathStr) -> None:
-        """Send thumbnail file."""
-        with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
-            img = f.read()
-        self._send_http(img, [('Content-type', 'image/jpg')])
-
-    def _send_or_download_video(self,
-                                video_id: VideoId,
-                                params: dict[str, list[str]]
-                                ) -> None:
-        """If in storage, send video of video_id, otherwise download."""
-        downloads_db = self._make_downloads_db()
-        if video_id in downloads_db:
-            with open(downloads_db[video_id], 'rb') as video_file:
-                video = video_file.read()
-            self._send_http(content=video)
-            return
-        to_download.append(video_id)
-        dl_query_id = params.get('from_query', [''])[0]
-        redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
-        self._send_http(headers=[('Location', redir_path)], code=302)
-
-    def _send_query_page(self, query_id: QueryId) -> None:
-        """Load cached query+result data, calculate further attribute, send."""
+    def _harvest_queries(self) -> list[tuple[QueryId, list[Result],
+                                             QueryText, DatetimeStr]]:
+        queries_data = []
+        for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
+                     if isfile(f.path)]:
+            with open(file.path, 'r', encoding='utf8') as query_file:
+                filed_query: QueryData = json_load(query_file)
+            assert isinstance(filed_query['results'], list)
+            assert type(filed_query['text']) is QueryText
+            assert type(filed_query['retrieved_at']) is DatetimeStr
+            id_ = QueryId(splitext(basename(file.path))[0])
+            results_list = filed_query['results']
+            query_text = filed_query['text']
+            retrieved_at = filed_query['retrieved_at']
+            queries_data += [(id_, results_list, query_text, retrieved_at)]
+        return queries_data
+
+    def _result_to_video_data(self,
+                              result: Result,
+                              downloads_db: DownloadsDB
+                              ) -> VideoData:
 
         def reformat_duration(duration_str: str):
             date_dur, time_dur = duration_str.split('T')
@@ -299,52 +289,78 @@ class TaskHandler(BaseHTTPRequestHandler):
             hours_str = str(seconds // (60 * 60))
             return ':'.join([f'0{s}' if len(s) == 1 else s
                              for s in (hours_str, minutes_str, seconds_str)])
+        assert isinstance(result['duration'], str)
+        return {
+            'id': result['id'],
+            'available': result['id'] in downloads_db,
+            'duration': reformat_duration(result['duration']),
+            'title': result['title'],
+            'definition': result['definition']
+        }
+
+    def _send_thumbnail(self, filename: FilePathStr) -> None:
+        with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
+            img = f.read()
+        self._send_http(img, [('Content-type', 'image/jpg')])
+
+    def _send_or_download_video(self, video_id: VideoId) -> None:
+        downloads_db = self._make_downloads_db()
+        if video_id in downloads_db:
+            with open(downloads_db[video_id], 'rb') as video_file:
+                video = video_file.read()
+            self._send_http(content=video)
+            return
+        to_download.append(video_id)
+        self._send_http(headers=[('Location', f'/video_about/{video_id}')],
+                        code=302)
 
+    def _send_query_page(self, query_id: QueryId) -> None:
         downloads_db = self._make_downloads_db()
-        tmpl_ctx = self._make_template_context()
-        tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
-        tmpl_ctx['query_id'] = query_id
         with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
                   'r', encoding='utf8') as query_file:
             query = json_load(query_file)
-        for result in query['results']:
-            result['available'] = result['id'] in downloads_db
-            result['duration'] = reformat_duration(result['duration'])
-            result['definition'] = result['definition'].upper()
-        tmpl_ctx['query'] = query
-        self._send_rendered_template(NAME_TEMPLATE_RESULTS, tmpl_ctx)
+        self._send_rendered_template(
+                NAME_TEMPLATE_RESULTS,
+                {'query': query['text'],
+                 'videos': [self._result_to_video_data(result, downloads_db)
+                            for result in query['results']]})
 
     def _send_queries_index_and_search(self) -> None:
-        """Send listing of cached queries, search form for doing new ones."""
         downloads_db = self._make_downloads_db()
-        tmpl_ctx = self._make_template_context()
-        queries: list[QueryData] = []
-        for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
-                     if isfile(f.path)]:
-            id_ = splitext(basename(file.path))[0]
-            with open(file.path, 'r', encoding='utf8') as query_file:
-                filed_query: QueryData = json_load(query_file)
-            filed_query['id'] = id_
-            assert isinstance(filed_query['results'], list)
-            for result in filed_query['results']:
-                result['available'] = result['id'] in downloads_db
-            filed_query['downloads'] = len(
-                    [result for result in filed_query['results']
-                     if result['available']])
-            queries += [filed_query]
-        queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
-        tmpl_ctx['queries'] = queries
-        self._send_rendered_template(NAME_TEMPLATE_INDEX, tmpl_ctx)
+        queries_data: list[QueryData] = []
+        for id_, results, query_text, timestamp in self._harvest_queries():
+            queries_data += [
+                    {'id': id_, 'text': query_text, 'retrieved_at': timestamp,
+                     'downloads': AmountDownloads(len([
+                         r for r in results if r['id'] in downloads_db]))}]
+        queries_data.sort(key=lambda q: q['retrieved_at'], reverse=True)
+        self._send_rendered_template(NAME_TEMPLATE_INDEX, {'queries':
+                                                           queries_data})
+
+    def _send_video_about(self, video_id: VideoId) -> None:
+        linked_queries: list[tuple[QueryId, QueryText]] = []
+        first_result: Optional[Result] = None
+        for id_, results, query_text, _ in self._harvest_queries():
+            for result in results:
+                if video_id == result['id']:
+                    linked_queries += [(id_, query_text)]
+                    first_result = first_result or result
+        if not first_result:
+            self._send_http(b'nothing found', code=404)
+            return
+        self._send_rendered_template(
+                NAME_TEMPLATE_VIDEO_ABOUT,
+                {'video_id': video_id,
+                 'youtube_prefix': YOUTUBE_URL_PREFIX,
+                 'queries': linked_queries,
+                 'video_data': self._result_to_video_data(
+                     first_result, self._make_downloads_db())})
 
     def _send_videos_index(self) -> None:
-        """Send listing of downloaded videos, linked to their /dl pages."""
-        downloads_db = self._make_downloads_db()
-        tmpl_ctx = self._make_template_context(with_quota_count=False)
         videos = [(id_, FilePathStr(basename(path)))
-                  for id_, path in downloads_db.items()]
+                  for id_, path in self._make_downloads_db().items()]
         videos.sort(key=lambda t: t[1])
-        tmpl_ctx['videos'] = videos
-        self._send_rendered_template(NAME_TEMPLATE_VIDEOS, tmpl_ctx)
+        self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
 
 
 if __name__ == '__main__':