From: Christian Heller Date: Sun, 19 Nov 2023 01:06:55 +0000 (+0100) Subject: Improve/refactor income and ledger scripts with plomlib. X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=commitdiff_plain;h=5f3b9e7eb997cacea086decc3dcd04d6619b9121;p=misc Improve/refactor income and ledger scripts with plomlib. --- diff --git a/income_progress_bars.py b/income_progress_bars.py index 3edb48d..94a3456 100644 --- a/income_progress_bars.py +++ b/income_progress_bars.py @@ -1,10 +1,10 @@ -from http.server import BaseHTTPRequestHandler, HTTPServer +# from http.server import BaseHTTPRequestHandler, HTTPServer import os import json import jinja2 +from plomlib import PlomDB, PlomException, run_server, run_server, PlomServer -hostName = "localhost" -serverPort = 8081 +server_port = 8081 tmpl = jinja2.Template(""" @@ -106,42 +106,32 @@ table { """) -class Database: - timestamp_year = 0, - timestamp_month = 0, - timestamp_week = 0, - year_income = 0, - month_income = 0, - week_income = 0, - workday_hourly_rate_1 = 10, - workday_hourly_rate_2 = 25, - workday_hourly_rate_3 = 50, - workday_minutes_worked_1 = 0, - workday_minutes_worked_2 = 0, - workday_minutes_worked_3 = 0, - year_goal = 20000, - workdays_per_month = 16 +class IncomeDB(PlomDB): def __init__(self): - db_name = "_income" - self.db_file = db_name + ".json" - self.lock_file = db_name+ ".lock" - if os.path.exists(self.db_file): - with open(self.db_file, "r") as f: - d = json.load(f) - for k, v in d.items(): - if not hasattr(self, k): - raise Exception("bad key in db: " + k) - setattr(self, k, v) + # defaults + self.timestamp_year = 0, + self.timestamp_month = 0, + self.timestamp_week = 0, + self.year_income = 0, + self.month_income = 0, + self.week_income = 0, + self.workday_hourly_rate_1 = 10, + self.workday_hourly_rate_2 = 25, + self.workday_hourly_rate_3 = 50, + self.workday_minutes_worked_1 = 0, + self.workday_minutes_worked_2 = 0, + self.workday_minutes_worked_3 = 0, + self.year_goal = 20000, + self.workdays_per_month = 16 + super().__init__('_income') - def lock(self): - if os.path.exists(self.lock_file): - raise Exception('Sorry, lock file!') - f = open(self.lock_file, 'w+') - f.close() - - def unlock(self): - os.remove(self.lock_file) + def read_db_file(self, f): + d = json.load(f) + for k, v in d.items(): + if not hasattr(self, k): + raise PlomException("bad key in db: " + k) + setattr(self, k, v) def to_dict(self): keys = [k for k in dir(self) if (not k.startswith('_')) and (not callable(getattr(self, k)))] @@ -153,58 +143,6 @@ class Database: def write_db(self): self.write_text_to_db(json.dumps(self.to_dict())) - def backup(self): - import shutil - from datetime import datetime, timedelta - # collect modification times of numbered .bak files - bak_prefix = f'{self.db_file}.bak.' - backup_dates = [] - i = 0 - bak_as = f'{bak_prefix}{i}' - while os.path.exists(bak_as): - mod_time = os.path.getmtime(bak_as) - backup_dates += [str(datetime.fromtimestamp(mod_time))] - i += 1 - bak_as = f'{bak_prefix}{i}' - - # collect what numbered .bak files to save: the older, the fewer; for each - # timedelta, keep the newest file that's older - ages_to_keep = [timedelta(minutes=4**i) for i in range(0, 8)] - now = datetime.now() - to_save = [] - for age in ages_to_keep: - limit = now - age - for i, date in enumerate(reversed(backup_dates)): - if datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f') < limit: - unreversed_i = len(backup_dates) - i - 1 - if unreversed_i not in to_save: - to_save += [unreversed_i] - break - - # remove redundant backup files - j = 0 - for i in to_save: - if i != j: - source = f'{bak_prefix}{i}' - target = f'{bak_prefix}{j}' - shutil.move(source, target) - j += 1 - for i in range(j, len(backup_dates)): - try: - os.remove(f'{bak_prefix}{i}') - except FileNotFoundError: - pass - - # put copy of current state at end of bak list - shutil.copy(self.db_file, f'{bak_prefix}{j}') - - def write_text_to_db(self, text): - self.lock() - self.backup() - with open(self.db_file, 'w') as f: - f.write(text); - self.unlock() - class ProgressBar: def __init__(self, title, earned, goal, time_progress=-1): self.title = title @@ -222,112 +160,101 @@ class ProgressBar: if time_progress > 0: self.success = success_income / time_progress -class MyServer(BaseHTTPRequestHandler): +# class MyServer(BaseHTTPRequestHandler): +class IncomeServer(PlomServer): def do_POST(self): from urllib.parse import parse_qs - length = int(self.headers['content-length']) - postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) - db = Database() - db.workday_minutes_worked_1 = int(postvars['workday_minutes_worked_1'][0]) - db.workday_minutes_worked_2 = int(postvars['workday_minutes_worked_2'][0]) - db.workday_minutes_worked_3 = int(postvars['workday_minutes_worked_3'][0]) - db.workday_hourly_rate_1 = int(postvars['workday_hourly_rate_1'][0]) - db.workday_hourly_rate_2 = int(postvars['workday_hourly_rate_2'][0]) - db.workday_hourly_rate_3 = int(postvars['workday_hourly_rate_3'][0]) - db.year_goal = int(postvars['year_goal'][0]) - db.workdays_per_month = int(postvars['workdays_per_month'][0]) - if 'finish' in postvars.keys(): - day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 - day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 - day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 - db.year_income += day_income - db.month_income += day_income - db.week_income += day_income - db.workday_minutes_worked_1 = 0 - db.workday_minutes_worked_2 = 0 - db.workday_minutes_worked_3 = 0 - db.write_db() - self.send_response(302) - self.send_header('Location', '/') - self.end_headers() + try: + length = int(self.headers['content-length']) + postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) + db = IncomeDB() + db.workday_minutes_worked_1 = int(postvars['workday_minutes_worked_1'][0]) + db.workday_minutes_worked_2 = int(postvars['workday_minutes_worked_2'][0]) + db.workday_minutes_worked_3 = int(postvars['workday_minutes_worked_3'][0]) + db.workday_hourly_rate_1 = int(postvars['workday_hourly_rate_1'][0]) + db.workday_hourly_rate_2 = int(postvars['workday_hourly_rate_2'][0]) + db.workday_hourly_rate_3 = int(postvars['workday_hourly_rate_3'][0]) + db.year_goal = int(postvars['year_goal'][0]) + db.workdays_per_month = int(postvars['workdays_per_month'][0]) + if 'finish' in postvars.keys(): + day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 + day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 + day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 + db.year_income += day_income + db.month_income += day_income + db.week_income += day_income + db.workday_minutes_worked_1 = 0 + db.workday_minutes_worked_2 = 0 + db.workday_minutes_worked_3 = 0 + db.write_db() + self.send_response(302) + self.send_header('Location', '/') + self.end_headers() + except PlomException as e: + self.fail_400(e) def do_GET(self): import datetime import calendar - db = Database() #load_db() - today = datetime.datetime.now() - update_db = False - if today.year != db.timestamp_year: - db.timestamp_year = today.year - db.timestamp_month = today.month - db.year_income = 0 - db.month_income = 0 - update_db = True - if today.month != db.timestamp_month: - db.timestamp_month = today.month - db.month_income = 0 - update_db = True - if today.isocalendar()[1] != db.timestamp_week: - db.timestamp_week = today.isocalendar()[1] - db.week_income = 0 - update_db = True - if update_db: - print("Resetting timestamp") - db.write_db() - day_of_year = today.toordinal() - datetime.date(today.year, 1, 1).toordinal() + 1 - year_length = 365 + calendar.isleap(today.year) - workday_goal = db.year_goal / 12 / db.workdays_per_month - workdays_per_week = (db.workdays_per_month * 12) / (year_length / 7) - month_goal = db.year_goal / 12 - week_goal = db.year_goal / (year_length / 7) - day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 - day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 - day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 - year_plus = db.year_income + day_income - month_plus = db.month_income + day_income - week_plus = db.week_income + day_income - progress_time_year = day_of_year / year_length - progress_time_month = today.day / calendar.monthrange(today.year, today.month)[1] - progress_time_week = today.weekday() / 7 - progress_bars = [ProgressBar("year", year_plus, db.year_goal, progress_time_year), - ProgressBar("month", month_plus, month_goal, progress_time_month), - ProgressBar("week", week_plus, week_goal, progress_time_week), - ProgressBar("workday", day_income, workday_goal)] - page = tmpl.render( - progress_bars = progress_bars, - workday_hourly_rate_1 = db.workday_hourly_rate_1, - workday_minutes_worked_1 = db.workday_minutes_worked_1, - workday_hourly_rate_2 = db.workday_hourly_rate_2, - workday_minutes_worked_2 = db.workday_minutes_worked_2, - workday_hourly_rate_3 = db.workday_hourly_rate_3, - workday_minutes_worked_3 = db.workday_minutes_worked_3, - year_goal = db.year_goal, - month_goal = month_goal, - week_goal = week_goal, - workdays_per_month = db.workdays_per_month, - workday_goal = workday_goal, - workdays_per_week = workdays_per_week, - ) - self.send_response(200) - self.send_header("Content-type", "text/html") - self.end_headers() - self.wfile.write(bytes(page, "utf-8")) - - def fail_on_lockfile(self): - if os.path.exists(lock_file): - self.send_response(400) - self.end_headers() - self.wfile.write(bytes("Sorry, lock file!", "utf-8")) - return True - return False + try: + db = IncomeDB() + today = datetime.datetime.now() + update_db = False + if today.year != db.timestamp_year: + db.timestamp_year = today.year + db.timestamp_month = today.month + db.year_income = 0 + db.month_income = 0 + update_db = True + if today.month != db.timestamp_month: + db.timestamp_month = today.month + db.month_income = 0 + update_db = True + if today.isocalendar()[1] != db.timestamp_week: + db.timestamp_week = today.isocalendar()[1] + db.week_income = 0 + update_db = True + if update_db: + print("Resetting timestamp") + db.write_db() + day_of_year = today.toordinal() - datetime.date(today.year, 1, 1).toordinal() + 1 + year_length = 365 + calendar.isleap(today.year) + workday_goal = db.year_goal / 12 / db.workdays_per_month + workdays_per_week = (db.workdays_per_month * 12) / (year_length / 7) + month_goal = db.year_goal / 12 + week_goal = db.year_goal / (year_length / 7) + day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 + day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 + day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 + year_plus = db.year_income + day_income + month_plus = db.month_income + day_income + week_plus = db.week_income + day_income + progress_time_year = day_of_year / year_length + progress_time_month = today.day / calendar.monthrange(today.year, today.month)[1] + progress_time_week = today.weekday() / 7 + progress_bars = [ProgressBar("year", year_plus, db.year_goal, progress_time_year), + ProgressBar("month", month_plus, month_goal, progress_time_month), + ProgressBar("week", week_plus, week_goal, progress_time_week), + ProgressBar("workday", day_income, workday_goal)] + page = tmpl.render( + progress_bars = progress_bars, + workday_hourly_rate_1 = db.workday_hourly_rate_1, + workday_minutes_worked_1 = db.workday_minutes_worked_1, + workday_hourly_rate_2 = db.workday_hourly_rate_2, + workday_minutes_worked_2 = db.workday_minutes_worked_2, + workday_hourly_rate_3 = db.workday_hourly_rate_3, + workday_minutes_worked_3 = db.workday_minutes_worked_3, + year_goal = db.year_goal, + month_goal = month_goal, + week_goal = week_goal, + workdays_per_month = db.workdays_per_month, + workday_goal = workday_goal, + workdays_per_week = workdays_per_week, + ) + self.send_HTML(page) + except PlomException as e: + self.fail_400(e) if __name__ == "__main__": - webServer = HTTPServer((hostName, serverPort), MyServer) - print(f"Server started http://{hostName}:{serverPort}") - try: - webServer.serve_forever() - except KeyboardInterrupt: - pass - webServer.server_close() - print("Server stopped.") + run_server(server_port, IncomeServer) diff --git a/ledger.py b/ledger.py index c004d22..b4d9fd9 100755 --- a/ledger.py +++ b/ledger.py @@ -5,12 +5,9 @@ import jinja2 import decimal from datetime import datetime, timedelta from urllib.parse import parse_qs, urlparse -hostName = "localhost" -serverPort = 8082 +from plomlib import PlomDB, PlomException, run_server, run_server, PlomServer - -class HandledException(Exception): - pass +server_port = 8082 def apply_booking_to_account_balances(account_sums, account, currency, amount): @@ -84,7 +81,7 @@ def parse_lines(lines, validate_bookings=True): if inside_booking: # assume we finished a booking, finalize, and commit to DB if len(booking_lines) < 2: - raise HandledException(f"{prefix} booking ends to early") + raise PlomException(f"{prefix} booking ends to early") booking = Booking(date_string, description, booking_lines, start_line, validate_bookings) bookings += [booking] # expect new booking to follow so re-zeroall booking data @@ -110,21 +107,21 @@ def parse_lines(lines, validate_bookings=True): try: datetime.strptime(date_string, '%Y-%m-%d') except ValueError: - raise HandledException(f"{prefix} bad date string: {date_string}") + raise PlomException(f"{prefix} bad date string: {date_string}") if last_date > date_string: - raise HandledException(f"{prefix} out-of-order-date") + raise PlomException(f"{prefix} out-of-order-date") last_date = date_string try: description = toks[1] except IndexError: - raise HandledException(f"{prefix} bad description: {description}") + raise PlomException(f"{prefix} bad description: {description}") inside_booking = True booking_lines += [non_comment] continue # otherwise, read as transfer data toks = non_comment.split() # ignore specification's allowance of single spaces in names if len(toks) > 3: - raise HandledException(f"{prefix} too many booking line tokens: {toks}") + raise PlomException(f"{prefix} too many booking line tokens: {toks}") amount, currency = None, None account_name = toks[0] if account_name[0] == '[' and account_name[-1] == ']': @@ -140,10 +137,10 @@ def parse_lines(lines, validate_bookings=True): try: amount = decimal.Decimal(toks[2]) except decimal.InvalidOperation: - raise HandledException(f"{prefix} no decimal number in: {toks[1:]}") + raise PlomException(f"{prefix} no decimal number in: {toks[1:]}") currency = toks[i_currency] if currency[0] in decimal_chars: - raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}") + raise PlomException(f"{prefix} currency starts with int, dot, or minus: {currency}") elif len(toks) == 2: value = toks[1] inside_amount = False @@ -167,25 +164,25 @@ def parse_lines(lines, validate_bookings=True): if inside_amount: if c not in decimal_chars: if len(currency) > 0: - raise HandledException(f"{prefix} amount has non-decimal chars: {value}") + raise PlomException(f"{prefix} amount has non-decimal chars: {value}") inside_currency = True inside_amount = False currency += c continue if c == '-' and len(amount_string) > 1: - raise HandledException(f"{prefix} amount has non-start '-': {value}") + raise PlomException(f"{prefix} amount has non-start '-': {value}") if c == '.': if dots_counted > 1: - raise HandledException(f"{prefix} amount has multiple dots: {value}") + raise PlomException(f"{prefix} amount has multiple dots: {value}") dots_counted += 1 amount_string += c if len(currency) == 0: - raise HandledException(f"{prefix} currency missing: {value}") + raise PlomException(f"{prefix} currency missing: {value}") if len(amount_string) > 0: amount = decimal.Decimal(amount_string) booking_lines += [(account_name, amount, currency)] if inside_booking: - raise HandledException(f"{prefix} last booking unfinished") + raise PlomException(f"{prefix} last booking unfinished") return bookings, comments @@ -211,7 +208,7 @@ class Booking: _, amount, currency = line if amount is None: if empty_values > 0: - raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}") + raise PlomException(f"{prefix} relates more than one empty value of same currency {currency}") empty_values += 1 continue if currency not in sums: @@ -220,14 +217,14 @@ class Booking: if empty_values == 0: for k, v in sums.items(): if v != 0: - raise HandledException(f"{prefix} does not add up to zero / {k} {v}") + raise PlomException(f"{prefix} does not add up to zero / {k} {v}") else: sinkable = False for k, v in sums.items(): if v != 0: sinkable = True if not sinkable: - raise HandledException(f"{prefix} has empty value that cannot be filled") + raise PlomException(f"{prefix} has empty value that cannot be filled") def parse_booking_lines_to_account_changes(self): account_changes = {} @@ -253,76 +250,25 @@ class Booking: -class Database: +class LedgerDB(PlomDB): def __init__(self): - db_name = "_ledger" - self.db_file = db_name + ".json" - self.lock_file = db_name+ ".lock" self.bookings = [] self.comments = [] self.real_lines = [] - if os.path.exists(self.db_file): - with open(self.db_file, "r") as f: - self.real_lines += [l.rstrip() for l in f.readlines()] + super().__init__('_ledger') ret = parse_lines(self.real_lines) self.bookings += ret[0] self.comments += ret[1] + def read_db_file(self, f): + self.real_lines += [l.rstrip() for l in f.readlines()] + def get_lines(self, start, end): return self.real_lines[start:end] def write_db(self, text, mode='w'): - import shutil - if os.path.exists(self.lock_file): - raise HandledException('Sorry, lock file!') - f = open(self.lock_file, 'w+') - f.close() - - # collect modification times of numbered .bak files - bak_prefix = f'{self.db_file}.bak.' - backup_dates = [] - i = 0 - bak_as = f'{bak_prefix}{i}' - while os.path.exists(bak_as): - mod_time = os.path.getmtime(bak_as) - backup_dates += [str(datetime.fromtimestamp(mod_time))] - i += 1 - bak_as = f'{bak_prefix}{i}' - - # collect what numbered .bak files to save: the older, the fewer; for each - # timedelta, keep the newest file that's older - ages_to_keep = [timedelta(minutes=4**i) for i in range(0, 8)] - now = datetime.now() - to_save = [] - for age in ages_to_keep: - limit = now - age - for i, date in enumerate(reversed(backup_dates)): - if datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f') < limit: - unreversed_i = len(backup_dates) - i - 1 - if unreversed_i not in to_save: - to_save += [unreversed_i] - break - - # remove redundant backup files - j = 0 - for i in to_save: - if i != j: - source = f'{bak_prefix}{i}' - target = f'{bak_prefix}{j}' - shutil.move(source, target) - j += 1 - for i in range(j, len(backup_dates)): - try: - os.remove(f'{bak_prefix}{i}') - except FileNotFoundError: - pass - - # put copy of current state at end of bak list - shutil.copy(self.db_file, f'{bak_prefix}{j}') - with open(self.db_file, mode) as f: - f.write(text); - os.remove(self.lock_file) + self.write_text_to_db(text) def insert_at_date(self, lines, date): start_at = len(self.real_lines) @@ -486,7 +432,7 @@ class Database: return ret -class MyServer(BaseHTTPRequestHandler): +class LedgerServer(PlomServer): header = """