home · contact · privacy
Clean up code, path references. master
authorChristian Heller <c.heller@plomlompom.de>
Wed, 20 Nov 2024 13:37:08 +0000 (14:37 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Wed, 20 Nov 2024 13:37:08 +0000 (14:37 +0100)
ytplom.py

index b3cd23d1bb4685edabc302a3982ab2c2a365a1e6..2ca5891ce7cd56bf4150c0450869dcbbd2f0f228 100755 (executable)
--- a/ytplom.py
+++ b/ytplom.py
@@ -1,28 +1,33 @@
 #!/usr/bin/env python3
 """Minimalistic download-focused YouTube interface."""
 #!/usr/bin/env python3
 """Minimalistic download-focused YouTube interface."""
+
+# included libs
 from typing import TypeAlias, Optional, NewType, Callable, Self, Any
 from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
 from os.path import (isdir, isfile, exists as path_exists, join as path_join,
                      splitext, basename)
 from random import shuffle
 from time import time, sleep
 from typing import TypeAlias, Optional, NewType, Callable, Self, Any
 from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
 from os.path import (isdir, isfile, exists as path_exists, join as path_join,
                      splitext, basename)
 from random import shuffle
 from time import time, sleep
+from datetime import datetime, timedelta
 from json import dumps as json_dumps
 from uuid import uuid4
 from json import dumps as json_dumps
 from uuid import uuid4
-from datetime import datetime, timedelta
 from threading import Thread
 from threading import Thread
+from sqlite3 import connect as sql_connect, Cursor, Row
 from http.server import HTTPServer, BaseHTTPRequestHandler
 from urllib.parse import urlparse, parse_qs
 from urllib.request import urlretrieve
 from urllib.error import HTTPError
 from http.server import HTTPServer, BaseHTTPRequestHandler
 from urllib.parse import urlparse, parse_qs
 from urllib.request import urlretrieve
 from urllib.error import HTTPError
-from sqlite3 import connect as sql_connect, Cursor, Row
+# non-included libs
 from jinja2 import Template
 from mpv import MPV  # type: ignore
 from yt_dlp import YoutubeDL  # type: ignore
 import googleapiclient.discovery  # type: ignore
 
 from jinja2 import Template
 from mpv import MPV  # type: ignore
 from yt_dlp import YoutubeDL  # type: ignore
 import googleapiclient.discovery  # type: ignore
 
+# what we might want to manually define per environs
 API_KEY = environ.get('GOOGLE_API_KEY')
 API_KEY = environ.get('GOOGLE_API_KEY')
-HTTP_PORT = 8084
+HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
 
 
+# type definitions for mypy
 DatetimeStr = NewType('DatetimeStr', str)
 QuotaCost = NewType('QuotaCost', int)
 YoutubeId = NewType('YoutubeId', str)
 DatetimeStr = NewType('DatetimeStr', str)
 QuotaCost = NewType('QuotaCost', int)
 YoutubeId = NewType('YoutubeId', str)
@@ -37,43 +42,46 @@ DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
 TemplateContext: TypeAlias = dict[
         str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
         | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
 TemplateContext: TypeAlias = dict[
         str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
         | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
-        | list['QueryData'] | list[tuple[YoutubeId, PathStr]]
+        | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]]
         | list[tuple[PathStr, PathStr]]]
 
         | list[tuple[PathStr, PathStr]]]
 
+# local data reasonably expected to be in user home directory
+PATH_HOME = PathStr(environ.get('HOME', ''))
+PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom'))
+PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails'))
+PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql'))
+PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads'))
+PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp'))
 
 
-class NotFoundException(BaseException):
-    """Call on DB fetches finding less than expected."""
-
-
-PATH_DIR_DOWNLOADS = PathStr('downloads')
-PATH_DIR_THUMBNAILS = PathStr('thumbnails')
-PATH_DIR_TEMPLATES = PathStr('templates')
-PATH_DB = PathStr('db.sql')
-NAME_DIR_TEMP = PathStr('temp')
+# template paths; might move outside PATH_WORKDIR in the future
+PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates'))
 NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
 NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
 NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
 NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
 NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
 NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
 NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
 NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
 NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
 NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
-
-PATH_DIR_TEMP = PathStr(path_join(PATH_DIR_DOWNLOADS, NAME_DIR_TEMP))
-PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_DIR_TEMPLATES,
+PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES,
                                           NAME_TEMPLATE_QUERIES))
                                           NAME_TEMPLATE_QUERIES))
-TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
+
+# yt_dlp config
 YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
         '/best[height<=1080][width<=1920]'
 YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
         '/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DIR_DOWNLOADS,
-                          'temp': NAME_DIR_TEMP},
+YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
+                          'temp': PATH_TEMP},
                 'format': YT_DOWNLOAD_FORMAT}
                 'format': YT_DOWNLOAD_FORMAT}
+
+# Youtube API expectations
+YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
 THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
 THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
 THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
 THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
-
 QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
 QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
 
 QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
 QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
 
+# local expectations
+TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
 LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
 
 LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
 
+# tables to create database with
 SCRIPT_INIT_DB = '''
 CREATE TABLE yt_queries (
   id INTEGER PRIMARY KEY,
 SCRIPT_INIT_DB = '''
 CREATE TABLE yt_queries (
   id INTEGER PRIMARY KEY,
@@ -108,6 +116,10 @@ CREATE TABLE files (
 '''
 
 
 '''
 
 
+class NotFoundException(BaseException):
+    """Call on DB fetches finding less than expected."""
+
+
 def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
     """Ensure existance of expected_dirs _as_ directories."""
     for dir_name in expected_dirs:
 def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
     """Ensure existance of expected_dirs _as_ directories."""
     for dir_name in expected_dirs:
@@ -174,7 +186,7 @@ class DbData:
         return conn.exec(sql, tuple(vals))
 
 
         return conn.exec(sql, tuple(vals))
 
 
-class QueryData(DbData):
+class YoutubeQuery(DbData):
     """Representation of YouTube query (without results)."""
     _table_name = 'yt_queries'
     _cols = ('id_', 'text', 'retrieved_at')
     """Representation of YouTube query (without results)."""
     _table_name = 'yt_queries'
     _cols = ('id_', 'text', 'retrieved_at')
@@ -201,7 +213,7 @@ class QueryData(DbData):
                           conn: DatabaseConnection,
                           video_id: YoutubeId
                           ) -> list[Self]:
                           conn: DatabaseConnection,
                           video_id: YoutubeId
                           ) -> list[Self]:
-        """Return all QueryData that got YoutubeVideo of video_id as result."""
+        """Return YoutubeQueries containing YoutubeVideo's ID in results."""
         sql = SqlText('SELECT query_id FROM '
                       'yt_query_results WHERE video_id = ?')
         query_ids = conn.exec(sql, (video_id,)).fetchall()
         sql = SqlText('SELECT query_id FROM '
                       'yt_query_results WHERE video_id = ?')
         query_ids = conn.exec(sql, (video_id,)).fetchall()
@@ -331,7 +343,7 @@ class Player:
 
     def __init__(self) -> None:
         self.last_update = PlayerUpdateId('')
 
     def __init__(self) -> None:
         self.last_update = PlayerUpdateId('')
-        self._filenames = [PathStr(e.path) for e in scandir(PATH_DIR_DOWNLOADS)
+        self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
                            if isfile(e.path)
                            and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
         shuffle(self._filenames)
                            if isfile(e.path)
                            and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
         shuffle(self._filenames)
@@ -445,14 +457,14 @@ class DownloadsDb:
 
     def __init__(self) -> None:
         self._to_download: list[YoutubeId] = []
 
     def __init__(self) -> None:
         self._to_download: list[YoutubeId] = []
-        _ensure_expected_dirs([PATH_DIR_DOWNLOADS, PATH_DIR_TEMP])
+        _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
         self._sync_db()
 
     def _sync_db(self):
         conn = DatabaseConnection()
         files_via_db = VideoFile.get_all(conn)
         old_cwd = getcwd()
         self._sync_db()
 
     def _sync_db(self):
         conn = DatabaseConnection()
         files_via_db = VideoFile.get_all(conn)
         old_cwd = getcwd()
-        chdir(PATH_DIR_DOWNLOADS)
+        chdir(PATH_DOWNLOADS)
         for file in files_via_db:
             if not isfile(path_join(file.rel_path)):
                 print(f'SYNC: no file {file.rel_path} found, removing entry.')
         for file in files_via_db:
             if not isfile(path_join(file.rel_path)):
                 print(f'SYNC: no file {file.rel_path} found, removing entry.')
@@ -481,7 +493,7 @@ class DownloadsDb:
     def ids_to_paths(self) -> DownloadsIndex:
         """Return mapping YoutubeIds:paths of files downloaded to them."""
         self._sync_db()
     def ids_to_paths(self) -> DownloadsIndex:
         """Return mapping YoutubeIds:paths of files downloaded to them."""
         self._sync_db()
-        return {f.yt_id: PathStr(path_join(PATH_DIR_DOWNLOADS, f.rel_path))
+        return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path))
                 for f in self._files}
 
     @property
                 for f in self._files}
 
     @property
