VideoId = NewType('VideoId', str)
FilePathStr = NewType('FilePathStr', str)
QueryId = NewType('QueryId', str)
-Result: TypeAlias = dict[str, str | bool]
-QueryData: TypeAlias = dict[str, str | int | list[Result]]
-QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
+QueryText = NewType('QueryText', str)
+AmountDownloads = NewType('AmountDownloads', int)
+# Result: one search hit as kept in the 'results' list of a cached query.
+Result: TypeAlias = dict[str, str]
Header: TypeAlias = tuple[str, str]
+# VideoData: a Result prepared for display; its 'available' entry is the
+# only bool value, all other entries are strings.
+VideoData: TypeAlias = dict[str, str | bool]
+# QueryData: a query record; keys used in this module are 'text',
+# 'retrieved_at' and 'results' (cache files) plus 'id' and 'downloads'
+# (entries built for the queries index).
+QueryData: TypeAlias = dict[str, QueryId | QueryText | DatetimeStr
+ | AmountDownloads | list[Result]]
+QuotaLog: TypeAlias = dict[DatetimeStr, QuotaCost]
DownloadsDB = dict[VideoId, FilePathStr]
-TemplateContext = dict[str, int | str | QueryData | list[QueryData] |
- list[tuple[VideoId, FilePathStr]]]
+# TemplateContext: the keyword arguments handed to template rendering.
+TemplateContext = dict[str, str | QuotaCost | QueryData | VideoData |
+ list[QueryData] | list[VideoData] |
+ list[tuple[VideoId, FilePathStr]] |
+ list[tuple[QueryId, QueryText]]]
+# NOTE(review): presumably os.environ, so this is None when the variable
+# is unset -- confirm callers cope with a missing key.
API_KEY = environ.get('GOOGLE_API_KEY')
HTTP_PORT = 8083
NAME_TEMPLATE_INDEX = FilePathStr('index.tmpl')
NAME_TEMPLATE_RESULTS = FilePathStr('results.tmpl')
NAME_TEMPLATE_VIDEOS = FilePathStr('videos.tmpl')
+NAME_TEMPLATE_VIDEO_ABOUT = FilePathStr('video_about.tmpl')
PATH_DIR_TEMP = FilePathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP))
EXPECTED_DIRS = [PATH_DIR_DOWNLOADS, PATH_DIR_TEMP, PATH_DIR_THUMBNAILS,
def do_POST(self) -> None: # pylint:disable=invalid-name
"""Send requests to YouTube API and cache them."""
- def store_at_filename_hashing_query(query_data: QueryData) -> QueryId:
- md5sum = md5(query_txt.encode()).hexdigest()
- with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
- 'w', encoding='utf8') as f:
- json_dump(query_data, f)
- return QueryId(md5sum)
-
- def collect_results(now: DatetimeStr, query_txt: str) -> list[Result]:
+ def collect_results(now: DatetimeStr,
+ query_txt: QueryText
+ ) -> list[Result]:
youtube = googleapiclient.discovery.build('youtube', 'v3',
developerKey=API_KEY)
update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
body_length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(body_length).decode())
- query_txt = postvars['query'][0]
+ query_txt = QueryText(postvars['query'][0])
now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
results = collect_results(now, query_txt)
- md5sum = store_at_filename_hashing_query(
- {'text': query_txt, 'retrieved_at': now, 'results': results})
+ md5sum = md5(str(query_txt).encode()).hexdigest()
+ with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{md5sum}.json'),
+ 'w', encoding='utf8') as f:
+ json_dump({'text': query_txt,
+ 'retrieved_at': now,
+ 'results': results}, f)
self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
def do_GET(self) -> None: # pylint:disable=invalid-name
if 'thumbnails' == page_name:
self._send_thumbnail(FilePathStr(toks_url[2]))
elif 'dl' == page_name:
- self._send_or_download_video(VideoId(toks_url[2]),
- parse_qs(url.query))
+ self._send_or_download_video(VideoId(toks_url[2]))
elif 'videos' == page_name:
self._send_videos_index()
+ elif 'video_about' == page_name:
+ self._send_video_about(VideoId(toks_url[2]))
elif 'query' == page_name:
self._send_query_page(QueryId(toks_url[2]))
else: # e.g. for /
tmpl_name: FilePathStr,
tmpl_ctx: TemplateContext
) -> None:
- """Send HTML rendered from tmpl_name and tmpl_ctx"""
with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
'r', encoding='utf8'
) as templ_file:
html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
- @staticmethod
- def _make_template_context(with_quota_count: bool = True
- ) -> TemplateContext:
- """Create template context dictionary with current quota count."""
- tmpl_ctx: TemplateContext = {}
- if with_quota_count:
- quota_count = 0
- tmpl_ctx['quota_count'] = quota_count
- for amount in read_quota_log().values():
- quota_count += amount
- return tmpl_ctx
-
def _make_downloads_db(self) -> DownloadsDB:
- """Create dictionary of downloads mapping video IDs to file paths."""
downloads_db = {}
for e in [e for e in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
before_ext = splitext(e.path)[0]
downloads_db[id_] = FilePathStr(e.path)
return downloads_db
- def _send_thumbnail(self, filename: FilePathStr) -> None:
- """Send thumbnail file."""
- with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
- img = f.read()
- self._send_http(img, [('Content-type', 'image/jpg')])
-
- def _send_or_download_video(self,
- video_id: VideoId,
- params: dict[str, list[str]]
- ) -> None:
- """If in storage, send video of video_id, otherwise download."""
- downloads_db = self._make_downloads_db()
- if video_id in downloads_db:
- with open(downloads_db[video_id], 'rb') as video_file:
- video = video_file.read()
- self._send_http(content=video)
- return
- to_download.append(video_id)
- dl_query_id = params.get('from_query', [''])[0]
- redir_path = f'/query/{dl_query_id}' if dl_query_id else '/'
- self._send_http(headers=[('Location', redir_path)], code=302)
-
- def _send_query_page(self, query_id: QueryId) -> None:
- """Load cached query+result data, calculate further attribute, send."""
+    def _harvest_queries(self) -> list[tuple[QueryId, list[Result],
+                                             QueryText, DatetimeStr]]:
+        """Load every cached query file from PATH_DIR_REQUESTS_CACHE.
+
+        Returns one (query id, results, query text, retrieval timestamp)
+        tuple per regular file in the cache directory; the query id is the
+        file's base name without its extension.
+        """
+        queries_data = []
+        for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
+                     if isfile(f.path)]:
+            with open(file.path, 'r', encoding='utf8') as query_file:
+                filed_query: QueryData = json_load(query_file)
+            # A NewType name is not the runtime class of its values: text
+            # loaded from JSON is a plain str, so "type(x) is QueryText"
+            # could never hold and failed on every cached query.  Check
+            # the underlying runtime types instead and re-wrap below.
+            assert isinstance(filed_query['results'], list)
+            assert isinstance(filed_query['text'], str)
+            assert isinstance(filed_query['retrieved_at'], str)
+            id_ = QueryId(splitext(basename(file.path))[0])
+            results_list = filed_query['results']
+            query_text = QueryText(filed_query['text'])
+            retrieved_at = DatetimeStr(filed_query['retrieved_at'])
+            queries_data.append((id_, results_list, query_text, retrieved_at))
+        return queries_data
+
+ def _result_to_video_data(self,
+ result: Result,
+ downloads_db: DownloadsDB
+ ) -> VideoData:
def reformat_duration(duration_str: str):
date_dur, time_dur = duration_str.split('T')
hours_str = str(seconds // (60 * 60))
return ':'.join([f'0{s}' if len(s) == 1 else s
for s in (hours_str, minutes_str, seconds_str)])
+ assert isinstance(result['duration'], str)
+ return {
+ 'id': result['id'],
+ 'available': result['id'] in downloads_db,
+ 'duration': reformat_duration(result['duration']),
+ 'title': result['title'],
+ 'definition': result['definition']
+ }
+
+ def _send_thumbnail(self, filename: FilePathStr) -> None:
+ """Respond with the bytes of filename read from PATH_DIR_THUMBNAILS."""
+ with open(path_join(PATH_DIR_THUMBNAILS, filename), 'rb') as f:
+ img = f.read()
+ # Content type is always announced as image/jpg; the file is not inspected.
+ self._send_http(img, [('Content-type', 'image/jpg')])
+
+ def _send_or_download_video(self, video_id: VideoId) -> None:
+ """Send the stored file for video_id, or register it for download.
+
+ If video_id is in the downloads DB, its whole file is read and sent.
+ Otherwise video_id is appended to to_download and the client gets a
+ 302 redirect to /video_about/<video_id>.
+ """
+ downloads_db = self._make_downloads_db()
+ if video_id in downloads_db:
+ with open(downloads_db[video_id], 'rb') as video_file:
+ video = video_file.read()
+ self._send_http(content=video)
+ return
+ # NOTE(review): to_download is defined outside this hunk; presumably a
+ # worker consumes it -- confirm.
+ to_download.append(video_id)
+ self._send_http(headers=[('Location', f'/video_about/{video_id}')],
+ code=302)
+ def _send_query_page(self, query_id: QueryId) -> None:
+ """Render the results template for the cached query named query_id.
+
+ Reads <query_id>.json from PATH_DIR_REQUESTS_CACHE; the template gets
+ the query text as 'query' and one VideoData per result as 'videos'.
+ """
downloads_db = self._make_downloads_db()
- tmpl_ctx = self._make_template_context()
- tmpl_ctx['youtube_prefix'] = YOUTUBE_URL_PREFIX
- tmpl_ctx['query_id'] = query_id
with open(path_join(PATH_DIR_REQUESTS_CACHE, f'{query_id}.json'),
'r', encoding='utf8') as query_file:
query = json_load(query_file)
- for result in query['results']:
- result['available'] = result['id'] in downloads_db
- result['duration'] = reformat_duration(result['duration'])
- result['definition'] = result['definition'].upper()
- tmpl_ctx['query'] = query
- self._send_rendered_template(NAME_TEMPLATE_RESULTS, tmpl_ctx)
+ # Results are converted into fresh VideoData dicts rather than being
+ # modified in place.
+ self._send_rendered_template(
+ NAME_TEMPLATE_RESULTS,
+ {'query': query['text'],
+ 'videos': [self._result_to_video_data(result, downloads_db)
+ for result in query['results']]})
def _send_queries_index_and_search(self) -> None:
- """Send listing of cached queries, search form for doing new ones."""
+ """Render the index template with one summary entry per cached query.
+
+ Each entry carries the query's 'id', 'text', 'retrieved_at' and
+ 'downloads' (how many of its results are in the downloads DB); the
+ entries are sorted by 'retrieved_at' in descending order.
+ """
downloads_db = self._make_downloads_db()
- tmpl_ctx = self._make_template_context()
- queries: list[QueryData] = []
- for file in [f for f in scandir(PATH_DIR_REQUESTS_CACHE)
- if isfile(f.path)]:
- id_ = splitext(basename(file.path))[0]
- with open(file.path, 'r', encoding='utf8') as query_file:
- filed_query: QueryData = json_load(query_file)
- filed_query['id'] = id_
- assert isinstance(filed_query['results'], list)
- for result in filed_query['results']:
- result['available'] = result['id'] in downloads_db
- filed_query['downloads'] = len(
- [result for result in filed_query['results']
- if result['available']])
- queries += [filed_query]
- queries.sort(key=lambda q: q['retrieved_at'], reverse=True)
- tmpl_ctx['queries'] = queries
- self._send_rendered_template(NAME_TEMPLATE_INDEX, tmpl_ctx)
+ queries_data: list[QueryData] = []
+ for id_, results, query_text, timestamp in self._harvest_queries():
+ # Only the count of downloaded results is kept; the result list itself
+ # is not passed on to the template.
+ queries_data += [
+ {'id': id_, 'text': query_text, 'retrieved_at': timestamp,
+ 'downloads': AmountDownloads(len([
+ r for r in results if r['id'] in downloads_db]))}]
+ queries_data.sort(key=lambda q: q['retrieved_at'], reverse=True)
+ self._send_rendered_template(NAME_TEMPLATE_INDEX, {'queries':
+ queries_data})
+
+ def _send_video_about(self, video_id: VideoId) -> None:
+ """Render the about page for video_id from the cached query results.
+
+ Collects (query id, query text) for every cached result whose 'id'
+ equals video_id. Replies 404 with body 'nothing found' when no cached
+ result matches.
+ """
+ linked_queries: list[tuple[QueryId, QueryText]] = []
+ first_result: Optional[Result] = None
+ for id_, results, query_text, _ in self._harvest_queries():
+ for result in results:
+ if video_id == result['id']:
+ linked_queries += [(id_, query_text)]
+ # Keep the first match only; it supplies the displayed video data.
+ first_result = first_result or result
+ if not first_result:
+ self._send_http(b'nothing found', code=404)
+ return
+ self._send_rendered_template(
+ NAME_TEMPLATE_VIDEO_ABOUT,
+ {'video_id': video_id,
+ 'youtube_prefix': YOUTUBE_URL_PREFIX,
+ 'queries': linked_queries,
+ 'video_data': self._result_to_video_data(
+ first_result, self._make_downloads_db())})
def _send_videos_index(self) -> None:
- """Send listing of downloaded videos, linked to their /dl pages."""
+ """Render the videos template with (video id, file base name) pairs.
+
+ The pairs come from the downloads DB and are sorted by file base name.
+ """
- downloads_db = self._make_downloads_db()
- tmpl_ctx = self._make_template_context(with_quota_count=False)
videos = [(id_, FilePathStr(basename(path)))
- for id_, path in downloads_db.items()]
+ for id_, path in self._make_downloads_db().items()]
videos.sort(key=lambda t: t[1])
- tmpl_ctx['videos'] = videos
- self._send_rendered_template(NAME_TEMPLATE_VIDEOS, tmpl_ctx)
+ self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
if __name__ == '__main__':