home · contact · privacy
Modularize.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 21 Nov 2024 02:08:15 +0000 (03:08 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 21 Nov 2024 02:08:15 +0000 (03:08 +0100)
ytplom.py
ytplom/__init__.py [new file with mode: 0644]
ytplom/misc.py [new file with mode: 0644]

index 1d94ccc71b61f15f1ab737ede9724ed2350c3ca4..9a204f62efc550b0685e818e4d5e14d16e5a7bb5 100755 (executable)
--- a/ytplom.py
+++ b/ytplom.py
 #!/usr/bin/env python3
 """Minimalistic download-focused YouTube interface."""
-
-# included libs
-from typing import TypeAlias, Optional, NewType, Callable, Self, Any
-from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
-from os.path import (isdir, isfile, exists as path_exists, join as path_join,
-                     splitext, basename)
-from random import shuffle
-from time import time, sleep
-from datetime import datetime, timedelta
-from json import dumps as json_dumps
-from uuid import uuid4
 from threading import Thread
-from sqlite3 import connect as sql_connect, Cursor, Row
-from http.server import HTTPServer, BaseHTTPRequestHandler
-from urllib.parse import urlparse, parse_qs
-from urllib.request import urlretrieve
-from urllib.error import HTTPError
-# non-included libs
-from jinja2 import Template
-from mpv import MPV  # type: ignore
-from yt_dlp import YoutubeDL  # type: ignore
-import googleapiclient.discovery  # type: ignore
-
-# what we might want to manually define per environs
-API_KEY = environ.get('GOOGLE_API_KEY')
-HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
-
-# type definitions for mypy
-DatetimeStr = NewType('DatetimeStr', str)
-QuotaCost = NewType('QuotaCost', int)
-YoutubeId = NewType('YoutubeId', str)
-PathStr = NewType('PathStr', str)
-QueryId = NewType('QueryId', str)
-QueryText = NewType('QueryText', str)
-ProseText = NewType('ProseText', str)
-SqlText = NewType('SqlText', str)
-AmountDownloads = NewType('AmountDownloads', int)
-PlayerUpdateId = NewType('PlayerUpdateId', str)
-DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
-TemplateContext: TypeAlias = dict[
-        str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
-        | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
-        | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]]
-        | list[tuple[PathStr, PathStr]]]
-
-# local data reasonably expected to be in user home directory
-PATH_HOME = PathStr(environ.get('HOME', ''))
-PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom'))
-PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails'))
-PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql'))
-PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads'))
-PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp'))
-
-# template paths; might move outside PATH_WORKDIR in the future
-PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
-NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
-NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
-NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
-PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES,
-                                          NAME_TEMPLATE_QUERIES))
-
-# yt_dlp config
-YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
-        '/best[height<=1080][width<=1920]'
-YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
-                          'temp': PATH_TEMP},
-                'format': YT_DOWNLOAD_FORMAT}
-
-# Youtube API expectations
-YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
-THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
-THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
-QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
-QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
-
-# local expectations
-TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
-LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
-
-# tables to create database with
-SCRIPT_INIT_DB = '''
-CREATE TABLE yt_queries (
-  id TEXT PRIMARY KEY,
-  text TEXT NOT NULL,
-  retrieved_at TEXT NOT NULL
-);
-CREATE TABLE yt_videos (
-  id TEXT PRIMARY KEY,
-  title TEXT NOT NULL,
-  description TEXT NOT NULL,
-  published_at TEXT NOT NULL,
-  duration TEXT NOT NULL,
-  definition TEXT NOT NULL
-);
-CREATE TABLE yt_query_results (
-  query_id TEXT NOT NULL,
-  video_id TEXT NOT NULL,
-  PRIMARY KEY (query_id, video_id),
-  FOREIGN KEY (query_id) REFERENCES yt_queries(id),
-  FOREIGN KEY (video_id) REFERENCES yt_videos(id)
-);
-CREATE TABLE quota_costs (
-  id TEXT PRIMARY KEY,
-  timestamp TEXT NOT NULL,
-  cost INT NOT NULL
-);
-CREATE TABLE files (
-  rel_path TEXT PRIMARY KEY,
-  yt_id TEXT NOT NULL DEFAULT "",
-  FOREIGN KEY (yt_id) REFERENCES yt_videos(id)
-);
-'''
-
-
-class NotFoundException(BaseException):
-    """Call on DB fetches finding less than expected."""
-
-
-def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
-    """Ensure existance of expected_dirs _as_ directories."""
-    for dir_name in expected_dirs:
-        if not path_exists(dir_name):
-            print(f'creating expected directory: {dir_name}')
-            makedirs(dir_name)
-        elif not isdir(dir_name):
-            msg = f'at expected directory path {dir_name} found non-directory'
-            raise Exception(msg)
-
-
-class DatabaseConnection:
-    """Wrapped sqlite3.Connection."""
-
-    def __init__(self) -> None:
-        if not path_exists(PATH_DB):
-            with sql_connect(PATH_DB) as conn:
-                conn.executescript(SCRIPT_INIT_DB)
-        self._conn = sql_connect(PATH_DB)
-
-    def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
-        """Wrapper around sqlite3.Connection.execute."""
-        return self._conn.execute(sql, inputs)
-
-    def commit_close(self) -> None:
-        """Run sqlite3.Connection.commit and .close."""
-        self._conn.commit()
-        self._conn.close()
-
-
-class DbData:
-    """Abstraction of common DB operation."""
-    _table_name: str
-    _cols: tuple[str, ...]
-
-    @classmethod
-    def _from_table_row(cls, row: Row) -> Self:
-        kwargs = {}
-        for i, col_name in enumerate(cls._cols):
-            kwargs[col_name] = row[i]
-        return cls(**kwargs)
-
-    @classmethod
-    def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
-        """Return single entry of id_ from DB."""
-        sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
-        row = conn.exec(sql, (id_,)).fetchone()
-        if not row:
-            raise NotFoundException
-        return cls._from_table_row(row)
-
-    @classmethod
-    def get_all(cls, conn: DatabaseConnection) -> list[Self]:
-        """Return all entries from DB."""
-        sql = SqlText(f'SELECT * FROM {cls._table_name}')
-        rows = conn.exec(sql).fetchall()
-        return [cls._from_table_row(row) for row in rows]
-
-    def save(self, conn: DatabaseConnection) -> Cursor:
-        """Save entry to DB."""
-        vals = [getattr(self, col_name) for col_name in self._cols]
-        q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
-        sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
-        return conn.exec(sql, tuple(vals))
-
-
-class YoutubeQuery(DbData):
-    """Representation of YouTube query (without results)."""
-    _table_name = 'yt_queries'
-    _cols = ('id_', 'text', 'retrieved_at')
-
-    def __init__(self,
-                 id_: Optional[QueryId],
-                 text: QueryText,
-                 retrieved_at: DatetimeStr
-                 ) -> None:
-        self.id_ = id_ if id_ else QueryId(str(uuid4()))
-        self.text = QueryText(text)
-        self.retrieved_at = retrieved_at
-
-    @classmethod
-    def get_all_for_video(cls,
-                          conn: DatabaseConnection,
-                          video_id: YoutubeId
-                          ) -> list[Self]:
-        """Return YoutubeQueries containing YoutubeVideo's ID in results."""
-        sql = SqlText('SELECT query_id FROM '
-                      'yt_query_results WHERE video_id = ?')
-        query_ids = conn.exec(sql, (video_id,)).fetchall()
-        return [cls.get_one(conn, query_id_tup[0])
-                for query_id_tup in query_ids]
-
-
-class YoutubeVideo(DbData):
-    """Representation of YouTube video metadata as provided by their API."""
-    _table_name = 'yt_videos'
-    _cols = ('id_', 'title', 'description', 'published_at', 'duration',
-             'definition')
-
-    def __init__(self,
-                 id_: YoutubeId,
-                 title: ProseText = ProseText('?'),
-                 description: ProseText = ProseText('?'),
-                 published_at: DatetimeStr = DatetimeStr('?'),
-                 duration: str = '?',
-                 definition: str = '?'
-                 ) -> None:
-        self.id_ = id_
-        self.title = title
-        self.description = description
-        self.published_at = published_at
-        self.duration = duration
-        self.definition = definition
-
-    def set_duration_from_yt_string(self, yt_string) -> None:
-        """Set .duration from the kind of format the YouTube API provides."""
-        date_dur, time_dur = yt_string.split('T')
-        seconds = 0
-        date_dur = date_dur[1:]
-        for dur_char, len_seconds in (('Y', 60*60*24*365.25),
-                                      ('M', 60*60*24*30),
-                                      ('D', 60*60*24)):
-            if dur_char in date_dur:
-                dur_str, date_dur = date_dur.split(dur_char)
-                seconds += int(dur_str) * int(len_seconds)
-        for dur_char, len_seconds in (('H', 60*60),
-                                      ('M', 60),
-                                      ('S', 1)):
-            if dur_char in time_dur:
-                dur_str, time_dur = time_dur.split(dur_char)
-                seconds += int(dur_str) * len_seconds
-        seconds_str = str(seconds % 60)
-        minutes_str = str(seconds // 60)
-        hours_str = str(seconds // (60 * 60))
-        self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
-                                  in (hours_str, minutes_str, seconds_str)])
-
-    @classmethod
-    def get_all_for_query(cls,
-                          conn: DatabaseConnection,
-                          query_id: QueryId
-                          ) -> list[Self]:
-        """Return all videos for query of query_id."""
-        sql = SqlText('SELECT video_id '
-                      'FROM yt_query_results WHERE query_id = ?')
-        video_ids = conn.exec(sql, (query_id,)).fetchall()
-        return [cls.get_one(conn, video_id_tup[0])
-                for video_id_tup in video_ids]
-
-    def save_to_query(self,
-                      conn: DatabaseConnection,
-                      query_id: QueryId
-                      ) -> None:
-        """Save inclusion of self in results to query of query_id."""
-        conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
-                  (query_id, self.id_))
-
-
-class VideoFile(DbData):
-    """Collects data about downloaded files."""
-    _table_name = 'files'
-    _cols = ('rel_path', 'yt_id')
-
-    def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
-        self.rel_path = rel_path
-        self.yt_id = yt_id
-
-    def remove(self, conn: DatabaseConnection) -> None:
-        """Remove self from database by self.rel_path as identifier."""
-        sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?')
-        conn.exec(SqlText(sql), (self.rel_path,))
-
-
-class QuotaLog(DbData):
-    """Collects API access quota costs."""
-    _table_name = 'quota_costs'
-    _cols = ('id_', 'timestamp', 'cost')
-
-    def __init__(self,
-                 id_: Optional[str],
-                 timestamp: DatetimeStr,
-                 cost: QuotaCost
-                 ) -> None:
-        self.id_ = id_ if id_ else str(uuid4())
-        self.timestamp = timestamp
-        self.cost = cost
-
-    @classmethod
-    def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
-        """Adds cost mapped to current datetime."""
-        cls._remove_old(conn)
-        new = cls(None,
-                  DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
-                  QuotaCost(cost))
-        new.save(conn)
-
-    @classmethod
-    def current(cls, conn: DatabaseConnection) -> QuotaCost:
-        """Returns quota cost total for last 24 hours, purges old data."""
-        cls._remove_old(conn)
-        quota_costs = cls.get_all(conn)
-        return QuotaCost(sum(c.cost for c in quota_costs))
-
-    @classmethod
-    def _remove_old(cls, conn: DatabaseConnection) -> None:
-        cutoff = datetime.now() - timedelta(days=1)
-        sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
-        conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
-
-
-class Player:
-    """MPV representation with some additional features."""
-
-    def __init__(self) -> None:
-        self.last_update = PlayerUpdateId('')
-        self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
-                           if isfile(e.path)
-                           and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
-        shuffle(self._filenames)
-        self._idx: int = 0
-        self._mpv: Optional[MPV] = None
-
-    @property
-    def _mpv_available(self) -> bool:
-        return bool(self._mpv and not self._mpv.core_shutdown)
-
-    @staticmethod
-    def _if_mpv_available(f) -> Callable:
-        def wrapper(self):
-            return f(self) if self._mpv else None
-        return wrapper
-
-    def _signal_update(self) -> None:
-        self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+from ytplom.misc import DownloadsDb, HTTP_PORT, Player, Server, TaskHandler
 
