Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
from ytplom.db import Hash, DbConn
from ytplom.misc import (
- FilterStr, FlagName, QueryId, QueryText, Tag, YoutubeId,
+ FilterStr, FlagName, QueryId, QueryText, TagSet, YoutubeId,
FILE_FLAGS, PATH_THUMBNAILS, YOUTUBE_URL_PREFIX,
ensure_expected_dirs,
Config, DownloadsManager, Player, QuotaLog, VideoFile, YoutubeQuery,
*args, **kwargs)
self.config = config
self.jinja = JinjaEnv(loader=JinjaFSLoader(_PATH_TEMPLATES))
- self.player = Player(config.needed_tags_prefilter,
- config.whitelist_tags_prefilter)
+ self.player = Player(config.whitelist_tags_display,
+ config.whitelist_tags_prefilter,
+ config.needed_tags_prefilter)
self.downloads = DownloadsManager()
self.downloads.clean_unfinished()
self.downloads.start_thread()
self.server.player.filter_path = FilterStr(
postvars.first_for('filter_path'))
if 'needed_tags' in postvars.as_dict:
- self.server.player.needed_tags = Tag.from_joined(
+ self.server.player.needed_tags = TagSet.from_joined(
postvars.first_for('needed_tags'))
self._send_http('OK', code=200)
self._redirect(Path(postvars.first_for('redir_target')))
def _receive_file_data(self, digest: Hash, postvars: _ReqMap) -> None:
- if not self.server.config.allow_file_edit:
- self._send_http('no way', code=403)
- return
+ if ((not self.server.config.allow_file_edit)
+ or self.server.config.whitelist_tags_display): # also here, …
+ self._send_http('no way', code=403) # … cuz input form under …
+ return # … this display filter might have suppressed set tags
with DbConn() as conn:
file = VideoFile.get_one(conn, digest)
file.set_flags({FILE_FLAGS[FlagName(name)]
for name in postvars.all_for('flags')})
- file.tags = {Tag(t) for t in postvars.all_for('tags')}
+ file.tags = TagSet.from_str_list(postvars.all_for('tags'))
file.save(conn)
conn.commit()
file.ensure_absence_if_deleted()
def _send_file_data(self, digest: Hash) -> None:
with DbConn() as conn:
- file = VideoFile.get_one(conn, digest)
+ file = VideoFile.get_one_with_whitelist_tags_display(
+ conn, digest, self.server.config.whitelist_tags_display)
unused_tags = file.unused_tags(conn)
self._send_rendered_template(
_NAME_TEMPLATE_FILE_DATA,
conn,
filter_path,
self.server.config.needed_tags_prefilter,
- Tag.from_joined(needed_tags_str),
+ TagSet.from_joined(needed_tags_str),
self.server.config.whitelist_tags_prefilter,
+ self.server.config.whitelist_tags_display,
show_absent)
files.sort(key=lambda t: t.rel_path)
+ for f in files:
+ f.whitelist_tags_display =\
+ self.server.config.whitelist_tags_display
self._send_rendered_template(_NAME_TEMPLATE_FILES,
{'files': files,
'selected': 'files',
_NAME_TEMPLATE_PLAYLIST,
{'selected': 'playlist',
'filter_path': self.server.player.filter_path,
- 'needed_tags': ','.join([
- str(t) for t in self.server.player.needed_tags])})
+ 'needed_tags': self.server.player.needed_tags.joined})
def _send_events(self, params: _ReqMap) -> None:
self._send_http(headers=[(_HEADER_CONTENT_TYPE, 'text/event-stream'),
elif ((not playing)
or (playing.digest != self.server.player.current_digest)):
with DbConn() as conn:
- playing = VideoFile.get_one(
- conn, self.server.player.current_digest)
+ playing = VideoFile.get_one_with_whitelist_tags_display(
+ conn,
+ self.server.player.current_digest,
+ self.server.config.whitelist_tags_display)
if last_sent < self.server.player.last_update:
last_sent = self.server.player.last_update
+ title, tags, digest = '', '', ''
+ if playing:
+ tags = playing.tags_showable.joined
+ title = str(playing.rel_path)
+ digest = playing.digest.b64
data = {
'last_update': self.server.player.last_update,
'running': self.server.player.is_running,
'paused': self.server.player.is_paused,
- 'title_digest': playing.digest.b64 if playing else '',
- 'title_tags': playing.tags_str if playing else '',
- 'title': str(playing.rel_path) if playing else 'none'}
+ 'title_digest': digest,
+ 'title_tags': tags,
+ 'title': title}
if 'playlist' in params.as_dict:
data['playlist_files'] = [
{'rel_path': str(f.rel_path), 'digest': f.digest.b64}
"""Main ytplom lib."""
# included libs
-from typing import Any, NewType, Optional, Self
+from typing import Generator, NewType, Optional, Self
from os import chdir, environ
from random import shuffle
from time import sleep
'queries_cutoff': '',
'whitelist_tags_prefilter_str': '',
'needed_tags_prefilter_str': '',
+ 'whitelist_tags_display_str': '',
'allow_file_edit': True
}
needed_tags_prefilter_str: str
whitelist_tags_prefilter_str: str
allow_file_edit: bool
+ whitelist_tags_display_str: str
def __init__(self):
def set_attrs_from_dict(d):
set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v
for k, v in environ.items()
if k.isupper() and k.startswith(ENVIRON_PREFIX)})
- self.needed_tags_prefilter = Tag.from_joined(
+ self.needed_tags_prefilter = TagSet.from_joined(
self.needed_tags_prefilter_str)
- self.whitelist_tags_prefilter = Tag.from_joined(
+ self.whitelist_tags_prefilter = TagSet.from_joined(
self.whitelist_tags_prefilter_str)
+ self.whitelist_tags_display = TagSet.from_joined(
+ self.whitelist_tags_display_str)
class YoutubeQuery(DbData):
(query_id, self.id_))
-class Tag:
- """Represents individual VideoFile.tags."""
+class TagSet:
+ """Collection of tags as used in VideoFile.tags."""
- def __init__(self, tag_str: str) -> None:
- self._str = tag_str
+ def __init__(self, tags: Optional[set[str]] = None) -> None:
+ self._tags = tags if tags else set()
- def __hash__(self) -> int:
- return hash(self._str)
+ def __iter__(self) -> Generator:
+ yield from self._tags
+
+ def add(self, other_tags: Self) -> None:
+ """To self, add tags of other_tags."""
+ # pylint:disable=protected-access
+ self._tags = self._tags | other_tags._tags
+
+ def all_not_in(self, other_tags: Self) -> Self:
+ """Return all tags of self _not_ in other_tags."""
+ # pylint:disable=protected-access
+ return self.__class__({t for t in self._tags
+ if t not in other_tags._tags})
+
+ def all_also_in(self, other_tags: Self) -> Self:
+ """Return all tags of self _also_ in other_tags"""
+ # pylint:disable=protected-access
+ return self.__class__({t for t in self._tags
+ if t in other_tags._tags})
+
+ def are_all_in(self, other_tags: Self) -> bool:
+ """Return if all tags of self are also in other_tags."""
+ return self.empty or self.all_not_in(other_tags).empty
- def __eq__(self, other: Any) -> bool:
- if not isinstance(other, self.__class__):
- return False
- return self._str == other._str
+ def whitelisted(self, whitelist: Self) -> Self:
+ """Return self filtered by whitelist; if empty, return all."""
+ if whitelist.empty:
+ return self
+ return self.all_also_in(whitelist)
- def __str__(self):
- return self._str
+ @property
+ def empty(self) -> bool:
+ """Return if self empty."""
+ return 0 == len(self._tags)
+
+ @property
+ def as_str_list(self) -> list[str]:
+ """Return self list of strings."""
+ return [str(t) for t in self._tags]
+
+ @property
+ def joined(self) -> str:
+ """Return self as string of comma-separated tags."""
+ return ','.join(self.as_str_list)
+
+ @classmethod
+ def from_joined(cls, joined: str) -> Self:
+ """Build self from string of comma-separated tags."""
+ return cls({t for t in joined.split(',') if t})
@classmethod
- def from_joined(cls, joined: str) -> set[Self]:
- """Build set from comma-delimited units in joined."""
- return {cls(t) for t in joined.split(',') if t}
+ def from_str_list(cls, str_list: list[str]) -> Self:
+ """Build self from list of tag strings."""
+ return cls(set(str_list))
class VideoFile(DbData):
last_update: DatetimeStr
rel_path: Path
digest: Hash
- tags: set[Tag]
+ tags: TagSet
def __init__(self,
digest: Optional[Hash],
self.rel_path = rel_path
self.digest = digest if digest else Hash.from_file(self.full_path)
self.flags = flags
- self.tags = Tag.from_joined(tags_str)
+ self.tags = TagSet.from_joined(tags_str)
self.yt_id = yt_id
if last_update is None:
self._renew_last_update()
else:
self.last_update = last_update
self._hash_on_last_update = hash(self)
+ self.whitelist_tags_display: Optional[TagSet] = None
def __hash__(self) -> int:
return hash(f'{self.digest.b64}|{self.rel_path}|{self.flags}|'
self._renew_last_update()
return super().save(conn)
+ @classmethod
+ def get_one_with_whitelist_tags_display(cls, conn: BaseDbConn, id_: Hash,
+ whitelist_tags_display: TagSet
+ ) -> Self:
+ """Same as .get_one except sets .whitelist_tags_display."""
+ vf = cls.get_one(conn, id_)
+ vf.whitelist_tags_display = whitelist_tags_display
+ return vf
+
@classmethod
def get_by_yt_id(cls, conn: BaseDbConn, yt_id: YoutubeId) -> Self:
"""Return VideoFile of .yt_id."""
def get_filtered(cls,
conn: BaseDbConn,
filter_path: FilterStr,
- needed_tags_dark: set[Tag],
- needed_tags_seen: set[Tag],
- whitelist_tags: set[Tag],
+ needed_tags_dark: TagSet,
+ needed_tags_seen: TagSet,
+ whitelist_tags_filter: TagSet,
+ whitelist_tags_display: TagSet,
show_absent: bool = False
) -> list[Self]:
"""Return cls.get_all matching provided filter criteria."""
- return [f for f in cls.get_all(conn)
- if (show_absent or f.present)
- and str(filter_path).lower() in str(f.rel_path).lower()
- and ([t for t in whitelist_tags if t in f.tags]
- or not [t for t in needed_tags_dark if t not in f.tags])
- and not [t for t in needed_tags_seen if t not in f.tags]]
-
- def unused_tags(self, conn: BaseDbConn) -> set[Tag]:
+ if not needed_tags_seen.all_not_in(whitelist_tags_display).empty:
+ return []
+ files = [f for f in cls.get_all(conn)
+ if (show_absent or f.present)
+ and str(filter_path).lower() in str(f.rel_path).lower()
+ and (needed_tags_dark.are_all_in(f.tags)
+ or not whitelist_tags_filter.all_also_in(f.tags).empty)
+ and needed_tags_seen.whitelisted(whitelist_tags_display
+ ).are_all_in(f.tags)]
+ for f in files:
+ f.whitelist_tags_display = whitelist_tags_display
+ return files
+
+ @property
+ def tags_showable(self) -> TagSet:
+ """Show .tags passing .whitelist_tags_display, if latter set."""
+ if self.whitelist_tags_display is None:
+ raise HandledException(
+ 'canot show display-whitelisted tags on unset whitelist')
+ return self.tags.whitelisted(self.whitelist_tags_display)
+
+ def unused_tags(self, conn: BaseDbConn) -> TagSet:
"""Return tags used among other VideoFiles, not in self."""
- tags: set[Tag] = set()
+ if self.whitelist_tags_display is None:
+ raise HandledException(
+ 'canot show display-whitelisted tags on unset whitelist')
+ tags = TagSet()
for file in self.get_all(conn):
- tags = tags | {t for t in file.tags if t not in self.tags}
+ tags.add(file.tags.all_not_in(self.tags).whitelisted(
+ self.whitelist_tags_display))
return tags
@property
def tags_str(self) -> str:
- """Return self.tags joined by ','."""
- return ','.join([str(s) for s in self.tags])
+ """Return .tags joined by ','."""
+ return self.tags.joined
@property
def full_path(self) -> Path:
_idx: int
def __init__(self,
- whitelist_tags_prefilter: set[Tag],
- needed_tags_prefilter: set[Tag],
+ whitelist_tags_display: TagSet,
+ whitelist_tags_prefilter: TagSet,
+ needed_tags_prefilter: TagSet,
) -> None:
self.last_update = DatetimeStr('')
self._mpv: Optional[MPV] = None
self._monitoring_kill = False
self.filter_path = FilterStr('')
self._whitelist_tags_prefilter = whitelist_tags_prefilter
+ self._whitelist_tags_display = whitelist_tags_display
self._needed_tags_prefilter = needed_tags_prefilter
- self.needed_tags: set[Tag] = set()
+ self.needed_tags = TagSet()
self.load_files_and_start()
def _monitor_kill(self) -> None:
self.filter_path,
self._needed_tags_prefilter,
self.needed_tags,
- self._whitelist_tags_prefilter)}
+ self._whitelist_tags_prefilter,
+ self._whitelist_tags_display)}
self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
if p in known_files
and p.is_file()