from os import chdir, environ, getcwd, makedirs, scandir, remove as os_remove
from os.path import (dirname, isdir, isfile, exists as path_exists,
join as path_join, splitext, basename)
+from base64 import urlsafe_b64encode, urlsafe_b64decode
from random import shuffle
from time import time, sleep
from datetime import datetime, timedelta
FlagsInt = NewType('FlagsInt', int)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
+B64Str = NewType('B64Str', str)
PageNames: TypeAlias = dict[str, PathStr]
DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
| QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
| PageNames | list['YoutubeVideo'] | list['YoutubeQuery']
- | list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]]
+ | list[tuple[B64Str, PathStr]] | list[tuple[PathStr, PathStr]]]
# major expected directories
PATH_HOME = PathStr(environ.get('HOME', ''))
NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
# page names
-PAGE_NAMES: PageNames = {
+PAGE_NAMES = {
'download': PathStr('dl'),
'file': PathStr('file'),
'files': PathStr('files'),
class DbData:
"""Abstraction of common DB operation."""
+ id_name: str = 'id'
_table_name: str
_cols: tuple[str, ...]
@classmethod
def get_one(cls, conn: DbConnection, id_: str) -> Self:
"""Return single entry of id_ from DB."""
- sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
+ sql = SqlText(f'SELECT * FROM {cls._table_name} '
+ f'WHERE {cls.id_name} = ?')
row = conn.exec(sql, (id_,)).fetchone()
if not row:
msg = f'no entry found for ID "{id_}" in table {cls._table_name}'
class VideoFile(DbData):
"""Collects data about downloaded files."""
+ id_name = 'rel_path'
_table_name = 'files'
_cols = ('rel_path', 'yt_id', 'flags', 'last_update')
last_update: DatetimeStr
raise NotFoundException(f'no entry for file to Youtube ID {yt_id}')
return cls._from_table_row(row)
+ @classmethod
+ def get_by_b64(cls, conn: DbConnection, rel_path_b64: B64Str) -> Self:
+ """Retrieve by .rel_path provided as urlsafe_b64 encoding."""
+ return cls.get_one(conn, urlsafe_b64decode(rel_path_b64).decode())
+
+ @property
+ def rel_path_b64(self) -> B64Str:
+ """Return .rel_path as urlsafe_b64 e3ncoding."""
+ return B64Str(urlsafe_b64encode(self.rel_path.encode()).decode())
+
@property
def full_path(self) -> PathStr:
"""Return self.rel_path suffixed under PATH_DOWNLOADS."""
return [f.rel_path for f in self._files if f.missing]
@property
- def ids_to_paths(self) -> DownloadsIndex:
- """Return mapping YoutubeIds:paths of files downloaded to them."""
+ def ids_to_full_paths(self) -> DownloadsIndex:
+ """Return mapping YoutubeIds:full paths of files downloaded to them."""
self._sync_db()
return {f.yt_id: f.full_path for f in self._files}
+ @property
+ def rel_paths_b64_to_rel_paths(self) -> list[tuple[B64Str, PathStr]]:
+ """Return mapping .rel_paths b64-encoded:rel_paths of known files."""
+ self._sync_db()
+ return [(f.rel_path_b64, f.rel_path) for f in self._files]
+
@property
def ids_unfinished(self) -> set[YoutubeId]:
"""Return set of IDs of videos awaiting or currently in download."""
def queue_download(self, video_id: YoutubeId) -> None:
"""Add video_id to download queue *if* not already processed."""
- pre_existing = self.ids_unfinished | set(self._to_download
- + list(self.ids_to_paths))
+ pre_existing = (self.ids_unfinished
+ | set(self._to_download
+ + list(self.ids_to_full_paths)))
if video_id not in pre_existing:
self._to_download += [video_id]
if PAGE_NAMES['playlist'] == page_name:
self._receive_player_command(list(postvars.keys())[0])
elif PAGE_NAMES['file'] == page_name:
- self._receive_video_flag(YoutubeId(toks_url[2]),
+ self._receive_video_flag(B64Str(toks_url[2]),
[FlagName(k) for k in postvars])
elif PAGE_NAMES['yt_queries'] == page_name:
self._receive_yt_query(QueryText(postvars['query'][0]))
self._send_http(headers=[('Location', '/')], code=302)
def _receive_video_flag(self,
- yt_id: YoutubeId,
+ rel_path_b64: B64Str,
flag_names: list[FlagName]
) -> None:
conn = DbConnection()
- file = VideoFile.get_by_yt_id(conn, yt_id)
+ file = VideoFile.get_by_b64(conn, rel_path_b64)
flags = FlagsInt(0)
for flag_name in flag_names:
flags = FlagsInt(file.flags | FILE_FLAGS[flag_name])
conn.commit_close()
file.ensure_absence_if_deleted()
self._send_http(headers=[('Location',
- f'/{PAGE_NAMES["file"]}/{yt_id}')],
+ f'/{PAGE_NAMES["file"]}/{rel_path_b64}')],
code=302)
def _receive_yt_query(self, query_txt: QueryText) -> None:
elif PAGE_NAMES['files'] == page_name:
self._send_files_index()
elif PAGE_NAMES['file'] == page_name:
- self._send_file_data(YoutubeId(toks_url[2]))
+ self._send_file_data(B64Str(toks_url[2]))
elif PAGE_NAMES['yt_result'] == page_name:
self._send_yt_result(YoutubeId(toks_url[2]))
elif PAGE_NAMES['missing'] == page_name:
self._send_http(img, [('Content-type', 'image/jpg')])
def _send_or_download_video(self, video_id: YoutubeId) -> None:
- if video_id in self.server.downloads.ids_to_paths:
- with open(self.server.downloads.ids_to_paths[video_id],
+ if video_id in self.server.downloads.ids_to_full_paths:
+ with open(self.server.downloads.ids_to_full_paths[video_id],
'rb') as video_file:
video = video_file.read()
self._send_http(content=video)
NAME_TEMPLATE_YT_VIDEO,
{'video_data': video_data,
'is_temp': video_id in self.server.downloads.ids_unfinished,
- 'file_path': self.server.downloads.ids_to_paths.get(video_id,
- None),
+ 'file_path': self.server.downloads.ids_to_full_paths.get(
+ video_id, None),
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
- def _send_file_data(self, yt_id: YoutubeId) -> None:
+ def _send_file_data(self, rel_path_b64: B64Str) -> None:
conn = DbConnection()
- file = VideoFile.get_by_yt_id(conn, yt_id)
+ file = VideoFile.get_by_b64(conn, rel_path_b64)
conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_FILE_DATA,
{'file': file, 'flag_names': list(FILE_FLAGS)})
def _send_files_index(self) -> None:
- videos = [(id_, PathStr(basename(path)))
- for id_, path in self.server.downloads.ids_to_paths.items()]
- videos.sort(key=lambda t: t[1])
- self._send_rendered_template(NAME_TEMPLATE_FILES, {'videos': videos})
+ files = self.server.downloads.rel_paths_b64_to_rel_paths
+ files.sort(key=lambda t: t[1])
+ self._send_rendered_template(NAME_TEMPLATE_FILES, {'files': files})
def _send_missing_json(self) -> None:
self._send_http(