From: Christian Heller Date: Thu, 12 Oct 2023 22:53:33 +0000 (+0200) Subject: Add basic ledger interpreter. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/static/%7B%7B%20web_path%20%7D%7D/tasks?a=commitdiff_plain;h=c3b5860ae1ca76c2a1dc3b61d3c1242d581424f0;p=misc Add basic ledger interpreter. --- diff --git a/ledger.py b/ledger.py new file mode 100755 index 0000000..334318f --- /dev/null +++ b/ledger.py @@ -0,0 +1,316 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer +import sys +hostName = "localhost" +serverPort = 8082 + + +class HandledException(Exception): + pass + + +def handled_error_exit(msg): + print(f"ERROR: {msg}") + sys.exit(1) + + +def apply_booking_to_account_balances(account_sums, account, currency, amount): + if not account in account_sums: + account_sums[account] = {currency: amount} + elif not currency in account_sums[account].keys(): + account_sums[account][currency] = amount + else: + account_sums[account][currency] += amount + + +class Booking: + + def __init__(self, date_string, description, booking_lines, start_line): + self.date_string = date_string + self.description = description + self.lines = booking_lines + self.start_line = start_line + self.validate_booking_lines() + self.account_changes = self.parse_booking_lines_to_account_changes() + + def validate_booking_lines(self): + prefix = f"booking at line {self.start_line}" + sums = {} + empty_values = 0 + for line in self.lines: + if line == '': + continue + _, amount, currency = line + if amount is None: + if empty_values > 0: + raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}") + empty_values += 1 + continue + if currency not in sums: + sums[currency] = 0 + sums[currency] += amount + if empty_values == 0: + for k, v in sums.items(): + if v != 0: + raise HandledException(f"{prefix} does not sum up to zero") + else: + sinkable = False + for k, v in sums.items(): + if v != 0: + sinkable = True + if not sinkable: + raise HandledException(f"{prefix} has empty value that cannot be filled") + + def parse_booking_lines_to_account_changes(self): + account_changes = {} + debt = {} + sink_account = None + for line in self.lines: + if line == '': + continue + account, amount, currency = line + if amount is None: + sink_account = account + continue + apply_booking_to_account_balances(account_changes, account, currency, amount) + if currency not in debt: + debt[currency] = amount + else: + debt[currency] += amount + if sink_account: + for currency, amount in debt.items(): + apply_booking_to_account_balances(account_changes, sink_account, currency, -amount) + return account_changes + + + +class Database: + + def __init__(self, load_from_file=True): + import os + db_name = "_ledger" + self.db_file = db_name + ".json" + self.lock_file = db_name+ ".lock" + self.bookings = [] + self.comments = [] + if load_from_file and os.path.exists(self.db_file): + with open(self.db_file, "r") as f: + self.parse_lines(f.readlines()) + + def parse_lines(self, lines): + import datetime + import decimal + inside_booking = False + date_string, description = None, None + booking_lines = [] + start_line = 0 + for i, line in enumerate(lines): + prefix = f"line {i}" + # we start with the case of an utterly empty line + self.comments += [""] + stripped_line = line.rstrip() + if stripped_line == '': + if inside_booking: + # assume we finished a booking, finalize, and commit to DB + if len(booking_lines) < 2: + raise HandledException(f"{prefix} booking ends to early") + booking = Booking(date_string, description, booking_lines, start_line) + self.bookings += [booking] + # expect new booking to follow so re-zeroall booking data + inside_booking = False + date_string, description = None, None + booking_lines = [] + continue + # if non-empty line, first get comment if any, and commit to DB + split_by_comment = stripped_line.split(sep=";", maxsplit=1) + if len(split_by_comment) == 2: + self.comments[i] = split_by_comment[1] + # if pre-comment empty: if inside_booking, this must be a comment-only line so we keep it for later ledger-output to capture those comments; otherwise, no more to process for this line + non_comment = split_by_comment[0].rstrip() + if non_comment.rstrip() == '': + if inside_booking: + booking_lines += [''] + continue + # if we're starting a booking, parse by first-line pattern + if not inside_booking: + start_line = i + toks = non_comment.split(maxsplit=1) + date_string = toks[0] + try: + datetime.datetime.strptime(date_string, '%Y-%m-%d') + except ValueError: + raise HandledException(f"{prefix} bad date string: {date_string}") + try: + description = toks[1] + except IndexError: + raise HandledException(f"{prefix} bad description: {description}") + inside_booking = True + continue + # otherwise, read as transfer data + toks = non_comment.split() # ignore specification's allowance of single spaces in names + if len(toks) > 3: + raise HandledException(f"{prefix} too many booking line tokens: {toks}") + amount, currency = None, None + account_name = toks[0] + if account_name[0] == '[' and account_name[-1] == ']': + # ignore specification's differentiation of "virtual" accounts + account_name = account_name[1:-1] + decimal_chars = ".-0123456789" + if len(toks) == 3: + i_currency = 1 + try: + amount = decimal.Decimal(toks[1]) + i_currency = 2 + except decimal.InvalidOperation: + try: + amount = decimal.Decimal(toks[2]) + except decimal.InvalidOperation: + raise HandledException(f"{prefix} no decimal number in: {toks[1:]}") + currency = toks[i_currency] + if currency[0] in decimal_chars: + raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}") + elif len(toks) == 2: + value = toks[1] + inside_amount = False + inside_currency = False + amount_string = "" + currency = "" + dots_counted = 0 + for i, c in enumerate(value): + if i == 0: + if c in decimal_chars: + inside_amount = True + else: + inside_currency = True + if inside_currency: + if c in decimal_chars and len(amount_string) == 0: + inside_currency = False + inside_amount = True + else: + currency += c + continue + if inside_amount: + if c not in decimal_chars: + if len(currency) > 0: + raise HandledException(f"{prefix} amount has non-decimal chars: {value}") + inside_currency = True + inside_amount = False + currency += c + continue + if c == '-' and len(amount_string) > 1: + raise HandledException(f"{prefix} amount has non-start '-': {value}") + if c == '.': + if dots_counted > 1: + raise HandledException(f"{prefix} amount has multiple dots: {value}") + dots_counted += 1 + amount_string += c + if len(amount_string) == 0: + raise HandledException(f"{prefix} amount missing: {value}") + if len(currency) == 0: + raise HandledException(f"{prefix} currency missing: {value}") + amount = decimal.Decimal(amount_string) + booking_lines += [(account_name, amount, currency)] + + def ledger_as_html(self): + lines = [] + for comment in self.comments: + lines += [f"; {comment}" if comment != '' else ''] + for booking in self.bookings: + i = booking.start_line + suffix = f" {lines[i]}" if len(lines[i]) > 0 else "" + lines[i] = f"{booking.date_string} {booking.description}{suffix}" + for booking_line in booking.lines: + i += 1 + if booking_line == '': + continue + suffix = f" {lines[i]}" if len(lines[i]) > 0 else "" + value = f" {booking_line[1]} {booking_line[2]}" if booking_line[1] else "" + lines[i] = f"{booking_line[0]}{value}{suffix}" + content = "\n".join(lines) + return f"
{content}
" + + def balance_as_html(self): + account_sums = {} + for booking in self.bookings: + for account, changes in booking.account_changes.items(): + for currency, amount in changes.items(): + apply_booking_to_account_balances(account_sums, account, currency, amount) + account_tree = {} + def collect_branches(account_name, path): + node = account_tree + path_copy = path[:] + while len(path_copy) > 0: + step = path_copy.pop(0) + node = node[step] + toks = account_name.split(":", maxsplit=1) + parent = toks[0] + if parent in node.keys(): + child = node[parent] + else: + child = {} + node[parent] = child + if len(toks) == 2: + k, v = collect_branches(toks[1], path + [parent]) + if k not in child.keys(): + child[k] = v + else: + child[k].update(v) + return parent, child + for account_name in sorted(account_sums.keys()): + k, v = collect_branches(account_name, []) + if k not in account_tree.keys(): + account_tree[k] = v + else: + account_tree[k].update(v) + def collect_totals(parent_path, tree_node): + for k, v in tree_node.items(): + child_path = parent_path + ":" + k + for currency, amount in collect_totals(child_path, v).items(): + apply_booking_to_account_balances(account_sums, parent_path, currency, amount) + return account_sums[parent_path] + for account_name in account_tree.keys(): + account_sums[account_name] = collect_totals(account_name, account_tree[account_name]) + lines = [] + def print_subtree(lines, indent, node, subtree, path): + line = f"{indent}{node}" + n_tabs = 5 - (len(line) // 8) + line += n_tabs * "\t" + if "€" in account_sums[path + node].keys(): + amount = account_sums[path + node]["€"] + line += f"{amount:9.2f} €\t" + else: + line += f"\t\t" + for currency, amount in account_sums[path + node].items(): + if currency != '€' and amount > 0: + line += f"{amount:5.2f} {currency}\t" + lines += [line] + indent += " " + for k, v in sorted(subtree.items()): + print_subtree(lines, indent, k, v, path + node + ":") + for k, v in sorted(account_tree.items()): + print_subtree(lines, "", k, v, "") + content = "\n".join(lines) + return f"
{content}
" + + +class MyServer(BaseHTTPRequestHandler): + + def do_GET(self): + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + db = Database() + # page = db.ledger_as_html() + page = db.balance_as_html() + self.wfile.write(bytes(page, "utf-8")) + + +db = Database() +if __name__ == "__main__": + webServer = HTTPServer((hostName, serverPort), MyServer) + print(f"Server started http://{hostName}:{serverPort}") + try: + webServer.serve_forever() + except KeyboardInterrupt: + pass + webServer.server_close() + print("Server stopped.")