*args, **kwargs)
self.config = config
self.jinja = JinjaEnv(loader=JinjaFSLoader(_PATH_TEMPLATES))
- self.player = Player()
+ self.player = Player(config.tags_prefilter)
self.downloads = DownloadsManager()
self.downloads.clean_unfinished()
self.downloads.start_thread()
self.server.player.move_entry(int(command.split('_')[1]))
elif command.startswith('down_'):
self.server.player.move_entry(int(command.split('_')[1]), False)
- for k in [k for k in ('filter_path', 'filter_tags')
- if k in postvars.as_dict]:
- setattr(self.server.player, k, postvars.as_dict[k])
+ if 'filter_path' in postvars.as_dict:
+ self.server.player.filter_path = FilterStr(
+ postvars.first_for('filter_path'))
+ if 'filter_tags' in postvars.as_dict:
+ self.server.player.filter_tags = {
+ Tag(t) for t in postvars.first_for('filter_tags').split(',')
+ if t}
self._send_http('OK', code=200)
def _receive_files_command(self, postvars: _ReqMap) -> None:
def _send_files_index(self, params: _ReqMap) -> None:
filter_path = FilterStr(params.first_for('filter_path'))
- filter_tags = FilterStr(params.first_for('filter_tags'))
+ filter_tags_str = params.first_for('filter_tags')
+ filter_tags = self.server.config.tags_prefilter | {
+ Tag(t) for t in filter_tags_str.split(',') if t}
show_absent = bool(params.first_for('show_absent'))
with DbConn() as conn:
files = VideoFile.get_filtered(
{'files': files,
'selected': 'files',
'filter_path': filter_path,
- 'filter_tags': filter_tags,
+ 'filter_tags': filter_tags_str,
'show_absent': show_absent})
def _send_missing_json(self) -> None:
'port': 8090,
'port_remote': 8090,
'background_color': '#ffffff',
- 'queries_cutoff': ''
+ 'queries_cutoff': '',
+ 'tags_prefilter_str': ''
}
# type definitions for mypy
api_key: str
background_color: str
queries_cutoff: str
+ tags_prefilter_str: str
def __init__(self):
def set_attrs_from_dict(d):
set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v
for k, v in environ.items()
if k.isupper() and k.startswith(ENVIRON_PREFIX)})
+ self.tags_prefilter = {
+ Tag(t) for t in self.tags_prefilter_str.split(',') if t}
class YoutubeQuery(DbData):
@classmethod
def get_filtered(cls,
conn: BaseDbConn,
- filter_path: FilterStr = FilterStr(''),
- filter_tags: FilterStr = FilterStr(''),
+ filter_path: FilterStr,
+ filter_tags: set[Tag],
show_absent: bool = False
) -> list[Self]:
"""Return cls.get_all matching provided filter criteria."""
and (show_absent or f.present)]
if filter_tags:
to_remove = set()
- tags_and = filter_tags.split(',')
for f in filtered_before_tags:
- for t in [t for t in tags_and if t not in f.tags]:
+ for t in [t for t in filter_tags if t not in f.tags]:
to_remove.add(f)
for f in to_remove:
filtered_before_tags.remove(f)
"""MPV representation with some additional features."""
_idx: int
- def __init__(self) -> None:
+ def __init__(self, tags_prefilter: set[Tag]) -> None:
self.last_update = DatetimeStr('')
self._mpv: Optional[MPV] = None
self._kill_queue: Queue = Queue()
self._monitoring_kill = False
self.filter_path = FilterStr('')
- self.filter_tags = FilterStr('')
+ self._tags_prefilter = tags_prefilter
+ self.filter_tags: set[Tag] = set()
self.load_files_and_start()
def _monitor_kill(self) -> None:
known_files = {
f.full_path: f for f
in VideoFile.get_filtered(
- conn, self.filter_path, self.filter_tags)}
+ conn, self.filter_path,
+ self._tags_prefilter | self.filter_tags)}
self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
if p in known_files
and p.is_file()