home · contact · privacy
Re-organize tags code.
authorChristian Heller <c.heller@plomlompom.de>
Sun, 15 Dec 2024 02:37:53 +0000 (03:37 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Sun, 15 Dec 2024 02:37:53 +0000 (03:37 +0100)
src/templates/file_data.tmpl
src/templates/files.tmpl
src/ytplom/http.py
src/ytplom/misc.py

index 82dc50d1d925f1afca710a3c500f5a85c0b9653e..37c12259f13cb43d257f3f1c9b847ac57b8dbdc3 100644 (file)
@@ -34,7 +34,7 @@ td.tag_checkboxes { width: 1em; }
 <th>tags</th>
 <td>
 <table>
-{% for tag in file.tags %}
+{% for tag in self.tags_showable %}
 <tr>
 <td class="tag_checkboxes"><input type="checkbox" name="tags" value="{{tag}}" checked{% if not allow_edit %} disabled{% endif %}/></td>
 <td>{{tag}}</td>
index dca7479b7b25b734b794bdca36aea9c92f9bc3a3..e447c2aef021f22b3abdc04df3d1ff302adf9637 100644 (file)
@@ -17,7 +17,7 @@ show absent: <input type="checkbox" name="show_absent" {% if show_absent %}check
 <tr>
 <td>{{ file.size | round(1) }}</td>
 <td><input type="submit" name="play_{{file.digest.b64}}" value="play" {% if not file.present %}disabled {% endif %}/></td>
-<td>{% for tag in file.tags %}{{tag}} {%endfor %}</td>
+<td>{{file.tags_showable.joined}}</td>
 <td><a href="/{{page_names.file}}/{{file.digest.b64}}">{{file.rel_path}}</a></td>
 </tr>
 {% endfor %}
index cc7a90887fb821d2ae53866d3c0acdb7d6b9c373..de8950209bf040fddba1fec5fc98988d0e74d3ac 100644 (file)
@@ -11,7 +11,7 @@ from jinja2 import (  # type: ignore
         Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
 from ytplom.db import Hash, DbConn
 from ytplom.misc import (
-        FilterStr, FlagName, QueryId, QueryText, Tag, YoutubeId,
+        FilterStr, FlagName, QueryId, QueryText, TagSet, YoutubeId,
         FILE_FLAGS, PATH_THUMBNAILS, YOUTUBE_URL_PREFIX,
         ensure_expected_dirs,
         Config, DownloadsManager, Player, QuotaLog, VideoFile, YoutubeQuery,
@@ -94,8 +94,9 @@ class Server(ThreadingHTTPServer):
                          *args, **kwargs)
         self.config = config
         self.jinja = JinjaEnv(loader=JinjaFSLoader(_PATH_TEMPLATES))
-        self.player = Player(config.needed_tags_prefilter,
-                             config.whitelist_tags_prefilter)
+        self.player = Player(config.whitelist_tags_display,
+                             config.whitelist_tags_prefilter,
+                             config.needed_tags_prefilter)
         self.downloads = DownloadsManager()
         self.downloads.clean_unfinished()
         self.downloads.start_thread()
@@ -161,7 +162,7 @@ class _TaskHandler(BaseHTTPRequestHandler):
             self.server.player.filter_path = FilterStr(
                     postvars.first_for('filter_path'))
         if 'needed_tags' in postvars.as_dict:
-            self.server.player.needed_tags = Tag.from_joined(
+            self.server.player.needed_tags = TagSet.from_joined(
                     postvars.first_for('needed_tags'))
         self._send_http('OK', code=200)
 
@@ -174,14 +175,15 @@ class _TaskHandler(BaseHTTPRequestHandler):
         self._redirect(Path(postvars.first_for('redir_target')))
 
     def _receive_file_data(self, digest: Hash, postvars: _ReqMap) -> None:
-        if not self.server.config.allow_file_edit:
-            self._send_http('no way', code=403)
-            return
+        if ((not self.server.config.allow_file_edit)
+                or self.server.config.whitelist_tags_display):  # also here, …
+            self._send_http('no way', code=403)  # … cuz input form under …
+            return  # … this display filter might have suppressed set tags
         with DbConn() as conn:
             file = VideoFile.get_one(conn, digest)
             file.set_flags({FILE_FLAGS[FlagName(name)]
                             for name in postvars.all_for('flags')})
-            file.tags = {Tag(t) for t in postvars.all_for('tags')}
+            file.tags = TagSet.from_str_list(postvars.all_for('tags'))
             file.save(conn)
             conn.commit()
         file.ensure_absence_if_deleted()
@@ -312,7 +314,8 @@ class _TaskHandler(BaseHTTPRequestHandler):
 
     def _send_file_data(self, digest: Hash) -> None:
         with DbConn() as conn:
-            file = VideoFile.get_one(conn, digest)
+            file = VideoFile.get_one_with_whitelist_tags_display(
+                    conn, digest, self.server.config.whitelist_tags_display)
             unused_tags = file.unused_tags(conn)
         self._send_rendered_template(
                 _NAME_TEMPLATE_FILE_DATA,
@@ -330,10 +333,14 @@ class _TaskHandler(BaseHTTPRequestHandler):
                     conn,
                     filter_path,
                     self.server.config.needed_tags_prefilter,
-                    Tag.from_joined(needed_tags_str),
+                    TagSet.from_joined(needed_tags_str),
                     self.server.config.whitelist_tags_prefilter,
+                    self.server.config.whitelist_tags_display,
                     show_absent)
         files.sort(key=lambda t: t.rel_path)
+        for f in files:
+            f.whitelist_tags_display =\
+                    self.server.config.whitelist_tags_display
         self._send_rendered_template(_NAME_TEMPLATE_FILES,
                                      {'files': files,
                                       'selected': 'files',
@@ -355,8 +362,7 @@ class _TaskHandler(BaseHTTPRequestHandler):
                 _NAME_TEMPLATE_PLAYLIST,
                 {'selected': 'playlist',
                  'filter_path': self.server.player.filter_path,
-                 'needed_tags': ','.join([
-                     str(t) for t in self.server.player.needed_tags])})
+                 'needed_tags': self.server.player.needed_tags.joined})
 
     def _send_events(self, params: _ReqMap) -> None:
         self._send_http(headers=[(_HEADER_CONTENT_TYPE, 'text/event-stream'),
@@ -370,17 +376,24 @@ class _TaskHandler(BaseHTTPRequestHandler):
             elif ((not playing)
                   or (playing.digest != self.server.player.current_digest)):
                 with DbConn() as conn:
-                    playing = VideoFile.get_one(
-                            conn, self.server.player.current_digest)
+                    playing = VideoFile.get_one_with_whitelist_tags_display(
+                            conn,
+                            self.server.player.current_digest,
+                            self.server.config.whitelist_tags_display)
             if last_sent < self.server.player.last_update:
                 last_sent = self.server.player.last_update
+                title, tags, digest = '', '', ''
+                if playing:
+                    tags = playing.tags_showable.joined
+                    title = str(playing.rel_path)
+                    digest = playing.digest.b64
                 data = {
                     'last_update': self.server.player.last_update,
                     'running': self.server.player.is_running,
                     'paused': self.server.player.is_paused,
-                    'title_digest': playing.digest.b64 if playing else '',
-                    'title_tags': playing.tags_str if playing else '',
-                    'title': str(playing.rel_path) if playing else 'none'}
+                    'title_digest': digest,
+                    'title_tags': tags,
+                    'title': title}
                 if 'playlist' in params.as_dict:
                     data['playlist_files'] = [
                         {'rel_path': str(f.rel_path), 'digest': f.digest.b64}
index 86a1c1fae9ce04a6bf1d866e517a107692416e61..ea65b5e0bca65b0c34a9073903d424bb84345d95 100644 (file)
@@ -1,7 +1,7 @@
 """Main ytplom lib."""
 
 # included libs
-from typing import Any, NewType, Optional, Self
+from typing import Generator, NewType, Optional, Self
 from os import chdir, environ
 from random import shuffle
 from time import sleep
@@ -31,6 +31,7 @@ DEFAULTS = {
     'queries_cutoff': '',
     'whitelist_tags_prefilter_str': '',
     'needed_tags_prefilter_str': '',
+    'whitelist_tags_display_str': '',
     'allow_file_edit': True
 }
 
@@ -95,6 +96,7 @@ class Config:
     needed_tags_prefilter_str: str
     whitelist_tags_prefilter_str: str
     allow_file_edit: bool
+    whitelist_tags_display_str: str
 
     def __init__(self):
         def set_attrs_from_dict(d):
@@ -108,10 +110,12 @@ class Config:
         set_attrs_from_dict({k[len(ENVIRON_PREFIX):].lower(): v
                              for k, v in environ.items()
                              if k.isupper() and k.startswith(ENVIRON_PREFIX)})
-        self.needed_tags_prefilter = Tag.from_joined(
+        self.needed_tags_prefilter = TagSet.from_joined(
                 self.needed_tags_prefilter_str)
-        self.whitelist_tags_prefilter = Tag.from_joined(
+        self.whitelist_tags_prefilter = TagSet.from_joined(
                 self.whitelist_tags_prefilter_str)
+        self.whitelist_tags_display = TagSet.from_joined(
+                self.whitelist_tags_display_str)
 
 
 class YoutubeQuery(DbData):
@@ -254,27 +258,66 @@ class YoutubeVideo(DbData):
                             (query_id, self.id_))
 
 
-class Tag:
-    """Represents individual VideoFile.tags."""
+class TagSet:
+    """Collection of tags as used in VideoFile.tags."""
 
-    def __init__(self, tag_str: str) -> None:
-        self._str = tag_str
+    def __init__(self, tags: Optional[set[str]] = None) -> None:
+        self._tags = tags if tags else set()
 
-    def __hash__(self) -> int:
-        return hash(self._str)
+    def __iter__(self) -> Generator:
+        yield from self._tags
+
+    def add(self, other_tags: Self) -> None:
+        """To self, add tags of other_tags."""
+        # pylint:disable=protected-access
+        self._tags = self._tags | other_tags._tags
+
+    def all_not_in(self, other_tags: Self) -> Self:
+        """Return all tags of self _not_ in other_tags."""
+        # pylint:disable=protected-access
+        return self.__class__({t for t in self._tags
+                               if t not in other_tags._tags})
+
+    def all_also_in(self, other_tags: Self) -> Self:
+        """Return all tags of self _also_ in other_tags"""
+        # pylint:disable=protected-access
+        return self.__class__({t for t in self._tags
+                               if t in other_tags._tags})
+
+    def are_all_in(self, other_tags: Self) -> bool:
+        """Return if all tags of self are also in other_tags."""
+        return self.empty or self.all_not_in(other_tags).empty
 
-    def __eq__(self, other: Any) -> bool:
-        if not isinstance(other, self.__class__):
-            return False
-        return self._str == other._str
+    def whitelisted(self, whitelist: Self) -> Self:
+        """Return self filtered by whitelist; if empty, return all."""
+        if whitelist.empty:
+            return self
+        return self.all_also_in(whitelist)
 
-    def __str__(self):
-        return self._str
+    @property
+    def empty(self) -> bool:
+        """Return if self empty."""
+        return 0 == len(self._tags)
+
+    @property
+    def as_str_list(self) -> list[str]:
+        """Return self list of strings."""
+        return [str(t) for t in self._tags]
+
+    @property
+    def joined(self) -> str:
+        """Return self as string of comma-separated tags."""
+        return ','.join(self.as_str_list)
+
+    @classmethod
+    def from_joined(cls, joined: str) -> Self:
+        """Build self from string of comma-separated tags."""
+        return cls({t for t in joined.split(',') if t})
 
     @classmethod
-    def from_joined(cls, joined: str) -> set[Self]:
-        """Build set from comma-delimited units in joined."""
-        return {cls(t) for t in joined.split(',') if t}
+    def from_str_list(cls, str_list: list[str]) -> Self:
+        """Build self from list of tag strings."""
+        return cls(set(str_list))
 
 
 class VideoFile(DbData):
@@ -285,7 +328,7 @@ class VideoFile(DbData):
     last_update: DatetimeStr
     rel_path: Path
     digest: Hash
-    tags: set[Tag]
+    tags: TagSet
 
     def __init__(self,
                  digest: Optional[Hash],
@@ -298,13 +341,14 @@ class VideoFile(DbData):
         self.rel_path = rel_path
         self.digest = digest if digest else Hash.from_file(self.full_path)
         self.flags = flags
-        self.tags = Tag.from_joined(tags_str)
+        self.tags = TagSet.from_joined(tags_str)
         self.yt_id = yt_id
         if last_update is None:
             self._renew_last_update()
         else:
             self.last_update = last_update
         self._hash_on_last_update = hash(self)
+        self.whitelist_tags_display: Optional[TagSet] = None
 
     def __hash__(self) -> int:
         return hash(f'{self.digest.b64}|{self.rel_path}|{self.flags}|'
@@ -319,6 +363,15 @@ class VideoFile(DbData):
             self._renew_last_update()
         return super().save(conn)
 
+    @classmethod
+    def get_one_with_whitelist_tags_display(cls, conn: BaseDbConn, id_: Hash,
+                                            whitelist_tags_display: TagSet
+                                            ) -> Self:
+        """Same as .get_one except sets .whitelist_tags_display."""
+        vf = cls.get_one(conn, id_)
+        vf.whitelist_tags_display = whitelist_tags_display
+        return vf
+
     @classmethod
     def get_by_yt_id(cls, conn: BaseDbConn, yt_id: YoutubeId) -> Self:
         """Return VideoFile of .yt_id."""
@@ -332,30 +385,49 @@ class VideoFile(DbData):
     def get_filtered(cls,
                      conn: BaseDbConn,
                      filter_path: FilterStr,
-                     needed_tags_dark: set[Tag],
-                     needed_tags_seen: set[Tag],
-                     whitelist_tags: set[Tag],
+                     needed_tags_dark: TagSet,
+                     needed_tags_seen: TagSet,
+                     whitelist_tags_filter: TagSet,
+                     whitelist_tags_display: TagSet,
                      show_absent: bool = False
                      ) -> list[Self]:
         """Return cls.get_all matching provided filter criteria."""
-        return [f for f in cls.get_all(conn)
-                if (show_absent or f.present)
-                and str(filter_path).lower() in str(f.rel_path).lower()
-                and ([t for t in whitelist_tags if t in f.tags]
-                     or not [t for t in needed_tags_dark if t not in f.tags])
-                and not [t for t in needed_tags_seen if t not in f.tags]]
-
-    def unused_tags(self, conn: BaseDbConn) -> set[Tag]:
+        if not needed_tags_seen.all_not_in(whitelist_tags_display).empty:
+            return []
+        files = [f for f in cls.get_all(conn)
+                 if (show_absent or f.present)
+                 and str(filter_path).lower() in str(f.rel_path).lower()
+                 and (needed_tags_dark.are_all_in(f.tags)
+                      or not whitelist_tags_filter.all_also_in(f.tags).empty)
+                 and needed_tags_seen.whitelisted(whitelist_tags_display
+                                                  ).are_all_in(f.tags)]
+        for f in files:
+            f.whitelist_tags_display = whitelist_tags_display
+        return files
+
+    @property
+    def tags_showable(self) -> TagSet:
+        """Show .tags passing .whitelist_tags_display, if latter set."""
+        if self.whitelist_tags_display is None:
+            raise HandledException(
+                    'canot show display-whitelisted tags on unset whitelist')
+        return self.tags.whitelisted(self.whitelist_tags_display)
+
+    def unused_tags(self, conn: BaseDbConn) -> TagSet:
         """Return tags used among other VideoFiles, not in self."""
-        tags: set[Tag] = set()
+        if self.whitelist_tags_display is None:
+            raise HandledException(
+                    'canot show display-whitelisted tags on unset whitelist')
+        tags = TagSet()
         for file in self.get_all(conn):
-            tags = tags | {t for t in file.tags if t not in self.tags}
+            tags.add(file.tags.all_not_in(self.tags).whitelisted(
+                self.whitelist_tags_display))
         return tags
 
     @property
     def tags_str(self) -> str:
-        """Return self.tags joined by ','."""
-        return ','.join([str(s) for s in self.tags])
+        """Return .tags joined by ','."""
+        return self.tags.joined
 
     @property
     def full_path(self) -> Path:
@@ -439,8 +511,9 @@ class Player:
     _idx: int
 
     def __init__(self,
-                 whitelist_tags_prefilter: set[Tag],
-                 needed_tags_prefilter: set[Tag],
+                 whitelist_tags_display: TagSet,
+                 whitelist_tags_prefilter: TagSet,
+                 needed_tags_prefilter: TagSet,
                  ) -> None:
         self.last_update = DatetimeStr('')
         self._mpv: Optional[MPV] = None
@@ -448,8 +521,9 @@ class Player:
         self._monitoring_kill = False
         self.filter_path = FilterStr('')
         self._whitelist_tags_prefilter = whitelist_tags_prefilter
+        self._whitelist_tags_display = whitelist_tags_display
         self._needed_tags_prefilter = needed_tags_prefilter
-        self.needed_tags: set[Tag] = set()
+        self.needed_tags = TagSet()
         self.load_files_and_start()
 
     def _monitor_kill(self) -> None:
@@ -482,7 +556,8 @@ class Player:
                         self.filter_path,
                         self._needed_tags_prefilter,
                         self.needed_tags,
-                        self._whitelist_tags_prefilter)}
+                        self._whitelist_tags_prefilter,
+                        self._whitelist_tags_display)}
         self.files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
                       if p in known_files
                       and p.is_file()