from typing import TypeAlias, Optional, NewType, Callable, Self, Any
from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
from os.path import (dirname, isdir, isfile, exists as path_exists,
- join as path_join, splitext, basename)
+ join as path_join, normpath, splitext, basename)
from base64 import urlsafe_b64encode, urlsafe_b64decode
from random import shuffle
from time import time, sleep
B64Str = NewType('B64Str', str)
PageNames: TypeAlias = dict[str, PathStr]
DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
+PlaylistTuples = list[tuple[Optional['VideoFile'], Optional['VideoFile']]]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
| QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
| PageNames | list['YoutubeVideo'] | list['YoutubeQuery']
- | list[tuple[B64Str, PathStr]] | list[tuple[PathStr, PathStr]]]
+ | list[tuple[B64Str, PathStr]] | PlaylistTuples]
# major expected directories
PATH_HOME = PathStr(environ.get('HOME', ''))
@property
def full_path(self) -> PathStr:
"""Return self.rel_path suffixed under PATH_DOWNLOADS."""
- return PathStr(path_join(PATH_DOWNLOADS, self.rel_path))
+ return PathStr(normpath(path_join(PATH_DOWNLOADS, self.rel_path)))
+
+ @property
+ def basename(self) -> PathStr:
+ """Return basename(self.rel_path)."""
+ return PathStr(basename(self.rel_path))
@property
def present(self) -> bool:
self._mpv: Optional[MPV] = None
def _load_filenames(self) -> None:
- self._filenames = [PathStr(e.path) for e in scandir(PATH_DOWNLOADS)
- if isfile(e.path)
- and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
- shuffle(self._filenames)
+ conn = DbConnection()
+ known_files = {f.full_path: f for f in VideoFile.get_all(conn)}
+ conn.commit_close()
+ self._files = [known_files[PathStr(e.path)]
+ for e in scandir(PATH_DOWNLOADS)
+ if e.path in known_files
+ and isfile(e.path)
+ and splitext(e.path)[1][1:] in LEGAL_EXTENSIONS]
+ shuffle(self._files)
self._idx = 0
@property
self._mpv = None
self._signal_update()
- for path in self._filenames:
+ for path in [f.full_path for f in self._files]:
self._mpv.playlist_append(path)
self._mpv.playlist_play_index(self._idx)
self._mpv = None
@property
- def current_filename(self) -> Optional[PathStr]:
- """Return what we assume is the name of the currently playing file."""
- if not self._filenames:
+ def current_file(self) -> Optional[VideoFile]:
+ """Return what we assume is the currently playing file."""
+ if not self._files:
return None
- return PathStr(basename(self._filenames[self._idx]))
+ return self._files[self._idx]
@property
- def prev_files(self) -> list[PathStr]:
+ def prev_files(self) -> list[VideoFile]:
"""List 'past' files of playlist."""
- return list(reversed(self._filenames[:self._idx]))
+ return list(reversed(self._files[:self._idx]))
@property
- def next_files(self) -> list[PathStr]:
+ def next_files(self) -> list[VideoFile]:
"""List 'coming' files of playlist."""
- return self._filenames[self._idx + 1:]
+ return self._files[self._idx + 1:]
@property
def is_running(self) -> bool:
headers=[('Content-type', 'application/json')])
def _send_playlist(self) -> None:
- tuples: list[tuple[PathStr, PathStr]] = []
+ tuples: PlaylistTuples = []
i: int = 0
while True:
- prev, next_ = PathStr(''), PathStr('')
+ prev, next_ = None, None
if len(self.server.player.prev_files) > i:
- prev = PathStr(basename(self.server.player.prev_files[i]))
+ prev = self.server.player.prev_files[i]
if len(self.server.player.next_files) > i:
- next_ = PathStr(basename(self.server.player.next_files[i]))
- if not prev + next_:
+ next_ = self.server.player.next_files[i]
+ if not (prev or next_):
break
tuples += [(prev, next_)]
i += 1
{'last_update': self.server.player.last_update,
'running': self.server.player.is_running,
'paused': self.server.player.is_paused,
- 'current_title': self.server.player.current_filename,
+ 'current_video': self.server.player.current_file,
'tuples': tuples})