import html
import jinja2
import decimal
-import datetime
+from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse
-hostName = "localhost"
-serverPort = 8082
+from plomlib import PlomDB, PlomException, run_server, run_server, PlomServer
-
-class HandledException(Exception):
- pass
+server_port = 8082
def apply_booking_to_account_balances(account_sums, account, currency, amount):
if inside_booking:
# assume we finished a booking, finalize, and commit to DB
if len(booking_lines) < 2:
- raise HandledException(f"{prefix} booking ends to early")
+ raise PlomException(f"{prefix} booking ends to early")
booking = Booking(date_string, description, booking_lines, start_line, validate_bookings)
bookings += [booking]
# expect new booking to follow so re-zeroall booking data
toks = non_comment.split(maxsplit=1)
date_string = toks[0]
try:
- datetime.datetime.strptime(date_string, '%Y-%m-%d')
+ datetime.strptime(date_string, '%Y-%m-%d')
except ValueError:
- raise HandledException(f"{prefix} bad date string: {date_string}")
+ raise PlomException(f"{prefix} bad date string: {date_string}")
if last_date > date_string:
- raise HandledException(f"{prefix} out-of-order-date")
+ raise PlomException(f"{prefix} out-of-order-date")
last_date = date_string
try:
description = toks[1]
except IndexError:
- raise HandledException(f"{prefix} bad description: {description}")
+ raise PlomException(f"{prefix} bad description: {description}")
inside_booking = True
booking_lines += [non_comment]
continue
# otherwise, read as transfer data
toks = non_comment.split() # ignore specification's allowance of single spaces in names
if len(toks) > 3:
- raise HandledException(f"{prefix} too many booking line tokens: {toks}")
+ raise PlomException(f"{prefix} too many booking line tokens: {toks}")
amount, currency = None, None
account_name = toks[0]
if account_name[0] == '[' and account_name[-1] == ']':
try:
amount = decimal.Decimal(toks[2])
except decimal.InvalidOperation:
- raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
+ raise PlomException(f"{prefix} no decimal number in: {toks[1:]}")
currency = toks[i_currency]
if currency[0] in decimal_chars:
- raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
+ raise PlomException(f"{prefix} currency starts with int, dot, or minus: {currency}")
elif len(toks) == 2:
value = toks[1]
inside_amount = False
if inside_amount:
if c not in decimal_chars:
if len(currency) > 0:
- raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
+ raise PlomException(f"{prefix} amount has non-decimal chars: {value}")
inside_currency = True
inside_amount = False
currency += c
continue
if c == '-' and len(amount_string) > 1:
- raise HandledException(f"{prefix} amount has non-start '-': {value}")
+ raise PlomException(f"{prefix} amount has non-start '-': {value}")
if c == '.':
if dots_counted > 1:
- raise HandledException(f"{prefix} amount has multiple dots: {value}")
+ raise PlomException(f"{prefix} amount has multiple dots: {value}")
dots_counted += 1
amount_string += c
if len(currency) == 0:
- raise HandledException(f"{prefix} currency missing: {value}")
+ raise PlomException(f"{prefix} currency missing: {value}")
if len(amount_string) > 0:
amount = decimal.Decimal(amount_string)
booking_lines += [(account_name, amount, currency)]
if inside_booking:
- raise HandledException(f"{prefix} last booking unfinished")
+ raise PlomException(f"{prefix} last booking unfinished")
return bookings, comments
_, amount, currency = line
if amount is None:
if empty_values > 0:
- raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}")
+ raise PlomException(f"{prefix} relates more than one empty value of same currency {currency}")
empty_values += 1
continue
if currency not in sums:
if empty_values == 0:
for k, v in sums.items():
if v != 0:
- raise HandledException(f"{prefix} does not add up to zero / {k} {v}")
+ raise PlomException(f"{prefix} does not add up to zero / {k} {v}")
else:
sinkable = False
for k, v in sums.items():
if v != 0:
sinkable = True
if not sinkable:
- raise HandledException(f"{prefix} has empty value that cannot be filled")
+ raise PlomException(f"{prefix} has empty value that cannot be filled")
def parse_booking_lines_to_account_changes(self):
account_changes = {}
-class Database:
+class LedgerDB(PlomDB):
def __init__(self):
- db_name = "_ledger"
- self.db_file = db_name + ".json"
- self.lock_file = db_name+ ".lock"
self.bookings = []
self.comments = []
self.real_lines = []
- if os.path.exists(self.db_file):
- with open(self.db_file, "r") as f:
- self.real_lines += [l.rstrip() for l in f.readlines()]
+ super().__init__('_ledger')
ret = parse_lines(self.real_lines)
self.bookings += ret[0]
self.comments += ret[1]
+ def read_db_file(self, f):
+ self.real_lines += [l.rstrip() for l in f.readlines()]
+
def get_lines(self, start, end):
return self.real_lines[start:end]
def write_db(self, text, mode='w'):
- import shutil
- if os.path.exists(self.lock_file):
- raise HandledException('Sorry, lock file!')
- f = open(self.lock_file, 'w+')
- f.close()
-
- # always back up most recent to .bak
- bakpath = f'{self.db_file}.bak'
- shutil.copy(self.db_file, bakpath)
-
- # collect modification times of numbered .bak files
- bak_prefix = f'{bakpath}.'
- backup_dates = []
- i = 0
- bak_as = f'{bak_prefix}{i}'
- while os.path.exists(bak_as):
- mod_time = os.path.getmtime(bak_as)
- backup_dates += [str(datetime.datetime.fromtimestamp(mod_time))]
- i += 1
- bak_as = f'{bak_prefix}{i}'
-
- # collect what numbered .bak files to save:
- # shrink datetime string right to left character by character,
- # on each step add the oldest file whose mtime still fits the pattern
- # (privilege older files to keep existing longer)
- to_save = []
- datetime_len = 19
- now = str(datetime.datetime.now())[:datetime_len]
- while datetime_len > 2:
- # assume backup_dates starts with oldest dates
- for i, date in enumerate(backup_dates):
- if date[:datetime_len] == now:
- if i not in to_save:
- to_save += [i]
- break
- datetime_len -= 1
- now = now[:datetime_len]
-
- # remove redundant backup files
- j = 0
- for i in to_save:
- if i != j:
- source = f'{bak_prefix}{i}'
- target = f'{bak_prefix}{j}'
- shutil.move(source, target)
- j += 1
- for i in range(j, len(backup_dates)):
- try:
- os.remove(f'{bak_prefix}{i}')
- except FileNotFoundError:
- pass
-
- # put second backup copy of current state at end of bak list
- shutil.copy(self.db_file, f'{bak_prefix}{j}')
- with open(self.db_file, mode) as f:
- f.write(text);
- os.remove(self.lock_file)
+ self.write_text_to_db(text)
def insert_at_date(self, lines, date):
start_at = len(self.real_lines)
return ret
-class MyServer(BaseHTTPRequestHandler):
+class LedgerServer(PlomServer):
header = """<html>
<meta charset="UTF-8">
<style>
postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
start = int(postvars['start'][0])
end = int(postvars['end'][0])
- db = Database()
+ db = LedgerDB()
add_empty_line = None
lines = []
# get inputs
_, _ = parse_lines(lines)
# if saving, process where to and where to redirect after
if 'save' in postvars.keys():
- last_date = str(datetime.datetime.now())[:10]
+ last_date = str(datetime.now())[:10]
if len(db.bookings) > 0:
last_date = db.bookings[-1].date_string
target_date = last_date[:]
first_line_tokens = lines[0].split() if len(lines) > 0 else ''
first_token = first_line_tokens[0] if len(first_line_tokens) > 0 else ''
try:
- datetime.datetime.strptime(first_token, '%Y-%m-%d')
+ datetime.strptime(first_token, '%Y-%m-%d')
target_date = first_token
except ValueError:
pass
if start == end == 0:
- start = db.insert_at_date(lines, target_date)
- nth = db.get_nth_for_booking_of_start_line(start)
+ start = db.insert_at_date(lines, target_date)
+ nth = db.get_nth_for_booking_of_start_line(start)
else:
new_start = db.update(start, end, lines, target_date)
nth = db.get_nth_for_booking_of_start_line(new_start)
else:
edit_content = self.add_free(db, start, end)
self.send_HTML(self.header + edit_content + self.footer)
- except HandledException as e:
+ except PlomException as e:
self.fail_400(e)
def do_GET(self):
params = parse_qs(parsed_url.query)
start = int(params.get('start', ['0'])[0])
end = int(params.get('end', ['0'])[0])
- db = Database()
+ db = LedgerDB()
page = self.header
if parsed_url.path == '/balance':
stop = params.get('stop', [None])[0]
page += self.ledger_as_html(db)
page += self.footer
self.send_HTML(page)
- except HandledException as e:
+ except PlomException as e:
self.fail_400(e)
- def fail_400(self, e):
- page = f'{self.header}ERROR: {e}{self.footer}'
- self.send_HTML(page, 400)
-
- def send_HTML(self, html, code=200):
- self.send_code_and_headers(code, [('Content-type', 'text/html')])
- self.wfile.write(bytes(html, "utf-8"))
-
- def send_code_and_headers(self, code, headers=[]):
- self.send_response(code)
- for fieldname, content in headers:
- self.send_header(fieldname, content)
- self.end_headers()
-
def booking_lines_from_postvars(self, postvars, db):
add_empty_line = None
date = postvars['date'][0]
for currency in temp_bookings[0].sink:
amount = temp_bookings[0].sink[currency]
lines += [f'Assets {amount:.2f} {currency}']
- except HandledException:
+ except PlomException:
pass
if 'add_taxes' in postvars.keys():
lines += db.add_taxes(lines, finish=False)
lines = temp_lines if len(''.join(temp_lines)) > 0 else db.get_lines(start, end)
bookings, comments = parse_lines(lines, validate_bookings=False)
if len(bookings) > 1:
- raise HandledException('can only structurally edit single Booking')
+ raise PlomException('can only structurally edit single Booking')
if add_empty_line is not None:
comments = comments[:add_empty_line+1] + [''] + comments[add_empty_line+1:]
booking = bookings[0]
for currency in moneys.keys():
datalist_sets['currencies'].add(currency)
content = ''
- today = str(datetime.datetime.now())[:10]
+ today = str(datetime.now())[:10]
booking_lines = []
if copy:
start = end = 0
if __name__ == "__main__":
- webServer = HTTPServer((hostName, serverPort), MyServer)
- print(f"Server started http://{hostName}:{serverPort}")
- try:
- webServer.serve_forever()
- except KeyboardInterrupt:
- pass
- webServer.server_close()
- print("Server stopped.")
+ run_server(server_port, LedgerServer)