{% block script %}
-const RELOAD_INTERVAL_S = 10;
-const PATH_LAST_UPDATE = '{{page_names.last_update}}';
-const MSG_SERVER_DOWN = 'Server seems to be unavailable.';
-const MSG_ERR_UNKNOWN = 'Unknown error checking ' + PATH_LAST_UPDATE;
-const last_update = '{{last_update}}';
-async function keep_updated() {
- try {
- const response = await fetch(PATH_LAST_UPDATE);
- const data = await response.json();
- if (data.last_update != last_update) {
- location.reload();
- }
- } catch(error) {
- const status = document.getElementById('status');
- if (error instanceof TypeError && !error.response) {
- status.innerText = MSG_SERVER_DOWN;
- } else {
- status.innerText = MSG_ERR_UNKNOWN;
- }
- }
- setTimeout(keep_updated, RELOAD_INTERVAL_S * 1000);
-}
-window.onload = keep_updated;
+
+const PATH_PREFIX_TAGS_UPDATE = '/{{page_names.file_tags}}/';
+const CLS_PLAYLIST_ROW = 'playlist_row';
+path_tags_update = '?';
+events_params += 'playlist=1';
+
+function get_el_and_empty(id) {
+ let el = document.getElementById(id);
+ el.textContent = '';
+ return el; }
+
+function new_child_to(tag, parent, textContent='') {
+ const el = document.createElement(tag);
+ parent.appendChild(el);
+ el.textContent = textContent;
+ return el; }
+
+event_handlers.push(function(data) { // update playlist
+ const table = document.getElementById('playlist_rows');
+ var old_rows = document.getElementsByClassName(CLS_PLAYLIST_ROW);
+ while (old_rows[0]) {
+ old_rows[0].remove(); }
+ for (let i = 0; i < data.playlist_files.length; i++) {
+ const file = data.playlist_files[i];
+ const tr = new_child_to('tr', table);
+ tr.classList.add(CLS_PLAYLIST_ROW);
+ const td_entry_control = new_child_to('td', tr);
+ td_entry_control.classList.add('entry_control');
+ if (data.current_file && data.current_file.digest == file.digest) {
+ td_entry_control.textContent = 'playing'; }
+ else {
+ for (const [symbol, prefix] of [['>', 'jump'],
+ ['^', 'up'],
+ ['v', 'down']]) {
+ const btn = new_child_to('button', td_entry_control, symbol);
+ btn.onclick = function() { player_command(`${prefix}_${i}`) }; }}
+ const td_link = new_child_to('td', tr);
+ const a_file = new_child_to('a', td_link, file.rel_path);
+ a_file.href = `/${data.link_prefix}/${file.digest}`; }})
+
+event_handlers.push(function(data) { // update current_file table
+ const td_current_path = get_el_and_empty('current_path');
+ const td_current_yt_id = get_el_and_empty('current_yt_id');
+ const table_current_tags = get_el_and_empty('current_tags');
+ const datalist_unused_tags = get_el_and_empty('unused_tags');
+ const btn_update = document.getElementById('btn_update_tags');
+ btn_update.disabled = true;
+ path_tags_update = '?';
+ if (data.current_file) {
+ const a_path = new_child_to('a', td_current_path, data.current_file.rel_path);
+ a_path.href = `/${data.link_prefix}/${data.current_file.digest}`;
+ if (data.current_file.yt_id) {
+ const a_yt = new_child_to('a', td_current_yt_id, data.current_file.yt_id);
+ a_yt.href = `/${data.yt_result_prefix}/${data.current_file.yt_id}` }
+ for (const tag of data.current_file.tags) {
+ const tr = new_child_to('tr', table_current_tags);
+ const td_checkbox = new_child_to('td', tr);
+ td_checkbox.classList.add('tag_checkbox');
+ const checkbox = new_child_to('input', td_checkbox);
+ checkbox.type = 'checkbox';
+ checkbox.name = 'tag_input';
+ checkbox.value = tag;
+ checkbox.checked = true;
+ new_child_to('td', tr, tag); }
+ const tr = new_child_to('tr', table_current_tags);
+ new_child_to('td', tr, 'add:');
+ const td_input = new_child_to('td', tr);
+ const tag_input = new_child_to('input', td_input);
+ tag_input.setAttribute('list', 'unused_tags');
+ tag_input.name = 'tag_input';
+ for (const tag of data.current_file.unused_tags) {
+ const option = new_child_to('option', datalist_unused_tags, tag);
+ option.value = tag; }
+ btn_update.disabled = false;
+ path_tags_update = `${PATH_PREFIX_TAGS_UPDATE}${data.current_file.digest}`; }});
+
+function update_tags() {
+ var tags = [];
+ for (const tag_input of document.getElementsByName('tag_input')) {
+ if (tag_input.value && ('checkbox' != tag_input.type || tag_input.checked)) {
+ tags.push(tag_input.value); }}
+ send_to({tags: tags}, path_tags_update); }
+
+function redo_playlist() {
+ send_to({filter_path: document.getElementsByName('filter_path')[0].value,
+ filter_tags: document.getElementsByName('filter_tags')[0].value},
+ PATH_PLAYER);
+ player_command('reload'); }
+
{% endblock %}
{% block css %}
td.screen_half { width: 50%; }
th.screen_half_titles { text-align: center; }
-td.entry_buttons { width: 5em; }
+td.entry_control { width: 5em; }
td.tag_checkboxes { width: 1em; }
{% endblock %}
{% block body %}
-{{ macros.nav_head(page_names, redir_target, player_state, "playlist") }}
<table>
-<tr><td colspan=2>
-<form method="GET">
-filter filename: <input name="filter_path" value="{{filter_path}}" />
-filter tags: <input name="filter_tags" value="{{filter_tags}}" />
-<input type="submit" value="filter" />
-</form>
-</td></tr>
-<tr>
<td class="screen_half">
-{% if current_file %}
-{{ macros.file_data_form(current_file, unused_tags, page_names, redir_target, playlist_view=true) }}
-{% endif %}
-</td>
-<td class="screen_half">
-<form action="{{redir_target}}" method="POST">
+
<table>
-<tr><th colspan=2 class="screen_half_titles"><form action="{{redir_target}}" method="POST">playlist <input type="submit" name="reload" value="reload"></form></th></tr>
-{% for idx, file in files_w_idx %}
-<tr>
-<td class="entry_buttons">
-{% if file.digest == current_file.digest %}
-PLAYING
-{% else %}
-<input type="submit" name="jump_{{idx}}" value=">" />
-<input type="submit" name="up_{{idx}}" value="{% if reverse %}v{% else %}^{% endif %}" />
-<input type="submit" name="down_{{idx}}" value="{% if reverse %}^{% else %}v{% endif %}" />
-{% endif %}
-</td>
-<td><a href="/{{page_names.file}}/{{file.digest.b64}}">{{ file.rel_path }}</a></td>
+<tr><th colspan=2 class="screen_half_titles">playlist config</th></tr>
+<tr><th>filter filename</th><td><input name="filter_path" value="{{filter_path}}" /></td></tr>
+<tr><th>filter tags</th><td><input name="filter_tags" value="{{filter_tags}}" /></td></tr>
+<tr><td colspan=2><button onclick="redo_playlist()">reload</button></td></tr>
+</table>
+
+<table>
+<tr><th colspan=2 class="screen_half_titles">current selection</th></tr>
+<tr><th>path:</th><td id="current_path"></td></tr>
+<tr><th>YouTube ID:</th><td id="current_yt_id"></td>
</tr>
-{% endfor %}
+<tr><th>tags</th><td><table id="current_tags"></table></td></tr>
+</table>
+<button id="btn_update_tags" onclick="update_tags()" disabled>update</button>
+<datalist id="unused_tags"></datalist>
+
+</td>
+<td class="screen_half">
+
+<table id="playlist_rows">
+<tr><th colspan=2 class="screen_half_titles">playlist</th></tr>
</table>
-</form>
+
</td>
</tr>
</table>
"""Collect directly HTTP-related elements."""
-from http.server import HTTPServer, BaseHTTPRequestHandler
-from json import dumps as json_dumps
+from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
+from json import dumps as json_dumps, loads as json_loads
from pathlib import Path
from time import sleep
from typing import Any, Generator, Optional, TypeAlias
Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
from ytplom.db import Hash, DbConn
from ytplom.misc import (
- FilterStr, FlagName, PlayerUpdateId, QueryId, QueryText, Tag, UrlStr,
- YoutubeId,
+ FilterStr, FlagName, QueryId, QueryText, Tag, UrlStr, YoutubeId,
FILE_FLAGS, PATH_THUMBNAILS, YOUTUBE_URL_PREFIX,
ensure_expected_dirs,
Config, DownloadsManager, Player, QuotaLog, VideoFile, YoutubeQuery,
from ytplom.primitives import NotFoundException, PATH_APP_DATA
# type definitions for mypy
-
_PageNames: TypeAlias = dict[str, Path]
_ReqDict: TypeAlias = dict[str, list[str]]
_TemplateContext: TypeAlias = dict[str, Any]
# page names
PAGE_NAMES: _PageNames = {
'download': Path('dl'),
+ 'events': Path('events'),
'file': Path('file'),
+ 'file_tags': Path('file_tags'),
'files': Path('files'),
- 'last_update': Path('last_playlist_update.json'),
- 'missing': Path('missing.json'),
+ 'missing': Path('missing'),
'player': Path('player'),
'playlist': Path('playlist'),
'thumbnails': Path('thumbnails'),
'yt_queries': Path('yt_queries')
}
+# misc
+_HEADER_CONTENT_TYPE = 'Content-Type'
+_HEADER_APP_JSON = 'application/json'
+
class _ReqMap:
- """Wrapper over parse_qs results, i.e. HTTP postvars or query params."""
+ """Wrapper over dictionary-like HTTP postings."""
- def __init__(self, map_as_str: str) -> None:
+ def __init__(self, map_as_str: str, is_json: bool = False) -> None:
+ self.is_json = is_json
self.as_str = map_as_str
@property
def as_dict(self) -> _ReqDict:
- """Return as parse_qs-resulting dictionary."""
+ """Return parsed to dictionary."""
+ if self.is_json:
+ return json_loads(self.as_str)
return parse_qs(self.as_str)
@property
"""Return single .as_dict key, implicitly assuming there's only one."""
return list(self.as_dict.keys())[0]
- def single_value(self, key: str) -> str:
+ def first_for(self, key: str) -> str:
"""Return .as_dict[key][0] if possible, else ''."""
return self.as_dict.get(key, [''])[0]
+ def all_for(self, key: str) -> list[str]:
+ """Return all values mapped to key."""
+ return self.as_dict.get(key, [])
+
def key_starting_with(self, start: str) -> Generator:
"""From .as_dict yield key starting with start."""
for k in self.as_dict:
yield k
-class Server(HTTPServer):
- """Extension of HTTPServer providing for Player and DownloadsManager."""
+class Server(ThreadingHTTPServer):
+ """Extension of parent server providing for Player and DownloadsManager."""
def __init__(self, config: Config, *args, **kwargs) -> None:
super().__init__((config.host, config.port), _TaskHandler,
server: Server
def _send_http(self,
- content: bytes = b'',
+ content: str | bytes = b'',
headers: Optional[list[tuple[str, str]]] = None,
code: int = 200
) -> None:
self.send_header(header_tuple[0], header_tuple[1])
self.end_headers()
if content:
+ if isinstance(content, str):
+ content = bytes(content, 'utf8')
self.wfile.write(content)
def _redirect(self, target: Path) -> None:
def do_POST(self) -> None: # pylint:disable=invalid-name
"""Map POST requests to handlers for various paths."""
- url = urlparse(self.path)
- toks_url = Path(url.path).parts
+ toks_url = Path(urlparse(self.path).path).parts
page_name = Path(toks_url[1] if len(toks_url) > 1 else '')
- body_length = int(self.headers['content-length'])
- postvars = _ReqMap(self.rfile.read(body_length).decode())
- if PAGE_NAMES['player'] == page_name:
- self._receive_player_command(postvars.as_dict)
- elif PAGE_NAMES['playlist'] == page_name:
- self._receive_playlist_command(postvars.single_key, url.query)
- elif PAGE_NAMES['files'] == page_name:
+ postvars = _ReqMap(
+ self.rfile.read(int(self.headers['content-length'])).decode(),
+ _HEADER_APP_JSON == self.headers[_HEADER_CONTENT_TYPE])
+ if PAGE_NAMES['files'] == page_name:
self._receive_files_command(postvars)
elif PAGE_NAMES['file'] == page_name:
- self._receive_file_data(Hash.from_b64(toks_url[2]),
- postvars.as_dict)
+ self._receive_file_data(Hash.from_b64(toks_url[2]), postvars)
+ elif PAGE_NAMES['file_tags'] == page_name:
+ self._receive_file_tags(Hash.from_b64(toks_url[2]), postvars)
elif PAGE_NAMES['yt_queries'] == page_name:
- self._receive_yt_query(QueryText(postvars.single_value('query')))
+ self._receive_yt_query(QueryText(postvars.first_for('query')))
+ elif PAGE_NAMES['player'] == page_name:
+ self._receive_player_command(postvars)
- def _receive_player_command(self, postvars: _ReqDict) -> None:
- if 'pause' in postvars.keys():
+ def _receive_player_command(self, postvars: _ReqMap) -> None:
+ command = postvars.first_for('command')
+ if 'pause' == command:
self.server.player.toggle_pause()
- elif 'prev' in postvars.keys():
+ elif 'prev' == command:
self.server.player.prev()
- elif 'next' in postvars.keys():
+ elif 'next' == command:
self.server.player.next()
- elif 'stop' in postvars.keys():
+ elif 'stop' == command:
self.server.player.toggle_run()
- self._redirect(Path(postvars['redir_target'][0]))
-
- def _receive_playlist_command(self, command: str, params_str: str) -> None:
- if 'reload' == command:
- self.server.player.clear()
+ elif 'reload' == command:
+ self.server.player.reload()
elif command.startswith('jump_'):
self.server.player.jump_to(int(command.split('_')[1]))
- elif command.startswith('up'):
+ elif command.startswith('up_'):
self.server.player.move_entry(int(command.split('_')[1]))
elif command.startswith('down_'):
self.server.player.move_entry(int(command.split('_')[1]), False)
- sleep(0.5) # avoid redir happening before current_file update
- self._redirect(Path('/')
- .joinpath(f'{PAGE_NAMES["playlist"]}?{params_str}'))
+ for k in [k for k in ('filter_path', 'filter_tags')
+ if k in postvars.as_dict]:
+ setattr(self.server.player, k, postvars.as_dict[k])
+ self._send_http('OK', code=200)
def _receive_files_command(self, postvars: _ReqMap) -> None:
for k in postvars.key_starting_with('play_'):
file = VideoFile.get_one(
conn, Hash.from_b64(k.split('_', 1)[1]))
self.server.player.inject_and_play(file)
- self._redirect(Path(postvars.as_dict['redir_target'][0]))
+ self._redirect(Path(postvars.first_for('redir_target')))
- def _receive_file_data(self, digest: Hash, postvars: _ReqDict) -> None:
- flag_names = [FlagName(f) for f in postvars.get('flags', [])]
+ def _receive_file_tags(self, digest: Hash, postvars: _ReqMap) -> None:
with DbConn() as conn:
file = VideoFile.get_one(conn, digest)
- file.set_flags([FILE_FLAGS[name] for name in flag_names])
- file.tags = [Tag(t) for t in postvars.get('tags', [])]
+ file.tags = {Tag(t) for t in postvars.all_for('tags')}
+ file.save(conn)
+ conn.commit()
+ self._send_http('OK', code=200)
+
+ def _receive_file_data(self, digest: Hash, postvars: _ReqMap) -> None:
+ with DbConn() as conn:
+ file = VideoFile.get_one(conn, digest)
+ file.set_flags({FILE_FLAGS[FlagName(name)]
+ for name in postvars.all_for('flags')})
+ file.tags = {Tag(t) for t in postvars.all_for('tags')}
file.save(conn)
conn.commit()
file.ensure_absence_if_deleted()
- self._redirect(Path(postvars['redir_target'][0]))
+ self._redirect(Path(postvars.first_for('redir_target')))
def _receive_yt_query(self, query_txt: QueryText) -> None:
with DbConn() as conn:
self._send_yt_query_page(QueryId(toks_url[2]))
elif PAGE_NAMES['yt_queries'] == page_name:
self._send_yt_queries_index_and_search()
- elif PAGE_NAMES['last_update'] == page_name:
- self._send_last_playlist_update()
+ elif PAGE_NAMES['events'] == page_name:
+ self._send_events(_ReqMap(url.query))
else: # e.g. for /
- self._send_playlist(_ReqMap(url.query))
+ self._send_playlist()
except NotFoundException as e:
- self._send_http(bytes(str(e), 'utf8'), code=404)
+ self._send_http(str(e), code=404)
def _send_rendered_template(self,
tmpl_name: Path,
tmpl_ctx: _TemplateContext
) -> None:
tmpl = self.server.jinja.get_template(str(tmpl_name))
+ tmpl_ctx['selected'] = tmpl_ctx.get('selected', '')
tmpl_ctx['redir_target'] = self.path
tmpl_ctx['background_color'] = self.server.config.background_color
tmpl_ctx['page_names'] = PAGE_NAMES
- tmpl_ctx['player_state'] = {
- 'running': self.server.player.is_running,
- 'paused': self.server.player.is_paused,
- 'title': (self.server.player.current_file_cached.rel_path
- if self.server.player.current_file_cached
- else 'none')}
- html = tmpl.render(**tmpl_ctx)
- self._send_http(bytes(html, 'utf8'))
+ self._send_http(tmpl.render(**tmpl_ctx))
def _send_thumbnail(self, filename: Path) -> None:
ensure_expected_dirs([PATH_THUMBNAILS])
raise NotFoundException from e
raise e
with path_thumbnail.open('rb') as f:
- img = f.read()
- self._send_http(img, [('Content-type', 'image/jpg')])
+ self._send_http(f.read(), [(_HEADER_CONTENT_TYPE, 'image/jpg')])
def _send_or_download_video(self, video_id: YoutubeId) -> None:
try:
.joinpath(video_id))
return
with file_data.full_path.open('rb') as video_file:
- video = video_file.read()
- self._send_http(content=video)
+ self._send_http(content=video_file.read())
def _send_yt_query_page(self, query_id: QueryId) -> None:
with DbConn() as conn:
query = YoutubeQuery.get_one(conn, str(query_id))
results = YoutubeVideo.get_all_for_query(conn, query_id)
- self._send_rendered_template(
- _NAME_TEMPLATE_RESULTS,
- {'query': query.text, 'videos': results})
+ self._send_rendered_template(_NAME_TEMPLATE_RESULTS,
+ {'query': query.text, 'videos': results})
def _send_yt_queries_index_and_search(self) -> None:
with DbConn() as conn:
quota_count = QuotaLog.current(conn)
queries_data = YoutubeQuery.get_all(conn)
queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
- self._send_rendered_template(
- _NAME_TEMPLATE_QUERIES,
- {'queries': queries_data, 'quota_count': quota_count})
+ self._send_rendered_template(_NAME_TEMPLATE_QUERIES,
+ {'queries': queries_data,
+ 'quota_count': quota_count,
+ 'selected': 'yt_queries'})
def _send_yt_result(self, video_id: YoutubeId) -> None:
conn = DbConn()
'unused_tags': unused_tags})
def _send_files_index(self, params: _ReqMap) -> None:
- filter_path = FilterStr(params.single_value('filter_path'))
- filter_tags = FilterStr(params.single_value('filter_tags'))
- show_absent = bool(params.single_value('show_absent'))
+ filter_path = FilterStr(params.first_for('filter_path'))
+ filter_tags = FilterStr(params.first_for('filter_tags'))
+ show_absent = bool(params.first_for('show_absent'))
with DbConn() as conn:
files = VideoFile.get_filtered(
conn, filter_path, filter_tags, show_absent)
files.sort(key=lambda t: t.rel_path)
- self._send_rendered_template(
- _NAME_TEMPLATE_FILES,
- {'files': files,
- 'filter_path': filter_path,
- 'filter_tags': filter_tags,
- 'show_absent': show_absent})
+ self._send_rendered_template(_NAME_TEMPLATE_FILES,
+ {'files': files,
+ 'selected': 'files',
+ 'filter_path': filter_path,
+ 'filter_tags': filter_tags,
+ 'show_absent': show_absent})
def _send_missing_json(self) -> None:
with DbConn() as conn:
missing = [str(f.rel_path) for f in VideoFile.get_all(conn)
if f.missing]
- self._send_http(bytes(json_dumps(missing), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_last_playlist_update(self) -> None:
- payload: dict[str, PlayerUpdateId] = {
- 'last_update': self.server.player.last_update}
- self._send_http(bytes(json_dumps(payload), 'utf8'),
- headers=[('Content-type', 'application/json')])
-
- def _send_playlist(self, params: _ReqMap) -> None:
- filter_path = FilterStr(params.single_value('filter_path'))
- filter_tags = FilterStr(params.single_value('filter_tags'))
- if self.server.player.empty or filter_path or filter_tags:
- self.server.player.load_files_and_start(filter_path, filter_tags)
- current_file, unused_tags = None, set()
- with DbConn() as conn:
- if self.server.player.current_file_cached:
- current_file = VideoFile.get_one(
- conn, self.server.player.current_file_cached.digest)
- unused_tags = current_file.unused_tags(conn)
+ self._send_http(json_dumps(missing),
+ headers=[(_HEADER_CONTENT_TYPE, _HEADER_APP_JSON)])
+
+ def _send_playlist(self) -> None:
+ if self.server.player.empty:
+ self.server.player.load_files_and_start()
self._send_rendered_template(
_NAME_TEMPLATE_PLAYLIST,
- {'last_update': self.server.player.last_update,
- 'running': self.server.player.is_running,
- 'paused': self.server.player.is_paused,
- 'current_file': current_file,
- 'filter_path': filter_path,
- 'filter_tags': filter_tags,
- 'unused_tags': unused_tags,
- 'files_w_idx': list(enumerate(self.server.player.files))
- })
+ {'selected': 'playlist',
+ 'filter_path': self.server.player.filter_path,
+ 'filter_tags': self.server.player.filter_tags})
+
+ def _send_events(self, params: _ReqMap) -> None:
+ self._send_http(headers=[(_HEADER_CONTENT_TYPE, 'text/event-stream'),
+ ('Cache-Control', 'no-cache'),
+ ('Connection', 'keep-alive')])
+ playing: Optional[VideoFile] = None
+ last_sent = ''
+ while True:
+ if not self.server.player.current_digest:
+ playing = None
+ elif ((not playing)
+ or (playing.digest != self.server.player.current_digest)):
+ with DbConn() as conn:
+ playing = VideoFile.get_one(
+ conn, self.server.player.current_digest)
+ last_playing_update = (
+ VideoFile.last_updates_since_start.get(playing.digest, '')
+ if playing
+ else '')
+ if playing and last_playing_update > playing.last_update:
+ with DbConn() as conn:
+ playing = VideoFile.get_one(conn, playing.digest)
+ last_update = max(self.server.player.last_update,
+ last_playing_update)
+ if last_sent < last_update:
+ last_sent = last_update
+ data = {
+ 'last_update': self.server.player.last_update,
+ 'running': self.server.player.is_running,
+ 'paused': self.server.player.is_paused,
+ 'title': str(playing.rel_path) if playing else 'none'}
+ if 'playlist' in params.as_dict:
+ data['playlist_files'] = [
+ {'rel_path': str(f.rel_path), 'digest': f.digest.b64}
+ for f in self.server.player.files]
+ data['link_prefix'] = str(PAGE_NAMES['file'])
+ if playing:
+ with DbConn() as conn:
+ unused_tags = playing.unused_tags(conn)
+ data['current_file'] = {
+ 'digest': playing.digest.b64,
+ 'rel_path': str(playing.rel_path),
+ 'yt_id': playing.yt_id,
+ 'tags': list(playing.tags),
+ 'unused_tags': list(unused_tags)}
+ data['yt_link_prefix'] = str(PAGE_NAMES['yt_result'])
+ try:
+ self.wfile.write(
+ f'data: {json_dumps(data)}\n\n'.encode())
+ self.wfile.flush()
+ except BrokenPipeError:
+ return
+ sleep(0.25)
from typing import NewType, Optional, Self, TypeAlias
from os import chdir, environ
from random import shuffle
-from time import time, sleep
+from time import sleep
from datetime import datetime, timedelta
from json import loads as json_loads
from urllib.request import urlretrieve
FlagsInt = NewType('FlagsInt', int)
Tag = NewType('Tag', str)
AmountDownloads = NewType('AmountDownloads', int)
-PlayerUpdateId = NewType('PlayerUpdateId', str)
UrlStr = NewType('UrlStr', str)
FilesWithIndex: TypeAlias = list[tuple[int, 'VideoFile']]
last_update: DatetimeStr
rel_path: Path
digest: Hash
- tags: list[Tag]
+ tags: set[Tag]
+ last_updates_since_start: dict[Hash, DatetimeStr] = {}
def __init__(self,
digest: Optional[Hash],
self.rel_path = rel_path
self.digest = digest if digest else Hash.from_file(self.full_path)
self.flags = flags
- self.tags = [Tag(t) for t in tags_str.split(',')] if tags_str else []
+ self.tags = set([Tag(t) for t in tags_str.split(',')]
+ if tags_str else [])
self.yt_id = yt_id
if last_update is None:
self._renew_last_update()
def _renew_last_update(self):
self.last_update = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
- self._hash_on_last_update = hash(self)
+ self.last_updates_since_start[self.digest] = self.last_update
def save(self, conn: BaseDbConn) -> Cursor:
"""Extend super().save by new .last_update if sufficient changes."""
"""Return if file absent despite absence of 'delete' flag."""
return not (self.is_flag_set(FlagName('delete')) or self.present)
- def set_flags(self, flags: list[FlagsInt]) -> None:
+ def set_flags(self, flags: set[FlagsInt]) -> None:
"""Set .flags to bitwise OR of FlagsInt in flags."""
self.flags = FlagsInt(0)
for flag in flags:
_idx: int
def __init__(self) -> None:
- self.last_update = PlayerUpdateId('')
+ self.last_update = DatetimeStr('')
self._mpv: Optional[MPV] = None
self._kill_queue: Queue = Queue()
self._monitoring_kill = False
+ self.filter_path = FilterStr('')
+ self.filter_tags = FilterStr('')
self.load_files_and_start()
def _monitor_kill(self) -> None:
Thread(target=kill_on_queue_get, daemon=True).start()
- def load_files_and_start(self,
- filter_path: FilterStr = FilterStr(''),
- filter_tags: FilterStr = FilterStr('')
- ) -> None:
+ def load_files_and_start(self) -> None:
"""Collect filtered files into playlist, start player."""
with DbConn() as conn:
known_files = {
- f.full_path: f for f
- in VideoFile.get_filtered(conn, filter_path, filter_tags)}
+ f.full_path: f for f
+ in VideoFile.get_filtered(
+ conn, self.filter_path, self.filter_tags)}
self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
if p in known_files
and p.is_file()
self._start_mpv()
def _signal_update(self) -> None:
- self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
+ self.last_update = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
def _start_mpv(self) -> None:
self._mpv = MPV(input_default_bindings=True,
self._signal_update()
def _play_at_index(self):
+ self._signal_update()
if self._mpv:
self._mpv.command('playlist-play-index', self._idx)
return 0 == len(self.files)
@property
- def current_file_cached(self) -> Optional[VideoFile]:
- """Return cached version of the currently playing file."""
+ def current_digest(self) -> Optional[Hash]:
+ """Return hash digest ID of currently playing file."""
if not self.files:
return None
- return self.files[self._idx]
+ return self.files[self._idx].digest
@property
def is_running(self) -> bool:
self._mpv.command('playlist-remove', i1)
self._mpv.command('loadfile', path, 'insert-at', i0)
self.files[i0], self.files[i1] = self.files[i1], self.files[i0]
+ self._signal_update()
- def clear(self) -> None:
- """Close MPV, empty filenames."""
+ def reload(self) -> None:
+ """Close MPV, empty filenames, restart."""
self._kill_mpv()
self.files.clear()
+ self.load_files_and_start()
def inject_and_play(self, file: VideoFile) -> None:
"""Inject file after current title, then jump to it."""