before_ext = splitext(before_ext)[0]
return YoutubeId(before_ext.split('[')[-1].split(']')[0])
- @property
- def missing(self) -> list[PathStr]:
- """Return relative paths of files known but not in PATH_DOWNLOADS."""
- self._sync_db()
- return [f.rel_path for f in self._files if f.missing]
-
- @property
- def ids_to_full_paths(self) -> DownloadsIndex:
- """Return mapping YoutubeIds:full paths of files downloaded to them."""
- self._sync_db()
- return {f.yt_id: f.full_path for f in self._files}
-
- @property
- def rel_paths_b64_to_rel_paths(self) -> list[tuple[B64Str, PathStr]]:
- """Return mapping .rel_paths b64-encoded:rel_paths of known files."""
- self._sync_db()
- return [(f.rel_path_b64, f.rel_path) for f in self._files]
-
@property
def ids_unfinished(self) -> set[YoutubeId]:
"""Return set of IDs of videos awaiting or currently in download."""
def queue_download(self, video_id: YoutubeId) -> None:
"""Add video_id to download queue *if* not already processed."""
+ self._sync_db()
pre_existing = (self.ids_unfinished
| set(self._to_download
- + list(self.ids_to_full_paths)))
+ + [f.full_path for f in self._files]))
if video_id not in pre_existing:
self._to_download += [video_id]
self._send_http(img, [('Content-type', 'image/jpg')])
def _send_or_download_video(self, video_id: YoutubeId) -> None:
- if video_id in self.server.downloads.ids_to_full_paths:
- with open(self.server.downloads.ids_to_full_paths[video_id],
- 'rb') as video_file:
- video = video_file.read()
- self._send_http(content=video)
+ conn = DbConnection()
+ try:
+ file_data = VideoFile.get_by_yt_id(conn, video_id)
+ except NotFoundException:
+ conn.commit_close()
+ self.server.downloads.queue_download(video_id)
+ self._send_http(headers=[('Location',
+ f'/{PAGE_NAMES["yt_result"]}/{video_id}')],
+ code=302)
return
- self.server.downloads.queue_download(video_id)
- self._send_http(headers=[('Location',
- f'/{PAGE_NAMES["yt_result"]}/{video_id}')],
- code=302)
+ conn.commit_close()
+ with open(file_data.full_path, 'rb') as video_file:
+ video = video_file.read()
+ self._send_http(content=video)
def _send_yt_query_page(self, query_id: QueryId) -> None:
conn = DbConnection()
video_data = YoutubeVideo.get_one(conn, video_id)
except NotFoundException:
video_data = YoutubeVideo(video_id)
+ try:
+ file_path = VideoFile.get_by_yt_id(conn, video_id).full_path
+ except NotFoundException:
+ file_path = None
conn.commit_close()
self._send_rendered_template(
NAME_TEMPLATE_YT_VIDEO,
{'video_data': video_data,
'is_temp': video_id in self.server.downloads.ids_unfinished,
- 'file_path': self.server.downloads.ids_to_full_paths.get(
- video_id, None),
+ 'file_path': file_path,
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
{'file': file, 'flag_names': list(FILE_FLAGS)})
def _send_files_index(self) -> None:
- files = self.server.downloads.rel_paths_b64_to_rel_paths
+ conn = DbConnection()
+ files = [(f.rel_path_b64, f.rel_path) for f in VideoFile.get_all(conn)]
+ conn.commit_close()
files.sort(key=lambda t: t[1])
self._send_rendered_template(NAME_TEMPLATE_FILES, {'files': files})
def _send_missing_json(self) -> None:
- self._send_http(
- bytes(json_dumps(self.server.downloads.missing), 'utf8'),
- headers=[('Content-type', 'application/json')])
+ conn = DbConnection()
+ missing = [f.rel_path for f in VideoFile.get_all(conn) if f.missing]
+ conn.commit_close()
+ self._send_http(bytes(json_dumps(missing), 'utf8'),
+ headers=[('Content-type', 'application/json')])
def _send_last_playlist_update(self) -> None:
payload: dict[str, PlayerUpdateId] = {