home · contact · privacy
Replace PathStr hassles with pathlib.Path.
authorChristian Heller <c.heller@plomlompom.de>
Fri, 29 Nov 2024 05:06:14 +0000 (06:06 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 29 Nov 2024 05:06:14 +0000 (06:06 +0100)
src/migrate.py
src/sync.py
src/templates/playlist.tmpl
src/ytplom/misc.py

index 7d712d97747429d1c7be0cb105a0d167ca24ef2c..85f4af46a82d102a13ddf7bfc4b8d7da09d817d4 100755 (executable)
@@ -1,8 +1,6 @@
 #!/usr/bin/env python3
 """Script to migrate DB to most recent schema."""
 from sys import exit as sys_exit
-from os import scandir
-from os.path import basename, isfile
 from sqlite3 import connect as sql_connect
 from ytplom.misc import (
         EXPECTED_DB_VERSION, PATH_DB, PATH_DB_SCHEMA, PATH_MIGRATIONS,
@@ -23,24 +21,23 @@ def main() -> None:
           f'{EXPECTED_DB_VERSION} …')
     needed = [n+1 for n in range(start_version, EXPECTED_DB_VERSION)]
     migrations = {}
-    for entry in [entry for entry in scandir(PATH_MIGRATIONS)
-                  if isfile(entry) and entry.path != PATH_DB_SCHEMA]:
-        toks = basename(entry.path).split('_')
+    for path in [p for p in PATH_MIGRATIONS.iterdir()
+                 if p.is_file() and p != PATH_DB_SCHEMA]:
+        toks = path.name.split('_')
         try:
             version = int(toks[0])
         except ValueError as e:
-            msg = f'Found illegal migration path {entry.path}, aborting.'
+            msg = f'Found illegal migration path {path}, aborting.'
             raise HandledException(msg) from e
         if version in needed:
-            migrations[version] = entry.path
+            migrations[version] = path
     missing = [n for n in needed if n not in migrations]
     if missing:
         raise HandledException(f'Needed migrations missing: {missing}')
     with sql_connect(PATH_DB) as conn:
         for version_number, migration_path in migrations.items():
             print(f'Applying migration {version_number}: {migration_path}')
-            with open(migration_path, 'r', encoding='utf8') as f:
-                conn.executescript(f.read())
+            conn.executescript(migration_path.read_text(encoding='utf8'))
             conn.execute(f'{SQL_DB_VERSION} = {version_number}')
 
 
index 3d6a71245db1eb93a50c149ecd5a16f2071134a8..2b6458171f486b8e4aedb1daf3c7b76da03a8726 100755 (executable)
@@ -4,19 +4,17 @@
 # included libs
 from typing import Any, Callable
 from json import loads as json_loads
-from os import remove as os_remove
-from os.path import join as path_join
+from pathlib import Path
 from urllib.request import urlopen
 # non-included libs
 from paramiko import SSHClient  # type: ignore
 from scp import SCPClient  # type: ignore
 from ytplom.misc import (
         PAGE_NAMES, PATH_DB, PATH_DOWNLOADS, PATH_TEMP,
-        Config, DbConnection, PathStr, QuotaLog, VideoFile,
-        YoutubeQuery, YoutubeVideo)
+        Config, DbConnection, QuotaLog, VideoFile, YoutubeQuery, YoutubeVideo)
 
 
-PATH_DB_REMOTE = PathStr(path_join(PATH_TEMP, 'remote_db.sql'))
+PATH_DB_REMOTE = PATH_TEMP.joinpath('remote_db.sql')
 ATTR_NAME_LAST_UPDATE = 'last_update'
 
 
@@ -88,19 +86,19 @@ def main():
     local_db.commit_close()
     remote_db.commit_close()
     scp.put(PATH_DB_REMOTE, PATH_DB)
-    os_remove(PATH_DB_REMOTE)
+    PATH_DB_REMOTE.unlink()
     missings = []
     for host, port in ((config.remote, config.port_remote),
                        (config.host, config.port)):
         url_missing = f'http://{host}:{port}/{PAGE_NAMES["missing"]}'
         with urlopen(url_missing) as response:
-            missings += [json_loads(response.read())]
+            missings += [Path(p) for p in json_loads(response.read())]
     for i, direction_mover in enumerate([('local->remote', scp.put),
                                          ('remote->local', scp.get)]):
         direction, mover = direction_mover
         for path in (p for p in missings[i]
                      if p not in missings[int(not bool(i))]):
-            full_path = path_join(PATH_DOWNLOADS, path)
+            full_path = PATH_DOWNLOADS.joinpath(path)
             print(f'SYNC: sending {direction} file {path}')
             mover(full_path, full_path)
     scp.close()
index 3a6e44cef9f357324bbf72e867f8bb4fdda93170..bbf441e5fb0f618c2ab970c3dbb686ffe67b63f1 100644 (file)
@@ -48,7 +48,7 @@ td.entry_buttons { width: 5em; }
 <input type="submit" name="up_{{idx}}" value="{% if reverse %}v{% else %}^{% endif %}" />
 <input type="submit" name="down_{{idx}}" value="{% if reverse %}^{% else %}v{% endif %}" />
 </td>
-<td><a href="/{{page_names.file}}/{{file.rel_path_b64}}">{{ file.basename }}</a></td>
+<td><a href="/{{page_names.file}}/{{file.rel_path_b64}}">{{ file.rel_path }}</a></td>
 </tr>
 {% endfor %}
 </table>
@@ -61,7 +61,7 @@ td.entry_buttons { width: 5em; }
 <table>
 <tr><td id="status" colspan=2>
 {% if running %}{% if pause %}PAUSED{% else %}PLAYING{% endif %}{% else %}STOPPED{% endif %}:<br />
-<a href="/{{page_names.file}}/{{current_video.rel_path_b64}}">{{ current_video.basename }}</a><br />
+<a href="/{{page_names.file}}/{{current_video.rel_path_b64}}">{{ current_video.rel_path }}</a><br />
 <form action="/{{page_names.playlist}}" method="POST">
 <input type="submit" name="pause" autofocus value="{% if paused %}resume{% else %}pause{% endif %}">
 <input type="submit" name="prev" value="prev">
index 22cfb99de3302f507ed3b8cef8bf92f12f9e9c85..96cabd6bcd50ae26739892bf6aa725249e132956 100644 (file)
@@ -2,15 +2,14 @@
 
 # included libs
 from typing import Any, NewType, Optional, Self, TypeAlias
-from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
-from os.path import (basename, dirname, isdir, isfile, exists as path_exists,
-                     join as path_join, splitext)
+from os import chdir, environ
 from base64 import urlsafe_b64encode, urlsafe_b64decode
 from random import shuffle
 from time import time, sleep
 from datetime import datetime, timedelta
-from json import dumps as json_dumps, load as json_load
+from json import dumps as json_dumps, loads as json_loads
 from uuid import uuid4
+from pathlib import Path
 from sqlite3 import connect as sql_connect, Cursor, Row
 from http.server import HTTPServer, BaseHTTPRequestHandler
 from urllib.parse import urlparse, parse_qs
@@ -47,79 +46,78 @@ AmountDownloads = NewType('AmountDownloads', int)
 PlayerUpdateId = NewType('PlayerUpdateId', str)
 B64Str = NewType('B64Str', str)
 ParamsStr = NewType('ParamsStr', str)
-PageNames: TypeAlias = dict[str, PathStr]
-DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
+UrlStr = NewType('UrlStr', str)
+PageNames: TypeAlias = dict[str, Path]
+DownloadsIndex: TypeAlias = dict[YoutubeId, Path]
 FilesWithIndex: TypeAlias = list[tuple[int, 'VideoFile']]
 TemplateContext: TypeAlias = dict[
         str,
         None | bool
-        | FilesWithIndex | PageNames | ParamsStr | PathStr | PlayerUpdateId
-        | QueryText | QuotaCost | 'VideoFile' | YoutubeId | 'YoutubeVideo'
-        | list[FlagName] | list['VideoFile'] | list['YoutubeVideo']
-        | list['YoutubeQuery']
+        | FilesWithIndex | PageNames | ParamsStr | Path | PlayerUpdateId
+        | QueryText | QuotaCost | UrlStr | 'VideoFile' | YoutubeId
+        | 'YoutubeVideo' | list[FlagName] | list['VideoFile']
+        | list['YoutubeVideo'] | list['YoutubeQuery']
 ]
 
 # major expected directories
