"""Collect directly HTTP-related elements."""
+
+# included libs
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from json import dumps as json_dumps, loads as json_loads
from pathlib import Path
from urllib.parse import parse_qs, urlparse
from urllib.request import urlretrieve
from urllib.error import HTTPError
+# non-included libs
from jinja2 import ( # type: ignore
Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
+# ourselves
from ytplom.db import Hash, DbConn
from ytplom.misc import (
FilterStr, FlagName, QueryId, QueryText, TagSet, YoutubeId,
)
from ytplom.primitives import NotFoundException, PATH_APP_DATA
+
# API expectations
_THUMBNAIL_URL_PREFIX = 'https://i.ytimg.com/vi/'
_THUMBNAIL_URL_SUFFIX = '/default.jpg'
self.as_str = map_as_str
@property
- def as_dict(self) -> dict[str, list[str]]:
- """Return parsed to dictionary."""
+ def _as_dict(self) -> dict[str, list[str]]:
if self.is_json:
return json_loads(self.as_str)
return parse_qs(self.as_str)
- @property
- def single_key(self) -> str:
- """Return single .as_dict key, implicitly assuming there's only one."""
- return list(self.as_dict.keys())[0]
+ def has_key(self, key: str) -> bool:
+ """Return if key exists at all."""
+ return key in self._as_dict
def first_for(self, key: str) -> str:
- """Return .as_dict[key][0] if possible, else ''."""
- return self.as_dict.get(key, [''])[0]
+ """Return first value mapped to key, '' if none."""
+ return self._as_dict.get(key, [''])[0]
def all_for(self, key: str) -> list[str]:
"""Return all values mapped to key."""
- return self.as_dict.get(key, [])
+ return self._as_dict.get(key, [])
def key_starting_with(self, start: str) -> Generator:
"""From .as_dict yield key starting with start."""
- for k in self.as_dict:
+ for k in self._as_dict:
if k.startswith(start):
yield k
self.send_header(header_tuple[0], header_tuple[1])
self.end_headers()
if content:
- if isinstance(content, str):
- content = bytes(content, 'utf8')
- self.wfile.write(content)
+ self.wfile.write(bytes(content, 'utf8') if isinstance(content, str)
+ else content)
def _redirect(self, target: Path) -> None:
self._send_http(headers=[('Location', str(target))], code=302)
VideoFile.purge_deleteds(conn)
self.server.player.load_files_and_mpv()
conn.commit()
- self._send_http('OK', code=200)
+ self._send_http('OK')
def _receive_player_command(self, postvars: _ReqMap) -> None:
command = postvars.first_for('command')
self.server.player.move_entry(int(command.split('_')[1]))
elif command.startswith('down_'):
self.server.player.move_entry(int(command.split('_')[1]), False)
- if 'filter_path' in postvars.as_dict:
+ if postvars.has_key('filter_path'):
self.server.player.filter_path = FilterStr(
postvars.first_for('filter_path'))
- if 'needed_tags' in postvars.as_dict:
+ if postvars.has_key('needed_tags'):
self.server.player.needed_tags = TagSet.from_joined(
postvars.first_for('needed_tags'))
- self._send_http('OK', code=200)
+ self._send_http('OK')
def _receive_files_command(self, postvars: _ReqMap) -> None:
for k in postvars.key_starting_with('play_'):
return # … this display filter might have suppressed set tags
with DbConn() as conn:
file = VideoFile.get_one(conn, digest)
- if 'unlink' in postvars.as_dict:
+ if postvars.has_key('unlink'):
file.unlink_locally()
file.set_flags({FILE_FLAGS[FlagName(name)]
for name in postvars.all_for('flags')})
payload['title_tags'] = tags
payload['title_digest'] = digest
payload['title'] = title
- if 'playlist' in params.as_dict:
+ if params.has_key('playlist'):
payload['idx'] = self.server.player.idx
payload['playlist_files'] = [
{'rel_path': str(f.rel_path), 'digest': f.digest.b64}