from os import environ
from pathlib import Path
from sys import exit as sys_exit
-from typing import Any, Optional, Self
+from typing import Any, Generator, Optional, Self
# non-standard libs
try:
from plomlib.web import PlomHttpHandler, PlomHttpServer, PlomQueryMap
class Account:
"""Combine name, position in tree of own, and wealth of self + children."""
- def __init__(self, parent: Optional['Account'], basename: str, desc: str
- ) -> None:
- self.local_wealth = Wealth()
+ def __init__(self, parent: Optional['Account'], basename: str) -> None:
+ self._wealth_diffs: dict[int, Wealth] = {}
self.basename = basename
- self.desc = desc
+ self.desc = ''
self.children: list[Self] = []
self.parent = parent
if self.parent:
self.parent.children += [self]
- @property
- def wealth(self) -> Wealth:
- """Total of .local_wealth with that of .children."""
+ def _get_local_wealth(self, up_incl: int) -> Wealth:
+ """Calculate by summing all recorded wealth diffs up+incl. Booking."""
+ wealth = Wealth()
+ for wealth_diff in [wd for id_, wd in self._wealth_diffs.items()
+ if id_ <= up_incl]:
+ wealth += wealth_diff
+ return wealth
+
+ def get_wealth(self, up_incl: int) -> Wealth:
+ """Total of .local_wealth with that of .children up+incl. Booking."""
total = Wealth()
- total += self.local_wealth
+ total += self._get_local_wealth(up_incl)
for child in self.children:
- total += child.wealth
+ total += child.get_wealth(up_incl)
return total
+ def add_wealth_diff(self, booking_id: int, wealth_diff: Wealth) -> None:
+ """Add knowledge that Booking of booking_add added wealth_diff."""
+ if booking_id in self._wealth_diffs:
+ self._wealth_diffs[booking_id] += wealth_diff
+ else:
+ self._wealth_diffs[booking_id] = wealth_diff
+
+ @staticmethod
+ def path_to_steps(full_path: str) -> Generator[tuple[str, str]]:
+ """Split full_path into steps, for each return its path, basename."""
+ rebuilt_path = ''
+ for step_name in full_path.split(':'):
+ rebuilt_path += (':' if rebuilt_path else '') + step_name
+ yield rebuilt_path, step_name
+
+ @classmethod
+ def ensure_in_dict(cls, full_path: str, paths_to_accs: dict[str, Self]
+ ) -> None:
+ """If full_path not key in paths_to_accs, add it with new Account."""
+ parent_path = ''
+ for path, step_name in cls.path_to_steps(full_path):
+ if path not in paths_to_accs:
+ paths_to_accs[path] = cls(
+ paths_to_accs[parent_path] if parent_path else None,
+ step_name)
+ parent_path = path
+
class DatLine(Dictable):
"""Line of .dat file parsed into comments and machine-readable data."""
self.booking_line: Optional[BookingLine] = None
self.hide_comment_in_ledger = False
+ @property
+ def comment_instructions(self) -> dict[str, str]:
+ """Parse .comment into Account modification instructions."""
+ instructions = {}
+ if self.comment.startswith(PREFIX_DEF):
+ parts = [part.strip() for part
+ in self.comment[len(PREFIX_DEF):].split(';')]
+ first_part_parts = parts[0].split(maxsplit=1)
+ account_name = first_part_parts[0]
+ description = first_part_parts[1] if first_part_parts else ''
+ instructions[account_name] = description
+ return instructions
+
@property
def comment_in_ledger(self) -> str:
- """What to show in structured ledger view (as per .hide_comment…)."""
- return '' if self.hide_comment_in_ledger else self.comment
+ """What to show in structured ledger view (no instructions)."""
+ return '' if len(self.comment_instructions) > 0 else self.comment
@property
def is_intro(self) -> bool:
return False
def apply_to_account_dict(self, acc_dict: dict[str, Account]) -> None:
- """To account dictionary of expected keys, apply .account_changes."""
+ """Update account directory with data from .account_changes."""
for acc_name, wealth in self.account_changes.items():
- acc_dict[acc_name].local_wealth += wealth
+ Account.ensure_in_dict(acc_name, acc_dict)
+ acc_dict[acc_name].add_wealth_diff(self.id_, wealth)
class Handler(PlomHttpHandler):
def get_balance(self, ctx) -> None:
"""Display tree of calculated Accounts over .bookings[:up_incl+1]."""
id_ = int(self.params.first('up_incl') or '-1')
- to_balance = (self.server.bookings[:id_ + 1] if id_ >= 0
- else self.server.bookings)
- valid = 0 == len([b for b in to_balance if b.is_questionable])
- acc_dict = self.server.empty_accounts_by_path(id_)
- for booking in to_balance:
- booking.apply_to_account_dict(acc_dict)
- ctx['roots'] = [ac for ac in acc_dict.values() if not ac.parent]
- ctx['valid'] = valid
+ ctx['roots'] = [ac for ac in self.server.accounts.values()
+ if not ac.parent]
+ ctx['valid'] = self.server.bookings_valid_up_incl(id_)
ctx['booking'] = self.server.bookings[id_]
ctx['path_up_incl'] = f'{self.path_toks[1]}?up_incl='
self._send_rendered('balance', ctx)
"""Display edit form for individual Booking."""
id_ = int(self.path_toks[2])
booking = self.server.bookings[id_]
- to_balance = self.server.bookings[:id_ + 1]
- accounts_after = self.server.empty_accounts_by_path()
- accounts_before = self.server.empty_accounts_by_path()
- for b in to_balance:
- if b != booking:
- b.apply_to_account_dict(accounts_before)
- b.apply_to_account_dict(accounts_after)
observed_tree: list[dict[str, Any]] = []
- for full_name in sorted(booking.account_changes.keys()):
+ for full_path in sorted(booking.account_changes.keys()):
parent_children: list[dict[str, Any]] = observed_tree
- path = ''
- for step_name in full_name.split(':'):
- path = ':'.join([path, step_name]) if path else step_name
+ for path, _ in Account.path_to_steps(full_path):
already_registered = False
for child in [n for n in parent_children if path == n['name']]:
parent_children = child['children']
break
if already_registered:
continue
- wealth_before = accounts_before[path].wealth
- wealth_after = accounts_after[path].wealth
- direct_target = full_name == path
+ before = self.server.accounts[path].get_wealth(id_ - 1)
+ after = self.server.accounts[path].get_wealth(id_)
+ direct_target = full_path == path
diff = {
- c: a for c, a in (wealth_after - wealth_before
- ).moneys.items()
- if a != 0
+ cur: amt for cur, amt in (after - before).moneys.items()
+ if amt != 0
or (direct_target
- and c in booking.account_changes[full_name].moneys)}
+ and cur in booking.account_changes[full_path].moneys)}
if diff or direct_target:
displayed_currencies = set(diff.keys())
- for wealth in wealth_before, wealth_after:
+ for wealth in before, after:
wealth.ensure_currencies(displayed_currencies)
wealth.purge_currencies_except(displayed_currencies)
node: dict[str, Any] = {
'name': path,
'direct_target': direct_target,
- 'wealth_before': wealth_before.moneys,
+ 'wealth_before': before.moneys,
'wealth_diff': diff,
- 'wealth_after': wealth_after.moneys,
+ 'wealth_after': after.moneys,
'children': []}
parent_children += [node]
parent_children = node['children']
+ ctx['roots'] = observed_tree
ctx['id'] = id_
ctx['dat_lines'] = [dl if raw else dl.as_dict
for dl in booking.booked_lines]
- ctx['valid'] = 0 == len([b for b in to_balance if b.is_questionable])
- ctx['roots'] = observed_tree
+ ctx['valid'] = self.server.bookings_valid_up_incl(id_)
if not raw:
- ctx['all_accounts'] = sorted(accounts_after.keys())
+ ctx['all_accounts'] = sorted(self.server.accounts.keys())
self._send_rendered(EDIT_RAW if raw else EDIT_STRUCT, ctx)
def get_ledger(self, ctx: dict[str, Any], raw: bool) -> None:
class Server(PlomHttpServer):
"""Extends parent by loading .dat file into database for Handler."""
+ accounts: dict[str, Account]
bookings: list[Booking]
dat_lines: list[DatLine]
initial_gap_lines: list[DatLine]
def load(self) -> None:
"""Read into ledger file at .path_dat."""
+ self.accounts, self.bookings, self.initial_gap_lines = {}, [], []
self.dat_lines = [
DatLine(line)
for line in self._path_dat.read_text(encoding='utf8').splitlines()]
booked_lines: list[DatLine] = []
gap_lines: list[DatLine] = []
booking: Optional[Booking] = None
- self.bookings, self.initial_gap_lines, last_date = [], [], ''
+ last_date = ''
for dat_line in self.dat_lines + [DatLine('')]:
if dat_line.code:
if gap_lines:
'date < previous valid date']
else:
last_date = booking.date
+ booking.apply_to_account_dict(self.accounts)
self.bookings += [booking]
booked_lines.clear()
- gap_lines += [dat_line]
- self.paths_to_descs = {}
- for dat_line in [dl for dl in self.dat_lines
- if dl.comment.startswith(PREFIX_DEF)]:
- parts = [part.strip() for part
- in dat_line.comment[len(PREFIX_DEF):].split(';')]
- first_part_parts = parts[0].split(maxsplit=1)
- account_name = first_part_parts[0]
- desc = first_part_parts[1] if len(first_part_parts) > 1 else ''
- if desc:
- self.paths_to_descs[account_name] = desc
- dat_line.hide_comment_in_ledger = True
+ for acc_name, desc in dat_line.comment_instructions.items():
+ Account.ensure_in_dict(acc_name, self.accounts)
+ self.accounts[acc_name].desc = desc
for booking in self.bookings:
booking.recalc_prev_next(self.bookings)
if booking:
def _hash_dat_lines(self) -> int:
return hash(tuple(dl.raw for dl in self.dat_lines))
- def empty_accounts_by_path(self,
- up_incl: int = -1
- ) -> dict[str, 'Account']:
- """Dict of Accounts refered in .bookings till up_incl by paths."""
- booked_names = set()
- for booking in (self.bookings[:up_incl + 1] if up_incl >= 0
- else self.bookings):
- for account_name in booking.account_changes:
- booked_names.add(account_name)
- paths_to_accs: dict[str, Account] = {}
- for full_name in sorted(list(booked_names)):
- path = ''
- for step_name in full_name.split(':'):
- parent_name = path[:]
- path = ':'.join([path, step_name]) if path else step_name
- if path not in paths_to_accs:
- paths_to_accs[path] = Account(
- paths_to_accs[parent_name] if parent_name else None,
- step_name,
- self.paths_to_descs.get(path, ''))
- return paths_to_accs
+ def bookings_valid_up_incl(self, booking_id: int) -> bool:
+ """If no .is_questionable in self.bookings up to booking_id."""
+ return len([b for b in self.bookings[:booking_id + 1]
+ if b.is_questionable]
+ ) < 1
@property
def tainted(self) -> bool: