def _receive_files_command(self, command: str) -> None:
if command.startswith('play_'):
- conn = DbConnection()
- file = VideoFile.get_by_b64(conn, B64Str(command.split('_', 1)[1]))
- conn.commit_close()
+ with DbConnection() as conn:
+ file = VideoFile.get_by_b64(conn,
+ B64Str(command.split('_', 1)[1]))
self.server.player.inject_and_play(file)
self._redirect(Path('/'))
rel_path_b64: B64Str,
flag_names: list[FlagName]
) -> None:
- conn = DbConnection()
- file = VideoFile.get_by_b64(conn, rel_path_b64)
- file.set_flags([FILE_FLAGS[name] for name in flag_names])
- file.save(conn)
- conn.commit_close()
+ with DbConnection() as conn:
+ file = VideoFile.get_by_b64(conn, rel_path_b64)
+ file.set_flags([FILE_FLAGS[name] for name in flag_names])
+ file.save(conn)
+ conn.commit()
file.ensure_absence_if_deleted()
self._redirect(Path('/')
.joinpath(PAGE_NAMES['file'])
.joinpath(rel_path_b64))
def _receive_yt_query(self, query_txt: QueryText) -> None:
- conn = DbConnection()
- query_data = YoutubeQuery.new_by_request_saved(
- conn, self.server.config, query_txt)
- conn.commit_close()
+ with DbConnection() as conn:
+ query_data = YoutubeQuery.new_by_request_saved(
+ conn, self.server.config, query_txt)
+ conn.commit()
self._redirect(Path('/')
.joinpath(PAGE_NAMES['yt_query'])
.joinpath(query_data.id_))
self._send_http(img, [('Content-type', 'image/jpg')])
def _send_or_download_video(self, video_id: YoutubeId) -> None:
- conn = DbConnection()
try:
- file_data = VideoFile.get_by_yt_id(conn, video_id)
+ with DbConnection() as conn:
+ file_data = VideoFile.get_by_yt_id(conn, video_id)
except NotFoundException:
- conn.commit_close()
self.server.downloads.queue_download(video_id)
self._redirect(Path('/')
.joinpath(PAGE_NAMES['yt_result'])
.joinpath(video_id))
return
- conn.commit_close()
with file_data.full_path.open('rb') as video_file:
video = video_file.read()
self._send_http(content=video)
def _send_yt_query_page(self, query_id: QueryId) -> None:
- conn = DbConnection()
- query = YoutubeQuery.get_one(conn, str(query_id))
- results = YoutubeVideo.get_all_for_query(conn, query_id)
- conn.commit_close()
+ with DbConnection() as conn:
+ query = YoutubeQuery.get_one(conn, str(query_id))
+ results = YoutubeVideo.get_all_for_query(conn, query_id)
self._send_rendered_template(
_NAME_TEMPLATE_RESULTS,
{'query': query.text, 'videos': results})
def _send_yt_queries_index_and_search(self) -> None:
- conn = DbConnection()
- quota_count = QuotaLog.current(conn)
- queries_data = YoutubeQuery.get_all(conn)
- conn.commit_close()
+ with DbConnection() as conn:
+ quota_count = QuotaLog.current(conn)
+ queries_data = YoutubeQuery.get_all(conn)
queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)
self._send_rendered_template(
_NAME_TEMPLATE_QUERIES,
def _send_yt_result(self, video_id: YoutubeId) -> None:
conn = DbConnection()
- linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
- try:
- video_data = YoutubeVideo.get_one(conn, video_id)
- except NotFoundException:
- video_data = YoutubeVideo(video_id)
- try:
- file = VideoFile.get_by_yt_id(conn, video_id)
- file_path = file.full_path if file.present else None
- except NotFoundException:
- file_path = None
- conn.commit_close()
+ with DbConnection() as conn:
+ linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
+ try:
+ video_data = YoutubeVideo.get_one(conn, video_id)
+ except NotFoundException:
+ video_data = YoutubeVideo(video_id)
+ try:
+ file = VideoFile.get_by_yt_id(conn, video_id)
+ file_path = file.full_path if file.present else None
+ except NotFoundException:
+ file_path = None
self._send_rendered_template(
_NAME_TEMPLATE_YT_VIDEO,
{'video_data': video_data,
'queries': linked_queries})
def _send_file_data(self, rel_path_b64: B64Str) -> None:
- conn = DbConnection()
- file = VideoFile.get_by_b64(conn, rel_path_b64)
- conn.commit_close()
+ with DbConnection() as conn:
+ file = VideoFile.get_by_b64(conn, rel_path_b64)
self._send_rendered_template(
_NAME_TEMPLATE_FILE_DATA,
{'file': file, 'flag_names': list(FILE_FLAGS)})
filter_: _ParamsStr,
show_absent: bool
) -> None:
- conn = DbConnection()
- files = [f for f in VideoFile.get_all(conn)
- if filter_.lower() in str(f.rel_path).lower()
- and (show_absent or f.present)]
- conn.commit_close()
+ with DbConnection() as conn:
+ files = [f for f in VideoFile.get_all(conn)
+ if filter_.lower() in str(f.rel_path).lower()
+ and (show_absent or f.present)]
files.sort(key=lambda t: t.rel_path)
self._send_rendered_template(
_NAME_TEMPLATE_FILES,
'show_absent': show_absent})
def _send_missing_json(self) -> None:
- conn = DbConnection()
- missing = [f.rel_path for f in VideoFile.get_all(conn) if f.missing]
- conn.commit_close()
+ with DbConnection() as conn:
+ missing = [f.rel_path for f in VideoFile.get_all(conn)
+ if f.missing]
self._send_http(bytes(json_dumps(missing), 'utf8'),
headers=[('Content-type', 'application/json')])
"""Main ytplom lib."""
# included libs
-from typing import Any, NewType, Optional, Self, TypeAlias
+from typing import Any, Literal, NewType, Optional, Self, TypeAlias
from os import chdir, environ
from base64 import urlsafe_b64encode, urlsafe_b64decode
from random import shuffle
f'{EXPECTED_DB_VERSION} – run "migrate"?')
self._conn = sql_connect(self._path)
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback) -> Literal[False]:
+ self._conn.close()
+ return False
+
def exec(self, sql: SqlText, inputs: tuple[Any, ...] = tuple()) -> Cursor:
"""Wrapper around sqlite3.Connection.execute."""
return self._conn.execute(sql, inputs)
- def commit_close(self) -> None:
- """Run sqlite3.Connection.commit and .close."""
+ def commit(self) -> None:
+ """Commit changes (i.e. DbData.save() calls) to database."""
self._conn.commit()
- self._conn.close()
class DbData:
def load_files(self) -> None:
"""Collect files in PATH_DOWNLOADS DB-known and of legal extension."""
- conn = DbConnection()
- known_files = {f.full_path: f for f in VideoFile.get_all(conn)}
- conn.commit_close()
+ with DbConnection() as conn:
+ known_files = {f.full_path: f for f in VideoFile.get_all(conn)}
self._files = [known_files[p] for p in PATH_DOWNLOADS.iterdir()
if p in known_files
and p.is_file()
self._sync_db()
def _sync_db(self):
- conn = DbConnection()
- known_paths = [file.rel_path for file in VideoFile.get_all(conn)]
- old_cwd = Path.cwd()
- chdir(PATH_DOWNLOADS)
- for path in [p for p in Path('.').iterdir()
- if p.is_file() and p not in known_paths]:
- yt_id = self._id_from_filename(path)
- file = VideoFile(path, yt_id)
- print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
- file.save(conn)
- self._files = VideoFile.get_all(conn)
- for file in self._files:
- file.ensure_absence_if_deleted()
- chdir(old_cwd)
- conn.commit_close()
+ with DbConnection as conn:
+ known_paths = [file.rel_path for file in VideoFile.get_all(conn)]
+ old_cwd = Path.cwd()
+ chdir(PATH_DOWNLOADS)
+ for path in [p for p in Path('.').iterdir()
+ if p.is_file() and p not in known_paths]:
+ yt_id = self._id_from_filename(path)
+ file = VideoFile(path, yt_id)
+ print(f'SYNC: new file {path}, saving with YT ID "{yt_id}".')
+ file.save(conn)
+ self._files = VideoFile.get_all(conn)
+ for file in self._files:
+ file.ensure_absence_if_deleted()
+ chdir(old_cwd)
+ conn.commit()
@staticmethod
def _id_from_filename(path: Path) -> YoutubeId: