From 291368eb272db20d35d4474e54d39d8022db4079 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Sun, 15 Dec 2024 03:37:53 +0100 Subject: [PATCH] Re-organize tags code. --- src/templates/file_data.tmpl | 2 +- src/templates/files.tmpl | 2 +- src/ytplom/http.py | 47 +++++++---- src/ytplom/misc.py | 153 ++++++++++++++++++++++++++--------- 4 files changed, 146 insertions(+), 58 deletions(-) diff --git a/src/templates/file_data.tmpl b/src/templates/file_data.tmpl index 82dc50d..37c1225 100644 --- a/src/templates/file_data.tmpl +++ b/src/templates/file_data.tmpl @@ -34,7 +34,7 @@ td.tag_checkboxes { width: 1em; } tags -{% for tag in file.tags %} +{% for tag in self.tags_showable %} diff --git a/src/templates/files.tmpl b/src/templates/files.tmpl index dca7479..e447c2a 100644 --- a/src/templates/files.tmpl +++ b/src/templates/files.tmpl @@ -17,7 +17,7 @@ show absent: - + {% endfor %} diff --git a/src/ytplom/http.py b/src/ytplom/http.py index cc7a908..de89502 100644 --- a/src/ytplom/http.py +++ b/src/ytplom/http.py @@ -11,7 +11,7 @@ from jinja2 import ( # type: ignore Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader) from ytplom.db import Hash, DbConn from ytplom.misc import ( - FilterStr, FlagName, QueryId, QueryText, Tag, YoutubeId, + FilterStr, FlagName, QueryId, QueryText, TagSet, YoutubeId, FILE_FLAGS, PATH_THUMBNAILS, YOUTUBE_URL_PREFIX, ensure_expected_dirs, Config, DownloadsManager, Player, QuotaLog, VideoFile, YoutubeQuery, @@ -94,8 +94,9 @@ class Server(ThreadingHTTPServer): *args, **kwargs) self.config = config self.jinja = JinjaEnv(loader=JinjaFSLoader(_PATH_TEMPLATES)) - self.player = Player(config.needed_tags_prefilter, - config.whitelist_tags_prefilter) + self.player = Player(config.whitelist_tags_display, + config.whitelist_tags_prefilter, + config.needed_tags_prefilter) self.downloads = DownloadsManager() self.downloads.clean_unfinished() self.downloads.start_thread() @@ -161,7 +162,7 @@ class _TaskHandler(BaseHTTPRequestHandler): self.server.player.filter_path = FilterStr( postvars.first_for('filter_path')) if 'needed_tags' in postvars.as_dict: - self.server.player.needed_tags = Tag.from_joined( + self.server.player.needed_tags = TagSet.from_joined( postvars.first_for('needed_tags')) self._send_http('OK', code=200) @@ -174,14 +175,15 @@ class _TaskHandler(BaseHTTPRequestHandler): self._redirect(Path(postvars.first_for('redir_target'))) def _receive_file_data(self, digest: Hash, postvars: _ReqMap) -> None: - if not self.server.config.allow_file_edit: - self._send_http('no way', code=403) - return + if ((not self.server.config.allow_file_edit) + or self.server.config.whitelist_tags_display): # also here, … + self._send_http('no way', code=403) # … cuz input form under … + return # … this display filter might have suppressed set tags with DbConn() as conn: file = VideoFile.get_one(conn, digest) file.set_flags({FILE_FLAGS[FlagName(name)] for name in postvars.all_for('flags')}) - file.tags = {Tag(t) for t in postvars.all_for('tags')} + file.tags = TagSet.from_str_list(postvars.all_for('tags')) file.save(conn) conn.commit() file.ensure_absence_if_deleted() @@ -312,7 +314,8 @@ class _TaskHandler(BaseHTTPRequestHandler): def _send_file_data(self, digest: Hash) -> None: with DbConn() as conn: - file = VideoFile.get_one(conn, digest) + file = VideoFile.get_one_with_whitelist_tags_display( + conn, digest, self.server.config.whitelist_tags_display) unused_tags = file.unused_tags(conn) self._send_rendered_template( _NAME_TEMPLATE_FILE_DATA, @@ -330,10 +333,14 @@ class _TaskHandler(BaseHTTPRequestHandler): conn, filter_path, self.server.config.needed_tags_prefilter, - Tag.from_joined(needed_tags_str), + TagSet.from_joined(needed_tags_str), self.server.config.whitelist_tags_prefilter, + self.server.config.whitelist_tags_display, show_absent) files.sort(key=lambda t: t.rel_path) + for f in files: + f.whitelist_tags_display =\ + self.server.config.whitelist_tags_display self._send_rendered_template(_NAME_TEMPLATE_FILES, {'files': files, 'selected': 'files', @@ -355,8 +362,7 @@ class _TaskHandler(BaseHTTPRequestHandler): _NAME_TEMPLATE_PLAYLIST, {'selected': 'playlist', 'filter_path': self.server.player.filter_path, - 'needed_tags': ','.join([ - str(t) for t in self.server.player.needed_tags])}) + 'needed_tags': self.server.player.needed_tags.joined}) def _send_events(self, params: _ReqMap) -> None: self._send_http(headers=[(_HEADER_CONTENT_TYPE, 'text/event-stream'), @@ -370,17 +376,24 @@ class _TaskHandler(BaseHTTPRequestHandler): elif ((not playing) or (playing.digest != self.server.player.current_digest)): with DbConn() as conn: - playing = VideoFile.get_one( - conn, self.server.player.current_digest) + playing = VideoFile.get_one_with_whitelist_tags_display( + conn, + self.server.player.current_digest, + self.server.config.whitelist_tags_display) if last_sent < self.server.player.last_update: last_sent = self.server.player.last_update + title, tags, digest = '', '', '' + if playing: + tags = playing.tags_showable.joined + title = str(playing.rel_path) + digest = playing.digest.b64 data = { 'last_update': self.server.player.last_update, 'running': self.server.player.is_running, 'paused': self.server.player.is_paused, - 'title_digest': playing.digest.b64 if playing else '', - 'title_tags': playing.tags_str if playing else '', - 'title': str(playing.rel_path) if playing else 'none'} + 'title_digest': digest, + 'title_tags': tags, + 'title': title} if 'playlist' in params.as_dict: data['playlist_files'] = [ {'rel_path': str(f.rel_path), 'digest': f.digest.b64} diff --git a/src/ytplom/misc.py b/src/ytplom/misc.py index 86a1c1f..ea65b5e 100644 --- a/src/ytplom/misc.py +++ b/src/ytplom/misc.py @@ -1,7 +1,7 @@ """Main ytplom lib.""" # included libs -from typing import Any, NewType, Optional, Self +from typing import Generator, NewType, Optional, Self from os import chdir, environ from random import shuffle from time import sleep @@ -31,6 +31,7 @@ DEFAULTS = { 'queries_cutoff': '', 'whitelist_tags_prefilter_str': '', 'needed_tags_prefilter_str': '', + 'whitelist_tags_display_str': '', 'allow_file_edit': True } @@ -95,6 +96,7 @@ class Config: needed_tags_prefilter_str: str whitelist_tags_prefilter_str: str allow_file_edit: bool + whitelist_tags_display_str: str def __init__(self): def set_attrs_from_dict(d): @@ -108,10 +110,12 @@ class Config: set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v for k, v in environ.items() if k.isupper() and k.startswith(ENVIRON_PREFIX)}) - self.needed_tags_prefilter = Tag.from_joined( + self.needed_tags_prefilter = TagSet.from_joined( self.needed_tags_prefilter_str) - self.whitelist_tags_prefilter = Tag.from_joined( + self.whitelist_tags_prefilter = TagSet.from_joined( self.whitelist_tags_prefilter_str) + self.whitelist_tags_display = TagSet.from_joined( + self.whitelist_tags_display_str) class YoutubeQuery(DbData): @@ -254,27 +258,66 @@ class YoutubeVideo(DbData): (query_id, self.id_)) -class Tag: - """Represents individual VideoFile.tags.""" +class TagSet: + """Collection of tags as used in VideoFile.tags.""" - def __init__(self, tag_str: str) -> None: - self._str = tag_str + def __init__(self, tags: Optional[set[str]] = None) -> None: + self._tags = tags if tags else set() - def __hash__(self) -> int: - return hash(self._str) + def __iter__(self) -> Generator: + yield from self._tags + + def add(self, other_tags: Self) -> None: + """To self, add tags of other_tags.""" + # pylint:disable=protected-access + self._tags = self._tags | other_tags._tags + + def all_not_in(self, other_tags: Self) -> Self: + """Return all tags of self _not_ in other_tags.""" + # pylint:disable=protected-access + return self.__class__({t for t in self._tags + if t not in other_tags._tags}) + + def all_also_in(self, other_tags: Self) -> Self: + """Return all tags of self _also_ in other_tags""" + # pylint:disable=protected-access + return self.__class__({t for t in self._tags + if t in other_tags._tags}) + + def are_all_in(self, other_tags: Self) -> bool: + """Return if all tags of self are also in other_tags.""" + return self.empty or self.all_not_in(other_tags).empty - def __eq__(self, other: Any) -> bool: - if not isinstance(other, self.__class__): - return False - return self._str == other._str + def whitelisted(self, whitelist: Self) -> Self: + """Return self filtered by whitelist; if empty, return all.""" + if whitelist.empty: + return self + return self.all_also_in(whitelist) - def __str__(self): - return self._str + @property + def empty(self) -> bool: + """Return if self empty.""" + return 0 == len(self._tags) + + @property + def as_str_list(self) -> list[str]: + """Return self list of strings.""" + return [str(t) for t in self._tags] + + @property + def joined(self) -> str: + """Return self as string of comma-separated tags.""" + return ','.join(self.as_str_list) + + @classmethod + def from_joined(cls, joined: str) -> Self: + """Build self from string of comma-separated tags.""" + return cls({t for t in joined.split(',') if t}) @classmethod - def from_joined(cls, joined: str) -> set[Self]: - """Build set from comma-delimited units in joined.""" - return {cls(t) for t in joined.split(',') if t} + def from_str_list(cls, str_list: list[str]) -> Self: + """Build self from list of tag strings.""" + return cls(set(str_list)) class VideoFile(DbData): @@ -285,7 +328,7 @@ class VideoFile(DbData): last_update: DatetimeStr rel_path: Path digest: Hash - tags: set[Tag] + tags: TagSet def __init__(self, digest: Optional[Hash], @@ -298,13 +341,14 @@ class VideoFile(DbData): self.rel_path = rel_path self.digest = digest if digest else Hash.from_file(self.full_path) self.flags = flags - self.tags = Tag.from_joined(tags_str) + self.tags = TagSet.from_joined(tags_str) self.yt_id = yt_id if last_update is None: self._renew_last_update() else: self.last_update = last_update self._hash_on_last_update = hash(self) + self.whitelist_tags_display: Optional[TagSet] = None def __hash__(self) -> int: return hash(f'{self.digest.b64}|{self.rel_path}|{self.flags}|' @@ -319,6 +363,15 @@ class VideoFile(DbData): self._renew_last_update() return super().save(conn) + @classmethod + def get_one_with_whitelist_tags_display(cls, conn: BaseDbConn, id_: Hash, + whitelist_tags_display: TagSet + ) -> Self: + """Same as .get_one except sets .whitelist_tags_display.""" + vf = cls.get_one(conn, id_) + vf.whitelist_tags_display = whitelist_tags_display + return vf + @classmethod def get_by_yt_id(cls, conn: BaseDbConn, yt_id: YoutubeId) -> Self: """Return VideoFile of .yt_id.""" @@ -332,30 +385,49 @@ class VideoFile(DbData): def get_filtered(cls, conn: BaseDbConn, filter_path: FilterStr, - needed_tags_dark: set[Tag], - needed_tags_seen: set[Tag], - whitelist_tags: set[Tag], + needed_tags_dark: TagSet, + needed_tags_seen: TagSet, + whitelist_tags_filter: TagSet, + whitelist_tags_display: TagSet, show_absent: bool = False ) -> list[Self]: """Return cls.get_all matching provided filter criteria.""" - return [f for f in cls.get_all(conn) - if (show_absent or f.present) - and str(filter_path).lower() in str(f.rel_path).lower() - and ([t for t in whitelist_tags if t in f.tags] - or not [t for t in needed_tags_dark if t not in f.tags]) - and not [t for t in needed_tags_seen if t not in f.tags]] - - def unused_tags(self, conn: BaseDbConn) -> set[Tag]: + if not needed_tags_seen.all_not_in(whitelist_tags_display).empty: + return [] + files = [f for f in cls.get_all(conn) + if (show_absent or f.present) + and str(filter_path).lower() in str(f.rel_path).lower() + and (needed_tags_dark.are_all_in(f.tags) + or not whitelist_tags_filter.all_also_in(f.tags).empty) + and needed_tags_seen.whitelisted(whitelist_tags_display + ).are_all_in(f.tags)] + for f in files: + f.whitelist_tags_display = whitelist_tags_display + return files + + @property + def tags_showable(self) -> TagSet: + """Show .tags passing .whitelist_tags_display, if latter set.""" + if self.whitelist_tags_display is None: + raise HandledException( + 'canot show display-whitelisted tags on unset whitelist') + return self.tags.whitelisted(self.whitelist_tags_display) + + def unused_tags(self, conn: BaseDbConn) -> TagSet: """Return tags used among other VideoFiles, not in self.""" - tags: set[Tag] = set() + if self.whitelist_tags_display is None: + raise HandledException( + 'canot show display-whitelisted tags on unset whitelist') + tags = TagSet() for file in self.get_all(conn): - tags = tags | {t for t in file.tags if t not in self.tags} + tags.add(file.tags.all_not_in(self.tags).whitelisted( + self.whitelist_tags_display)) return tags @property def tags_str(self) -> str: - """Return self.tags joined by ','.""" - return ','.join([str(s) for s in self.tags]) + """Return .tags joined by ','.""" + return self.tags.joined @property def full_path(self) -> Path: @@ -439,8 +511,9 @@ class Player: _idx: int def __init__(self, - whitelist_tags_prefilter: set[Tag], - needed_tags_prefilter: set[Tag], + whitelist_tags_display: TagSet, + whitelist_tags_prefilter: TagSet, + needed_tags_prefilter: TagSet, ) -> None: self.last_update = DatetimeStr('') self._mpv: Optional[MPV] = None @@ -448,8 +521,9 @@ class Player: self._monitoring_kill = False self.filter_path = FilterStr('') self._whitelist_tags_prefilter = whitelist_tags_prefilter + self._whitelist_tags_display = whitelist_tags_display self._needed_tags_prefilter = needed_tags_prefilter - self.needed_tags: set[Tag] = set() + self.needed_tags = TagSet() self.load_files_and_start() def _monitor_kill(self) -> None: @@ -482,7 +556,8 @@ class Player: self.filter_path, self._needed_tags_prefilter, self.needed_tags, - self._whitelist_tags_prefilter)} + self._whitelist_tags_prefilter, + self._whitelist_tags_display)} self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir() if p in known_files and p.is_file() -- 2.30.2
{{tag}} {{ file.size | round(1) }} {% for tag in file.tags %}{{tag}} {%endfor %}{{file.tags_showable.joined}} {{file.rel_path}}