From 71a0c94ce508dcd21165e06a955043d021efb89f Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Tue, 28 Nov 2023 07:12:22 +0100 Subject: [PATCH] Improve accounting scripts. --- calories.py | 13 ++- income_progress_bars.py | 172 +++++++++++++++++++--------------------- ledger.py | 163 ++++++++++++++++++------------------- plomlib.py | 6 ++ unite.py | 60 +++++++------- 5 files changed, 203 insertions(+), 211 deletions(-) diff --git a/calories.py b/calories.py index 28b1a16..9951ce5 100644 --- a/calories.py +++ b/calories.py @@ -233,7 +233,7 @@ class ConsumptionsHandler(PlomHandler): return 'consumptions', default_path def do_POST(self): - self.write_db() + self.try_do(self.write_db) def write_db(self): from uuid import uuid4 @@ -289,15 +289,12 @@ class ConsumptionsHandler(PlomHandler): default_slots -= 1 if (default_slots <= 0): break - try: - db.write() - homepage = self.apps['consumptions'] if hasattr(self, 'apps') else self.homepage - self.redirect(homepage) - except PlomException as e: - self.fail_400(e) + db.write() + homepage = self.apps['consumptions'] if hasattr(self, 'apps') else self.homepage + self.redirect(homepage) def do_GET(self): - self.show_db() + self.try_do(self.show_db) def show_db(self): db = CaloriesDB() diff --git a/income_progress_bars.py b/income_progress_bars.py index 60f7104..5d9a887 100644 --- a/income_progress_bars.py +++ b/income_progress_bars.py @@ -170,104 +170,98 @@ class IncomeProgressHandler(PlomHandler): return 'income_progress', default_path def do_POST(self): - self.post_income_update() + self.try_do(self.post_income_update) def post_income_update(self): from urllib.parse import parse_qs - try: - length = int(self.headers['content-length']) - postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) - db = IncomeDB() - db.workday_minutes_worked_1 = int(postvars['workday_minutes_worked_1'][0]) - db.workday_minutes_worked_2 = int(postvars['workday_minutes_worked_2'][0]) - db.workday_minutes_worked_3 = int(postvars['workday_minutes_worked_3'][0]) - db.workday_hourly_rate_1 = int(postvars['workday_hourly_rate_1'][0]) - db.workday_hourly_rate_2 = int(postvars['workday_hourly_rate_2'][0]) - db.workday_hourly_rate_3 = int(postvars['workday_hourly_rate_3'][0]) - db.year_goal = int(postvars['year_goal'][0]) - db.workdays_per_month = int(postvars['workdays_per_month'][0]) - if 'finish' in postvars.keys(): - day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 - day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 - day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 - db.year_income += day_income - db.month_income += day_income - db.week_income += day_income - db.workday_minutes_worked_1 = 0 - db.workday_minutes_worked_2 = 0 - db.workday_minutes_worked_3 = 0 - db.write_db() - homepage = self.apps['income_progress'] if hasattr(self, 'apps') else self.homepage - self.redirect(homepage) - except PlomException as e: - self.fail_400(e) + length = int(self.headers['content-length']) + postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) + db = IncomeDB() + db.workday_minutes_worked_1 = int(postvars['workday_minutes_worked_1'][0]) + db.workday_minutes_worked_2 = int(postvars['workday_minutes_worked_2'][0]) + db.workday_minutes_worked_3 = int(postvars['workday_minutes_worked_3'][0]) + db.workday_hourly_rate_1 = int(postvars['workday_hourly_rate_1'][0]) + db.workday_hourly_rate_2 = int(postvars['workday_hourly_rate_2'][0]) + db.workday_hourly_rate_3 = int(postvars['workday_hourly_rate_3'][0]) + db.year_goal = int(postvars['year_goal'][0]) + db.workdays_per_month = int(postvars['workdays_per_month'][0]) + if 'finish' in postvars.keys(): + day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 + day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 + day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 + db.year_income += day_income + db.month_income += day_income + db.week_income += day_income + db.workday_minutes_worked_1 = 0 + db.workday_minutes_worked_2 = 0 + db.workday_minutes_worked_3 = 0 + db.write_db() + homepage = self.apps['income_progress'] if hasattr(self, 'apps') else self.homepage + self.redirect(homepage) def do_GET(self): - self.display_income_progress() + self.try_do(self.display_income_progress) def display_income_progress(self): import datetime import calendar - try: - db = IncomeDB() - today = datetime.datetime.now() - update_db = False - if today.year != db.timestamp_year: - db.timestamp_year = today.year - db.timestamp_month = today.month - db.year_income = 0 - db.month_income = 0 - update_db = True - if today.month != db.timestamp_month: - db.timestamp_month = today.month - db.month_income = 0 - update_db = True - if today.isocalendar()[1] != db.timestamp_week: - db.timestamp_week = today.isocalendar()[1] - db.week_income = 0 - update_db = True - if update_db: - print("Resetting timestamp") - db.write_db() - day_of_year = today.toordinal() - datetime.date(today.year, 1, 1).toordinal() + 1 - year_length = 365 + calendar.isleap(today.year) - workday_goal = db.year_goal / 12 / db.workdays_per_month - workdays_per_week = (db.workdays_per_month * 12) / (year_length / 7) - month_goal = db.year_goal / 12 - week_goal = db.year_goal / (year_length / 7) - day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 - day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 - day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 - year_plus = db.year_income + day_income - month_plus = db.month_income + day_income - week_plus = db.week_income + day_income - progress_time_year = day_of_year / year_length - progress_time_month = today.day / calendar.monthrange(today.year, today.month)[1] - progress_time_week = today.weekday() / 7 - progress_bars = [ProgressBar("year", year_plus, db.year_goal, progress_time_year), - ProgressBar("month", month_plus, month_goal, progress_time_month), - ProgressBar("week", week_plus, week_goal, progress_time_week), - ProgressBar("workday", day_income, workday_goal)] - homepage = self.apps['income_progress'] if hasattr(self, 'apps') else self.homepage - page = jinja2.Template(tmpl).render( - homepage = homepage, - progress_bars = progress_bars, - workday_hourly_rate_1 = db.workday_hourly_rate_1, - workday_minutes_worked_1 = db.workday_minutes_worked_1, - workday_hourly_rate_2 = db.workday_hourly_rate_2, - workday_minutes_worked_2 = db.workday_minutes_worked_2, - workday_hourly_rate_3 = db.workday_hourly_rate_3, - workday_minutes_worked_3 = db.workday_minutes_worked_3, - year_goal = db.year_goal, - month_goal = month_goal, - week_goal = week_goal, - workdays_per_month = db.workdays_per_month, - workday_goal = workday_goal, - workdays_per_week = workdays_per_week, - ) - self.send_HTML(page) - except PlomException as e: - self.fail_400(e) + db = IncomeDB() + today = datetime.datetime.now() + update_db = False + if today.year != db.timestamp_year: + db.timestamp_year = today.year + db.timestamp_month = today.month + db.year_income = 0 + db.month_income = 0 + update_db = True + if today.month != db.timestamp_month: + db.timestamp_month = today.month + db.month_income = 0 + update_db = True + if today.isocalendar()[1] != db.timestamp_week: + db.timestamp_week = today.isocalendar()[1] + db.week_income = 0 + update_db = True + if update_db: + print("Resetting timestamp") + db.write_db() + day_of_year = today.toordinal() - datetime.date(today.year, 1, 1).toordinal() + 1 + year_length = 365 + calendar.isleap(today.year) + workday_goal = db.year_goal / 12 / db.workdays_per_month + workdays_per_week = (db.workdays_per_month * 12) / (year_length / 7) + month_goal = db.year_goal / 12 + week_goal = db.year_goal / (year_length / 7) + day_income = (db.workday_minutes_worked_1 / 60.0) * db.workday_hourly_rate_1 + day_income += (db.workday_minutes_worked_2 / 60.0) * db.workday_hourly_rate_2 + day_income += (db.workday_minutes_worked_3 / 60.0) * db.workday_hourly_rate_3 + year_plus = db.year_income + day_income + month_plus = db.month_income + day_income + week_plus = db.week_income + day_income + progress_time_year = day_of_year / year_length + progress_time_month = today.day / calendar.monthrange(today.year, today.month)[1] + progress_time_week = today.weekday() / 7 + progress_bars = [ProgressBar("year", year_plus, db.year_goal, progress_time_year), + ProgressBar("month", month_plus, month_goal, progress_time_month), + ProgressBar("week", week_plus, week_goal, progress_time_week), + ProgressBar("workday", day_income, workday_goal)] + homepage = self.apps['income_progress'] if hasattr(self, 'apps') else self.homepage + page = jinja2.Template(tmpl).render( + homepage = homepage, + progress_bars = progress_bars, + workday_hourly_rate_1 = db.workday_hourly_rate_1, + workday_minutes_worked_1 = db.workday_minutes_worked_1, + workday_hourly_rate_2 = db.workday_hourly_rate_2, + workday_minutes_worked_2 = db.workday_minutes_worked_2, + workday_hourly_rate_3 = db.workday_hourly_rate_3, + workday_minutes_worked_3 = db.workday_minutes_worked_3, + year_goal = db.year_goal, + month_goal = month_goal, + week_goal = week_goal, + workdays_per_month = db.workdays_per_month, + workday_goal = workday_goal, + workdays_per_week = workdays_per_week, + ) + self.send_HTML(page) if __name__ == "__main__": diff --git a/ledger.py b/ledger.py index 3071df6..6653af6 100755 --- a/ledger.py +++ b/ledger.py @@ -365,7 +365,6 @@ class LedgerDB(PlomDB): return self.write_lines_in_total_lines_at(self.real_lines, start_at, lines) def update(self, start, end, lines, date): - print("DEBUG update", date) total_lines = self.real_lines[:start] + self.real_lines[end:] n_original_lines = end - start start_at = len(total_lines) @@ -727,97 +726,91 @@ class LedgerHandler(PlomHandler): return 'ledger', default_path def do_POST(self): - self.forward_posts() + self.try_do(self.forward_posts) def forward_posts(self): - try: - prefix = self.apps['ledger'] if hasattr(self, 'apps') else '' - parsed_url = urlparse(self.path) - length = int(self.headers['content-length']) - postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) - print("DEBUG", postvars['start'], postvars['end']) - start = int(postvars['start'][0]) - end = int(postvars['end'][0]) - db = LedgerDB(prefix) - add_empty_line = None - lines = [] - # get inputs - if prefix + '/add_structured' == parsed_url.path and not 'revert' in postvars.keys(): - lines, add_empty_line = db.booking_lines_from_postvars(postvars) - elif prefix + '/add_free' == parsed_url.path and not 'revert' in postvars.keys(): - lines = postvars['booking'][0].splitlines() - # validate where appropriate - if ('save' in postvars.keys()) or ('check' in postvars.keys()): - _, _ = parse_lines(lines) - # if saving, process where to and where to redirect after - if 'save' in postvars.keys(): - last_date = str(datetime.now())[:10] - if len(db.bookings) > 0: - last_date = db.bookings[-1].date_string - target_date = last_date[:] - first_line_tokens = lines[0].split() if len(lines) > 0 else '' - first_token = first_line_tokens[0] if len(first_line_tokens) > 0 else '' - try: - datetime.strptime(first_token, '%Y-%m-%d') - target_date = first_token - except ValueError: - pass - if start == end == 0: - start = db.insert_at_date(lines, target_date) - nth = db.get_nth_for_booking_of_start_line(start) - else: - new_start = db.update(start, end, lines, target_date) - nth = db.get_nth_for_booking_of_start_line(new_start) - if new_start > start: - nth -= 1 - self.redirect(prefix + f'/#{nth}') - # otherwise just re-build editing form + prefix = self.apps['ledger'] if hasattr(self, 'apps') else '' + parsed_url = urlparse(self.path) + length = int(self.headers['content-length']) + postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) + print("DEBUG", postvars['start'], postvars['end']) + start = int(postvars['start'][0]) + end = int(postvars['end'][0]) + db = LedgerDB(prefix) + add_empty_line = None + lines = [] + # get inputs + if prefix + '/add_structured' == parsed_url.path and not 'revert' in postvars.keys(): + lines, add_empty_line = db.booking_lines_from_postvars(postvars) + elif prefix + '/add_free' == parsed_url.path and not 'revert' in postvars.keys(): + lines = postvars['booking'][0].splitlines() + # validate where appropriate + if ('save' in postvars.keys()) or ('check' in postvars.keys()): + _, _ = parse_lines(lines) + # if saving, process where to and where to redirect after + if 'save' in postvars.keys(): + last_date = str(datetime.now())[:10] + if len(db.bookings) > 0: + last_date = db.bookings[-1].date_string + target_date = last_date[:] + first_line_tokens = lines[0].split() if len(lines) > 0 else '' + first_token = first_line_tokens[0] if len(first_line_tokens) > 0 else '' + try: + datetime.strptime(first_token, '%Y-%m-%d') + target_date = first_token + except ValueError: + pass + if start == end == 0: + start = db.insert_at_date(lines, target_date) + nth = db.get_nth_for_booking_of_start_line(start) else: - if prefix + '/add_structured' == parsed_url.path: - edit_content = db.add_structured(start, end, temp_lines=lines, add_empty_line=add_empty_line) - else: - edit_content = db.add_free(start, end) - header = jinja2.Template(html_head).render(prefix=prefix) - self.send_HTML(header + edit_content) - except PlomException as e: - self.fail_400(e) + new_start = db.update(start, end, lines, target_date) + nth = db.get_nth_for_booking_of_start_line(new_start) + if new_start > start: + nth -= 1 + self.redirect(prefix + f'/#{nth}') + # otherwise just re-build editing form + else: + if prefix + '/add_structured' == parsed_url.path: + edit_content = db.add_structured(start, end, temp_lines=lines, add_empty_line=add_empty_line) + else: + edit_content = db.add_free(start, end) + header = jinja2.Template(html_head).render(prefix=prefix) + self.send_HTML(header + edit_content) def do_GET(self): - self.forward_gets() + self.try_do(self.forward_gets) def forward_gets(self): - try: - prefix = self.apps['ledger'] if hasattr(self, 'apps') else '' - parsed_url = urlparse(self.path) - params = parse_qs(parsed_url.query) - start = int(params.get('start', ['0'])[0]) - end = int(params.get('end', ['0'])[0]) - db = LedgerDB(prefix=prefix) - if parsed_url.path == prefix + '/balance': - stop = params.get('stop', [None])[0] - page = db.balance_as_html(stop) - elif parsed_url.path == prefix + '/add_free': - page = db.add_free(start, end) - elif parsed_url.path == prefix + '/add_structured': - page = db.add_structured(start, end) - elif parsed_url.path == prefix + '/copy_free': - page = db.add_free(start, end, copy=True) - elif parsed_url.path == prefix + '/copy_structured': - page = db.add_structured(start, end, copy=True) - elif parsed_url.path == prefix + '/move_up': - nth = db.move_up(start, end) - self.redirect(prefix + f'/#{nth}') - return - elif parsed_url.path == prefix + '/move_down': - nth = db.move_down(start, end) - self.redirect(prefix + f'/#{nth}') - return - else: - page = db.ledger_as_html() - header = jinja2.Template(html_head).render(prefix=prefix) - self.send_HTML(header + page) - except PlomException as e: - self.fail_400(e) + prefix = self.apps['ledger'] if hasattr(self, 'apps') else '' + parsed_url = urlparse(self.path) + params = parse_qs(parsed_url.query) + start = int(params.get('start', ['0'])[0]) + end = int(params.get('end', ['0'])[0]) + db = LedgerDB(prefix=prefix) + if parsed_url.path == prefix + '/balance': + stop = params.get('stop', [None])[0] + page = db.balance_as_html(stop) + elif parsed_url.path == prefix + '/add_free': + page = db.add_free(start, end) + elif parsed_url.path == prefix + '/add_structured': + page = db.add_structured(start, end) + elif parsed_url.path == prefix + '/copy_free': + page = db.add_free(start, end, copy=True) + elif parsed_url.path == prefix + '/copy_structured': + page = db.add_structured(start, end, copy=True) + elif parsed_url.path == prefix + '/move_up': + nth = db.move_up(start, end) + self.redirect(prefix + f'/#{nth}') + return + elif parsed_url.path == prefix + '/move_down': + nth = db.move_down(start, end) + self.redirect(prefix + f'/#{nth}') + return + else: + page = db.ledger_as_html() + header = jinja2.Template(html_head).render(prefix=prefix) + self.send_HTML(header + page) diff --git a/plomlib.py b/plomlib.py index 18c2f18..739b00f 100644 --- a/plomlib.py +++ b/plomlib.py @@ -101,6 +101,12 @@ class PlomHandler(BaseHTTPRequestHandler): def redirect(self, url='/'): self.send_code_and_headers(302, [('Location', url)]) + def try_do(self, do_method): + try: + do_method() + except PlomException as e: + self.fail_400(e) + def run_server(port, handler_class): diff --git a/unite.py b/unite.py index 64a72af..de8be07 100644 --- a/unite.py +++ b/unite.py @@ -3,6 +3,7 @@ from urllib.parse import urlparse from income_progress_bars import IncomeProgressHandler from calories import ConsumptionsHandler from ledger import LedgerHandler +from todo import TodoHandler server_port = 8081 @@ -22,41 +23,42 @@ class UnitedRequestHandler(PlomHandler): cls.routes[method][path] = service def do_POST(self): - try: - parsed_url = urlparse(self.path) - path_toks = parsed_url.path.split('/') - while len(path_toks) > 0: - target_path = '/'.join(path_toks) - if target_path in self.routes['POST'].keys(): - self.routes['POST'][target_path](self) - return - path_toks.pop() - page = 'nothing to post?' - self.send_HTML(page) - except PlomException as e: - self.fail_400(e) + self.try_do(self._do_posts) + + def _do_posts(self): + parsed_url = urlparse(self.path) + path_toks = parsed_url.path.split('/') + while len(path_toks) > 0: + target_path = '/'.join(path_toks) + if target_path in self.routes['POST'].keys(): + self.routes['POST'][target_path](self) + return + path_toks.pop() + page = 'nothing to post?' + self.send_HTML(page) def do_GET(self): - try: - parsed_url = urlparse(self.path) - path_toks = parsed_url.path.split('/') - while len(path_toks) > 0: - target_path = '/'.join(path_toks) - print(target_path) - if target_path in self.routes['GET'].keys(): - self.routes['GET'][target_path](self) - return - path_toks.pop() - page = 'hi there!
' - for route in self.routes['GET']: - page += f'{route}
' - self.send_HTML(page) - except PlomException as e: - self.fail_400(e) + self.try_do(self._do_gets) + + def _do_gets(self): + parsed_url = urlparse(self.path) + path_toks = parsed_url.path.split('/') + while len(path_toks) > 0: + target_path = '/'.join(path_toks) + print(target_path) + if target_path in self.routes['GET'].keys(): + self.routes['GET'][target_path](self) + return + path_toks.pop() + page = 'hi there!
' + for route in self.routes['GET']: + page += f'{route}
' + self.send_HTML(page) if __name__ == "__main__": UnitedRequestHandler.register_app(IncomeProgressHandler) UnitedRequestHandler.register_app(ConsumptionsHandler) UnitedRequestHandler.register_app(LedgerHandler) + UnitedRequestHandler.register_app(TodoHandler) run_server(server_port, UnitedRequestHandler) -- 2.30.2