from scp import SCPClient # type: ignore
from ytplom.misc import (
PAGE_NAMES, PATH_DB, PATH_DOWNLOADS, PATH_TEMP,
- Config, DatabaseConnection, PathStr, QuotaLog, VideoFile,
+ Config, DbConnection, PathStr, QuotaLog, VideoFile,
YoutubeQuery, YoutubeVideo)
def back_and_forth(sync_func: Callable,
- dbs: tuple[DatabaseConnection, DatabaseConnection],
+ dbs: tuple[DbConnection, DbConnection],
shared: YoutubeVideo | tuple[Any, str]
) -> None:
"""Run sync_func on arg pairs + shared, and again with pairs switched."""
def sync_objects(host_names: tuple[str, str],
- dbs: tuple[DatabaseConnection, DatabaseConnection],
+ dbs: tuple[DbConnection, DbConnection],
shared: tuple[Any, str]
) -> None:
"""Equalize both DB's object collections; prefer newer states to older."""
def sync_relations(host_names: tuple[str, str],
- dbs: tuple[DatabaseConnection, DatabaseConnection],
+ dbs: tuple[DbConnection, DbConnection],
yt_result: YoutubeVideo
) -> None:
"""To dbs[1] add YT yt_video->yt_q_colls[0] mapping not in yt_q_colls[1]"""
ssh.connect(config.remote)
scp = SCPClient(ssh.get_transport())
scp.get(PATH_DB, PATH_DB_REMOTE)
- local_db = DatabaseConnection(PATH_DB)
- remote_db = DatabaseConnection(PATH_DB_REMOTE)
+ local_db, remote_db = DbConnection(PATH_DB), DbConnection(PATH_DB_REMOTE)
for cls in (QuotaLog, YoutubeQuery, YoutubeVideo, VideoFile):
back_and_forth(sync_objects, (local_db, remote_db),
(cls, 'rel_path' if cls is VideoFile else 'id_'))
# template paths
PATH_TEMPLATES = PathStr(path_join(PATH_APP_DATA, 'templates'))
-NAME_TEMPLATE_QUERIES = PathStr('queries.tmpl')
-NAME_TEMPLATE_RESULTS = PathStr('results.tmpl')
-NAME_TEMPLATE_VIDEOS = PathStr('videos.tmpl')
-NAME_TEMPLATE_VIDEO = PathStr('video.tmpl')
-NAME_TEMPLATE_YT_VIDEO = PathStr('yt_video.tmpl')
+NAME_TEMPLATE_QUERIES = PathStr('yt_queries.tmpl')
+NAME_TEMPLATE_RESULTS = PathStr('yt_results.tmpl')
+NAME_TEMPLATE_FILES = PathStr('files.tmpl')
+NAME_TEMPLATE_FILE_DATA = PathStr('file_data.tmpl')
+NAME_TEMPLATE_YT_VIDEO = PathStr('yt_result.tmpl')
NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
# page names
'missing': PathStr('missing.json'),
'playlist': PathStr('playlist'),
'thumbnails': PathStr('thumbnails'),
- 'yt_result': PathStr('yt_video'),
+ 'yt_result': PathStr('yt_result'),
'yt_query': PathStr('yt_query'),
'yt_queries': PathStr('yt_queries')
}
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
LEGAL_EXTENSIONS = {'webm', 'mp4', 'mkv'}
NAME_INSTALLER = PathStr('install.sh')
-VIDEO_FLAGS: dict[FlagName, FlagsInt] = {
+FILE_FLAGS: dict[FlagName, FlagsInt] = {
FlagName('delete'): FlagsInt(1 << 62)
}
if k.isupper() and k.startswith(ENVIRON_PREFIX)})
-class DatabaseConnection:
+class DbConnection:
"""Wrapped sqlite3.Connection."""
def __init__(self, path: PathStr = PATH_DB) -> None:
return cls(**kwargs)
@classmethod
- def get_one(cls, conn: DatabaseConnection, id_: str) -> Self:
+ def get_one(cls, conn: DbConnection, id_: str) -> Self:
"""Return single entry of id_ from DB."""
sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE id = ?')
row = conn.exec(sql, (id_,)).fetchone()
return cls._from_table_row(row)
@classmethod
- def get_all(cls, conn: DatabaseConnection) -> list[Self]:
+ def get_all(cls, conn: DbConnection) -> list[Self]:
"""Return all entries from DB."""
sql = SqlText(f'SELECT * FROM {cls._table_name}')
rows = conn.exec(sql).fetchall()
return [cls._from_table_row(row) for row in rows]
- def save(self, conn: DatabaseConnection) -> Cursor:
+ def save(self, conn: DbConnection) -> Cursor:
"""Save entry to DB."""
vals = [getattr(self, col_name) for col_name in self._cols]
q_marks = '(' + ','.join(['?'] * len(vals)) + ')'
@classmethod
def get_all_for_video(cls,
- conn: DatabaseConnection,
+ conn: DbConnection,
video_id: YoutubeId
) -> list[Self]:
"""Return YoutubeQueries containing YoutubeVideo's ID in results."""
@classmethod
def get_all_for_query(cls,
- conn: DatabaseConnection,
+ conn: DbConnection,
query_id: QueryId
) -> list[Self]:
"""Return all videos for query of query_id."""
return [cls.get_one(conn, video_id_tup[0])
for video_id_tup in video_ids]
- def save_to_query(self,
- conn: DatabaseConnection,
- query_id: QueryId
- ) -> None:
+ def save_to_query(self, conn: DbConnection, query_id: QueryId) -> None:
"""Save inclusion of self in results to query of query_id."""
conn.exec(SqlText('REPLACE INTO yt_query_results VALUES (?, ?)'),
(query_id, self.id_))
self.last_update = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
@classmethod
- def get_by_yt_id(cls, conn: DatabaseConnection, yt_id: YoutubeId) -> Self:
+ def get_by_yt_id(cls, conn: DbConnection, yt_id: YoutubeId) -> Self:
"""Return VideoFile of .yt_id."""
sql = SqlText(f'SELECT * FROM {cls._table_name} WHERE yt_id = ?')
row = conn.exec(sql, (yt_id,)).fetchone()
def is_flag_set(self, flag_name: FlagName) -> bool:
"""Return if flag of flag_name is set in flags field."""
- return bool(self._flags & VIDEO_FLAGS[flag_name])
+ return bool(self._flags & FILE_FLAGS[flag_name])
def ensure_absence_if_deleted(self) -> None:
"""If 'delete' flag set, ensure no actual file in filesystem."""
self.cost = cost
@classmethod
- def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
+ def update(cls, conn: DbConnection, cost: QuotaCost) -> None:
"""Adds cost mapped to current datetime."""
cls._remove_old(conn)
new = cls(None,
new.save(conn)
@classmethod
- def current(cls, conn: DatabaseConnection) -> QuotaCost:
+ def current(cls, conn: DbConnection) -> QuotaCost:
"""Returns quota cost total for last 24 hours, purges old data."""
cls._remove_old(conn)
quota_costs = cls.get_all(conn)
return QuotaCost(sum(c.cost for c in quota_costs))
@classmethod
- def _remove_old(cls, conn: DatabaseConnection) -> None:
+ def _remove_old(cls, conn: DbConnection) -> None:
cutoff = datetime.now() - timedelta(days=1)
sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
self._sync_db()
def _sync_db(self):
- conn = DatabaseConnection()
+ conn = DbConnection()
files_via_db = VideoFile.get_all(conn)
old_cwd = getcwd()
chdir(PATH_DOWNLOADS)
body_length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(body_length).decode())
if PAGE_NAMES['playlist'] == page_name:
- self._post_player_command(list(postvars.keys())[0])
+ self._receive_player_command(list(postvars.keys())[0])
elif PAGE_NAMES['file'] == page_name:
- self._post_video_flag(YoutubeId(toks_url[2]),
- [FlagName(k) for k in postvars])
+ self._receive_video_flag(YoutubeId(toks_url[2]),
+ [FlagName(k) for k in postvars])
elif PAGE_NAMES['yt_queries'] == page_name:
- self._post_query(QueryText(postvars['query'][0]))
+ self._receive_yt_query(QueryText(postvars['query'][0]))
- def _post_player_command(self, command: str) -> None:
+ def _receive_player_command(self, command: str) -> None:
if 'pause' == command:
self.server.player.toggle_pause()
elif 'prev' == command:
sleep(0.5) # avoid redir happening before current_file update
self._send_http(headers=[('Location', '/')], code=302)
- def _post_video_flag(self,
- yt_id: YoutubeId,
- flag_names: list[FlagName]
- ) -> None:
- conn = DatabaseConnection()
+ def _receive_video_flag(self,
+ yt_id: YoutubeId,
+ flag_names: list[FlagName]
+ ) -> None:
+ conn = DbConnection()
file = VideoFile.get_by_yt_id(conn, yt_id)
flags = FlagsInt(0)
for flag_name in flag_names:
- flags = FlagsInt(file.flags | VIDEO_FLAGS[flag_name])
+ flags = FlagsInt(file.flags | FILE_FLAGS[flag_name])
file.flags = flags
file.save(conn)
conn.commit_close()
f'/{PAGE_NAMES["file"]}/{yt_id}')],
code=302)
- def _post_query(self, query_txt: QueryText) -> None:
- conn = DatabaseConnection()
+ def _receive_yt_query(self, query_txt: QueryText) -> None:
+ conn = DbConnection()
def collect_results(query_txt: QueryText) -> list[YoutubeVideo]:
_ensure_expected_dirs([PATH_THUMBNAILS])
elif PAGE_NAMES['download'] == page_name:
self._send_or_download_video(YoutubeId(toks_url[2]))
elif PAGE_NAMES['files'] == page_name:
- self._send_videos_index()
+ self._send_files_index()
elif PAGE_NAMES['file'] == page_name:
- self._send_video_data(YoutubeId(toks_url[2]))
+ self._send_file_data(YoutubeId(toks_url[2]))
elif PAGE_NAMES['yt_result'] == page_name:
- self._send_yt_video_data(YoutubeId(toks_url[2]))
+ self._send_yt_result(YoutubeId(toks_url[2]))
elif PAGE_NAMES['missing'] == page_name:
self._send_missing_json()
elif PAGE_NAMES['yt_query'] == page_name:
- self._send_query_page(QueryId(toks_url[2]))
+ self._send_yt_query_page(QueryId(toks_url[2]))
elif PAGE_NAMES['yt_queries'] == page_name:
- self._send_queries_index_and_search()
+ self._send_yt_queries_index_and_search()
elif PAGE_NAMES['last_update'] == page_name:
self._send_last_playlist_update()
else: # e.g. for /
f'/{PAGE_NAMES["yt_result"]}/{video_id}')],
code=302)
- def _send_query_page(self, query_id: QueryId) -> None:
- conn = DatabaseConnection()
+ def _send_yt_query_page(self, query_id: QueryId) -> None:
+ conn = DbConnection()
query = YoutubeQuery.get_one(conn, str(query_id))
results = YoutubeVideo.get_all_for_query(conn, query_id)
conn.commit_close()
NAME_TEMPLATE_RESULTS,
{'query': query.text, 'videos': results})
- def _send_queries_index_and_search(self) -> None:
- conn = DatabaseConnection()
+ def _send_yt_queries_index_and_search(self) -> None:
+ conn = DbConnection()
quota_count = QuotaLog.current(conn)
queries_data = YoutubeQuery.get_all(conn)
conn.commit_close()
NAME_TEMPLATE_QUERIES, {'queries': queries_data,
'quota_count': quota_count})
- def _send_yt_video_data(self, video_id: YoutubeId) -> None:
- conn = DatabaseConnection()
+ def _send_yt_result(self, video_id: YoutubeId) -> None:
+ conn = DbConnection()
linked_queries = YoutubeQuery.get_all_for_video(conn, video_id)
try:
video_data = YoutubeVideo.get_one(conn, video_id)
'youtube_prefix': YOUTUBE_URL_PREFIX,
'queries': linked_queries})
- def _send_video_data(self, yt_id: YoutubeId) -> None:
- conn = DatabaseConnection()
+ def _send_file_data(self, yt_id: YoutubeId) -> None:
+ conn = DbConnection()
file = VideoFile.get_by_yt_id(conn, yt_id)
conn.commit_close()
self._send_rendered_template(
- NAME_TEMPLATE_VIDEO,
- {'file': file, 'flag_names': list(VIDEO_FLAGS)})
+ NAME_TEMPLATE_FILE_DATA,
+ {'file': file, 'flag_names': list(FILE_FLAGS)})
- def _send_videos_index(self) -> None:
+ def _send_files_index(self) -> None:
videos = [(id_, PathStr(basename(path)))
for id_, path in self.server.downloads.ids_to_paths.items()]
videos.sort(key=lambda t: t[1])
- self._send_rendered_template(NAME_TEMPLATE_VIDEOS, {'videos': videos})
+ self._send_rendered_template(NAME_TEMPLATE_FILES, {'videos': videos})
def _send_missing_json(self) -> None:
self._send_http(