splitext, basename)
from random import shuffle
from time import time, sleep
-from json import load as json_load, dump as json_dump, dumps as json_dumps
+from json import dumps as json_dumps
+from uuid import uuid4
from datetime import datetime, timedelta
from threading import Thread
from http.server import HTTPServer, BaseHTTPRequestHandler
"""Call on DB fetches finding less than expected."""
-PATH_QUOTA_LOG = PathStr('quota_log.json')
PATH_DIR_DOWNLOADS = PathStr('downloads')
PATH_DIR_THUMBNAILS = PathStr('thumbnails')
PATH_DIR_REQUESTS_CACHE = PathStr('cache_googleapi')
FOREIGN KEY (query_id) REFERENCES yt_queries(id),
FOREIGN KEY (video_id) REFERENCES yt_videos(id)
);
+CREATE TABLE quota_costs (
+ id TEXT PRIMARY KEY,
+ timestamp TEXT NOT NULL,
+ cost INT NOT NULL
+);
'''
to_download: list[VideoId] = []
(query_id, self.id_))
+class DbQuotaCost(DbData):
+ """Collects API access quota costs."""
+ _table_name = 'quota_costs'
+ _cols = ('id_', 'timestamp', 'cost')
+
+ def __init__(self,
+ id_: Optional[str],
+ timestamp: DatetimeStr,
+ cost: QuotaCost
+ ) -> None:
+ self.id_ = id_ if id_ else str(uuid4())
+ self.timestamp = timestamp
+ self.cost = cost
+
+ @classmethod
+ def update(cls, conn: DatabaseConnection, cost: QuotaCost) -> None:
+ """Adds cost mapped to current datetime."""
+ cls._remove_old(conn)
+ new = cls(None,
+ DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)),
+ QuotaCost(cost))
+ new.save(conn)
+
+ @classmethod
+ def current(cls, conn: DatabaseConnection) -> QuotaCost:
+ """Returns quota cost total for last 24 hours, purges old data."""
+ cls._remove_old(conn)
+ quota_costs = cls.get_all(conn)
+ return QuotaCost(sum(c.cost for c in quota_costs))
+
+ @classmethod
+ def _remove_old(cls, conn: DatabaseConnection) -> None:
+ cutoff = datetime.now() - timedelta(days=1)
+ sql = SqlText(f'DELETE FROM {cls._table_name} WHERE timestamp < ?')
+ conn.exec(SqlText(sql), (cutoff.strftime(TIMESTAMP_FMT),))
+
+
class Player:
"""MPV representation with some additional features."""
elif not isdir(dir_name):
msg = f'at expected directory path {dir_name} found non-directory'
raise Exception(msg)
- if not path_exists(PATH_QUOTA_LOG):
- with open(PATH_QUOTA_LOG, 'w', encoding='utf8') as f:
- f.write('{}')
- else:
- try:
- read_quota_log() # just to check if we can
- except Exception as e:
- print(f'Trouble reading quota log file at {PATH_QUOTA_LOG}:')
- raise e
def clean_unfinished_downloads() -> None:
server.server_close()
-def read_quota_log() -> QuotaLog:
- """Return logged quota expenditures of past 24 hours."""
- with open(PATH_QUOTA_LOG, 'r', encoding='utf8') as f:
- log = json_load(f)
- ret = {}
- now = datetime.now()
- for timestamp, amount in log.items():
- then = datetime.strptime(timestamp, TIMESTAMP_FMT)
- if then >= now - timedelta(days=1):
- ret[timestamp] = amount
- return ret
-
-
-def update_quota_log(now: DatetimeStr, cost: QuotaCost) -> None:
- """Update quota log from read_quota_log, add cost to now's row."""
- quota_log = read_quota_log()
- quota_log[now] = QuotaCost(quota_log.get(now, 0) + cost)
- with open(PATH_QUOTA_LOG, 'w', encoding='utf8') as f:
- json_dump(quota_log, f)
-
-
def download_thread() -> None:
"""Keep iterating through to_download for IDs, download their videos."""
while True:
self._send_http(headers=[('Location', '/')], code=302)
def _post_query(self, query_txt: QueryText) -> None:
+ conn = DatabaseConnection()
- def collect_results(now, query_txt: QueryText) -> list[VideoData]:
+ def collect_results(query_txt: QueryText) -> list[VideoData]:
youtube = googleapiclient.discovery.build('youtube', 'v3',
developerKey=API_KEY)
- update_quota_log(now, QUOTA_COST_YOUTUBE_SEARCH)
+ DbQuotaCost.update(conn, QUOTA_COST_YOUTUBE_SEARCH)
search_request = youtube.search().list(
q=query_txt,
part='snippet',
title=snippet['title'],
description=snippet['description'],
published_at=snippet['publishedAt'])]
- update_quota_log(now, QUOTA_COST_YOUTUBE_DETAILS)
+ DbQuotaCost.update(conn, QUOTA_COST_YOUTUBE_DETAILS)
ids_for_details = ','.join([r.id_ for r in results])
videos_request = youtube.videos().list(id=ids_for_details,
part='content_details')
result.definition = content_details['definition'].upper()
return results
- now = DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT))
- query_data = QueryData(None, query_txt, now)
- conn = DatabaseConnection()
+ query_data = QueryData(
+ None, query_txt,
+ DatetimeStr(datetime.now().strftime(TIMESTAMP_FMT)))
query_data.save(conn)
- for result in collect_results(now, query_txt):
+ for result in collect_results(query_txt):
result.save(conn)
assert query_data.id_ is not None
result.save_to_query(conn, query_data.id_)
{'query': query.text, 'videos': results})
def _send_queries_index_and_search(self) -> None:
- quota_count = QuotaCost(sum(read_quota_log().values()))
conn = DatabaseConnection()
+ quota_count = DbQuotaCost.current(conn)
queries_data = QueryData.get_all(conn)
conn.commit_close()
queries_data.sort(key=lambda q: q.retrieved_at, reverse=True)