from json import dumps as json_dumps
from pathlib import Path
from time import sleep
-from typing import NewType, Optional, TypeAlias
-from urllib.parse import urlparse, parse_qs
+from typing import Generator, NewType, Optional, TypeAlias
+from urllib.parse import parse_qs, urlparse
from urllib.request import urlretrieve
from urllib.error import HTTPError
from jinja2 import ( # type: ignore
Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
from ytplom.db import Hash, DbConn
from ytplom.misc import (
- FilesWithIndex, FlagName, PlayerUpdateId, QueryId, QueryText,
- QuotaCost, Tag, UrlStr, YoutubeId,
+ FilesWithIndex, FilterStr, FlagName, PlayerUpdateId, QueryId,
+ QueryText, QuotaCost, Tag, UrlStr, YoutubeId,
FILE_FLAGS, PATH_THUMBNAILS, YOUTUBE_URL_PREFIX,
ensure_expected_dirs,
Config, DownloadsManager, Player, QuotaLog, VideoFile, YoutubeQuery,
from ytplom.primitives import NotFoundException, PATH_APP_DATA
# type definitions for mypy
+
+_ColorStr = NewType('_ColorStr', str)
_PageNames: TypeAlias = dict[str, Path]
-_FilterStr = NewType('_FilterStr', str)
+_ReqDict: TypeAlias = dict[str, list[str]]
_TemplateContext: TypeAlias = dict[
str,
None | bool
- | FilesWithIndex | _PageNames | _FilterStr | Path | PlayerUpdateId
- | QueryText | QuotaCost | UrlStr | 'VideoFile' | YoutubeId
- | 'YoutubeVideo' | list[FlagName] | set['Tag'] | list['VideoFile']
- | list['YoutubeVideo'] | list['YoutubeQuery']
+ | _ColorStr | FilesWithIndex | _PageNames | FilterStr | Path
+ | PlayerUpdateId | QueryText | QuotaCost | UrlStr | 'VideoFile'
+ | YoutubeId | 'YoutubeVideo' | list[FlagName] | set['Tag']
+ | list['VideoFile'] | list['YoutubeVideo'] | list['YoutubeQuery']
]
# API expectations
}
+class _ReqMap:
+ """Wrapper over parse_qs results, i.e. HTTP postvars or query params."""
+
+ def __init__(self, map_as_str: str) -> None:
+ self.as_str = map_as_str
+
+ @property
+ def as_dict(self) -> _ReqDict:
+ """Return as parse_qs-resulting dictionary."""
+ return parse_qs(self.as_str)
+
+ @property
+ def single_key(self) -> str:
+ """Return single .as_dict key, implicitly assuming there's only one."""
+ return list(self.as_dict.keys())[0]
+
+ def single_value(self, key: str) -> str:
+ """Return .as_dict[key][0] if possible, else ''."""
+ return self.as_dict.get(key, [''])[0]
+
+ def key_starting_with(self, start: str) -> Generator:
+ """From .as_dict yield key starting with start."""
+ for k in self.as_dict:
+ if k.startswith(start):
+ yield k
+
+
class Server(HTTPServer):
"""Extension of HTTPServer providing for Player and DownloadsManager."""
toks_url = Path(url.path).parts
page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
body_length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(body_length).decode())
+ postvars = _ReqMap(self.rfile.read(body_length).decode())
+ # postvars = parse_qs(self.rfile.read(body_length).decode())
if PAGE_NAMES['playlist'] == page_name:
- self._receive_player_command(list(postvars.keys())[0])
+ self._receive_player_command(postvars.single_key, url.query)
if PAGE_NAMES['files'] == page_name:
- self._receive_files_command(list(postvars.keys())[0])
+ self._receive_files_command(postvars)
elif PAGE_NAMES['file'] == page_name:
self._receive_file_data(Hash.from_b64(toks_url[2]),
- postvars)
+ postvars.as_dict)
elif PAGE_NAMES['yt_queries'] == page_name:
- self._receive_yt_query(QueryText(postvars['query'][0]))
+ self._receive_yt_query(QueryText(postvars.single_value('query')))
- def _receive_player_command(self, command: str) -> None:
+ def _receive_player_command(self, command: str, params_str: str) -> None:
if 'pause' == command:
self.server.player.toggle_pause()
elif 'prev' == command:
elif 'stop' == command:
self.server.player.toggle_run()
elif 'reload' == command:
- self.server.player.reload()
+ self.server.player.clear()
elif command.startswith('jump_'):
self.server.player.jump_to(int(command.split('_')[1]))
elif command.startswith('up'):
elif command.startswith('down_'):
self.server.player.move_entry(int(command.split('_')[1]), False)
sleep(0.5) # avoid redir happening before current_file update
- self._redirect(Path('/'))
+ self._redirect(Path('/')
+ .joinpath(f'{PAGE_NAMES["playlist"]}?{params_str}'))
- def _receive_files_command(self, command: str) -> None:
- if command.startswith('play_'):
+ def _receive_files_command(self, postvars: _ReqMap) -> None:
+ for k in postvars.key_starting_with('play_'):
with DbConn() as conn:
file = VideoFile.get_one(
- conn, Hash.from_b64(command.split('_', 1)[1]))
+ conn, Hash.from_b64(k.split('_', 1)[1]))
self.server.player.inject_and_play(file)
- self._redirect(Path('/'))
+ self._redirect(Path(postvars.as_dict['redir_target'][0]))
- def _receive_file_data(self,
- digest: Hash,
- postvars: dict[str, list[str]]
- ) -> None:
+ def _receive_file_data(self, digest: Hash, postvars: _ReqDict) -> None:
flag_names = [FlagName(f) for f in postvars.get('flags', [])]
with DbConn() as conn:
file = VideoFile.get_one(conn, digest)
file.save(conn)
conn.commit()
file.ensure_absence_if_deleted()
- self._redirect(Path(postvars['redir'][0]))
+ self._redirect(Path(postvars['redir_target'][0]))
def _receive_yt_query(self, query_txt: QueryText) -> None:
with DbConn() as conn:
elif PAGE_NAMES['download'] == page_name:
self._send_or_download_video(YoutubeId(toks_url[2]))
elif PAGE_NAMES['files'] == page_name:
- self._send_files_index(parse_qs(url.query))
+ self._send_files_index(_ReqMap(url.query))
elif PAGE_NAMES['file'] == page_name:
self._send_file_data(Hash.from_b64(toks_url[2]))
elif PAGE_NAMES['yt_result'] == page_name:
elif PAGE_NAMES['last_update'] == page_name:
self._send_last_playlist_update()
else: # e.g. for /
- self._send_playlist()
+ self._send_playlist(_ReqMap(url.query))
except NotFoundException as e:
self._send_http(bytes(str(e), 'utf8'), code=404)
tmpl_ctx: _TemplateContext
) -> None:
tmpl = self.server.jinja.get_template(str(tmpl_name))
- tmpl_ctx['background_color'] = self.server.config.background_color
+ tmpl_ctx['background_color'] = _ColorStr(
+ self.server.config.background_color)
tmpl_ctx['page_names'] = PAGE_NAMES
html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
'flag_names': list(FILE_FLAGS),
'unused_tags': unused_tags})
- def _send_files_index(self, params: dict[str, list[str]]) -> None:
- filter_path = _FilterStr(params.get('filter_path', [''])[0])
- filter_tags = _FilterStr(params.get('filter_tags', [''])[0])
- show_absent = bool(params.get('show_absent', [False])[0])
+ def _send_files_index(self, params: _ReqMap) -> None:
+ filter_path = FilterStr(params.single_value('filter_path'))
+ filter_tags = FilterStr(params.single_value('filter_tags'))
+ show_absent = bool(params.single_value('show_absent'))
with DbConn() as conn:
- files = [f for f in VideoFile.get_all(conn)
- if str(filter_path).lower() in str(f.rel_path).lower()
- and ([t for t in f.tags if str(filter_tags).lower() in t]
- or not filter_tags)
- and (show_absent or f.present)]
+ files = VideoFile.get_filtered(
+ conn, filter_path, filter_tags, show_absent)
files.sort(key=lambda t: t.rel_path)
- self._send_rendered_template(_NAME_TEMPLATE_FILES,
- {'files': files,
- 'filter_path': filter_path,
- 'filter_tags': filter_tags,
- 'show_absent': show_absent})
+ self._send_rendered_template(
+ _NAME_TEMPLATE_FILES,
+ {'files': files,
+ 'filter_path': filter_path,
+ 'filter_tags': filter_tags,
+ 'show_absent': show_absent,
+ 'redir_target': Path(
+ f'/{PAGE_NAMES["files"]}?{params.as_str}')})
def _send_missing_json(self) -> None:
with DbConn() as conn:
self._send_http(bytes(json_dumps(payload), 'utf8'),
headers=[('Content-type', 'application/json')])
- def _send_playlist(self) -> None:
- if self.server.player.empty:
- self.server.player.load_files()
+ def _send_playlist(self, params: _ReqMap) -> None:
+ filter_path = FilterStr(params.single_value('filter_path'))
+ filter_tags = FilterStr(params.single_value('filter_tags'))
+ if self.server.player.empty or filter_path or filter_tags:
+ self.server.player.load_files(filter_path, filter_tags)
current_file, unused_tags = None, set()
- if self.server.player.current_file_digest:
- with DbConn() as conn:
+ with DbConn() as conn:
+ if self.server.player.current_file_digest:
current_file = VideoFile.get_one(
conn, self.server.player.current_file_digest)
unused_tags = current_file.unused_tags(conn)
'running': self.server.player.is_running,
'paused': self.server.player.is_paused,
'current_file': current_file,
+ 'filter_path': filter_path,
+ 'filter_tags': filter_tags,
+ 'redir_target': Path(
+ f'/{PAGE_NAMES["playlist"]}?{params.as_str}'),
'unused_tags': unused_tags,
'files_w_idx': list(enumerate(self.server.player.files))
})
QueryId = NewType('QueryId', str)
QueryText = NewType('QueryText', str)
ProseText = NewType('ProseText', str)
+FilterStr = NewType('FilterStr', str)
FlagName = NewType('FlagName', str)
FlagsInt = NewType('FlagsInt', int)
Tag = NewType('Tag', str)
raise NotFoundException(f'no entry for file to Youtube ID {yt_id}')
return cls._from_table_row(row)
+ @classmethod
+ def get_filtered(cls,
+ conn: BaseDbConn,
+ filter_path: FilterStr = FilterStr(''),
+ filter_tags: FilterStr = FilterStr(''),
+ show_absent: bool = False
+ ) -> list[Self]:
+ """Return cls.get_all matching provided filter criteria."""
+ return [f for f in cls.get_all(conn)
+ if str(filter_path).lower() in str(f.rel_path).lower()
+ and ([t for t in f.tags if str(filter_tags).lower() in t]
+ or not filter_tags)
+ and (show_absent or f.present)]
+
def unused_tags(self, conn: BaseDbConn) -> set[Tag]:
"""Return tags used among other VideoFiles, not in self."""
tags = set()
Thread(target=kill_on_queue_get, daemon=True).start()
- def load_files(self) -> None:
+ def load_files(self,
+ filter_path: FilterStr = FilterStr(''),
+ filter_tags: FilterStr = FilterStr('')
+ ) -> None:
"""Collect files in PATH_DOWNLOADS DB-known and of legal extension."""
with DbConn() as conn:
- known_files = {f.full_path: f for f in VideoFile.get_all(conn)}
+ known_files = {
+ f.full_path: f for f
+ in VideoFile.get_filtered(conn, filter_path, filter_tags)}
self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
if p in known_files
and p.is_file()
self._mpv.command('loadfile', path, 'insert-at', i0)
self.files[i0], self.files[i1] = self.files[i1], self.files[i0]
- def reload(self) -> None:
- """Close MPV, re-read (and re-shuffle) filenames, then re-start MPV."""
+ def clear(self) -> None:
+ """Close MPV, empty filenames."""
self._kill_mpv()
- self.load_files()
- self._start_mpv()
- self._signal_update()
+ self.files.clear()
def inject_and_play(self, file: VideoFile) -> None:
"""Inject file after current title, then jump to it."""