"""Main ytplom lib."""
# included libs
-from typing import TypeAlias, Optional, NewType, Callable, Self, Any
+from typing import Any, NewType, Optional, Self, TypeAlias
from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
from os.path import (dirname, isdir, isfile, exists as path_exists,
join as path_join, normpath, splitext, basename)
shuffle(self._files)
self._idx = 0
- @property
- def _mpv_available(self) -> bool:
- return bool(self._mpv and not self._mpv.core_shutdown)
-
- @staticmethod
- def _if_mpv_available(f) -> Callable:
- def wrapper(self, *args, **kwargs):
- return f(self, *args, **kwargs) if self._mpv else None
- return wrapper
-
def _signal_update(self) -> None:
self.last_update = PlayerUpdateId(f'{self._idx}:{time()}')
input_vo_keyboard=True,
config=True)
self._mpv.observe_property('pause', lambda a, b: self._signal_update())
+ for path in [f.full_path for f in self._files]:
+ self._mpv.command('loadfile', path, 'append')
@self._mpv.event_callback('start-file')
def on_start_file(_) -> None:
self._mpv = None
self._signal_update()
- for path in [f.full_path for f in self._files]:
- self._mpv.command('loadfile', path, 'append')
self._mpv.command('playlist-play-index', self._idx)
- @_if_mpv_available
def _kill_mpv(self) -> None:
- assert self._mpv is not None
- self._mpv.terminate()
- self._mpv = None
+ if self._mpv:
+ self._mpv.terminate()
+ self._mpv = None
@property
def current_file(self) -> Optional[VideoFile]:
@property
def is_running(self) -> bool:
"""Return if player is running/available."""
- return self._mpv_available
+ return bool(self._mpv)
@property
def is_paused(self) -> bool:
"""Return if player is paused."""
- if self._mpv_available:
- assert self._mpv is not None
+ if self._mpv:
return self._mpv.pause
return False
def toggle_run(self) -> None:
"""Toggle player running."""
- if self._mpv_available:
+ if self._mpv:
self._kill_mpv()
else:
self._start_mpv()
self._signal_update()
- @_if_mpv_available
def toggle_pause(self) -> None:
"""Toggle player pausing."""
- assert self._mpv is not None
- self._mpv.pause = not self._mpv.pause
- self._signal_update()
+ if self._mpv:
+ self._mpv.pause = not self._mpv.pause
+ self._signal_update()
- @_if_mpv_available
def prev(self) -> None:
"""Move player to previous item in playlist."""
- assert self._mpv is not None
- if self._mpv.playlist_pos > 0:
- self._mpv.command('playlist-prev')
- else:
- self._mpv.command('playlist-play-index', 0)
+ if self._idx > 0:
+ self._idx -= 1
+ if self._mpv:
+ self._mpv.command('playlist-play-index', self._idx)
- @_if_mpv_available
def next(self) -> None:
"""Move player to next item in playlist."""
- assert self._mpv is not None
- max_idx = len(self._mpv.playlist) - 1
- if self._mpv.playlist_pos < max_idx:
- self._mpv.command('playlist-next')
- else:
- self._mpv.command('playlist-play-index', max_idx)
+ if self._idx < len(self._files) - 1:
+ self._idx += 1
+ if self._mpv:
+ self._mpv.command('playlist-play-index', self._idx)
- @_if_mpv_available
def jump_to(self, target_idx: int) -> None:
"""Move player to target_idx position in playlist."""
- assert self._mpv is not None
- self._mpv.command('playlist-play_index', target_idx)
+ self._idx = target_idx
+ if self._mpv:
+ self._mpv.command('playlist-play-index', self._idx)
def reload(self) -> None:
"""Close MPV, re-read (and re-shuffle) filenames, then re-start MPV."""