home · contact · privacy
Improve ledger.py.
[misc] / ledger.py
1 from http.server import BaseHTTPRequestHandler, HTTPServer
2 import sys
3 import os
4 import html
5 from urllib.parse import parse_qs, urlparse
# Host name and TCP port that the HTTPServer in the __main__ block binds to.
hostName = "localhost"
serverPort = 8082
8
9
class HandledException(Exception):
    """Error whose message is meant to be shown to the user.

    The POST handler in this file catches it and renders the message as an
    error page.
    """
12
13
def handled_error_exit(msg):
    """Print *msg* with an ``ERROR: `` prefix to stdout and exit with status 1."""
    print(f"ERROR: {msg}")
    # sys.exit(1) is defined as raising SystemExit(1); spelled out here.
    raise SystemExit(1)
17
18
def apply_booking_to_account_balances(account_sums, account, currency, amount):
    """Add *amount* of *currency* to *account* inside *account_sums*, in place.

    *account_sums* maps account name -> {currency: running total}; missing
    account or currency entries are created on first use.
    """
    balances = account_sums.setdefault(account, {})
    if currency in balances:
        balances[currency] += amount
    else:
        balances[currency] = amount
26
27
def add_taxes(lines):
    """Return extra posting lines reserving income tax and health insurance.

    *lines* is the text of one booking. It is parsed (on a copy, because
    parse_lines appends to the list it is given), the '€' total of the
    'Assets' account is read, and a bisection searches for the monthly gross
    income whose remainder after income tax equals the expenses so far.

    Returns a list of four posting lines: income tax reserve, minimum health
    insurance contribution, additional health insurance contribution, and the
    balancing 'Assets' line.

    Raises KeyError if the booking has no '€' amount on 'Assets', and
    whatever parse_lines raises for malformed input.

    NOTE(review): the thresholds and polynomial coefficients look like the
    German income tax tariff for one specific year, and "zvE" presumably
    stands for the yearly taxable income - confirm against the official
    tariff before changing them.
    """
    import decimal
    D = decimal.Decimal  # always built from strings so constants stay exact
    bookings, _ = parse_lines(lines.copy())
    _, account_sums = bookings_to_account_tree(bookings)
    expenses_so_far = -1 * account_sums['Assets']['€']
    needed_income_before_krankenkasse = expenses_so_far
    ESt_this_month = 0
    # Bisection bounds for the monthly income that is being searched for.
    too_low = 0
    too_high = 2 * needed_income_before_krankenkasse
    # Upper bounds (yearly income) of the tariff zones.
    E0 = 10908
    E1 = 15999
    E2 = 62809
    E3 = 277825
    tolerance = D('0.1')
    while True:
        zvE = 12 * needed_income_before_krankenkasse
        if zvE < E0:
            ESt = D(0)
        elif zvE < E1:
            y = (zvE - E0) / 10000
            ESt = (D('979.18') * y + 1400) * y
        elif zvE < E2:
            y = (zvE - E1) / 10000
            ESt = (D('192.59') * y + 2397) * y + D('966.53')
        elif zvE < E3:
            ESt = D('0.42') * (zvE - E2) + D('16405.54')
        else:
            ESt = D('0.45') * (zvE - E3) + D('106713.52')
        ESt_this_month = ESt / 12
        left_over = needed_income_before_krankenkasse - ESt_this_month
        if abs(left_over - expenses_so_far) < tolerance:
            break
        elif left_over < expenses_so_far:
            too_low = needed_income_before_krankenkasse
        elif left_over > expenses_so_far:
            too_high = needed_income_before_krankenkasse
        needed_income_before_krankenkasse = too_low + (too_high - too_low) / 2
    line_income_tax = f'  Reserves:Einkommenssteuer  {ESt_this_month:.2f}€ ; expenses so far: {expenses_so_far:.2f}€; zvE: {zvE:.2f}€; ESt total: {ESt:.2f}€; needed before Krankenkasse: {needed_income_before_krankenkasse:.2f}€'
    # Health insurance parameters currently in effect. Alternative sets that
    # were kept commented out in earlier revisions of this function:
    #   minimum income 1131.67, factor 0.191, minimum contribution 216.15
    #   factor 0.197, minimum contribution 222.94
    kk_minimum_income = D('1096.67')
    kk_factor = D('0.189')
    kk_minimum_tax = D('207.27')
    kk_add = max(0, kk_factor * needed_income_before_krankenkasse - kk_minimum_tax)
    line_kk_minimum = f'  Reserves:Month:Krankenkassendefaultbeitrag  {kk_minimum_tax:.2f}€  ; assed minimum income {kk_minimum_income:.2f}€ * {kk_factor:.3f}'
    line_kk_add = f'  Reserves:Month:Krankenkassenbeitragswachstum {kk_add:.2f}€  ; max(0, {kk_factor:.3f} * {needed_income_before_krankenkasse:.2f}€ - {kk_minimum_tax:.2f}€)'
    final_minus = expenses_so_far + ESt_this_month + kk_minimum_tax + kk_add
    line_finish = f'  Assets  -{final_minus:.2f}€'
    return [line_income_tax, line_kk_minimum, line_kk_add, line_finish]
