From: Christian Heller Date: Fri, 29 Nov 2024 05:06:14 +0000 (+0100) Subject: Replace PathStr hassles with pathlib.Path. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/%7B%7B%20web_path%20%7D%7D/decks/%7B%7Bprefix%7D%7D/%7Broute%7D?a=commitdiff_plain;h=a40eb8f4b9eb930ce410b8afdc8ee3562dd5e092;p=ytplom Replace PathStr hassles with pathlib.Path. --- diff --git a/src/migrate.py b/src/migrate.py index 7d712d9..85f4af4 100755 --- a/src/migrate.py +++ b/src/migrate.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 """Script to migrate DB to most recent schema.""" from sys import exit as sys_exit -from os import scandir -from os.path import basename, isfile from sqlite3 import connect as sql_connect from ytplom.misc import ( EXPECTED_DB_VERSION, PATH_DB, PATH_DB_SCHEMA, PATH_MIGRATIONS, @@ -23,24 +21,23 @@ def main() -> None: f'{EXPECTED_DB_VERSION} …') needed = [n+1 for n in range(start_version, EXPECTED_DB_VERSION)] migrations = {} - for entry in [entry for entry in scandir(PATH_MIGRATIONS) - if isfile(entry) and entry.path != PATH_DB_SCHEMA]: - toks = basename(entry.path).split('_') + for path in [p for p in PATH_MIGRATIONS.iterdir() + if p.is_file() and p != PATH_DB_SCHEMA]: + toks = path.name.split('_') try: version = int(toks[0]) except ValueError as e: - msg = f'Found illegal migration path {entry.path}, aborting.' + msg = f'Found illegal migration path {path}, aborting.' raise HandledException(msg) from e if version in needed: - migrations[version] = entry.path + migrations[version] = path missing = [n for n in needed if n not in migrations] if missing: raise HandledException(f'Needed migrations missing: {missing}') with sql_connect(PATH_DB) as conn: for version_number, migration_path in migrations.items(): print(f'Applying migration {version_number}: {migration_path}') - with open(migration_path, 'r', encoding='utf8') as f: - conn.executescript(f.read()) + conn.executescript(migration_path.read_text(encoding='utf8')) conn.execute(f'{SQL_DB_VERSION} = {version_number}') diff --git a/src/sync.py b/src/sync.py index 3d6a712..2b64581 100755 --- a/src/sync.py +++ b/src/sync.py @@ -4,19 +4,17 @@ # included libs from typing import Any, Callable from json import loads as json_loads -from os import remove as os_remove -from os.path import join as path_join +from pathlib import Path from urllib.request import urlopen # non-included libs from paramiko import SSHClient # type: ignore from scp import SCPClient # type: ignore from ytplom.misc import ( PAGE_NAMES, PATH_DB, PATH_DOWNLOADS, PATH_TEMP, - Config, DbConnection, PathStr, QuotaLog, VideoFile, - YoutubeQuery, YoutubeVideo) + Config, DbConnection, QuotaLog, VideoFile, YoutubeQuery, YoutubeVideo) -PATH_DB_REMOTE = PathStr(path_join(PATH_TEMP, 'remote_db.sql')) +PATH_DB_REMOTE = PATH_TEMP.joinpath('remote_db.sql') ATTR_NAME_LAST_UPDATE = 'last_update' @@ -88,19 +86,19 @@ def main(): local_db.commit_close() remote_db.commit_close() scp.put(PATH_DB_REMOTE, PATH_DB) - os_remove(PATH_DB_REMOTE) + PATH_DB_REMOTE.unlink() missings = [] for host, port in ((config.remote, config.port_remote), (config.host, config.port)): url_missing = f'http://{host}:{port}/{PAGE_NAMES["missing"]}' with urlopen(url_missing) as response: - missings += [json_loads(response.read())] + missings += [Path(p) for p in json_loads(response.read())] for i, direction_mover in enumerate([('local->remote', scp.put), ('remote->local', scp.get)]): direction, mover = direction_mover for path in (p for p in missings[i] if p not in missings[int(not bool(i))]): - full_path = path_join(PATH_DOWNLOADS, path) + full_path = PATH_DOWNLOADS.joinpath(path) print(f'SYNC: sending {direction} file {path}') mover(full_path, full_path) scp.close() diff --git a/src/templates/playlist.tmpl b/src/templates/playlist.tmpl index 3a6e44c..bbf441e 100644 --- a/src/templates/playlist.tmpl +++ b/src/templates/playlist.tmpl @@ -48,7 +48,7 @@ td.entry_buttons { width: 5em; } -{{ file.basename }} +{{ file.rel_path }} {% endfor %} @@ -61,7 +61,7 @@ td.entry_buttons { width: 5em; }
{% if running %}{% if pause %}PAUSED{% else %}PLAYING{% endif %}{% else %}STOPPED{% endif %}:
-{{ current_video.basename }}
+{{ current_video.rel_path }}
diff --git a/src/ytplom/misc.py b/src/ytplom/misc.py index 22cfb99..96cabd6 100644 --- a/src/ytplom/misc.py +++ b/src/ytplom/misc.py @@ -2,15 +2,14 @@ # included libs from typing import Any, NewType, Optional, Self, TypeAlias -from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove -from os.path import (basename, dirname, isdir, isfile, exists as path_exists, - join as path_join, splitext) +from os import chdir, environ from base64 import urlsafe_b64encode, urlsafe_b64decode from random import shuffle from time import time, sleep from datetime import datetime, timedelta -from json import dumps as json_dumps, load as json_load +from json import dumps as json_dumps, loads as json_loads from uuid import uuid4 +from pathlib import Path from sqlite3 import connect as sql_connect, Cursor, Row from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs @@ -47,79 +46,78 @@ AmountDownloads = NewType('AmountDownloads', int) PlayerUpdateId = NewType('PlayerUpdateId', str) B64Str = NewType('B64Str', str) ParamsStr = NewType('ParamsStr', str) -PageNames: TypeAlias = dict[str, PathStr] -DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr] +UrlStr = NewType('UrlStr', str) +PageNames: TypeAlias = dict[str, Path] +DownloadsIndex: TypeAlias = dict[YoutubeId, Path] FilesWithIndex: TypeAlias = list[tuple[int, 'VideoFile']] TemplateContext: TypeAlias = dict[ str, None | bool - | FilesWithIndex | PageNames | ParamsStr | PathStr | PlayerUpdateId - | QueryText | QuotaCost | 'VideoFile' | YoutubeId | 'YoutubeVideo' - | list[FlagName] | list['VideoFile'] | list['YoutubeVideo'] - | list['YoutubeQuery'] + | FilesWithIndex | PageNames | ParamsStr | Path | PlayerUpdateId + | QueryText | QuotaCost | UrlStr | 'VideoFile' | YoutubeId + | 'YoutubeVideo' | list[FlagName] | list['VideoFile'] + | list['YoutubeVideo'] | list['YoutubeQuery'] ] # major expected directories -PATH_HOME = PathStr(environ.get('HOME', '')) -PATH_APP_DATA = PathStr(path_join(PATH_HOME, '.local/share/ytplom')) -PATH_CACHE = PathStr(path_join(PATH_HOME, '.cache/ytplom')) +PATH_APP_DATA = Path.home().joinpath('.local/share/ytplom') +PATH_CACHE = Path.home().joinpath('.cache/ytplom') # paths for rather dynamic data -PATH_DOWNLOADS = PathStr(path_join(PATH_HOME, 'ytplom_downloads')) -PATH_DB = PathStr(path_join(PATH_APP_DATA, 'db.sql')) -PATH_TEMP = PathStr(path_join(PATH_CACHE, 'temp')) -PATH_THUMBNAILS = PathStr(path_join(PATH_CACHE, 'thumbnails')) -PATH_CONFFILE = PathStr(path_join(PATH_HOME, '.config/ytplom/config.json')) +PATH_DOWNLOADS = Path.home().joinpath('ytplom_downloads') +PATH_DB = PATH_APP_DATA.joinpath('db.sql') +PATH_TEMP = PATH_CACHE.joinpath('temp') +PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails') +PATH_CONFFILE = Path.home().joinpath('.config/ytplom/config.json') # template paths -PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates')) -NAME_TEMPLATE_QUERIES = PathStr('yt_queries.tmpl') -NAME_TEMPLATE_RESULTS = PathStr('yt_results.tmpl') -NAME_TEMPLATE_FILES = PathStr('files.tmpl') -NAME_TEMPLATE_FILE_DATA = PathStr('file_data.tmpl') -NAME_TEMPLATE_YT_VIDEO = PathStr('yt_result.tmpl') -NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl') +PATH_TEMPLATES = PATH_APP_DATA.joinpath('templates') +NAME_TEMPLATE_QUERIES = Path('yt_queries.tmpl') +NAME_TEMPLATE_RESULTS = Path('yt_results.tmpl') +NAME_TEMPLATE_FILES = Path('files.tmpl') +NAME_TEMPLATE_FILE_DATA = Path('file_data.tmpl') +NAME_TEMPLATE_YT_VIDEO = Path('yt_result.tmpl') +NAME_TEMPLATE_PLAYLIST = Path('playlist.tmpl') # page names PAGE_NAMES = { - 'download': PathStr('dl'), - 'file': PathStr('file'), - 'files': PathStr('files'), - 'last_update': PathStr('last_playlist_update.json'), - 'missing': PathStr('missing.json'), - 'playlist': PathStr('playlist'), - 'thumbnails': PathStr('thumbnails'), - 'yt_result': PathStr('yt_result'), - 'yt_query': PathStr('yt_query'), - 'yt_queries': PathStr('yt_queries') + 'download': Path('dl'), + 'file': Path('file'), + 'files': Path('files'), + 'last_update': Path('last_playlist_update.json'), + 'missing': Path('missing.json'), + 'playlist': Path('playlist'), + 'thumbnails': Path('thumbnails'), + 'yt_result': Path('yt_result'), + 'yt_query': Path('yt_query'), + 'yt_queries': Path('yt_queries') } # yt_dlp config YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ '/best[height<=1080][width<=1920]' -YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS, - 'temp': PATH_TEMP}, +YT_DL_PARAMS = {'paths': {'home': str(PATH_DOWNLOADS), + 'temp': str(PATH_TEMP)}, 'format': YT_DOWNLOAD_FORMAT} # Youtube API expectations -YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=') -THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/') -THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg') +YOUTUBE_URL_PREFIX = UrlStr('https://www.youtube.com/watch?v=') +THUMBNAIL_URL_PREFIX = UrlStr('https://i.ytimg.com/vi/') +THUMBNAIL_URL_SUFFIX = UrlStr('/default.jpg') QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100) QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1) # database stuff EXPECTED_DB_VERSION = 1 SQL_DB_VERSION = SqlText('PRAGMA user_version') -PATH_MIGRATIONS = PathStr(path_join(PATH_APP_DATA, 'migrations')) -PATH_DB_SCHEMA = PathStr(path_join(PATH_MIGRATIONS, - f'init_{EXPECTED_DB_VERSION}.sql')) +PATH_MIGRATIONS = PATH_APP_DATA.joinpath('migrations') +PATH_DB_SCHEMA = PATH_MIGRATIONS.joinpath(f'init_{EXPECTED_DB_VERSION}.sql') # other ENVIRON_PREFIX = 'YTPLOM_' TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'} -NAME_INSTALLER = PathStr('install.sh') +NAME_INSTALLER = Path('install.sh') FILE_FLAGS: dict[FlagName, FlagsInt] = { FlagName('delete'): FlagsInt(1 << 62) } @@ -133,18 +131,17 @@ class HandledException(Exception): """Raise in any other case where we know what's happening.""" -def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None: +def _ensure_expected_dirs(expected_dirs: list[Path]) -> None: """Ensure existance of expected_dirs _as_ directories.""" - for dir_name in expected_dirs: - if not isdir(dir_name): - if path_exists(dir_name): - raise HandledException(f'at expected directory path {dir_name}' - 'found non-directory') - print(f'creating expected directory: {dir_name}') - makedirs(dir_name) + for dir_path in [p for p in expected_dirs if not p.is_dir()]: + if dir_path.exists(): + raise HandledException(f'at expected directory path {dir_path}' + 'found non-directory') + print(f'creating expected directory: {dir_path}') + dir_path.mkdir(parents=True, exist_ok=True) -def get_db_version(db_path: PathStr) -> int: +def get_db_version(db_path: Path) -> int: """Return user_version value of DB at db_path.""" with sql_connect(db_path) as conn: return list(conn.execute(SQL_DB_VERSION))[0][0] @@ -164,9 +161,8 @@ class Config: if attr_name in d: setattr(self, attr_name, type_(d[attr_name])) set_attrs_from_dict(DEFAULTS) - if isfile(PATH_CONFFILE): - with open(PATH_CONFFILE, 'r', encoding='utf8') as f: - conffile = json_load(f) + if PATH_CONFFILE.is_file(): + conffile = json_loads(PATH_CONFFILE.read_text(encoding='utf8')) set_attrs_from_dict(conffile) set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v for k, v in environ.items() @@ -176,20 +172,18 @@ class Config: class DbConnection: """Wrapped sqlite3.Connection.""" - def __init__(self, path: PathStr = PATH_DB) -> None: + def __init__(self, path: Path = PATH_DB) -> None: self._path = path - if not isfile(self._path): - if path_exists(self._path): + if not self._path.is_file(): + if self._path.exists(): raise HandledException(f'no DB at {self._path}; would create, ' 'but something\'s already there?') - path_db_dir = dirname(self._path) - if not isdir(path_db_dir): + if not self._path.parent.is_dir(): raise NotFoundException( - f'cannot find {path_db_dir} as directory to put DB ' - f'into, did you run {NAME_INSTALLER}?') + f'cannot find {self._path.parent} as directory to put ' + f'DB into, did you run {NAME_INSTALLER}?') with sql_connect(self._path) as conn: - with open(PATH_DB_SCHEMA, 'r', encoding='utf8') as f: - conn.executescript(f.read()) + conn.executescript(PATH_DB_SCHEMA.read_text(encoding='utf8')) conn.execute(f'{SQL_DB_VERSION} = {EXPECTED_DB_VERSION}') cur_version = get_db_version(self._path) if cur_version != EXPECTED_DB_VERSION: @@ -227,6 +221,9 @@ class DbData: kwargs = {} for i, col_name in enumerate(cls._cols): kwargs[col_name] = row[i] + for attr_name, type_ in cls.__annotations__.items(): + if attr_name in kwargs: + kwargs[attr_name] = type_(kwargs[attr_name]) return cls(**kwargs) @classmethod @@ -252,7 +249,8 @@ class DbData: vals = [getattr(self, col_name) for col_name in self._cols] q_marks = '(' + ','.join(['?'] * len(vals)) + ')' sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}') - return conn.exec(sql, tuple(vals)) + return conn.exec(sql, tuple(str(v) if isinstance(v, Path) else v + for v in vals)) class YoutubeQuery(DbData): @@ -350,9 +348,10 @@ class VideoFile(DbData): _table_name = 'files' _cols = ('rel_path', 'yt_id', 'flags', 'last_update') last_update: DatetimeStr + rel_path: Path def __init__(self, - rel_path: PathStr, + rel_path: Path, yt_id: YoutubeId, flags: FlagsInt = FlagsInt(0), last_update: Optional[DatetimeStr] = None @@ -385,22 +384,17 @@ class VideoFile(DbData): @property def rel_path_b64(self) -> B64Str: """Return .rel_path as urlsafe_b64 e3ncoding.""" - return B64Str(urlsafe_b64encode(self.rel_path.encode()).decode()) + return B64Str(urlsafe_b64encode(str(self.rel_path).encode()).decode()) @property - def full_path(self) -> PathStr: + def full_path(self) -> Path: """Return self.rel_path suffixed under PATH_DOWNLOADS.""" - return PathStr(path_join(PATH_DOWNLOADS, self.rel_path)) - - @property - def basename(self) -> PathStr: - """Return basename(self.rel_path).""" - return PathStr(basename(self.rel_path)) + return PATH_DOWNLOADS.joinpath(self.rel_path) @property def present(self) -> bool: """Return if file exists in filesystem.""" - return path_exists(self.full_path) + return self.full_path.exists() @property def missing(self) -> bool: @@ -423,11 +417,10 @@ class VideoFile(DbData): def ensure_absence_if_deleted(self) -> None: """If 'delete' flag set, ensure no actual file in filesystem.""" - if (self.is_flag_set(FlagName('delete')) - and path_exists(self.full_path)): + if self.is_flag_set(FlagName('delete')) and self.present: print(f'SYNC: {self.rel_path} set "delete", ' 'removing from filesystem.') - os_remove(self.full_path) + self.full_path.unlink() class QuotaLog(DbData): @@ -503,11 +496,10 @@ class Player: conn = DbConnection() known_files = {f.full_path: f for f in VideoFile.get_all(conn)} conn.commit_close() - self._files = [known_files[PathStr(e.path)] - for e in scandir(PATH_DOWNLOADS) - if e.path in known_files - and isfile(e.path) - and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS] + self._files = [known_files[p] for p in PATH_DOWNLOADS.iterdir() + if p in known_files + and p.is_file() + and p.suffix[1:] in LEGAL_EXTENSIONS] shuffle(self._files) self._idx = 0 @@ -652,17 +644,15 @@ class DownloadsManager: def _sync_db(self): conn = DbConnection() - files_via_db = VideoFile.get_all(conn) - old_cwd = getcwd() + known_paths = [file.rel_path for file in VideoFile.get_all(conn)] + old_cwd = Path.cwd() chdir(PATH_DOWNLOADS) - paths = [file.rel_path for file in files_via_db] - for path in [PathStr(basename(e.path)) for e in scandir() - if isfile(e.path)]: - if path not in paths: - yt_id = self._id_from_filename(path) - file = VideoFile(path, yt_id) - print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".') - file.save(conn) + for path in [p for p in Path('.').iterdir() + if p.is_file() and p not in known_paths]: + yt_id = self._id_from_filename(path) + file = VideoFile(path, yt_id) + print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".') + file.save(conn) self._files = VideoFile.get_all(conn) for file in self._files: file.ensure_absence_if_deleted() @@ -670,28 +660,22 @@ class DownloadsManager: conn.commit_close() @staticmethod - def _id_from_filename(path: PathStr, - double_split: bool = False - ) -> YoutubeId: - before_ext = splitext(path)[0] - if double_split: - before_ext = splitext(before_ext)[0] - return YoutubeId(before_ext.split('[')[-1].split(']')[0]) + def _id_from_filename(path: Path) -> YoutubeId: + return YoutubeId(path.stem.split('[')[-1].split(']')[0]) @property def ids_unfinished(self) -> set[YoutubeId]: """Return set of IDs of videos awaiting or currently in download.""" in_temp_dir = [] - for path in [PathStr(e.path) for e - in scandir(PATH_TEMP) if isfile(e.path)]: + for path in [p for p in PATH_TEMP.iterdir() if p.is_file()]: in_temp_dir += [self._id_from_filename(path)] return set(self._to_download + in_temp_dir) def clean_unfinished(self) -> None: """Empty temp directory of unfinished downloads.""" - for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]: - print(f'removing unfinished download: {e.path}') - os_remove(e.path) + for path in [p for p in PATH_TEMP.iterdir() if p.is_file()]: + print(f'removing unfinished download: {path}') + path.unlink() def queue_download(self, video_id: YoutubeId) -> None: """Add video_id to download queue *if* not already processed.""" @@ -748,14 +732,14 @@ class TaskHandler(BaseHTTPRequestHandler): if content: self.wfile.write(content) - def _redirect(self, target: PathStr) -> None: - self._send_http(headers=[('Location', target)], code=302) + def _redirect(self, target: Path) -> None: + self._send_http(headers=[('Location', str(target))], code=302) def do_POST(self) -> None: # pylint:disable=invalid-name """Map POST requests to handlers for various paths.""" url = urlparse(self.path) - toks_url: list[str] = url.path.split('/') - page_name = toks_url[1] + toks_url = Path(url.path).parts + page_name = Path(toks_url[1] if len(toks_url) > 1 else '') body_length = int(self.headers['content-length']) postvars = parse_qs(self.rfile.read(body_length).decode()) if PAGE_NAMES['playlist'] == page_name: @@ -784,7 +768,7 @@ class TaskHandler(BaseHTTPRequestHandler): elif command.startswith('down_'): self.server.player.move_entry(int(command.split('_')[1]), False) sleep(0.5) # avoid redir happening before current_file update - self._redirect(PathStr('/')) + self._redirect(Path('/')) def _receive_video_flag(self, rel_path_b64: B64Str, @@ -799,7 +783,9 @@ class TaskHandler(BaseHTTPRequestHandler): file.save(conn) conn.commit_close() file.ensure_absence_if_deleted() - self._redirect(PathStr(f'/{PAGE_NAMES["file"]}/{rel_path_b64}')) + self._redirect(Path('/') + .joinpath(PAGE_NAMES['file']) + .joinpath(rel_path_b64)) def _receive_yt_query(self, query_txt: QueryText) -> None: conn = DbConnection() @@ -822,7 +808,7 @@ class TaskHandler(BaseHTTPRequestHandler): ids_to_detail += [video_id] snippet = item['snippet'] urlretrieve(snippet['thumbnails']['default']['url'], - path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) + PATH_THUMBNAILS.joinpath(f'{video_id}.jpg')) results += [YoutubeVideo(id_=video_id, title=snippet['title'], description=snippet['description'], @@ -851,16 +837,18 @@ class TaskHandler(BaseHTTPRequestHandler): result.save(conn) result.save_to_query(conn, query_data.id_) conn.commit_close() - self._redirect(PathStr(f'/{PAGE_NAMES["yt_query"]}/{query_data.id_}')) + self._redirect(Path('/') + .joinpath(PAGE_NAMES['yt_query']) + .joinpath(query_data.id_)) def do_GET(self) -> None: # pylint:disable=invalid-name """Map GET requests to handlers for various paths.""" url = urlparse(self.path) - toks_url: list[str] = url.path.split('/') - page_name = toks_url[1] + toks_url = Path(url.path).parts + page_name = Path(toks_url[1] if len(toks_url) > 1 else '') try: if PAGE_NAMES['thumbnails'] == page_name: - self._send_thumbnail(PathStr(toks_url[2])) + self._send_thumbnail(Path(toks_url[2])) elif PAGE_NAMES['download'] == page_name: self._send_or_download_video(YoutubeId(toks_url[2])) elif PAGE_NAMES['files'] == page_name: @@ -886,27 +874,27 @@ class TaskHandler(BaseHTTPRequestHandler): self._send_http(bytes(str(e), 'utf8'), code=404) def _send_rendered_template(self, - tmpl_name: PathStr, + tmpl_name: Path, tmpl_ctx: TemplateContext ) -> None: - tmpl = self.server.jinja.get_template(tmpl_name) + tmpl = self.server.jinja.get_template(str(tmpl_name)) tmpl_ctx['page_names'] = PAGE_NAMES html = tmpl.render(**tmpl_ctx) self._send_http(bytes(html, 'utf8')) - def _send_thumbnail(self, filename: PathStr) -> None: + def _send_thumbnail(self, filename: Path) -> None: _ensure_expected_dirs([PATH_THUMBNAILS]) - path_thumbnail = path_join(PATH_THUMBNAILS, filename) - if not path_exists(path_thumbnail): - video_id = splitext(filename)[0] + path_thumbnail = PATH_THUMBNAILS.joinpath(filename) + if not path_thumbnail.exists(): + video_id = filename.stem url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}' try: - urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg')) + urlretrieve(url, PATH_THUMBNAILS.joinpath(f'{video_id}.jpg')) except HTTPError as e: if 404 == e.code: raise NotFoundException from e raise e - with open(path_thumbnail, 'rb') as f: + with path_thumbnail.open('rb') as f: img = f.read() self._send_http(img, [('Content-type', 'image/jpg')]) @@ -917,10 +905,12 @@ class TaskHandler(BaseHTTPRequestHandler): except NotFoundException: conn.commit_close() self.server.downloads.queue_download(video_id) - self._redirect(PathStr(f'/{PAGE_NAMES["yt_result"]}/{video_id}')) + self._redirect(Path('/') + .joinpath(PAGE_NAMES['yt_result']) + .joinpath(video_id)) return conn.commit_close() - with open(file_data.full_path, 'rb') as video_file: + with file_data.full_path.open('rb') as video_file: video = video_file.read() self._send_http(content=video) @@ -974,7 +964,7 @@ class TaskHandler(BaseHTTPRequestHandler): def _send_files_index(self, filter_: ParamsStr, show_absent: bool) -> None: conn = DbConnection() files = [f for f in VideoFile.get_all(conn) - if (filter_ in f.rel_path) and (show_absent or f.present)] + if filter_ in str(f.rel_path) and (show_absent or f.present)] conn.commit_close() files.sort(key=lambda t: t.rel_path) self._send_rendered_template(