"""Main ytplom lib."""
# included libs
-from typing import Generator, NewType, Optional, Self
+from typing import Callable, Generator, NewType, Optional, Self
from os import chdir, environ
from random import shuffle
-from time import sleep
from datetime import datetime, timedelta
from decimal import Decimal
from json import dumps as json_dumps, loads as json_loads
from uuid import uuid4
from pathlib import Path
from threading import Thread
-from queue import Queue
+from queue import SimpleQueue
# non-included libs
from ffmpeg import probe as ffprobe # type: ignore
import googleapiclient.discovery # type: ignore
rel_path: Path
digest: Hash
tags: TagSet
+ # class methods
+ call_forget: Callable
# class attributes
last_update_cutoff: DatetimeStr
tags_prefilter_needed: TagSet
"""Remove actual file from local filesystem."""
print(f'SYNC: removing from filesystem: {self.rel_path}')
self.full_path.unlink()
+ self.__class__.call_forget(self.yt_id)
def purge(self, conn) -> None:
"""Remove self from database, and in filesystem if no other owners."""
def __init__(self) -> None:
self._mpv: Optional[MPV] = None
self._monitoring_kill: bool = False
- self._kill_queue: Queue = Queue()
+ self._kill_queue: SimpleQueue = SimpleQueue()
self.playlist: list[VideoFile] = []
self.speed = -1.0
self.duration = -1
class DownloadsManager:
"""Manages downloading and downloads access."""
- _last_updates: dict[YoutubeId, dict[str, str]]
def __init__(self) -> None:
+ self._downloaded: list[YoutubeId] = []
+ self._downloading: Optional[YoutubeId] = None
self._to_download: list[YoutubeId] = []
+ self._status: str = ''
ensure_expected_dirs([PATH_DOWNLOADS, PATH_TEMP])
- self._last_updates: dict[YoutubeId, dict[str, str]] = {}
- self._sync_db()
-
- def _sync_db(self):
+ self._timestamps: dict[YoutubeId, DatetimeStr] = {}
with DbConn() as conn:
VideoFile.purge_deleteds(conn)
conn.commit()
- known_paths = [file.rel_path for file in VideoFile.get_all(conn)]
+ known_files = VideoFile.get_all(conn)
old_cwd = Path.cwd()
chdir(PATH_DOWNLOADS)
+ self.q: SimpleQueue = SimpleQueue()
for path in [p for p in Path('.').iterdir() if p.is_file()]:
yt_id = self._id_from_filename(path)
- if path not in known_paths:
+ if yt_id not in [f.yt_id for f in known_files]:
print(f'SYNC: new file {path}, saving to YT ID "{yt_id}".')
file = VideoFile(digest=None,
rel_path=path,
with DbConn() as conn:
file.save(conn)
conn.commit()
- with DbConn() as conn:
- for path in [p for p in Path('.').iterdir() if p.is_file()]:
- yt_id = self._id_from_filename(path)
- if (yt_id not in self._last_updates
- or 'present' != self._last_updates[yt_id]['status']):
- self._update_status(yt_id, 'present')
- self._files = VideoFile.get_all(conn)
+ self._downloaded += [yt_id]
chdir(old_cwd)
- def last_update_for(self,
- conn: DbConn,
- yt_id: YoutubeId
+ def last_update_for(self, conn: DbConn, yt_id: YoutubeId
) -> dict[str, str]:
- """Retrieve ._last_updates[yt_id] but reset to 'absent' if needed."""
- if yt_id in self._last_updates:
- if self._last_updates[yt_id]['status'] != 'present':
- return self._last_updates[yt_id]
- try:
- file = VideoFile.get_by_yt_id(conn, yt_id)
- if not file.present:
- self._update_status(yt_id, 'absent')
- except NotFoundException:
- self._update_status(yt_id, 'absent')
- else:
- self._update_status(yt_id, 'absent')
- return self._last_updates[yt_id]
-
- def _update_status(self, yt_id: YoutubeId, status: str) -> None:
- self._last_updates[yt_id] = {'status': status, 'time': _now_string()}
+ 'For yt_id construct update with timestamp, status, optional fields.'
+ update = {
+ 'time': self._timestamps.get(yt_id, '2000-01-01'),
+ ## 'status': ('present' if yt_id in self._downloaded + self._inherited
+ 'status': ('present' if yt_id in self._downloaded
+ else ('queued' if yt_id in self._to_download
+ else (self._status if yt_id == self._downloading
+ else 'absent')))}
+ if update['status'] == 'present':
+ file = VideoFile.get_by_yt_id(conn, yt_id)
+ update['path'] = str(file.rel_path)
+ update['digest'] = file.digest.b64
+ return update
+
+ def _update_timestamp(self, yt_id: YoutubeId) -> None:
+ self._timestamps[yt_id] = _now_string()
@staticmethod
def _id_from_filename(path: Path) -> YoutubeId:
print(f'removing unfinished download: {path}')
path.unlink()
- def queue_download(self, video_id: YoutubeId) -> None:
- """Add video_id to download queue *if* not already processed."""
- self._sync_db()
- if video_id in self._to_download:
+ def _queue_download(self, yt_id: YoutubeId) -> None:
+ if yt_id == self._downloading or yt_id in self._downloaded:
return
- for path in ([p for p in PATH_TEMP.iterdir() if p.is_file()]
- + [f.full_path for f in self._files]):
- if self._id_from_filename(path) == video_id:
- return
- self._to_download += [video_id]
- self._update_status(video_id, 'queued')
+ self._to_download += [yt_id]
+ if not self._downloading:
+ self.q.put('download_next')
+
+ def _forget_file(self, yt_id: YoutubeId) -> None:
+ if yt_id in self._downloaded:
+ self._downloaded.remove(yt_id)
+
+ def _savefile(self, arg: str) -> YoutubeId:
+ filename = Path(arg)
+ yt_id = self._id_from_filename(filename)
+ file = VideoFile(digest=None,
+ rel_path=filename,
+ yt_id=yt_id,
+ tags_str=VideoFile.tags_default.joined)
+ with DbConn() as conn:
+ file.save(conn)
+ conn.commit()
+ self._downloaded += [yt_id]
+ self._downloading = None
+ return yt_id
def _download_next(self) -> None:
- if self._to_download:
- downloaded_before: int = 0
- sizes: dict[str, list[int]] = {}
-
- def hook(d) -> None:
- nonlocal downloaded_before
- if d['status'] in {'downloading', 'finished'}:
- downloaded_i = d[TOK_LOADED]
- downloaded_mb = (downloaded_i + downloaded_before) / MEGA
- size = sizes[d['info_dict'][TOK_FO_ID]]
- if (not size[0]) and TOK_FRAG_C in d and TOK_FRAG_I in d:
- progress = min(d[TOK_FRAG_I] + 1, d[TOK_FRAG_C]
- ) / d[TOK_FRAG_C]
- size[1] = d[TOK_LOADED] / progress
- guess_total_mb = sum(t[1] for t in sizes.values()) / MEGA
- msg = f'{int(100 * downloaded_mb/guess_total_mb)}%: '\
- f'{downloaded_mb:5.1f}/{guess_total_mb:.1f}'
- self._update_status(video_id, f'downloading: {msg} MB')
- if d['status'] == 'finished':
- downloaded_before += downloaded_i
-
- video_id = self._to_download.pop(0)
- url = f'{YOUTUBE_URL_PREFIX}{video_id}'
- with YoutubeDL(YT_DL_PARAMS | {'progress_hooks': [hook]}) as ydl:
- self._update_status(video_id, 'preparing download')
- try:
- info = ydl.sanitize_info(ydl.extract_info(url,
- download=False))
- key_formats = 'requested_formats'
- if key_formats not in info:
- raise YoutubeDLError(f'no "{key_formats}" in info')
- for f in info[key_formats]:
- sizes[f[TOK_FO_ID]] = [False, 1]
- if TOK_FS_AP in f:
- sizes[f[TOK_FO_ID]] = [True, f[TOK_FS_AP]]
- ydl.download(url)
- except YoutubeDLError as e:
- self._update_status(video_id, 'ERROR')
- raise e
- self._sync_db()
+ if not self._to_download:
+ return
+ downloaded_before: int = 0
+ sizes: dict[str, list[int]] = {}
+ self._downloading = yt_id = self._to_download.pop(0)
+ filename: Optional[str] = None
+
+ def hook(d) -> None:
+ nonlocal downloaded_before
+ nonlocal filename
+ if d['status'] in {'downloading', 'finished'}:
+ downloaded_i = d[TOK_LOADED]
+ downloaded_mb = (downloaded_i + downloaded_before) / MEGA
+ size = sizes[d['info_dict'][TOK_FO_ID]]
+ if (not size[0]) and TOK_FRAG_C in d and TOK_FRAG_I in d:
+ progress = min(d[TOK_FRAG_I] + 1, d[TOK_FRAG_C]
+ ) / d[TOK_FRAG_C]
+ size[1] = d[TOK_LOADED] / progress
+ guess_total_mb = sum(t[1] for t in sizes.values()) / MEGA
+ msg = f'{int(100 * downloaded_mb/guess_total_mb)}%: '\
+ f'{downloaded_mb:5.1f}/{guess_total_mb:.1f}'
+ self._status = f'downloading: {msg} MB'
+ self._update_timestamp(yt_id)
+ if d['status'] == 'finished':
+ downloaded_before += downloaded_i
+ filename = Path(d["info_dict"]["filename"]).name
+
+ url = f'{YOUTUBE_URL_PREFIX}{self._downloading}'
+ self._status = 'preparing download'
+ with YoutubeDL(YT_DL_PARAMS | {'progress_hooks': [hook]}) as ydl:
+ self._update_timestamp(yt_id)
+ try:
+ self._status = 'extracting download info'
+ info_dirty = ydl.extract_info(url, download=False)
+ info = ydl.sanitize_info(info_dirty)
+ key_formats = 'requested_formats'
+ if key_formats not in info:
+ raise YoutubeDLError(f'no "{key_formats}" in info')
+ for f in info[key_formats]:
+ sizes[f[TOK_FO_ID]] = [False, 1]
+ if TOK_FS_AP in f:
+ sizes[f[TOK_FO_ID]] = [True, f[TOK_FS_AP]]
+ ydl.download(url)
+ except YoutubeDLError as e:
+ self._update_timestamp(yt_id)
+ self._status = 'ERROR'
+ raise e
+ self.q.put(f'savefile_{filename}')
+ self.q.put('download_next')
def start_thread(self) -> None:
- """Keep iterating through download queue for new download tasks."""
- def loop():
+ 'Collect, enact commands sent through .q.'
+ def loop() -> None:
while True:
- sleep(0.5)
- self._download_next()
+ command = self.q.get()
+ if command == 'download_next':
+ Thread(target=self._download_next, daemon=False).start()
+ continue
+ command, arg = command.split('_', maxsplit=1)
+ if command == 'savefile':
+ yt_id = self._savefile(arg)
+ else:
+ yt_id = arg
+ if command == 'forget':
+ self._forget_file(yt_id)
+ elif command == 'queue':
+ self._queue_download(yt_id)
+ self._update_timestamp(yt_id)
+
Thread(target=loop, daemon=False).start()