80
81
def bookings_to_account_tree(bookings):
    """Aggregate *bookings* into an account hierarchy and per-account totals.

    Returns ``(account_tree, account_sums)``:

    * ``account_tree`` is a nested dict keyed by the ':'-separated segments
      of each account name (leaves are empty dicts);
    * ``account_sums`` maps each full account path to ``{currency: total}``,
      where the total of a parent includes the totals of all its children.
    """
    # Flat totals per full account name, straight from the bookings.
    account_sums = {}
    for booking in bookings:
        for account, changes in booking.account_changes.items():
            for currency, amount in changes.items():
                apply_booking_to_account_balances(account_sums, account, currency, amount)

    # Nested tree: walk each name segment by segment, creating nodes on demand.
    account_tree = {}
    for full_name in sorted(account_sums):
        node = account_tree
        for segment in full_name.split(":"):
            node = node.setdefault(segment, {})

    def roll_up(path, subtree):
        # Depth-first: fold every child's (already rolled-up) totals into path.
        for name, children in subtree.items():
            child_totals = roll_up(path + ":" + name, children)
            for currency, amount in child_totals.items():
                apply_booking_to_account_balances(account_sums, path, currency, amount)
        return account_sums[path]

    for root, subtree in account_tree.items():
        account_sums[root] = roll_up(root, subtree)
    return account_tree, account_sums
