-from http.server import BaseHTTPRequestHandler, HTTPServer
+# from http.server import BaseHTTPRequestHandler, HTTPServer
import os
import json
import jinja2
+from plomlib import PlomDB, PlomException, run_server, run_server, PlomServer
-hostName = "localhost"
-serverPort = 8081
+server_port = 8081
tmpl = jinja2.Template("""<html>
<meta charset="UTF-8">
</body
</html>""")
-class Database:
- timestamp_year = 0,
- timestamp_month = 0,
- timestamp_week = 0,
- year_income = 0,
- month_income = 0,
- week_income = 0,
- workday_hourly_rate_1 = 10,
- workday_hourly_rate_2 = 25,
- workday_hourly_rate_3 = 50,
- workday_minutes_worked_1 = 0,
- workday_minutes_worked_2 = 0,
- workday_minutes_worked_3 = 0,
- year_goal = 20000,
- workdays_per_month = 16
+class IncomeDB(PlomDB):
def __init__(self):
- db_name = "_income"
- self.db_file = db_name + ".json"
- self.lock_file = db_name+ ".lock"
- if os.path.exists(self.db_file):
- with open(self.db_file, "r") as f:
- d = json.load(f)
- for k, v in d.items():
- if not hasattr(self, k):
- raise Exception("bad key in db: " + k)
- setattr(self, k, v)
+ # defaults
+ self.timestamp_year = 0,
+ self.timestamp_month = 0,
+ self.timestamp_week = 0,
+ self.year_income = 0,
+ self.month_income = 0,
+ self.week_income = 0,
+ self.workday_hourly_rate_1 = 10,
+ self.workday_hourly_rate_2 = 25,
+ self.workday_hourly_rate_3 = 50,
+ self.workday_minutes_worked_1 = 0,
+ self.workday_minutes_worked_2 = 0,
+ self.workday_minutes_worked_3 = 0,
+ self.year_goal = 20000,
+ self.workdays_per_month = 16
+ super().__init__('_income')
- def lock(self):
- if os.path.exists(self.lock_file):
- raise Exception('Sorry, lock file!')
- f = open(self.lock_file, 'w+')
- f.close()
-
- def unlock(self):
- os.remove(self.lock_file)
+ def read_db_file(self, f):
+ d = json.load(f)
+ for k, v in d.items():
+ if not hasattr(self, k):
+ raise PlomException("bad key in db: " + k)
+ setattr(self, k, v)
def to_dict(self):
keys = [k for k in dir(self) if (not k.startswith('_')) and (not callable(getattr(self, k)))]
def write_db(self):
self.write_text_to_db(json.dumps(self.to_dict()))
- def backup(self):
- import shutil
- from datetime import datetime, timedelta
- # collect modification times of numbered .bak files
- bak_prefix = f'{self.db_file}.bak.'
- backup_dates = []
- i = 0
- bak_as = f'{bak_prefix}{i}'
- while os.path.exists(bak_as):
- mod_time = os.path.getmtime(bak_as)
- backup_dates += [str(datetime.fromtimestamp(mod_time))]
- i += 1
- bak_as = f'{bak_prefix}{i}'
-
- # collect what numbered .bak files to save: the older, the fewer; for each
- # timedelta, keep the newest file that's older
- ages_to_keep = [timedelta(minutes=4**i) for i in range(0, 8)]
- now = datetime.now()
- to_save = []
- for age in ages_to_keep:
- limit = now - age
- for i, date in enumerate(reversed(backup_dates)):
- if datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f') < limit:
- unreversed_i = len(backup_dates) - i - 1
- if unreversed_i not in to_save:
- to_save += [unreversed_i]
- break
-
- # remove redundant backup files
- j = 0
- for i in to_save:
- if i != j:
- source = f'{bak_prefix}{i}'
- target = f'{bak_prefix}{j}'
- shutil.move(source, target)
- j += 1
- for i in range(j, len(backup_dates)):
- try:
- os.remove(f'{bak_prefix}{i}')
- except FileNotFoundError:
- pass
-
- # put copy of current state at end of bak list
- shutil.copy(self.db_file, f'{bak_prefix}{j}')
-
- def write_text_to_db(self, text):
- self.lock()
- self.backup()
- with open(self.db_file, 'w') as f:
- f.write(text);
- self.unlock()
-
class ProgressBar:
def __init__(self, title, earned, goal, time_progress=-1):
self.title = title
if time_progress > 0:
self.success = success_income / time_progress
-class MyServer(BaseHTTPRequestHandler):
+# class MyServer(BaseHTTPRequestHandler):
+class IncomeServer(PlomServer):
def do_POST(self):
from urllib.parse import parse_qs
- length = int(self.headers['content-length'])
- postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
- db = Database()
- db.workday_minutes_worked_1 = int(postvars['workday_minutes_worked_1'][0])
- db.workday_minutes_worked_2 = int(postvars['workday_minutes_worked_2'][0])
- db.workday_minutes_worked_3 = int(postvars['workday_minutes_worked_3'][0])
- db.workday_hourly_rate_1 = int(postvars['workday_hourly_rate_1'][0])
- db.workday_hourly_rate_2 = int(postvars['workday_hourly_rate_2'][0])
- db.workday_hourly_rate_3 = int(postvars['workday_hourly_rate_3'][0])
- db.year_goal = int(postvars['year_goal'][0])
- db.workdays_per_month = int(postvars['workdays_per_month'][0])
- if 'finish' in postvars.keys():
- day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1
- day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2
- day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3
- db.year_income += day_income
- db.month_income += day_income
- db.week_income += day_income
- db.workday_minutes_worked_1 = 0
- db.workday_minutes_worked_2 = 0
- db.workday_minutes_worked_3 = 0
- db.write_db()
- self.send_response(302)
- self.send_header('Location', '/')
- self.end_headers()
+ try:
+ length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
+ db = IncomeDB()
+ db.workday_minutes_worked_1 = int(postvars['workday_minutes_worked_1'][0])
+ db.workday_minutes_worked_2 = int(postvars['workday_minutes_worked_2'][0])
+ db.workday_minutes_worked_3 = int(postvars['workday_minutes_worked_3'][0])
+ db.workday_hourly_rate_1 = int(postvars['workday_hourly_rate_1'][0])
+ db.workday_hourly_rate_2 = int(postvars['workday_hourly_rate_2'][0])
+ db.workday_hourly_rate_3 = int(postvars['workday_hourly_rate_3'][0])
+ db.year_goal = int(postvars['year_goal'][0])
+ db.workdays_per_month = int(postvars['workdays_per_month'][0])
+ if 'finish' in postvars.keys():
+ day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1
+ day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2
+ day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3
+ db.year_income += day_income
+ db.month_income += day_income
+ db.week_income += day_income
+ db.workday_minutes_worked_1 = 0
+ db.workday_minutes_worked_2 = 0
+ db.workday_minutes_worked_3 = 0
+ db.write_db()
+ self.send_response(302)
+ self.send_header('Location', '/')
+ self.end_headers()
+ except PlomException as e:
+ self.fail_400(e)
def do_GET(self):
import datetime
import calendar
- db = Database() #load_db()
- today = datetime.datetime.now()
- update_db = False
- if today.year != db.timestamp_year:
- db.timestamp_year = today.year
- db.timestamp_month = today.month
- db.year_income = 0
- db.month_income = 0
- update_db = True
- if today.month != db.timestamp_month:
- db.timestamp_month = today.month
- db.month_income = 0
- update_db = True
- if today.isocalendar()[1] != db.timestamp_week:
- db.timestamp_week = today.isocalendar()[1]
- db.week_income = 0
- update_db = True
- if update_db:
- print("Resetting timestamp")
- db.write_db()
- day_of_year = today.toordinal() - datetime.date(today.year, 1, 1).toordinal() + 1
- year_length = 365 + calendar.isleap(today.year)
- workday_goal = db.year_goal / 12 / db.workdays_per_month
- workdays_per_week = (db.workdays_per_month * 12) / (year_length / 7)
- month_goal = db.year_goal / 12
- week_goal = db.year_goal / (year_length / 7)
- day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1
- day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2
- day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3
- year_plus = db.year_income + day_income
- month_plus = db.month_income + day_income
- week_plus = db.week_income + day_income
- progress_time_year = day_of_year / year_length
- progress_time_month = today.day / calendar.monthrange(today.year, today.month)[1]
- progress_time_week = today.weekday() / 7
- progress_bars = [ProgressBar("year", year_plus, db.year_goal, progress_time_year),
- ProgressBar("month", month_plus, month_goal, progress_time_month),
- ProgressBar("week", week_plus, week_goal, progress_time_week),
- ProgressBar("workday", day_income, workday_goal)]
- page = tmpl.render(
- progress_bars = progress_bars,
- workday_hourly_rate_1 = db.workday_hourly_rate_1,
- workday_minutes_worked_1 = db.workday_minutes_worked_1,
- workday_hourly_rate_2 = db.workday_hourly_rate_2,
- workday_minutes_worked_2 = db.workday_minutes_worked_2,
- workday_hourly_rate_3 = db.workday_hourly_rate_3,
- workday_minutes_worked_3 = db.workday_minutes_worked_3,
- year_goal = db.year_goal,
- month_goal = month_goal,
- week_goal = week_goal,
- workdays_per_month = db.workdays_per_month,
- workday_goal = workday_goal,
- workdays_per_week = workdays_per_week,
- )
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(bytes(page, "utf-8"))
-
- def fail_on_lockfile(self):
- if os.path.exists(lock_file):
- self.send_response(400)
- self.end_headers()
- self.wfile.write(bytes("Sorry, lock file!", "utf-8"))
- return True
- return False
+ try:
+ db = IncomeDB()
+ today = datetime.datetime.now()
+ update_db = False
+ if today.year != db.timestamp_year:
+ db.timestamp_year = today.year
+ db.timestamp_month = today.month
+ db.year_income = 0
+ db.month_income = 0
+ update_db = True
+ if today.month != db.timestamp_month:
+ db.timestamp_month = today.month
+ db.month_income = 0
+ update_db = True
+ if today.isocalendar()[1] != db.timestamp_week:
+ db.timestamp_week = today.isocalendar()[1]
+ db.week_income = 0
+ update_db = True
+ if update_db:
+ print("Resetting timestamp")
+ db.write_db()
+ day_of_year = today.toordinal() - datetime.date(today.year, 1, 1).toordinal() + 1
+ year_length = 365 + calendar.isleap(today.year)
+ workday_goal = db.year_goal / 12 / db.workdays_per_month
+ workdays_per_week = (db.workdays_per_month * 12) / (year_length / 7)
+ month_goal = db.year_goal / 12
+ week_goal = db.year_goal / (year_length / 7)
+ day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1
+ day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2
+ day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3
+ year_plus = db.year_income + day_income
+ month_plus = db.month_income + day_income
+ week_plus = db.week_income + day_income
+ progress_time_year = day_of_year / year_length
+ progress_time_month = today.day / calendar.monthrange(today.year, today.month)[1]
+ progress_time_week = today.weekday() / 7
+ progress_bars = [ProgressBar("year", year_plus, db.year_goal, progress_time_year),
+ ProgressBar("month", month_plus, month_goal, progress_time_month),
+ ProgressBar("week", week_plus, week_goal, progress_time_week),
+ ProgressBar("workday", day_income, workday_goal)]
+ page = tmpl.render(
+ progress_bars = progress_bars,
+ workday_hourly_rate_1 = db.workday_hourly_rate_1,
+ workday_minutes_worked_1 = db.workday_minutes_worked_1,
+ workday_hourly_rate_2 = db.workday_hourly_rate_2,
+ workday_minutes_worked_2 = db.workday_minutes_worked_2,
+ workday_hourly_rate_3 = db.workday_hourly_rate_3,
+ workday_minutes_worked_3 = db.workday_minutes_worked_3,
+ year_goal = db.year_goal,
+ month_goal = month_goal,
+ week_goal = week_goal,
+ workdays_per_month = db.workdays_per_month,
+ workday_goal = workday_goal,
+ workdays_per_week = workdays_per_week,
+ )
+ self.send_HTML(page)
+ except PlomException as e:
+ self.fail_400(e)
if __name__ == "__main__":
- webServer = HTTPServer((hostName, serverPort), MyServer)
- print(f"Server started http://{hostName}:{serverPort}")
- try:
- webServer.serve_forever()
- except KeyboardInterrupt:
- pass
- webServer.server_close()
- print("Server stopped.")
+ run_server(server_port, IncomeServer)
import decimal
from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse
-hostName = "localhost"
-serverPort = 8082
+from plomlib import PlomDB, PlomException, run_server, run_server, PlomServer
-
-class HandledException(Exception):
- pass
+server_port = 8082
def apply_booking_to_account_balances(account_sums, account, currency, amount):
if inside_booking:
# assume we finished a booking, finalize, and commit to DB
if len(booking_lines) < 2:
- raise HandledException(f"{prefix} booking ends to early")
+ raise PlomException(f"{prefix} booking ends to early")
booking = Booking(date_string, description, booking_lines, start_line, validate_bookings)
bookings += [booking]
# expect new booking to follow so re-zeroall booking data
try:
datetime.strptime(date_string, '%Y-%m-%d')
except ValueError:
- raise HandledException(f"{prefix} bad date string: {date_string}")
+ raise PlomException(f"{prefix} bad date string: {date_string}")
if last_date > date_string:
- raise HandledException(f"{prefix} out-of-order-date")
+ raise PlomException(f"{prefix} out-of-order-date")
last_date = date_string
try:
description = toks[1]
except IndexError:
- raise HandledException(f"{prefix} bad description: {description}")
+ raise PlomException(f"{prefix} bad description: {description}")
inside_booking = True
booking_lines += [non_comment]
continue
# otherwise, read as transfer data
toks = non_comment.split() # ignore specification's allowance of single spaces in names
if len(toks) > 3:
- raise HandledException(f"{prefix} too many booking line tokens: {toks}")
+ raise PlomException(f"{prefix} too many booking line tokens: {toks}")
amount, currency = None, None
account_name = toks[0]
if account_name[0] == '[' and account_name[-1] == ']':
try:
amount = decimal.Decimal(toks[2])
except decimal.InvalidOperation:
- raise HandledException(f"{prefix} no decimal number in: {toks[1:]}")
+ raise PlomException(f"{prefix} no decimal number in: {toks[1:]}")
currency = toks[i_currency]
if currency[0] in decimal_chars:
- raise HandledException(f"{prefix} currency starts with int, dot, or minus: {currency}")
+ raise PlomException(f"{prefix} currency starts with int, dot, or minus: {currency}")
elif len(toks) == 2:
value = toks[1]
inside_amount = False
if inside_amount:
if c not in decimal_chars:
if len(currency) > 0:
- raise HandledException(f"{prefix} amount has non-decimal chars: {value}")
+ raise PlomException(f"{prefix} amount has non-decimal chars: {value}")
inside_currency = True
inside_amount = False
currency += c
continue
if c == '-' and len(amount_string) > 1:
- raise HandledException(f"{prefix} amount has non-start '-': {value}")
+ raise PlomException(f"{prefix} amount has non-start '-': {value}")
if c == '.':
if dots_counted > 1:
- raise HandledException(f"{prefix} amount has multiple dots: {value}")
+ raise PlomException(f"{prefix} amount has multiple dots: {value}")
dots_counted += 1
amount_string += c
if len(currency) == 0:
- raise HandledException(f"{prefix} currency missing: {value}")
+ raise PlomException(f"{prefix} currency missing: {value}")
if len(amount_string) > 0:
amount = decimal.Decimal(amount_string)
booking_lines += [(account_name, amount, currency)]
if inside_booking:
- raise HandledException(f"{prefix} last booking unfinished")
+ raise PlomException(f"{prefix} last booking unfinished")
return bookings, comments
_, amount, currency = line
if amount is None:
if empty_values > 0:
- raise HandledException(f"{prefix} relates more than one empty value of same currency {currency}")
+ raise PlomException(f"{prefix} relates more than one empty value of same currency {currency}")
empty_values += 1
continue
if currency not in sums:
if empty_values == 0:
for k, v in sums.items():
if v != 0:
- raise HandledException(f"{prefix} does not add up to zero / {k} {v}")
+ raise PlomException(f"{prefix} does not add up to zero / {k} {v}")
else:
sinkable = False
for k, v in sums.items():
if v != 0:
sinkable = True
if not sinkable:
- raise HandledException(f"{prefix} has empty value that cannot be filled")
+ raise PlomException(f"{prefix} has empty value that cannot be filled")
def parse_booking_lines_to_account_changes(self):
account_changes = {}
-class Database:
+class LedgerDB(PlomDB):
def __init__(self):
- db_name = "_ledger"
- self.db_file = db_name + ".json"
- self.lock_file = db_name+ ".lock"
self.bookings = []
self.comments = []
self.real_lines = []
- if os.path.exists(self.db_file):
- with open(self.db_file, "r") as f:
- self.real_lines += [l.rstrip() for l in f.readlines()]
+ super().__init__('_ledger')
ret = parse_lines(self.real_lines)
self.bookings += ret[0]
self.comments += ret[1]
+ def read_db_file(self, f):
+ self.real_lines += [l.rstrip() for l in f.readlines()]
+
def get_lines(self, start, end):
return self.real_lines[start:end]
def write_db(self, text, mode='w'):
- import shutil
- if os.path.exists(self.lock_file):
- raise HandledException('Sorry, lock file!')
- f = open(self.lock_file, 'w+')
- f.close()
-
- # collect modification times of numbered .bak files
- bak_prefix = f'{self.db_file}.bak.'
- backup_dates = []
- i = 0
- bak_as = f'{bak_prefix}{i}'
- while os.path.exists(bak_as):
- mod_time = os.path.getmtime(bak_as)
- backup_dates += [str(datetime.fromtimestamp(mod_time))]
- i += 1
- bak_as = f'{bak_prefix}{i}'
-
- # collect what numbered .bak files to save: the older, the fewer; for each
- # timedelta, keep the newest file that's older
- ages_to_keep = [timedelta(minutes=4**i) for i in range(0, 8)]
- now = datetime.now()
- to_save = []
- for age in ages_to_keep:
- limit = now - age
- for i, date in enumerate(reversed(backup_dates)):
- if datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f') < limit:
- unreversed_i = len(backup_dates) - i - 1
- if unreversed_i not in to_save:
- to_save += [unreversed_i]
- break
-
- # remove redundant backup files
- j = 0
- for i in to_save:
- if i != j:
- source = f'{bak_prefix}{i}'
- target = f'{bak_prefix}{j}'
- shutil.move(source, target)
- j += 1
- for i in range(j, len(backup_dates)):
- try:
- os.remove(f'{bak_prefix}{i}')
- except FileNotFoundError:
- pass
-
- # put copy of current state at end of bak list
- shutil.copy(self.db_file, f'{bak_prefix}{j}')
- with open(self.db_file, mode) as f:
- f.write(text);
- os.remove(self.lock_file)
+ self.write_text_to_db(text)
def insert_at_date(self, lines, date):
start_at = len(self.real_lines)
return ret
-class MyServer(BaseHTTPRequestHandler):
+class LedgerServer(PlomServer):
header = """<html>
<meta charset="UTF-8">
<style>
postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
start = int(postvars['start'][0])
end = int(postvars['end'][0])
- db = Database()
+ db = LedgerDB()
add_empty_line = None
lines = []
# get inputs
else:
edit_content = self.add_free(db, start, end)
self.send_HTML(self.header + edit_content + self.footer)
- except HandledException as e:
+ except PlomException as e:
self.fail_400(e)
def do_GET(self):
params = parse_qs(parsed_url.query)
start = int(params.get('start', ['0'])[0])
end = int(params.get('end', ['0'])[0])
- db = Database()
+ db = LedgerDB()
page = self.header
if parsed_url.path == '/balance':
stop = params.get('stop', [None])[0]
page += self.ledger_as_html(db)
page += self.footer
self.send_HTML(page)
- except HandledException as e:
+ except PlomException as e:
self.fail_400(e)
- def fail_400(self, e):
- page = f'{self.header}ERROR: {e}{self.footer}'
- self.send_HTML(page, 400)
-
- def send_HTML(self, html, code=200):
- self.send_code_and_headers(code, [('Content-type', 'text/html')])
- self.wfile.write(bytes(html, "utf-8"))
-
- def send_code_and_headers(self, code, headers=[]):
- self.send_response(code)
- for fieldname, content in headers:
- self.send_header(fieldname, content)
- self.end_headers()
-
def booking_lines_from_postvars(self, postvars, db):
add_empty_line = None
date = postvars['date'][0]
for currency in temp_bookings[0].sink:
amount = temp_bookings[0].sink[currency]
lines += [f'Assets {amount:.2f} {currency}']
- except HandledException:
+ except PlomException:
pass
if 'add_taxes' in postvars.keys():
lines += db.add_taxes(lines, finish=False)
lines = temp_lines if len(''.join(temp_lines)) > 0 else db.get_lines(start, end)
bookings, comments = parse_lines(lines, validate_bookings=False)
if len(bookings) > 1:
- raise HandledException('can only structurally edit single Booking')
+ raise PlomException('can only structurally edit single Booking')
if add_empty_line is not None:
comments = comments[:add_empty_line+1] + [''] + comments[add_empty_line+1:]
booking = bookings[0]
if __name__ == "__main__":
- webServer = HTTPServer((hostName, serverPort), MyServer)
- print(f"Server started http://{hostName}:{serverPort}")
- try:
- webServer.serve_forever()
- except KeyboardInterrupt:
- pass
- webServer.server_close()
- print("Server stopped.")
+ run_server(server_port, LedgerServer)
--- /dev/null
+import os
+from http.server import BaseHTTPRequestHandler
+
+
+class PlomException(Exception):
+ pass
+
+
+class PlomDB:
+
+ def __init__(self, db_name):
+ self.db_file = db_name + ".json"
+ self.lock_file = db_name+ ".lock"
+ if os.path.exists(self.db_file):
+ with open(self.db_file, "r") as f:
+ self.read_db_file(f)
+
+ def lock(self):
+ if os.path.exists(self.lock_file):
+ raise PlomException('Sorry, lock file!')
+ f = open(self.lock_file, 'w+')
+ f.close()
+
+ def unlock(self):
+ os.remove(self.lock_file)
+
+ def backup(self):
+ import shutil
+ from datetime import datetime, timedelta
+ # collect modification times of numbered .bak files
+ bak_prefix = f'{self.db_file}.bak.'
+ backup_dates = []
+ i = 0
+ bak_as = f'{bak_prefix}{i}'
+ while os.path.exists(bak_as):
+ mod_time = os.path.getmtime(bak_as)
+ backup_dates += [str(datetime.fromtimestamp(mod_time))]
+ i += 1
+ bak_as = f'{bak_prefix}{i}'
+
+ # collect what numbered .bak files to save: the older, the fewer; for each
+ # timedelta, keep the newest file that's older
+ ages_to_keep = [timedelta(minutes=4**i) for i in range(0, 8)]
+ now = datetime.now()
+ to_save = []
+ for age in ages_to_keep:
+ limit = now - age
+ for i, date in enumerate(reversed(backup_dates)):
+ if datetime.strptime(date, '%Y-%m-%d %H:%M:%S.%f') < limit:
+ unreversed_i = len(backup_dates) - i - 1
+ if unreversed_i not in to_save:
+ to_save += [unreversed_i]
+ break
+
+ # remove redundant backup files
+ j = 0
+ for i in to_save:
+ if i != j:
+ source = f'{bak_prefix}{i}'
+ target = f'{bak_prefix}{j}'
+ shutil.move(source, target)
+ j += 1
+ for i in range(j, len(backup_dates)):
+ try:
+ os.remove(f'{bak_prefix}{i}')
+ except FileNotFoundError:
+ pass
+
+ # put copy of current state at end of bak list
+ shutil.copy(self.db_file, f'{bak_prefix}{j}')
+
+ def write_text_to_db(self, text, mode='w'):
+ self.lock()
+ self.backup()
+ with open(self.db_file, mode) as f:
+ f.write(text);
+ self.unlock()
+
+
+class PlomServer(BaseHTTPRequestHandler):
+ header = ''
+ footer = ''
+
+ def run(self, port):
+ from http.server import HTTPServer
+ webServer = HTTPServer(('localhost', port), type(self))
+ print(f"Server started http://localhost:{port}")
+ try:
+ webServer.serve_forever()
+ except KeyboardInterrupt:
+ pass
+ webServer.server_close()
+ print("Server stopped.")
+
+ def fail_400(self, e):
+ page = f'{self.header}ERROR: {e}{self.footer}'
+ self.send_HTML(page, 400)
+
+ def send_HTML(self, html, code=200):
+ self.send_code_and_headers(code, [('Content-type', 'text/html')])
+ self.wfile.write(bytes(html, "utf-8"))
+
+ def send_code_and_headers(self, code, headers=[]):
+ self.send_response(code)
+ for fieldname, content in headers:
+ self.send_header(fieldname, content)
+ self.end_headers()
+
+
+def run_server(port, server_class):
+ from http.server import HTTPServer
+ webServer = HTTPServer(('localhost', port), server_class)
+ print(f"Server started http://localhost:{port}")
+ try:
+ webServer.serve_forever()
+ except KeyboardInterrupt:
+ pass
+ webServer.server_close()
+ print("Server stopped.")