home · contact · privacy
Refactor YoutubeQuery creation.
authorChristian Heller <c.heller@plomlompom.de>
Sat, 30 Nov 2024 18:40:55 +0000 (19:40 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Sat, 30 Nov 2024 18:40:55 +0000 (19:40 +0100)
src/ytplom/http.py
src/ytplom/misc.py

index b2366d56376936b3dae6767b4ee7e7fc80d4a770..cd66699750200e8d71a16a49851f47e13786d207 100644 (file)
@@ -1,5 +1,4 @@
 """Collect directly HTTP-related elements."""
-from datetime import datetime
 from http.server import HTTPServer, BaseHTTPRequestHandler
 from json import dumps as json_dumps
 from pathlib import Path
@@ -10,17 +9,16 @@ from urllib.request import urlretrieve
 from urllib.error import HTTPError
 from jinja2 import (  # type: ignore
         Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
-import googleapiclient.discovery  # type: ignore
 from ytplom.misc import (
-        B64Str, DatetimeStr, FilesWithIndex, FlagName, NotFoundException,
-        PlayerUpdateId, QueryId, QueryText, QuotaCost, UrlStr, YoutubeId,
-        FILE_FLAGS, PATH_APP_DATA, PATH_CACHE, TIMESTAMP_FMT,
-        YOUTUBE_URL_PREFIX,
+        B64Str, FilesWithIndex, FlagName, NotFoundException, PlayerUpdateId,
+        QueryId, QueryText, QuotaCost, UrlStr, YoutubeId,
+        FILE_FLAGS, PATH_APP_DATA, PATH_THUMBNAILS, YOUTUBE_URL_PREFIX,
         ensure_expected_dirs,
         Config, DbConnection, DownloadsManager, Player, QuotaLog, VideoFile,
         YoutubeQuery, YoutubeVideo
 )
 
+# type definitions for mypy
 PageNames: TypeAlias = dict[str, Path]
 ParamsStr = NewType('ParamsStr', str)
 TemplateContext: TypeAlias = dict[
@@ -33,11 +31,8 @@ TemplateContext: TypeAlias = dict[
 ]
 
 # API expectations
-PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails')
 THUMBNAIL_URL_PREFIX = UrlStr('https://i.ytimg.com/vi/')
 THUMBNAIL_URL_SUFFIX = UrlStr('/default.jpg')
-QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
-QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
 
 # template paths
 PATH_TEMPLATES = PATH_APP_DATA.joinpath('templates')
@@ -158,53 +153,8 @@ class _TaskHandler(BaseHTTPRequestHandler):
 
     def _receive_yt_query(self, query_txt: QueryText) -> None:
         conn = DbConnection()
-
-        def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
-            ensure_expected_dirs([PATH_THUMBNAILS])
-            youtube = googleapiclient.discovery.build(
-                    'youtube', 'v3', developerKey=self.server.config.api_key)
-            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
-            search_request = youtube.search().list(
-                    q=query_txt,
-                    part='snippet',
-                    maxResults=25,
-                    safeSearch='none',
-                    type='video')
-            results: list[YoutubeVideo] = []
-            ids_to_detail: list[YoutubeId] = []
-            for item in search_request.execute()['items']:
-                video_id: YoutubeId = item['id']['videoId']
-                ids_to_detail += [video_id]
-                snippet = item['snippet']
-                urlretrieve(snippet['thumbnails']['default']['url'],
-                            PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
-                results += [YoutubeVideo(id_=video_id,
-                                         title=snippet['title'],
-                                         description=snippet['description'],
-                                         published_at=snippet['publishedAt'])]
-            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
-            ids_for_details = ','.join([r.id_ for r in results])
-            videos_request = youtube.videos().list(id=ids_for_details,
-                                                   part='content_details')
-            unfinished_streams: list[YoutubeId] = []
-            for i, detailed in enumerate(videos_request.execute()['items']):
-                result = results[i]
-                assert result.id_ == detailed['id']
-                content_details: dict[str, str] = detailed['contentDetails']
-                if 'P0D' == content_details['duration']:
-                    unfinished_streams += [result.id_]
-                    continue
-                result.set_duration_from_yt_string(content_details['duration'])
-                result.definition = content_details['definition'].upper()
-            return [r for r in results if r.id_ not in unfinished_streams]
-
-        query_data = YoutubeQuery(
-                None, query_txt,
-                DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
-        query_data.save(conn)
-        for result in collect_results(query_txt):
-            result.save(conn)
-            result.save_to_query(conn, query_data.id_)
+        query_data = YoutubeQuery.new_by_request_saved(
+                conn, self.server.config, query_txt)
         conn.commit_close()
         self._redirect(Path('/')
                        .joinpath(PAGE_NAMES['yt_query'])
index ca478a196d5c1aff90e1fab9c1d0891457d5fe8b..012c3f5985f628c8bd2820f59b04947dd0ab25f8 100644 (file)
@@ -8,12 +8,14 @@ from random import shuffle
 from time import time, sleep
 from datetime import datetime, timedelta
 from json import loads as json_loads
+from urllib.request import urlretrieve
 from uuid import uuid4
 from pathlib import Path
 from sqlite3 import connect as sql_connect, Cursor, Row
 from threading import Thread
 from queue import Queue
 # non-included libs
+import googleapiclient.discovery  # type: ignore
 from mpv import MPV  # type: ignore
 from yt_dlp import YoutubeDL  # type: ignore
 
@@ -48,6 +50,7 @@ PATH_CACHE = Path.home().joinpath('.cache/ytplom')
 PATH_DOWNLOADS = Path.home().joinpath('ytplom_downloads')
 PATH_DB = PATH_APP_DATA.joinpath('db.sql')
 PATH_TEMP = PATH_CACHE.joinpath('temp')
+PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails')
 PATH_CONFFILE = Path.home().joinpath('.config/ytplom/config.json')
 
 # yt_dlp config
@@ -59,6 +62,8 @@ YT_DL_PARAMS = {'paths': {'home': str(PATH_DOWNLOADS),
 
 # Youtube API expectations
 YOUTUBE_URL_PREFIX = UrlStr('https://www.youtube.com/watch?v=')
+QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
+QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
 
 # database stuff
 EXPECTED_DB_VERSION = 1
@@ -220,6 +225,57 @@ class YoutubeQuery(DbData):
         self.text = QueryText(text)
         self.retrieved_at = retrieved_at
 
+    @classmethod
+    def new_by_request_saved(cls,
+                             conn: DbConnection,
+                             config: Config,
+                             query_txt: QueryText
+                             ) -> Self:
+        """Query YT API and save results."""
+        ensure_expected_dirs([PATH_THUMBNAILS])
+        youtube = googleapiclient.discovery.build(
+                'youtube', 'v3', developerKey=config.api_key)
+        QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
+        search_request = youtube.search().list(
+                q=query_txt,
+                part='snippet',
+                maxResults=25,
+                safeSearch='none',
+                type='video')
+        results: list[YoutubeVideo] = []
+        ids_to_detail: list[YoutubeId] = []
+        for item in search_request.execute()['items']:
+            video_id: YoutubeId = item['id']['videoId']
+            ids_to_detail += [video_id]
+            snippet = item['snippet']
+            urlretrieve(snippet['thumbnails']['default']['url'],
+                        PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
+            results += [YoutubeVideo(id_=video_id,
+                                     title=snippet['title'],
+                                     description=snippet['description'],
+                                     published_at=snippet['publishedAt'])]
+        QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
+        ids_for_details = ','.join([r.id_ for r in results])
+        videos_request = youtube.videos().list(id=ids_for_details,
+                                               part='content_details')
+        unfinished_streams: list[YoutubeId] = []
+        for i, detailed in enumerate(videos_request.execute()['items']):
+            result = results[i]
+            assert result.id_ == detailed['id']
+            content_details: dict[str, str] = detailed['contentDetails']
+            if 'P0D' == content_details['duration']:
+                unfinished_streams += [result.id_]
+                continue
+            result.set_duration_from_yt_string(content_details['duration'])
+            result.definition = content_details['definition'].upper()
+        query = cls(None, query_txt,
+                    DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
+        query.save(conn)
+        for result in [r for r in results if r.id_ not in unfinished_streams]:
+            result.save(conn)
+            result.save_to_query(conn, query.id_)
+        return query
+
     @classmethod
     def get_all_for_video(cls,
                           conn: DbConnection,