124
125
def parse_lines(lines):
    """Parse ledger text *lines* into bookings and per-line comments.

    A booking is a block of non-empty lines terminated by an empty line: a
    first line ``YYYY-MM-DD description`` followed by transfer lines
    ``account [amount currency]`` (amount and currency in either order, or
    fused into one token such as ``5.00€`` / ``€5.00``). Everything after a
    ';' is a comment.

    Returns ``(bookings, comments)`` where *comments* has one entry per
    element of *lines* ('' when the line has no comment).

    NOTE: an empty string is appended to *lines* IN PLACE so that a final
    booking is always terminated. Callers in this file observe that extra
    element, so pass a copy if the list must stay untouched.

    Raises HandledException for any malformed line, including amounts that
    are not valid decimal numbers.
    """
    import datetime
    import decimal
    inside_booking = False
    date_string, description = None, None
    booking_lines = []
    start_line = 0
    bookings = []
    comments = []
    lines += ['']  # to ensure a booking-ending last line
    for i, line in enumerate(lines):
        prefix = f"line {i}"
        # we start with the case of an utterly empty line
        comments += [""]
        stripped_line = line.rstrip()
        if stripped_line == '':
            if inside_booking:
                # assume we finished a booking, finalize, and commit to DB
                if len(booking_lines) < 2:
                    raise HandledException(f"{prefix} booking ends to early")
                booking = Booking(date_string, description, booking_lines, start_line)
                bookings += [booking]
            # expect new booking to follow so re-zero all booking data
            inside_booking = False
            date_string, description = None, None
            booking_lines = []
            continue
        # if non-empty line, first get comment if any, and commit to DB
        split_by_comment = stripped_line.split(sep=";", maxsplit=1)
        if len(split_by_comment) == 2:
            comments[i] = split_by_comment[1].lstrip()
        # if pre-comment empty: if inside_booking, this must be a comment-only
        # line so we keep it for later ledger-output to capture those
        # comments; otherwise, no more to process for this line
        non_comment = split_by_comment[0].rstrip()
        if non_comment.rstrip() == '':
            if inside_booking:
                booking_lines += ['']
            continue
        # if we're starting a booking, parse by first-line pattern
        if not inside_booking:
            start_line = i
            toks = non_comment.split(maxsplit=1)
            date_string = toks[0]
            try:
                datetime.datetime.strptime(date_string, '%Y-%m-%d')
            except ValueError:
                raise HandledException(f"{prefix} bad date string: {date_string}")
            try:
                description = toks[1]
            except IndexError:
                raise HandledException(f"{prefix} bad description: {description}")
            inside_booking = True
            booking_lines += [non_comment]
            continue
        # otherwise, read as transfer data
        toks = non_comment.split()  # ignore specification's allowance of single spaces in names
        if len(toks) > 3:
            raise HandledException(f"{prefix} too many booking line tokens: {toks}")
        amount, currency = None, None
        account_name = toks[0]
        if account_name[0] == '[' and account_name[-1] == ']':
            # ignore specification's differentiation of "virtual" accounts
            account_name = account_name[1:-1]
        decimal_chars = ".-0123456789"
        if len(toks) == 3:
            # amount and currency as separate tokens, in either order
            i_currency = 1
            try:
                amount = decimal.Decimal(toks[1])
                i_currency = 2
            except decimal.InvalidOperation:
                try:
                    amount = decimal.Decimal(toks[2])
                except decimal.InvalidOperation:
                    raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
            currency = toks[i_currency]
            if currency[0] in decimal_chars:
                raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
        elif len(toks) == 2:
            # amount and currency fused into one token; split them char by char
            value = toks[1]
            inside_amount = False
            inside_currency = False
            amount_string = ""
            currency = ""
            dots_counted = 0
            # separate index name: must not clobber the outer line index i
            for pos, c in enumerate(value):
                if pos == 0:
                    if c in decimal_chars:
                        inside_amount = True
                    else:
                        inside_currency = True
                if inside_currency:
                    if c in decimal_chars and len(amount_string) == 0:
                        inside_currency = False
                        inside_amount = True
                    else:
                        currency += c
                        continue
                if inside_amount:
                    if c not in decimal_chars:
                        if len(currency) > 0:
                            raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
                        inside_currency = True
                        inside_amount = False
                        currency += c
                        continue
                    # a minus sign is only valid as the very first amount char
                    if c == '-' and len(amount_string) > 0:
                        raise HandledException(f"{prefix} amount has non-start '-': {value}")
                    if c == '.':
                        # reject the second dot, not only the third
                        if dots_counted > 0:
                            raise HandledException(f"{prefix} amount has multiple dots: {value}")
                        dots_counted += 1
                    amount_string += c
            if len(amount_string) == 0:
                raise HandledException(f"{prefix} amount missing: {value}")
            if len(currency) == 0:
                raise HandledException(f"{prefix} currency missing: {value}")
            try:
                amount = decimal.Decimal(amount_string)
            except decimal.InvalidOperation:
                # e.g. a lone "-" or "." passes the per-character checks above
                raise HandledException(f"{prefix} amount is not a valid number: {value}")
        booking_lines += [(account_name, amount, currency)]
    if inside_booking:
        raise HandledException(f"{prefix} last booking unfinished")
    return bookings, comments
