# included libs
from typing import Any, NewType, Optional, Self, TypeAlias
-from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
-from os.path import (basename, dirname, isdir, isfile, exists as path_exists,
- join as path_join, splitext)
+from os import chdir, environ
from base64 import urlsafe_b64encode, urlsafe_b64decode
from random import shuffle
from time import time, sleep
from datetime import datetime, timedelta
-from json import dumps as json_dumps, load as json_load
+from json import dumps as json_dumps, loads as json_loads
from uuid import uuid4
+from pathlib import Path
from sqlite3 import connect as sql_connect, Cursor, Row
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
PlayerUpdateId = NewType('PlayerUpdateId', str)
B64Str = NewType('B64Str', str)
ParamsStr = NewType('ParamsStr', str)
-PageNames: TypeAlias = dict[str, PathStr]
-DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
+UrlStr = NewType('UrlStr', str)
+PageNames: TypeAlias = dict[str, Path]
+DownloadsIndex: TypeAlias = dict[YoutubeId, Path]
FilesWithIndex: TypeAlias = list[tuple[int, 'VideoFile']]
TemplateContext: TypeAlias = dict[
str,
None | bool
- | FilesWithIndex | PageNames | ParamsStr | PathStr | PlayerUpdateId
- | QueryText | QuotaCost | 'VideoFile' | YoutubeId | 'YoutubeVideo'
- | list[FlagName] | list['VideoFile'] | list['YoutubeVideo']
- | list['YoutubeQuery']
+ | FilesWithIndex | PageNames | ParamsStr | Path | PlayerUpdateId
+ | QueryText | QuotaCost | UrlStr | 'VideoFile' | YoutubeId
+ | 'YoutubeVideo' | list[FlagName] | list['VideoFile']
+ | list['YoutubeVideo'] | list['YoutubeQuery']
]
# major expected directories
-PATH_HOME = PathStr(environ.get('HOME', ''))
-PATH_APP_DATA = PathStr(path_join(PATH_HOME, '.local/share/ytplom'))
-PATH_CACHE = PathStr(path_join(PATH_HOME, '.cache/ytplom'))
+PATH_APP_DATA = Path.home().joinpath('.local/share/ytplom')
+PATH_CACHE = Path.home().joinpath('.cache/ytplom')
# paths for rather dynamic data
-PATH_DOWNLOADS = PathStr(path_join(PATH_HOME, 'ytplom_downloads'))
-PATH_DB = PathStr(path_join(PATH_APP_DATA, 'db.sql'))
-PATH_TEMP = PathStr(path_join(PATH_CACHE, 'temp'))
-PATH_THUMBNAILS = PathStr(path_join(PATH_CACHE, 'thumbnails'))
-PATH_CONFFILE = PathStr(path_join(PATH_HOME, '.config/ytplom/config.json'))
+PATH_DOWNLOADS = Path.home().joinpath('ytplom_downloads')
+PATH_DB = PATH_APP_DATA.joinpath('db.sql')
+PATH_TEMP = PATH_CACHE.joinpath('temp')
+PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails')
+PATH_CONFFILE = Path.home().joinpath('.config/ytplom/config.json')
# template paths
-PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('yt_queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('yt_results.tmpl')
-NAME_TEMPLATE_FILES = PathStr('files.tmpl')
-NAME_TEMPLATE_FILE_DATA = PathStr('file_data.tmpl')
-NAME_TEMPLATE_YT_VIDEO = PathStr('yt_result.tmpl')
-NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
+PATH_TEMPLATES = PATH_APP_DATA.joinpath('templates')
+NAME_TEMPLATE_QUERIES = Path('yt_queries.tmpl')
+NAME_TEMPLATE_RESULTS = Path('yt_results.tmpl')
+NAME_TEMPLATE_FILES = Path('files.tmpl')
+NAME_TEMPLATE_FILE_DATA = Path('file_data.tmpl')
+NAME_TEMPLATE_YT_VIDEO = Path('yt_result.tmpl')
+NAME_TEMPLATE_PLAYLIST = Path('playlist.tmpl')
# page names
PAGE_NAMES = {
- 'download': PathStr('dl'),
- 'file': PathStr('file'),
- 'files': PathStr('files'),
- 'last_update': PathStr('last_playlist_update.json'),
- 'missing': PathStr('missing.json'),
- 'playlist': PathStr('playlist'),
- 'thumbnails': PathStr('thumbnails'),
- 'yt_result': PathStr('yt_result'),
- 'yt_query': PathStr('yt_query'),
- 'yt_queries': PathStr('yt_queries')
+ 'download': Path('dl'),
+ 'file': Path('file'),
+ 'files': Path('files'),
+ 'last_update': Path('last_playlist_update.json'),
+ 'missing': Path('missing.json'),
+ 'playlist': Path('playlist'),
+ 'thumbnails': Path('thumbnails'),
+ 'yt_result': Path('yt_result'),
+ 'yt_query': Path('yt_query'),
+ 'yt_queries': Path('yt_queries')
}
# yt_dlp config
YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
'/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
- 'temp': PATH_TEMP},
+YT_DL_PARAMS = {'paths': {'home': str(PATH_DOWNLOADS),
+ 'temp': str(PATH_TEMP)},
'format': YT_DOWNLOAD_FORMAT}
# Youtube API expectations
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
+YOUTUBE_URL_PREFIX = UrlStr('https://www.youtube.com/watch?v=')
+THUMBNAIL_URL_PREFIX = UrlStr('https://i.ytimg.com/vi/')
+THUMBNAIL_URL_SUFFIX = UrlStr('/default.jpg')
QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
# database stuff
EXPECTED_DB_VERSION = 1
SQL_DB_VERSION = SqlText('PRAGMA user_version')
-PATH_MIGRATIONS = PathStr(path_join(PATH_APP_DATA, 'migrations'))
-PATH_DB_SCHEMA = PathStr(path_join(PATH_MIGRATIONS,
- f'init_{EXPECTED_DB_VERSION}.sql'))
+PATH_MIGRATIONS = PATH_APP_DATA.joinpath('migrations')
+PATH_DB_SCHEMA = PATH_MIGRATIONS.joinpath(f'init_{EXPECTED_DB_VERSION}.sql')
# other
ENVIRON_PREFIX = 'YTPLOM_'
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
-NAME_INSTALLER = PathStr('install.sh')
+NAME_INSTALLER = Path('install.sh')
FILE_FLAGS: dict[FlagName, FlagsInt] = {
FlagName('delete'): FlagsInt(1 << 62)
}
"""Raise in any other case where we know what's happening."""
-def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
+def _ensure_expected_dirs(expected_dirs: list[Path]) -> None:
"""Ensure existance of expected_dirs _as_ directories."""
- for dir_name in expected_dirs:
- if not isdir(dir_name):
- if path_exists(dir_name):
- raise HandledException(f'at expected directory path {dir_name}'
- 'found non-directory')
- print(f'creating expected directory: {dir_name}')
- makedirs(dir_name)
+ for dir_path in [p for p in expected_dirs if not p.is_dir()]:
+ if dir_path.exists():
+ raise HandledException(f'at expected directory path {dir_path}'
+ 'found non-directory')
+ print(f'creating expected directory: {dir_path}')
+ dir_path.mkdir(parents=True, exist_ok=True)
-def get_db_version(db_path: PathStr) -> int:
+def get_db_version(db_path: Path) -> int:
"""Return user_version value of DB at db_path."""
with sql_connect(db_path) as conn:
return list(conn.execute(SQL_DB_VERSION))[0][0]
if attr_name in d:
setattr(self, attr_name, type_(d[attr_name]))
set_attrs_from_dict(DEFAULTS)
- if isfile(PATH_CONFFILE):
- with open(PATH_CONFFILE, 'r', encoding='utf8') as f:
- conffile = json_load(f)
+ if PATH_CONFFILE.is_file():
+ conffile = json_loads(PATH_CONFFILE.read_text(encoding='utf8'))
set_attrs_from_dict(conffile)
set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v
for k, v in environ.items()
class DbConnection:
"""Wrapped sqlite3.Connection."""
- def __init__(self, path: PathStr = PATH_DB) -> None:
+ def __init__(self, path: Path = PATH_DB) -> None:
self._path = path
- if not isfile(self._path):
- if path_exists(self._path):
+ if not self._path.is_file():
+ if self._path.exists():
raise HandledException(f'no DB at {self._path}; would create, '
'but something\'s already there?')
- path_db_dir = dirname(self._path)
- if not isdir(path_db_dir):
+ if not self._path.parent.is_dir():
raise NotFoundException(
- f'cannot find {path_db_dir} as directory to put DB '
- f'into, did you run {NAME_INSTALLER}?')
+ f'cannot find {self._path.parent} as directory to put '
+ f'DB into, did you run {NAME_INSTALLER}?')
with sql_connect(self._path) as conn:
- with open(PATH_DB_SCHEMA, 'r', encoding='utf8') as f:
- conn.executescript(f.read())
+ conn.executescript(PATH_DB_SCHEMA.read_text(encoding='utf8'))
conn.execute(f'{SQL_DB_VERSION} = {EXPECTED_DB_VERSION}')
cur_version = get_db_version(self._path)
if cur_version != EXPECTED_DB_VERSION:
kwargs = {}
for i, col_name in enumerate(cls._cols):
kwargs[col_name] = row[i]
+ for attr_name, type_ in cls.__annotations__.items():
+ if attr_name in kwargs:
+ kwargs[attr_name] = type_(kwargs[attr_name])
return cls(**kwargs)
@classmethod
vals = [getattr(self, col_name) for col_name in self._cols]
q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
- return conn.exec(sql, tuple(vals))
+ return conn.exec(sql, tuple(str(v) if isinstance(v, Path) else v
+ for v in vals))
class YoutubeQuery(DbData):
_table_name = 'files'
_cols = ('rel_path', 'yt_id', 'flags', 'last_update')
last_update: DatetimeStr
+ rel_path: Path
def __init__(self,
- rel_path: PathStr,
+ rel_path: Path,
yt_id: YoutubeId,
flags: FlagsInt = FlagsInt(0),
last_update: Optional[DatetimeStr] = None
@property
def rel_path_b64(self) -> B64Str:
"""Return .rel_path as urlsafe_b64 e3ncoding."""
- return B64Str(urlsafe_b64encode(self.rel_path.encode()).decode())
+ return B64Str(urlsafe_b64encode(str(self.rel_path).encode()).decode())
@property
- def full_path(self) -> PathStr:
+ def full_path(self) -> Path:
"""Return self.rel_path suffixed under PATH_DOWNLOADS."""
- return PathStr(path_join(PATH_DOWNLOADS, self.rel_path))
-
- @property
- def basename(self) -> PathStr:
- """Return basename(self.rel_path)."""
- return PathStr(basename(self.rel_path))
+ return PATH_DOWNLOADS.joinpath(self.rel_path)
@property
def present(self) -> bool:
"""Return if file exists in filesystem."""
- return path_exists(self.full_path)
+ return self.full_path.exists()
@property
def missing(self) -> bool:
def ensure_absence_if_deleted(self) -> None:
"""If 'delete' flag set, ensure no actual file in filesystem."""
- if (self.is_flag_set(FlagName('delete'))
- and path_exists(self.full_path)):
+ if self.is_flag_set(FlagName('delete')) and self.present:
print(f'SYNC: {self.rel_path} set "delete", '
'removing from filesystem.')
- os_remove(self.full_path)
+ self.full_path.unlink()
class QuotaLog(DbData):
conn = DbConnection()
known_files = {f.full_path: f for f in VideoFile.get_all(conn)}
conn.commit_close()
- self._files = [known_files[PathStr(e.path)]
- for e in scandir(PATH_DOWNLOADS)
- if e.path in known_files
- and isfile(e.path)
- and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+ self._files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
+ if p in known_files
+ and p.is_file()
+ and p.suffix[1:] in LEGAL_EXTENSIONS]
shuffle(self._files)
self._idx = 0
def _sync_db(self):
conn = DbConnection()
- files_via_db = VideoFile.get_all(conn)
- old_cwd = getcwd()
+ known_paths = [file.rel_path for file in VideoFile.get_all(conn)]
+ old_cwd = Path.cwd()
chdir(PATH_DOWNLOADS)
- paths = [file.rel_path for file in files_via_db]
- for path in [PathStr(basename(e.path)) for e in scandir()
- if isfile(e.path)]:
- if path not in paths:
- yt_id = self._id_from_filename(path)
- file = VideoFile(path, yt_id)
- print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
- file.save(conn)
+ for path in [p for p in Path('.').iterdir()
+ if p.is_file() and p not in known_paths]:
+ yt_id = self._id_from_filename(path)
+ file = VideoFile(path, yt_id)
+ print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+ file.save(conn)
self._files = VideoFile.get_all(conn)
for file in self._files:
file.ensure_absence_if_deleted()
conn.commit_close()
@staticmethod
- def _id_from_filename(path: PathStr,
- double_split: bool = False
- ) -> YoutubeId:
- before_ext = splitext(path)[0]
- if double_split:
- before_ext = splitext(before_ext)[0]
- return YoutubeId(before_ext.split('[')[-1].split(']')[0])
+ def _id_from_filename(path: Path) -> YoutubeId:
+ return YoutubeId(path.stem.split('[')[-1].split(']')[0])
@property
def ids_unfinished(self) -> set[YoutubeId]:
"""Return set of IDs of videos awaiting or currently in download."""
in_temp_dir = []
- for path in [PathStr(e.path) for e
- in scandir(PATH_TEMP) if isfile(e.path)]:
+ for path in [p for p in PATH_TEMP.iterdir() if p.is_file()]:
in_temp_dir += [self._id_from_filename(path)]
return set(self._to_download + in_temp_dir)
def clean_unfinished(self) -> None:
"""Empty temp directory of unfinished downloads."""
- for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
- print(f'removing unfinished download: {e.path}')
- os_remove(e.path)
+ for path in [p for p in PATH_TEMP.iterdir() if p.is_file()]:
+ print(f'removing unfinished download: {path}')
+ path.unlink()
def queue_download(self, video_id: YoutubeId) -> None:
"""Add video_id to download queue *if* not already processed."""
if content:
self.wfile.write(content)
- def _redirect(self, target: PathStr) -> None:
- self._send_http(headers=[('Location', target)], code=302)
+ def _redirect(self, target: Path) -> None:
+ self._send_http(headers=[('Location', str(target))], code=302)
def do_POST(self) -> None: # pylint:disable=invalid-name
"""Map POST requests to handlers for various paths."""
url = urlparse(self.path)
- toks_url: list[str] = url.path.split('/')
- page_name = toks_url[1]
+ toks_url = Path(url.path).parts
+ page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
body_length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(body_length).decode())
if PAGE_NAMES['playlist'] == page_name:
elif command.startswith('down_'):
self.server.player.move_entry(int(command.split('_')[1]), False)
sleep(0.5) # avoid redir happening before current_file update
- self._redirect(PathStr('/'))
+ self._redirect(Path('/'))
def _receive_video_flag(self,
rel_path_b64: B64Str,
file.save(conn)
conn.commit_close()
file.ensure_absence_if_deleted()
- self._redirect(PathStr(f'/{PAGE_NAMES["file"]}/{rel_path_b64}'))
+ self._redirect(Path('/')
+ .joinpath(PAGE_NAMES['file'])
+ .joinpath(rel_path_b64))
def _receive_yt_query(self, query_txt: QueryText) -> None:
conn = DbConnection()
ids_to_detail += [video_id]
snippet = item['snippet']
urlretrieve(snippet['thumbnails']['default']['url'],
- path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+ PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
results += [YoutubeVideo(id_=video_id,
title=snippet['title'],
description=snippet['description'],
result.save(conn)
result.save_to_query(conn, query_data.id_)
conn.commit_close()
- self._redirect(PathStr(f'/{PAGE_NAMES["yt_query"]}/{query_data.id_}'))
+ self._redirect(Path('/')
+ .joinpath(PAGE_NAMES['yt_query'])
+ .joinpath(query_data.id_))
def do_GET(self) -> None: # pylint:disable=invalid-name
"""Map GET requests to handlers for various paths."""
url = urlparse(self.path)
- toks_url: list[str] = url.path.split('/')
- page_name = toks_url[1]
+ toks_url = Path(url.path).parts
+ page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
try:
if PAGE_NAMES['thumbnails'] == page_name:
- self._send_thumbnail(PathStr(toks_url[2]))
+ self._send_thumbnail(Path(toks_url[2]))
elif PAGE_NAMES['download'] == page_name:
self._send_or_download_video(YoutubeId(toks_url[2]))
elif PAGE_NAMES['files'] == page_name:
self._send_http(bytes(str(e), 'utf8'), code=404)
def _send_rendered_template(self,
- tmpl_name: PathStr,
+ tmpl_name: Path,
tmpl_ctx: TemplateContext
) -> None:
- tmpl = self.server.jinja.get_template(tmpl_name)
+ tmpl = self.server.jinja.get_template(str(tmpl_name))
tmpl_ctx['page_names'] = PAGE_NAMES
html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
- def _send_thumbnail(self, filename: PathStr) -> None:
+ def _send_thumbnail(self, filename: Path) -> None:
_ensure_expected_dirs([PATH_THUMBNAILS])
- path_thumbnail = path_join(PATH_THUMBNAILS, filename)
- if not path_exists(path_thumbnail):
- video_id = splitext(filename)[0]
+ path_thumbnail = PATH_THUMBNAILS.joinpath(filename)
+ if not path_thumbnail.exists():
+ video_id = filename.stem
url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
try:
- urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+ urlretrieve(url, PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
except HTTPError as e:
if 404 == e.code:
raise NotFoundException from e
raise e
- with open(path_thumbnail, 'rb') as f:
+ with path_thumbnail.open('rb') as f:
img = f.read()
self._send_http(img, [('Content-type', 'image/jpg')])
except NotFoundException:
conn.commit_close()
self.server.downloads.queue_download(video_id)
- self._redirect(PathStr(f'/{PAGE_NAMES["yt_result"]}/{video_id}'))
+ self._redirect(Path('/')
+ .joinpath(PAGE_NAMES['yt_result'])
+ .joinpath(video_id))
return
conn.commit_close()
- with open(file_data.full_path, 'rb') as video_file:
+ with file_data.full_path.open('rb') as video_file:
video = video_file.read()
self._send_http(content=video)
def _send_files_index(self, filter_: ParamsStr, show_absent: bool) -> None:
conn = DbConnection()
files = [f for f in VideoFile.get_all(conn)
- if (filter_ in f.rel_path) and (show_absent or f.present)]
+ if filter_ in str(f.rel_path) and (show_absent or f.present)]
conn.commit_close()
files.sort(key=lambda t: t.rel_path)
self._send_rendered_template(