from yt_dlp import YoutubeDL # type: ignore
import googleapiclient.discovery # type: ignore
-Query: TypeAlias = dict[str, str | int | list[dict[str, str]]]
-Result: TypeAlias = dict[str, str]
-QuotaLog: TypeAlias = dict[str, int]
-Headers: TypeAlias = list[tuple[str, str]]
-HttpPayload: TypeAlias = dict[str, list[str]]
+DatetimeStr: TypeAlias = str
+QuotaCost: TypeAlias = int
VideoId: TypeAlias = str
-PathStr: TypeAlias = str
+FilePathStr: TypeAlias = str
+QueryId: TypeAlias = str
+Result: TypeAlias = dict[str, str | bool]
+QueryData: TypeAlias = dict[QueryId, str | int | list[Result]]
+QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
+Header: TypeAlias = tuple[str, str]
+DownloadsDB: TypeAlias = dict[VideoId, FilePathStr]
+TemplateContext: TypeAlias = dict[str, int | str | QueryData | list[QueryData]]
API_KEY = environ.get('GOOGLE_API_KEY')
HTTP_PORT = 8083
NAME_TEMPLATE_INDEX = 'index.tmpl'
NAME_TEMPLATE_RESULTS = 'results.tmpl'
-PATH_DIR_TEMP: PathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
+PATH_DIR_TEMP: FilePathStr = path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)
EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
PATH_DIR_REQUESTS_CACHE]
-PATH_TEMPLATE_INDEX: PathStr = path_join(PATH_DIR_TEMPLATES,
- NAME_TEMPLATE_INDEX)
+PATH_TEMPLATE_INDEX: FilePathStr = path_join(PATH_DIR_TEMPLATES,
+ NAME_TEMPLATE_INDEX)
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v='
YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
'temp': NAME_DIR_TEMP},
'format': YT_DOWNLOAD_FORMAT}
-QUOTA_COST_YOUTUBE_SEARCH: int = 100
-QUOTA_COST_YOUTUBE_DETAILS: int = 1
+QUOTA_COST_YOUTUBE_SEARCH: QuotaCost = 100
+QUOTA_COST_YOUTUBE_DETAILS: QuotaCost = 1
to_download: list[VideoId] = []
return ret
-def update_quota_log(now: str, cost: int) -> None:
+def update_quota_log(now: DatetimeStr, cost: QuotaCost) -> None:
"""Update quota log from read_quota_log, add cost to now's row."""
quota_log = read_quota_log()
quota_log[now] = quota_log.get(now, 0) + cost
def _send_http(self,
content: bytes = b'',
- headers: Optional[Headers] = None,
+ headers: Optional[list[Header]] = None,
code: int = 200
) -> None:
headers = headers if headers else []
def do_POST(self) -> None: # pylint:disable=invalid-name
"""Send requests to YouTube API and cache them."""
- length = int(self.headers['content-length'])
- postvars: HttpPayload = parse_qs(self.rfile.read(length).decode())
- query_txt: str = postvars['query'][0]
- youtube = googleapiclient.discovery.build('youtube', 'v3',
- developerKey=API_KEY)
- now = datetime.now().strftime(TIMESTAMP_FMT)
- # collect videos matching query, first details per result
- update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
- search_request = youtube.search().list(
- part='snippet', maxResults=25, q=query_txt, safeSearch='none',
- type='video')
- response = search_request.execute()
- results: list[Result] = []
- ids_for_details: list[VideoId] = []
- for item in response['items']:
- video_id: VideoId = item['id']['videoId']
- ids_for_details += [video_id]
- snippet: dict[str, str] = item['snippet']
- result: Result = {'id': video_id,
- 'title': snippet['title'],
- 'description': snippet['description'],
- 'published_at': snippet['publishedAt'],
- }
- results += [result]
- urlretrieve(item['snippet']['thumbnails']['default']['url'],
- path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
-
- # collect more details for found videos
- update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
- videos_request = youtube.videos().list(id=','.join(ids_for_details),
- part='content_details')
- details = videos_request.execute()
- for i, detailed in enumerate(details['items']):
- results_item: Result = results[i]
- assert results_item['id'] == detailed['id']
- content_details: dict[str, str] = detailed['contentDetails']
- results_item['duration'] = content_details['duration']
- results_item['definition'] = content_details['definition']
-
- # store query, its datetime, and its results at hash of query
- md5sum: str = md5(query_txt.encode()).hexdigest()
- path: PathStr = path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json')
- with open(path, 'w', encoding='utf8') as f:
- json_dump({'text': query_txt,
- 'retrieved_at': now,
- 'results': results},
- f)
+ def store_at_filename_hashing_query(query_data: QueryData) -> QueryId:
+        md5sum = md5(str(query_data['text']).encode()).hexdigest()
+ with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
+ 'w', encoding='utf8') as f:
+ json_dump(query_data, f)
+ return md5sum
+
+ def collect_results(now: DatetimeStr, query_txt: str) -> list[Result]:
+ youtube = googleapiclient.discovery.build('youtube', 'v3',
+ developerKey=API_KEY)
+ update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
+ search_request = youtube.search().list(
+ q=query_txt,
+ part='snippet',
+ maxResults=25,
+ safeSearch='none',
+ type='video')
+ results = []
+ ids_to_detail: list[VideoId] = []
+ for item in search_request.execute()['items']:
+ video_id: VideoId = item['id']['videoId']
+ ids_to_detail += [video_id]
+ snippet: dict[str, str] = item['snippet']
+ result: Result = {'id': video_id,
+ 'title': snippet['title'],
+ 'description': snippet['description'],
+ 'published_at': snippet['publishedAt'],
+ }
+ results += [result]
+ urlretrieve(item['snippet']['thumbnails']['default']['url'],
+ path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
+ update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
+ videos_request = youtube.videos().list(id=','.join(ids_to_detail),
+ part='content_details')
+ for i, detailed in enumerate(videos_request.execute()['items']):
+ results_item: Result = results[i]
+ assert results_item['id'] == detailed['id']
+ content_details: dict[str, str] = detailed['contentDetails']
+ results_item['duration'] = content_details['duration']
+ results_item['definition'] = content_details['definition']
+ return results
+
+ body_length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(body_length).decode())
+ query_txt = postvars['query'][0]
+ now = datetime.now().strftime(TIMESTAMP_FMT)
+ results = collect_results(now, query_txt)
+ md5sum = store_at_filename_hashing_query(
+ {'text': query_txt, 'retrieved_at': now, 'results': results})
self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
def do_GET(self) -> None: # pylint:disable=invalid-name
"""Map GET requests to handlers for various paths."""
- parsed_url = urlparse(self.path)
- toks_url: list[str] = parsed_url.path.split('/')
- page_name: str = toks_url[1]
-
- # on /thumbnails requests, return directly with bytes of stored files
+ url = urlparse(self.path)
+ toks_url: list[str] = url.path.split('/')
+ page_name = toks_url[1]
if 'thumbnails' == page_name:
- filename: str = toks_url[2]
- with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
- img: bytes = f.read()
- self._send_http(img, [('Content-type', 'image/jpg')])
- return
-
- # otherwise populate downloaded
- downloaded: dict[VideoId, PathStr] = {}
- for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
- before_ext: str
- before_ext, _ = splitext(e.path)
- id_: VideoId = before_ext.split('[')[-1].split(']')[0]
- downloaded[id_] = e.path
-
- # on /dl, directly send video file if ID found, else add to to_download
+ self._send_thumbnail(toks_url[2])
-        if 'dl' == page_name:
+        elif 'dl' == page_name:
- video_id: VideoId = toks_url[2]
- if video_id in downloaded:
- with open(downloaded[video_id], 'rb') as video_file:
- video: bytes = video_file.read()
- self._send_http(content=video)
- return
- to_download.append(video_id)
- params: HttpPayload = parse_qs(parsed_url.query)
- dl_query_id: str = params.get('from_query', [''])[0]
- redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
- self._send_http(headers=[('Location', redir_path)], code=302)
- return
-
- # otherwise, start template context with always-to-show quota count
- quota_count = 0
- tmpl_ctx: dict[str, int | str | Query | list[Query]] = {}
- tmpl_ctx['quota_count'] = quota_count
- for amount in read_quota_log().values():
- quota_count += amount
- tmpl_name: str
-
- # on /query, load cached query data, calc result attributes to show
+ self._send_or_download_video(toks_url[2], parse_qs(url.query))
-        if 'query' == page_name:
+        elif 'query' == page_name:
- tmpl_name = NAME_TEMPLATE_RESULTS
- tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
- query_id: str = toks_url[2]
- tmpl_ctx['query_id'] = query_id
- with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
- 'r', encoding='utf8') as query_file:
- query: dict = json_load(query_file)
- for result in query['results']:
- result['available'] = result['id'] in downloaded
- date_dur, time_dur = result['duration'].split('T')
- seconds = 0
- date_dur = date_dur[1:]
- for dur_char, len_seconds in (('Y', 60*60*24*365.25),
- ('M', 60*60*24*30),
- ('D', 60*60*24)):
- if dur_char in date_dur:
- dur_str, date_dur = date_dur.split(dur_char)
- seconds += int(dur_str) * int(len_seconds)
- for dur_char, len_seconds in (('H', 60*60),
- ('M', 60),
- ('S', 1)):
- if dur_char in time_dur:
- dur_str, time_dur = time_dur.split(dur_char)
- seconds += int(dur_str) * len_seconds
- seconds_str = str(seconds % 60)
- minutes_str = str(seconds // 60)
- hours_str = str(seconds // (60 * 60))
- result['duration'] = ':'.join(
- [f'0{str_}' if len(str_) == 1 else str_
- for str_ in (hours_str, minutes_str, seconds_str)])
- result['definition'] = result['definition'].upper()
- tmpl_ctx['query'] = query
-
- # on / or anything else, prepare listing of all queries
- else:
- tmpl_name = NAME_TEMPLATE_INDEX
- queries: list[Query] = []
- for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
- if isfile(f.path)]:
- id_, _ = splitext(basename(file.path))
- with open(file.path, 'r', encoding='utf8') as query_file:
- filed_query: Query = json_load(query_file)
- filed_query['id'] = id_
- assert isinstance(filed_query['results'], list)
- for result in filed_query['results']:
- result['available'] = result['id'] in downloaded
- filed_query['downloads'] = len(
- [result for result in query['results']
- if result['available']])
- queries += [filed_query]
- queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
- tmpl_ctx['queries'] = queries
-
- # render html from tmpl_name and tmpl_ctx
+ self._send_query_page(toks_url[2])
+ else: # e.g. for /
+ self._send_queries_index_and_search()
+
+ def _send_rendered_template(self,
+ tmpl_name: FilePathStr,
+ tmpl_ctx: TemplateContext
+ ) -> None:
+ """Send HTML rendered from tmpl_name and tmpl_ctx"""
with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
'r', encoding='utf8'
) as templ_file:
tmpl = Template(str(templ_file.read()))
- html: str = tmpl.render(**tmpl_ctx)
+        html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
+ @staticmethod
+ def _make_template_context() -> TemplateContext:
+ """Create template context dictionary with current quota count."""
+        quota_count = 0
+        for amount in read_quota_log().values():
+            quota_count += amount
+        tmpl_ctx: TemplateContext = {'quota_count': quota_count}
+        return tmpl_ctx
+
+ def _make_downloads_db(self) -> DownloadsDB:
+ """Create dictionary of downloads mapping video IDs to file paths."""
+        downloads_db: DownloadsDB = {}
+ for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
+ before_ext, _ = splitext(e.path)
+ id_: VideoId = before_ext.split('[')[-1].split(']')[0]
+ downloads_db[id_] = e.path
+ return downloads_db
+
+ def _send_thumbnail(self, filename: FilePathStr) -> None:
+ """Send thumbnail file."""
+ with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
+ img = f.read()
+ self._send_http(img, [('Content-type', 'image/jpg')])
+
+ def _send_or_download_video(self,
+ video_id: VideoId,
+ params: dict[str, list[str]]
+ ) -> None:
+ """If in storage, send video of video_id, otherwise download."""
+ downloads_db = self._make_downloads_db()
+ if video_id in downloads_db:
+ with open(downloads_db[video_id], 'rb') as video_file:
+ video = video_file.read()
+ self._send_http(content=video)
+ return
+ to_download.append(video_id)
+ dl_query_id: QueryId = params.get('from_query', [''])[0]
+ redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
+ self._send_http(headers=[('Location', redir_path)], code=302)
+
+ def _send_query_page(self, query_id: QueryId) -> None:
+ """Load cached query+result data, calculate further attribute, send."""
+
+        def reformat_duration(duration_str: str) -> str:
+ date_dur, time_dur = duration_str.split('T')
+ seconds: int = 0
+ date_dur = date_dur[1:]
+ for dur_char, len_seconds in (('Y', 60*60*24*365.25),
+ ('M', 60*60*24*30),
+ ('D', 60*60*24)):
+ if dur_char in date_dur:
+ dur_str, date_dur = date_dur.split(dur_char)
+ seconds += int(dur_str) * int(len_seconds)
+ for dur_char, len_seconds in (('H', 60*60),
+ ('M', 60),
+ ('S', 1)):
+ if dur_char in time_dur:
+ dur_str, time_dur = time_dur.split(dur_char)
+ seconds += int(dur_str) * len_seconds
+ seconds_str = str(seconds % 60)
+ minutes_str = str(seconds // 60)
+ hours_str = str(seconds // (60 * 60))
+ return ':'.join([f'0{s}' if len(s) == 1 else s
+ for s in (hours_str, minutes_str, seconds_str)])
+
+ downloads_db = self._make_downloads_db()
+ tmpl_ctx = self._make_template_context()
+ tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
+ tmpl_ctx['query_id'] = query_id
+ with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
+ 'r', encoding='utf8') as query_file:
+ query = json_load(query_file)
+ for result in query['results']:
+ result['available'] = result['id'] in downloads_db
+ result['duration'] = reformat_duration(result['duration'])
+ result['definition'] = result['definition'].upper()
+ tmpl_ctx['query'] = query
+ self._send_rendered_template(NAME_TEMPLATE_RESULTS, tmpl_ctx)
+
+ def _send_queries_index_and_search(self) -> None:
+ """Send listing of cached queries, search form for doing new ones."""
+ downloads_db = self._make_downloads_db()
+ tmpl_ctx = self._make_template_context()
+ queries: list[QueryData] = []
+ for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
+ if isfile(f.path)]:
+ id_, _ = splitext(basename(file.path))
+ with open(file.path, 'r', encoding='utf8') as query_file:
+ filed_query: QueryData = json_load(query_file)
+ filed_query['id'] = id_
+ assert isinstance(filed_query['results'], list)
+ for result in filed_query['results']:
+ result['available'] = result['id'] in downloads_db
+ filed_query['downloads'] = len(
+ [result for result in filed_query['results']
+ if result['available']])
+ queries += [filed_query]
+ queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
+ tmpl_ctx['queries'] = queries
+ self._send_rendered_template(NAME_TEMPLATE_INDEX, tmpl_ctx)
+
if __name__ == '__main__':
ensure_expected_dirs_and_files()