246
class Booking:
    """One ledger booking: a date/description line plus its transfer lines.

    ``lines[0]`` is the raw first line; every further entry is either ''
    (a comment-only line) or an ``(account, amount, currency)`` tuple whose
    amount is None when the value is left for the booking to fill in.
    """

    def __init__(self, date_string, description, booking_lines, start_line):
        self.date_string = date_string
        self.description = description
        self.lines = booking_lines
        self.start_line = start_line
        self.validate_booking_lines()
        self.account_changes = self.parse_booking_lines_to_account_changes()

    def validate_booking_lines(self):
        """Raise HandledException unless the transfer lines are consistent.

        Without an empty-valued line every currency must sum to zero; with
        one (at most one is allowed) some currency must be left unbalanced
        for it to absorb.
        """
        prefix = f"booking at line {self.start_line}"
        totals = {}
        seen_sink = False
        for entry in self.lines[1:]:
            if entry == '':
                continue
            _, amount, currency = entry
            if amount is not None:
                totals[currency] = totals.get(currency, 0) + amount
                continue
            if seen_sink:
                raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}")
            seen_sink = True
        unbalanced = any(total != 0 for total in totals.values())
        if seen_sink:
            if not unbalanced:
                raise HandledException(f"{prefix} has empty value that cannot be filled")
        elif unbalanced:
            raise HandledException(f"{prefix} does not sum up to zero")

    def parse_booking_lines_to_account_changes(self):
        """Return {account: {currency: amount}} for this booking.

        The account given without an amount (if any) receives the negated
        sum of all explicit amounts, per currency.
        """
        account_changes = {}
        outstanding = {}
        sink_account = None
        for entry in self.lines[1:]:
            if entry == '':
                continue
            account, amount, currency = entry
            if amount is None:
                sink_account = account
                continue
            apply_booking_to_account_balances(account_changes, account, currency, amount)
            if currency in outstanding:
                outstanding[currency] += amount
            else:
                outstanding[currency] = amount
        if sink_account:
            for currency, amount in outstanding.items():
                apply_booking_to_account_balances(account_changes, sink_account, currency, -amount)
        return account_changes
305
306
307
class Database:
    """File-backed ledger store.

    On construction the ledger file is read (if it exists) and parsed into
    ``bookings`` and ``comments``; ``real_lines`` keeps the raw text lines.
    Writes are guarded by a lock file and preceded by a ``.bak`` copy of the
    previous ledger file.
    """

    def __init__(self):
        db_name = "_ledger"
        self.db_file = db_name + ".json"
        self.lock_file = db_name + ".lock"
        self.bookings = []
        self.comments = []
        self.real_lines = []
        if os.path.exists(self.db_file):
            with open(self.db_file, "r") as f:
                self.real_lines += f.readlines()
        # parse_lines appends one '' to the list it is handed, so real_lines
        # ends up one element longer than the file has lines.
        ret = parse_lines(self.real_lines)
        self.bookings += ret[0]
        self.comments += ret[1]

    def get_lines(self, start, end):
        """Return the raw ledger lines in the half-open range [start, end)."""
        return self.real_lines[start:end]

    def _write_locked(self, mode, text):
        """Back up the ledger file, then write *text* to it using *mode*.

        Raises HandledException if the lock file already exists. The lock is
        created atomically and always removed again, even when the backup or
        the write fails, so a failed write cannot block later ones.
        """
        import shutil
        try:
            # 'x' fails if the file exists: check and creation in one step
            with open(self.lock_file, 'x'):
                pass
        except FileExistsError:
            raise HandledException('Sorry, lock file!') from None
        try:
            if os.path.exists(self.db_file):
                shutil.copy(self.db_file, self.db_file + ".bak")
            with open(self.db_file, mode) as f:
                f.write(text)
        finally:
            os.remove(self.lock_file)

    def replace(self, start, end, lines):
        """Rewrite the ledger file with raw lines [start, end) replaced by *lines*."""
        text = ''.join(self.real_lines[:start]) + '\n'.join(lines) + ''.join(self.real_lines[end:])
        self._write_locked('w', text)

    def append(self, lines):
        """Append *lines* to the ledger file, separated by a leading newline."""
        self._write_locked('a', '\n' + '\n'.join(lines) + '\n')
351
352
class MyServer(BaseHTTPRequestHandler):
    """HTTP request handler serving the ledger web UI.

    GET renders the ledger, the balance, or one of the add/copy forms; POST
    stores a booking submitted through the free-text or the structured form.
    Text taken from the ledger file or from error messages is HTML-escaped
    before it is put into a page.
    """

    header = """<html>
<meta charset="UTF-8">
<style>
body { color: #000000; }
table { margin-bottom: 2em; }
th, td { text-align: left }
input[type=number] { text-align: right; font-family: monospace; }
.money { font-family: monospace; text-align: right; }
.comment { font-style: italic; color: #777777; }
.full_line_comment { display: block; white-space: nowrap; width: 0; }
</style>
<body>
<a href="/ledger">ledger</a>
<a href="/balance">balance</a>
<a href="/add_free">add free</a>
<a href="/add_structured">add structured</a>
<hr />
"""
    footer = "</body>\n</html>"

    def _send_page(self, status, body):
        """Send *body* wrapped in the page header/footer with HTTP *status*."""
        self.send_response(status)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes(f'{self.header}{body}{self.footer}', "utf-8"))

    @staticmethod
    def _structured_lines(postvars):
        """Build booking text lines from the fields of the structured form."""
        date = postvars['date'][0]
        description = postvars['description'][0]
        start_comment = postvars['line_0_comment'][0]
        lines = [f'{date} {description} ; {start_comment}']
        i = 1
        while f'line_{i}_comment' in postvars:
            account = postvars[f'line_{i}_account'][0]
            amount = postvars[f'line_{i}_amount'][0]
            currency = postvars[f'line_{i}_currency'][0]
            comment = postvars[f'line_{i}_comment'][0]
            i += 1
            new_main = f'{account} {amount} {currency}'
            if '' == new_main.rstrip() == comment.rstrip():  # don't write empty lines
                continue
            new_line = new_main
            if comment.rstrip() != '':
                new_line += f' ; {comment}'
            lines += [new_line]
        return lines

    def do_POST(self):
        """Store a submitted booking, then redirect; answer 400/404 on bad input."""
        parsed_url = urlparse(self.path)
        if parsed_url.path not in ('/add_structured', '/add_free'):
            self._send_page(404, 'ERROR: unknown POST target')
            return
        try:
            length = int(self.headers.get('content-length') or 0)
            postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=True)
            add_income_tax = 'add_income_tax' in postvars
            if parsed_url.path == '/add_structured':
                lines = self._structured_lines(postvars)
                if add_income_tax:
                    lines += add_taxes(lines)
            else:
                lines = postvars['booking'][0].splitlines()
            start = int(postvars['start'][0])
            end = int(postvars['end'][0])
            db = Database()
            # Validates the booking; also appends a trailing '' to lines,
            # which the redirect arithmetic below accounts for with the -1.
            parse_lines(lines)
            if start == end == 0:
                db.append(lines)
            else:
                db.replace(start, end, lines)
        except HandledException as e:
            self._send_page(400, f'ERROR: {html.escape(str(e))}')
            return
        except (KeyError, ValueError) as e:
            self._send_page(400, f'ERROR: bad or missing form data: {html.escape(str(e))}')
            return
        redir_url = '/'
        if add_income_tax:
            redir_url = f'/add_structured?start={start}&end={start+len(lines)-1}'
        # 303 tells the browser to fetch the target with GET after this POST.
        self.send_response(303)
        self.send_header('Location', redir_url)
        self.end_headers()

    def do_GET(self):
        """Render the page selected by the request path.

        The page is built completely before the status line is sent, so a
        failure while building it becomes an error page with status 400.
        Unknown paths show the ledger.
        """
        parsed_url = urlparse(self.path)
        params = parse_qs(parsed_url.query)
        try:
            start = int(params.get('start', ['0'])[0])
            end = int(params.get('end', ['0'])[0])
            # default mirrors add_structured's own bonus_lines default
            bonus_lines = int(params.get('bonus_lines', ['10'])[0])
            db = Database()
            if parsed_url.path == '/balance':
                body = self.balance_as_html(db)
            elif parsed_url.path == '/add_free':
                body = self.add_free(db, start, end)
            elif parsed_url.path == '/add_structured':
                body = self.add_structured(db, start, end, bonus_lines)
            elif parsed_url.path == '/copy_free':
                body = self.add_free(db, start, end, copy=True)
            elif parsed_url.path == '/copy_structured':
                body = self.add_structured(db, start, end, bonus_lines, copy=True)
            else:
                body = self.ledger_as_html(db)
            status = 200
        except (HandledException, ValueError) as e:
            status = 400
            body = f'ERROR: {html.escape(str(e))}'
        self._send_page(status, body)

    def balance_as_html(self, db):
        """Return the account tree with totals as an escaped ``<pre>`` block."""
        lines = []
        account_tree, account_sums = bookings_to_account_tree(db.bookings)

        def print_subtree(lines, indent, node, subtree, path):
            sums = account_sums[path + node]
            line = f"{indent}{node}"
            n_tabs = 5 - (len(line) // 8)
            line += n_tabs * "\t"
            if "€" in sums:
                euros = sums["€"]
                line += f"{euros:9.2f} €\t"
            else:
                line += "\t\t"
            for currency, amount in sums.items():
                # NOTE(review): negative non-€ balances are not shown;
                # confirm whether that is intended.
                if currency != '€' and amount > 0:
                    line += f"{amount:5.2f} {currency}\t"
            lines += [line]
            indent += "  "
            for k, v in sorted(subtree.items()):
                print_subtree(lines, indent, k, v, path + node + ":")

        for k, v in sorted(account_tree.items()):
            print_subtree(lines, "", k, v, "")
        content = html.escape("\n".join(lines))
        return f"<pre>{content}</pre>"

    def ledger_as_html(self, db):
        """Return the whole ledger as HTML, one table per booking.

        Works on one output slot per ledger line (seeded from db.comments)
        and overwrites the slots that belong to bookings.
        """
        lines = []
        for comment in db.comments:
            lines += [f'<span class="comment">{html.escape(comment)}</span>' if comment != '' else '']
        for booking in db.bookings:
            i = booking.start_line
            booking_end = booking.start_line + len(booking.lines)
            lines[i] = f"""{html.escape(booking.date_string)} {html.escape(booking.description)} {lines[i]}
[edit: <a href="/add_structured?start={booking.start_line}&end={booking_end}">structured</a> 
/ <a href="/add_free?start={booking.start_line}&end={booking_end}">free</a> 
| copy:<a href="/copy_structured?start={booking.start_line}&end={booking_end}">structured</a>
/ <a href="/copy_free?start={booking.start_line}&end={booking_end}">free</a>]
<table>"""
            for booking_line in booking.lines[1:]:
                i += 1
                if booking_line == '':
                    lines[i] = f'<tr><td><div class="full_line_comment">{lines[i]}</div></td></tr>'
                    continue
                comment = f' {lines[i]}' if len(lines[i]) > 0 else ''
                account, amount, currency = booking_line
                money = ''
                # "is not None" so that an explicit amount of 0 is still shown
                if amount is not None:
                    money = html.escape(f'{amount} {currency}')
                lines[i] = f'<tr><td>{html.escape(account)}</td><td class="money">{money}</td><td>{comment}</td></tr>'
            lines[i] = lines[i] + "\n" + '</table>'
        return '\n'.join(lines)

    def header_add_form(self, action):
        """Return the opening ``<form>`` tag posting to ``/<action>``."""
        return f"<form method=\"POST\" action=\"/{action}\">\n"

    def footer_add_form(self, start, end, copy):
        """Return hidden start/end fields, the submit button and ``</form>``.

        For a copy both values are forced to 0, which the POST handler treats
        as "append a new booking".
        """
        if copy:
            start = end = 0
        return f"""
<input type="hidden" name="start" value={start} />
<input type="hidden" name="end" value={end} />
<input type="submit" name="default">
</form>"""

    def textarea(self, name, lines, min_rows=1, min_cols=80):
        """Return a ``<textarea>`` holding the escaped *lines*, sized to fit."""
        safe_content = html.escape(''.join(lines))
        n_rows = max(min_rows, len(lines))
        n_cols = max([min_cols] + [len(line) for line in lines])
        return f'<textarea name="{name}" rows={n_rows} cols={n_cols}>{safe_content}</textarea>'

    def add_free(self, db, start=0, end=0, copy=False):
        """Return the free-text form pre-filled with ledger lines [start, end)."""
        lines = db.get_lines(start, end)
        return f'{self.header_add_form("add_free")}{self.textarea("booking",lines,10)}{self.footer_add_form(start, end, copy)}'

    def add_structured(self, db, start=0, end=0, bonus_lines=10, copy=False):
        """Return the structured form for the booking in ledger lines [start, end).

        *bonus_lines* empty rows are added after the existing ones. Raises
        HandledException if the range holds more than one booking.
        """
        import datetime
        lines = db.get_lines(start, end)
        bookings, comments = parse_lines(lines)
        if len(bookings) > 1:
            raise HandledException('can only edit single Booking')
        last_line = 0

        def inpu(name, val="", datalist="", input_type='text', size=-1):
            val = val if val is not None else ""
            safe_val = html.escape(str(val))
            datalist_string = '' if datalist == '' else f'list="{datalist}"'
            number_step = '' if input_type != 'number' else ' step=0.01'
            size_string = '' if size < 0 else f' size={size}'
            return f'<input type="{input_type}"{number_step} name="{name}"{size_string} value="{safe_val}" {datalist_string}/>'

        def booking_row(i, account="", amount="", currency="", comment=""):
            # one editable transfer line: account, amount, currency ; comment
            return (
                inpu(f"line_{i}_account", account, "accounts", size=40) + " "
                + inpu(f"line_{i}_amount", amount, "", "number", size=10) + " "
                + inpu(f"line_{i}_currency", currency, "currencies", size=3) + " ; "
                + self.textarea(f"line_{i}_comment", [comment]) + "<br />"
            )

        input_lines = inpu('add_income_tax', 'add income tax', '', 'submit') + '<br />'
        today = str(datetime.datetime.now())[:10]
        if len(bookings) == 0:
            input_lines += f'{inpu("date", today, size=9)} {inpu("description", "", "descriptions")} ; {inpu("line_0_comment")}<br />'
            last_line = 1
        else:
            booking = bookings[0]
            last_line = len(comments)
            date_string = today if copy else booking.date_string
            input_lines += f'{inpu("date", date_string, size=9)} {inpu("description", booking.description, "descriptions")} ; {self.textarea("line_0_comment", [comments[0]])}<br />'
            for i in range(1, len(comments)):
                account = amount = currency = ''
                if i < len(booking.lines) and booking.lines[i] != '':
                    account, amount, currency = booking.lines[i]
                input_lines += booking_row(i, account, amount, currency, comments[i])
        for j in range(bonus_lines):
            input_lines += booking_row(j + last_line)
        # Autocomplete suggestions taken from everything already in the ledger.
        datalist_sets = {'descriptions': set(), 'accounts': set(), 'currencies': set()}
        for b in db.bookings:
            datalist_sets['descriptions'].add(b.description)
            for account, moneys in b.account_changes.items():
                datalist_sets['accounts'].add(account)
                datalist_sets['currencies'].update(moneys)

        def build_datalist(name):
            datalist = f'<datalist id="{name}">' + "\n"
            for item in sorted(datalist_sets[name]):  # sorted: stable output
                safe_item = html.escape(item)
                datalist += f'<option value="{safe_item}">{safe_item}</option>' + "\n"
            return f"{datalist}</datalist>\n"

        datalists = build_datalist('descriptions')
        datalists += build_datalist('accounts')
        datalists += build_datalist('currencies')
        return f'{self.header_add_form("add_structured")}{input_lines}{datalists}{self.footer_add_form(start, end, copy)}'
574
575
if __name__ == "__main__":
    # Serve the ledger UI until interrupted with Ctrl-C, then shut down.
    server_address = (hostName, serverPort)
    httpd = HTTPServer(server_address, MyServer)
    print("Server started http://%s:%s" % server_address)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    httpd.server_close()
    print("Server stopped.")
584     print("Server stopped.")