From: Christian Heller Date: Sat, 9 Nov 2024 03:07:20 +0000 (+0100) Subject: Improve code style. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/decks/static/%7B%7Btodo.comment%7D%7D?a=commitdiff_plain;h=c0baffbdaf98e83ccb00cf5ce9a91321e117d4c0;p=ytplom Improve code style. --- diff --git a/ytplom.py b/ytplom.py index 370f504..4bb369d 100755 --- a/ytplom.py +++ b/ytplom.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +"""Minimalistic download-focused YouTube interface.""" from os import environ, makedirs, scandir, remove as os_remove from os.path import (isdir, exists as path_exists, join as path_join, splitext, basename) @@ -10,10 +11,9 @@ from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs from urllib.request import urlretrieve from hashlib import md5 - from jinja2 import Template -from yt_dlp import YoutubeDL -import googleapiclient.discovery +from yt_dlp import YoutubeDL # type: ignore +import googleapiclient.discovery # type: ignore API_KEY = environ.get('GOOGLE_API_KEY') @@ -37,10 +37,11 @@ YOUTUBE_URL_PREFIX = 'https://www.youtube.com/watch?v=' QUOTA_COST_YOUTUBE_SEARCH = 100 QUOTA_COST_YOUTUBE_DETAILS = 1 -to_download = [] +to_download: list[str] = [] def ensure_expected_dirs_and_files(): + """Ensure existance of all dirs and files we need for proper operation.""" for dir_name in EXPECTED_DIRS: if not path_exists(dir_name): print(f'creating expected directory: {dir_name}') @@ -60,12 +61,14 @@ def ensure_expected_dirs_and_files(): def clean_unfinished_downloads(): + """Empty temp directory of unfinished downloads.""" for e in [e for e in scandir(PATH_DIR_TEMP) if e.is_file]: print(f'removing unfinished download: {e.path}') os_remove(e.path) def run_server(): + """Run HTTPServer on TaskHandler, handle KeyboardInterrupt as exit.""" server = HTTPServer(('localhost', HTTP_PORT), TaskHandler) print(f'running at port {HTTP_PORT}') try: @@ -77,6 +80,7 @@ def run_server(): def read_quota_log(): + """Return logged quota expenditures of past 24 hours.""" with open(PATH_QUOTA_LOG, 'r', encoding='utf8') as f: log = json_load(f) ret = {} @@ -89,6 +93,7 @@ def read_quota_log(): def update_quota_log(now, cost): + """Update quota log from read_quota_log, add cost to now's row.""" quota_log = read_quota_log() quota_log[now] = quota_log.get(now, 0) + cost with open(PATH_QUOTA_LOG, 'w', encoding='utf8') as f: @@ -96,6 +101,7 @@ def update_quota_log(now, cost): def download_thread(): + """Keep iterating through to_download for IDs, download their videos.""" while True: sleep(0.5) try: @@ -104,7 +110,7 @@ def download_thread(): continue url = f'{YOUTUBE_URL_PREFIX}{video_id}' fmt = 'bestvideo[height<=1080][width<=1920]+bestaudio'\ - '/best[height<=1080][width<=1920]' + '/best[height<=1080][width<=1920]' params = {'paths': {'home': PATH_DIR_DOWNLOADS, 'temp': NAME_DIR_TEMP}, 'format': fmt} with YoutubeDL(params) as ydl: @@ -112,6 +118,7 @@ def download_thread(): class TaskHandler(BaseHTTPRequestHandler): + """Handler for GET and POST requests to our server.""" def _send_http(self, content=None, headers=None, code=200): headers = headers if headers else [] @@ -122,7 +129,8 @@ class TaskHandler(BaseHTTPRequestHandler): if content is not None: self.wfile.write(content) - def do_POST(self): + def do_POST(self): # pylint:disable=invalid-name + """Send requests to YouTube API and cache them.""" length = int(self.headers['content-length']) postvars = parse_qs(self.rfile.read(length).decode()) query = postvars['query'][0] @@ -165,7 +173,8 @@ class TaskHandler(BaseHTTPRequestHandler): json_dump(to_save, f) self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302) - def do_GET(self): + def do_GET(self): # pylint:disable=invalid-name + """Map GET requests to handlers for various paths.""" parsed_url = urlparse(self.path) toks_url = parsed_url.path.split('/') page = toks_url[1] @@ -210,20 +219,20 @@ class TaskHandler(BaseHTTPRequestHandler): query = json_load(f) for result in query['results']: result['available'] = result['id'] in downloaded - date_dur, time_dur_remains = result['duration'].split('T') + date_dur, time_dur = result['duration'].split('T') seconds = 0 - date_dur_remains = date_dur[1:] + date_dur = date_dur[1:] for dur_char, len_seconds in (('Y', 60*60*24*365.25), ('M', 60*60*24*30), ('D', 60*60*24)): - if dur_char in date_dur_remains: - dur_str, date_dur_remains = date_dur_remains.split(dur_char) + if dur_char in date_dur: + dur_str, date_dur = date_dur.split(dur_char) seconds += int(dur_str) * len_seconds for dur_char, len_seconds in (('H', 60*60), ('M', 60), ('S', 1)): - if dur_char in time_dur_remains: - dur_str, time_dur_remains = time_dur_remains.split(dur_char) + if dur_char in time_dur: + dur_str, time_dur = time_dur.split(dur_char) seconds += int(dur_str) * len_seconds seconds_str = str(seconds % 60) minutes_str = str(seconds // 60) @@ -231,7 +240,7 @@ class TaskHandler(BaseHTTPRequestHandler): result['duration'] = ':'.join( [f'0{str_}' if len(str_) == 1 else str_ for str_ in (hours_str, minutes_str, seconds_str)]) - result['definition'] = result['definition'].upper() + result['definition'] = result['definition'].upper() kwargs['query'] = query else: tmpl_name = NAME_TEMPLATE_INDEX