home · contact · privacy
Add basic ledger interpreter.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 12 Oct 2023 22:53:33 +0000 (00:53 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 12 Oct 2023 22:53:33 +0000 (00:53 +0200)
ledger.py [new file with mode: 0755]

diff --git a/ledger.py b/ledger.py
new file mode 100755 (executable)
index 0000000..334318f
--- /dev/null
+++ b/ledger.py
@@ -0,0 +1,316 @@
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import sys
+hostName = "localhost"
+serverPort = 8082
+
+
+class HandledException(Exception):
+    pass
+
+
+def handled_error_exit(msg):
+    print(f"ERROR: {msg}")
+    sys.exit(1)
+
+
+def apply_booking_to_account_balances(account_sums, account, currency, amount):
+    if not account in account_sums:
+        account_sums[account] = {currency: amount} 
+    elif not currency in account_sums[account].keys():
+        account_sums[account][currency] = amount
+    else:
+        account_sums[account][currency] += amount
+
+
+class Booking:
+
+    def __init__(self, date_string, description, booking_lines, start_line):
+        self.date_string = date_string
+        self.description = description 
+        self.lines = booking_lines 
+        self.start_line = start_line
+        self.validate_booking_lines()
+        self.account_changes = self.parse_booking_lines_to_account_changes()
+
+    def validate_booking_lines(self):
+        prefix = f"booking at line {self.start_line}"
+        sums = {}
+        empty_values = 0
+        for line in self.lines:
+            if line == '': 
+                continue
+            _, amount, currency = line
+            if amount is None:
+                if empty_values > 0:
+                    raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}")
+                empty_values += 1
+                continue
+            if currency not in sums:
+                sums[currency] = 0
+            sums[currency] += amount
+        if empty_values == 0:
+            for k, v in sums.items():
+                if v != 0:
+                    raise HandledException(f"{prefix} does not sum up to zero")
+        else:
+            sinkable = False
+            for k, v in sums.items():
+                if v != 0:
+                    sinkable = True 
+            if not sinkable:
+                raise HandledException(f"{prefix} has empty value that cannot be filled")
+
+    def parse_booking_lines_to_account_changes(self):
+        account_changes = {}
+        debt = {}
+        sink_account = None
+        for line in self.lines:
+            if line == '': 
+                continue
+            account, amount, currency = line
+            if amount is None:
+                sink_account = account
+                continue
+            apply_booking_to_account_balances(account_changes, account, currency, amount)
+            if currency not in debt:
+                debt[currency] = amount
+            else:
+                debt[currency] += amount
+        if sink_account:
+            for currency, amount in debt.items():
+                apply_booking_to_account_balances(account_changes, sink_account, currency, -amount)
+        return account_changes
+
+
+
+class Database:
+
+    def __init__(self, load_from_file=True):
+        import os
+        db_name = "_ledger"
+        self.db_file = db_name + ".json"
+        self.lock_file = db_name+ ".lock"
+        self.bookings = []
+        self.comments = []
+        if load_from_file and os.path.exists(self.db_file):
+            with open(self.db_file, "r") as f:
+                self.parse_lines(f.readlines())
+
+    def parse_lines(self, lines): 
+        import datetime
+        import decimal 
+        inside_booking = False
+        date_string, description = None, None
+        booking_lines = []
+        start_line = 0
+        for i, line in enumerate(lines):
+            prefix = f"line {i}"
+            # we start with the case of an utterly empty line
+            self.comments += [""] 
+            stripped_line = line.rstrip()
+            if stripped_line == '':
+                if inside_booking:
+                    # assume we finished a booking, finalize, and commit to DB 
+                    if len(booking_lines) < 2:
+                        raise HandledException(f"{prefix} booking ends to early")
+                    booking = Booking(date_string, description, booking_lines, start_line)
+                    self.bookings += [booking]
+                # expect new booking to follow so re-zeroall booking data 
+                inside_booking = False
+                date_string, description = None, None
+                booking_lines = []
+                continue
+            # if non-empty line, first get comment if any, and commit to DB 
+            split_by_comment = stripped_line.split(sep=";", maxsplit=1)
+            if len(split_by_comment) == 2:
+                self.comments[i] = split_by_comment[1] 
+            # if pre-comment empty: if inside_booking, this must be a comment-only line so we keep it for later ledger-output to capture those comments; otherwise, no more to process for this line 
+            non_comment = split_by_comment[0].rstrip()
+            if non_comment.rstrip() == '': 
+                 if inside_booking: 
+                     booking_lines += ['']
+                 continue
+            # if we're starting a booking, parse by first-line pattern 
+            if not inside_booking:
+                start_line = i
+                toks = non_comment.split(maxsplit=1)
+                date_string = toks[0]
+                try:
+                    datetime.datetime.strptime(date_string, '%Y-%m-%d')
+                except ValueError:
+                    raise HandledException(f"{prefix} bad date string: {date_string}")
+                try:
+                    description = toks[1]
+                except IndexError:
+                    raise HandledException(f"{prefix} bad description: {description}")
+                inside_booking = True
+                continue
+            # otherwise, read as transfer data
+            toks = non_comment.split()  # ignore specification's allowance of single spaces in names
+            if len(toks) > 3:
+                raise HandledException(f"{prefix} too many booking line tokens: {toks}")
+            amount, currency = None, None
+            account_name = toks[0]
+            if account_name[0] == '[' and account_name[-1] == ']':
+                # ignore specification's differentiation of "virtual" accounts
+                account_name = account_name[1:-1]
+            decimal_chars = ".-0123456789"
+            if len(toks) == 3:
+                i_currency = 1 
+                try:
+                    amount = decimal.Decimal(toks[1])
+                    i_currency = 2
+                except decimal.InvalidOperation:
+                    try:
+                        amount = decimal.Decimal(toks[2])
+                    except decimal.InvalidOperation:
+                        raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
+                currency = toks[i_currency] 
+                if currency[0] in decimal_chars:
+                    raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
+            elif len(toks) == 2:
+                value = toks[1]
+                inside_amount = False
+                inside_currency = False
+                amount_string = ""
+                currency = ""
+                dots_counted = 0
+                for i, c in enumerate(value):
+                    if i == 0:
+                        if c in decimal_chars: 
+                            inside_amount = True 
+                        else:
+                            inside_currency = True 
+                    if inside_currency:
+                        if c in decimal_chars and len(amount_string) == 0:
+                            inside_currency = False
+                            inside_amount = True 
+                        else:
+                            currency += c
+                            continue
+                    if inside_amount:
+                        if c not in decimal_chars:
+                            if len(currency) > 0:
+                                raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
+                            inside_currency = True
+                            inside_amount = False 
+                            currency += c
+                            continue
+                        if c == '-' and len(amount_string) > 1:
+                            raise HandledException(f"{prefix} amount has non-start '-': {value}")
+                        if c == '.':
+                            if dots_counted > 1:
+                                raise HandledException(f"{prefix} amount has multiple dots: {value}")
+                            dots_counted += 1
+                        amount_string += c
+                if len(amount_string) == 0:
+                    raise HandledException(f"{prefix} amount missing: {value}")
+                if len(currency) == 0:
+                    raise HandledException(f"{prefix} currency missing: {value}")
+                amount = decimal.Decimal(amount_string)
+            booking_lines += [(account_name, amount, currency)]
+
+    def ledger_as_html(self):
+        lines = []
+        for comment in self.comments:
+            lines += [f"; {comment}" if comment != '' else '']
+        for booking in self.bookings:
+            i = booking.start_line
+            suffix = f" {lines[i]}" if len(lines[i]) > 0 else ""
+            lines[i] = f"{booking.date_string} {booking.description}{suffix}"
+            for booking_line in booking.lines:
+                i += 1
+                if booking_line == '':
+                    continue
+                suffix = f" {lines[i]}" if len(lines[i]) > 0 else ""
+                value = f" {booking_line[1]} {booking_line[2]}" if booking_line[1] else ""
+                lines[i] = f"{booking_line[0]}{value}{suffix}"
+        content = "\n".join(lines)
+        return f"<html><meta charset=\"UTF-8\"><pre>{content}</pre></html>"
+
+    def balance_as_html(self):
+        account_sums = {}
+        for booking in self.bookings:
+            for account, changes in booking.account_changes.items():
+                for currency, amount in changes.items():
+                    apply_booking_to_account_balances(account_sums, account, currency, amount)
+        account_tree = {}
+        def collect_branches(account_name, path):
+            node = account_tree 
+            path_copy = path[:]
+            while len(path_copy) > 0:
+                step = path_copy.pop(0)
+                node = node[step] 
+            toks = account_name.split(":", maxsplit=1)
+            parent = toks[0]
+            if parent in node.keys():
+                child = node[parent]
+            else:
+                child = {}
+                node[parent] = child 
+            if len(toks) == 2:
+                k, v = collect_branches(toks[1], path + [parent])
+                if k not in child.keys():
+                    child[k] = v
+                else:
+                    child[k].update(v)
+            return parent, child 
+        for account_name in sorted(account_sums.keys()):
+            k, v = collect_branches(account_name, [])
+            if k not in account_tree.keys():
+                account_tree[k] = v
+            else:
+                account_tree[k].update(v)
+        def collect_totals(parent_path, tree_node):
+            for k, v in tree_node.items():
+                child_path = parent_path + ":" + k
+                for currency, amount in collect_totals(child_path, v).items():
+                    apply_booking_to_account_balances(account_sums, parent_path, currency, amount) 
+            return account_sums[parent_path]
+        for account_name in account_tree.keys():
+            account_sums[account_name] = collect_totals(account_name, account_tree[account_name])
+        lines = []
+        def print_subtree(lines, indent, node, subtree, path):
+            line = f"{indent}{node}" 
+            n_tabs = 5 - (len(line) // 8)
+            line += n_tabs * "\t" 
+            if "€" in account_sums[path + node].keys():
+                amount = account_sums[path + node]["€"]
+                line += f"{amount:9.2f} €\t"
+            else:
+                line += f"\t\t"
+            for currency, amount in account_sums[path + node].items(): 
+                if currency != '€' and amount > 0:
+                    line += f"{amount:5.2f} {currency}\t"
+            lines += [line]
+            indent += "  "
+            for k, v in sorted(subtree.items()):
+                print_subtree(lines, indent, k, v, path + node + ":")
+        for k, v in sorted(account_tree.items()):
+            print_subtree(lines, "", k, v, "")
+        content = "\n".join(lines)
+        return f"<html><meta charset=\"UTF-8\"><pre>{content}</pre></html>"
+
+
+class MyServer(BaseHTTPRequestHandler):
+
+    def do_GET(self):
+        self.send_response(200)
+        self.send_header("Content-type", "text/html")
+        self.end_headers()
+        db = Database()
+        # page = db.ledger_as_html()
+        page = db.balance_as_html()
+        self.wfile.write(bytes(page, "utf-8"))
+
+
+db = Database()
+if __name__ == "__main__":        
+    webServer = HTTPServer((hostName, serverPort), MyServer)
+    print(f"Server started http://{hostName}:{serverPort}")
+    try:
+        webServer.serve_forever()
+    except KeyboardInterrupt:
+        pass
+    webServer.server_close()
+    print("Server stopped.")