import html
import jinja2
import decimal
-import datetime
+from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse
-hostName = "localhost"
-serverPort = 8082
+from plomlib import PlomDB, PlomException, run_server, run_server, PlomServer
-
-class HandledException(Exception):
- pass
+server_port = 8082
def apply_booking_to_account_balances(account_sums, account, currency, amount):
def parse_lines(lines, validate_bookings=True):
- import datetime
inside_booking = False
date_string, description = None, None
booking_lines = []
if inside_booking:
# assume we finished a booking, finalize, and commit to DB
if len(booking_lines) < 2:
- raise HandledException(f"{prefix} booking ends to early")
+ raise PlomException(f"{prefix} booking ends to early")
booking = Booking(date_string, description, booking_lines, start_line, validate_bookings)
bookings += [booking]
# expect new booking to follow so re-zeroall booking data
toks = non_comment.split(maxsplit=1)
date_string = toks[0]
try:
- datetime.datetime.strptime(date_string, '%Y-%m-%d')
+ datetime.strptime(date_string, '%Y-%m-%d')
except ValueError:
- raise HandledException(f"{prefix} bad date string: {date_string}")
+ raise PlomException(f"{prefix} bad date string: {date_string}")
if last_date > date_string:
- raise HandledException(f"{prefix} out-of-order-date")
+ raise PlomException(f"{prefix} out-of-order-date")
last_date = date_string
try:
description = toks[1]
except IndexError:
- raise HandledException(f"{prefix} bad description: {description}")
+ raise PlomException(f"{prefix} bad description: {description}")
inside_booking = True
booking_lines += [non_comment]
continue
# otherwise, read as transfer data
toks = non_comment.split() # ignore specification's allowance of single spaces in names
if len(toks) > 3:
- raise HandledException(f"{prefix} too many booking line tokens: {toks}")
+ raise PlomException(f"{prefix} too many booking line tokens: {toks}")
amount, currency = None, None
account_name = toks[0]
if account_name[0] == '[' and account_name[-1] == ']':
try:
amount = decimal.Decimal(toks[2])
except decimal.InvalidOperation:
- raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
+ raise PlomException(f"{prefix} no decimal number in: {toks[1:]}")
currency = toks[i_currency]
if currency[0] in decimal_chars:
- raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
+ raise PlomException(f"{prefix} currency starts with int, dot, or minus: {currency}")
elif len(toks) == 2:
value = toks[1]
inside_amount = False
if inside_amount:
if c not in decimal_chars:
if len(currency) > 0:
- raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
+ raise PlomException(f"{prefix} amount has non-decimal chars: {value}")
inside_currency = True
inside_amount = False
currency += c
continue
if c == '-' and len(amount_string) > 1:
- raise HandledException(f"{prefix} amount has non-start '-': {value}")
+ raise PlomException(f"{prefix} amount has non-start '-': {value}")
if c == '.':
if dots_counted > 1:
- raise HandledException(f"{prefix} amount has multiple dots: {value}")
+ raise PlomException(f"{prefix} amount has multiple dots: {value}")
dots_counted += 1
amount_string += c
if len(currency) == 0:
- raise HandledException(f"{prefix} currency missing: {value}")
+ raise PlomException(f"{prefix} currency missing: {value}")
if len(amount_string) > 0:
amount = decimal.Decimal(amount_string)
booking_lines += [(account_name, amount, currency)]
if inside_booking:
- raise HandledException(f"{prefix} last booking unfinished")
+ raise PlomException(f"{prefix} last booking unfinished")
return bookings, comments
_, amount, currency = line
if amount is None:
if empty_values > 0:
- raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}")
+ raise PlomException(f"{prefix} relates more than one empty value of same currency {currency}")
empty_values += 1
continue
if currency not in sums:
if empty_values == 0:
for k, v in sums.items():
if v != 0:
- raise HandledException(f"{prefix} does not add up to zero / {k} {v}")
+ raise PlomException(f"{prefix} does not add up to zero / {k} {v}")
else:
sinkable = False
for k, v in sums.items():
if v != 0:
sinkable = True
if not sinkable:
- raise HandledException(f"{prefix} has empty value that cannot be filled")
+ raise PlomException(f"{prefix} has empty value that cannot be filled")
def parse_booking_lines_to_account_changes(self):
account_changes = {}
-class Database:
+class LedgerDB(PlomDB):
def __init__(self):
- db_name = "_ledger"
- self.db_file = db_name + ".json"
- self.lock_file = db_name+ ".lock"
self.bookings = []
self.comments = []
self.real_lines = []
- if os.path.exists(self.db_file):
- with open(self.db_file, "r") as f:
- self.real_lines += [l.rstrip() for l in f.readlines()]
+ super().__init__('_ledger')
ret = parse_lines(self.real_lines)
self.bookings += ret[0]
self.comments += ret[1]
+ def read_db_file(self, f):
+ self.real_lines += [l.rstrip() for l in f.readlines()]
+
def get_lines(self, start, end):
return self.real_lines[start:end]
def write_db(self, text, mode='w'):
- import shutil
- if os.path.exists(self.lock_file):
- raise HandledException('Sorry, lock file!')
- f = open(self.lock_file, 'w+')
- f.close()
-
- # always back up most recent to .bak
- bakpath = f'{self.db_file}.bak'
- shutil.copy(self.db_file, bakpath)
-
- # collect modification times of numbered .bak files
- bak_prefix = f'{bakpath}.'
- backup_dates = []
- i = 0
- bak_as = f'{bak_prefix}{i}'
- while os.path.exists(bak_as):
- mod_time = os.path.getmtime(bak_as)
- backup_dates += [str(datetime.datetime.fromtimestamp(mod_time))]
- i += 1
- bak_as = f'{bak_prefix}{i}'
-
- # collect what numbered .bak files to save:
- # shrink datetime string right to left character by character,
- # on each step add the oldest file whose mtime still fits the pattern
- # (privilege older files to keep existing longer)
- to_save = []
- datetime_len = 19
- now = str(datetime.datetime.now())[:datetime_len]
- while datetime_len > 2:
- # assume backup_dates starts with oldest dates
- for i, date in enumerate(backup_dates):
- if date[:datetime_len] == now:
- if i not in to_save:
- to_save += [i]
- break
- datetime_len -= 1
- now = now[:datetime_len]
-
- # remove redundant backup files
- j = 0
- for i in to_save:
- if i != j:
- source = f'{bak_prefix}{i}'
- target = f'{bak_prefix}{j}'
- shutil.move(source, target)
- j += 1
- for i in range(j, len(backup_dates)):
- try:
- os.remove(f'{bak_prefix}{i}')
- except FileNotFoundError:
- pass
+ self.write_text_to_db(text)
- # put second backup copy of current state at end of bak list
- shutil.copy(self.db_file, f'{bak_prefix}{j}')
- with open(self.db_file, mode) as f:
- f.write(text);
- os.remove(self.lock_file)
+ def insert_at_date(self, lines, date):
+ start_at = len(self.real_lines)
+ for b in self.bookings:
+ if b.date_string == date:
+ start_at = b.start_line
+ break
+ elif b.date_string > date:
+ break
+ if start_at == len(self.real_lines):
+ lines = [''] + lines
+ return self.write_lines_in_total_lines_at(self.real_lines, start_at, lines)
+
+ def update(self, start, end, lines, date):
+ total_lines = self.real_lines[:start] + self.real_lines[end:]
+ n_original_lines = end - start
+ start_at = len(total_lines)
+ for b in self.bookings:
+ if b.date_string == date:
+ if start_at == len(total_lines) or b.start_line == start:
+ start_at = b.start_line
+ if b.start_line > start:
+ start_at -= n_original_lines
+ elif b.date_string > date:
+ break
+ if start_at == len(total_lines):
+ lines = [''] + lines
+ return self.write_lines_in_total_lines_at(total_lines, start_at, lines)
- def replace(self, start, end, lines):
- total_lines = self.real_lines[:start] + lines + self.real_lines[end:]
+ def write_lines_in_total_lines_at(self, total_lines, start_at, lines):
+ total_lines = total_lines[:start_at] + lines + [''] + total_lines[start_at:]
+ _, _ = parse_lines(lines)
text = '\n'.join(total_lines)
self.write_db(text)
-
- def append(self, lines):
- text = '\n\n' + '\n'.join(lines) + '\n\n'
- self.write_db(text, 'a')
+ return start_at
def get_nth_for_booking_of_start_line(self, start_line):
nth = 0
for b in self.bookings:
- if b.start_line == start_line:
+ if b.start_line >= start_line:
break
nth += 1
return nth
return ret
-class MyServer(BaseHTTPRequestHandler):
+class LedgerServer(PlomServer):
header = """<html>
<meta charset="UTF-8">
<style>
/ <a href="/add_free?start={{start}}&end={{end}}">free</a>
| copy:<a href="/copy_structured?start={{start}}&end={{end}}">structured</a>
/ <a href="/copy_free?start={{start}}&end={{end}}">free</a>
+| move {% if move_up %}<a href="/move_up?start={{start}}&end={{end}}">up</a>{% else %}up{% endif %}/{% if move_down %}<a href="/move_down?start={{start}}&end={{end}}">down</a>{% else %}down{% endif %}
| <a href="/balance?stop={{nth+1}}">balance after</a>
]</span>
<table>
postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
start = int(postvars['start'][0])
end = int(postvars['end'][0])
- db = Database()
+ db = LedgerDB()
add_empty_line = None
lines = []
# get inputs
_, _ = parse_lines(lines)
# if saving, process where to and where to redirect after
if 'save' in postvars.keys():
+ last_date = str(datetime.now())[:10]
+ if len(db.bookings) > 0:
+ last_date = db.bookings[-1].date_string
+ target_date = last_date[:]
+ first_line_tokens = lines[0].split() if len(lines) > 0 else ''
+ first_token = first_line_tokens[0] if len(first_line_tokens) > 0 else ''
+ try:
+ datetime.strptime(first_token, '%Y-%m-%d')
+ target_date = first_token
+ except ValueError:
+ pass
if start == end == 0:
- db.append(lines)
- redir_url = f'/#last'
- else:
- db.replace(start, end, lines)
+ start = db.insert_at_date(lines, target_date)
nth = db.get_nth_for_booking_of_start_line(start)
- redir_url = f'/#{nth}'
- self.send_code_and_headers(301, [('Location', redir_url)])
+ else:
+ new_start = db.update(start, end, lines, target_date)
+ nth = db.get_nth_for_booking_of_start_line(new_start)
+ if new_start > start:
+ nth -= 1
+ redir_url = f'/#{nth}'
+ self.send_code_and_headers(302, [('Location', redir_url)])
# otherwise just re-build editing form
else:
if '/add_structured' == parsed_url.path:
else:
edit_content = self.add_free(db, start, end)
self.send_HTML(self.header + edit_content + self.footer)
- except HandledException as e:
+ except PlomException as e:
self.fail_400(e)
def do_GET(self):
params = parse_qs(parsed_url.query)
start = int(params.get('start', ['0'])[0])
end = int(params.get('end', ['0'])[0])
- db = Database()
+ db = LedgerDB()
page = self.header
if parsed_url.path == '/balance':
stop = params.get('stop', [None])[0]
page += self.add_free(db, start, end, copy=True)
elif parsed_url.path == '/copy_structured':
page += self.add_structured(db, start, end, copy=True)
+ elif parsed_url.path == '/move_up':
+ nth = self.move_up(db, start, end)
+ self.send_code_and_headers(302, [('Location', f'/#{nth}')])
+ return
+ elif parsed_url.path == '/move_down':
+ nth = self.move_down(db, start, end)
+ self.send_code_and_headers(302, [('Location', f'/#{nth}')])
+ return
else:
page += self.ledger_as_html(db)
page += self.footer
self.send_HTML(page)
- except HandledException as e:
+ except PlomException as e:
self.fail_400(e)
- def fail_400(self, e):
- page = f'{self.header}ERROR: {e}{self.footer}'
- self.send_HTML(page, 400)
-
- def send_HTML(self, html, code=200):
- self.send_code_and_headers(code, [('Content-type', 'text/html')])
- self.wfile.write(bytes(html, "utf-8"))
-
- def send_code_and_headers(self, code, headers=[]):
- self.send_response(code)
- for fieldname, content in headers:
- self.send_header(fieldname, content)
- self.end_headers()
-
def booking_lines_from_postvars(self, postvars, db):
add_empty_line = None
date = postvars['date'][0]
for currency in temp_bookings[0].sink:
amount = temp_bookings[0].sink[currency]
lines += [f'Assets {amount:.2f} {currency}']
- except HandledException:
+ except PlomException:
pass
if 'add_taxes' in postvars.keys():
lines += db.add_taxes(lines, finish=False)
elements_to_write = []
last_i = i = 0 ##
for nth, booking in enumerate(db.bookings):
+ move_up = nth > 0 and db.bookings[nth - 1].date_string == booking.date_string
+ move_down = nth < len(db.bookings) - 1 and db.bookings[nth + 1].date_string == booking.date_string
booking_end = last_i = booking.start_line + len(booking.lines)
booking_lines = []
i = booking.start_line ##
date=booking.date_string,
desc=booking.description,
head_comment=db.comments[booking.start_line],
+ move_up=move_up,
+ move_down=move_down,
booking_lines = booking_lines)]
elements_to_write += [single_c_tmpl.render(c=c) for c in db.comments[last_i:] if c != ''] #
return '\n'.join(elements_to_write)
lines = temp_lines if len(''.join(temp_lines)) > 0 else db.get_lines(start, end)
bookings, comments = parse_lines(lines, validate_bookings=False)
if len(bookings) > 1:
- raise HandledException('can only structurally edit single Booking')
+ raise PlomException('can only structurally edit single Booking')
if add_empty_line is not None:
comments = comments[:add_empty_line+1] + [''] + comments[add_empty_line+1:]
booking = bookings[0]
for currency in moneys.keys():
datalist_sets['currencies'].add(currency)
content = ''
- today = str(datetime.datetime.now())[:10]
+ today = str(datetime.now())[:10]
booking_lines = []
if copy:
start = end = 0
end=end)
return content
+ def move_up(self, db, start, end):
+ prev_booking = None
+ for redir_nth, b in enumerate(db.bookings):
+ if b.start_line >= start:
+ break
+ prev_booking = b
+ start_at = prev_booking.start_line
+ self.make_move(db, start, end, start_at)
+ return redir_nth - 1
+
+ def move_down(self, db, start, end):
+ next_booking = None
+ for redir_nth, b in enumerate(db.bookings):
+ if b.start_line > start:
+ next_booking = b
+ break
+ start_at = next_booking.start_line + len(next_booking.lines) - (end - start) + 1
+ self.make_move(db, start, end, start_at)
+ return redir_nth
+
+ def make_move(self, db, start, end, start_at):
+ lines = db.get_lines(start, end)
+ total_lines = db.real_lines[:start] + db.real_lines[end:]
+ db.write_lines_in_total_lines_at(total_lines, start_at, lines)
+
if __name__ == "__main__":
- webServer = HTTPServer((hostName, serverPort), MyServer)
- print(f"Server started http://{hostName}:{serverPort}")
- try:
- webServer.serve_forever()
- except KeyboardInterrupt:
- pass
- webServer.server_close()
- print("Server stopped.")
+ run_server(server_port, LedgerServer)