QueryText = NewType('QueryText', str)
ProseText = NewType('ProseText', str)
SqlText = NewType('SqlText', str)
+FlagName = NewType('FlagName', str)
+FlagsInt = NewType('FlagsInt', int)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
- | QueryText | QuotaCost | 'YoutubeVideo' | list['YoutubeVideo']
- | list['YoutubeQuery'] | list[tuple[YoutubeId, PathStr]]
- | list[tuple[PathStr, PathStr]]]
+ | QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
+ | list['YoutubeVideo'] | list['YoutubeQuery']
+ | list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]]
# local data reasonably expected to be in user home directory
PATH_HOME = PathStr(environ.get('HOME', ''))
NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
+NAME_TEMPLATE_VIDEO = PathStr('video.tmpl')
NAME_TEMPLATE_YT_VIDEO = PathStr('yt_video.tmpl')
NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
PATH_TEMPLATE_QUERIES = PathStr(path_join(PATH_TEMPLATES,
CREATE TABLE files (
rel_path TEXT PRIMARY KEY,
yt_id TEXT NOT NULL DEFAULT "",
+ flags INTEGER NOT NULL DEFAULT 0;
FOREIGN KEY (yt_id) REFERENCES yt_videos(id)
);
'''
+VIDEO_FLAGS: dict[FlagName, FlagsInt] = {
+ FlagName('foo'): FlagsInt(1 << 0),
+ FlagName('bar'): FlagsInt(1 << 1),
+ FlagName('baz'): FlagsInt(1 << 2)
+}
+
class NotFoundException(BaseException):
"""Call on DB fetches finding less than expected."""
class VideoFile(DbData):
"""Collects data about downloaded files."""
_table_name = 'files'
- _cols = ('rel_path', 'yt_id')
+ _cols = ('rel_path', 'yt_id', 'flags')
- def __init__(self, rel_path: PathStr, yt_id: YoutubeId) -> None:
+ def __init__(self, rel_path: PathStr, yt_id: YoutubeId, flags=FlagsInt(0)
+ ) -> None:
self.rel_path = rel_path
self.yt_id = yt_id
+ self.flags = flags
self.missing = False
+ @classmethod
+ def get_by_yt_id(cls, conn: DatabaseConnection, yt_id: YoutubeId) -> Self:
+ """Return VideoFile of .yt_id."""
+ sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE yt_id = ?')
+ row = conn.exec(sql, (yt_id,)).fetchone()
+ if not row:
+ raise NotFoundException
+ return cls._from_table_row(row)
+
+ def flag_set(self, flag_name: FlagName) -> bool:
+ """Return if flag of flag_name is set in self.flags."""
+ return self.flags & VIDEO_FLAGS[flag_name]
+
class QuotaLog(DbData):
"""Collects API access quota costs."""
postvars = parse_qs(self.rfile.read(body_length).decode())
if 'playlist' == page_name:
self._post_player_command(list(postvars.keys())[0])
+ elif 'video' == page_name:
+ self._post_video_flag(YoutubeId(toks_url[2]),
+ [FlagName(k) for k in postvars])
elif 'queries' == page_name:
self._post_query(QueryText(postvars['query'][0]))
- def _post_player_command(self, commands: str) -> None:
- if 'pause' in commands:
+ def _post_player_command(self, command: str) -> None:
+ if 'pause' == command:
self.server.player.toggle_pause()
- elif 'prev' in commands:
+ elif 'prev' == command:
self.server.player.prev()
- elif 'next' in commands:
+ elif 'next' == command:
self.server.player.next()
- elif 'stop' in commands:
+ elif 'stop' == command:
self.server.player.toggle_run()
- elif 'reload' in commands:
+ elif 'reload' == command:
self.server.player.reload()
sleep(0.5) # avoid redir happening before current_file update
self._send_http(headers=[('Location', '/')], code=302)
+ def _post_video_flag(self,
+ yt_id: YoutubeId,
+ flag_names: list[FlagName]
+ ) -> None:
+ conn = DatabaseConnection()
+ file = VideoFile.get_by_yt_id(conn, yt_id)
+ file.flags = 0
+ for flag_name in flag_names:
+ file.flags |= VIDEO_FLAGS[flag_name]
+ file.save(conn)
+ conn.commit_close()
+ self._send_http(headers=[('Location', f'/video/{yt_id}')], code=302)
+
def _post_query(self, query_txt: QueryText) -> None:
conn = DatabaseConnection()
self._send_or_download_video(YoutubeId(toks_url[2]))
elif 'videos' == page_name:
self._send_videos_index()
+ elif 'video' == page_name:
+ self._send_video_data(YoutubeId(toks_url[2]))
elif 'yt_video' == page_name:
self._send_yt_video_data(YoutubeId(toks_url[2]))
elif 'missing.json' == page_name:
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
+ def _send_video_data(self, yt_id: YoutubeId) -> None:
+ conn = DatabaseConnection()
+ file = VideoFile.get_by_yt_id(conn, yt_id)
+ conn.commit_close()
+ self._send_rendered_template(
+ NAME_TEMPLATE_VIDEO,
+ {'file': file, 'flag_names': list(VIDEO_FLAGS)})
+
def _send_videos_index(self) -> None:
videos = [(id_, PathStr(basename(path)))
for id_, path in self.server.downloads.ids_to_paths.items()]