- def parse_lines(self, lines):
- import datetime
- import decimal
- inside_booking = False
- date_string, description = None, None
- booking_lines = []
- start_line = 0
- for i, line in enumerate(lines):
- prefix = f"line {i}"
- # we start with the case of an utterly empty line
- self.comments += [""]
- stripped_line = line.rstrip()
- if stripped_line == '':
- if inside_booking:
- # assume we finished a booking, finalize, and commit to DB
- if len(booking_lines) < 2:
- raise HandledException(f"{prefix} booking ends to early")
- booking = Booking(date_string, description, booking_lines, start_line)
- self.bookings += [booking]
- # expect new booking to follow so re-zeroall booking data
- inside_booking = False
- date_string, description = None, None
- booking_lines = []
- continue
- # if non-empty line, first get comment if any, and commit to DB
- split_by_comment = stripped_line.split(sep=";", maxsplit=1)
- if len(split_by_comment) == 2:
- self.comments[i] = split_by_comment[1]
- # if pre-comment empty: if inside_booking, this must be a comment-only line so we keep it for later ledger-output to capture those comments; otherwise, no more to process for this line
- non_comment = split_by_comment[0].rstrip()
- if non_comment.rstrip() == '':
- if inside_booking:
- booking_lines += ['']
- continue
- # if we're starting a booking, parse by first-line pattern
- if not inside_booking:
- start_line = i
- toks = non_comment.split(maxsplit=1)
- date_string = toks[0]
- try:
- datetime.datetime.strptime(date_string, '%Y-%m-%d')
- except ValueError:
- raise HandledException(f"{prefix} bad date string: {date_string}")
- try:
- description = toks[1]
- except IndexError:
- raise HandledException(f"{prefix} bad description: {description}")
- inside_booking = True
- continue
- # otherwise, read as transfer data
- toks = non_comment.split() # ignore specification's allowance of single spaces in names
- if len(toks) > 3:
- raise HandledException(f"{prefix} too many booking line tokens: {toks}")
- amount, currency = None, None
- account_name = toks[0]
- if account_name[0] == '[' and account_name[-1] == ']':
- # ignore specification's differentiation of "virtual" accounts
- account_name = account_name[1:-1]
- decimal_chars = ".-0123456789"
- if len(toks) == 3:
- i_currency = 1
- try:
- amount = decimal.Decimal(toks[1])
- i_currency = 2
- except decimal.InvalidOperation:
- try:
- amount = decimal.Decimal(toks[2])
- except decimal.InvalidOperation:
- raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
- currency = toks[i_currency]
- if currency[0] in decimal_chars:
- raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
- elif len(toks) == 2:
- value = toks[1]
- inside_amount = False
- inside_currency = False
- amount_string = ""
- currency = ""
- dots_counted = 0
- for i, c in enumerate(value):
- if i == 0:
- if c in decimal_chars:
- inside_amount = True
- else:
- inside_currency = True
- if inside_currency:
- if c in decimal_chars and len(amount_string) == 0:
- inside_currency = False
- inside_amount = True
- else:
- currency += c
- continue
- if inside_amount:
- if c not in decimal_chars:
- if len(currency) > 0:
- raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
- inside_currency = True
- inside_amount = False
- currency += c
- continue
- if c == '-' and len(amount_string) > 1:
- raise HandledException(f"{prefix} amount has non-start '-': {value}")
- if c == '.':
- if dots_counted > 1:
- raise HandledException(f"{prefix} amount has multiple dots: {value}")
- dots_counted += 1
- amount_string += c
- if len(amount_string) == 0:
- raise HandledException(f"{prefix} amount missing: {value}")
- if len(currency) == 0:
- raise HandledException(f"{prefix} currency missing: {value}")
- amount = decimal.Decimal(amount_string)
- booking_lines += [(account_name, amount, currency)]
-
- def ledger_as_html(self):
+ def get_lines(self, start, end):
+ return self.real_lines[start:end]
+
+ def replace(self, start, end, lines):
+ import shutil
+ if os.path.exists(self.lock_file):
+ raise HandledException('Sorry, lock file!')
+ if os.path.exists(self.db_file):
+ shutil.copy(self.db_file, self.db_file + ".bak")
+ f = open(self.lock_file, 'w+')
+ f.close()
+ total_lines = self.real_lines[:start] + lines + self.real_lines[end:]
+ text = '\n'.join(total_lines)
+ with open(self.db_file, 'w') as f:
+ f.write(text);
+ os.remove(self.lock_file)
+
+ def append(self, lines):
+ import shutil
+ if os.path.exists(self.lock_file):
+ raise HandledException('Sorry, lock file!')
+ if os.path.exists(self.db_file):
+ shutil.copy(self.db_file, self.db_file + ".bak")
+ f = open(self.lock_file, 'w+')
+ f.close()
+ with open(self.db_file, 'a') as f:
+ f.write('\n\n' + '\n'.join(lines) + '\n\n');
+ os.remove(self.lock_file)
+
+ def add_taxes(self, lines, finish=False):
+ ret = []
+ bookings, _ = parse_lines(lines)
+ date = bookings[0].date_string
+ acc_kk_add = 'Reserves:KrankenkassenBeitragsWachstum'
+ acc_kk_minimum = 'Reserves:Month:KrankenkassenDefaultBeitrag'
+ acc_kk = 'Expenses:KrankenKasse'
+ acc_est = 'Reserves:Einkommenssteuer'
+ acc_assets = 'Assets'
+ acc_buffer = 'Reserves:NeuAnfangsPuffer:Ausgaben'
+ last_monthbreak_assets = 0
+ last_monthbreak_est = 0
+ last_monthbreak_kk_minimum = 0
+ last_monthbreak_kk_add = 0
+ buffer_expenses = 0
+ kk_expenses = 0
+ est_expenses = 0
+ for b in self.bookings:
+ if date == b.date_string:
+ break
+ acc_keys = b.account_changes.keys()
+ if acc_buffer in acc_keys:
+ buffer_expenses -= b.account_changes[acc_buffer]['€']
+ if acc_kk_add in acc_keys:
+ kk_expenses += b.account_changes[acc_kk_add]['€']
+ if acc_kk in acc_keys:
+ kk_expenses += b.account_changes[acc_kk]['€']
+ if acc_est in acc_keys:
+ est_expenses += b.account_changes[acc_est]['€']
+ if finish and acc_kk_add in acc_keys and acc_kk_minimum in acc_keys:
+ last_monthbreak_kk_add = b.account_changes[acc_kk_add]['€']
+ last_monthbreak_est = b.account_changes[acc_est]['€']
+ last_monthbreak_kk_minimum = b.account_changes[acc_kk_minimum]['€']
+ last_monthbreak_assets = b.account_changes[acc_buffer]['€']
+ old_needed_income = last_monthbreak_assets + last_monthbreak_kk_add + last_monthbreak_kk_minimum + last_monthbreak_est
+ if finish:
+ ret += [f' {acc_est} {-last_monthbreak_est}€ ; for old assumption of needed income: {old_needed_income}€']
+ _, account_sums = bookings_to_account_tree(bookings)
+ expenses_so_far = -1 * account_sums[acc_assets]['€'] - old_needed_income
+ needed_income_before_kk = expenses_so_far
+ ESt_this_month = 0
+ left_over = needed_income_before_kk - ESt_this_month
+ too_low = 0
+ too_high = 2 * needed_income_before_kk
+ E0 = decimal.Decimal(10908)
+ E1 = decimal.Decimal(15999)
+ E2 = decimal.Decimal(62809)
+ E3 = decimal.Decimal(277825)
+ months_passed = int(date[5:7]) - 1
+ while True:
+ zvE = buffer_expenses - kk_expenses + (12 - months_passed) * needed_income_before_kk
+ if finish:
+ zvE += last_monthbreak_assets + last_monthbreak_kk_add + last_monthbreak_kk_minimum
+ if zvE < E0:
+ ESt = decimal.Decimal(0)
+ elif zvE < E1:
+ y = (zvE - E0)/10000
+ ESt = (decimal.Decimal(979.18) * y + 1400) * y
+ elif zvE < E2:
+ y = (zvE - E1)/10000
+ ESt = (decimal.Decimal(192.59) * y + 2397) * y + decimal.Decimal(966.53)
+ elif zvE < E3:
+ ESt = decimal.Decimal(0.42) * (zvE - decimal.Decimal(62809)) + decimal.Decimal(16405.54)
+ else:
+ ESt = decimal.Decimal(0.45) * (zvE - decimal.Decimal(277825)) + decimal.Decimal(106713.52)
+ ESt_this_month = (ESt + last_monthbreak_est - est_expenses) / (12 - months_passed)
+ left_over = needed_income_before_kk - ESt_this_month
+ if abs(left_over - expenses_so_far) < 0.001:
+ break
+ elif left_over < expenses_so_far:
+ too_low = needed_income_before_kk
+ elif left_over > expenses_so_far:
+ too_high = needed_income_before_kk
+ needed_income_before_kk = too_low + (too_high - too_low)/2
+ ESt_this_month = ESt_this_month.quantize(decimal.Decimal('0.00'))
+ ret += [f' {acc_est} {ESt_this_month}€ ; expenses so far: {expenses_so_far:.2f}€; zvE: {zvE:.2f}€; ESt total: {ESt:.2f}€; needed before Krankenkasse: {needed_income_before_kk:.2f}€']
+ kk_minimum_income = 1131.67
+ if date < '2023-02-01':
+ kk_minimum_income = decimal.Decimal(1096.67)
+ kk_factor = decimal.Decimal(0.189)
+ kk_minimum_tax = decimal.Decimal(207.27).quantize(decimal.Decimal('0.00'))
+ elif date < '2023-08-01':
+ kk_factor = decimal.Decimal(0.191)
+ kk_minimum_tax = decimal.Decimal(216.15).quantize(decimal.Decimal('0.00'))
+ else:
+ kk_factor = decimal.Decimal(0.197)
+ kk_minimum_tax = decimal.Decimal(222.94).quantize(decimal.Decimal('0.00'))
+ kk_add = max(0, kk_factor * needed_income_before_kk - kk_minimum_tax)
+ kk_add = decimal.Decimal(kk_add).quantize(decimal.Decimal('0.00'))
+ if finish:
+ ret += [f' {acc_kk_add} {-last_monthbreak_kk_add}€ ; max(0, {kk_factor:.3f} * {-(old_needed_income - last_monthbreak_est):.2f}€ - {kk_minimum_tax}€)']
+ else:
+ ret += [f' {acc_kk_minimum} {kk_minimum_tax}€ ; assumed minimum income {kk_minimum_income:.2f}€ * {kk_factor:.3f}']
+ ret += [f' {acc_kk_add} {kk_add}€ ; max(0, {kk_factor:.3f} * {needed_income_before_kk:.2f}€ - {kk_minimum_tax}€)']
+ diff = - last_monthbreak_est + ESt_this_month - last_monthbreak_kk_add + kk_add
+ if not finish:
+ diff += kk_minimum_tax
+ final_minus = expenses_so_far + old_needed_income + diff
+ ret += [f' {acc_assets} {-diff} €']
+ ret += [f' {acc_assets} {final_minus} €']
+ ret += [f' {acc_buffer} {-final_minus} €']
+ return ret
+
+
+class MyServer(BaseHTTPRequestHandler):
+ header = """<html>
+<meta charset="UTF-8">
+<style>
+body { color: #000000; }
+table { margin-bottom: 2em; }
+th, td { text-align: left }
+input[type=number] { text-align: right; font-family: monospace; }
+.money { font-family: monospace; text-align: right; }
+.comment { font-style: italic; color: #777777; }
+.full_line_comment { display: block; white-space: nowrap; width: 0; }
+</style>
+<body>
+<a href="/ledger">ledger</a>
+<a href="/balance">balance</a>
+<a href="/add_free">add free</a>
+<a href="/add_structured">add structured</a>
+<hr />
+"""
+ footer = "</body>\n<html>"
+
+ def do_POST(self):
+ db = Database()
+ length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
+ parsed_url = urlparse(self.path)