from paramiko import SSHClient # type: ignore
from scp import SCPClient # type: ignore
from ytplom.misc import (
- PATH_DB, PATH_DOWNLOADS, PATH_TEMP,
+ PAGE_NAMES, PATH_DB, PATH_DOWNLOADS, PATH_TEMP,
DatabaseConnection, PathStr, QuotaLog, VideoFile,
YoutubeQuery, YoutubeVideo)
YTPLOM_PORT = environ.get('YTPLOM_PORT')
PATH_DB_REMOTE = PathStr(path_join(PATH_TEMP, 'remote_db.sql'))
-MISSING_API = PathStr('/missing.json')
ATTR_NAME_LAST_UPDATE = 'last_update'
os_remove(PATH_DB_REMOTE)
for host, direction, mover in ((YTPLOM_REMOTE, 'local->remote', scp.put),
('localhost', 'remote->local', scp.get)):
- with urlopen(f'http://{host}:{YTPLOM_PORT}{MISSING_API}') as response:
+ url_missing = f'http://{host}:{YTPLOM_PORT}/{PAGE_NAMES["missing"]}'
+ with urlopen(url_missing) as response:
missing = json_loads(response.read())
for path in (path_join(PATH_DOWNLOADS, path) for path in missing):
print(f'SYNC: sending {direction} file {path}')
FlagsInt = NewType('FlagsInt', int)
AmountDownloads = NewType('AmountDownloads', int)
PlayerUpdateId = NewType('PlayerUpdateId', str)
+PageNames: TypeAlias = dict[str, PathStr]
DownloadsIndex: TypeAlias = dict[YoutubeId, PathStr]
TemplateContext: TypeAlias = dict[
str, None | bool | PlayerUpdateId | Optional[PathStr] | YoutubeId
| QueryText | QuotaCost | list[FlagName] | 'VideoFile' | 'YoutubeVideo'
- | list['YoutubeVideo'] | list['YoutubeQuery']
+ | PageNames | list['YoutubeVideo'] | list['YoutubeQuery']
| list[tuple[YoutubeId, PathStr]] | list[tuple[PathStr, PathStr]]]
# major expected directories
NAME_TEMPLATE_YT_VIDEO = PathStr('yt_video.tmpl')
NAME_TEMPLATE_PLAYLIST = PathStr('playlist.tmpl')
+# page names
+PAGE_NAMES: PageNames = {
+ 'download': PathStr('dll'),
+ 'file': PathStr('fille'),
+ 'files': PathStr('filles'),
+ 'last_update': PathStr('last_playlist_update.json'),
+ 'missing': PathStr('missing.jsoner'),
+ 'playlist': PathStr('playlisttt'),
+ 'thumbnails': PathStr('thumbnailz'),
+ 'yt_result': PathStr('ytt_video'),
+ 'yt_query': PathStr('ytt_query'),
+ 'yt_queries': PathStr('ytt_queries')
+}
+
# yt_dlp config
YT_DOWNLOAD_FORMAT = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
'/best[height<=1080][width<=1920]'
page_name = toks_url[1]
body_length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(body_length).decode())
- if 'playlist' == page_name:
+ if PAGE_NAMES['playlist'] == page_name:
self._post_player_command(list(postvars.keys())[0])
- elif 'video' == page_name:
+ elif PAGE_NAMES['file'] == page_name:
self._post_video_flag(YoutubeId(toks_url[2]),
[FlagName(k) for k in postvars])
- elif 'queries' == page_name:
+ elif PAGE_NAMES['yt_queries'] == page_name:
self._post_query(QueryText(postvars['query'][0]))
def _post_player_command(self, command: str) -> None:
file.save(conn)
conn.commit_close()
file.ensure_absence_if_deleted()
- self._send_http(headers=[('Location', f'/video/{yt_id}')], code=302)
+ self._send_http(headers=[('Location',
+ f'/{PAGE_NAMES["file"]}/{yt_id}')],
+ code=302)
def _post_query(self, query_txt: QueryText) -> None:
conn = DatabaseConnection()
result.save(conn)
result.save_to_query(conn, query_data.id_)
conn.commit_close()
- self._send_http(headers=[('Location', f'/query/{query_data.id_}')],
- code=302)
+ self._send_http(
+ headers=[('Location',
+ f'/{PAGE_NAMES["yt_query"]}/{query_data.id_}')],
+ code=302)
def do_GET(self) -> None: # pylint:disable=invalid-name
"""Map GET requests to handlers for various paths."""
toks_url: list[str] = url.path.split('/')
page_name = toks_url[1]
try:
- if 'thumbnails' == page_name:
+ if PAGE_NAMES['thumbnails'] == page_name:
self._send_thumbnail(PathStr(toks_url[2]))
- elif 'dl' == page_name:
+ elif PAGE_NAMES['download'] == page_name:
self._send_or_download_video(YoutubeId(toks_url[2]))
- elif 'videos' == page_name:
+ elif PAGE_NAMES['files'] == page_name:
self._send_videos_index()
- elif 'video' == page_name:
+ elif PAGE_NAMES['file'] == page_name:
self._send_video_data(YoutubeId(toks_url[2]))
- elif 'yt_video' == page_name:
+ elif PAGE_NAMES['yt_result'] == page_name:
self._send_yt_video_data(YoutubeId(toks_url[2]))
- elif 'missing.json' == page_name:
+ elif PAGE_NAMES['missing'] == page_name:
self._send_missing_json()
- elif 'query' == page_name:
+ elif PAGE_NAMES['yt_query'] == page_name:
self._send_query_page(QueryId(toks_url[2]))
- elif 'queries' == page_name:
+ elif PAGE_NAMES['yt_queries'] == page_name:
self._send_queries_index_and_search()
- elif '_last_playlist_update.json' == page_name:
+ elif PAGE_NAMES['last_update'] == page_name:
self._send_last_playlist_update()
else: # e.g. for /
self._send_playlist()
tmpl_ctx: TemplateContext
) -> None:
tmpl = self.server.jinja.get_template(tmpl_name)
+ tmpl_ctx['page_names'] = PAGE_NAMES
html = tmpl.render(**tmpl_ctx)
self._send_http(bytes(html, 'utf8'))
self._send_http(content=video)
return
self.server.downloads.queue_download(video_id)
- self._send_http(headers=[('Location', f'/yt_video/{video_id}')],
+ self._send_http(headers=[('Location',
+ f'/{PAGE_NAMES["yt_result"]}/{video_id}')],
code=302)
def _send_query_page(self, query_id: QueryId) -> None: