-"""Main ytplom lib."""
-
-# included libs
-from typing import TypeAlias, Optional, NewType, Callable, Self, Any
-from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
-from os.path import (dirname, isdir, isfile, exists as path_exists,
- join as path_join, splitext, basename)
-from random import shuffle
-from time import time, sleep
-from datetime import datetime, timedelta
-from json import dumps as json_dumps
-from uuid import uuid4
-from sqlite3 import connect as sql_connect, Cursor, Row
-from http.server import HTTPServer, BaseHTTPRequestHandler
-from urllib.parse import urlparse, parse_qs
-from urllib.request import urlretrieve
-from urllib.error import HTTPError
-# non-included libs
-from jinja2 import ( # type: ignore
- Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
-from mpv import MPV # type: ignore
-from yt_dlp import YoutubeDL # type: ignore
-import googleapiclient.discovery # type: ignore
-
-# what we might want to manually define per environs
-API_KEY = environ.get('GOOGLE_API_KEY')
-HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
-
-# type definitions for mypy
-DatetimeStr = NewType('DatetimeStr', str)
-QuotaCost = NewType('QuotaCost', int)
-YoutubeId = NewType('YoutubeId', str)
-PathStr = NewType('PathStr', str)
-QueryId = NewType('QueryId', str)
-QueryText = NewType('QueryText', str)
-ProseText = NewType('ProseText', str)
-SqlText = NewType('SqlText', str)
-FlagName = NewType('FlagName', str)
-FlagsInt = NewType('FlagsInt', int)
-AmountDownloads = NewType('AmountDownloads', int)
-PlayerUpdateId = NewType('PlayerUpdateId', str)
-DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
-TemplateContext: TypeAlias = dict[
- str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
- | QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
- | list['YoutubeVideo'] | list['YoutubeQuery']
- | list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]]
-
-# major expected directories
-PATH_HOME = PathStr(environ.get('HOME', ''))
-PATH_APP_DATA = PathStr(path_join(PATH_HOME, '.local/share/ytplom'))
-PATH_CACHE = PathStr(path_join(PATH_HOME, '.cache/ytplom'))
-
-# paths for rather dynamic data
-PATH_DOWNLOADS = PathStr(path_join(PATH_HOME, 'ytplom_downloads'))
-PATH_DB = PathStr(path_join(PATH_APP_DATA, 'db.sql'))
-PATH_TEMP = PathStr(path_join(PATH_CACHE, 'temp'))
-PATH_THUMBNAILS = PathStr(path_join(PATH_CACHE, 'thumbnails'))
-
-# template paths
-PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
-NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
-NAME_TEMPLATE_VIDEO = PathStr('video.tmpl')
-NAME_TEMPLATE_YT_VIDEO = PathStr('yt_video.tmpl')
-NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
-
-# yt_dlp config
-YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
- '/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
- 'temp': PATH_TEMP},
- 'format': YT_DOWNLOAD_FORMAT}
-
-# Youtube API expectations
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
-QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
-QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
-
-# local expectations
-TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
-LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
-
-# database stuff
-EXPECTED_DB_VERSION = 0
-SQL_DB_VERSION = SqlText('PRAGMA user_version')
-PATH_MIGRATIONS = PathStr(path_join(PATH_APP_DATA, 'migrations'))
-PATH_DB_SCHEMA = PathStr(path_join(PATH_MIGRATIONS,
- f'init_{EXPECTED_DB_VERSION}.sql'))
-
-# other
-NAME_INSTALLER = PathStr('install.sh')
-VIDEO_FLAGS: dict[FlagName, FlagsInt] = {
- FlagName('delete'): FlagsInt(1 << 62)
-}
-
-
-class NotFoundException(Exception):
- """Raise on expected data missing, e.g. DB fetches finding nothing."""
-
-
-class HandledException(Exception):
- """Raise in any other case where we know what's happening."""
-
-
-def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
- """Ensure existance of expected_dirs _as_ directories."""
- for dir_name in expected_dirs:
- if not isdir(dir_name):
- if path_exists(dir_name):
- raise HandledException(f'at expected directory path {dir_name}'
- 'found non-directory')
- print(f'creating expected directory: {dir_name}')
- makedirs(dir_name)
-
-
-class DatabaseConnection:
- """Wrapped sqlite3.Connection."""
-
- def __init__(self, path: PathStr = PATH_DB) -> None:
- self._path = path
- if not isfile(self._path):
- if path_exists(self._path):
- raise HandledException(f'no DB at {self._path}; would create, '
- 'but something\'s already there?')
- path_db_dir = dirname(self._path)
- if not isdir(path_db_dir):
- raise NotFoundException(
- f'cannot find {path_db_dir} as directory to put DB '
- f'into, did you run {NAME_INSTALLER}?')
- with sql_connect(self._path) as conn:
- with open(PATH_DB_SCHEMA, 'r', encoding='utf8') as f:
- conn.executescript(f.read())
- conn.execute(f'{SQL_DB_VERSION} = {EXPECTED_DB_VERSION}')
- with sql_connect(self._path) as conn:
- db_version = list(conn.execute(SQL_DB_VERSION))[0][0]
- if db_version != EXPECTED_DB_VERSION:
- raise HandledException(f'wrong database version {db_version}, '
- f'expected: {EXPECTED_DB_VERSION}')
- self._conn = sql_connect(self._path)
-
- def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
- """Wrapper around sqlite3.Connection.execute."""
- return self._conn.execute(sql, inputs)
-
- def commit_close(self) -> None:
- """Run sqlite3.Connection.commit and .close."""
- self._conn.commit()
- self._conn.close()
-
-
-class DbData:
- """Abstraction of common DB operation."""
- _table_name: str
- _cols: tuple[str, ...]
-
- def __eq__(self, other: Any) -> bool:
- if not isinstance(other, self.__class__):
- return False
- for attr_name in self._cols:
- if getattr(self, attr_name) != getattr(other, attr_name):
- return False
- return True
-
- @classmethod
- def _from_table_row(cls, row: Row) -> Self:
- kwargs = {}
- for i, col_name in enumerate(cls._cols):
- kwargs[col_name] = row[i]
- return cls(**kwargs)
-
- @classmethod
- def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
- """Return single entry of id_ from DB."""
- sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
- row = conn.exec(sql, (id_,)).fetchone()
- if not row:
- msg = f'no entry found for ID "{id_}" in table {cls._table_name}'
- raise NotFoundException(msg)
- return cls._from_table_row(row)
-
- @classmethod
- def get_all(cls, conn: DatabaseConnection) -> list[Self]:
- """Return all entries from DB."""
- sql = SqlText(f'SELECT * FROM {cls._table_name}')
- rows = conn.exec(sql).fetchall()
- return [cls._from_table_row(row) for row in rows]
-
- def save(self, conn: DatabaseConnection) -> Cursor:
- """Save entry to DB."""
- vals = [getattr(self, col_name) for col_name in self._cols]
- q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
- sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
- return conn.exec(sql, tuple(vals))
-
-
-class YoutubeQuery(DbData):
- """Representation of YouTube query (without results)."""
- _table_name = 'yt_queries'
- _cols = ('id_', 'text', 'retrieved_at')
-
- def __init__(self,
- id_: Optional[QueryId],
- text: QueryText,
- retrieved_at: DatetimeStr
- ) -> None:
- self.id_ = id_ if id_ else QueryId(str(uuid4()))
- self.text = QueryText(text)
- self.retrieved_at = retrieved_at
-
- @classmethod
- def get_all_for_video(cls,
- conn: DatabaseConnection,
- video_id: YoutubeId
- ) -> list[Self]:
- """Return YoutubeQueries containing YoutubeVideo's ID in results."""
- sql = SqlText('SELECT query_id FROM '
- 'yt_query_results WHERE video_id = ?')
- query_ids = conn.exec(sql, (video_id,)).fetchall()
- return [cls.get_one(conn, query_id_tup[0])
- for query_id_tup in query_ids]
-
-
-class YoutubeVideo(DbData):
- """Representation of YouTube video metadata as provided by their API."""
- _table_name = 'yt_videos'
- _cols = ('id_', 'title', 'description', 'published_at', 'duration',
- 'definition')
-
- def __init__(self,
- id_: YoutubeId,
- title: ProseText = ProseText('?'),
- description: ProseText = ProseText('?'),
- published_at: DatetimeStr = DatetimeStr('?'),
- duration: str = '?',
- definition: str = '?'
- ) -> None:
- self.id_ = id_
- self.title = title
- self.description = description
- self.published_at = published_at
- self.duration = duration
- self.definition = definition
-
- def set_duration_from_yt_string(self, yt_string: str) -> None:
- """Set .duration from the kind of format the YouTube API provides."""
- date_dur, time_dur = yt_string.split('T')
- seconds = 0
- date_dur = date_dur[1:]
- for dur_char, len_seconds in (('Y', 60*60*24*365.25),
- ('M', 60*60*24*30),
- ('D', 60*60*24)):
- if dur_char in date_dur:
- dur_str, date_dur = date_dur.split(dur_char)
- seconds += int(dur_str) * int(len_seconds)
- for dur_char, len_seconds in (('H', 60*60),
- ('M', 60),
- ('S', 1)):
- if dur_char in time_dur:
- dur_str, time_dur = time_dur.split(dur_char)
- seconds += int(dur_str) * len_seconds
- seconds_str = str(seconds % 60)
- minutes_str = str(seconds // 60)
- hours_str = str(seconds // (60 * 60))
- self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
- in (hours_str, minutes_str, seconds_str)])
-
- @classmethod
- def get_all_for_query(cls,
- conn: DatabaseConnection,
- query_id: QueryId
- ) -> list[Self]:
- """Return all videos for query of query_id."""
- sql = SqlText('SELECT video_id '
- 'FROM yt_query_results WHERE query_id = ?')
- video_ids = conn.exec(sql, (query_id,)).fetchall()
- return [cls.get_one(conn, video_id_tup[0])
- for video_id_tup in video_ids]
-
- def save_to_query(self,
- conn: DatabaseConnection,
- query_id: QueryId
- ) -> None:
- """Save inclusion of self in results to query of query_id."""
- conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
- (query_id, self.id_))
-
-
-class VideoFile(DbData):
- """Collects data about downloaded files."""
- _table_name = 'files'
- _cols = ('rel_path', 'yt_id', 'flags')
-
- def __init__(self, rel_path: PathStr, yt_id: YoutubeId, flags=FlagsInt(0)
- ) -> None:
- self.rel_path = rel_path
- self.yt_id = yt_id
- self.flags = flags
-
- @classmethod
- def get_by_yt_id(cls, conn: DatabaseConnection, yt_id: YoutubeId) -> Self:
- """Return VideoFile of .yt_id."""
- sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE yt_id = ?')
- row = conn.exec(sql, (yt_id,)).fetchone()
- if not row:
- raise NotFoundException(f'no entry for file to Youtube ID {yt_id}')
- return cls._from_table_row(row)
-
- @property
- def full_path(self) -> PathStr:
- """Return self.rel_path suffixed under PATH_DOWNLOADS."""
- return PathStr(path_join(PATH_DOWNLOADS, self.rel_path))
-
- @property
- def present(self) -> bool:
- """Return if file exists in filesystem."""
- return path_exists(self.full_path)
-
- @property
- def missing(self) -> bool:
- """Return if file absent despite absence of 'delete' flag."""
- return not (self.flag_set(FlagName('delete')) or self.present)
-
- def flag_set(self, flag_name: FlagName) -> bool:
- """Return if flag of flag_name is set in self.flags."""
- return self.flags & VIDEO_FLAGS[flag_name]
-
- def ensure_absence_if_deleted(self) -> None:
- """If 'delete' flag set, ensure no actual file in filesystem."""
- if self.flag_set(FlagName('delete')) and path_exists(self.full_path):
- print(f'SYNC: {self.rel_path} set "delete", '
- 'removing from filesystem.')
- os_remove(self.full_path)
-
-
-class QuotaLog(DbData):
- """Collects API access quota costs."""
- _table_name = 'quota_costs'
- _cols = ('id_', 'timestamp', 'cost')
-
- def __init__(self,
- id_: Optional[str],
- timestamp: DatetimeStr,
- cost: QuotaCost
- ) -> None:
- self.id_ = id_ if id_ else str(uuid4())
- self.timestamp = timestamp
- self.cost = cost
-
- @classmethod
- def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
- """Adds cost mapped to current datetime."""
- cls._remove_old(conn)
- new = cls(None,
- DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
- QuotaCost(cost))
- new.save(conn)
-
- @classmethod
- def current(cls, conn: DatabaseConnection) -> QuotaCost:
- """Returns quota cost total for last 24 hours, purges old data."""
- cls._remove_old(conn)
- quota_costs = cls.get_all(conn)
- return QuotaCost(sum(c.cost for c in quota_costs))
-
- @classmethod
- def _remove_old(cls, conn: DatabaseConnection) -> None:
- cutoff = datetime.now() - timedelta(days=1)
- sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
- conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
-
-
-class Player:
- """MPV representation with some additional features."""
- _idx: int
-
- def __init__(self) -> None:
- self.last_update = PlayerUpdateId('')
- self._load_filenames()
- self._mpv: Optional[MPV] = None
-
- def _load_filenames(self) -> None:
- self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
- if isfile(e.path)
- and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
- shuffle(self._filenames)
- self._idx = 0
-
- @property
- def _mpv_available(self) -> bool:
- return bool(self._mpv and not self._mpv.core_shutdown)
-
- @staticmethod
- def _if_mpv_available(f) -> Callable:
- def wrapper(self):
- return f(self) if self._mpv else None
- return wrapper
-
- def _signal_update(self) -> None:
- self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
-
- def _start_mpv(self) -> None:
- self._mpv = MPV(input_default_bindings=True,
- input_vo_keyboard=True,
- config=True)
- self._mpv.observe_property('pause', lambda a, b: self._signal_update())
-
- @self._mpv.event_callback('start-file')
- def on_start_file(_) -> None:
- assert self._mpv is not None
- self._mpv.pause = False
- self._idx = self._mpv.playlist_pos
- self._signal_update()
-
- @self._mpv.event_callback('shutdown')
- def on_shutdown(_) -> None:
- self._mpv = None
- self._signal_update()
-
- for path in self._filenames:
- self._mpv.playlist_append(path)
- self._mpv.playlist_play_index(self._idx)
-
- @_if_mpv_available
- def _kill_mpv(self) -> None:
- assert self._mpv is not None
- self._mpv.terminate()
- self._mpv = None
-
- @property
- def current_filename(self) -> Optional[PathStr]:
- """Return what we assume is the name of the currently playing file."""
- if not self._filenames:
- return None
- return PathStr(basename(self._filenames[self._idx]))
-
- @property
- def prev_files(self) -> list[PathStr]:
- """List 'past' files of playlist."""
- return list(reversed(self._filenames[:self._idx]))
-
- @property
- def next_files(self) -> list[PathStr]:
- """List 'coming' files of playlist."""
- return self._filenames[self._idx + 1:]
-
- @property
- def is_running(self) -> bool:
- """Return if player is running/available."""
- return self._mpv_available
-
- @property
- def is_paused(self) -> bool:
- """Return if player is paused."""
- if self._mpv_available:
- assert self._mpv is not None
- return self._mpv.pause
- return False
-
- def toggle_run(self) -> None:
- """Toggle player running."""
- if self._mpv_available:
- self._kill_mpv()
- else:
- self._start_mpv()
- self._signal_update()
-
- @_if_mpv_available
- def toggle_pause(self) -> None:
- """Toggle player pausing."""
- assert self._mpv is not None
- self._mpv.pause = not self._mpv.pause
- self._signal_update()
-
- @_if_mpv_available
- def prev(self) -> None:
- """Move player to previous item in playlist."""
- assert self._mpv is not None
- if self._mpv.playlist_pos > 0:
- self._mpv.playlist_prev()
- else:
- self._mpv.playlist_play_index(0)
-
- @_if_mpv_available
- def next(self) -> None:
- """Move player to next item in playlist."""
- assert self._mpv is not None
- max_idx: int = len(self._mpv.playlist_filenames) - 1
- if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
- self._mpv.playlist_next()
- else:
- self._mpv.playlist_play_index(max_idx)
-
- def reload(self) -> None:
- """Close MPV, re-read (and re-shuffle) filenames, then re-start MPV."""
- self._kill_mpv()
- self._load_filenames()
- self._start_mpv()
- self._signal_update()
-
-
-class DownloadsDb:
- """Collections downloading-related stuff."""
-
- def __init__(self) -> None:
- self._to_download: list[YoutubeId] = []
- _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
- self._sync_db()
-
- def _sync_db(self):
- conn = DatabaseConnection()
- files_via_db = VideoFile.get_all(conn)
- old_cwd = getcwd()
- chdir(PATH_DOWNLOADS)
- paths = [file.rel_path for file in files_via_db]
- for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
- if path not in paths:
- yt_id = self._id_from_filename(path)
- file = VideoFile(path, yt_id)
- print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
- file.save(conn)
- self._files = VideoFile.get_all(conn)
- for file in self._files:
- file.ensure_absence_if_deleted()
- chdir(old_cwd)
- conn.commit_close()
-
- @staticmethod
- def _id_from_filename(path: PathStr,
- double_split: bool = False
- ) -> YoutubeId:
- before_ext = splitext(path)[0]
- if double_split:
- before_ext = splitext(before_ext)[0]
- return YoutubeId(before_ext.split('[')[-1].split(']')[0])
-
- @property
- def missing(self) -> list[PathStr]:
- """Return relative paths of files known but not in PATH_DOWNLOADS."""
- self._sync_db()
- return [f.rel_path for f in self._files if f.missing]
-
- @property
- def ids_to_paths(self) -> DownloadsIndex:
- """Return mapping YoutubeIds:paths of files downloaded to them."""
- self._sync_db()
- return {f.yt_id: f.full_path for f in self._files}
-
- @property
- def ids_unfinished(self) -> set[YoutubeId]:
- """Return set of IDs of videos awaiting or currently in download."""
- in_temp_dir = []
- for path in [PathStr(e.path) for e
- in scandir(PATH_TEMP) if isfile(e.path)]:
- in_temp_dir += [self._id_from_filename(path)]
- return set(self._to_download + in_temp_dir)
-
- def clean_unfinished(self) -> None:
- """Empty temp directory of unfinished downloads."""
- for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
- print(f'removing unfinished download: {e.path}')
- os_remove(e.path)
-
- def queue_download(self, video_id: YoutubeId) -> None:
- """Add video_id to download queue *if* not already processed."""
- pre_existing = self.ids_unfinished | set(self._to_download
- + list(self.ids_to_paths))
- if video_id not in pre_existing:
- self._to_download += [video_id]
-
- def _download_next(self) -> None:
- if self._to_download:
- video_id = self._to_download.pop(0)
- with YoutubeDL(YT_DL_PARAMS) as ydl:
- ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
- self._sync_db()
-
- def download_loop(self) -> None:
- """Keep iterating through download queue for new download tasks."""
- while True:
- sleep(0.5)
- self._download_next()
-
-
-class Server(HTTPServer):
- """Extension of HTTPServer providing for Player and DownloadsDb."""
-
- def __init__(self, downloads_db: DownloadsDb, *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
- self.jinja = JinjaEnv(loader=JinjaFSLoader(PATH_TEMPLATES))
- self.player = Player()
- self.downloads = downloads_db
-
-
-class TaskHandler(BaseHTTPRequestHandler):
- """Handler for GET and POST requests to our server."""
- server: Server
-
- def _send_http(self,
- content: bytes = b'',
- headers: Optional[list[tuple[str, str]]] = None,
- code: int = 200
- ) -> None:
- headers = headers if headers else []
- self.send_response(code)
- for header_tuple in headers:
- self.send_header(header_tuple[0], header_tuple[1])
- self.end_headers()
- if content:
- self.wfile.write(content)
-
- def do_POST(self) -> None: # pylint:disable=invalid-name
- """Map POST requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url: list[str] = url.path.split('/')
- page_name = toks_url[1]
- body_length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(body_length).decode())
- if 'playlist' == page_name:
- self._post_player_command(list(postvars.keys())[0])
- elif 'video' == page_name:
- self._post_video_flag(YoutubeId(toks_url[2]),
- [FlagName(k) for k in postvars])
- elif 'queries' == page_name:
- self._post_query(QueryText(postvars['query'][0]))
-
- def _post_player_command(self, command: str) -> None:
- if 'pause' == command:
- self.server.player.toggle_pause()
- elif 'prev' == command:
- self.server.player.prev()
- elif 'next' == command:
- self.server.player.next()
- elif 'stop' == command:
- self.server.player.toggle_run()
- elif 'reload' == command:
- self.server.player.reload()
- sleep(0.5) # avoid redir happening before current_file update
- self._send_http(headers=[('Location', '/')], code=302)
-
- def _post_video_flag(self,
- yt_id: YoutubeId,
- flag_names: list[FlagName]
- ) -> None:
- conn = DatabaseConnection()
- file = VideoFile.get_by_yt_id(conn, yt_id)
- file.flags = 0
- for flag_name in flag_names:
- file.flags |= VIDEO_FLAGS[flag_name]
- file.save(conn)
- conn.commit_close()
- file.ensure_absence_if_deleted()
- self._send_http(headers=[('Location', f'/video/{yt_id}')], code=302)
-
- def _post_query(self, query_txt: QueryText) -> None:
- conn = DatabaseConnection()
-
- def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
- _ensure_expected_dirs([PATH_THUMBNAILS])
- youtube = googleapiclient.discovery.build('youtube', 'v3',
- developerKey=API_KEY)
- QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
- search_request = youtube.search().list(
- q=query_txt,
- part='snippet',
- maxResults=25,
- safeSearch='none',
- type='video')
- results: list[YoutubeVideo] = []
- ids_to_detail: list[YoutubeId] = []
- for item in search_request.execute()['items']:
- video_id: YoutubeId = item['id']['videoId']
- ids_to_detail += [video_id]
- snippet = item['snippet']
- urlretrieve(snippet['thumbnails']['default']['url'],
- path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
- results += [YoutubeVideo(id_=video_id,
- title=snippet['title'],
- description=snippet['description'],
- published_at=snippet['publishedAt'])]
- QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
- ids_for_details = ','.join([r.id_ for r in results])
- videos_request = youtube.videos().list(id=ids_for_details,
- part='content_details')
- unfinished_streams: list[YoutubeId] = []
- for i, detailed in enumerate(videos_request.execute()['items']):
- result = results[i]
- assert result.id_ == detailed['id']
- content_details: dict[str, str] = detailed['contentDetails']
- if 'P0D' == content_details['duration']:
- unfinished_streams += [result.id_]
- continue
- result.set_duration_from_yt_string(content_details['duration'])
- result.definition = content_details['definition'].upper()
- return [r for r in results if r.id_ not in unfinished_streams]
-
- query_data = YoutubeQuery(
- None, query_txt,
- DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
- query_data.save(conn)
- for result in collect_results(query_txt):
- result.save(conn)
- result.save_to_query(conn, query_data.id_)
- conn.commit_close()
- self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
- code=302)
-
- def do_GET(self) -> None: # pylint:disable=invalid-name
- """Map GET requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url: list[str] = url.path.split('/')
- page_name = toks_url[1]
- try:
- if 'thumbnails' == page_name:
- self._send_thumbnail(PathStr(toks_url[2]))
- elif 'dl' == page_name:
- self._send_or_download_video(YoutubeId(toks_url[2]))
- elif 'videos' == page_name:
- self._send_videos_index()
- elif 'video' == page_name:
- self._send_video_data(YoutubeId(toks_url[2]))
- elif 'yt_video' == page_name:
- self._send_yt_video_data(YoutubeId(toks_url[2]))
- elif 'missing.json' == page_name:
- self._send_missing_json()
- elif 'query' == page_name:
- self._send_query_page(QueryId(toks_url[2]))
- elif 'queries' == page_name:
- self._send_queries_index_and_search()
- elif '_last_playlist_update.json' == page_name:
- self._send_last_playlist_update()
- else: # e.g. for /
- self._send_playlist()
- except NotFoundException as e:
- self._send_http(bytes(str(e), 'utf8'), code=404)
-
- def _send_rendered_template(self,
- tmpl_name: PathStr,
- tmpl_ctx: TemplateContext
- ) -> None:
- tmpl = self.server.jinja.get_template(tmpl_name)
- html = tmpl.render(**tmpl_ctx)
- self._send_http(bytes(html, 'utf8'))
-
- def _send_thumbnail(self, filename: PathStr) -> None:
- _ensure_expected_dirs([PATH_THUMBNAILS])
- path_thumbnail = path_join(PATH_THUMBNAILS, filename)
- if not path_exists(path_thumbnail):
- video_id = splitext(filename)[0]
- url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
- try:
- urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
- except HTTPError as e:
- if 404 == e.code:
- raise NotFoundException from e
- raise e
- with open(path_thumbnail, 'rb') as f:
- img = f.read()
- self._send_http(img, [('Content-type', 'image/jpg')])
-
- def _send_or_download_video(self, video_id: YoutubeId) -> None:
- if video_id in self.server.downloads.ids_to_paths:
- with open(self.server.downloads.ids_to_paths[video_id],
- 'rb') as video_file:
- video = video_file.read()
- self._send_http(content=video)
- return
- self.server.downloads.queue_download(video_id)
- self._send_http(headers=[('Location', f'/yt_video/{video_id}')],
- code=302)
-
- def _send_query_page(self, query_id: QueryId) -> None:
- conn = DatabaseConnection()
- query = YoutubeQuery.get_one(conn, str(query_id))
- results = YoutubeVideo.get_all_for_query(conn, query_id)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_RESULTS,
- {'query': query.text, 'videos': results})
-
- def _send_queries_index_and_search(self) -> None:
- conn = DatabaseConnection()
- quota_count = QuotaLog.current(conn)
- queries_data = YoutubeQuery.get_all(conn)
- conn.commit_close()
- queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
- self._send_rendered_template(
- NAME_TEMPLATE_QUERIES, {'queries': queries_data,
- 'quota_count': quota_count})
-
- def _send_yt_video_data(self, video_id: YoutubeId) -> None:
- conn = DatabaseConnection()
- linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
- try:
- video_data = YoutubeVideo.get_one(conn, video_id)
- except NotFoundException:
- video_data = YoutubeVideo(video_id)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_YT_VIDEO,
- {'video_data': video_data,
- 'is_temp': video_id in self.server.downloads.ids_unfinished,
- 'file_path': self.server.downloads.ids_to_paths.get(video_id,
- None),
- 'youtube_prefix': YOUTUBE_URL_PREFIX,
- 'queries': linked_queries})
-
- def _send_video_data(self, yt_id: YoutubeId) -> None:
- conn = DatabaseConnection()
- file = VideoFile.get_by_yt_id(conn, yt_id)
- conn.commit_close()
- self._send_rendered_template(
- NAME_TEMPLATE_VIDEO,
- {'file': file, 'flag_names': list(VIDEO_FLAGS)})
-
- def _send_videos_index(self) -> None:
- videos = [(id_, PathStr(basename(path)))
- for id_, path in self.server.downloads.ids_to_paths.items()]
- videos.sort(key=lambda t: t[1])
- self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
-
- def _send_missing_json(self) -> None:
- self._send_http(
- bytes(json_dumps(self.server.downloads.missing), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_last_playlist_update(self) -> None:
- payload: dict[str, PlayerUpdateId] = {
- 'last_update': self.server.player.last_update}
- self._send_http(bytes(json_dumps(payload), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_playlist(self) -> None:
- tuples: list[tuple[PathStr, PathStr]] = []
- i: int = 0
- while True:
- prev, next_ = PathStr(''), PathStr('')
- if len(self.server.player.prev_files) > i:
- prev = PathStr(basename(self.server.player.prev_files[i]))
- if len(self.server.player.next_files) > i:
- next_ = PathStr(basename(self.server.player.next_files[i]))
- if not prev + next_:
- break
- tuples += [(prev, next_)]
- i += 1
- self._send_rendered_template(
- NAME_TEMPLATE_PLAYLIST,
- {'last_update': self.server.player.last_update,
- 'running': self.server.player.is_running,
- 'paused': self.server.player.is_paused,
- 'current_title': self.server.player.current_filename,
- 'tuples': tuples})