*args, **kwargs)
self.config = config
self.jinja = JinjaEnv(loader=JinjaFSLoader(_PATH_TEMPLATES))
- self.player = Player(config.tags_prefilter)
+ self.player = Player(config.needed_tags_prefilter)
self.downloads = DownloadsManager()
self.downloads.clean_unfinished()
self.downloads.start_thread()
if 'filter_path' in postvars.as_dict:
self.server.player.filter_path = FilterStr(
postvars.first_for('filter_path'))
- if 'filter_tags' in postvars.as_dict:
- self.server.player.filter_tags = {
- Tag(t) for t in postvars.first_for('filter_tags').split(',')
- if t}
+ if 'needed_tags' in postvars.as_dict:
+ self.server.player.needed_tags = Tag.from_joined(
+ postvars.first_for('needed_tags'))
self._send_http('OK', code=200)
def _receive_files_command(self, postvars: _ReqMap) -> None:
def _send_files_index(self, params: _ReqMap) -> None:
filter_path = FilterStr(params.first_for('filter_path'))
- filter_tags_str = params.first_for('filter_tags')
- filter_tags = self.server.config.tags_prefilter | {
- Tag(t) for t in filter_tags_str.split(',') if t}
+ needed_tags_str = params.first_for('needed_tags')
+ needed_tags = (self.server.config.needed_tags_prefilter
+ | Tag.from_joined(needed_tags_str))
show_absent = bool(params.first_for('show_absent'))
with DbConn() as conn:
files = VideoFile.get_filtered(
- conn, filter_path, filter_tags, show_absent)
+ conn, filter_path, needed_tags, show_absent)
files.sort(key=lambda t: t.rel_path)
self._send_rendered_template(_NAME_TEMPLATE_FILES,
{'files': files,
'selected': 'files',
'filter_path': filter_path,
- 'filter_tags': filter_tags_str,
+ 'needed_tags': needed_tags_str,
'show_absent': show_absent})
def _send_missing_json(self) -> None:
_NAME_TEMPLATE_PLAYLIST,
{'selected': 'playlist',
'filter_path': self.server.player.filter_path,
- 'filter_tags': self.server.player.filter_tags})
+ 'needed_tags': ','.join([
+ str(t) for t in self.server.player.needed_tags])})
def _send_events(self, params: _ReqMap) -> None:
self._send_http(headers=[(_HEADER_CONTENT_TYPE, 'text/event-stream'),
'running': self.server.player.is_running,
'paused': self.server.player.is_paused,
'title_digest': playing.digest.b64 if playing else '',
- 'title_tags': ', '.join(playing.tags) if playing else '',
+ 'title_tags': playing.tags_str if playing else '',
'title': str(playing.rel_path) if playing else 'none'}
if 'playlist' in params.as_dict:
data['playlist_files'] = [
"""Main ytplom lib."""
# included libs
-from typing import NewType, Optional, Self
+from typing import Any, NewType, Optional, Self
from os import chdir, environ
from random import shuffle
from time import sleep
'port_remote': 8090,
'background_color': '#ffffff',
'queries_cutoff': '',
- 'tags_prefilter_str': '',
+ 'needed_tags_prefilter_str': '',
'allow_file_edit': True
}
FilterStr = NewType('FilterStr', str)
FlagName = NewType('FlagName', str)
FlagsInt = NewType('FlagsInt', int)
-Tag = NewType('Tag', str)
# major expected directories
PATH_DOWNLOADS = Path.home().joinpath('ytplom_downloads')
api_key: str
background_color: str
queries_cutoff: str
- tags_prefilter_str: str
+ needed_tags_prefilter_str: str
allow_file_edit: bool
def __init__(self):
set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v
for k, v in environ.items()
if k.isupper() and k.startswith(ENVIRON_PREFIX)})
- self.tags_prefilter = {
- Tag(t) for t in self.tags_prefilter_str.split(',') if t}
+ self.needed_tags_prefilter = Tag.from_joined(
+ self.needed_tags_prefilter_str)
class YoutubeQuery(DbData):
(query_id, self.id_))
+class Tag:
+ """Represents individual VideoFile.tags."""
+
+ def __init__(self, tag_str: str) -> None:
+ self._str = tag_str
+
+ def __hash__(self) -> int:
+ return hash(self._str)
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, self.__class__):
+ return False
+ return self._str == other._str
+
+ def __str__(self):
+ return self._str
+
+ @classmethod
+ def from_joined(cls, joined: str) -> set[Self]:
+ """Build set from comma-delimited units in joined."""
+ return {cls(t) for t in joined.split(',') if t}
+
+
class VideoFile(DbData):
"""Collects data about downloaded files."""
id_name = 'digest'
self.rel_path = rel_path
self.digest = digest if digest else Hash.from_file(self.full_path)
self.flags = flags
- self.tags = set([Tag(t) for t in tags_str.split(',')]
- if tags_str else [])
+ self.tags = Tag.from_joined(tags_str)
self.yt_id = yt_id
if last_update is None:
self._renew_last_update()
def get_filtered(cls,
conn: BaseDbConn,
filter_path: FilterStr,
- filter_tags: set[Tag],
+ needed_tags: set[Tag],
show_absent: bool = False
) -> list[Self]:
"""Return cls.get_all matching provided filter criteria."""
- filtered_before_tags = [
- f for f in cls.get_all(conn)
- if str(filter_path).lower() in str(f.rel_path).lower()
- and (show_absent or f.present)]
- if filter_tags:
- to_remove = set()
- for f in filtered_before_tags:
- for t in [t for t in filter_tags if t not in f.tags]:
- to_remove.add(f)
- for f in to_remove:
- filtered_before_tags.remove(f)
- return filtered_before_tags
+ return [f for f in cls.get_all(conn)
+ if (show_absent or f.present)
+ and str(filter_path).lower() in str(f.rel_path).lower()
+ and not [t for t in needed_tags if t not in f.tags]]
def unused_tags(self, conn: BaseDbConn) -> set[Tag]:
"""Return tags used among other VideoFiles, not in self."""
- tags = set()
+ tags: set[Tag] = set()
for file in self.get_all(conn):
- for tag in [t for t in file.tags if t not in self.tags]:
- tags.add(tag)
+ tags = tags | {t for t in file.tags if t not in self.tags}
return tags
@property
- def tags_str(self):
+ def tags_str(self) -> str:
"""Return self.tags joined by ','."""
- return ','.join(self.tags)
+ return ','.join([str(s) for s in self.tags])
@property
def full_path(self) -> Path:
"""MPV representation with some additional features."""
_idx: int
- def __init__(self, tags_prefilter: set[Tag]) -> None:
+ def __init__(self, needed_tags_prefilter: set[Tag]) -> None:
self.last_update = DatetimeStr('')
self._mpv: Optional[MPV] = None
self._kill_queue: Queue = Queue()
self._monitoring_kill = False
self.filter_path = FilterStr('')
- self._tags_prefilter = tags_prefilter
- self.filter_tags: set[Tag] = set()
+ self._needed_tags_prefilter = needed_tags_prefilter
+ self.needed_tags: set[Tag] = set()
self.load_files_and_start()
def _monitor_kill(self) -> None:
f.full_path: f for f
in VideoFile.get_filtered(
conn, self.filter_path,
- self._tags_prefilter | self.filter_tags)}
+ self._needed_tags_prefilter | self.needed_tags)}
self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
if p in known_files
and p.is_file()