-from http.server import BaseHTTPRequestHandler, HTTPServer
-import sys
import os
-import html
+import decimal
+from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse
-hostName = "localhost"
-serverPort = 8082
+from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
+from plomlib import PlomDB, PlomException, run_server, PlomHandler
+db_path = '/home/plom/org/ledger2024.dat'
+server_port = 8082
+j2env = JinjaEnv(loader=JinjaFSLoader('ledger_templates'))
-class HandledException(Exception):
- pass
+class EditableException(PlomException):
+ def __init__(self, booking_index, *args, **kwargs):
+ self.booking_index = booking_index
+ super().__init__(*args, **kwargs)
+
+class LedgerTextLine:
-def handled_error_exit(msg):
- print(f"ERROR: {msg}")
- sys.exit(1)
+ def __init__(self, text_line):
+ self.text_line = text_line
+ self.comment = ''
+ split_by_comment = text_line.rstrip().split(sep=';', maxsplit=1)
+ self.non_comment = split_by_comment[0].rstrip()
+ self.empty = len(split_by_comment) == 1 and len(self.non_comment) == 0
+ if self.empty:
+ return
+ if len(split_by_comment) == 2:
+ self.comment = split_by_comment[1].lstrip()
+
+
+class Wealth:
+
+ def __init__(self):
+ self.money_dict = {}
+
+ def __iadd__(self, moneys):
+ money_dict = moneys
+ if type(moneys) == Wealth:
+ moneys = moneys.money_dict
+ for currency, amount in moneys.items():
+ if not currency in self.money_dict.keys():
+ self.money_dict[currency] = 0
+ self.money_dict[currency] += amount
+ return self
+
+ @property
+ def sink_empty(self):
+ return len(self.as_sink) == 0
+
+ @property
+ def as_sink(self):
+ sink = {}
+ for currency, amount in self.money_dict.items():
+ if 0 == amount:
+ continue
+ sink[currency] = -amount
+ return sink
+
+
+class Account:
+ def __init__(self, own_name, parent):
+ self.own_name = own_name
+ self.parent = parent
+ if self.parent:
+ self.parent.children += [self]
+ self.own_moneys = Wealth()
+ self.children = []
-def apply_booking_to_account_balances(account_sums, account, currency, amount):
- if not account in account_sums:
- account_sums[account] = {currency: amount}
- elif not currency in account_sums[account].keys():
- account_sums[account][currency] = amount
- else:
- account_sums[account][currency] += amount
+ def add_wealth(self, moneys):
+ self.own_moneys += moneys
+
+ @property
+ def full_name(self):
+ if self.parent and len(self.parent.own_name) > 0:
+ return f'{self.parent.full_name}:{self.own_name}'
+ else:
+ return self.own_name
+
+ @property
+ def full_moneys(self):
+ full_moneys = Wealth()
+ full_moneys += self.own_moneys
+ for child in self.children:
+ full_moneys += child.full_moneys
+ return full_moneys
-def parse_lines(lines):
- import datetime
- import decimal
+def parse_lines_to_bookings(lines, ignore_editable_exceptions=False):
+ lines = [LedgerTextLine(line) for line in lines]
+ lines += [LedgerTextLine('')] # to simulate ending of last booking
+ bookings = []
inside_booking = False
- date_string, description = None, None
booking_lines = []
- start_line = 0
- bookings = []
- comments = []
- lines += [''] # to ensure a booking-ending last line
+ booking_start_i = 0
+ last_date = ''
for i, line in enumerate(lines):
- prefix = f"line {i}"
- # we start with the case of an utterly empty line
- comments += [""]
- stripped_line = line.rstrip()
- if stripped_line == '':
+ intro = f'file line {i}'
+ if line.empty:
if inside_booking:
- # assume we finished a booking, finalize, and commit to DB
- if len(booking_lines) < 2:
- raise HandledException(f"{prefix} booking ends to early")
- booking = Booking(date_string, description, booking_lines, start_line)
+ booking = Booking(lines=booking_lines, starts_at=booking_start_i)
+ if last_date > booking.date and not ignore_editable_exceptions:
+ raise EditableException(len(bookings), f'{intro}: out-of-order date (follows {last_date})')
+ last_date = booking.date
bookings += [booking]
- # expect new booking to follow so re-zeroall booking data
- inside_booking = False
- date_string, description = None, None
- booking_lines = []
- continue
- # if non-empty line, first get comment if any, and commit to DB
- split_by_comment = stripped_line.split(sep=";", maxsplit=1)
- if len(split_by_comment) == 2:
- comments[i] = split_by_comment[1]
- # if pre-comment empty: if inside_booking, this must be a comment-only line so we keep it for later ledger-output to capture those comments; otherwise, no more to process for this line
- non_comment = split_by_comment[0].rstrip()
- if non_comment.rstrip() == '':
- if inside_booking:
- booking_lines += ['']
- continue
- # if we're starting a booking, parse by first-line pattern
- if not inside_booking:
- start_line = i
- toks = non_comment.split(maxsplit=1)
- date_string = toks[0]
- try:
- datetime.datetime.strptime(date_string, '%Y-%m-%d')
- except ValueError:
- raise HandledException(f"{prefix} bad date string: {date_string}")
- try:
- description = toks[1]
- except IndexError:
- raise HandledException(f"{prefix} bad description: {description}")
+ inside_booking =False
+ booking_lines = []
+ else:
+ if not inside_booking:
+ booking_start_i = i
inside_booking = True
- booking_lines += [non_comment]
- continue
- # otherwise, read as transfer data
- toks = non_comment.split() # ignore specification's allowance of single spaces in names
- if len(toks) > 3:
- raise HandledException(f"{prefix} too many booking line tokens: {toks}")
- amount, currency = None, None
- account_name = toks[0]
- if account_name[0] == '[' and account_name[-1] == ']':
- # ignore specification's differentiation of "virtual" accounts
- account_name = account_name[1:-1]
- decimal_chars = ".-0123456789"
- if len(toks) == 3:
- i_currency = 1
- try:
- amount = decimal.Decimal(toks[1])
- i_currency = 2
- except decimal.InvalidOperation:
+ booking_lines += [line]
+ if inside_booking:
+ raise PlomException(f'{intro}: last booking unfinished')
+ return bookings
+
+
+class TransferLine:
+
+ def __init__(self, line=None, account='', amount=None, currency='', comment='', validate=True):
+ self.account = account
+ self.amount = amount
+ self.currency = currency
+ self.comment = comment
+ if line:
+ if line.empty:
+ raise PlomException('line empty')
+ self.comment = line.comment
+ toks = line.non_comment.split()
+ if (len(toks) not in {1, 3}):
+ if validate:
+ raise PlomException(f'number of non-comment tokens not 1 or 3')
+ elif len(toks) == 2:
+ toks += ['']
+ else:
+ toks = 3*['']
+ self.account = toks[0]
+ if len(toks) != 1:
try:
- amount = decimal.Decimal(toks[2])
+ self.amount = decimal.Decimal(toks[1])
except decimal.InvalidOperation:
- raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
- currency = toks[i_currency]
- if currency[0] in decimal_chars:
- raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
- elif len(toks) == 2:
- value = toks[1]
- inside_amount = False
- inside_currency = False
- amount_string = ""
- currency = ""
- dots_counted = 0
- for i, c in enumerate(value):
- if i == 0:
- if c in decimal_chars:
- inside_amount = True
- else:
- inside_currency = True
- if inside_currency:
- if c in decimal_chars and len(amount_string) == 0:
- inside_currency = False
- inside_amount = True
+ if validate:
+ raise PlomException(f'invalid token for Decimal: {toks[1]}')
else:
- currency += c
- continue
- if inside_amount:
- if c not in decimal_chars:
- if len(currency) > 0:
- raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
- inside_currency = True
- inside_amount = False
- currency += c
- continue
- if c == '-' and len(amount_string) > 1:
- raise HandledException(f"{prefix} amount has non-start '-': {value}")
- if c == '.':
- if dots_counted > 1:
- raise HandledException(f"{prefix} amount has multiple dots: {value}")
- dots_counted += 1
- amount_string += c
- if len(amount_string) == 0:
- raise HandledException(f"{prefix} amount missing: {value}")
- if len(currency) == 0:
- raise HandledException(f"{prefix} currency missing: {value}")
- amount = decimal.Decimal(amount_string)
- booking_lines += [(account_name, amount, currency)]
- if inside_booking:
- raise HandledException(f"{prefix} last booking unfinished")
- return bookings, comments
+ self.comment = f'unparsed: {toks[1]}; {self.comment}'
+ self.currency = toks[2]
+
+ @property
+ def amount_fmt(self):
+ if self.amount is None:
+ return ''
+ elif self.amount.as_tuple().exponent < -2:
+ return f'{self.amount:.1f}…'
+ else:
+ return f'{self.amount:.2f}'
+
+ @property
+ def for_writing(self):
+ line = f' {self.account}'
+ if self.amount is not None:
+ line += f' {self.amount} {self.currency}'
+ if self.comment != '':
+ line += f' ; {self.comment}'
+ return line
+
+ @property
+ def comment_cols(self):
+ return max(20, len(self.comment))
+
class Booking:
- def __init__(self, date_string, description, booking_lines, start_line):
- self.date_string = date_string
- self.description = description
- self.lines = booking_lines
- self.start_line = start_line
- self.validate_booking_lines()
- self.account_changes = self.parse_booking_lines_to_account_changes()
-
- def validate_booking_lines(self):
- prefix = f"booking at line {self.start_line}"
- sums = {}
- empty_values = 0
- for line in self.lines[1:]:
- if line == '':
- continue
- _, amount, currency = line
- if amount is None:
- if empty_values > 0:
- raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}")
- empty_values += 1
- continue
- if currency not in sums:
- sums[currency] = 0
- sums[currency] += amount
- if empty_values == 0:
- for k, v in sums.items():
- if v != 0:
- raise HandledException(f"{prefix} does not sum up to zero")
+ def __init__(self, lines=None, starts_at='?', date='', description='', top_comment='', transfer_lines=None, validate=True):
+ self.validate = validate
+ self.starts_at = starts_at
+ self.intro = f'booking starting at line {self.starts_at}'
+ self.clean()
+ if lines:
+ self.lines = lines
+ self.parse_lines()
else:
- sinkable = False
- for k, v in sums.items():
- if v != 0:
- sinkable = True
- if not sinkable:
- raise HandledException(f"{prefix} has empty value that cannot be filled")
-
- def parse_booking_lines_to_account_changes(self):
- account_changes = {}
- debt = {}
- sink_account = None
- for line in self.lines[1:]:
- if line == '':
- continue
- account, amount, currency = line
- if amount is None:
- sink_account = account
+ self.date = date
+ self.description = description
+ self.top_comment = top_comment
+ self.validate_head()
+ self.transfer_lines = transfer_lines if transfer_lines else []
+ if self.validate and len(self.transfer_lines) < 2:
+ raise PlomException(f'{self.intro}: too few transfer lines')
+ self.calculate_account_changes()
+ self.lines = [LedgerTextLine(l) for l in self.for_writing]
+
+ @classmethod
+ def from_postvars(cls, postvars, starts_at='?', validate=True):
+ date = postvars['date'][0]
+ description = postvars['description'][0]
+ top_comment = postvars['top_comment'][0]
+ transfer_lines = []
+ for i, account in enumerate(postvars['account']):
+ if len(account) == 0:
continue
- apply_booking_to_account_balances(account_changes, account, currency, amount)
- if currency not in debt:
- debt[currency] = amount
+ amount = None
+ if len(postvars['amount'][i]) > 0:
+ amount = decimal.Decimal(postvars['amount'][i])
+ transfer_lines += [TransferLine(None, account, amount, postvars['currency'][i], postvars['comment'][i])]
+ return cls(None, starts_at, date, description, top_comment, transfer_lines, validate=validate)
+
+ def clean(self):
+ self.transfer_lines = []
+ self.account_changes = {}
+
+ def parse_lines(self):
+ if len(self.lines) < 3 and self.validate:
+ raise PlomException(f'{self.intro}: ends with less than 3 lines:' + str(self.lines))
+ top_line = self.lines[0]
+ if top_line.empty and self.validate:
+ raise PlomException('{self.intro}: headline empty')
+ self.top_comment = top_line.comment
+ toks = top_line.non_comment.split(maxsplit=1)
+ if len(toks) < 2:
+ if self.validate:
+ raise PlomException(f'{self.intro}: headline missing elements: {non_comment}')
+ elif 0 == len(toks):
+ toks = 2*['']
else:
- debt[currency] += amount
- if sink_account:
- for currency, amount in debt.items():
- apply_booking_to_account_balances(account_changes, sink_account, currency, -amount)
- return account_changes
+ toks += ['']
+ self.date = toks[0]
+ self.description = toks[1]
+ self.validate_head()
+ for i, line in enumerate(self.lines[1:]):
+ try:
+ self.transfer_lines += [TransferLine(line, validate=self.validate)]
+ except PlomException as e:
+ raise PlomException(f'{self.intro}, transfer line {i}: {e}')
+ self.calculate_account_changes()
+ def calculate_account_changes(self):
+ sink_account = None
+ money_changes = Wealth()
+ for i, transfer_line in enumerate(self.transfer_lines):
+ intro = f'{self.intro}, transfer line {i}'
+ if transfer_line.amount is None:
+ if sink_account is not None and self.validate:
+ raise PlomException(f'{intro}: second sink found (only one allowed)')
+ sink_account = transfer_line.account
+ else:
+ if not transfer_line.account in self.account_changes.keys():
+ self.account_changes[transfer_line.account] = Wealth()
+ money = {transfer_line.currency: transfer_line.amount}
+ self.account_changes[transfer_line.account] += money
+ money_changes += money
+ if sink_account is None and (not money_changes.sink_empty) and self.validate:
+ raise PlomException(f'{intro}: does not balance (undeclared non-empty sink)')
+ if sink_account is not None:
+ if not sink_account in self.account_changes.keys():
+ self.account_changes[sink_account] = Wealth()
+ self.account_changes[sink_account] += money_changes.as_sink
+ @property
+ def for_writing(self):
+ lines = [f'{self.date} {self.description}']
+ if self.top_comment is not None and self.top_comment.rstrip() != '':
+ lines[0] += f' ; {self.top_comment}'
+ for line in self.transfer_lines:
+ lines += [line.for_writing]
+ return lines
-class Database:
+ @property
+ def comment_cols(self):
+ return max(20, len(self.top_comment))
- def __init__(self):
- db_name = "_ledger"
- self.db_file = db_name + ".json"
- self.lock_file = db_name+ ".lock"
+ def validate_head(self):
+ if not self.validate:
+ return
+ if len(self.date) == 0:
+ raise PlomException(f'{self.intro}: missing date')
+ if len(self.description) == 0:
+ raise PlomException(f'{self.intro}: missing description')
+ try:
+ datetime.strptime(self.date, '%Y-%m-%d')
+ except ValueError:
+ raise PlomException(f'{self.intro}: bad headline date format: {self.date}')
+
+ def fill_sink(self):
+ replacement_lines = []
+ for i, line in enumerate(self.transfer_lines):
+ if line.amount is None:
+ for currency, amount in self.account_changes[line.account].money_dict.items():
+ replacement_lines += [TransferLine(None, f'{line.account}', amount, currency).for_writing]
+ break
+ lines = self.lines[:i+1] + [LedgerTextLine(l) for l in replacement_lines] + self.lines[i+2:]
+ self.clean()
+ self.lines = lines
+ self.parse_lines()
+
+ def mirror(self):
+ new_transfer_lines = []
+ for transfer_line in self.transfer_lines:
+ uncommented_source = LedgerTextLine(transfer_line.for_writing).non_comment
+ comment = f're: {uncommented_source.lstrip()}'
+ new_account = '?'
+ new_transfer_lines += [TransferLine(None, new_account, -transfer_line.amount, transfer_line.currency, comment)]
+ for transfer_line in new_transfer_lines:
+ self.lines += [LedgerTextLine(transfer_line.for_writing)]
+ self.clean()
+ self.parse_lines()
+
+ def replace(self, replace_from, replace_to):
+ lines = []
+ for l in self.for_writing:
+ lines += [l.replace(replace_from, replace_to)]
+ self.lines = [LedgerTextLine(l) for l in lines]
+ self.clean()
+ self.parse_lines()
+
+ def add_taxes(self, db):
+ acc_kk_add = 'Reserves:KrankenkassenBeitragsWachstum'
+ acc_kk_minimum = 'Reserves:Monthly:KrankenkassenDefaultBeitrag'
+ acc_kk = 'Expenses:KrankenKasse'
+ acc_ESt = 'Reserves:EinkommensSteuer'
+ acc_assets = 'Assets'
+ acc_neuanfangspuffer_expenses = 'Reserves:NeuAnfangsPuffer:Ausgaben'
+ months_passed = datetime.strptime(self.date, '%Y-%m-%d').month - 1
+ past_kk_expenses = 0
+ past_kk_add = 0
+ past_neuanfangspuffer_expenses = 0
+ for b in db.bookings:
+ if b.date == self.date:
+ break
+ if acc_neuanfangspuffer_expenses in b.account_changes.keys():
+ past_neuanfangspuffer_expenses -= b.account_changes[acc_neuanfangspuffer_expenses].money_dict['€']
+ if acc_kk_add in b.account_changes.keys():
+ past_kk_add += b.account_changes[acc_kk_add].money_dict['€']
+ if acc_kk_minimum in b.account_changes.keys():
+ past_kk_expenses += b.account_changes[acc_kk_minimum].money_dict['€']
+
+ needed_netto = -self.account_changes['Assets'].money_dict['€']
+ past_taxed_needs_before_kk = past_neuanfangspuffer_expenses - past_kk_expenses
+ ESt_this_month = 0
+ E0 = decimal.Decimal(11604)
+ E1 = decimal.Decimal(17006)
+ E2 = decimal.Decimal(66761)
+ E3 = decimal.Decimal(277826)
+ taxed_income_before_kk = needed_netto
+ too_low = 0
+ too_high = 2 * needed_netto
+ while True:
+ estimate_for_remaining_year = (12 - months_passed) * taxed_income_before_kk
+ zvE = past_taxed_needs_before_kk + estimate_for_remaining_year
+ if zvE < E0:
+ ESt_year = decimal.Decimal(0)
+ elif zvE < E1:
+ y = (zvE - E0)/10000
+ ESt_year = (decimal.Decimal(922.98) * y + 1400) * y
+ elif zvE < E2:
+ y = (zvE - E1)/10000
+ ESt_year = (decimal.Decimal(181.19) * y + 2397) * y + decimal.Decimal(1025.38)
+ elif zvE < E3:
+ ESt_year = decimal.Decimal(0.42) * zvE - 10602.13
+ else:
+ ESt_year = decimal.Decimal(0.45) * zvE - 18936.88
+ ESt_this_month = ESt_year / 12
+ taxed_income_minus_ESt = taxed_income_before_kk - ESt_this_month
+ if abs(taxed_income_minus_ESt - needed_netto) < 0.001:
+ break
+ elif taxed_income_minus_ESt < needed_netto:
+ too_low = taxed_income_before_kk
+ elif taxed_income_minus_ESt > needed_netto:
+ too_high = taxed_income_before_kk
+ taxed_income_before_kk = too_low + (too_high - too_low)/2
+ ESt_this_month = ESt_this_month.quantize(decimal.Decimal('0.00'))
+ comment = f'estimated zvE: {past_taxed_needs_before_kk}€ + {estimate_for_remaining_year:.2f}€ = {zvE:.2f}€ → year ESt: {ESt_year:.2f} → needed taxed income before Krankenkasse: {taxed_income_before_kk:.2f}€'
+ self.transfer_lines += [TransferLine(None, acc_ESt, ESt_this_month, '€', comment)]
+
+ kk_factor = decimal.Decimal(1.197)
+ kk_minimum_income = 1178.33
+ kk_minimum_tax = decimal.Decimal(232.13).quantize(decimal.Decimal('0.00'))
+ if self.date < '2024-02-01':
+ kk_minimum_income = 1131.67
+ kk_minimum_tax = decimal.Decimal(222.94).quantize(decimal.Decimal('0.00'))
+ comment = f'assumed minimum income of {kk_minimum_income:.2f}€ * {kk_factor:.3f}'
+ self.transfer_lines += [TransferLine(None, acc_kk_minimum, kk_minimum_tax, '€', comment)]
+ kk_add = taxed_income_before_kk * kk_factor - taxed_income_before_kk - kk_minimum_tax
+ kk_add = decimal.Decimal(kk_add).quantize(decimal.Decimal('0.00'))
+ if past_kk_add + kk_add < 0: # *if* kk_add would actually kill all earlier kk_add …
+ kk_add = - past_kk_add # … shrink it so it won't push the kk_add total below 0
+ comment = f'local negative as large as possible without moving {acc_kk_add} below zero'
+ else:
+ comment = f'({taxed_income_before_kk:.2f}€ * {kk_factor:.3f}) - {taxed_income_before_kk:.2f} - {kk_minimum_tax}'
+ self.transfer_lines += [TransferLine(None, acc_kk_add, kk_add, '€', comment)]
+
+ diff_through_taxes_and_kk = ESt_this_month + kk_minimum_tax + kk_add
+ comment = f'{ESt_this_month} + {kk_minimum_tax} + {kk_add}'
+ self.transfer_lines += [TransferLine(None, acc_assets, -diff_through_taxes_and_kk, '€', comment)]
+ final_loss = diff_through_taxes_and_kk + needed_netto
+ comment = f'{needed_netto} + {diff_through_taxes_and_kk}'
+ self.transfer_lines += [TransferLine(None, acc_assets, final_loss, '€', comment)]
+ self.transfer_lines += [TransferLine(None, acc_neuanfangspuffer_expenses, -final_loss, '€')]
+
+
+class LedgerDB(PlomDB):
+
+ def __init__(self, prefix, ignore_editable_exceptions=False):
+ self.prefix = prefix
self.bookings = []
self.comments = []
- self.real_lines = []
- if os.path.exists(self.db_file):
- with open(self.db_file, "r") as f:
- self.real_lines += f.readlines()
- ret = parse_lines(self.real_lines)
- self.bookings += ret[0]
- self.comments += ret[1]
-
- def get_lines(self, start, end):
- return db.real_lines[start:end]
-
- def replace(self, start, end, lines):
- import shutil
- if os.path.exists(self.lock_file):
- raise HandledException('Sorry, lock file!')
- if os.path.exists(self.db_file):
- shutil.copy(self.db_file, self.db_file + ".bak")
- f = open(self.lock_file, 'w+')
- f.close()
- text = ''.join(self.real_lines[:start]) + '\n'.join(lines) + ''.join(self.real_lines[end:])
- with open(self.db_file, 'w') as f:
- f.write(text);
- os.remove(self.lock_file)
-
- def append(self, lines):
- import shutil
- if os.path.exists(self.lock_file):
- raise HandledException('Sorry, lock file!')
- if os.path.exists(self.db_file):
- shutil.copy(self.db_file, self.db_file + ".bak")
- f = open(self.lock_file, 'w+')
- f.close()
- with open(self.db_file, 'a') as f:
- f.write('\n' + '\n'.join(lines) + '\n');
- os.remove(self.lock_file)
-
-
-class MyServer(BaseHTTPRequestHandler):
- header = """<html>
-<meta charset="UTF-8">
-<body>
-<a href="/ledger">ledger</a>
-<a href="/balance">balance</a>
-<a href="/add_free">add free</a>
-<a href="/add_structured">add structured</a>
-<hr />
-"""
- footer = "</body>\n<html>"
+ self.text_lines = []
+ super().__init__(db_path)
+ self.bookings = parse_lines_to_bookings(self.text_lines, ignore_editable_exceptions)
+
+ def read_db_file(self, f):
+ self.text_lines += f.readlines()
+
+ def insert_booking_at_date(self, booking):
+ place_at = 0
+ if len(self.bookings) > 0:
+ for i, iterated_booking in enumerate(self.bookings):
+ if booking.date < iterated_booking.date:
+ break
+ elif booking.date == iterated_booking.date:
+ place_at = i
+ break
+ else:
+ place_at = i + 1
+ self.bookings.insert(place_at, booking)
+
+ def ledger_as_html(self):
+ for index, booking in enumerate(self.bookings):
+ booking.can_up = index > 0 and self.bookings[index - 1].date == booking.date
+ booking.can_down = index < len(self.bookings) - 1 and self.bookings[index + 1].date == booking.date
+ return j2env.get_template('ledger.html').render(bookings=self.bookings)
+
+ def balance_as_html(self, until_after=None):
+ bookings = self.bookings[:(until_after if until_after is None else int(until_after)+1)]
+ account_trunk = Account('', None)
+ accounts = {account_trunk.full_name: account_trunk}
+ for booking in bookings:
+ for full_account_name, moneys in booking.account_changes.items():
+ toks = full_account_name.split(':')
+ path = []
+ for tok in toks:
+ parent_name = ':'.join(path)
+ path += [tok]
+ account_name = ':'.join(path)
+ if not account_name in accounts.keys():
+ accounts[account_name] = Account(own_name=tok, parent=accounts[parent_name])
+ accounts[full_account_name].add_wealth(moneys)
+ class Node:
+ def __init__(self, indent, name, moneys):
+ self.indent = indent
+ self.name = name
+ self.moneys = moneys.money_dict
+ nodes = []
+ def walk_tree(nodes, indent, account):
+ nodes += [Node(indent, account.own_name, account.full_moneys)]
+ for child in account.children:
+ walk_tree(nodes, indent+1, child)
+ for acc in account_trunk.children:
+ walk_tree(nodes, 0, acc)
+ return j2env.get_template('balance.html').render(nodes=nodes)
+
+ def edit(self, index, sent=None, error_msg=None, edit_mode='table', copy=False):
+ accounts = set()
+ if sent or -1 == index:
+ content = sent if sent else ([] if 'textarea'==edit_mode else None)
+ else:
+ content = self.bookings[index]
+ date_today = str(datetime.now())[:10]
+ if copy:
+ content.date = date_today
+ elif -1 == index and (content is None or [] == content):
+ content = Booking(date=date_today, validate=False)
+ if 'textarea' == edit_mode and content:
+ content = content.for_writing
+ else:
+ for booking in self.bookings:
+ for transfer_line in booking.transfer_lines:
+ accounts.add(transfer_line.account)
+ return j2env.get_template('edit.html').render(content=content, index=index, error_msg=error_msg, edit_mode=edit_mode, accounts=accounts, adding=(copy or -1 == index))
+
+ def move_up(self, index):
+ return self.move(index, -1)
+
+ def move_down(self, index):
+ return self.move(index, +1)
+
+ def move(self, index, direction):
+ to_move = self.bookings[index]
+ swap_index = index + 1*(direction)
+ to_swap = self.bookings[swap_index]
+ self.bookings[index] = to_swap
+ self.bookings[index + 1*(direction)] = to_move
+ return swap_index
+
+ def write_db(self):
+ lines = []
+ for i, booking in enumerate(self.bookings):
+ if i > 0:
+ lines += ['']
+ lines += booking.for_writing
+ self.write_text_to_db('\n'.join(lines) + '\n')
+
+
+class LedgerHandler(PlomHandler):
+
+ def app_init(self, handler):
+ default_path = '/ledger'
+ handler.add_route('GET', default_path, self.forward_gets)
+ handler.add_route('POST', default_path, self.forward_posts)
+ return 'ledger', default_path
def do_POST(self):
+ self.try_do(self.forward_posts)
+
+ def forward_posts(self):
+ prefix = self.apps['ledger'] if hasattr(self, 'apps') else ''
length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
+ db = LedgerDB(prefix, ignore_editable_exceptions=True)
+ index = 0
parsed_url = urlparse(self.path)
- if '/add_structured' == parsed_url.path:
- n_lines = int(len(postvars) / 4)
- date = postvars['date'][0]
- description = postvars['description'][0]
- start_comment = postvars['line_0_comment'][0]
- lines = [f'{date} {description} ; {start_comment}']
- for i in range(1, n_lines):
- account = postvars[f'line_{i}_account'][0]
- amount = postvars[f'line_{i}_amount'][0]
- currency = postvars[f'line_{i}_currency'][0]
- comment = postvars[f'line_{i}_comment'][0]
- new_main = f'{account} {amount} {currency}'
- if '' == new_main.rstrip() == comment.rstrip(): # don't write empty lines
- continue
- lines += [f'{new_main} ; {comment}']
- elif '/add_free' == parsed_url.path:
- lines = postvars['booking'][0].splitlines()
- start = int(postvars['start'][0])
- end = int(postvars['end'][0])
- try:
- _, _ = parse_lines(lines)
- if start == end == 0:
- db.append(lines)
+ for string in {'update', 'add', 'check', 'mirror', 'fill_sink', 'textarea', 'table', 'move_up', 'move_down', 'add_taxes', 'replace'}:
+ if string in postvars.keys():
+ submit_button = string
+ break
+ if f'{prefix}/ledger' == parsed_url.path and submit_button in {'move_up', 'move_down'}:
+ mover = getattr(db, submit_button)
+ index = mover(int(postvars[submit_button][0]))
+ elif prefix + '/edit' == parsed_url.path:
+ index = int(postvars['index'][0])
+ edit_mode = postvars['edit_mode'][0]
+ validate = submit_button in {'update', 'add', 'copy', 'check'}
+ starts_at = '?' if index == -1 else db.bookings[index].starts_at
+ if 'textarea' == edit_mode:
+ lines = [LedgerTextLine(line) for line in postvars['booking'][0].rstrip().split('\n')]
+ booking = Booking(lines, starts_at, validate=validate)
else:
- db.replace(start, end, lines)
- self.send_response(301)
- self.send_header('Location', '/')
- self.end_headers()
- except HandledException as e:
- self.send_response(400)
- self.end_headers()
- page = f'{self.header}ERROR: {e}{self.footer}'
- self.wfile.write(bytes(page, "utf-8"))
+ booking = Booking.from_postvars(postvars, starts_at, validate)
+ if submit_button in {'update', 'add'}:
+ if submit_button == 'update':
+ if 'textarea' == edit_mode and 'delete' == ''.join([l.text_line for l in lines]).strip():
+ del db.bookings[index]
+ # if not creating new Booking, and date unchanged, keep it in place
+ elif booking.date == db.bookings[index].date:
+ db.bookings[index] = booking
+ else:
+ del db.bookings[index]
+ db.insert_booking_at_date(booking)
+ else:
+ db.insert_booking_at_date(booking)
+ else: # non-DB-writing calls
+ error_msg = None
+ if 'check' == submit_button:
+ error_msg = 'All looks fine!'
+ elif submit_button in {'mirror', 'fill_sink', 'add_taxes'}:
+ if 'add_taxes' == submit_button:
+ booking.add_taxes(db)
+ else:
+ getattr(booking, submit_button)()
+ elif 'replace' == submit_button:
+ booking.replace(postvars['replace_from'][0], postvars['replace_to'][0])
+ elif submit_button in {'textarea', 'table'}:
+ edit_mode = submit_button
+ page = db.edit(index, booking, error_msg=error_msg, edit_mode=edit_mode)
+ self.send_HTML(page)
+ return
+ db.write_db()
+ index = index if index >= 0 else len(db.bookings) - 1
+ self.redirect(prefix + f'/ledger#{index}')
def do_GET(self):
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- db = Database()
+ self.try_do(self.forward_gets)
+
+ def forward_gets(self):
+ prefix = self.apps['ledger'] if hasattr(self, 'apps') else ''
+ try:
+ db = LedgerDB(prefix=prefix)
+ except EditableException as e:
+ # We catch the EditableException for further editing, and then
+ # re-run the DB initiation without it blocking DB creation.
+ db = LedgerDB(prefix=prefix, ignore_editable_exceptions=True)
+ page = db.edit(index=e.booking_index, error_msg=f'ERROR: {e}')
+ self.send_HTML(page)
+ return
parsed_url = urlparse(self.path)
- page = self.header + ''
params = parse_qs(parsed_url.query)
- start = int(params.get('start', ['0'])[0])
- end = int(params.get('end', ['0'])[0])
- if parsed_url.path == '/balance':
- page += self.balance_as_html(db)
- elif parsed_url.path == '/add_free':
- page += self.add_free(db, start, end)
- elif parsed_url.path == '/add_structured':
- bonus_lines = int(params.get('bonus_lines', ['0'])[0])
- page += self.add_structured(db, start, end)
+ if parsed_url.path == f'{prefix}/balance':
+ stop = params.get('until_after', [None])[0]
+ page = db.balance_as_html(stop)
+ elif parsed_url.path == f'{prefix}/edit':
+ index = params.get('i', [-1])[0]
+ copy = params.get('copy', [0])[0]
+ page = db.edit(int(index), copy=bool(copy))
else:
- page += self.ledger_as_html(db)
- page += self.footer
- self.wfile.write(bytes(page, "utf-8"))
-
- def balance_as_html(self, db):
- account_sums = {}
- for booking in db.bookings:
- for account, changes in booking.account_changes.items():
- for currency, amount in changes.items():
- apply_booking_to_account_balances(account_sums, account, currency, amount)
- account_tree = {}
- def collect_branches(account_name, path):
- node = account_tree
- path_copy = path[:]
- while len(path_copy) > 0:
- step = path_copy.pop(0)
- node = node[step]
- toks = account_name.split(":", maxsplit=1)
- parent = toks[0]
- if parent in node.keys():
- child = node[parent]
- else:
- child = {}
- node[parent] = child
- if len(toks) == 2:
- k, v = collect_branches(toks[1], path + [parent])
- if k not in child.keys():
- child[k] = v
- else:
- child[k].update(v)
- return parent, child
- for account_name in sorted(account_sums.keys()):
- k, v = collect_branches(account_name, [])
- if k not in account_tree.keys():
- account_tree[k] = v
- else:
- account_tree[k].update(v)
- def collect_totals(parent_path, tree_node):
- for k, v in tree_node.items():
- child_path = parent_path + ":" + k
- for currency, amount in collect_totals(child_path, v).items():
- apply_booking_to_account_balances(account_sums, parent_path, currency, amount)
- return account_sums[parent_path]
- for account_name in account_tree.keys():
- account_sums[account_name] = collect_totals(account_name, account_tree[account_name])
- lines = []
- def print_subtree(lines, indent, node, subtree, path):
- line = f"{indent}{node}"
- n_tabs = 5 - (len(line) // 8)
- line += n_tabs * "\t"
- if "€" in account_sums[path + node].keys():
- amount = account_sums[path + node]["€"]
- line += f"{amount:9.2f} €\t"
- else:
- line += f"\t\t"
- for currency, amount in account_sums[path + node].items():
- if currency != '€' and amount > 0:
- line += f"{amount:5.2f} {currency}\t"
- lines += [line]
- indent += " "
- for k, v in sorted(subtree.items()):
- print_subtree(lines, indent, k, v, path + node + ":")
- for k, v in sorted(account_tree.items()):
- print_subtree(lines, "", k, v, "")
- content = "\n".join(lines)
- return f"<pre>{content}</pre>"
-
- def ledger_as_html(self, db):
- lines = []
- line_sep = '<br />'
- for comment in db.comments:
- line = f'; {comment}' if comment != '' else ''
- lines += [line + line_sep]
- for booking in db.bookings:
- i = booking.start_line
- suffix = lines[i]
- lines[i] = f'<p>{booking.date_string} {booking.description}{suffix}'
- for booking_line in booking.lines[1:]:
- i += 1
- if booking_line == '':
- continue
- suffix = f' {lines[i]}' if len(lines[i]) > 0 else ''
- value = f' {booking_line[1]} {booking_line[2]}' if booking_line[1] else ''
- lines[i] = f'{booking_line[0]}{value}{suffix}'
- lines[i] = lines[i][:-len(line_sep)] + f"""</p>
-edit:
-<a href="/add_structured?start={booking.start_line}&end={i+1}">structured</a>
-/ <a href="/add_free?start={booking.start_line}&end={i+1}">free</a>
-<br />"""
- return '\n'.join(lines)
-
- def header_add_form(self, action):
- return f"<form method=\"POST\" action=\"/{action}\">\n"
-
- def footer_add_form(self, start, end):
- return f"""
-<input type="hidden" name="start" value={start} />
-<input type="hidden" name="end" value={end} />
-<input type="submit">
-</form>"""
-
- def add_free(self, db, start=0, end=0):
- content = html.escape(''.join(db.get_lines(start, end)))
- return f'{self.header_add_form("add_free")}<textarea name="booking" rows="8" cols="80">{content}</textarea>{self.footer_add_form(start, end)}'
-
- def add_structured(self, db, start=0, end=0, bonus_lines=10):
- lines = db.get_lines(start, end)
- bookings, comments = parse_lines(lines)
- if len(bookings) > 1:
- raise HandledException('can only edit single Booking')
- input_lines = ''
- last_line = 0
- def inpu(name, val=""):
- safe_val = html.escape(str(val))
- return f'<input name="{name}" value="{safe_val}" />'
- if len(bookings) == 0:
- input_lines += f'{inpu("date")} {inpu("description")} ; {inpu("comment")}<br />'
- last_line = 1
- else:
- booking = bookings[0]
- last_line = len(comments)
- input_lines += f'{inpu("date", booking.date_string)} {inpu("description", booking.description)} ; {inpu("comment", comments[0])}<br />'
- for i in range(1, len(comments)):
- account = amount = currency = ''
- if i < len(booking.lines) and booking.lines[i] != '':
- account = booking.lines[i][0]
- amount = booking.lines[i][1]
- currency = booking.lines[i][2]
- input_lines += f'{inpu("line_{i}_account", account)} {inpu("line_{i}_amount", amount)} {inpu("line_{i}_currency", currency)} ; {inpu("line_{i}_comment", comments[i])}<br />'
- for j in range(bonus_lines):
- i = j + last_line
- input_lines += f'{inpu("line_{i}_account")} {inpu("line_{i}_amount")} {inpu("line_{i}_currency")} ; {inpu("line_{i}_comment")}<br />'
- return f'{self.header_add_form("add_structured")}{input_lines}{self.footer_add_form(start, end)}'
-
-
-db = Database()
-if __name__ == "__main__":
- webServer = HTTPServer((hostName, serverPort), MyServer)
- print(f"Server started http://{hostName}:{serverPort}")
- try:
- webServer.serve_forever()
- except KeyboardInterrupt:
- pass
- webServer.server_close()
- print("Server stopped.")
+ page = db.ledger_as_html()
+ self.send_HTML(page)
+
+
+
+if __name__ == "__main__":
+ run_server(server_port, LedgerHandler)