#!/usr/bin/env python3
+"""Minimalistic download-focused YouTube interface."""
from os import environ, makedirs, scandir, remove as os_remove
from os.path import (isdir, exists as path_exists, join as path_join, splitext,
from urllib.parse import urlparse, parse_qs
from urllib.request import urlretrieve
from hashlib import md5
from jinja2 import Template
-from yt_dlp import YoutubeDL
-import googleapiclient.discovery
+from yt_dlp import YoutubeDL # type: ignore
+import googleapiclient.discovery # type: ignore
API_KEY = environ.get('GOOGLE_API_KEY')
-to_download = []
+to_download: list[str] = []
def ensure_expected_dirs_and_files():
+ """Ensure existence of all dirs and files we need for proper operation."""
for dir_name in EXPECTED_DIRS:
if not path_exists(dir_name):
print(f'creating expected directory: {dir_name}')
def clean_unfinished_downloads():
    """Empty temp directory of unfinished downloads.

    Walks the entries of PATH_DIR_TEMP and reports every regular file
    found there as an unfinished download.
    """
    # is_file must be *called*: the bare bound method is always truthy, so
    # without the parentheses the filter would let directories through too.
    for e in [e for e in scandir(PATH_DIR_TEMP) if e.is_file()]:
        print(f'removing unfinished download: {e.path}')
        # NOTE(review): the actual removal call is not visible in this
        # excerpt (context elided) — confirm it still follows the print.
def run_server():
+ """Run HTTPServer on TaskHandler, handle KeyboardInterrupt as exit."""
server = HTTPServer(('localhost', HTTP_PORT), TaskHandler)
print(f'running at port {HTTP_PORT}')
def read_quota_log():
+ """Return logged quota expenditures of past 24 hours."""
with open(PATH_QUOTA_LOG, 'r', encoding='utf8') as f:
log = json_load(f)
ret = {}
def update_quota_log(now, cost):
+ """Update quota log from read_quota_log, add cost to now's row."""
quota_log = read_quota_log()
quota_log[now] = quota_log.get(now, 0) + cost
with open(PATH_QUOTA_LOG, 'w', encoding='utf8') as f:
def download_thread():
+ """Keep iterating through to_download for IDs, download their videos."""
while True:
url = f'{YOUTUBE_URL_PREFIX}{video_id}'
fmt = 'bestvideo[height<=1080][width<=1920]+bestaudio'\
- '/best[height<=1080][width<=1920]'
+ '/best[height<=1080][width<=1920]'
params = {'paths': {'home': PATH_DIR_DOWNLOADS, 'temp': NAME_DIR_TEMP},
'format': fmt}
with YoutubeDL(params) as ydl:
class TaskHandler(BaseHTTPRequestHandler):
+ """Handler for GET and POST requests to our server."""
def _send_http(self, content=None, headers=None, code=200):
headers = headers if headers else []
if content is not None:
- def do_POST(self):
+ def do_POST(self): # pylint:disable=invalid-name
+ """Send requests to YouTube API and cache them."""
length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(length).decode())
query = postvars['query'][0]
json_dump(to_save, f)
self._send_http(headers=[('Location', f'/query/{md5sum}')], code=302)
- def do_GET(self):
+ def do_GET(self): # pylint:disable=invalid-name
+ """Map GET requests to handlers for various paths."""
parsed_url = urlparse(self.path)
toks_url = parsed_url.path.split('/')
page = toks_url[1]
query = json_load(f)
for result in query['results']:
result['available'] = result['id'] in downloaded
- date_dur, time_dur_remains = result['duration'].split('T')
+ date_dur, time_dur = result['duration'].split('T')
seconds = 0
- date_dur_remains = date_dur[1:]
+ date_dur = date_dur[1:]
for dur_char, len_seconds in (('Y', 60*60*24*365.25),
('M', 60*60*24*30),
('D', 60*60*24)):
- if dur_char in date_dur_remains:
- dur_str, date_dur_remains = date_dur_remains.split(dur_char)
+ if dur_char in date_dur:
+ dur_str, date_dur = date_dur.split(dur_char)
seconds += int(dur_str) * len_seconds
for dur_char, len_seconds in (('H', 60*60),
('M', 60),
('S', 1)):
- if dur_char in time_dur_remains:
- dur_str, time_dur_remains = time_dur_remains.split(dur_char)
+ if dur_char in time_dur:
+ dur_str, time_dur = time_dur.split(dur_char)
seconds += int(dur_str) * len_seconds
seconds_str = str(seconds % 60)
minutes_str = str(seconds // 60)
result['duration'] = ':'.join(
[f'0{str_}' if len(str_) == 1 else str_
for str_ in (hours_str, minutes_str, seconds_str)])
- result['definition'] = result['definition'].upper()
+ result['definition'] = result['definition'].upper()
kwargs['query'] = query