home · contact · privacy
Have install.sh also place executable code, including ~/.config/bin/ytplom wrapping...
authorChristian Heller <c.heller@plomlompom.de>
Sun, 24 Nov 2024 23:11:39 +0000 (00:11 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Sun, 24 Nov 2024 23:11:39 +0000 (00:11 +0100)
install.sh
install_to_share/requirements.txt [new file with mode: 0644]
install_to_share/ytplom.py [new file with mode: 0755]
install_to_share/ytplom/__init__.py [new file with mode: 0644]
install_to_share/ytplom/misc.py [new file with mode: 0644]
requirements.txt [deleted file]
ytplom [new file with mode: 0755]
ytplom.py [deleted file]
ytplom/__init__.py [deleted file]
ytplom/misc.py [deleted file]

index 0661178cf52146d7ad12e3d60b268c610dc2bc0f..085c1874ad66a39e16a1251b270d371b1db9428d 100755 (executable)
@@ -3,5 +3,8 @@ set -e
 set -x
 
 PATH_APP_SHARE=~/.local/share/ytplom
-mkdir -p "${PATH_APP_SHARE}"
+PATH_LOCAL_BIN=~/.local/bin
+
+mkdir -p "${PATH_APP_SHARE}" "${PATH_LOCAL_BIN}"
 cp -r ./install_to_share/* "${PATH_APP_SHARE}/" 
+cp ytplom "${PATH_LOCAL_BIN}/"
diff --git a/install_to_share/requirements.txt b/install_to_share/requirements.txt
new file mode 100644 (file)
index 0000000..6303919
--- /dev/null
@@ -0,0 +1,5 @@
+google-api-python-client==2.154.0
+Jinja2==3.1.4
+python-mpv==1.0.7
+scp==0.15.0
+yt-dlp==2024.11.18
diff --git a/install_to_share/ytplom.py b/install_to_share/ytplom.py
new file mode 100755 (executable)
index 0000000..a3f4c79
--- /dev/null
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+"""Minimalistic download-focused YouTube interface."""
+from threading import Thread
+from ytplom.misc import DownloadsDb, HTTP_PORT, Server, TaskHandler
+
+
+if __name__ == '__main__':
+    downloads_db = DownloadsDb()
+    downloads_db.clean_unfinished()
+    Thread(target=downloads_db.download_loop, daemon=False).start()
+    server = Server(downloads_db, ('0.0.0.0', HTTP_PORT), TaskHandler)
+    print(f'running at port {HTTP_PORT}')
+    try:
+        server.serve_forever()
+    except KeyboardInterrupt:
+        print('aborted due to keyboard interrupt; '
+              'repeat to end download thread too')
+    server.server_close()
diff --git a/install_to_share/ytplom/__init__.py b/install_to_share/ytplom/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/install_to_share/ytplom/misc.py b/install_to_share/ytplom/misc.py
new file mode 100644 (file)
index 0000000..647ad39
--- /dev/null
@@ -0,0 +1,853 @@
+"""Main ytplom lib."""
+
+# included libs
+from typing import TypeAlias, Optional, NewType, Callable, Self, Any
+from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
+from os.path import (dirname, isdir, isfile, exists as path_exists,
+                     join as path_join, splitext, basename)
+from random import shuffle
+from time import time, sleep
+from datetime import datetime, timedelta
+from json import dumps as json_dumps
+from uuid import uuid4
+from sqlite3 import connect as sql_connect, Cursor, Row
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from urllib.parse import urlparse, parse_qs
+from urllib.request import urlretrieve
+from urllib.error import HTTPError
+# non-included libs
+from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
+from mpv import MPV  # type: ignore
+from yt_dlp import YoutubeDL  # type: ignore
+import googleapiclient.discovery  # type: ignore
+
+# what we might want to manually define per environs
+API_KEY = environ.get('GOOGLE_API_KEY')
+HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
+
+# type definitions for mypy
+DatetimeStr = NewType('DatetimeStr', str)
+QuotaCost = NewType('QuotaCost', int)
+YoutubeId = NewType('YoutubeId', str)
+PathStr = NewType('PathStr', str)
+QueryId = NewType('QueryId', str)
+QueryText = NewType('QueryText', str)
+ProseText = NewType('ProseText', str)
+SqlText = NewType('SqlText', str)
+FlagName = NewType('FlagName', str)
+FlagsInt = NewType('FlagsInt', int)
+AmountDownloads = NewType('AmountDownloads', int)
+PlayerUpdateId = NewType('PlayerUpdateId', str)
+DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
+TemplateContext: TypeAlias = dict[
+        str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
+        | QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
+        | list['YoutubeVideo'] | list['YoutubeQuery']
+        | list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]]
+
+# major expected directories
+PATH_HOME = PathStr(environ.get('HOME', ''))
+PATH_APP_DATA = PathStr(path_join(PATH_HOME, '.local/share/ytplom'))
+PATH_CACHE = PathStr(path_join(PATH_HOME, '.cache/ytplom'))
+
+# paths for rather dynamic data
+PATH_DOWNLOADS = PathStr(path_join(PATH_HOME, 'ytplom_downloads'))
+PATH_DB = PathStr(path_join(PATH_APP_DATA, 'db.sql'))
+PATH_TEMP = PathStr(path_join(PATH_CACHE, 'temp'))
+PATH_THUMBNAILS = PathStr(path_join(PATH_CACHE, 'thumbnails'))
+
+# template paths
+PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates'))
+NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
+NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
+NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
+NAME_TEMPLATE_VIDEO = PathStr('video.tmpl')
+NAME_TEMPLATE_YT_VIDEO = PathStr('yt_video.tmpl')
+NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
+
+# yt_dlp config
+YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
+        '/best[height<=1080][width<=1920]'
+YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
+                          'temp': PATH_TEMP},
+                'format': YT_DOWNLOAD_FORMAT}
+
+# Youtube API expectations
+YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
+THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
+THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
+QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
+QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
+
+# local expectations
+TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
+LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
+
+# tables to create database with
+EXPECTED_DB_VERSION = 0
+SQL_DB_VERSION = SqlText('PRAGMA user_version')
+PATH_MIGRATIONS = PathStr(path_join(PATH_APP_DATA, 'migrations'))
+PATH_DB_SCHEMA = PathStr(path_join(PATH_MIGRATIONS,
+                                   f'init_{EXPECTED_DB_VERSION}.sql'))
+
+# other
+NAME_INSTALLER = 'install.sh'
+VIDEO_FLAGS: dict[FlagName, FlagsInt] = {
+  FlagName('delete'): FlagsInt(1 << 62)
+}
+
+
+class NotFoundException(Exception):
+    """Raise on expected data missing, e.g. DB fetches finding nothing."""
+
+
+class HandledException(Exception):
+    """Raise in any other case where we know what's happening."""
+
+
+def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
+    """Ensure existance of expected_dirs _as_ directories."""
+    for dir_name in expected_dirs:
+        if not isdir(dir_name):
+            if path_exists(dir_name):
+                raise HandledException(f'at expected directory path {dir_name}'
+                                       'found non-directory')
+            print(f'creating expected directory: {dir_name}')
+            makedirs(dir_name)
+
+
+class DatabaseConnection:
+    """Wrapped sqlite3.Connection."""
+
+    def __init__(self, path: PathStr = PATH_DB) -> None:
+        self._path = path
+        if not isfile(self._path):
+            if path_exists(self._path):
+                raise HandledException(f'no DB at {self._path}; would create, '
+                                       'but something\'s already there?')
+            path_db_dir = dirname(self._path)
+            if not isdir(path_db_dir):
+                raise NotFoundException(
+                        f'cannot find {path_db_dir} as directory to put DB '
+                        f'into, did you run {NAME_INSTALLER}?')
+            with sql_connect(self._path) as conn:
+                with open(PATH_DB_SCHEMA, 'r', encoding='utf8') as f:
+                    conn.executescript(f.read())
+                conn.execute(f'{SQL_DB_VERSION} = {EXPECTED_DB_VERSION}')
+        with sql_connect(self._path) as conn:
+            db_version = list(conn.execute(SQL_DB_VERSION))[0][0]
+        if db_version != EXPECTED_DB_VERSION:
+            raise HandledException(f'wrong database version {db_version}, '
+                                   f'expected: {EXPECTED_DB_VERSION}')
+        self._conn = sql_connect(self._path)
+
+    def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
+        """Wrapper around sqlite3.Connection.execute."""
+        return self._conn.execute(sql, inputs)
+
+    def commit_close(self) -> None:
+        """Run sqlite3.Connection.commit and .close."""
+        self._conn.commit()
+        self._conn.close()
+
+
+class DbData:
+    """Abstraction of common DB operation."""
+    _table_name: str
+    _cols: tuple[str, ...]
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, self.__class__):
+            return False
+        for attr_name in self._cols:
+            if getattr(self, attr_name) != getattr(other, attr_name):
+                return False
+        return True
+
+    @classmethod
+    def _from_table_row(cls, row: Row) -> Self:
+        kwargs = {}
+        for i, col_name in enumerate(cls._cols):
+            kwargs[col_name] = row[i]
+        return cls(**kwargs)
+
+    @classmethod
+    def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
+        """Return single entry of id_ from DB."""
+        sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
+        row = conn.exec(sql, (id_,)).fetchone()
+        if not row:
+            msg = f'no entry found for ID "{id_}" in table {cls._table_name}'
+            raise NotFoundException(msg)
+        return cls._from_table_row(row)
+
+    @classmethod
+    def get_all(cls, conn: DatabaseConnection) -> list[Self]:
+        """Return all entries from DB."""
+        sql = SqlText(f'SELECT * FROM {cls._table_name}')
+        rows = conn.exec(sql).fetchall()
+        return [cls._from_table_row(row) for row in rows]
+
+    def save(self, conn: DatabaseConnection) -> Cursor:
+        """Save entry to DB."""
+        vals = [getattr(self, col_name) for col_name in self._cols]
+        q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
+        sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
+        return conn.exec(sql, tuple(vals))
+
+
+class YoutubeQuery(DbData):
+    """Representation of YouTube query (without results)."""
+    _table_name = 'yt_queries'
+    _cols = ('id_', 'text', 'retrieved_at')
+
+    def __init__(self,
+                 id_: Optional[QueryId],
+                 text: QueryText,
+                 retrieved_at: DatetimeStr
+                 ) -> None:
+        self.id_ = id_ if id_ else QueryId(str(uuid4()))
+        self.text = QueryText(text)
+        self.retrieved_at = retrieved_at
+
+    @classmethod
+    def get_all_for_video(cls,
+                          conn: DatabaseConnection,
+                          video_id: YoutubeId
+                          ) -> list[Self]:
+        """Return YoutubeQueries containing YoutubeVideo's ID in results."""
+        sql = SqlText('SELECT query_id FROM '
+                      'yt_query_results WHERE video_id = ?')
+        query_ids = conn.exec(sql, (video_id,)).fetchall()
+        return [cls.get_one(conn, query_id_tup[0])
+                for query_id_tup in query_ids]
+
+
+class YoutubeVideo(DbData):
+    """Representation of YouTube video metadata as provided by their API."""
+    _table_name = 'yt_videos'
+    _cols = ('id_', 'title', 'description', 'published_at', 'duration',
+             'definition')
+
+    def __init__(self,
+                 id_: YoutubeId,
+                 title: ProseText = ProseText('?'),
+                 description: ProseText = ProseText('?'),
+                 published_at: DatetimeStr = DatetimeStr('?'),
+                 duration: str = '?',
+                 definition: str = '?'
+                 ) -> None:
+        self.id_ = id_
+        self.title = title
+        self.description = description
+        self.published_at = published_at
+        self.duration = duration
+        self.definition = definition
+
+    def set_duration_from_yt_string(self, yt_string: str) -> None:
+        """Set .duration from the kind of format the YouTube API provides."""
+        date_dur, time_dur = yt_string.split('T')
+        seconds = 0
+        date_dur = date_dur[1:]
+        for dur_char, len_seconds in (('Y', 60*60*24*365.25),
+                                      ('M', 60*60*24*30),
+                                      ('D', 60*60*24)):
+            if dur_char in date_dur:
+                dur_str, date_dur = date_dur.split(dur_char)
+                seconds += int(dur_str) * int(len_seconds)
+        for dur_char, len_seconds in (('H', 60*60),
+                                      ('M', 60),
+                                      ('S', 1)):
+            if dur_char in time_dur:
+                dur_str, time_dur = time_dur.split(dur_char)
+                seconds += int(dur_str) * len_seconds
+        seconds_str = str(seconds % 60)
+        minutes_str = str(seconds // 60)
+        hours_str = str(seconds // (60 * 60))
+        self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
+                                  in (hours_str, minutes_str, seconds_str)])
+
+    @classmethod
+    def get_all_for_query(cls,
+                          conn: DatabaseConnection,
+                          query_id: QueryId
+                          ) -> list[Self]:
+        """Return all videos for query of query_id."""
+        sql = SqlText('SELECT video_id '
+                      'FROM yt_query_results WHERE query_id = ?')
+        video_ids = conn.exec(sql, (query_id,)).fetchall()
+        return [cls.get_one(conn, video_id_tup[0])
+                for video_id_tup in video_ids]
+
+    def save_to_query(self,
+                      conn: DatabaseConnection,
+                      query_id: QueryId
+                      ) -> None:
+        """Save inclusion of self in results to query of query_id."""
+        conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
+                  (query_id, self.id_))
+
+
+class VideoFile(DbData):
+    """Collects data about downloaded files."""
+    _table_name = 'files'
+    _cols = ('rel_path', 'yt_id', 'flags')
+
+    def __init__(self, rel_path: PathStr, yt_id: YoutubeId, flags=FlagsInt(0)
+                 ) -> None:
+        self.rel_path = rel_path
+        self.yt_id = yt_id
+        self.flags = flags
+
+    @classmethod
+    def get_by_yt_id(cls, conn: DatabaseConnection, yt_id: YoutubeId) -> Self:
+        """Return VideoFile of .yt_id."""
+        sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE yt_id = ?')
+        row = conn.exec(sql, (yt_id,)).fetchone()
+        if not row:
+            raise NotFoundException(f'no entry for file to Youtube ID {yt_id}')
+        return cls._from_table_row(row)
+
+    @property
+    def full_path(self) -> PathStr:
+        """Return self.rel_path suffixed under PATH_DOWNLOADS."""
+        return PathStr(path_join(PATH_DOWNLOADS, self.rel_path))
+
+    @property
+    def present(self) -> bool:
+        """Return if file exists in filesystem."""
+        return path_exists(self.full_path)
+
+    @property
+    def missing(self) -> bool:
+        """Return if file absent despite absence of 'delete' flag."""
+        return not (self.flag_set(FlagName('delete')) or self.present)
+
+    def flag_set(self, flag_name: FlagName) -> bool:
+        """Return if flag of flag_name is set in self.flags."""
+        return self.flags & VIDEO_FLAGS[flag_name]
+
+    def ensure_absence_if_deleted(self) -> None:
+        """If 'delete' flag set, ensure no actual file in filesystem."""
+        if self.flag_set(FlagName('delete')) and path_exists(self.full_path):
+            print(f'SYNC: {self.rel_path} set "delete", '
+                  'removing from filesystem.')
+            os_remove(self.full_path)
+
+
+class QuotaLog(DbData):
+    """Collects API access quota costs."""
+    _table_name = 'quota_costs'
+    _cols = ('id_', 'timestamp', 'cost')
+
+    def __init__(self,
+                 id_: Optional[str],
+                 timestamp: DatetimeStr,
+                 cost: QuotaCost
+                 ) -> None:
+        self.id_ = id_ if id_ else str(uuid4())
+        self.timestamp = timestamp
+        self.cost = cost
+
+    @classmethod
+    def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
+        """Adds cost mapped to current datetime."""
+        cls._remove_old(conn)
+        new = cls(None,
+                  DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
+                  QuotaCost(cost))
+        new.save(conn)
+
+    @classmethod
+    def current(cls, conn: DatabaseConnection) -> QuotaCost:
+        """Returns quota cost total for last 24 hours, purges old data."""
+        cls._remove_old(conn)
+        quota_costs = cls.get_all(conn)
+        return QuotaCost(sum(c.cost for c in quota_costs))
+
+    @classmethod
+    def _remove_old(cls, conn: DatabaseConnection) -> None:
+        cutoff = datetime.now() - timedelta(days=1)
+        sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
+        conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
+
+
+class Player:
+    """MPV representation with some additional features."""
+    _idx: int
+
+    def __init__(self) -> None:
+        self.last_update = PlayerUpdateId('')
+        self._load_filenames()
+        self._mpv: Optional[MPV] = None
+
+    def _load_filenames(self) -> None:
+        self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
+                           if isfile(e.path)
+                           and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+        shuffle(self._filenames)
+        self._idx = 0
+
+    @property
+    def _mpv_available(self) -> bool:
+        return bool(self._mpv and not self._mpv.core_shutdown)
+
+    @staticmethod
+    def _if_mpv_available(f) -> Callable:
+        def wrapper(self):
+            return f(self) if self._mpv else None
+        return wrapper
+
+    def _signal_update(self) -> None:
+        self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+
+    def _start_mpv(self) -> None:
+        self._mpv = MPV(input_default_bindings=True,
+                        input_vo_keyboard=True,
+                        config=True)
+        self._mpv.observe_property('pause', lambda a, b: self._signal_update())
+
+        @self._mpv.event_callback('start-file')
+        def on_start_file(_) -> None:
+            assert self._mpv is not None
+            self._mpv.pause = False
+            self._idx = self._mpv.playlist_pos
+            self._signal_update()
+
+        @self._mpv.event_callback('shutdown')
+        def on_shutdown(_) -> None:
+            self._mpv = None
+            self._signal_update()
+
+        for path in self._filenames:
+            self._mpv.playlist_append(path)
+        self._mpv.playlist_play_index(self._idx)
+
+    @_if_mpv_available
+    def _kill_mpv(self) -> None:
+        assert self._mpv is not None
+        self._mpv.terminate()
+        self._mpv = None
+
+    @property
+    def current_filename(self) -> Optional[PathStr]:
+        """Return what we assume is the name of the currently playing file."""
+        if not self._filenames:
+            return None
+        return PathStr(basename(self._filenames[self._idx]))
+
+    @property
+    def prev_files(self) -> list[PathStr]:
+        """List 'past' files of playlist."""
+        return list(reversed(self._filenames[:self._idx]))
+
+    @property
+    def next_files(self) -> list[PathStr]:
+        """List 'coming' files of playlist."""
+        return self._filenames[self._idx + 1:]
+
+    @property
+    def is_running(self) -> bool:
+        """Return if player is running/available."""
+        return self._mpv_available
+
+    @property
+    def is_paused(self) -> bool:
+        """Return if player is paused."""
+        if self._mpv_available:
+            assert self._mpv is not None
+            return self._mpv.pause
+        return False
+
+    def toggle_run(self) -> None:
+        """Toggle player running."""
+        if self._mpv_available:
+            self._kill_mpv()
+        else:
+            self._start_mpv()
+        self._signal_update()
+
+    @_if_mpv_available
+    def toggle_pause(self) -> None:
+        """Toggle player pausing."""
+        assert self._mpv is not None
+        self._mpv.pause = not self._mpv.pause
+        self._signal_update()
+
+    @_if_mpv_available
+    def prev(self) -> None:
+        """Move player to previous item in playlist."""
+        assert self._mpv is not None
+        if self._mpv.playlist_pos > 0:
+            self._mpv.playlist_prev()
+        else:
+            self._mpv.playlist_play_index(0)
+
+    @_if_mpv_available
+    def next(self) -> None:
+        """Move player to next item in playlist."""
+        assert self._mpv is not None
+        max_idx: int = len(self._mpv.playlist_filenames) - 1
+        if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
+            self._mpv.playlist_next()
+        else:
+            self._mpv.playlist_play_index(max_idx)
+
+    def reload(self) -> None:
+        """Close MPV, re-read (and re-shuffle) filenames, then re-start MPV."""
+        self._kill_mpv()
+        self._load_filenames()
+        self._start_mpv()
+        self._signal_update()
+
+
+class DownloadsDb:
+    """Collections downloading-related stuff."""
+
+    def __init__(self) -> None:
+        self._to_download: list[YoutubeId] = []
+        _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
+        self._sync_db()
+
+    def _sync_db(self):
+        conn = DatabaseConnection()
+        files_via_db = VideoFile.get_all(conn)
+        old_cwd = getcwd()
+        chdir(PATH_DOWNLOADS)
+        paths = [file.rel_path for file in files_via_db]
+        for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
+            if path not in paths:
+                yt_id = self._id_from_filename(path)
+                file = VideoFile(path, yt_id)
+                print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+                file.save(conn)
+        self._files = VideoFile.get_all(conn)
+        for file in self._files:
+            file.ensure_absence_if_deleted()
+        chdir(old_cwd)
+        conn.commit_close()
+
+    @staticmethod
+    def _id_from_filename(path: PathStr,
+                          double_split: bool = False
+                          ) -> YoutubeId:
+        before_ext = splitext(path)[0]
+        if double_split:
+            before_ext = splitext(before_ext)[0]
+        return YoutubeId(before_ext.split('[')[-1].split(']')[0])
+
+    @property
+    def missing(self) -> list[PathStr]:
+        """Return relative paths of files known but not in PATH_DOWNLOADS."""
+        self._sync_db()
+        return [f.rel_path for f in self._files if f.missing]
+
+    @property
+    def ids_to_paths(self) -> DownloadsIndex:
+        """Return mapping YoutubeIds:paths of files downloaded to them."""
+        self._sync_db()
+        return {f.yt_id: f.full_path for f in self._files}
+
+    @property
+    def ids_unfinished(self) -> set[YoutubeId]:
+        """Return set of IDs of videos awaiting or currently in download."""
+        in_temp_dir = []
+        for path in [PathStr(e.path) for e
+                     in scandir(PATH_TEMP) if isfile(e.path)]:
+            in_temp_dir += [self._id_from_filename(path)]
+        return set(self._to_download + in_temp_dir)
+
+    def clean_unfinished(self) -> None:
+        """Empty temp directory of unfinished downloads."""
+        for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
+            print(f'removing unfinished download: {e.path}')
+            os_remove(e.path)
+
+    def queue_download(self, video_id: YoutubeId) -> None:
+        """Add video_id to download queue *if* not already processed."""
+        pre_existing = self.ids_unfinished | set(self._to_download
+                                                 + list(self.ids_to_paths))
+        if video_id not in pre_existing:
+            self._to_download += [video_id]
+
+    def _download_next(self) -> None:
+        if self._to_download:
+            video_id = self._to_download.pop(0)
+            with YoutubeDL(YT_DL_PARAMS) as ydl:
+                ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
+            self._sync_db()
+
+    def download_loop(self) -> None:
+        """Keep iterating through download queue for new download tasks."""
+        while True:
+            sleep(0.5)
+            self._download_next()
+
+
+class Server(HTTPServer):
+    """Extension of HTTPServer providing for Player and DownloadsDb."""
+
+    def __init__(self, downloads_db: DownloadsDb, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.jinja = JinjaEnv(loader=JinjaFSLoader(PATH_TEMPLATES))
+        self.player = Player()
+        self.downloads = downloads_db
+
+
+class TaskHandler(BaseHTTPRequestHandler):
+    """Handler for GET and POST requests to our server."""
+    server: Server
+
+    def _send_http(self,
+                   content: bytes = b'',
+                   headers: Optional[list[tuple[str, str]]] = None,
+                   code: int = 200
+                   ) -> None:
+        headers = headers if headers else []
+        self.send_response(code)
+        for header_tuple in headers:
+            self.send_header(header_tuple[0], header_tuple[1])
+        self.end_headers()
+        if content:
+            self.wfile.write(content)
+
+    def do_POST(self) -> None:  # pylint:disable=invalid-name
+        """Map POST requests to handlers for various paths."""
+        url = urlparse(self.path)
+        toks_url: list[str] = url.path.split('/')
+        page_name = toks_url[1]
+        body_length = int(self.headers['content-length'])
+        postvars = parse_qs(self.rfile.read(body_length).decode())
+        if 'playlist' == page_name:
+            self._post_player_command(list(postvars.keys())[0])
+        elif 'video' == page_name:
+            self._post_video_flag(YoutubeId(toks_url[2]),
+                                  [FlagName(k) for k in postvars])
+        elif 'queries' == page_name:
+            self._post_query(QueryText(postvars['query'][0]))
+
+    def _post_player_command(self, command: str) -> None:
+        if 'pause' == command:
+            self.server.player.toggle_pause()
+        elif 'prev' == command:
+            self.server.player.prev()
+        elif 'next' == command:
+            self.server.player.next()
+        elif 'stop' == command:
+            self.server.player.toggle_run()
+        elif 'reload' == command:
+            self.server.player.reload()
+        sleep(0.5)  # avoid redir happening before current_file update
+        self._send_http(headers=[('Location', '/')], code=302)
+
+    def _post_video_flag(self,
+                         yt_id: YoutubeId,
+                         flag_names: list[FlagName]
+                         ) -> None:
+        conn = DatabaseConnection()
+        file = VideoFile.get_by_yt_id(conn, yt_id)
+        file.flags = 0
+        for flag_name in flag_names:
+            file.flags |= VIDEO_FLAGS[flag_name]
+        file.save(conn)
+        conn.commit_close()
+        file.ensure_absence_if_deleted()
+        self._send_http(headers=[('Location', f'/video/{yt_id}')], code=302)
+
+    def _post_query(self, query_txt: QueryText) -> None:
+        conn = DatabaseConnection()
+
+        def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
+            youtube = googleapiclient.discovery.build('youtube', 'v3',
+                                                      developerKey=API_KEY)
+            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
+            search_request = youtube.search().list(
+                    q=query_txt,
+                    part='snippet',
+                    maxResults=25,
+                    safeSearch='none',
+                    type='video')
+            results: list[YoutubeVideo] = []
+            ids_to_detail: list[YoutubeId] = []
+            for item in search_request.execute()['items']:
+                video_id: YoutubeId = item['id']['videoId']
+                ids_to_detail += [video_id]
+                snippet = item['snippet']
+                urlretrieve(snippet['thumbnails']['default']['url'],
+                            path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+                results += [YoutubeVideo(id_=video_id,
+                                         title=snippet['title'],
+                                         description=snippet['description'],
+                                         published_at=snippet['publishedAt'])]
+            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
+            ids_for_details = ','.join([r.id_ for r in results])
+            videos_request = youtube.videos().list(id=ids_for_details,
+                                                   part='content_details')
+            unfinished_streams: list[YoutubeId] = []
+            for i, detailed in enumerate(videos_request.execute()['items']):
+                result = results[i]
+                assert result.id_ == detailed['id']
+                content_details: dict[str, str] = detailed['contentDetails']
+                if 'P0D' == content_details['duration']:
+                    unfinished_streams += [result.id_]
+                    continue
+                result.set_duration_from_yt_string(content_details['duration'])
+                result.definition = content_details['definition'].upper()
+            return [r for r in results if r.id_ not in unfinished_streams]
+
+        query_data = YoutubeQuery(
+                None, query_txt,
+                DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
+        query_data.save(conn)
+        for result in collect_results(query_txt):
+            result.save(conn)
+            result.save_to_query(conn, query_data.id_)
+        conn.commit_close()
+        self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
+                        code=302)
+
+    def do_GET(self) -> None:  # pylint:disable=invalid-name
+        """Map GET requests to handlers for various paths."""
+        url = urlparse(self.path)
+        toks_url: list[str] = url.path.split('/')
+        page_name = toks_url[1]
+        try:
+            if 'thumbnails' == page_name:
+                self._send_thumbnail(PathStr(toks_url[2]))
+            elif 'dl' == page_name:
+                self._send_or_download_video(YoutubeId(toks_url[2]))
+            elif 'videos' == page_name:
+                self._send_videos_index()
+            elif 'video' == page_name:
+                self._send_video_data(YoutubeId(toks_url[2]))
+            elif 'yt_video' == page_name:
+                self._send_yt_video_data(YoutubeId(toks_url[2]))
+            elif 'missing.json' == page_name:
+                self._send_missing_json()
+            elif 'query' == page_name:
+                self._send_query_page(QueryId(toks_url[2]))
+            elif 'queries' == page_name:
+                self._send_queries_index_and_search()
+            elif '_last_playlist_update.json' == page_name:
+                self._send_last_playlist_update()
+            else:  # e.g. for /
+                self._send_playlist()
+        except NotFoundException as e:
+            self._send_http(bytes(str(e), 'utf8'), code=404)
+
+    def _send_rendered_template(self,
+                                tmpl_name: PathStr,
+                                tmpl_ctx: TemplateContext
+                                ) -> None:
+        tmpl = self.server.jinja.get_template(tmpl_name)
+        html = tmpl.render(**tmpl_ctx)
+        self._send_http(bytes(html, 'utf8'))
+
+    def _send_thumbnail(self, filename: PathStr) -> None:
+        _ensure_expected_dirs([PATH_THUMBNAILS])
+        path_thumbnail = path_join(PATH_THUMBNAILS, filename)
+        if not path_exists(path_thumbnail):
+            video_id = splitext(filename)[0]
+            url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
+            try:
+                urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+            except HTTPError as e:
+                if 404 == e.code:
+                    raise NotFoundException from e
+                raise e
+        with open(path_thumbnail, 'rb') as f:
+            img = f.read()
+        self._send_http(img, [('Content-type', 'image/jpg')])
+
+    def _send_or_download_video(self, video_id: YoutubeId) -> None:
+        if video_id in self.server.downloads.ids_to_paths:
+            with open(self.server.downloads.ids_to_paths[video_id],
+                      'rb') as video_file:
+                video = video_file.read()
+            self._send_http(content=video)
+            return
+        self.server.downloads.queue_download(video_id)
+        self._send_http(headers=[('Location', f'/yt_video/{video_id}')],
+                        code=302)
+
+    def _send_query_page(self, query_id: QueryId) -> None:
+        conn = DatabaseConnection()
+        query = YoutubeQuery.get_one(conn, str(query_id))
+        results = YoutubeVideo.get_all_for_query(conn, query_id)
+        conn.commit_close()
+        self._send_rendered_template(
+                NAME_TEMPLATE_RESULTS,
+                {'query': query.text, 'videos': results})
+
+    def _send_queries_index_and_search(self) -> None:
+        conn = DatabaseConnection()
+        quota_count = QuotaLog.current(conn)
+        queries_data = YoutubeQuery.get_all(conn)
+        conn.commit_close()
+        queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
+        self._send_rendered_template(
+                NAME_TEMPLATE_QUERIES, {'queries': queries_data,
+                                        'quota_count': quota_count})
+
+    def _send_yt_video_data(self, video_id: YoutubeId) -> None:
+        conn = DatabaseConnection()
+        linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
+        try:
+            video_data = YoutubeVideo.get_one(conn, video_id)
+        except NotFoundException:
+            video_data = YoutubeVideo(video_id)
+        conn.commit_close()
+        self._send_rendered_template(
+                NAME_TEMPLATE_YT_VIDEO,
+                {'video_data': video_data,
+                 'is_temp': video_id in self.server.downloads.ids_unfinished,
+                 'file_path': self.server.downloads.ids_to_paths.get(video_id,
+                                                                     None),
+                 'youtube_prefix': YOUTUBE_URL_PREFIX,
+                 'queries': linked_queries})
+
+    def _send_video_data(self, yt_id: YoutubeId) -> None:
+        conn = DatabaseConnection()
+        file = VideoFile.get_by_yt_id(conn, yt_id)
+        conn.commit_close()
+        self._send_rendered_template(
+                NAME_TEMPLATE_VIDEO,
+                {'file': file, 'flag_names': list(VIDEO_FLAGS)})
+
+    def _send_videos_index(self) -> None:
+        videos = [(id_, PathStr(basename(path)))
+                  for id_, path in self.server.downloads.ids_to_paths.items()]
+        videos.sort(key=lambda t: t[1])
+        self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
+
+    def _send_missing_json(self) -> None:
+        self._send_http(
+                bytes(json_dumps(self.server.downloads.missing), 'utf8'),
+                headers=[('Content-type', 'application/json')])
+
+    def _send_last_playlist_update(self) -> None:
+        payload: dict[str, PlayerUpdateId] = {
+                'last_update': self.server.player.last_update}
+        self._send_http(bytes(json_dumps(payload), 'utf8'),
+                        headers=[('Content-type', 'application/json')])
+
+    def _send_playlist(self) -> None:
+        tuples: list[tuple[PathStr, PathStr]] = []
+        i: int = 0
+        while True:
+            prev, next_ = PathStr(''), PathStr('')
+            if len(self.server.player.prev_files) > i:
+                prev = PathStr(basename(self.server.player.prev_files[i]))
+            if len(self.server.player.next_files) > i:
+                next_ = PathStr(basename(self.server.player.next_files[i]))
+            if not prev + next_:
+                break
+            tuples += [(prev, next_)]
+            i += 1
+        self._send_rendered_template(
+                NAME_TEMPLATE_PLAYLIST,
+                {'last_update': self.server.player.last_update,
+                 'running': self.server.player.is_running,
+                 'paused': self.server.player.is_paused,
+                 'current_title': self.server.player.current_filename,
+                 'tuples': tuples})
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644 (file)
index 6303919..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-google-api-python-client==2.154.0
-Jinja2==3.1.4
-python-mpv==1.0.7
-scp==0.15.0
-yt-dlp==2024.11.18
diff --git a/ytplom b/ytplom
new file mode 100755 (executable)
index 0000000..e31cb50
--- /dev/null
+++ b/ytplom
@@ -0,0 +1,12 @@
+#!/usr/bin/sh
+set -e
+set -x
+
+PATH_APP_SHARE=~/.local/share/ytplom
+PATH_VENV="${PATH_APP_SHARE}/venv"
+
+python3 -m venv "${PATH_VENV}"
+. "${PATH_VENV}/bin/activate"
+pip3 install -r "${PATH_APP_SHARE}/requirements.txt"
+export PYTHONPATH="${PATH_APP_SHARE}:${PYTHONPATH}"
+python3 "${PATH_APP_SHARE}/ytplom.py"
diff --git a/ytplom.py b/ytplom.py
deleted file mode 100755 (executable)
index a3f4c79..0000000
--- a/ytplom.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python3
-"""Minimalistic download-focused YouTube interface."""
-from threading import Thread
-from ytplom.misc import DownloadsDb, HTTP_PORT, Server, TaskHandler
-
-
-if __name__ == '__main__':
-    downloads_db = DownloadsDb()
-    downloads_db.clean_unfinished()
-    Thread(target=downloads_db.download_loop, daemon=False).start()
-    server = Server(downloads_db, ('0.0.0.0', HTTP_PORT), TaskHandler)
-    print(f'running at port {HTTP_PORT}')
-    try:
-        server.serve_forever()
-    except KeyboardInterrupt:
-        print('aborted due to keyboard interrupt; '
-              'repeat to end download thread too')
-    server.server_close()
diff --git a/ytplom/__init__.py b/ytplom/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/ytplom/misc.py b/ytplom/misc.py
deleted file mode 100644 (file)
index 647ad39..0000000
+++ /dev/null
@@ -1,853 +0,0 @@
-"""Main ytplom lib."""
-
-# included libs
-from typing import TypeAlias, Optional, NewType, Callable, Self, Any
-from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
-from os.path import (dirname, isdir, isfile, exists as path_exists,
-                     join as path_join, splitext, basename)
-from random import shuffle
-from time import time, sleep
-from datetime import datetime, timedelta
-from json import dumps as json_dumps
-from uuid import uuid4
-from sqlite3 import connect as sql_connect, Cursor, Row
-from http.server import HTTPServer, BaseHTTPRequestHandler
-from urllib.parse import urlparse, parse_qs
-from urllib.request import urlretrieve
-from urllib.error import HTTPError
-# non-included libs
-from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
-from mpv import MPV  # type: ignore
-from yt_dlp import YoutubeDL  # type: ignore
-import googleapiclient.discovery  # type: ignore
-
-# what we might want to manually define per environs
-API_KEY = environ.get('GOOGLE_API_KEY')
-HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
-
-# type definitions for mypy
-DatetimeStr = NewType('DatetimeStr', str)
-QuotaCost = NewType('QuotaCost', int)
-YoutubeId = NewType('YoutubeId', str)
-PathStr = NewType('PathStr', str)
-QueryId = NewType('QueryId', str)
-QueryText = NewType('QueryText', str)
-ProseText = NewType('ProseText', str)
-SqlText = NewType('SqlText', str)
-FlagName = NewType('FlagName', str)
-FlagsInt = NewType('FlagsInt', int)
-AmountDownloads = NewType('AmountDownloads', int)
-PlayerUpdateId = NewType('PlayerUpdateId', str)
-DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
-TemplateContext: TypeAlias = dict[
-        str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
-        | QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
-        | list['YoutubeVideo'] | list['YoutubeQuery']
-        | list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]]
-
-# major expected directories
-PATH_HOME = PathStr(environ.get('HOME', ''))
-PATH_APP_DATA = PathStr(path_join(PATH_HOME, '.local/share/ytplom'))
-PATH_CACHE = PathStr(path_join(PATH_HOME, '.cache/ytplom'))
-
-# paths for rather dynamic data
-PATH_DOWNLOADS = PathStr(path_join(PATH_HOME, 'ytplom_downloads'))
-PATH_DB = PathStr(path_join(PATH_APP_DATA, 'db.sql'))
-PATH_TEMP = PathStr(path_join(PATH_CACHE, 'temp'))
-PATH_THUMBNAILS = PathStr(path_join(PATH_CACHE, 'thumbnails'))
-
-# template paths
-PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
-NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
-NAME_TEMPLATE_VIDEO = PathStr('video.tmpl')
-NAME_TEMPLATE_YT_VIDEO = PathStr('yt_video.tmpl')
-NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
-
-# yt_dlp config
-YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
-        '/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
-                          'temp': PATH_TEMP},
-                'format': YT_DOWNLOAD_FORMAT}
-
-# Youtube API expectations
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
-QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
-QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
-
-# local expectations
-TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
-LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
-
-# tables to create database with
-EXPECTED_DB_VERSION = 0
-SQL_DB_VERSION = SqlText('PRAGMA user_version')
-PATH_MIGRATIONS = PathStr(path_join(PATH_APP_DATA, 'migrations'))
-PATH_DB_SCHEMA = PathStr(path_join(PATH_MIGRATIONS,
-                                   f'init_{EXPECTED_DB_VERSION}.sql'))
-
-# other
-NAME_INSTALLER = 'install.sh'
-VIDEO_FLAGS: dict[FlagName, FlagsInt] = {
-  FlagName('delete'): FlagsInt(1 << 62)
-}
-
-
-class NotFoundException(Exception):
-    """Raise on expected data missing, e.g. DB fetches finding nothing."""
-
-
-class HandledException(Exception):
-    """Raise in any other case where we know what's happening."""
-
-
-def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
-    """Ensure existance of expected_dirs _as_ directories."""
-    for dir_name in expected_dirs:
-        if not isdir(dir_name):
-            if path_exists(dir_name):
-                raise HandledException(f'at expected directory path {dir_name}'
-                                       'found non-directory')
-            print(f'creating expected directory: {dir_name}')
-            makedirs(dir_name)
-
-
-class DatabaseConnection:
-    """Wrapped sqlite3.Connection."""
-
-    def __init__(self, path: PathStr = PATH_DB) -> None:
-        self._path = path
-        if not isfile(self._path):
-            if path_exists(self._path):
-                raise HandledException(f'no DB at {self._path}; would create, '
-                                       'but something\'s already there?')
-            path_db_dir = dirname(self._path)
-            if not isdir(path_db_dir):
-                raise NotFoundException(
-                        f'cannot find {path_db_dir} as directory to put DB '
-                        f'into, did you run {NAME_INSTALLER}?')
-            with sql_connect(self._path) as conn:
-                with open(PATH_DB_SCHEMA, 'r', encoding='utf8') as f:
-                    conn.executescript(f.read())
-                conn.execute(f'{SQL_DB_VERSION} = {EXPECTED_DB_VERSION}')
-        with sql_connect(self._path) as conn:
-            db_version = list(conn.execute(SQL_DB_VERSION))[0][0]
-        if db_version != EXPECTED_DB_VERSION:
-            raise HandledException(f'wrong database version {db_version}, '
-                                   f'expected: {EXPECTED_DB_VERSION}')
-        self._conn = sql_connect(self._path)
-
-    def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
-        """Wrapper around sqlite3.Connection.execute."""
-        return self._conn.execute(sql, inputs)
-
-    def commit_close(self) -> None:
-        """Run sqlite3.Connection.commit and .close."""
-        self._conn.commit()
-        self._conn.close()
-
-
-class DbData:
-    """Abstraction of common DB operation."""
-    _table_name: str
-    _cols: tuple[str, ...]
-
-    def __eq__(self, other: Any) -> bool:
-        if not isinstance(other, self.__class__):
-            return False
-        for attr_name in self._cols:
-            if getattr(self, attr_name) != getattr(other, attr_name):
-                return False
-        return True
-
-    @classmethod
-    def _from_table_row(cls, row: Row) -> Self:
-        kwargs = {}
-        for i, col_name in enumerate(cls._cols):
-            kwargs[col_name] = row[i]
-        return cls(**kwargs)
-
-    @classmethod
-    def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
-        """Return single entry of id_ from DB."""
-        sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
-        row = conn.exec(sql, (id_,)).fetchone()
-        if not row:
-            msg = f'no entry found for ID "{id_}" in table {cls._table_name}'
-            raise NotFoundException(msg)
-        return cls._from_table_row(row)
-
-    @classmethod
-    def get_all(cls, conn: DatabaseConnection) -> list[Self]:
-        """Return all entries from DB."""
-        sql = SqlText(f'SELECT * FROM {cls._table_name}')
-        rows = conn.exec(sql).fetchall()
-        return [cls._from_table_row(row) for row in rows]
-
-    def save(self, conn: DatabaseConnection) -> Cursor:
-        """Save entry to DB."""
-        vals = [getattr(self, col_name) for col_name in self._cols]
-        q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
-        sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
-        return conn.exec(sql, tuple(vals))
-
-
-class YoutubeQuery(DbData):
-    """Representation of YouTube query (without results)."""
-    _table_name = 'yt_queries'
-    _cols = ('id_', 'text', 'retrieved_at')
-
-    def __init__(self,
-                 id_: Optional[QueryId],
-                 text: QueryText,
-                 retrieved_at: DatetimeStr
-                 ) -> None:
-        self.id_ = id_ if id_ else QueryId(str(uuid4()))
-        self.text = QueryText(text)
-        self.retrieved_at = retrieved_at
-
-    @classmethod
-    def get_all_for_video(cls,
-                          conn: DatabaseConnection,
-                          video_id: YoutubeId
-                          ) -> list[Self]:
-        """Return YoutubeQueries containing YoutubeVideo's ID in results."""
-        sql = SqlText('SELECT query_id FROM '
-                      'yt_query_results WHERE video_id = ?')
-        query_ids = conn.exec(sql, (video_id,)).fetchall()
-        return [cls.get_one(conn, query_id_tup[0])
-                for query_id_tup in query_ids]
-
-
-class YoutubeVideo(DbData):
-    """Representation of YouTube video metadata as provided by their API."""
-    _table_name = 'yt_videos'
-    _cols = ('id_', 'title', 'description', 'published_at', 'duration',
-             'definition')
-
-    def __init__(self,
-                 id_: YoutubeId,
-                 title: ProseText = ProseText('?'),
-                 description: ProseText = ProseText('?'),
-                 published_at: DatetimeStr = DatetimeStr('?'),
-                 duration: str = '?',
-                 definition: str = '?'
-                 ) -> None:
-        self.id_ = id_
-        self.title = title
-        self.description = description
-        self.published_at = published_at
-        self.duration = duration
-        self.definition = definition
-
-    def set_duration_from_yt_string(self, yt_string: str) -> None:
-        """Set .duration from the kind of format the YouTube API provides."""
-        date_dur, time_dur = yt_string.split('T')
-        seconds = 0
-        date_dur = date_dur[1:]
-        for dur_char, len_seconds in (('Y', 60*60*24*365.25),
-                                      ('M', 60*60*24*30),
-                                      ('D', 60*60*24)):
-            if dur_char in date_dur:
-                dur_str, date_dur = date_dur.split(dur_char)
-                seconds += int(dur_str) * int(len_seconds)
-        for dur_char, len_seconds in (('H', 60*60),
-                                      ('M', 60),
-                                      ('S', 1)):
-            if dur_char in time_dur:
-                dur_str, time_dur = time_dur.split(dur_char)
-                seconds += int(dur_str) * len_seconds
-        seconds_str = str(seconds % 60)
-        minutes_str = str(seconds // 60)
-        hours_str = str(seconds // (60 * 60))
-        self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
-                                  in (hours_str, minutes_str, seconds_str)])
-
-    @classmethod
-    def get_all_for_query(cls,
-                          conn: DatabaseConnection,
-                          query_id: QueryId
-                          ) -> list[Self]:
-        """Return all videos for query of query_id."""
-        sql = SqlText('SELECT video_id '
-                      'FROM yt_query_results WHERE query_id = ?')
-        video_ids = conn.exec(sql, (query_id,)).fetchall()
-        return [cls.get_one(conn, video_id_tup[0])
-                for video_id_tup in video_ids]
-
-    def save_to_query(self,
-                      conn: DatabaseConnection,
-                      query_id: QueryId
-                      ) -> None:
-        """Save inclusion of self in results to query of query_id."""
-        conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
-                  (query_id, self.id_))
-
-
-class VideoFile(DbData):
-    """Collects data about downloaded files."""
-    _table_name = 'files'
-    _cols = ('rel_path', 'yt_id', 'flags')
-
-    def __init__(self, rel_path: PathStr, yt_id: YoutubeId, flags=FlagsInt(0)
-                 ) -> None:
-        self.rel_path = rel_path
-        self.yt_id = yt_id
-        self.flags = flags
-
-    @classmethod
-    def get_by_yt_id(cls, conn: DatabaseConnection, yt_id: YoutubeId) -> Self:
-        """Return VideoFile of .yt_id."""
-        sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE yt_id = ?')
-        row = conn.exec(sql, (yt_id,)).fetchone()
-        if not row:
-            raise NotFoundException(f'no entry for file to Youtube ID {yt_id}')
-        return cls._from_table_row(row)
-
-    @property
-    def full_path(self) -> PathStr:
-        """Return self.rel_path suffixed under PATH_DOWNLOADS."""
-        return PathStr(path_join(PATH_DOWNLOADS, self.rel_path))
-
-    @property
-    def present(self) -> bool:
-        """Return if file exists in filesystem."""
-        return path_exists(self.full_path)
-
-    @property
-    def missing(self) -> bool:
-        """Return if file absent despite absence of 'delete' flag."""
-        return not (self.flag_set(FlagName('delete')) or self.present)
-
-    def flag_set(self, flag_name: FlagName) -> bool:
-        """Return if flag of flag_name is set in self.flags."""
-        return self.flags & VIDEO_FLAGS[flag_name]
-
-    def ensure_absence_if_deleted(self) -> None:
-        """If 'delete' flag set, ensure no actual file in filesystem."""
-        if self.flag_set(FlagName('delete')) and path_exists(self.full_path):
-            print(f'SYNC: {self.rel_path} set "delete", '
-                  'removing from filesystem.')
-            os_remove(self.full_path)
-
-
-class QuotaLog(DbData):
-    """Collects API access quota costs."""
-    _table_name = 'quota_costs'
-    _cols = ('id_', 'timestamp', 'cost')
-
-    def __init__(self,
-                 id_: Optional[str],
-                 timestamp: DatetimeStr,
-                 cost: QuotaCost
-                 ) -> None:
-        self.id_ = id_ if id_ else str(uuid4())
-        self.timestamp = timestamp
-        self.cost = cost
-
-    @classmethod
-    def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
-        """Adds cost mapped to current datetime."""
-        cls._remove_old(conn)
-        new = cls(None,
-                  DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
-                  QuotaCost(cost))
-        new.save(conn)
-
-    @classmethod
-    def current(cls, conn: DatabaseConnection) -> QuotaCost:
-        """Returns quota cost total for last 24 hours, purges old data."""
-        cls._remove_old(conn)
-        quota_costs = cls.get_all(conn)
-        return QuotaCost(sum(c.cost for c in quota_costs))
-
-    @classmethod
-    def _remove_old(cls, conn: DatabaseConnection) -> None:
-        cutoff = datetime.now() - timedelta(days=1)
-        sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
-        conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
-
-
-class Player:
-    """MPV representation with some additional features."""
-    _idx: int
-
-    def __init__(self) -> None:
-        self.last_update = PlayerUpdateId('')
-        self._load_filenames()
-        self._mpv: Optional[MPV] = None
-
-    def _load_filenames(self) -> None:
-        self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
-                           if isfile(e.path)
-                           and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
-        shuffle(self._filenames)
-        self._idx = 0
-
-    @property
-    def _mpv_available(self) -> bool:
-        return bool(self._mpv and not self._mpv.core_shutdown)
-
-    @staticmethod
-    def _if_mpv_available(f) -> Callable:
-        def wrapper(self):
-            return f(self) if self._mpv else None
-        return wrapper
-
-    def _signal_update(self) -> None:
-        self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
-
-    def _start_mpv(self) -> None:
-        self._mpv = MPV(input_default_bindings=True,
-                        input_vo_keyboard=True,
-                        config=True)
-        self._mpv.observe_property('pause', lambda a, b: self._signal_update())
-
-        @self._mpv.event_callback('start-file')
-        def on_start_file(_) -> None:
-            assert self._mpv is not None
-            self._mpv.pause = False
-            self._idx = self._mpv.playlist_pos
-            self._signal_update()
-
-        @self._mpv.event_callback('shutdown')
-        def on_shutdown(_) -> None:
-            self._mpv = None
-            self._signal_update()
-
-        for path in self._filenames:
-            self._mpv.playlist_append(path)
-        self._mpv.playlist_play_index(self._idx)
-
-    @_if_mpv_available
-    def _kill_mpv(self) -> None:
-        assert self._mpv is not None
-        self._mpv.terminate()
-        self._mpv = None
-
-    @property
-    def current_filename(self) -> Optional[PathStr]:
-        """Return what we assume is the name of the currently playing file."""
-        if not self._filenames:
-            return None
-        return PathStr(basename(self._filenames[self._idx]))
-
-    @property
-    def prev_files(self) -> list[PathStr]:
-        """List 'past' files of playlist."""
-        return list(reversed(self._filenames[:self._idx]))
-
-    @property
-    def next_files(self) -> list[PathStr]:
-        """List 'coming' files of playlist."""
-        return self._filenames[self._idx + 1:]
-
-    @property
-    def is_running(self) -> bool:
-        """Return if player is running/available."""
-        return self._mpv_available
-
-    @property
-    def is_paused(self) -> bool:
-        """Return if player is paused."""
-        if self._mpv_available:
-            assert self._mpv is not None
-            return self._mpv.pause
-        return False
-
-    def toggle_run(self) -> None:
-        """Toggle player running."""
-        if self._mpv_available:
-            self._kill_mpv()
-        else:
-            self._start_mpv()
-        self._signal_update()
-
-    @_if_mpv_available
-    def toggle_pause(self) -> None:
-        """Toggle player pausing."""
-        assert self._mpv is not None
-        self._mpv.pause = not self._mpv.pause
-        self._signal_update()
-
-    @_if_mpv_available
-    def prev(self) -> None:
-        """Move player to previous item in playlist."""
-        assert self._mpv is not None
-        if self._mpv.playlist_pos > 0:
-            self._mpv.playlist_prev()
-        else:
-            self._mpv.playlist_play_index(0)
-
-    @_if_mpv_available
-    def next(self) -> None:
-        """Move player to next item in playlist."""
-        assert self._mpv is not None
-        max_idx: int = len(self._mpv.playlist_filenames) - 1
-        if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
-            self._mpv.playlist_next()
-        else:
-            self._mpv.playlist_play_index(max_idx)
-
-    def reload(self) -> None:
-        """Close MPV, re-read (and re-shuffle) filenames, then re-start MPV."""
-        self._kill_mpv()
-        self._load_filenames()
-        self._start_mpv()
-        self._signal_update()
-
-
-class DownloadsDb:
-    """Collections downloading-related stuff."""
-
-    def __init__(self) -> None:
-        self._to_download: list[YoutubeId] = []
-        _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
-        self._sync_db()
-
-    def _sync_db(self):
-        conn = DatabaseConnection()
-        files_via_db = VideoFile.get_all(conn)
-        old_cwd = getcwd()
-        chdir(PATH_DOWNLOADS)
-        paths = [file.rel_path for file in files_via_db]
-        for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
-            if path not in paths:
-                yt_id = self._id_from_filename(path)
-                file = VideoFile(path, yt_id)
-                print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
-                file.save(conn)
-        self._files = VideoFile.get_all(conn)
-        for file in self._files:
-            file.ensure_absence_if_deleted()
-        chdir(old_cwd)
-        conn.commit_close()
-
-    @staticmethod
-    def _id_from_filename(path: PathStr,
-                          double_split: bool = False
-                          ) -> YoutubeId:
-        before_ext = splitext(path)[0]
-        if double_split:
-            before_ext = splitext(before_ext)[0]
-        return YoutubeId(before_ext.split('[')[-1].split(']')[0])
-
-    @property
-    def missing(self) -> list[PathStr]:
-        """Return relative paths of files known but not in PATH_DOWNLOADS."""
-        self._sync_db()
-        return [f.rel_path for f in self._files if f.missing]
-
-    @property
-    def ids_to_paths(self) -> DownloadsIndex:
-        """Return mapping YoutubeIds:paths of files downloaded to them."""
-        self._sync_db()
-        return {f.yt_id: f.full_path for f in self._files}
-
-    @property
-    def ids_unfinished(self) -> set[YoutubeId]:
-        """Return set of IDs of videos awaiting or currently in download."""
-        in_temp_dir = []
-        for path in [PathStr(e.path) for e
-                     in scandir(PATH_TEMP) if isfile(e.path)]:
-            in_temp_dir += [self._id_from_filename(path)]
-        return set(self._to_download + in_temp_dir)
-
-    def clean_unfinished(self) -> None:
-        """Empty temp directory of unfinished downloads."""
-        for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
-            print(f'removing unfinished download: {e.path}')
-            os_remove(e.path)
-
-    def queue_download(self, video_id: YoutubeId) -> None:
-        """Add video_id to download queue *if* not already processed."""
-        pre_existing = self.ids_unfinished | set(self._to_download
-                                                 + list(self.ids_to_paths))
-        if video_id not in pre_existing:
-            self._to_download += [video_id]
-
-    def _download_next(self) -> None:
-        if self._to_download:
-            video_id = self._to_download.pop(0)
-            with YoutubeDL(YT_DL_PARAMS) as ydl:
-                ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
-            self._sync_db()
-
-    def download_loop(self) -> None:
-        """Keep iterating through download queue for new download tasks."""
-        while True:
-            sleep(0.5)
-            self._download_next()
-
-
-class Server(HTTPServer):
-    """Extension of HTTPServer providing for Player and DownloadsDb."""
-
-    def __init__(self, downloads_db: DownloadsDb, *args, **kwargs) -> None:
-        super().__init__(*args, **kwargs)
-        self.jinja = JinjaEnv(loader=JinjaFSLoader(PATH_TEMPLATES))
-        self.player = Player()
-        self.downloads = downloads_db
-
-
-class TaskHandler(BaseHTTPRequestHandler):
-    """Handler for GET and POST requests to our server."""
-    server: Server
-
-    def _send_http(self,
-                   content: bytes = b'',
-                   headers: Optional[list[tuple[str, str]]] = None,
-                   code: int = 200
-                   ) -> None:
-        headers = headers if headers else []
-        self.send_response(code)
-        for header_tuple in headers:
-            self.send_header(header_tuple[0], header_tuple[1])
-        self.end_headers()
-        if content:
-            self.wfile.write(content)
-
-    def do_POST(self) -> None:  # pylint:disable=invalid-name
-        """Map POST requests to handlers for various paths."""
-        url = urlparse(self.path)
-        toks_url: list[str] = url.path.split('/')
-        page_name = toks_url[1]
-        body_length = int(self.headers['content-length'])
-        postvars = parse_qs(self.rfile.read(body_length).decode())
-        if 'playlist' == page_name:
-            self._post_player_command(list(postvars.keys())[0])
-        elif 'video' == page_name:
-            self._post_video_flag(YoutubeId(toks_url[2]),
-                                  [FlagName(k) for k in postvars])
-        elif 'queries' == page_name:
-            self._post_query(QueryText(postvars['query'][0]))
-
-    def _post_player_command(self, command: str) -> None:
-        if 'pause' == command:
-            self.server.player.toggle_pause()
-        elif 'prev' == command:
-            self.server.player.prev()
-        elif 'next' == command:
-            self.server.player.next()
-        elif 'stop' == command:
-            self.server.player.toggle_run()
-        elif 'reload' == command:
-            self.server.player.reload()
-        sleep(0.5)  # avoid redir happening before current_file update
-        self._send_http(headers=[('Location', '/')], code=302)
-
-    def _post_video_flag(self,
-                         yt_id: YoutubeId,
-                         flag_names: list[FlagName]
-                         ) -> None:
-        conn = DatabaseConnection()
-        file = VideoFile.get_by_yt_id(conn, yt_id)
-        file.flags = 0
-        for flag_name in flag_names:
-            file.flags |= VIDEO_FLAGS[flag_name]
-        file.save(conn)
-        conn.commit_close()
-        file.ensure_absence_if_deleted()
-        self._send_http(headers=[('Location', f'/video/{yt_id}')], code=302)
-
-    def _post_query(self, query_txt: QueryText) -> None:
-        conn = DatabaseConnection()
-
-        def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
-            youtube = googleapiclient.discovery.build('youtube', 'v3',
-                                                      developerKey=API_KEY)
-            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
-            search_request = youtube.search().list(
-                    q=query_txt,
-                    part='snippet',
-                    maxResults=25,
-                    safeSearch='none',
-                    type='video')
-            results: list[YoutubeVideo] = []
-            ids_to_detail: list[YoutubeId] = []
-            for item in search_request.execute()['items']:
-                video_id: YoutubeId = item['id']['videoId']
-                ids_to_detail += [video_id]
-                snippet = item['snippet']
-                urlretrieve(snippet['thumbnails']['default']['url'],
-                            path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
-                results += [YoutubeVideo(id_=video_id,
-                                         title=snippet['title'],
-                                         description=snippet['description'],
-                                         published_at=snippet['publishedAt'])]
-            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
-            ids_for_details = ','.join([r.id_ for r in results])
-            videos_request = youtube.videos().list(id=ids_for_details,
-                                                   part='content_details')
-            unfinished_streams: list[YoutubeId] = []
-            for i, detailed in enumerate(videos_request.execute()['items']):
-                result = results[i]
-                assert result.id_ == detailed['id']
-                content_details: dict[str, str] = detailed['contentDetails']
-                if 'P0D' == content_details['duration']:
-                    unfinished_streams += [result.id_]
-                    continue
-                result.set_duration_from_yt_string(content_details['duration'])
-                result.definition = content_details['definition'].upper()
-            return [r for r in results if r.id_ not in unfinished_streams]
-
-        query_data = YoutubeQuery(
-                None, query_txt,
-                DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
-        query_data.save(conn)
-        for result in collect_results(query_txt):
-            result.save(conn)
-            result.save_to_query(conn, query_data.id_)
-        conn.commit_close()
-        self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
-                        code=302)
-
-    def do_GET(self) -> None:  # pylint:disable=invalid-name
-        """Map GET requests to handlers for various paths."""
-        url = urlparse(self.path)
-        toks_url: list[str] = url.path.split('/')
-        page_name = toks_url[1]
-        try:
-            if 'thumbnails' == page_name:
-                self._send_thumbnail(PathStr(toks_url[2]))
-            elif 'dl' == page_name:
-                self._send_or_download_video(YoutubeId(toks_url[2]))
-            elif 'videos' == page_name:
-                self._send_videos_index()
-            elif 'video' == page_name:
-                self._send_video_data(YoutubeId(toks_url[2]))
-            elif 'yt_video' == page_name:
-                self._send_yt_video_data(YoutubeId(toks_url[2]))
-            elif 'missing.json' == page_name:
-                self._send_missing_json()
-            elif 'query' == page_name:
-                self._send_query_page(QueryId(toks_url[2]))
-            elif 'queries' == page_name:
-                self._send_queries_index_and_search()
-            elif '_last_playlist_update.json' == page_name:
-                self._send_last_playlist_update()
-            else:  # e.g. for /
-                self._send_playlist()
-        except NotFoundException as e:
-            self._send_http(bytes(str(e), 'utf8'), code=404)
-
-    def _send_rendered_template(self,
-                                tmpl_name: PathStr,
-                                tmpl_ctx: TemplateContext
-                                ) -> None:
-        tmpl = self.server.jinja.get_template(tmpl_name)
-        html = tmpl.render(**tmpl_ctx)
-        self._send_http(bytes(html, 'utf8'))
-
-    def _send_thumbnail(self, filename: PathStr) -> None:
-        _ensure_expected_dirs([PATH_THUMBNAILS])
-        path_thumbnail = path_join(PATH_THUMBNAILS, filename)
-        if not path_exists(path_thumbnail):
-            video_id = splitext(filename)[0]
-            url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
-            try:
-                urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
-            except HTTPError as e:
-                if 404 == e.code:
-                    raise NotFoundException from e
-                raise e
-        with open(path_thumbnail, 'rb') as f:
-            img = f.read()
-        self._send_http(img, [('Content-type', 'image/jpg')])
-
-    def _send_or_download_video(self, video_id: YoutubeId) -> None:
-        if video_id in self.server.downloads.ids_to_paths:
-            with open(self.server.downloads.ids_to_paths[video_id],
-                      'rb') as video_file:
-                video = video_file.read()
-            self._send_http(content=video)
-            return
-        self.server.downloads.queue_download(video_id)
-        self._send_http(headers=[('Location', f'/yt_video/{video_id}')],
-                        code=302)
-
-    def _send_query_page(self, query_id: QueryId) -> None:
-        conn = DatabaseConnection()
-        query = YoutubeQuery.get_one(conn, str(query_id))
-        results = YoutubeVideo.get_all_for_query(conn, query_id)
-        conn.commit_close()
-        self._send_rendered_template(
-                NAME_TEMPLATE_RESULTS,
-                {'query': query.text, 'videos': results})
-
-    def _send_queries_index_and_search(self) -> None:
-        conn = DatabaseConnection()
-        quota_count = QuotaLog.current(conn)
-        queries_data = YoutubeQuery.get_all(conn)
-        conn.commit_close()
-        queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
-        self._send_rendered_template(
-                NAME_TEMPLATE_QUERIES, {'queries': queries_data,
-                                        'quota_count': quota_count})
-
-    def _send_yt_video_data(self, video_id: YoutubeId) -> None:
-        conn = DatabaseConnection()
-        linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
-        try:
-            video_data = YoutubeVideo.get_one(conn, video_id)
-        except NotFoundException:
-            video_data = YoutubeVideo(video_id)
-        conn.commit_close()
-        self._send_rendered_template(
-                NAME_TEMPLATE_YT_VIDEO,
-                {'video_data': video_data,
-                 'is_temp': video_id in self.server.downloads.ids_unfinished,
-                 'file_path': self.server.downloads.ids_to_paths.get(video_id,
-                                                                     None),
-                 'youtube_prefix': YOUTUBE_URL_PREFIX,
-                 'queries': linked_queries})
-
-    def _send_video_data(self, yt_id: YoutubeId) -> None:
-        conn = DatabaseConnection()
-        file = VideoFile.get_by_yt_id(conn, yt_id)
-        conn.commit_close()
-        self._send_rendered_template(
-                NAME_TEMPLATE_VIDEO,
-                {'file': file, 'flag_names': list(VIDEO_FLAGS)})
-
-    def _send_videos_index(self) -> None:
-        videos = [(id_, PathStr(basename(path)))
-                  for id_, path in self.server.downloads.ids_to_paths.items()]
-        videos.sort(key=lambda t: t[1])
-        self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
-
-    def _send_missing_json(self) -> None:
-        self._send_http(
-                bytes(json_dumps(self.server.downloads.missing), 'utf8'),
-                headers=[('Content-type', 'application/json')])
-
-    def _send_last_playlist_update(self) -> None:
-        payload: dict[str, PlayerUpdateId] = {
-                'last_update': self.server.player.last_update}
-        self._send_http(bytes(json_dumps(payload), 'utf8'),
-                        headers=[('Content-type', 'application/json')])
-
-    def _send_playlist(self) -> None:
-        tuples: list[tuple[PathStr, PathStr]] = []
-        i: int = 0
-        while True:
-            prev, next_ = PathStr(''), PathStr('')
-            if len(self.server.player.prev_files) > i:
-                prev = PathStr(basename(self.server.player.prev_files[i]))
-            if len(self.server.player.next_files) > i:
-                next_ = PathStr(basename(self.server.player.next_files[i]))
-            if not prev + next_:
-                break
-            tuples += [(prev, next_)]
-            i += 1
-        self._send_rendered_template(
-                NAME_TEMPLATE_PLAYLIST,
-                {'last_update': self.server.player.last_update,
-                 'running': self.server.player.is_running,
-                 'paused': self.server.player.is_paused,
-                 'current_title': self.server.player.current_filename,
-                 'tuples': tuples})