-PATH_HOME = PathStr(environ.get('HOME', ''))
-PATH_APP_DATA = PathStr(path_join(PATH_HOME, '.local/share/ytplom'))
-PATH_CACHE = PathStr(path_join(PATH_HOME, '.cache/ytplom'))
+PATH_APP_DATA = Path.home().joinpath('.local/share/ytplom')
+PATH_CACHE = Path.home().joinpath('.cache/ytplom')
 
 # paths for rather dynamic data
-PATH_DOWNLOADS = PathStr(path_join(PATH_HOME, 'ytplom_downloads'))
-PATH_DB = PathStr(path_join(PATH_APP_DATA, 'db.sql'))
-PATH_TEMP = PathStr(path_join(PATH_CACHE, 'temp'))
-PATH_THUMBNAILS = PathStr(path_join(PATH_CACHE, 'thumbnails'))
-PATH_CONFFILE = PathStr(path_join(PATH_HOME, '.config/ytplom/config.json'))
+PATH_DOWNLOADS = Path.home().joinpath('ytplom_downloads')
+PATH_DB = PATH_APP_DATA.joinpath('db.sql')
+PATH_TEMP = PATH_CACHE.joinpath('temp')
+PATH_THUMBNAILS = PATH_CACHE.joinpath('thumbnails')
+PATH_CONFFILE = Path.home().joinpath('.config/ytplom/config.json')
 
 # template paths
-PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('yt_queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('yt_results.tmpl')
-NAME_TEMPLATE_FILES = PathStr('files.tmpl')
-NAME_TEMPLATE_FILE_DATA = PathStr('file_data.tmpl')
-NAME_TEMPLATE_YT_VIDEO = PathStr('yt_result.tmpl')
-NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
+PATH_TEMPLATES = PATH_APP_DATA.joinpath('templates')
+NAME_TEMPLATE_QUERIES = Path('yt_queries.tmpl')
+NAME_TEMPLATE_RESULTS = Path('yt_results.tmpl')
+NAME_TEMPLATE_FILES = Path('files.tmpl')
+NAME_TEMPLATE_FILE_DATA = Path('file_data.tmpl')
+NAME_TEMPLATE_YT_VIDEO = Path('yt_result.tmpl')
+NAME_TEMPLATE_PLAYLIST = Path('playlist.tmpl')
 
 # page names
 PAGE_NAMES = {
-    'download': PathStr('dl'),
-    'file': PathStr('file'),
-    'files': PathStr('files'),
-    'last_update': PathStr('last_playlist_update.json'),
-    'missing': PathStr('missing.json'),
-    'playlist': PathStr('playlist'),
-    'thumbnails': PathStr('thumbnails'),
-    'yt_result': PathStr('yt_result'),
-    'yt_query': PathStr('yt_query'),
-    'yt_queries': PathStr('yt_queries')
+    'download': Path('dl'),
+    'file': Path('file'),
+    'files': Path('files'),
+    'last_update': Path('last_playlist_update.json'),
+    'missing': Path('missing.json'),
+    'playlist': Path('playlist'),
+    'thumbnails': Path('thumbnails'),
+    'yt_result': Path('yt_result'),
+    'yt_query': Path('yt_query'),
+    'yt_queries': Path('yt_queries')
 }
 
 # yt_dlp config
 YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
         '/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
-                          'temp': PATH_TEMP},
+YT_DL_PARAMS = {'paths': {'home': str(PATH_DOWNLOADS),
+                          'temp': str(PATH_TEMP)},
                 'format': YT_DOWNLOAD_FORMAT}
 
 # Youtube API expectations
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
+YOUTUBE_URL_PREFIX = UrlStr('https://www.youtube.com/watch?v=')
+THUMBNAIL_URL_PREFIX = UrlStr('https://i.ytimg.com/vi/')
+THUMBNAIL_URL_SUFFIX = UrlStr('/default.jpg')
 QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
 QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
 
 # database stuff
 EXPECTED_DB_VERSION = 1
 SQL_DB_VERSION = SqlText('PRAGMA user_version')
-PATH_MIGRATIONS = PathStr(path_join(PATH_APP_DATA, 'migrations'))
-PATH_DB_SCHEMA = PathStr(path_join(PATH_MIGRATIONS,
-                                   f'init_{EXPECTED_DB_VERSION}.sql'))
+PATH_MIGRATIONS = PATH_APP_DATA.joinpath('migrations')
+PATH_DB_SCHEMA = PATH_MIGRATIONS.joinpath(f'init_{EXPECTED_DB_VERSION}.sql')
 
 # other
 ENVIRON_PREFIX = 'YTPLOM_'
 TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
 LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
-NAME_INSTALLER = PathStr('install.sh')
+NAME_INSTALLER = Path('install.sh')
 FILE_FLAGS: dict[FlagName, FlagsInt] = {
   FlagName('delete'): FlagsInt(1 << 62)
 }
@@ -133,18 +131,17 @@ class HandledException(Exception):
     """Raise in any other case where we know what's happening."""
 
 
-def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
+def _ensure_expected_dirs(expected_dirs: list[Path]) -> None:
     """Ensure existance of expected_dirs _as_ directories."""
-    for dir_name in expected_dirs:
-        if not isdir(dir_name):
-            if path_exists(dir_name):
-                raise HandledException(f'at expected directory path {dir_name}'
-                                       'found non-directory')
-            print(f'creating expected directory: {dir_name}')
-            makedirs(dir_name)
+    for dir_path in [p for p in expected_dirs if not p.is_dir()]:
+        if dir_path.exists():
+            raise HandledException(f'at expected directory path {dir_path}'
+                                   'found non-directory')
+        print(f'creating expected directory: {dir_path}')
+        dir_path.mkdir(parents=True, exist_ok=True)
 
 
-def get_db_version(db_path: PathStr) -> int:
+def get_db_version(db_path: Path) -> int:
     """Return user_version value of DB at db_path."""
     with sql_connect(db_path) as conn:
         return list(conn.execute(SQL_DB_VERSION))[0][0]
@@ -164,9 +161,8 @@ class Config:
                 if attr_name in d:
                     setattr(self, attr_name, type_(d[attr_name]))
         set_attrs_from_dict(DEFAULTS)
-        if isfile(PATH_CONFFILE):
-            with open(PATH_CONFFILE, 'r', encoding='utf8') as f:
-                conffile = json_load(f)
+        if PATH_CONFFILE.is_file():
+            conffile = json_loads(PATH_CONFFILE.read_text(encoding='utf8'))
             set_attrs_from_dict(conffile)
         set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v
                              for k, v in environ.items()
@@ -176,20 +172,18 @@ class Config:
 class DbConnection:
     """Wrapped sqlite3.Connection."""
 
-    def __init__(self, path: PathStr = PATH_DB) -> None:
+    def __init__(self, path: Path = PATH_DB) -> None:
         self._path = path
-        if not isfile(self._path):
-            if path_exists(self._path):
+        if not self._path.is_file():
+            if self._path.exists():
                 raise HandledException(f'no DB at {self._path}; would create, '
                                        'but something\'s already there?')
-            path_db_dir = dirname(self._path)
-            if not isdir(path_db_dir):
+            if not self._path.parent.is_dir():
                 raise NotFoundException(
-                        f'cannot find {path_db_dir} as directory to put DB '
-                        f'into, did you run {NAME_INSTALLER}?')
+                        f'cannot find {self._path.parent} as directory to put '
+                        f'DB into, did you run {NAME_INSTALLER}?')
             with sql_connect(self._path) as conn:
-                with open(PATH_DB_SCHEMA, 'r', encoding='utf8') as f:
-                    conn.executescript(f.read())
+                conn.executescript(PATH_DB_SCHEMA.read_text(encoding='utf8'))
                 conn.execute(f'{SQL_DB_VERSION} = {EXPECTED_DB_VERSION}')
         cur_version = get_db_version(self._path)
         if cur_version != EXPECTED_DB_VERSION:
@@ -227,6 +221,9 @@ class DbData:
         kwargs = {}
         for i, col_name in enumerate(cls._cols):
             kwargs[col_name] = row[i]
+        for attr_name, type_ in cls.__annotations__.items():
+            if attr_name in kwargs:
+                kwargs[attr_name] = type_(kwargs[attr_name])
         return cls(**kwargs)
 
     @classmethod
@@ -252,7 +249,8 @@ class DbData:
         vals = [getattr(self, col_name) for col_name in self._cols]
         q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
         sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
-        return conn.exec(sql, tuple(vals))
+        return conn.exec(sql, tuple(str(v) if isinstance(v, Path) else v
+                                    for v in vals))
 
 
 class YoutubeQuery(DbData):
@@ -350,9 +348,10 @@ class VideoFile(DbData):
     _table_name = 'files'
     _cols = ('rel_path', 'yt_id', 'flags', 'last_update')
     last_update: DatetimeStr
+    rel_path: Path
 
     def __init__(self,
-                 rel_path: PathStr,
+                 rel_path: Path,
                  yt_id: YoutubeId,
                  flags: FlagsInt = FlagsInt(0),
                  last_update: Optional[DatetimeStr] = None
@@ -385,22 +384,17 @@ class VideoFile(DbData):
     @property
     def rel_path_b64(self) -> B64Str:
         """Return .rel_path as urlsafe_b64 e3ncoding."""
-        return B64Str(urlsafe_b64encode(self.rel_path.encode()).decode())
+        return B64Str(urlsafe_b64encode(str(self.rel_path).encode()).decode())
 
     @property
-    def full_path(self) -> PathStr:
+    def full_path(self) -> Path:
         """Return self.rel_path suffixed under PATH_DOWNLOADS."""
-        return PathStr(path_join(PATH_DOWNLOADS, self.rel_path))
-
-    @property
-    def basename(self) -> PathStr:
-        """Return basename(self.rel_path)."""
-        return PathStr(basename(self.rel_path))
+        return PATH_DOWNLOADS.joinpath(self.rel_path)
 
     @property
     def present(self) -> bool:
         """Return if file exists in filesystem."""
-        return path_exists(self.full_path)
+        return self.full_path.exists()
 
     @property
     def missing(self) -> bool:
@@ -423,11 +417,10 @@ class VideoFile(DbData):
 
     def ensure_absence_if_deleted(self) -> None:
         """If 'delete' flag set, ensure no actual file in filesystem."""
-        if (self.is_flag_set(FlagName('delete'))
-                and path_exists(self.full_path)):
+        if self.is_flag_set(FlagName('delete')) and self.present:
             print(f'SYNC: {self.rel_path} set "delete", '
                   'removing from filesystem.')
-            os_remove(self.full_path)
+            self.full_path.unlink()
 
 
 class QuotaLog(DbData):
@@ -503,11 +496,10 @@ class Player:
         conn = DbConnection()
         known_files = {f.full_path: f for f in VideoFile.get_all(conn)}
         conn.commit_close()
-        self._files = [known_files[PathStr(e.path)]
-                       for e in scandir(PATH_DOWNLOADS)
-                       if e.path in known_files
-                       and isfile(e.path)
-                       and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+        self._files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
+                       if p in known_files
+                       and p.is_file()
+                       and p.suffix[1:] in LEGAL_EXTENSIONS]
         shuffle(self._files)
         self._idx = 0
 
@@ -652,17 +644,15 @@ class DownloadsManager:
 
     def _sync_db(self):
         conn = DbConnection()
-        files_via_db = VideoFile.get_all(conn)
-        old_cwd = getcwd()
+        known_paths = [file.rel_path for file in VideoFile.get_all(conn)]
+        old_cwd = Path.cwd()
         chdir(PATH_DOWNLOADS)
-        paths = [file.rel_path for file in files_via_db]
-        for path in [PathStr(basename(e.path)) for e in scandir()
-                     if isfile(e.path)]:
-            if path not in paths:
-                yt_id = self._id_from_filename(path)
-                file = VideoFile(path, yt_id)
-                print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
-                file.save(conn)
+        for path in [p for p in Path('.').iterdir()
+                     if p.is_file() and p not in known_paths]:
+            yt_id = self._id_from_filename(path)
+            file = VideoFile(path, yt_id)
+            print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+            file.save(conn)
         self._files = VideoFile.get_all(conn)
         for file in self._files:
             file.ensure_absence_if_deleted()
@@ -670,28 +660,22 @@ class DownloadsManager:
         conn.commit_close()
 
     @staticmethod
-    def _id_from_filename(path: PathStr,
-                          double_split: bool = False
-                          ) -> YoutubeId:
-        before_ext = splitext(path)[0]
-        if double_split:
-            before_ext = splitext(before_ext)[0]
-        return YoutubeId(before_ext.split('[')[-1].split(']')[0])
+    def _id_from_filename(path: Path) -> YoutubeId:
+        return YoutubeId(path.stem.split('[')[-1].split(']')[0])
 
     @property
     def ids_unfinished(self) -> set[YoutubeId]:
         """Return set of IDs of videos awaiting or currently in download."""
         in_temp_dir = []
-        for path in [PathStr(e.path) for e
-                     in scandir(PATH_TEMP) if isfile(e.path)]:
+        for path in [p for p in PATH_TEMP.iterdir() if p.is_file()]:
             in_temp_dir += [self._id_from_filename(path)]
         return set(self._to_download + in_temp_dir)
 
     def clean_unfinished(self) -> None:
         """Empty temp directory of unfinished downloads."""
-        for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
-            print(f'removing unfinished download: {e.path}')
-            os_remove(e.path)
+        for path in [p for p in PATH_TEMP.iterdir() if p.is_file()]:
+            print(f'removing unfinished download: {path}')
+            path.unlink()
 
     def queue_download(self, video_id: YoutubeId) -> None:
         """Add video_id to download queue *if* not already processed."""
@@ -748,14 +732,14 @@ class TaskHandler(BaseHTTPRequestHandler):
         if content:
             self.wfile.write(content)
 
-    def _redirect(self, target: PathStr) -> None:
-        self._send_http(headers=[('Location', target)], code=302)
+    def _redirect(self, target: Path) -> None:
+        self._send_http(headers=[('Location', str(target))], code=302)
 
     def do_POST(self) -> None:  # pylint:disable=invalid-name
         """Map POST requests to handlers for various paths."""
         url = urlparse(self.path)
-        toks_url: list[str] = url.path.split('/')
-        page_name = toks_url[1]
+        toks_url = Path(url.path).parts
+        page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
         body_length = int(self.headers['content-length'])
         postvars = parse_qs(self.rfile.read(body_length).decode())
         if PAGE_NAMES['playlist'] == page_name:
@@ -784,7 +768,7 @@ class TaskHandler(BaseHTTPRequestHandler):
         elif command.startswith('down_'):
             self.server.player.move_entry(int(command.split('_')[1]), False)
         sleep(0.5)  # avoid redir happening before current_file update
-        self._redirect(PathStr('/'))
+        self._redirect(Path('/'))
 
     def _receive_video_flag(self,
                             rel_path_b64: B64Str,
@@ -799,7 +783,9 @@ class TaskHandler(BaseHTTPRequestHandler):
         file.save(conn)
         conn.commit_close()
         file.ensure_absence_if_deleted()
-        self._redirect(PathStr(f'/{PAGE_NAMES["file"]}/{rel_path_b64}'))
+        self._redirect(Path('/')
+                       .joinpath(PAGE_NAMES['file'])
+                       .joinpath(rel_path_b64))
 
     def _receive_yt_query(self, query_txt: QueryText) -> None:
         conn = DbConnection()
@@ -822,7 +808,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                 ids_to_detail += [video_id]
                 snippet = item['snippet']
                 urlretrieve(snippet['thumbnails']['default']['url'],
-                            path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+                            PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
                 results += [YoutubeVideo(id_=video_id,
                                          title=snippet['title'],
                                          description=snippet['description'],
@@ -851,16 +837,18 @@ class TaskHandler(BaseHTTPRequestHandler):
             result.save(conn)
             result.save_to_query(conn, query_data.id_)
         conn.commit_close()
-        self._redirect(PathStr(f'/{PAGE_NAMES["yt_query"]}/{query_data.id_}'))
+        self._redirect(Path('/')
+                       .joinpath(PAGE_NAMES['yt_query'])
+                       .joinpath(query_data.id_))
 
     def do_GET(self) -> None:  # pylint:disable=invalid-name
         """Map GET requests to handlers for various paths."""
         url = urlparse(self.path)
-        toks_url: list[str] = url.path.split('/')
-        page_name = toks_url[1]
+        toks_url = Path(url.path).parts
+        page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
         try:
             if PAGE_NAMES['thumbnails'] == page_name:
-                self._send_thumbnail(PathStr(toks_url[2]))
+                self._send_thumbnail(Path(toks_url[2]))
             elif PAGE_NAMES['download'] == page_name:
                 self._send_or_download_video(YoutubeId(toks_url[2]))
             elif PAGE_NAMES['files'] == page_name:
@@ -886,27 +874,27 @@ class TaskHandler(BaseHTTPRequestHandler):
             self._send_http(bytes(str(e), 'utf8'), code=404)
 
     def _send_rendered_template(self,
-                                tmpl_name: PathStr,
+                                tmpl_name: Path,
                                 tmpl_ctx: TemplateContext
                                 ) -> None:
-        tmpl = self.server.jinja.get_template(tmpl_name)
+        tmpl = self.server.jinja.get_template(str(tmpl_name))
         tmpl_ctx['page_names'] = PAGE_NAMES
         html = tmpl.render(**tmpl_ctx)
         self._send_http(bytes(html, 'utf8'))
 
-    def _send_thumbnail(self, filename: PathStr) -> None:
+    def _send_thumbnail(self, filename: Path) -> None:
         _ensure_expected_dirs([PATH_THUMBNAILS])
-        path_thumbnail = path_join(PATH_THUMBNAILS, filename)
-        if not path_exists(path_thumbnail):
-            video_id = splitext(filename)[0]
+        path_thumbnail = PATH_THUMBNAILS.joinpath(filename)
+        if not path_thumbnail.exists():
+            video_id = filename.stem
             url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
             try:
-                urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+                urlretrieve(url, PATH_THUMBNAILS.joinpath(f'{video_id}.jpg'))
             except HTTPError as e:
                 if 404 == e.code:
                     raise NotFoundException from e
                 raise e
-        with open(path_thumbnail, 'rb') as f:
+        with path_thumbnail.open('rb') as f:
             img = f.read()
         self._send_http(img, [('Content-type', 'image/jpg')])
 
@@ -917,10 +905,12 @@ class TaskHandler(BaseHTTPRequestHandler):
         except NotFoundException:
             conn.commit_close()
             self.server.downloads.queue_download(video_id)
-            self._redirect(PathStr(f'/{PAGE_NAMES["yt_result"]}/{video_id}'))
+            self._redirect(Path('/')
+                           .joinpath(PAGE_NAMES['yt_result'])
+                           .joinpath(video_id))
             return
         conn.commit_close()
-        with open(file_data.full_path, 'rb') as video_file:
+        with file_data.full_path.open('rb') as video_file:
             video = video_file.read()
         self._send_http(content=video)
 
@@ -974,7 +964,7 @@ class TaskHandler(BaseHTTPRequestHandler):
     def _send_files_index(self, filter_: ParamsStr, show_absent: bool) -> None:
         conn = DbConnection()
         files = [f for f in VideoFile.get_all(conn)
-                 if (filter_ in f.rel_path) and (show_absent or f.present)]
+                 if filter_ in str(f.rel_path) and (show_absent or f.present)]
         conn.commit_close()
         files.sort(key=lambda t: t.rel_path)
         self._send_rendered_template(