-    def _start_mpv(self) -> None:
-        self._mpv = MPV(input_default_bindings=True,
-                        input_vo_keyboard=True,
-                        config=True)
-        self._mpv.observe_property('pause', lambda a, b: self._signal_update())
 
-        @self._mpv.event_callback('start-file')
-        def on_start_file(_) -> None:
-            assert self._mpv is not None
-            self._mpv.pause = False
-            self._idx = self._mpv.playlist_pos
-            self._signal_update()
-
-        @self._mpv.event_callback('shutdown')
-        def on_shutdown(_) -> None:
-            self._mpv = None
-            self._signal_update()
-
-        for path in self._filenames:
-            self._mpv.playlist_append(path)
-        self._mpv.playlist_play_index(self._idx)
-
-    @property
-    def current_filename(self) -> Optional[PathStr]:
-        """Return what we assume is the name of the currently playing file."""
-        if not self._filenames:
-            return None
-        return PathStr(basename(self._filenames[self._idx]))
-
-    @property
-    def prev_files(self) -> list[PathStr]:
-        """List 'past' files of playlist."""
-        return list(reversed(self._filenames[:self._idx]))
-
-    @property
-    def next_files(self) -> list[PathStr]:
-        """List 'coming' files of playlist."""
-        return self._filenames[self._idx + 1:]
-
-    @property
-    def is_running(self) -> bool:
-        """Return if player is running/available."""
-        return self._mpv_available
-
-    @property
-    def is_paused(self) -> bool:
-        """Return if player is paused."""
-        if self._mpv_available:
-            assert self._mpv is not None
-            return self._mpv.pause
-        return False
-
-    def toggle_run(self) -> None:
-        """Toggle player running."""
-        if self._mpv_available:
-            assert self._mpv is not None
-            self._mpv.terminate()
-            self._mpv = None
-        else:
-            self._start_mpv()
-        self._signal_update()
-
-    @_if_mpv_available
-    def toggle_pause(self) -> None:
-        """Toggle player pausing."""
-        assert self._mpv is not None
-        self._mpv.pause = not self._mpv.pause
-        self._signal_update()
-
-    @_if_mpv_available
-    def prev(self) -> None:
-        """Move player to previous item in playlist."""
-        assert self._mpv is not None
-        if self._mpv.playlist_pos > 0:
-            self._mpv.playlist_prev()
-        else:
-            self._mpv.playlist_play_index(0)
-
-    @_if_mpv_available
-    def next(self) -> None:
-        """Move player to next item in playlist."""
-        assert self._mpv is not None
-        max_idx: int = len(self._mpv.playlist_filenames) - 1
-        if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
-            self._mpv.playlist_next()
-        else:
-            self._mpv.playlist_play_index(max_idx)
-
-
-class DownloadsDb:
-    """Collections downloading-related stuff."""
-
-    def __init__(self) -> None:
-        self._to_download: list[YoutubeId] = []
-        _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
-        self._sync_db()
-
-    def _sync_db(self):
-        conn = DatabaseConnection()
-        files_via_db = VideoFile.get_all(conn)
-        old_cwd = getcwd()
-        chdir(PATH_DOWNLOADS)
-        for file in files_via_db:
-            if not isfile(path_join(file.rel_path)):
-                print(f'SYNC: no file {file.rel_path} found, removing entry.')
-                file.remove(conn)
-        paths = [file.rel_path for file in files_via_db]
-        for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
-            if path not in paths:
-                yt_id = self._id_from_filename(path)
-                file = VideoFile(path, yt_id)
-                print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
-                file.save(conn)
-        chdir(old_cwd)
-        self._files = VideoFile.get_all(conn)
-        conn.commit_close()
-
-    @staticmethod
-    def _id_from_filename(path: PathStr,
-                          double_split: bool = False
-                          ) -> YoutubeId:
-        before_ext = splitext(path)[0]
-        if double_split:
-            before_ext = splitext(before_ext)[0]
-        return YoutubeId(before_ext.split('[')[-1].split(']')[0])
-
-    @property
-    def ids_to_paths(self) -> DownloadsIndex:
-        """Return mapping YoutubeIds:paths of files downloaded to them."""
-        self._sync_db()
-        return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path))
-                for f in self._files}
-
-    @property
-    def ids_unfinished(self) -> set[YoutubeId]:
-        """Return set of IDs of videos awaiting or currently in download."""
-        in_temp_dir = []
-        for path in [PathStr(e.path) for e
-                     in scandir(PATH_TEMP) if isfile(e.path)]:
-            in_temp_dir += [self._id_from_filename(path)]
-        return set(self._to_download + in_temp_dir)
-
-    def clean_unfinished(self) -> None:
-        """Empty temp directory of unfinished downloads."""
-        for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
-            print(f'removing unfinished download: {e.path}')
-            os_remove(e.path)
-
-    def queue_download(self, video_id: YoutubeId) -> None:
-        """Add video_id to download queue *if* not already processed."""
-        pre_existing = self.ids_unfinished | set(self._to_download
-                                                 + list(self.ids_to_paths))
-        if video_id not in pre_existing:
-            self._to_download += [video_id]
-
-    def _download_next(self) -> None:
-        if self._to_download:
-            video_id = self._to_download.pop(0)
-            with YoutubeDL(YT_DL_PARAMS) as ydl:
-                ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
-
-    def download_loop(self) -> None:
-        """Keep iterating through download queue for new download tasks."""
-        while True:
-            sleep(0.5)
-            self._download_next()
-
-
-class Server(HTTPServer):
-    """Extension of HTTPServer providing for Player and DownloadsDb."""
-
-    def __init__(self,
-                 player: Player,
-                 downloads_db: DownloadsDb,
-                 *args, **kwargs
-                 ) -> None:
-        super().__init__(*args, **kwargs)
-        self.player = player
-        self.downloads = downloads_db
-
-
-class TaskHandler(BaseHTTPRequestHandler):
-    """Handler for GET and POST requests to our server."""
-    server: Server
-
-    def _send_http(self,
-                   content: bytes = b'',
-                   headers: Optional[list[tuple[str, str]]] = None,
-                   code: int = 200
-                   ) -> None:
-        headers = headers if headers else []
-        self.send_response(code)
-        for header_tuple in headers:
-            self.send_header(header_tuple[0], header_tuple[1])
-        self.end_headers()
-        if content:
-            self.wfile.write(content)
-
-    def do_POST(self) -> None:  # pylint:disable=invalid-name
-        """Map POST requests to handlers for various paths."""
-        url = urlparse(self.path)
-        toks_url: list[str] = url.path.split('/')
-        page_name = toks_url[1]
-        body_length = int(self.headers['content-length'])
-        postvars = parse_qs(self.rfile.read(body_length).decode())
-        if 'playlist' == page_name:
-            self._post_player_command(list(postvars.keys())[0])
-        elif 'queries' == page_name:
-            self._post_query(QueryText(postvars['query'][0]))
-
-    def _post_player_command(self, commands: str) -> None:
-        if 'pause' in commands:
-            self.server.player.toggle_pause()
-        elif 'prev' in commands:
-            self.server.player.prev()
-        elif 'next' in commands:
-            self.server.player.next()
-        elif 'stop' in commands:
-            self.server.player.toggle_run()
-        sleep(0.5)  # avoid reload happening before current_file update
-        self._send_http(headers=[('Location', '/')], code=302)
-
-    def _post_query(self, query_txt: QueryText) -> None:
-        conn = DatabaseConnection()
-
-        def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
-            youtube = googleapiclient.discovery.build('youtube', 'v3',
-                                                      developerKey=API_KEY)
-            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
-            search_request = youtube.search().list(
-                    q=query_txt,
-                    part='snippet',
-                    maxResults=25,
-                    safeSearch='none',
-                    type='video')
-            results: list[YoutubeVideo] = []
-            ids_to_detail: list[YoutubeId] = []
-            for item in search_request.execute()['items']:
-                video_id: YoutubeId = item['id']['videoId']
-                ids_to_detail += [video_id]
-                snippet = item['snippet']
-                urlretrieve(snippet['thumbnails']['default']['url'],
-                            path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
-                results += [YoutubeVideo(id_=video_id,
-                                         title=snippet['title'],
-                                         description=snippet['description'],
-                                         published_at=snippet['publishedAt'])]
-            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
-            ids_for_details = ','.join([r.id_ for r in results])
-            videos_request = youtube.videos().list(id=ids_for_details,
-                                                   part='content_details')
-            for i, detailed in enumerate(videos_request.execute()['items']):
-                result = results[i]
-                assert result.id_ == detailed['id']
-                content_details: dict[str, str] = detailed['contentDetails']
-                result.set_duration_from_yt_string(content_details['duration'])
-                result.definition = content_details['definition'].upper()
-            return results
-
-        query_data = YoutubeQuery(
-                None, query_txt,
-                DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
-        query_data.save(conn)
-        for result in collect_results(query_txt):
-            result.save(conn)
-            result.save_to_query(conn, query_data.id_)
-        conn.commit_close()
-        self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
-                        code=302)
-
-    def do_GET(self) -> None:  # pylint:disable=invalid-name
-        """Map GET requests to handlers for various paths."""
-        url = urlparse(self.path)
-        toks_url: list[str] = url.path.split('/')
-        page_name = toks_url[1]
-        try:
-            if 'thumbnails' == page_name:
-                self._send_thumbnail(PathStr(toks_url[2]))
-            elif 'dl' == page_name:
-                self._send_or_download_video(YoutubeId(toks_url[2]))
-            elif 'videos' == page_name:
-                self._send_videos_index()
-            elif 'video_about' == page_name:
-                self._send_video_about(YoutubeId(toks_url[2]))
-            elif 'query' == page_name:
-                self._send_query_page(QueryId(toks_url[2]))
-            elif 'queries' == page_name:
-                self._send_queries_index_and_search()
-            elif '_last_playlist_update.json' == page_name:
-                self._send_last_playlist_update()
-            else:  # e.g. for /
-                self._send_playlist()
-        except NotFoundException:
-            self._send_http(b'not found', code=404)
-
-    def _send_rendered_template(self,
-                                tmpl_name: PathStr,
-                                tmpl_ctx: TemplateContext
-                                ) -> None:
-        with open(path_join(PATH_TEMPLATES, tmpl_name),
-                  'r', encoding='utf8'
-                  ) as templ_file:
-            tmpl = Template(str(templ_file.read()))
-        html = tmpl.render(**tmpl_ctx)
-        self._send_http(bytes(html, 'utf8'))
-
-    def _send_thumbnail(self, filename: PathStr) -> None:
-        _ensure_expected_dirs([PATH_THUMBNAILS])
-        path_thumbnail = path_join(PATH_THUMBNAILS, filename)
-        if not path_exists(path_thumbnail):
-            video_id = splitext(filename)[0]
-            url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
-            try:
-                urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
-            except HTTPError as e:
-                if 404 == e.code:
-                    raise NotFoundException from e
-                raise e
-        with open(path_thumbnail, 'rb') as f:
-            img = f.read()
-        self._send_http(img, [('Content-type', 'image/jpg')])
-
-    def _send_or_download_video(self, video_id: YoutubeId) -> None:
-        if video_id in self.server.downloads.ids_to_paths:
-            with open(self.server.downloads.ids_to_paths[video_id],
-                      'rb') as video_file:
-                video = video_file.read()
-            self._send_http(content=video)
-            return
-        self.server.downloads.queue_download(video_id)
-        self._send_http(headers=[('Location', f'/video_about/{video_id}')],
-                        code=302)
-
-    def _send_query_page(self, query_id: QueryId) -> None:
-        conn = DatabaseConnection()
-        query = YoutubeQuery.get_one(conn, str(query_id))
-        results = YoutubeVideo.get_all_for_query(conn, query_id)
-        conn.commit_close()
-        self._send_rendered_template(
-                NAME_TEMPLATE_RESULTS,
-                {'query': query.text, 'videos': results})
-
-    def _send_queries_index_and_search(self) -> None:
-        conn = DatabaseConnection()
-        quota_count = QuotaLog.current(conn)
-        queries_data = YoutubeQuery.get_all(conn)
-        conn.commit_close()
-        queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
-        self._send_rendered_template(
-                NAME_TEMPLATE_QUERIES, {'queries': queries_data,
-                                        'quota_count': quota_count})
-
-    def _send_video_about(self, video_id: YoutubeId) -> None:
-        conn = DatabaseConnection()
-        linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
-        try:
-            video_data = YoutubeVideo.get_one(conn, video_id)
-        except NotFoundException:
-            video_data = YoutubeVideo(video_id)
-        conn.commit_close()
-        self._send_rendered_template(
-                NAME_TEMPLATE_VIDEO_ABOUT,
-                {'video_data': video_data,
-                 'is_temp': video_id in self.server.downloads.ids_unfinished,
-                 'file_path': self.server.downloads.ids_to_paths.get(video_id,
-                                                                     None),
-                 'youtube_prefix': YOUTUBE_URL_PREFIX,
-                 'queries': linked_queries})
-
-    def _send_videos_index(self) -> None:
-        videos = [(id_, PathStr(basename(path)))
-                  for id_, path in self.server.downloads.ids_to_paths.items()]
-        videos.sort(key=lambda t: t[1])
-        self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
-
-    def _send_last_playlist_update(self) -> None:
-        payload: dict[str, PlayerUpdateId] = {
-                'last_update': self.server.player.last_update}
-        self._send_http(bytes(json_dumps(payload), 'utf8'),
-                        headers=[('Content-type', 'application/json')])
-
-    def _send_playlist(self) -> None:
-        tuples: list[tuple[PathStr, PathStr]] = []
-        i: int = 0
-        while True:
-            prev, next_ = PathStr(''), PathStr('')
-            if len(self.server.player.prev_files) > i:
-                prev = PathStr(basename(self.server.player.prev_files[i]))
-            if len(self.server.player.next_files) > i:
-                next_ = PathStr(basename(self.server.player.next_files[i]))
-            if not prev + next_:
-                break
-            tuples += [(prev, next_)]
-            i += 1
-        self._send_rendered_template(
-                NAME_TEMPLATE_PLAYLIST,
-                {'last_update': self.server.player.last_update,
-                 'running': self.server.player.is_running,
-                 'paused': self.server.player.is_paused,
-                 'current_title': self.server.player.current_filename,
-                 'tuples': tuples})
-
-
-def run():
-    """Create DownloadsDb, Player, run server loop."""
+if __name__ == '__main__':
     downloads_db = DownloadsDb()
     downloads_db.clean_unfinished()
     Thread(target=downloads_db.download_loop, daemon=False).start()
@@ -772,7 +17,3 @@ def run():
         print('aborted due to keyboard interrupt; '
               'repeat to end download thread too')
     server.server_close()
-
-
-if __name__ == '__main__':
-    run()
diff --git a/ytplom/__init__.py b/ytplom/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/ytplom/misc.py b/ytplom/misc.py
new file mode 100644 (file)
index 0000000..639acdf
--- /dev/null
@@ -0,0 +1,756 @@
+"""Main ytplom lib."""
+
+# included libs
+from typing import TypeAlias, Optional, NewType, Callable, Self, Any
+from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
+from os.path import (isdir, isfile, exists as path_exists, join as path_join,
+                     splitext, basename)
+from random import shuffle
+from time import time, sleep
+from datetime import datetime, timedelta
+from json import dumps as json_dumps
+from uuid import uuid4
+from sqlite3 import connect as sql_connect, Cursor, Row
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from urllib.parse import urlparse, parse_qs
+from urllib.request import urlretrieve
+from urllib.error import HTTPError
+# non-included libs
+from jinja2 import Template
+from mpv import MPV  # type: ignore
+from yt_dlp import YoutubeDL  # type: ignore
+import googleapiclient.discovery  # type: ignore
+
+# what we might want to manually define per environs
+API_KEY = environ.get('GOOGLE_API_KEY')
+HTTP_PORT = int(environ.get('YTPLOM_PORT', 8084))
+
+# type definitions for mypy
+DatetimeStr = NewType('DatetimeStr', str)
+QuotaCost = NewType('QuotaCost', int)
+YoutubeId = NewType('YoutubeId', str)
+PathStr = NewType('PathStr', str)
+QueryId = NewType('QueryId', str)
+QueryText = NewType('QueryText', str)
+ProseText = NewType('ProseText', str)
+SqlText = NewType('SqlText', str)
+AmountDownloads = NewType('AmountDownloads', int)
+PlayerUpdateId = NewType('PlayerUpdateId', str)
+DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
+TemplateContext: TypeAlias = dict[
+        str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
+        | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
+        | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]]
+        | list[tuple[PathStr, PathStr]]]
+
+# local data reasonably expected to be in user home directory
+PATH_HOME = PathStr(environ.get('HOME', ''))
+PATH_WORKDIR = PathStr(path_join(PATH_HOME, 'ytplom'))
+PATH_THUMBNAILS = PathStr(path_join(PATH_WORKDIR, 'thumbnails'))
+PATH_DB = PathStr(path_join(PATH_WORKDIR, 'db.sql'))
+PATH_DOWNLOADS = PathStr(path_join(PATH_WORKDIR, 'downloads'))
+PATH_TEMP = PathStr(path_join(PATH_WORKDIR, 'temp'))
+
+# template paths; might move outside PATH_WORKDIR in the future
+PATH_TEMPLATES = PathStr(path_join(PATH_WORKDIR, 'templates'))
+NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
+NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
+NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
+NAME_TEMPLATE_VIDEO_ABOUT = PathStr('video_about.tmpl')
+NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
+PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES,
+                                          NAME_TEMPLATE_QUERIES))
+
+# yt_dlp config
+YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
+        '/best[height<=1080][width<=1920]'
+YT_DL_PARAMS = {'paths': {'home': PATH_DOWNLOADS,
+                          'temp': PATH_TEMP},
+                'format': YT_DOWNLOAD_FORMAT}
+
+# Youtube API expectations
+YOUTUBE_URL_PREFIX = PathStr('https://www.youtube.com/watch?v=')
+THUMBNAIL_URL_PREFIX = PathStr('https://i.ytimg.com/vi/')
+THUMBNAIL_URL_SUFFIX = PathStr('/default.jpg')
+QUOTA_COST_YOUTUBE_SEARCH = QuotaCost(100)
+QUOTA_COST_YOUTUBE_DETAILS = QuotaCost(1)
+
+# local expectations
+TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
+LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
+
+# tables to create database with
+SCRIPT_INIT_DB = '''
+CREATE TABLE yt_queries (
+  id TEXT PRIMARY KEY,
+  text TEXT NOT NULL,
+  retrieved_at TEXT NOT NULL
+);
+CREATE TABLE yt_videos (
+  id TEXT PRIMARY KEY,
+  title TEXT NOT NULL,
+  description TEXT NOT NULL,
+  published_at TEXT NOT NULL,
+  duration TEXT NOT NULL,
+  definition TEXT NOT NULL
+);
+CREATE TABLE yt_query_results (
+  query_id TEXT NOT NULL,
+  video_id TEXT NOT NULL,
+  PRIMARY KEY (query_id, video_id),
+  FOREIGN KEY (query_id) REFERENCES yt_queries(id),
+  FOREIGN KEY (video_id) REFERENCES yt_videos(id)
+);
+CREATE TABLE quota_costs (
+  id TEXT PRIMARY KEY,
+  timestamp TEXT NOT NULL,
+  cost INT NOT NULL
+);
+CREATE TABLE files (
+  rel_path TEXT PRIMARY KEY,
+  yt_id TEXT NOT NULL DEFAULT "",
+  FOREIGN KEY (yt_id) REFERENCES yt_videos(id)
+);
+'''
+
+
+class NotFoundException(BaseException):
+    """Call on DB fetches finding less than expected."""
+
+
+def _ensure_expected_dirs(expected_dirs: list[PathStr]) -> None:
+    """Ensure existance of expected_dirs _as_ directories."""
+    for dir_name in expected_dirs:
+        if not path_exists(dir_name):
+            print(f'creating expected directory: {dir_name}')
+            makedirs(dir_name)
+        elif not isdir(dir_name):
+            msg = f'at expected directory path {dir_name} found non-directory'
+            raise Exception(msg)
+
+
+class DatabaseConnection:
+    """Wrapped sqlite3.Connection."""
+
+    def __init__(self) -> None:
+        if not path_exists(PATH_DB):
+            with sql_connect(PATH_DB) as conn:
+                conn.executescript(SCRIPT_INIT_DB)
+        self._conn = sql_connect(PATH_DB)
+
+    def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
+        """Wrapper around sqlite3.Connection.execute."""
+        return self._conn.execute(sql, inputs)
+
+    def commit_close(self) -> None:
+        """Run sqlite3.Connection.commit and .close."""
+        self._conn.commit()
+        self._conn.close()
+
+
+class DbData:
+    """Abstraction of common DB operation."""
+    _table_name: str
+    _cols: tuple[str, ...]
+
+    @classmethod
+    def _from_table_row(cls, row: Row) -> Self:
+        kwargs = {}
+        for i, col_name in enumerate(cls._cols):
+            kwargs[col_name] = row[i]
+        return cls(**kwargs)
+
+    @classmethod
+    def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
+        """Return single entry of id_ from DB."""
+        sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
+        row = conn.exec(sql, (id_,)).fetchone()
+        if not row:
+            raise NotFoundException
+        return cls._from_table_row(row)
+
+    @classmethod
+    def get_all(cls, conn: DatabaseConnection) -> list[Self]:
+        """Return all entries from DB."""
+        sql = SqlText(f'SELECT * FROM {cls._table_name}')
+        rows = conn.exec(sql).fetchall()
+        return [cls._from_table_row(row) for row in rows]
+
+    def save(self, conn: DatabaseConnection) -> Cursor:
+        """Save entry to DB."""
+        vals = [getattr(self, col_name) for col_name in self._cols]
+        q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
+        sql = SqlText(f'REPLACE INTO {self._table_name} VALUES {q_marks}')
+        return conn.exec(sql, tuple(vals))
+
+
+class YoutubeQuery(DbData):
+    """Representation of YouTube query (without results)."""
+    _table_name = 'yt_queries'
+    _cols = ('id_', 'text', 'retrieved_at')
+
+    def __init__(self,
+                 id_: Optional[QueryId],
+                 text: QueryText,
+                 retrieved_at: DatetimeStr
+                 ) -> None:
+        self.id_ = id_ if id_ else QueryId(str(uuid4()))
+        self.text = QueryText(text)
+        self.retrieved_at = retrieved_at
+
+    @classmethod
+    def get_all_for_video(cls,
+                          conn: DatabaseConnection,
+                          video_id: YoutubeId
+                          ) -> list[Self]:
+        """Return YoutubeQueries containing YoutubeVideo's ID in results."""
+        sql = SqlText('SELECT query_id FROM '
+                      'yt_query_results WHERE video_id = ?')
+        query_ids = conn.exec(sql, (video_id,)).fetchall()
+        return [cls.get_one(conn, query_id_tup[0])
+                for query_id_tup in query_ids]
+
+
+class YoutubeVideo(DbData):
+    """Representation of YouTube video metadata as provided by their API."""
+    _table_name = 'yt_videos'
+    _cols = ('id_', 'title', 'description', 'published_at', 'duration',
+             'definition')
+
+    def __init__(self,
+                 id_: YoutubeId,
+                 title: ProseText = ProseText('?'),
+                 description: ProseText = ProseText('?'),
+                 published_at: DatetimeStr = DatetimeStr('?'),
+                 duration: str = '?',
+                 definition: str = '?'
+                 ) -> None:
+        self.id_ = id_
+        self.title = title
+        self.description = description
+        self.published_at = published_at
+        self.duration = duration
+        self.definition = definition
+
+    def set_duration_from_yt_string(self, yt_string) -> None:
+        """Set .duration from the kind of format the YouTube API provides."""
+        date_dur, time_dur = yt_string.split('T')
+        seconds = 0
+        date_dur = date_dur[1:]
+        for dur_char, len_seconds in (('Y', 60*60*24*365.25),
+                                      ('M', 60*60*24*30),
+                                      ('D', 60*60*24)):
+            if dur_char in date_dur:
+                dur_str, date_dur = date_dur.split(dur_char)
+                seconds += int(dur_str) * int(len_seconds)
+        for dur_char, len_seconds in (('H', 60*60),
+                                      ('M', 60),
+                                      ('S', 1)):
+            if dur_char in time_dur:
+                dur_str, time_dur = time_dur.split(dur_char)
+                seconds += int(dur_str) * len_seconds
+        seconds_str = str(seconds % 60)
+        minutes_str = str(seconds // 60)
+        hours_str = str(seconds // (60 * 60))
+        self.duration = ':'.join([f'0{s}' if len(s) == 1 else s for s
+                                  in (hours_str, minutes_str, seconds_str)])
+
+    @classmethod
+    def get_all_for_query(cls,
+                          conn: DatabaseConnection,
+                          query_id: QueryId
+                          ) -> list[Self]:
+        """Return all videos for query of query_id."""
+        sql = SqlText('SELECT video_id '
+                      'FROM yt_query_results WHERE query_id = ?')
+        video_ids = conn.exec(sql, (query_id,)).fetchall()
+        return [cls.get_one(conn, video_id_tup[0])
+                for video_id_tup in video_ids]
+
+    def save_to_query(self,
+                      conn: DatabaseConnection,
+                      query_id: QueryId
+                      ) -> None:
+        """Save inclusion of self in results to query of query_id."""
+        conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
+                  (query_id, self.id_))
+
+
+class VideoFile(DbData):
+    """Collects data about downloaded files."""
+    _table_name = 'files'
+    _cols = ('rel_path', 'yt_id')
+
+    def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
+        self.rel_path = rel_path
+        self.yt_id = yt_id
+
+    def remove(self, conn: DatabaseConnection) -> None:
+        """Remove self from database by self.rel_path as identifier."""
+        sql = SqlText(f'DELETE FROM {self._table_name} WHERE rel_path = ?')
+        conn.exec(SqlText(sql), (self.rel_path,))
+
+
+class QuotaLog(DbData):
+    """Collects API access quota costs."""
+    _table_name = 'quota_costs'
+    _cols = ('id_', 'timestamp', 'cost')
+
+    def __init__(self,
+                 id_: Optional[str],
+                 timestamp: DatetimeStr,
+                 cost: QuotaCost
+                 ) -> None:
+        self.id_ = id_ if id_ else str(uuid4())
+        self.timestamp = timestamp
+        self.cost = cost
+
+    @classmethod
+    def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
+        """Adds cost mapped to current datetime."""
+        cls._remove_old(conn)
+        new = cls(None,
+                  DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
+                  QuotaCost(cost))
+        new.save(conn)
+
+    @classmethod
+    def current(cls, conn: DatabaseConnection) -> QuotaCost:
+        """Returns quota cost total for last 24 hours, purges old data."""
+        cls._remove_old(conn)
+        quota_costs = cls.get_all(conn)
+        return QuotaCost(sum(c.cost for c in quota_costs))
+
+    @classmethod
+    def _remove_old(cls, conn: DatabaseConnection) -> None:
+        cutoff = datetime.now() - timedelta(days=1)
+        sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
+        conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
+
+
+class Player:
+    """MPV representation with some additional features."""
+
+    def __init__(self) -> None:
+        self.last_update = PlayerUpdateId('')
+        self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
+                           if isfile(e.path)
+                           and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+        shuffle(self._filenames)
+        self._idx: int = 0
+        self._mpv: Optional[MPV] = None
+
+    @property
+    def _mpv_available(self) -> bool:
+        return bool(self._mpv and not self._mpv.core_shutdown)
+
+    @staticmethod
+    def _if_mpv_available(f) -> Callable:
+        def wrapper(self):
+            return f(self) if self._mpv else None
+        return wrapper
+
+    def _signal_update(self) -> None:
+        self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+
+    def _start_mpv(self) -> None:
+        self._mpv = MPV(input_default_bindings=True,
+                        input_vo_keyboard=True,
+                        config=True)
+        self._mpv.observe_property('pause', lambda a, b: self._signal_update())
+
+        @self._mpv.event_callback('start-file')
+        def on_start_file(_) -> None:
+            assert self._mpv is not None
+            self._mpv.pause = False
+            self._idx = self._mpv.playlist_pos
+            self._signal_update()
+
+        @self._mpv.event_callback('shutdown')
+        def on_shutdown(_) -> None:
+            self._mpv = None
+            self._signal_update()
+
+        for path in self._filenames:
+            self._mpv.playlist_append(path)
+        self._mpv.playlist_play_index(self._idx)
+
+    @property
+    def current_filename(self) -> Optional[PathStr]:
+        """Return what we assume is the name of the currently playing file."""
+        if not self._filenames:
+            return None
+        return PathStr(basename(self._filenames[self._idx]))
+
+    @property
+    def prev_files(self) -> list[PathStr]:
+        """List 'past' files of playlist."""
+        return list(reversed(self._filenames[:self._idx]))
+
+    @property
+    def next_files(self) -> list[PathStr]:
+        """List 'coming' files of playlist."""
+        return self._filenames[self._idx + 1:]
+
+    @property
+    def is_running(self) -> bool:
+        """Return if player is running/available."""
+        return self._mpv_available
+
+    @property
+    def is_paused(self) -> bool:
+        """Return if player is paused."""
+        if self._mpv_available:
+            assert self._mpv is not None
+            return self._mpv.pause
+        return False
+
+    def toggle_run(self) -> None:
+        """Toggle player running."""
+        if self._mpv_available:
+            assert self._mpv is not None
+            self._mpv.terminate()
+            self._mpv = None
+        else:
+            self._start_mpv()
+        self._signal_update()
+
+    @_if_mpv_available
+    def toggle_pause(self) -> None:
+        """Toggle player pausing."""
+        assert self._mpv is not None
+        self._mpv.pause = not self._mpv.pause
+        self._signal_update()
+
+    @_if_mpv_available
+    def prev(self) -> None:
+        """Move player to previous item in playlist."""
+        assert self._mpv is not None
+        if self._mpv.playlist_pos > 0:
+            self._mpv.playlist_prev()
+        else:
+            self._mpv.playlist_play_index(0)
+
+    @_if_mpv_available
+    def next(self) -> None:
+        """Move player to next item in playlist."""
+        assert self._mpv is not None
+        max_idx: int = len(self._mpv.playlist_filenames) - 1
+        if self._mpv.playlist_pos < len(self._mpv.playlist_filenames) - 1:
+            self._mpv.playlist_next()
+        else:
+            self._mpv.playlist_play_index(max_idx)
+
+
+class DownloadsDb:
+    """Collections downloading-related stuff."""
+
+    def __init__(self) -> None:
+        self._to_download: list[YoutubeId] = []
+        _ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
+        self._sync_db()
+
+    def _sync_db(self):
+        conn = DatabaseConnection()
+        files_via_db = VideoFile.get_all(conn)
+        old_cwd = getcwd()
+        chdir(PATH_DOWNLOADS)
+        for file in files_via_db:
+            if not isfile(path_join(file.rel_path)):
+                print(f'SYNC: no file {file.rel_path} found, removing entry.')
+                file.remove(conn)
+        paths = [file.rel_path for file in files_via_db]
+        for path in [PathStr(e.path) for e in scandir() if isfile(e.path)]:
+            if path not in paths:
+                yt_id = self._id_from_filename(path)
+                file = VideoFile(path, yt_id)
+                print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+                file.save(conn)
+        chdir(old_cwd)
+        self._files = VideoFile.get_all(conn)
+        conn.commit_close()
+
+    @staticmethod
+    def _id_from_filename(path: PathStr,
+                          double_split: bool = False
+                          ) -> YoutubeId:
+        before_ext = splitext(path)[0]
+        if double_split:
+            before_ext = splitext(before_ext)[0]
+        return YoutubeId(before_ext.split('[')[-1].split(']')[0])
+
+    @property
+    def ids_to_paths(self) -> DownloadsIndex:
+        """Return mapping YoutubeIds:paths of files downloaded to them."""
+        self._sync_db()
+        return {f.yt_id: PathStr(path_join(PATH_DOWNLOADS, f.rel_path))
+                for f in self._files}
+
+    @property
+    def ids_unfinished(self) -> set[YoutubeId]:
+        """Return set of IDs of videos awaiting or currently in download."""
+        in_temp_dir = []
+        for path in [PathStr(e.path) for e
+                     in scandir(PATH_TEMP) if isfile(e.path)]:
+            in_temp_dir += [self._id_from_filename(path)]
+        return set(self._to_download + in_temp_dir)
+
+    def clean_unfinished(self) -> None:
+        """Empty temp directory of unfinished downloads."""
+        for e in [e for e in scandir(PATH_TEMP) if isfile(e.path)]:
+            print(f'removing unfinished download: {e.path}')
+            os_remove(e.path)
+
+    def queue_download(self, video_id: YoutubeId) -> None:
+        """Add video_id to download queue *if* not already processed."""
+        pre_existing = self.ids_unfinished | set(self._to_download
+                                                 + list(self.ids_to_paths))
+        if video_id not in pre_existing:
+            self._to_download += [video_id]
+
+    def _download_next(self) -> None:
+        if self._to_download:
+            video_id = self._to_download.pop(0)
+            with YoutubeDL(YT_DL_PARAMS) as ydl:
+                ydl.download([f'{YOUTUBE_URL_PREFIX}{video_id}'])
+
+    def download_loop(self) -> None:
+        """Keep iterating through download queue for new download tasks."""
+        while True:
+            sleep(0.5)
+            self._download_next()
+
+
+class Server(HTTPServer):
+    """Extension of HTTPServer providing for Player and DownloadsDb."""
+
+    def __init__(self,
+                 player: Player,
+                 downloads_db: DownloadsDb,
+                 *args, **kwargs
+                 ) -> None:
+        super().__init__(*args, **kwargs)
+        self.player = player
+        self.downloads = downloads_db
+
+
+class TaskHandler(BaseHTTPRequestHandler):
+    """Handler for GET and POST requests to our server."""
+    server: Server
+
+    def _send_http(self,
+                   content: bytes = b'',
+                   headers: Optional[list[tuple[str, str]]] = None,
+                   code: int = 200
+                   ) -> None:
+        headers = headers if headers else []
+        self.send_response(code)
+        for header_tuple in headers:
+            self.send_header(header_tuple[0], header_tuple[1])
+        self.end_headers()
+        if content:
+            self.wfile.write(content)
+
+    def do_POST(self) -> None:  # pylint:disable=invalid-name
+        """Map POST requests to handlers for various paths."""
+        url = urlparse(self.path)
+        toks_url: list[str] = url.path.split('/')
+        page_name = toks_url[1]
+        body_length = int(self.headers['content-length'])
+        postvars = parse_qs(self.rfile.read(body_length).decode())
+        if 'playlist' == page_name:
+            self._post_player_command(list(postvars.keys())[0])
+        elif 'queries' == page_name:
+            self._post_query(QueryText(postvars['query'][0]))
+
+    def _post_player_command(self, commands: str) -> None:
+        if 'pause' in commands:
+            self.server.player.toggle_pause()
+        elif 'prev' in commands:
+            self.server.player.prev()
+        elif 'next' in commands:
+            self.server.player.next()
+        elif 'stop' in commands:
+            self.server.player.toggle_run()
+        sleep(0.5)  # avoid reload happening before current_file update
+        self._send_http(headers=[('Location', '/')], code=302)
+
+    def _post_query(self, query_txt: QueryText) -> None:
+        conn = DatabaseConnection()
+
+        def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
+            youtube = googleapiclient.discovery.build('youtube', 'v3',
+                                                      developerKey=API_KEY)
+            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
+            search_request = youtube.search().list(
+                    q=query_txt,
+                    part='snippet',
+                    maxResults=25,
+                    safeSearch='none',
+                    type='video')
+            results: list[YoutubeVideo] = []
+            ids_to_detail: list[YoutubeId] = []
+            for item in search_request.execute()['items']:
+                video_id: YoutubeId = item['id']['videoId']
+                ids_to_detail += [video_id]
+                snippet = item['snippet']
+                urlretrieve(snippet['thumbnails']['default']['url'],
+                            path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+                results += [YoutubeVideo(id_=video_id,
+                                         title=snippet['title'],
+                                         description=snippet['description'],
+                                         published_at=snippet['publishedAt'])]
+            QuotaLog.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
+            ids_for_details = ','.join([r.id_ for r in results])
+            videos_request = youtube.videos().list(id=ids_for_details,
+                                                   part='content_details')
+            for i, detailed in enumerate(videos_request.execute()['items']):
+                result = results[i]
+                assert result.id_ == detailed['id']
+                content_details: dict[str, str] = detailed['contentDetails']
+                result.set_duration_from_yt_string(content_details['duration'])
+                result.definition = content_details['definition'].upper()
+            return results
+
+        query_data = YoutubeQuery(
+                None, query_txt,
+                DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
+        query_data.save(conn)
+        for result in collect_results(query_txt):
+            result.save(conn)
+            result.save_to_query(conn, query_data.id_)
+        conn.commit_close()
+        self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
+                        code=302)
+
+    def do_GET(self) -> None:  # pylint:disable=invalid-name
+        """Map GET requests to handlers for various paths."""
+        url = urlparse(self.path)
+        toks_url: list[str] = url.path.split('/')
+        page_name = toks_url[1]
+        try:
+            if 'thumbnails' == page_name:
+                self._send_thumbnail(PathStr(toks_url[2]))
+            elif 'dl' == page_name:
+                self._send_or_download_video(YoutubeId(toks_url[2]))
+            elif 'videos' == page_name:
+                self._send_videos_index()
+            elif 'video_about' == page_name:
+                self._send_video_about(YoutubeId(toks_url[2]))
+            elif 'query' == page_name:
+                self._send_query_page(QueryId(toks_url[2]))
+            elif 'queries' == page_name:
+                self._send_queries_index_and_search()
+            elif '_last_playlist_update.json' == page_name:
+                self._send_last_playlist_update()
+            else:  # e.g. for /
+                self._send_playlist()
+        except NotFoundException:
+            self._send_http(b'not found', code=404)
+
+    def _send_rendered_template(self,
+                                tmpl_name: PathStr,
+                                tmpl_ctx: TemplateContext
+                                ) -> None:
+        with open(path_join(PATH_TEMPLATES, tmpl_name),
+                  'r', encoding='utf8'
+                  ) as templ_file:
+            tmpl = Template(str(templ_file.read()))
+        html = tmpl.render(**tmpl_ctx)
+        self._send_http(bytes(html, 'utf8'))
+
+    def _send_thumbnail(self, filename: PathStr) -> None:
+        _ensure_expected_dirs([PATH_THUMBNAILS])
+        path_thumbnail = path_join(PATH_THUMBNAILS, filename)
+        if not path_exists(path_thumbnail):
+            video_id = splitext(filename)[0]
+            url = f'{THUMBNAIL_URL_PREFIX}{video_id}{THUMBNAIL_URL_SUFFIX}'
+            try:
+                urlretrieve(url, path_join(PATH_THUMBNAILS, f'{video_id}.jpg'))
+            except HTTPError as e:
+                if 404 == e.code:
+                    raise NotFoundException from e
+                raise e
+        with open(path_thumbnail, 'rb') as f:
+            img = f.read()
+        self._send_http(img, [('Content-type', 'image/jpg')])
+
+    def _send_or_download_video(self, video_id: YoutubeId) -> None:
+        if video_id in self.server.downloads.ids_to_paths:
+            with open(self.server.downloads.ids_to_paths[video_id],
+                      'rb') as video_file:
+                video = video_file.read()
+            self._send_http(content=video)
+            return
+        self.server.downloads.queue_download(video_id)
+        self._send_http(headers=[('Location', f'/video_about/{video_id}')],
+                        code=302)
+
+    def _send_query_page(self, query_id: QueryId) -> None:
+        conn = DatabaseConnection()
+        query = YoutubeQuery.get_one(conn, str(query_id))
+        results = YoutubeVideo.get_all_for_query(conn, query_id)
+        conn.commit_close()
+        self._send_rendered_template(
+                NAME_TEMPLATE_RESULTS,
+                {'query': query.text, 'videos': results})
+
+    def _send_queries_index_and_search(self) -> None:
+        conn = DatabaseConnection()
+        quota_count = QuotaLog.current(conn)
+        queries_data = YoutubeQuery.get_all(conn)
+        conn.commit_close()
+        queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
+        self._send_rendered_template(
+                NAME_TEMPLATE_QUERIES, {'queries': queries_data,
+                                        'quota_count': quota_count})
+
+    def _send_video_about(self, video_id: YoutubeId) -> None:
+        conn = DatabaseConnection()
+        linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
+        try:
+            video_data = YoutubeVideo.get_one(conn, video_id)
+        except NotFoundException:
+            video_data = YoutubeVideo(video_id)
+        conn.commit_close()
+        self._send_rendered_template(
+                NAME_TEMPLATE_VIDEO_ABOUT,
+                {'video_data': video_data,
+                 'is_temp': video_id in self.server.downloads.ids_unfinished,
+                 'file_path': self.server.downloads.ids_to_paths.get(video_id,
+                                                                     None),
+                 'youtube_prefix': YOUTUBE_URL_PREFIX,
+                 'queries': linked_queries})
+
+    def _send_videos_index(self) -> None:
+        videos = [(id_, PathStr(basename(path)))
+                  for id_, path in self.server.downloads.ids_to_paths.items()]
+        videos.sort(key=lambda t: t[1])
+        self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
+
+    def _send_last_playlist_update(self) -> None:
+        payload: dict[str, PlayerUpdateId] = {
+                'last_update': self.server.player.last_update}
+        self._send_http(bytes(json_dumps(payload), 'utf8'),
+                        headers=[('Content-type', 'application/json')])
+
+    def _send_playlist(self) -> None:
+        tuples: list[tuple[PathStr, PathStr]] = []
+        i: int = 0
+        while True:
+            prev, next_ = PathStr(''), PathStr('')
+            if len(self.server.player.prev_files) > i:
+                prev = PathStr(basename(self.server.player.prev_files[i]))
+            if len(self.server.player.next_files) > i:
+                next_ = PathStr(basename(self.server.player.next_files[i]))
+            if not prev + next_:
+                break
+            tuples += [(prev, next_)]
+            i += 1
+        self._send_rendered_template(
+                NAME_TEMPLATE_PLAYLIST,
+                {'last_update': self.server.player.last_update,
+                 'running': self.server.player.is_running,
+                 'paused': self.server.player.is_paused,
+                 'current_title': self.server.player.current_filename,
+                 'tuples': tuples})