@@ -489,13 +501,13 @@ class DownloadsDb:
         """Return set of IDs of videos awaiting or currently in download."""
         in_temp_dir = []
         for path in [PathStr(e.path) for e
         """Return set of IDs of videos awaiting or currently in download."""
         in_temp_dir = []
         for path in [PathStr(e.path) for e
-                     in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
+                     in scandir(PATH_TEMP) if isfile(e.path)]:
             in_temp_dir += [self._id_from_filename(path)]
         return set(self._to_download + in_temp_dir)
 
     def clean_unfinished(self) -> None:
         """Empty temp directory of unfinished downloads."""
             in_temp_dir += [self._id_from_filename(path)]
         return set(self._to_download + in_temp_dir)
 
     def clean_unfinished(self) -> None:
         """Empty temp directory of unfinished downloads."""
-        for e in [e for e in scandir(PATH_DIR_TEMP) if isfile(e.path)]:
+        for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
             print(f'removing unfinished download: {e.path}')
             os_remove(e.path)
 
             print(f'removing unfinished download: {e.path}')
             os_remove(e.path)
 
@@ -593,7 +605,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                 ids_to_detail += [video_id]
                 snippet = item['snippet']
                 urlretrieve(snippet['thumbnails']['default']['url'],
                 ids_to_detail += [video_id]
                 snippet = item['snippet']
                 urlretrieve(snippet['thumbnails']['default']['url'],
-                            path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
+                            path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
                 results += [YoutubeVideo(id_=video_id,
                                          title=snippet['title'],
                                          description=snippet['description'],
                 results += [YoutubeVideo(id_=video_id,
                                          title=snippet['title'],
                                          description=snippet['description'],
@@ -610,7 +622,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                 result.definition = content_details['definition'].upper()
             return results
 
                 result.definition = content_details['definition'].upper()
             return results
 
-        query_data = QueryData(
+        query_data = YoutubeQuery(
                 None, query_txt,
                 DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
         query_data.save(conn)
                 None, query_txt,
                 DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
         query_data.save(conn)
@@ -651,7 +663,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                                 tmpl_name: PathStr,
                                 tmpl_ctx: TemplateContext
                                 ) -> None:
                                 tmpl_name: PathStr,
                                 tmpl_ctx: TemplateContext
                                 ) -> None:
-        with open(path_join(PATH_DIR_TEMPLATES, tmpl_name),
+        with open(path_join(PATH_TEMPLATES, tmpl_name),
                   'r', encoding='utf8'
                   ) as templ_file:
             tmpl = Template(str(templ_file.read()))
                   'r', encoding='utf8'
                   ) as templ_file:
             tmpl = Template(str(templ_file.read()))
@@ -659,14 +671,13 @@ class TaskHandler(BaseHTTPRequestHandler):
         self._send_http(bytes(html, 'utf8'))
 
     def _send_thumbnail(self, filename: PathStr) -> None:
         self._send_http(bytes(html, 'utf8'))
 
     def _send_thumbnail(self, filename: PathStr) -> None:
-        _ensure_expected_dirs([PATH_DIR_THUMBNAILS])
-        path_thumbnail = path_join(PATH_DIR_THUMBNAILS, filename)
+        _ensure_expected_dirs([PATH_THUMBNAILS])
+        path_thumbnail = path_join(PATH_THUMBNAILS, filename)
         if not path_exists(path_thumbnail):
             video_id = splitext(filename)[0]
             url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
             try:
         if not path_exists(path_thumbnail):
             video_id = splitext(filename)[0]
             url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
             try:
-                urlretrieve(url,
-                            path_join(PATH_DIR_THUMBNAILS, f'{video_id}.jpg'))
+                urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
             except HTTPError as e:
                 if 404 == e.code:
                     raise NotFoundException from e
             except HTTPError as e:
                 if 404 == e.code:
                     raise NotFoundException from e
@@ -688,7 +699,7 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def _send_query_page(self, query_id: QueryId) -> None:
         conn = DatabaseConnection()
 
     def _send_query_page(self, query_id: QueryId) -> None:
         conn = DatabaseConnection()
-        query = QueryData.get_one(conn, str(query_id))
+        query = YoutubeQuery.get_one(conn, str(query_id))
         results = YoutubeVideo.get_all_for_query(conn, query_id)
         conn.commit_close()
         self._send_rendered_template(
         results = YoutubeVideo.get_all_for_query(conn, query_id)
         conn.commit_close()
         self._send_rendered_template(
@@ -698,7 +709,7 @@ class TaskHandler(BaseHTTPRequestHandler):
     def _send_queries_index_and_search(self) -> None:
         conn = DatabaseConnection()
         quota_count = QuotaLog.current(conn)
     def _send_queries_index_and_search(self) -> None:
         conn = DatabaseConnection()
         quota_count = QuotaLog.current(conn)
-        queries_data = QueryData.get_all(conn)
+        queries_data = YoutubeQuery.get_all(conn)
         conn.commit_close()
         queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
         self._send_rendered_template(
         conn.commit_close()
         queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
         self._send_rendered_template(
@@ -707,7 +718,7 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def _send_video_about(self, video_id: YoutubeId) -> None:
         conn = DatabaseConnection()
 
     def _send_video_about(self, video_id: YoutubeId) -> None:
         conn = DatabaseConnection()
-        linked_queries = QueryData.get_all_for_video(conn, video_id)
+        linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
         try:
             video_data = YoutubeVideo.get_one(conn, video_id)
         except NotFoundException:
         try:
             video_data = YoutubeVideo.get_one(conn, video_id)
         except NotFoundException: