From: Christian Heller Date: Wed, 19 Mar 2025 09:50:04 +0000 (+0100) Subject: Split off majority of code into separate module files. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bprefix%7D%7D/day_todos?a=commitdiff_plain;h=d98082af08a05ade8d7b58759acd7bbcbde6c430;p=ledgplom Split off majority of code into separate module files. --- diff --git a/src/ledgplom/__init__.py b/src/ledgplom/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ledgplom/http.py b/src/ledgplom/http.py new file mode 100644 index 0000000..a704c27 --- /dev/null +++ b/src/ledgplom/http.py @@ -0,0 +1,193 @@ +"""Collect directly HTTP-related elements.""" + +# standard libs +from pathlib import Path +from typing import Any +# non-standard libs +from plomlib.web import PlomHttpHandler, PlomHttpServer, PlomQueryMap +from ledgplom.ledger import Account, DatLine, Ledger + + +_SERVER_PORT = 8084 +_SERVER_HOST = '127.0.0.1' +_PATH_TEMPLATES = Path('templates') +_PREFIX_LEDGER = 'ledger_' +_PREFIX_EDIT = 'edit_' +_PREFIX_FILE = 'file_' +_TOK_STRUCT = 'structured' +_TOK_RAW = 'raw' +_PAGENAME_EDIT_STRUCT = f'{_PREFIX_EDIT}{_TOK_STRUCT}' +_PAGENAME_EDIT_RAW = f'{_PREFIX_EDIT}{_TOK_RAW}' +_PAGENAME_LEDGER_STRUCT = f'{_PREFIX_LEDGER}{_TOK_STRUCT}' +_PAGENAME_LEDGER_RAW = f'{_PREFIX_LEDGER}{_TOK_RAW}' + + +class Server(PlomHttpServer): + """Extends parent by loading .dat file into database for Handler.""" + + def __init__(self, path_dat: Path, *args, **kwargs) -> None: + super().__init__(_PATH_TEMPLATES, (_SERVER_HOST, _SERVER_PORT), + _Handler) + self.ledger = Ledger(path_dat) + + +class _Handler(PlomHttpHandler): + """"Handles HTTP requests.""" + mapper = PlomQueryMap + + def _send_rendered(self, tmpl_name: str, ctx: dict[str, Any]) -> None: + self.send_rendered(Path(f'{tmpl_name}.tmpl'), ctx) + + def do_POST(self) -> None: + """"Route POST requests to respective handlers.""" + # pylint: disable=invalid-name + redir_target = Path(self.path) + if (file_prefixed := self.postvars.keys_prefixed(_PREFIX_FILE)): + self.post_file_action(file_prefixed[0]) + elif (self.pagename.startswith(_PREFIX_EDIT) + and self.postvars.first('apply')): + redir_target = self.post_edit() + elif self.pagename.startswith(_PREFIX_LEDGER): + redir_target = self.post_ledger_action() + self.redirect(redir_target) + + def post_file_action(self, file_prefixed: str) -> None: + """Based on file_prefixed name, trigger .server.ledger.(load|save).""" + if file_prefixed == f'{_PREFIX_FILE}load': + self.server.ledger.load() + elif file_prefixed == f'{_PREFIX_FILE}save': + self.server.ledger.save() + + def post_edit(self) -> Path: + """Based on postvars, edit targeted Booking.""" + booking = self.server.ledger.bookings[int(self.path_toks[2])] + new_lines = [] + if self.pagename == _PAGENAME_EDIT_STRUCT: + line_keys = self.postvars.keys_prefixed('line_') + lineno_to_inputs: dict[int, list[str]] = {} + for key in line_keys: + toks = key.split('_', maxsplit=2) + lineno = int(toks[1]) + if lineno not in lineno_to_inputs: + lineno_to_inputs[lineno] = [] + lineno_to_inputs[lineno] += [toks[2]] + indent = ' ' + for lineno, input_names in lineno_to_inputs.items(): + data = '' + comment = self.postvars.first(f'line_{lineno}_comment') + for name in input_names: + input_ = self.postvars.first(f'line_{lineno}_{name}' + ).strip() + if name == 'date': + data = input_ + elif name == 'target': + data += f' {input_}' + elif name == 'error': + data = f'{indent}{input_}' + elif name == 'account': + data = f'{indent}{input_}' + elif name in {'amount', 'currency'}: + data += f' {input_}' + new_lines += [ + DatLine(f'{data} ; {comment}' if comment else data)] + else: # edit_raw + new_lines += [DatLine(line) for line + in self.postvars.first('booking').splitlines()] + new_id = self.server.ledger.rewrite_booking(booking.id_, new_lines) + return Path('/bookings').joinpath(f'{new_id}') + + def post_ledger_action(self) -> Path: + """Call .server.ledger.(move|copy|add_empty_new)_booking.""" + if 'add_booking' in self.postvars.as_dict: + id_ = self.server.ledger.add_empty_booking() + else: + keys_prefixed = self.postvars.keys_prefixed(_PREFIX_LEDGER) + action, id_str = keys_prefixed[0].split('_', maxsplit=2)[1:] + id_ = int(id_str) + if action.startswith('move'): + id_ = self.server.ledger.move_booking(id_, action == 'moveup') + return Path(self.path).joinpath(f'#{id_}') + id_ = self.server.ledger.copy_booking(id_) + return Path(_PAGENAME_EDIT_STRUCT).joinpath(f'{id_}') + + def do_GET(self) -> None: + """"Route GET requests to respective handlers.""" + # pylint: disable=invalid-name + if self.pagename == 'bookings': + self.redirect( + Path('/').joinpath(_PAGENAME_EDIT_STRUCT + ).joinpath(self.path_toks[2])) + return + ctx = {'tainted': self.server.ledger.tainted, 'path': self.path} + if self.pagename == 'balance': + self.get_balance(ctx) + elif self.pagename.startswith(_PREFIX_EDIT): + self.get_edit(ctx, self.pagename == _PAGENAME_EDIT_RAW) + elif self.pagename.startswith(_PREFIX_LEDGER): + self.get_ledger(ctx, self.pagename == _PAGENAME_LEDGER_RAW) + else: + self.get_ledger(ctx, False) + + def get_balance(self, ctx) -> None: + """Display tree of calculated Accounts over .bookings[:up_incl+1].""" + id_ = int(self.params.first('up_incl') or '-1') + ctx['roots'] = [ac for ac in self.server.ledger.accounts.values() + if not ac.parent] + ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_) + ctx['booking'] = self.server.ledger.bookings[id_] + ctx['path_up_incl'] = f'{self.path_toks[1]}?up_incl=' + self._send_rendered('balance', ctx) + + def get_edit(self, ctx, raw: bool) -> None: + """Display edit form for individual Booking.""" + id_ = int(self.path_toks[2]) + booking = self.server.ledger.bookings[id_] + observed_tree: list[dict[str, Any]] = [] + for full_path in sorted(booking.account_changes.keys()): + parent_children: list[dict[str, Any]] = observed_tree + for path, _ in Account.path_to_steps(full_path): + already_registered = False + for child in [n for n in parent_children if path == n['name']]: + parent_children = child['children'] + already_registered = True + break + if already_registered: + continue + self.server.ledger.ensure_account(path) + before = self.server.ledger.accounts[path].get_wealth(id_ - 1) + after = self.server.ledger.accounts[path].get_wealth(id_) + direct_target = full_path == path + diff = { + cur: amt for cur, amt in (after - before).moneys.items() + if amt != 0 + or (direct_target + and cur in booking.account_changes[full_path].moneys)} + if diff or direct_target: + displayed_currencies = set(diff.keys()) + for wealth in before, after: + wealth.ensure_currencies(displayed_currencies) + wealth.purge_currencies_except(displayed_currencies) + node: dict[str, Any] = { + 'name': path, + 'direct_target': direct_target, + 'wealth_before': before.moneys, + 'wealth_diff': diff, + 'wealth_after': after.moneys, + 'children': []} + parent_children += [node] + parent_children = node['children'] + ctx['roots'] = observed_tree + ctx['id'] = id_ + ctx['dat_lines'] = [dl if raw else dl.as_dict + for dl in booking.booked_lines] + ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_) + if not raw: + ctx['all_accounts'] = sorted(self.server.ledger.accounts.keys()) + self._send_rendered( + _PAGENAME_EDIT_RAW if raw else _PAGENAME_EDIT_STRUCT, ctx) + + def get_ledger(self, ctx: dict[str, Any], raw: bool) -> None: + """Display ledger of all Bookings.""" + ctx['dat_lines'] = self.server.ledger.dat_lines + self._send_rendered( + _PAGENAME_LEDGER_RAW if raw else _PAGENAME_LEDGER_STRUCT, ctx) diff --git a/src/ledgplom/ledger.py b/src/ledgplom/ledger.py new file mode 100644 index 0000000..c2213bf --- /dev/null +++ b/src/ledgplom/ledger.py @@ -0,0 +1,562 @@ +"""Actual ledger classes.""" + +# standard libs +from datetime import date as dt_date +from decimal import Decimal, InvalidOperation as DecimalInvalidOperation +from pathlib import Path +from typing import Any, Iterator, Optional, Self + + +_PREFIX_DEF = 'def ' + + +class _Dictable: + """Line abstraction featuring .as_dict property.""" + dictables: set[str] = set() + + @property + def as_dict(self) -> dict[str, Any]: + """Return as JSON-ready dict attributes listed in .dictables.""" + d = {} + for name in self.dictables: + value = getattr(self, name) + if hasattr(value, 'as_dict'): + value = value.as_dict + elif not isinstance(value, (str, int)): + value = str(value) + d[name] = value + return d + + +class _Wealth(): + """Collects amounts mapped to currencies.""" + + def __init__(self, moneys: Optional[dict[str, Decimal]] = None) -> None: + self.moneys = moneys if moneys else {} + self._sort_with_euro_up() + + def _sort_with_euro_up(self) -> None: + if '€' in self.moneys: + temp = {'€': self.moneys['€']} + for curr in sorted([c for c in self.moneys if c != '€']): + temp[curr] = self.moneys[curr] + self.moneys = temp + + def ensure_currencies(self, currencies: set[str]) -> None: + """Ensure all of currencies have at least a Decimal(0) entry.""" + for currency in currencies: + if currency not in self.moneys: + self.moneys[currency] = Decimal(0) + self._sort_with_euro_up() + + def purge_currencies_except(self, currencies: set[str]) -> None: + """From .moneys remove currencies except those listed.""" + self.moneys = {curr: amt for curr, amt in self.moneys.items() + if curr in currencies} + + def _add(self, other: Self, add=True) -> Self: + result = self.__class__(self.moneys.copy()) + result.ensure_currencies(set(other.moneys.keys())) + for currency, amount in other.moneys.items(): + result.moneys[currency] += amount if add else -amount + return result + + def __add__(self, other: Self) -> Self: + return self._add(other) + + def __sub__(self, other: Self) -> Self: + return self._add(other, add=False) + + @property + def sink_empty(self) -> bool: + """Return if all evens out to zero.""" + return not bool(self.as_sink.moneys) + + @property + def as_sink(self) -> '_Wealth': + """Drop zero amounts, invert non-zero ones.""" + sink = _Wealth() + for moneys in [_Wealth({c: a}) for c, a in self.moneys.items() if a]: + sink -= moneys + return sink + + +class Account: + """Combine name, position in tree of own, and wealth of self + children.""" + + def __init__(self, parent: Optional['Account'], basename: str) -> None: + self._wealth_diffs: dict[int, _Wealth] = {} + self.basename = basename + self.desc = '' + self.children: list[Self] = [] + self.parent = parent + if self.parent: + self.parent.children += [self] + + def _get_local_wealth(self, up_incl: int) -> _Wealth: + """Calculate by summing all recorded wealth diffs up+incl. _Booking.""" + wealth = _Wealth() + for wealth_diff in [wd for id_, wd in self._wealth_diffs.items() + if id_ <= up_incl]: + wealth += wealth_diff + return wealth + + def get_wealth(self, up_incl: int) -> _Wealth: + """Total of .local_wealth with that of .children up+incl. _Booking.""" + total = _Wealth() + total += self._get_local_wealth(up_incl) + for child in self.children: + total += child.get_wealth(up_incl) + return total + + def add_wealth_diff(self, booking_id: int, wealth_diff: _Wealth) -> None: + """Add knowledge that _Booking of booking_add added wealth_diff.""" + if booking_id in self._wealth_diffs: + self._wealth_diffs[booking_id] += wealth_diff + else: + self._wealth_diffs[booking_id] = wealth_diff + + @staticmethod + def path_to_steps(full_path: str) -> Iterator[tuple[str, str]]: + """Split full_path into steps, for each return its path, basename.""" + rebuilt_path = '' + for step_name in full_path.split(':'): + rebuilt_path += (':' if rebuilt_path else '') + step_name + yield rebuilt_path, step_name + + +class DatLine(_Dictable): + """Line of .dat file parsed into comments and machine-readable data.""" + dictables = {'booking_line', 'code', 'comment', 'error', 'is_intro'} + prev_line_empty: bool + + def __init__(self, line: str) -> None: + self.raw = line[:] + halves = [t.rstrip() for t in line.split(';', maxsplit=1)] + self.comment = halves[1] if len(halves) > 1 else '' + self.code = halves[0] + self.booking_line: Optional[_BookingLine] = None + + @property + def comment_instructions(self) -> dict[str, str]: + """Parse .comment into Account modification instructions.""" + instructions = {} + if self.comment.startswith(_PREFIX_DEF): + parts = [part.strip() for part + in self.comment[len(_PREFIX_DEF):].split(';')] + first_part_parts = parts[0].split(maxsplit=1) + account_name = first_part_parts[0] + desc = first_part_parts[1] if len(first_part_parts) > 1 else '' + instructions[account_name] = desc + return instructions + + @property + def comment_in_ledger(self) -> str: + """What to show in structured ledger view (no instructions).""" + return '' if len(self.comment_instructions) > 0 else self.comment + + @property + def is_intro(self) -> bool: + """Return if intro line of a _Booking.""" + return isinstance(self.booking_line, _IntroLine) + + @property + def booking_id(self) -> int: + """If .booking_line, its .booking_id, else -1.""" + return self.booking.id_ if self.booking else -1 + + @property + def booking(self) -> Optional['_Booking']: + """If .booking_line, matching _Booking, else None.""" + return self.booking_line.booking if self.booking_line else None + + @property + def error(self) -> str: + """Return error if registered on attempt to parse into _BookingLine.""" + return '; '.join(self.booking_line.errors) if self.booking_line else '' + + @property + def is_questionable(self) -> bool: + """Return whether line be questionable per associated _Booking.""" + return (self.booking_line.booking.is_questionable if self.booking_line + else False) + + @property + def raw_nbsp(self) -> str: + """Return .raw but ensure whitespace as  , and at least one.""" + if not self.raw: + return ' ' + return self.raw.replace(' ', ' ') + + +class _BookingLine(_Dictable): + """Parsed code part of a DatLine belonging to a _Booking.""" + + def __init__(self, booking: '_Booking') -> None: + self.errors: list[str] = [] + self.booking = booking + self.idx = 0 + + +class _IntroLine(_BookingLine): + """First line of a _Booking, expected to carry date etc.""" + dictables = {'date', 'target'} + + def __init__(self, booking: '_Booking', code: str) -> None: + super().__init__(booking) + if code[0].isspace(): + self.errors += ['intro line indented'] + toks = code.lstrip().split(maxsplit=1) + self.date = toks[0] + self.target = toks[1] if len(toks) > 1 else '' + if len(toks) == 1: + self.errors += ['illegal number of tokens'] + try: + dt_date.fromisoformat(self.date) + except ValueError: + self.errors += [f'not properly formatted legal date: {self.date}'] + + +class _TransferLine(_BookingLine): + """Non-first _Booking line, expected to carry value movement.""" + dictables = {'amount', 'account', 'currency'} + + def __init__(self, booking: '_Booking', code: str, idx: int) -> None: + super().__init__(booking) + self.idx = idx + self.currency = '' + self.amount: Optional[Decimal] = None + if not code[0].isspace(): + self.errors += ['transfer line not indented'] + toks = code.lstrip().split() + self.account = toks[0] + if len(toks) > 1: + self.currency = toks[2] if 3 == len(toks) else '€' + try: + self.amount = Decimal(toks[1]) + except DecimalInvalidOperation: + self.errors += [f'improper amount value: {toks[1]}'] + if len(toks) > 3: + self.errors += ['illegal number of tokens'] + + @property + def amount_short(self) -> str: + """If no .amount, '', else printed – but if too long, ellipsized.""" + if self.amount is not None: + exp = self.amount.as_tuple().exponent + assert isinstance(exp, int) + return f'{self.amount:.1f}…' if exp < -2 else f'{self.amount:.2f}' + return '' + + +class _Booking: + """Represents lines of individual booking.""" + next: Optional[Self] + prev: Optional[Self] + + def __init__(self, + id_: int, + booked_lines: list[DatLine], + gap_lines: Optional[list[DatLine]] = None + ) -> None: + self.next, self.prev = None, None + self.id_, self.booked_lines = id_, booked_lines[:] + self._gap_lines = gap_lines[:] if gap_lines else [] + # parse booked_lines into Intro- and _TransferLines + self.intro_line = _IntroLine(self, self.booked_lines[0].code) + self._transfer_lines = [ + _TransferLine(self, b_line.code, i+1) for i, b_line + in enumerate(self.booked_lines[1:])] + self.booked_lines[0].booking_line = self.intro_line + for i, b_line in enumerate(self._transfer_lines): + self.booked_lines[i + 1].booking_line = b_line + # calculate .account_changes + changes = _Wealth() + sink_account = None + self.account_changes: dict[str, _Wealth] = {} + for transfer_line in [tl for tl in self._transfer_lines + if not tl.errors]: + if transfer_line.account not in self.account_changes: + self.account_changes[transfer_line.account] = _Wealth() + if transfer_line.amount is None: + if sink_account: + transfer_line.errors += ['too many sinks'] + sink_account = transfer_line.account + continue + change = _Wealth({transfer_line.currency: transfer_line.amount}) + self.account_changes[transfer_line.account] += change + changes += change + if sink_account: + self.account_changes[sink_account] += changes.as_sink + elif not changes.sink_empty: + self._transfer_lines[-1].errors += ['needed sink missing'] + + def recalc_prev_next(self, bookings: list[Self]) -> None: + """Assuming .id_ to be index in bookings, link prev + next bookings.""" + self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None + if self.prev: + self.prev.next = self + self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings) + else None) + if self.next: + self.next.prev = self + + @property + def gap_lines(self) -> list[DatLine]: + """Return ._gap_lines or, if empty, list of one empty DatLine.""" + return self._gap_lines if self._gap_lines else [DatLine('')] + + @gap_lines.setter + def gap_lines(self, gap_lines=list[DatLine]) -> None: + self._gap_lines = gap_lines[:] + + @property + def gap_lines_copied(self) -> list[DatLine]: + """Return new DatLines generated from .raw's of .gap_lines.""" + return [DatLine(dat_line.raw) for dat_line in self.gap_lines] + + @property + def booked_lines_copied(self) -> list[DatLine]: + """Return new DatLines generated from .raw's of .booked_lines.""" + return [DatLine(dat_line.raw) for dat_line in self.booked_lines] + + @property + def target(self) -> str: + """Return main other party for transaction.""" + return self.intro_line.target + + @property + def date(self) -> str: + """Return _Booking's day's date.""" + return self.intro_line.date + + def can_move(self, up: bool) -> bool: + """Whether movement rules would allow self to move up or down.""" + if up and ((not self.prev) or self.prev.date != self.date): + return False + if (not up) and ((not self.next) or self.next.date != self.date): + return False + return True + + @property + def is_questionable(self) -> bool: + """Whether lines count any errors.""" + for _ in [bl for bl in [self.intro_line] + self._transfer_lines + if bl.errors]: + return True + return False + + +class Ledger: + """Collection of DatLines, and Accounts, _Bookings derived from them.""" + accounts: dict[str, Account] + bookings: list[_Booking] + dat_lines: list[DatLine] + initial_gap_lines: list[DatLine] + + def __init__(self, path_dat: Path) -> None: + self._path_dat = path_dat + self.load() + + def load(self) -> None: + """(Re-)read ledger from file at ._path_dat.""" + self.accounts, self.bookings, self.initial_gap_lines = {}, [], [] + self.dat_lines: list[DatLine] = [ + DatLine(line) + for line in self._path_dat.read_text(encoding='utf8').splitlines()] + self.last_save_hash = self._hash_dat_lines() + booked: list[DatLine] = [] + gap_lines: list[DatLine] = [] + booking: Optional[_Booking] = None + for dat_line in self.dat_lines + [DatLine('')]: + if dat_line.code: + if gap_lines: + if booking: + booking.gap_lines = gap_lines[:] + else: + self.initial_gap_lines = gap_lines[:] + gap_lines.clear() + booked += [dat_line] + else: + gap_lines += [dat_line] + if booked: + booking = _Booking(len(self.bookings), booked[:]) + self.bookings += [booking] + booked.clear() + if booking: + booking.gap_lines = gap_lines[:-1] + self._sync(recalc_datlines=False) + + def _sync(self, recalc_datlines=True, check_dates=True): + if recalc_datlines: + self.dat_lines = self.initial_gap_lines[:] + for booking in self.bookings: + self.dat_lines += booking.booked_lines + self.dat_lines += booking.gap_lines + for idx, booking in enumerate(self.bookings[1:]): + booking.prev = self.bookings[idx] + for idx, booking in enumerate(self.bookings[:-1]): + booking.next = self.bookings[idx + 1] + self.bookings[-1].next = None + if check_dates: + last_date = '' + err_msg = 'date < previous valid date' + for booking in self.bookings: + if err_msg in booking.intro_line.errors: + booking.intro_line.errors.remove(err_msg) + if last_date > booking.date: + booking.intro_line.errors += [err_msg] + else: + last_date = booking.date + self._recalc_prev_line_empty() + self.accounts = {} + for dat_line in self.dat_lines: + for acc_name, desc in dat_line.comment_instructions.items(): + self.ensure_account(acc_name) + self.accounts[acc_name].desc = desc + for booking in self.bookings: + for acc_name, wealth in booking.account_changes.items(): + self.ensure_account(acc_name) + self.accounts[acc_name].add_wealth_diff(booking.id_, wealth) + + def ensure_account(self, full_path: str) -> None: + """If full_path not in self.accounts, add its tree with Accounts.""" + parent_path = '' + for path, step_name in Account.path_to_steps(full_path): + if path not in self.accounts: + self.accounts[path] = Account( + self.accounts[parent_path] if parent_path else None, + step_name) + parent_path = path + + def save(self) -> None: + """Save current state to ._path_dat.""" + self._path_dat.write_text( + '\n'.join([line.raw for line in self.dat_lines]), encoding='utf8') + self.load() + + def _hash_dat_lines(self) -> int: + return hash(tuple(dl.raw for dl in self.dat_lines)) + + def bookings_valid_up_incl(self, booking_id: int) -> bool: + """If no .is_questionable in self.bookings up to booking_id.""" + return len([b for b in self.bookings[:booking_id + 1] + if b.is_questionable] + ) < 1 + + @property + def tainted(self) -> bool: + """If .dat_lines different to those of last .load().""" + return self._hash_dat_lines() != self.last_save_hash + + def _recalc_prev_line_empty(self) -> None: + prev_line = None + for line in self.dat_lines: + line.prev_line_empty = False + if prev_line: + line.prev_line_empty = not (prev_line.code + + prev_line.comment_in_ledger) + if prev_line or line.code + line.comment_in_ledger: # jump over + prev_line = line # empty start + + def _move_booking(self, idx_from: int, idx_to: int): + moving = self.bookings[idx_from] + if idx_from >= idx_to: # moving upward, deletion must + del self.bookings[idx_from] # precede insertion to keep + self.bookings[idx_to:idx_to] = [moving] # deletion index, downwards + if idx_from < idx_to: # the other way around keeps + del self.bookings[idx_from] # insertion index + min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to) + for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]): + booking.id_ = min_idx + idx + + def move_booking(self, idx_from: int, up: bool) -> int: + """Move _Booking of old_id one step up or downwards""" + new_id = idx_from + (-1 if up else 1) + idx_to = new_id + (0 if up else 1) # down-move imlies jump over next + self._move_booking(new_id, idx_to) + self._sync() + return new_id + + def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int: + """Rewrite _Booking with new_lines, move if changed date.""" + old_booking = self.bookings[old_id] + booked_start, booked_end, gap_start_found = -1, 0, False + for i, line in enumerate(new_lines): + if booked_start < 0 and line.code.strip(): # ignore any initial + booked_start = i # empty lines + elif booked_start >= 0 and not line.code.strip(): # past start, + gap_start_found = True # yet empty? gap + if not gap_start_found: # end index is always after current line, + booked_end += 1 # provided we're not yet in the gap + elif line.code.strip(): + new_lines[i] = DatLine(f'; {line.code}') + before_gap = new_lines[:booked_start] + new_booked_lines = (new_lines[booked_start:booked_end] + if booked_start > -1 else []) + after_gap = old_booking.gap_lines_copied # new gap be old gap _plus_ + after_gap += new_lines[booked_end:] # any new gap lines + if not new_booked_lines: # interpret empty posting as deletion request + del self.bookings[old_id] + for booking in self.bookings[old_id:]: + booking.id_ -= 1 + leftover_gap = before_gap + after_gap + if old_id == 0: + self.initial_gap_lines += leftover_gap + else: + self.bookings[old_id - 1].gap_lines += leftover_gap + self._sync(check_dates=False) + return old_id if old_id < len(self.bookings) else 0 + if old_id == 0: + self.initial_gap_lines += before_gap + else: + self.bookings[old_id - 1].gap_lines += before_gap + new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0] + updated = _Booking(old_id, new_booked_lines, after_gap) + self.bookings[old_id] = updated + if new_date != old_booking.date: # if changed date, move to there + if self.bookings[0].date > new_date: + new_id = 0 + elif self.bookings[-1].date < new_date: + new_id = self.bookings[-1].id_ + 1 + else: + of_date_1st = i_booking = self.bookings[0] + while i_booking.next: + if of_date_1st.date != i_booking.date: + of_date_1st = i_booking + if i_booking.next.date > new_date: + break + i_booking = i_booking.next + # ensure that, if we land in group of like-dated _Bookings, we + # land on the edge closest to our last position + new_id = (of_date_1st.id_ if old_id < i_booking.id_ + else i_booking.id_ + 1) + self._move_booking(old_id, new_id) + self._sync(check_dates=False) + return updated.id_ + + def _add_new_booking( + self, + target: str, + dat_lines_transaction: list[DatLine], + intro_comment: str = '' + ) -> int: + booking = _Booking( + len(self.bookings), + [DatLine(f'{dt_date.today().isoformat()} {target}' + + ' ; '.join([''] + [s for s in [intro_comment] if s])) + ] + dat_lines_transaction) + self.bookings += [booking] + self._sync() + return booking.id_ + + def add_empty_booking(self) -> int: + """Add new _Booking to end of ledger.""" + return self._add_new_booking('?', []) + + def copy_booking(self, copied_id: int) -> int: + """Add copy of _Booking of copied_id to_end of ledger.""" + copied = self.bookings[copied_id] + return self._add_new_booking(copied.target, + copied.booked_lines_copied[1:], + copied.booked_lines[0].comment) diff --git a/src/run.py b/src/run.py index 91079d7..91c1905 100755 --- a/src/run.py +++ b/src/run.py @@ -2,755 +2,18 @@ """Viewer and editor for ledger .dat files.""" # standard libs -from datetime import date as dt_date -from decimal import Decimal, InvalidOperation as DecimalInvalidOperation from os import environ from pathlib import Path from sys import exit as sys_exit -from typing import Any, Iterator, Optional, Self # non-standard libs try: - from plomlib.web import PlomHttpHandler, PlomHttpServer, PlomQueryMap + from ledgplom.http import Server except ModuleNotFoundError as e: print(f"Missing dependency: {e}. Please run with 'install_deps' argument.") sys_exit(1) LEDGER_DAT = environ.get('LEDGER_DAT') -SERVER_PORT = 8084 -SERVER_HOST = '127.0.0.1' -PATH_TEMPLATES = Path('templates') - -PREFIX_DEF = 'def ' -PREFIX_LEDGER = 'ledger_' -PREFIX_EDIT = 'edit_' -PREFIX_FILE = 'file_' -TOK_STRUCT = 'structured' -TOK_RAW = 'raw' -EDIT_STRUCT = f'{PREFIX_EDIT}{TOK_STRUCT}' -EDIT_RAW = f'{PREFIX_EDIT}{TOK_RAW}' -LEDGER_STRUCT = f'{PREFIX_LEDGER}{TOK_STRUCT}' -LEDGER_RAW = f'{PREFIX_LEDGER}{TOK_RAW}' - - -class Dictable: - """Line abstraction featuring .as_dict property.""" - dictables: set[str] = set() - - @property - def as_dict(self) -> dict[str, Any]: - """Return as JSON-ready dict attributes listed in .dictables.""" - d = {} - for name in self.dictables: - value = getattr(self, name) - if hasattr(value, 'as_dict'): - value = value.as_dict - elif not isinstance(value, (str, int)): - value = str(value) - d[name] = value - return d - - -class Wealth(): - """Collects amounts mapped to currencies.""" - - def __init__(self, moneys: Optional[dict[str, Decimal]] = None) -> None: - self.moneys = moneys if moneys else {} - self._sort_with_euro_up() - - def _sort_with_euro_up(self) -> None: - if '€' in self.moneys: - temp = {'€': self.moneys['€']} - for curr in sorted([c for c in self.moneys if c != '€']): - temp[curr] = self.moneys[curr] - self.moneys = temp - - def ensure_currencies(self, currencies: set[str]) -> None: - """Ensure all of currencies have at least a Decimal(0) entry.""" - for currency in currencies: - if currency not in self.moneys: - self.moneys[currency] = Decimal(0) - self._sort_with_euro_up() - - def purge_currencies_except(self, currencies: set[str]) -> None: - """From .moneys remove currencies except those listed.""" - self.moneys = {curr: amt for curr, amt in self.moneys.items() - if curr in currencies} - - def _add(self, other: Self, add=True) -> Self: - result = self.__class__(self.moneys.copy()) - result.ensure_currencies(set(other.moneys.keys())) - for currency, amount in other.moneys.items(): - result.moneys[currency] += amount if add else -amount - return result - - def __add__(self, other: Self) -> Self: - return self._add(other) - - def __sub__(self, other: Self) -> Self: - return self._add(other, add=False) - - @property - def sink_empty(self) -> bool: - """Return if all evens out to zero.""" - return not bool(self.as_sink.moneys) - - @property - def as_sink(self) -> 'Wealth': - """Drop zero amounts, invert non-zero ones.""" - sink = Wealth() - for moneys in [Wealth({c: a}) for c, a in self.moneys.items() if a]: - sink -= moneys - return sink - - -class Account: - """Combine name, position in tree of own, and wealth of self + children.""" - - def __init__(self, parent: Optional['Account'], basename: str) -> None: - self._wealth_diffs: dict[int, Wealth] = {} - self.basename = basename - self.desc = '' - self.children: list[Self] = [] - self.parent = parent - if self.parent: - self.parent.children += [self] - - def _get_local_wealth(self, up_incl: int) -> Wealth: - """Calculate by summing all recorded wealth diffs up+incl. Booking.""" - wealth = Wealth() - for wealth_diff in [wd for id_, wd in self._wealth_diffs.items() - if id_ <= up_incl]: - wealth += wealth_diff - return wealth - - def get_wealth(self, up_incl: int) -> Wealth: - """Total of .local_wealth with that of .children up+incl. Booking.""" - total = Wealth() - total += self._get_local_wealth(up_incl) - for child in self.children: - total += child.get_wealth(up_incl) - return total - - def add_wealth_diff(self, booking_id: int, wealth_diff: Wealth) -> None: - """Add knowledge that Booking of booking_add added wealth_diff.""" - if booking_id in self._wealth_diffs: - self._wealth_diffs[booking_id] += wealth_diff - else: - self._wealth_diffs[booking_id] = wealth_diff - - @staticmethod - def path_to_steps(full_path: str) -> Iterator[tuple[str, str]]: - """Split full_path into steps, for each return its path, basename.""" - rebuilt_path = '' - for step_name in full_path.split(':'): - rebuilt_path += (':' if rebuilt_path else '') + step_name - yield rebuilt_path, step_name - - -class DatLine(Dictable): - """Line of .dat file parsed into comments and machine-readable data.""" - dictables = {'booking_line', 'code', 'comment', 'error', 'is_intro'} - prev_line_empty: bool - - def __init__(self, line: str) -> None: - self.raw = line[:] - halves = [t.rstrip() for t in line.split(';', maxsplit=1)] - self.comment = halves[1] if len(halves) > 1 else '' - self.code = halves[0] - self.booking_line: Optional[BookingLine] = None - - @property - def comment_instructions(self) -> dict[str, str]: - """Parse .comment into Account modification instructions.""" - instructions = {} - if self.comment.startswith(PREFIX_DEF): - parts = [part.strip() for part - in self.comment[len(PREFIX_DEF):].split(';')] - first_part_parts = parts[0].split(maxsplit=1) - account_name = first_part_parts[0] - desc = first_part_parts[1] if len(first_part_parts) > 1 else '' - instructions[account_name] = desc - return instructions - - @property - def comment_in_ledger(self) -> str: - """What to show in structured ledger view (no instructions).""" - return '' if len(self.comment_instructions) > 0 else self.comment - - @property - def is_intro(self) -> bool: - """Return if intro line of a Booking.""" - return isinstance(self.booking_line, IntroLine) - - @property - def booking_id(self) -> int: - """If .booking_line, its .booking_id, else -1.""" - return self.booking.id_ if self.booking else -1 - - @property - def booking(self) -> Optional['Booking']: - """If .booking_line, matching Booking, else None.""" - return self.booking_line.booking if self.booking_line else None - - @property - def error(self) -> str: - """Return error if registered on attempt to parse into BookingLine.""" - return '; '.join(self.booking_line.errors) if self.booking_line else '' - - @property - def is_questionable(self) -> bool: - """Return whether line be questionable per associated Booking.""" - return (self.booking_line.booking.is_questionable if self.booking_line - else False) - - @property - def raw_nbsp(self) -> str: - """Return .raw but ensure whitespace as  , and at least one.""" - if not self.raw: - return ' ' - return self.raw.replace(' ', ' ') - - -class BookingLine(Dictable): - """Parsed code part of a DatLine belonging to a Booking.""" - - def __init__(self, booking: 'Booking') -> None: - self.errors: list[str] = [] - self.booking = booking - self.idx = 0 - - -class IntroLine(BookingLine): - """First line of a Booking, expected to carry date etc.""" - dictables = {'date', 'target'} - - def __init__(self, booking: 'Booking', code: str) -> None: - super().__init__(booking) - if code[0].isspace(): - self.errors += ['intro line indented'] - toks = code.lstrip().split(maxsplit=1) - self.date = toks[0] - self.target = toks[1] if len(toks) > 1 else '' - if len(toks) == 1: - self.errors += ['illegal number of tokens'] - try: - dt_date.fromisoformat(self.date) - except ValueError: - self.errors += [f'not properly formatted legal date: {self.date}'] - - -class TransferLine(BookingLine): - """Non-first Booking line, expected to carry value movement.""" - dictables = {'amount', 'account', 'currency'} - - def __init__(self, booking: 'Booking', code: str, idx: int) -> None: - super().__init__(booking) - self.idx = idx - self.currency = '' - self.amount: Optional[Decimal] = None - if not code[0].isspace(): - self.errors += ['transfer line not indented'] - toks = code.lstrip().split() - self.account = toks[0] - if len(toks) > 1: - self.currency = toks[2] if 3 == len(toks) else '€' - try: - self.amount = Decimal(toks[1]) - except DecimalInvalidOperation: - self.errors += [f'improper amount value: {toks[1]}'] - if len(toks) > 3: - self.errors += ['illegal number of tokens'] - - @property - def amount_short(self) -> str: - """If no .amount, '', else printed – but if too long, ellipsized.""" - if self.amount is not None: - exp = self.amount.as_tuple().exponent - assert isinstance(exp, int) - return f'{self.amount:.1f}…' if exp < -2 else f'{self.amount:.2f}' - return '' - - -class Booking: - """Represents lines of individual booking.""" - next: Optional[Self] - prev: Optional[Self] - - def __init__(self, - id_: int, - booked_lines: list[DatLine], - gap_lines: Optional[list[DatLine]] = None - ) -> None: - self.next, self.prev = None, None - self.id_, self.booked_lines = id_, booked_lines[:] - self._gap_lines = gap_lines[:] if gap_lines else [] - # parse booked_lines into Intro- and TransferLines - self.intro_line = IntroLine(self, self.booked_lines[0].code) - self._transfer_lines = [ - TransferLine(self, b_line.code, i+1) for i, b_line - in enumerate(self.booked_lines[1:])] - self.booked_lines[0].booking_line = self.intro_line - for i, b_line in enumerate(self._transfer_lines): - self.booked_lines[i + 1].booking_line = b_line - # calculate .account_changes - changes = Wealth() - sink_account = None - self.account_changes: dict[str, Wealth] = {} - for transfer_line in [tl for tl in self._transfer_lines - if not tl.errors]: - if transfer_line.account not in self.account_changes: - self.account_changes[transfer_line.account] = Wealth() - if transfer_line.amount is None: - if sink_account: - transfer_line.errors += ['too many sinks'] - sink_account = transfer_line.account - continue - change = Wealth({transfer_line.currency: transfer_line.amount}) - self.account_changes[transfer_line.account] += change - changes += change - if sink_account: - self.account_changes[sink_account] += changes.as_sink - elif not changes.sink_empty: - self._transfer_lines[-1].errors += ['needed sink missing'] - - def recalc_prev_next(self, bookings: list[Self]) -> None: - """Assuming .id_ to be index in bookings, link prev + next Bookings.""" - self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None - if self.prev: - self.prev.next = self - self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings) - else None) - if self.next: - self.next.prev = self - - @property - def gap_lines(self) -> list[DatLine]: - """Return ._gap_lines or, if empty, list of one empty DatLine.""" - return self._gap_lines if self._gap_lines else [DatLine('')] - - @gap_lines.setter - def gap_lines(self, gap_lines=list[DatLine]) -> None: - self._gap_lines = gap_lines[:] - - @property - def gap_lines_copied(self) -> list[DatLine]: - """Return new DatLines generated from .raw's of .gap_lines.""" - return [DatLine(dat_line.raw) for dat_line in self.gap_lines] - - @property - def booked_lines_copied(self) -> list[DatLine]: - """Return new DatLines generated from .raw's of .booked_lines.""" - return [DatLine(dat_line.raw) for dat_line in self.booked_lines] - - @property - def target(self) -> str: - """Return main other party for transaction.""" - return self.intro_line.target - - @property - def date(self) -> str: - """Return Booking's day's date.""" - return self.intro_line.date - - def can_move(self, up: bool) -> bool: - """Whether movement rules would allow self to move up or down.""" - if up and ((not self.prev) or self.prev.date != self.date): - return False - if (not up) and ((not self.next) or self.next.date != self.date): - return False - return True - - @property - def is_questionable(self) -> bool: - """Whether lines count any errors.""" - for _ in [bl for bl in [self.intro_line] + self._transfer_lines - if bl.errors]: - return True - return False - - -class Handler(PlomHttpHandler): - """"Handles HTTP requests.""" - mapper = PlomQueryMap - - def _send_rendered(self, tmpl_name: str, ctx: dict[str, Any]) -> None: - self.send_rendered(Path(f'{tmpl_name}.tmpl'), ctx) - - def do_POST(self) -> None: - """"Route POST requests to respective handlers.""" - # pylint: disable=invalid-name - redir_target = Path(self.path) - if (file_prefixed := self.postvars.keys_prefixed(PREFIX_FILE)): - self.post_file_action(file_prefixed[0]) - elif (self.pagename.startswith(PREFIX_EDIT) - and self.postvars.first('apply')): - redir_target = self.post_edit() - elif self.pagename.startswith(PREFIX_LEDGER): - redir_target = self.post_ledger_action() - self.redirect(redir_target) - - def post_file_action(self, file_prefixed: str) -> None: - """Based on file_prefixed name, trigger .server.ledger.(load|save).""" - if file_prefixed == f'{PREFIX_FILE}load': - self.server.ledger.load() - elif file_prefixed == f'{PREFIX_FILE}save': - self.server.ledger.save() - - def post_edit(self) -> Path: - """Based on postvars, edit targeted Booking.""" - booking = self.server.ledger.bookings[int(self.path_toks[2])] - new_lines = [] - if self.pagename == EDIT_STRUCT: - line_keys = self.postvars.keys_prefixed('line_') - lineno_to_inputs: dict[int, list[str]] = {} - for key in line_keys: - toks = key.split('_', maxsplit=2) - lineno = int(toks[1]) - if lineno not in lineno_to_inputs: - lineno_to_inputs[lineno] = [] - lineno_to_inputs[lineno] += [toks[2]] - indent = ' ' - for lineno, input_names in lineno_to_inputs.items(): - data = '' - comment = self.postvars.first(f'line_{lineno}_comment') - for name in input_names: - input_ = self.postvars.first(f'line_{lineno}_{name}' - ).strip() - if name == 'date': - data = input_ - elif name == 'target': - data += f' {input_}' - elif name == 'error': - data = f'{indent}{input_}' - elif name == 'account': - data = f'{indent}{input_}' - elif name in {'amount', 'currency'}: - data += f' {input_}' - new_lines += [ - DatLine(f'{data} ; {comment}' if comment else data)] - else: # edit_raw - new_lines += [DatLine(line) for line - in self.postvars.first('booking').splitlines()] - new_id = self.server.ledger.rewrite_booking(booking.id_, new_lines) - return Path('/bookings').joinpath(f'{new_id}') - - def post_ledger_action(self) -> Path: - """Call .server.ledger.(move|copy|add_empty_new)_booking.""" - if 'add_booking' in self.postvars.as_dict: - id_ = self.server.ledger.add_empty_booking() - else: - keys_prefixed = self.postvars.keys_prefixed(PREFIX_LEDGER) - action, id_str = keys_prefixed[0].split('_', maxsplit=2)[1:] - id_ = int(id_str) - if action.startswith('move'): - id_ = self.server.ledger.move_booking(id_, action == 'moveup') - return Path(self.path).joinpath(f'#{id_}') - id_ = self.server.ledger.copy_booking(id_) - return Path(EDIT_STRUCT).joinpath(f'{id_}') - - def do_GET(self) -> None: - """"Route GET requests to respective handlers.""" - # pylint: disable=invalid-name - if self.pagename == 'bookings': - self.redirect( - Path('/').joinpath(EDIT_STRUCT).joinpath(self.path_toks[2])) - return - ctx = {'tainted': self.server.ledger.tainted, 'path': self.path} - if self.pagename == 'balance': - self.get_balance(ctx) - elif self.pagename.startswith(PREFIX_EDIT): - self.get_edit(ctx, self.pagename == EDIT_RAW) - elif self.pagename.startswith(PREFIX_LEDGER): - self.get_ledger(ctx, self.pagename == LEDGER_RAW) - else: - self.get_ledger(ctx, False) - - def get_balance(self, ctx) -> None: - """Display tree of calculated Accounts over .bookings[:up_incl+1].""" - id_ = int(self.params.first('up_incl') or '-1') - ctx['roots'] = [ac for ac in self.server.ledger.accounts.values() - if not ac.parent] - ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_) - ctx['booking'] = self.server.ledger.bookings[id_] - ctx['path_up_incl'] = f'{self.path_toks[1]}?up_incl=' - self._send_rendered('balance', ctx) - - def get_edit(self, ctx, raw: bool) -> None: - """Display edit form for individual Booking.""" - id_ = int(self.path_toks[2]) - booking = self.server.ledger.bookings[id_] - observed_tree: list[dict[str, Any]] = [] - for full_path in sorted(booking.account_changes.keys()): - parent_children: list[dict[str, Any]] = observed_tree - for path, _ in Account.path_to_steps(full_path): - already_registered = False - for child in [n for n in parent_children if path == n['name']]: - parent_children = child['children'] - already_registered = True - break - if already_registered: - continue - self.server.ledger.ensure_account(path) - before = self.server.ledger.accounts[path].get_wealth(id_ - 1) - after = self.server.ledger.accounts[path].get_wealth(id_) - direct_target = full_path == path - diff = { - cur: amt for cur, amt in (after - before).moneys.items() - if amt != 0 - or (direct_target - and cur in booking.account_changes[full_path].moneys)} - if diff or direct_target: - displayed_currencies = set(diff.keys()) - for wealth in before, after: - wealth.ensure_currencies(displayed_currencies) - wealth.purge_currencies_except(displayed_currencies) - node: dict[str, Any] = { - 'name': path, - 'direct_target': direct_target, - 'wealth_before': before.moneys, - 'wealth_diff': diff, - 'wealth_after': after.moneys, - 'children': []} - parent_children += [node] - parent_children = node['children'] - ctx['roots'] = observed_tree - ctx['id'] = id_ - ctx['dat_lines'] = [dl if raw else dl.as_dict - for dl in booking.booked_lines] - ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_) - if not raw: - ctx['all_accounts'] = sorted(self.server.ledger.accounts.keys()) - self._send_rendered(EDIT_RAW if raw else EDIT_STRUCT, ctx) - - def get_ledger(self, ctx: dict[str, Any], raw: bool) -> None: - """Display ledger of all Bookings.""" - ctx['dat_lines'] = self.server.ledger.dat_lines - self._send_rendered(LEDGER_RAW if raw else LEDGER_STRUCT, ctx) - - -class Server(PlomHttpServer): - """Extends parent by loading .dat file into database for Handler.""" - - def __init__(self, path_dat: Path, *args, **kwargs) -> None: - super().__init__(PATH_TEMPLATES, (SERVER_HOST, SERVER_PORT), Handler) - self.ledger = Ledger(path_dat) - - -class Ledger: - """Collection of DatLines, and Accounts, Bookings derived from them.""" - accounts: dict[str, Account] - bookings: list[Booking] - dat_lines: list[DatLine] - initial_gap_lines: list[DatLine] - - def __init__(self, path_dat: Path) -> None: - self._path_dat = path_dat - self.load() - - def load(self) -> None: - """(Re-)read ledger from file at ._path_dat.""" - self.accounts, self.bookings, self.initial_gap_lines = {}, [], [] - self.initial_gap_lines: list[DatLine] = [] # TODO: fix duplicate booking - self.dat_lines: list[DatLine] = [ - DatLine(line) - for line in self._path_dat.read_text(encoding='utf8').splitlines()] - self.last_save_hash = self._hash_dat_lines() - booked: list[DatLine] = [] - gap_lines: list[DatLine] = [] - booking: Optional[Booking] = None - for dat_line in self.dat_lines + [DatLine('')]: - if dat_line.code: - if gap_lines: - if booking: - booking.gap_lines = gap_lines[:] - else: - self.initial_gap_lines = gap_lines[:] - gap_lines.clear() - booked += [dat_line] - else: - gap_lines += [dat_line] - if booked: - booking = Booking(len(self.bookings), booked[:]) - self.bookings += [booking] - booked.clear() - if booking: - booking.gap_lines = gap_lines[:-1] - self._sync(recalc_datlines=False) - - def _sync(self, recalc_datlines=True, check_dates=True): - if recalc_datlines: - self.dat_lines = self.initial_gap_lines[:] - for booking in self.bookings: - self.dat_lines += booking.booked_lines - self.dat_lines += booking.gap_lines - for idx, booking in enumerate(self.bookings[1:]): - booking.prev = self.bookings[idx] - for idx, booking in enumerate(self.bookings[:-1]): - booking.next = self.bookings[idx + 1] - self.bookings[-1].next = None - if check_dates: - last_date = '' - err_msg = 'date < previous valid date' - for booking in self.bookings: - if err_msg in booking.intro_line.errors: - booking.intro_line.errors.remove(err_msg) - if last_date > booking.date: - booking.intro_line.errors += [err_msg] - else: - last_date = booking.date - self._recalc_prev_line_empty() - self.accounts = {} - for dat_line in self.dat_lines: - for acc_name, desc in dat_line.comment_instructions.items(): - self.ensure_account(acc_name) - self.accounts[acc_name].desc = desc - for booking in self.bookings: - for acc_name, wealth in booking.account_changes.items(): - self.ensure_account(acc_name) - self.accounts[acc_name].add_wealth_diff(booking.id_, wealth) - - def ensure_account(self, full_path: str) -> None: - """If full_path not in self.accounts, add its tree with Accounts.""" - parent_path = '' - for path, step_name in Account.path_to_steps(full_path): - if path not in self.accounts: - self.accounts[path] = Account( - self.accounts[parent_path] if parent_path else None, - step_name) - parent_path = path - - def save(self) -> None: - """Save current state to ._path_dat.""" - self._path_dat.write_text( - '\n'.join([line.raw for line in self.dat_lines]), encoding='utf8') - self.load() - - def _hash_dat_lines(self) -> int: - return hash(tuple(dl.raw for dl in self.dat_lines)) - - def bookings_valid_up_incl(self, booking_id: int) -> bool: - """If no .is_questionable in self.bookings up to booking_id.""" - return len([b for b in self.bookings[:booking_id + 1] - if b.is_questionable] - ) < 1 - - @property - def tainted(self) -> bool: - """If .dat_lines different to those of last .load().""" - return self._hash_dat_lines() != self.last_save_hash - - def _recalc_prev_line_empty(self) -> None: - prev_line = None - for line in self.dat_lines: - line.prev_line_empty = False - if prev_line: - line.prev_line_empty = not (prev_line.code - + prev_line.comment_in_ledger) - if prev_line or line.code + line.comment_in_ledger: # jump over - prev_line = line # empty start - - def _move_booking(self, idx_from: int, idx_to: int): - moving = self.bookings[idx_from] - if idx_from >= idx_to: # moving upward, deletion must - del self.bookings[idx_from] # precede insertion to keep - self.bookings[idx_to:idx_to] = [moving] # deletion index, downwards - if idx_from < idx_to: # the other way around keeps - del self.bookings[idx_from] # insertion index - min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to) - for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]): - booking.id_ = min_idx + idx - - def move_booking(self, idx_from: int, up: bool) -> int: - """Move Booking of old_id one step up or downwards""" - new_id = idx_from + (-1 if up else 1) - idx_to = new_id + (0 if up else 1) # down-move imlies jump over next - self._move_booking(new_id, idx_to) - self._sync() - return new_id - - def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int: - """Rewrite Booking with new_lines, move if changed date.""" - old_booking = self.bookings[old_id] - booked_start, booked_end, gap_start_found = -1, 0, False - for i, line in enumerate(new_lines): - if booked_start < 0 and line.code.strip(): # ignore any initial - booked_start = i # empty lines - elif booked_start >= 0 and not line.code.strip(): # past start, - gap_start_found = True # yet empty? gap - if not gap_start_found: # end index is always after current line, - booked_end += 1 # provided we're not yet in the gap - elif line.code.strip(): - new_lines[i] = DatLine(f'; {line.code}') - before_gap = new_lines[:booked_start] - new_booked_lines = (new_lines[booked_start:booked_end] - if booked_start > -1 else []) - after_gap = old_booking.gap_lines_copied # new gap be old gap _plus_ - after_gap += new_lines[booked_end:] # any new gap lines - if not new_booked_lines: # interpret empty posting as deletion request - del self.bookings[old_id] - for booking in self.bookings[old_id:]: - booking.id_ -= 1 - leftover_gap = before_gap + after_gap - if old_id == 0: - self.initial_gap_lines += leftover_gap - else: - self.bookings[old_id - 1].gap_lines += leftover_gap - self._sync(check_dates=False) - return old_id if old_id < len(self.bookings) else 0 - if old_id == 0: - self.initial_gap_lines += before_gap - else: - self.bookings[old_id - 1].gap_lines += before_gap - new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0] - updated = Booking(old_id, new_booked_lines, after_gap) - self.bookings[old_id] = updated - if new_date != old_booking.date: # if changed date, move to there - if self.bookings[0].date > new_date: - new_id = 0 - elif self.bookings[-1].date < new_date: - new_id = self.bookings[-1].id_ + 1 - else: - of_date_1st = i_booking = self.bookings[0] - while i_booking.next: - if of_date_1st.date != i_booking.date: - of_date_1st = i_booking - if i_booking.next.date > new_date: - break - i_booking = i_booking.next - # ensure that, if we land in group of like-dated Bookings, we - # land on the edge closest to our last position - new_id = (of_date_1st.id_ if old_id < i_booking.id_ - else i_booking.id_ + 1) - self._move_booking(old_id, new_id) - self._sync(check_dates=False) - return updated.id_ - - def _add_new_booking( - self, - target: str, - dat_lines_transaction: list[DatLine], - intro_comment: str = '' - ) -> int: - booking = Booking( - len(self.bookings), - [DatLine(f'{dt_date.today().isoformat()} {target}' - + ' ; '.join([''] + [s for s in [intro_comment] if s])) - ] + dat_lines_transaction) - self.bookings += [booking] - self._sync() - return booking.id_ - - def add_empty_booking(self) -> int: - """Add new Booking to end of ledger.""" - return self._add_new_booking('?', []) - - def copy_booking(self, copied_id: int) -> int: - """Add copy of Booking of copied_id to_end of ledger.""" - copied = self.bookings[copied_id] - return self._add_new_booking(copied.target, - copied.booked_lines_copied[1:], - copied.booked_lines[0].comment) if __name__ == "__main__":