from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
from os.path import (dirname, isdir, isfile, exists as path_exists,
join as path_join, splitext, basename)
from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
from os.path import (dirname, isdir, isfile, exists as path_exists,
join as path_join, splitext, basename)
from random import shuffle
from time import time, sleep
from datetime import datetime, timedelta
from random import shuffle
from time import time, sleep
from datetime import datetime, timedelta
FlagsInt = NewType('FlagsInt', int)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
FlagsInt = NewType('FlagsInt', int)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
PageNames: TypeAlias = dict[str, PathStr]
DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
| QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
| PageNames | list['YoutubeVideo'] | list['YoutubeQuery']
PageNames: TypeAlias = dict[str, PathStr]
DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
| QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
| PageNames | list['YoutubeVideo'] | list['YoutubeQuery']
# major expected directories
PATH_HOME = PathStr(environ.get('HOME', ''))
# major expected directories
PATH_HOME = PathStr(environ.get('HOME', ''))
'download': PathStr('dl'),
'file': PathStr('file'),
'files': PathStr('files'),
'download': PathStr('dl'),
'file': PathStr('file'),
'files': PathStr('files'),
@classmethod
def get_one(cls, conn: DbConnection, id_: str) -> Self:
"""Return single entry of id_ from DB."""
@classmethod
def get_one(cls, conn: DbConnection, id_: str) -> Self:
"""Return single entry of id_ from DB."""
row = conn.exec(sql, (id_,)).fetchone()
if not row:
msg = f'no entry found for ID "{id_}" in table {cls._table_name}'
row = conn.exec(sql, (id_,)).fetchone()
if not row:
msg = f'no entry found for ID "{id_}" in table {cls._table_name}'
_table_name = 'files'
_cols = ('rel_path', 'yt_id', 'flags', 'last_update')
last_update: DatetimeStr
_table_name = 'files'
_cols = ('rel_path', 'yt_id', 'flags', 'last_update')
last_update: DatetimeStr
+ @classmethod
+ def get_by_b64(cls, conn: DbConnection, rel_path_b64: B64Str) -> Self:
+ """Retrieve by .rel_path provided as urlsafe_b64 encoding."""
+ return cls.get_one(conn, urlsafe_b64decode(rel_path_b64).decode())
+
+ @property
+ def rel_path_b64(self) -> B64Str:
+ """Return .rel_path as urlsafe_b64 e3ncoding."""
+ return B64Str(urlsafe_b64encode(self.rel_path.encode()).decode())
+
- def ids_to_paths(self) -> DownloadsIndex:
- """Return mapping YoutubeIds:paths of files downloaded to them."""
+ def ids_to_full_paths(self) -> DownloadsIndex:
+ """Return mapping YoutubeIds:full paths of files downloaded to them."""
+ @property
+ def rel_paths_b64_to_rel_paths(self) -> list[tuple[B64Str, PathStr]]:
+ """Return mapping .rel_paths b64-encoded:rel_paths of known files."""
+ self._sync_db()
+ return [(f.rel_path_b64, f.rel_path) for f in self._files]
+
@property
def ids_unfinished(self) -> set[YoutubeId]:
"""Return set of IDs of videos awaiting or currently in download."""
@property
def ids_unfinished(self) -> set[YoutubeId]:
"""Return set of IDs of videos awaiting or currently in download."""
def queue_download(self, video_id: YoutubeId) -> None:
"""Add video_id to download queue *if* not already processed."""
def queue_download(self, video_id: YoutubeId) -> None:
"""Add video_id to download queue *if* not already processed."""
- pre_existing = self.ids_unfinished | set(self._to_download
- + list(self.ids_to_paths))
+ pre_existing = (self.ids_unfinished
+ | set(self._to_download
+ + list(self.ids_to_full_paths)))
if PAGE_NAMES['playlist'] == page_name:
self._receive_player_command(list(postvars.keys())[0])
elif PAGE_NAMES['file'] == page_name:
if PAGE_NAMES['playlist'] == page_name:
self._receive_player_command(list(postvars.keys())[0])
elif PAGE_NAMES['file'] == page_name:
[FlagName(k) for k in postvars])
elif PAGE_NAMES['yt_queries'] == page_name:
self._receive_yt_query(QueryText(postvars['query'][0]))
[FlagName(k) for k in postvars])
elif PAGE_NAMES['yt_queries'] == page_name:
self._receive_yt_query(QueryText(postvars['query'][0]))
self._send_http(headers=[('Location', '/')], code=302)
def _receive_video_flag(self,
self._send_http(headers=[('Location', '/')], code=302)
def _receive_video_flag(self,
conn.commit_close()
file.ensure_absence_if_deleted()
self._send_http(headers=[('Location',
conn.commit_close()
file.ensure_absence_if_deleted()
self._send_http(headers=[('Location',
elif PAGE_NAMES['yt_result'] == page_name:
self._send_yt_result(YoutubeId(toks_url[2]))
elif PAGE_NAMES['missing'] == page_name:
elif PAGE_NAMES['yt_result'] == page_name:
self._send_yt_result(YoutubeId(toks_url[2]))
elif PAGE_NAMES['missing'] == page_name:
self._send_http(img, [('Content-type', 'image/jpg')])
def _send_or_download_video(self, video_id: YoutubeId) -> None:
self._send_http(img, [('Content-type', 'image/jpg')])
def _send_or_download_video(self, video_id: YoutubeId) -> None:
- if video_id in self.server.downloads.ids_to_paths:
- with open(self.server.downloads.ids_to_paths[video_id],
+ if video_id in self.server.downloads.ids_to_full_paths:
+ with open(self.server.downloads.ids_to_full_paths[video_id],
NAME_TEMPLATE_YT_VIDEO,
{'video_data': video_data,
'is_temp': video_id in self.server.downloads.ids_unfinished,
NAME_TEMPLATE_YT_VIDEO,
{'video_data': video_data,
'is_temp': video_id in self.server.downloads.ids_unfinished,
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_FILE_DATA,
{'file': file, 'flag_names': list(FILE_FLAGS)})
def _send_files_index(self) -> None:
conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_FILE_DATA,
{'file': file, 'flag_names': list(FILE_FLAGS)})
def _send_files_index(self) -> None:
- videos = [(id_, PathStr(basename(path)))
- for id_, path in self.server.downloads.ids_to_paths.items()]
- videos.sort(key=lambda t: t[1])
- self._send_rendered_template(NAME_TEMPLATE_FILES, {'videos': videos})
+ files = self.server.downloads.rel_paths_b64_to_rel_paths
+ files.sort(key=lambda t: t[1])
+ self._send_rendered_template(NAME_TEMPLATE_FILES, {'files': files})