From: Christian Heller Date: Wed, 20 Nov 2024 13:37:08 +0000 (+0100) Subject: Clean up code, path references. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bdb.prefix%7D%7D/%7B%7Bprefix%7D%7D/blog?a=commitdiff_plain;h=76e139e4b26a2ed948d1790254a433cb8c8335fa;p=ytplom Clean up code, path references. --- diff --git a/ytplom.py b/ytplom.py index b3cd23d..2ca5891 100755 --- a/ytplom.py +++ b/ytplom.py @@ -1,28 +1,33 @@ #!/usr/bin/env python3 """Minimalistic download-focused YouTube interface.""" + +# included libs from typing import TypeAlias, Optional, NewType, Callable, Self, Any from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove from os.path import (isdir, isfile, exists as path_exists, join as path_join, splitext, basename) from random import shuffle from time import time, sleep +from datetime import datetime, timedelta from json import dumps as json_dumps from uuid import uuid4 -from datetime import datetime, timedelta from threading import Thread +from sqlite3 import connect as sql_connect, Cursor, Row from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs from urllib.request import urlretrieve from urllib.error import HTTPError -from sqlite3 import connect as sql_connect, Cursor, Row +# non-included libs from jinja2 import Template from mpv import MPV # type: ignore from yt_dlp import YoutubeDL # type: ignore import googleapiclient.discovery # type: ignore +# what we might want to manually define per environs API_KEY = environ.get('GOOGLE_API_KEY') -HTTP_PORT = 8084 +HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084)) +# type definitions for mypy DatetimeStr = NewType('DatetimeStr', str) QuotaCost = NewType('QuotaCost', int) YoutubeId = NewType('YoutubeId', str) @@ -37,43 +42,46 @@ DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr] TemplateContext: TypeAlias = dict[ str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo'] - | list['QueryData'] | list[tuple[YoutubeId, PathStr]] + | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]] +# local data reasonably expected to be in user home directory +PATH_HOME = PathStr(environ.get('HOME', '')) +PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom')) +PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails')) +PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql')) +PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads')) +PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp')) -class NotFoundException(BaseException): - """Call on DB fetches finding less than expected.""" - - -PATH_DIR_DOWNLOADS = PathStr('downloads') -PATH_DIR_THUMBNAILS = PathStr('thumbnails') -PATH_DIR_TEMPLATES = PathStr('templates') -PATH_DB = PathStr('db.sql') -NAME_DIR_TEMP = PathStr('temp') +# template paths; might move outside PATH_WORKDIR in the future +PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates')) NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl') NAME_TEMPLATE_RESULTS = PathStr('results.tmpl') NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl') NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl') NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl') - -PATH_DIR_TEMP = PathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP)) -PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_DIR_TEMPLATES, +PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES, NAME_TEMPLATE_QUERIES)) -TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' -YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=') + +# yt_dlp config YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ '/best[height<=1080][width<=1920]' -YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS, - 'temp': NAME_DIR_TEMP}, +YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS, + 'temp': PATH_TEMP}, 'format': YT_DOWNLOAD_FORMAT} + +# Youtube API expectations +YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=') THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/') THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg') - QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100) QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1) +# local expectations +TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'} +# tables to create database with SCRIPT_INIT_DB = ''' CREATE TABLE yt_queries ( id INTEGER PRIMARY KEY, @@ -108,6 +116,10 @@ CREATE TABLE files ( ''' +class NotFoundException(BaseException): + """Call on DB fetches finding less than expected.""" + + def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None: """Ensure existance of expected_dirs _as_ directories.""" for dir_name in expected_dirs: @@ -174,7 +186,7 @@ class DbData: return conn.exec(sql, tuple(vals)) -class QueryData(DbData): +class YoutubeQuery(DbData): """Representation of YouTube query (without results).""" _table_name = 'yt_queries' _cols = ('id_', 'text', 'retrieved_at') @@ -201,7 +213,7 @@ class QueryData(DbData): conn: DatabaseConnection, video_id: YoutubeId ) -> list[Self]: - """Return all QueryData that got YoutubeVideo of video_id as result.""" + """Return YoutubeQueries containing YoutubeVideo's ID in results.""" sql = SqlText('SELECT query_id FROM ' 'yt_query_results WHERE video_id = ?') query_ids = conn.exec(sql, (video_id,)).fetchall() @@ -331,7 +343,7 @@ class Player: def __init__(self) -> None: self.last_update = PlayerUpdateId('') - self._filenames = [PathStr(e.path) for e in scandir(PATH_DIR_DOWNLOADS) + self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS) if isfile(e.path) and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS] shuffle(self._filenames) @@ -445,14 +457,14 @@ class DownloadsDb: def __init__(self) -> None: self._to_download: list[YoutubeId] = [] - _ensure_expected_dirs([PATH_DIR_DOWNLOADS, PATH_DIR_TEMP]) + _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP]) self._sync_db() def _sync_db(self): conn = DatabaseConnection() files_via_db = VideoFile.get_all(conn) old_cwd = getcwd() - chdir(PATH_DIR_DOWNLOADS) + chdir(PATH_DOWNLOADS) for file in files_via_db: if not isfile(path_join(file.rel_path)): print(f'SYNC: no file {file.rel_path} found, removing entry.') @@ -481,7 +493,7 @@ class DownloadsDb: def ids_to_paths(self) -> DownloadsIndex: """Return mapping YoutubeIds:paths of files downloaded to them.""" self._sync_db() - return {f.yt_id: PathStr(path_join(PATH_DIR_DOWNLOADS, f.rel_path)) + return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path)) for f in self._files} @property @@ -489,13 +501,13 @@ class DownloadsDb: """Return set of IDs of videos awaiting or currently in download.""" in_temp_dir = [] for path in [PathStr(e.path) for e - in scandir(PATH_DIR_TEMP) if isfile(e.path)]: + in scandir(PATH_TEMP) if isfile(e.path)]: in_temp_dir += [self._id_from_filename(path)] return set(self._to_download + in_temp_dir) def clean_unfinished(self) -> None: """Empty temp directory of unfinished downloads.""" - for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]: + for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]: print(f'removing unfinished download: {e.path}') os_remove(e.path) @@ -593,7 +605,7 @@ class TaskHandler(BaseHTTPRequestHandler): ids_to_detail += [video_id] snippet = item['snippet'] urlretrieve(snippet['thumbnails']['default']['url'], - path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')) + path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) results += [YoutubeVideo(id_=video_id, title=snippet['title'], description=snippet['description'], @@ -610,7 +622,7 @@ class TaskHandler(BaseHTTPRequestHandler): result.definition = content_details['definition'].upper() return results - query_data = QueryData( + query_data = YoutubeQuery( None, query_txt, DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))) query_data.save(conn) @@ -651,7 +663,7 @@ class TaskHandler(BaseHTTPRequestHandler): tmpl_name: PathStr, tmpl_ctx: TemplateContext ) -> None: - with open(path_join(PATH_DIR_TEMPLATES, tmpl_name), + with open(path_join(PATH_TEMPLATES, tmpl_name), 'r', encoding='utf8' ) as templ_file: tmpl = Template(str(templ_file.read())) @@ -659,14 +671,13 @@ class TaskHandler(BaseHTTPRequestHandler): self._send_http(bytes(html, 'utf8')) def _send_thumbnail(self, filename: PathStr) -> None: - _ensure_expected_dirs([PATH_DIR_THUMBNAILS]) - path_thumbnail = path_join(PATH_DIR_THUMBNAILS, filename) + _ensure_expected_dirs([PATH_THUMBNAILS]) + path_thumbnail = path_join(PATH_THUMBNAILS, filename) if not path_exists(path_thumbnail): video_id = splitext(filename)[0] url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}' try: - urlretrieve(url, - path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg')) + urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) except HTTPError as e: if 404 == e.code: raise NotFoundException from e @@ -688,7 +699,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _send_query_page(self, query_id: QueryId) -> None: conn = DatabaseConnection() - query = QueryData.get_one(conn, str(query_id)) + query = YoutubeQuery.get_one(conn, str(query_id)) results = YoutubeVideo.get_all_for_query(conn, query_id) conn.commit_close() self._send_rendered_template( @@ -698,7 +709,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _send_queries_index_and_search(self) -> None: conn = DatabaseConnection() quota_count = QuotaLog.current(conn) - queries_data = QueryData.get_all(conn) + queries_data = YoutubeQuery.get_all(conn) conn.commit_close() queries_data.sort(key=lambda q: q.retrieved_at, reverse=True) self._send_rendered_template( @@ -707,7 +718,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _send_video_about(self, video_id: YoutubeId) -> None: conn = DatabaseConnection() - linked_queries = QueryData.get_all_for_video(conn, video_id) + linked_queries = YoutubeQuery.get_all_for_video(conn, video_id) try: video_data = YoutubeVideo.get_one(conn, video_id) except NotFoundException: