#!/usr/bin/env python3
"""Minimalistic download-focused YouTube interface."""
from typing import TypeAlias, Optional, NewType, Callable, Self, Any
-from os import environ, makedirs, scandir, remove as os_remove
+from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
from os.path import (isdir, isfile, exists as path_exists, join as path_join,
splitext, basename)
from random import shuffle
DatetimeStr = NewType('DatetimeStr', str)
QuotaCost = NewType('QuotaCost', int)
-VideoId = NewType('VideoId', str)
+YoutubeId = NewType('YoutubeId', str)
PathStr = NewType('PathStr', str)
QueryId = NewType('QueryId', int)
QueryText = NewType('QueryText', str)
SqlText = NewType('SqlText', str)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
-DownloadsIndex: TypeAlias = dict[VideoId, PathStr]
+DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
TemplateContext: TypeAlias = dict[
- str, None | bool | PlayerUpdateId | Optional[PathStr] | VideoId
- | QueryText | QuotaCost | 'VideoData' | list['VideoData']
- | list['QueryData'] | list[tuple[VideoId, PathStr]]
+ str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
+ | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
+ | list['QueryData'] | list[tuple[YoutubeId, PathStr]]
| list[tuple[PathStr, PathStr]]]
timestamp TEXT NOT NULL,
cost INT NOT NULL
);
+CREATE TABLE files (
+ rel_path TEXT PRIMARY KEY,
+ yt_id TEXT NOT NULL DEFAULT "",
+ FOREIGN KEY (yt_id) REFERENCES yt_videos(id)
+);
'''
@classmethod
def get_all_for_video(cls,
conn: DatabaseConnection,
- video_id: VideoId
+ video_id: YoutubeId
) -> list[Self]:
- """Return all QueryData that got VideoData of video_id as result."""
+ """Return all QueryData that got YoutubeVideo of video_id as result."""
sql = SqlText('SELECT query_id FROM '
'yt_query_results WHERE video_id = ?')
query_ids = conn.exec(sql, (video_id,)).fetchall()
for query_id_tup in query_ids]
-class VideoData(DbData):
+class YoutubeVideo(DbData):
"""Representation of YouTube video metadata as provided by their API."""
_table_name = 'yt_videos'
_cols = ('id_', 'title', 'description', 'published_at', 'duration',
'definition')
def __init__(self,
- id_: VideoId,
+ id_: YoutubeId,
title: ProseText = ProseText('?'),
description: ProseText = ProseText('?'),
published_at: DatetimeStr = DatetimeStr('?'),
(query_id, self.id_))
+class VideoFile(DbData):
+    """Database record of one file found in the downloads directory.
+
+    Rows live in the 'files' table; rel_path identifies a row, yt_id is the
+    YouTube ID stored alongside it.
+    """
+    _table_name = 'files'
+    _cols = ('rel_path', 'yt_id')
+
+    def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
+        self.yt_id = yt_id
+        self.rel_path = rel_path
+
+    def remove(self, conn: DatabaseConnection) -> None:
+        """Delete via conn the table row whose rel_path equals our own."""
+        delete_stmt = SqlText(
+            f'DELETE FROM {self._table_name} WHERE rel_path = ?')
+        conn.exec(delete_stmt, (self.rel_path,))
+
+
class QuotaLog(DbData):
"""Collects API access quota costs."""
_table_name = 'quota_costs'
"""Collections downloading-related stuff."""
def __init__(self) -> None:
- self._to_download: list[VideoId] = []
+ self._to_download: list[YoutubeId] = []
_ensure_expected_dirs([PATH_DIR_DOWNLOADS, PATH_DIR_TEMP])
+ # Populate self._files; must run after the downloads dir is ensured.
+ self._sync_db()
+
+    def _sync_db(self) -> None:
+        """Bring 'files' DB table in line with the downloads directory.
+
+        Removes entries whose file no longer exists, saves entries for
+        files not yet recorded (YT ID parsed from the filename), and
+        finally reloads self._files from the database.
+        """
+        conn = DatabaseConnection()
+        files_via_db = VideoFile.get_all(conn)
+        # Set, as membership gets tested once per file found on disk.
+        known_paths = {file.rel_path for file in files_via_db}
+        # NOTE(review): chdir is process-wide; confirm nothing else depends
+        # on the working directory while a sync is running.
+        old_cwd = getcwd()
+        chdir(PATH_DIR_DOWNLOADS)
+        try:
+            for file in files_via_db:
+                if not isfile(file.rel_path):
+                    print(f'SYNC: no file {file.rel_path} found, '
+                          'removing entry.')
+                    file.remove(conn)
+            # Close the directory handle deterministically.
+            with scandir() as entries:
+                found = [PathStr(e.path) for e in entries
+                         if isfile(e.path)]
+            for path in found:
+                if path in known_paths:
+                    continue
+                yt_id = self._id_from_filename(path)
+                print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+                VideoFile(path, yt_id).save(conn)
+        finally:
+            # Restore cwd even if filesystem or DB access raised.
+            chdir(old_cwd)
+        self._files = VideoFile.get_all(conn)
+        conn.commit_close()
@staticmethod
def _id_from_filename(path: PathStr,
double_split: bool = False
- ) -> VideoId:
+ ) -> YoutubeId:
+ """Extract ID from path: text after its last '[' up to the next ']'.
+
+ Only the part of path before its extension is examined; double_split
+ strips a second extension first.
+ """
before_ext = splitext(path)[0]
if double_split:
before_ext = splitext(before_ext)[0]
- return VideoId(before_ext.split('[')[-1].split(']')[0])
+ return YoutubeId(before_ext.split('[')[-1].split(']')[0])
@property
def ids_to_paths(self) -> DownloadsIndex:
- """Return mapping of VideoIds to paths of files downloaded to them."""
- ids_to_paths = {}
- for path in [PathStr(e.path) for e
- in scandir(PATH_DIR_DOWNLOADS) if isfile(e.path)]:
- ids_to_paths[self._id_from_filename(path)] = PathStr(path)
- return ids_to_paths
+ """Return mapping YoutubeIds:paths of files downloaded to them.
+
+ Calls self._sync_db() on every access, so each read touches database
+ and filesystem; paths are rel_path joined onto PATH_DIR_DOWNLOADS.
+ """
+ self._sync_db()
+ return {f.yt_id: PathStr(path_join(PATH_DIR_DOWNLOADS, f.rel_path))
+ for f in self._files}
@property
- def ids_unfinished(self) -> set[VideoId]:
+ def ids_unfinished(self) -> set[YoutubeId]:
"""Return set of IDs of videos awaiting or currently in download."""
in_temp_dir = []
for path in [PathStr(e.path) for e
print(f'removing unfinished download: {e.path}')
os_remove(e.path)
- def queue_download(self, video_id: VideoId) -> None:
+ def queue_download(self, video_id: YoutubeId) -> None:
"""Add video_id to download queue *if* not already processed."""
pre_existing = self.ids_unfinished | set(self._to_download
+ list(self.ids_to_paths))
def _post_query(self, query_txt: QueryText) -> None:
conn = DatabaseConnection()
- def collect_results(query_txt: QueryText) -> list[VideoData]:
+ def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
youtube = googleapiclient.discovery.build('youtube', 'v3',
developerKey=API_KEY)
QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
maxResults=25,
safeSearch='none',
type='video')
- results: list[VideoData] = []
- ids_to_detail: list[VideoId] = []
+ results: list[YoutubeVideo] = []
+ ids_to_detail: list[YoutubeId] = []
for item in search_request.execute()['items']:
- video_id: VideoId = item['id']['videoId']
+ video_id: YoutubeId = item['id']['videoId']
ids_to_detail += [video_id]
snippet = item['snippet']
urlretrieve(snippet['thumbnails']['default']['url'],
path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
- results += [VideoData(id_=video_id,
- title=snippet['title'],
- description=snippet['description'],
- published_at=snippet['publishedAt'])]
+ results += [YoutubeVideo(id_=video_id,
+ title=snippet['title'],
+ description=snippet['description'],
+ published_at=snippet['publishedAt'])]
QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
ids_for_details = ','.join([r.id_ for r in results])
videos_request = youtube.videos().list(id=ids_for_details,
if 'thumbnails' == page_name:
self._send_thumbnail(PathStr(toks_url[2]))
elif 'dl' == page_name:
- self._send_or_download_video(VideoId(toks_url[2]))
+ self._send_or_download_video(YoutubeId(toks_url[2]))
elif 'videos' == page_name:
self._send_videos_index()
elif 'video_about' == page_name:
- self._send_video_about(VideoId(toks_url[2]))
+ self._send_video_about(YoutubeId(toks_url[2]))
elif 'query' == page_name:
self._send_query_page(QueryId(int(toks_url[2])))
elif 'queries' == page_name:
img = f.read()
self._send_http(img, [('Content-type', 'image/jpg')])
- def _send_or_download_video(self, video_id: VideoId) -> None:
+ def _send_or_download_video(self, video_id: YoutubeId) -> None:
if video_id in self.server.downloads.ids_to_paths:
with open(self.server.downloads.ids_to_paths[video_id],
'rb') as video_file:
def _send_query_page(self, query_id: QueryId) -> None:
conn = DatabaseConnection()
query = QueryData.get_one(conn, str(query_id))
- results = VideoData.get_all_for_query(conn, query_id)
+ results = YoutubeVideo.get_all_for_query(conn, query_id)
conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_RESULTS,
NAME_TEMPLATE_QUERIES, {'queries': queries_data,
'quota_count': quota_count})
- def _send_video_about(self, video_id: VideoId) -> None:
+ def _send_video_about(self, video_id: YoutubeId) -> None:
conn = DatabaseConnection()
linked_queries = QueryData.get_all_for_video(conn, video_id)
try:
- video_data = VideoData.get_one(conn, video_id)
+ video_data = YoutubeVideo.get_one(conn, video_id)
except NotFoundException:
- video_data = VideoData(video_id)
+ video_data = YoutubeVideo(video_id)
conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_VIDEO_ABOUT,