--- /dev/null
+"""Collect directly HTTP-related elements."""
+
+# standard libs
+from pathlib import Path
+from typing import Any
+# non-standard libs
+from plomlib.web import PlomHttpHandler, PlomHttpServer, PlomQueryMap
+from ledgplom.ledger import Account, DatLine, Ledger
+
+
+_SERVER_PORT = 8084
+_SERVER_HOST = '127.0.0.1'
+_PATH_TEMPLATES = Path('templates')
+_PREFIX_LEDGER = 'ledger_'
+_PREFIX_EDIT = 'edit_'
+_PREFIX_FILE = 'file_'
+_TOK_STRUCT = 'structured'
+_TOK_RAW = 'raw'
+_PAGENAME_EDIT_STRUCT = f'{_PREFIX_EDIT}{_TOK_STRUCT}'
+_PAGENAME_EDIT_RAW = f'{_PREFIX_EDIT}{_TOK_RAW}'
+_PAGENAME_LEDGER_STRUCT = f'{_PREFIX_LEDGER}{_TOK_STRUCT}'
+_PAGENAME_LEDGER_RAW = f'{_PREFIX_LEDGER}{_TOK_RAW}'
+
+
+class Server(PlomHttpServer):
+ """Extends parent by loading .dat file into database for Handler."""
+
+ def __init__(self, path_dat: Path, *args, **kwargs) -> None:
+ super().__init__(_PATH_TEMPLATES, (_SERVER_HOST, _SERVER_PORT),
+ _Handler)
+ self.ledger = Ledger(path_dat)
+
+
+class _Handler(PlomHttpHandler):
+ """"Handles HTTP requests."""
+ mapper = PlomQueryMap
+
+ def _send_rendered(self, tmpl_name: str, ctx: dict[str, Any]) -> None:
+ self.send_rendered(Path(f'{tmpl_name}.tmpl'), ctx)
+
+ def do_POST(self) -> None:
+ """"Route POST requests to respective handlers."""
+ # pylint: disable=invalid-name
+ redir_target = Path(self.path)
+ if (file_prefixed := self.postvars.keys_prefixed(_PREFIX_FILE)):
+ self.post_file_action(file_prefixed[0])
+ elif (self.pagename.startswith(_PREFIX_EDIT)
+ and self.postvars.first('apply')):
+ redir_target = self.post_edit()
+ elif self.pagename.startswith(_PREFIX_LEDGER):
+ redir_target = self.post_ledger_action()
+ self.redirect(redir_target)
+
+ def post_file_action(self, file_prefixed: str) -> None:
+ """Based on file_prefixed name, trigger .server.ledger.(load|save)."""
+ if file_prefixed == f'{_PREFIX_FILE}load':
+ self.server.ledger.load()
+ elif file_prefixed == f'{_PREFIX_FILE}save':
+ self.server.ledger.save()
+
+ def post_edit(self) -> Path:
+ """Based on postvars, edit targeted Booking."""
+ booking = self.server.ledger.bookings[int(self.path_toks[2])]
+ new_lines = []
+ if self.pagename == _PAGENAME_EDIT_STRUCT:
+ line_keys = self.postvars.keys_prefixed('line_')
+ lineno_to_inputs: dict[int, list[str]] = {}
+ for key in line_keys:
+ toks = key.split('_', maxsplit=2)
+ lineno = int(toks[1])
+ if lineno not in lineno_to_inputs:
+ lineno_to_inputs[lineno] = []
+ lineno_to_inputs[lineno] += [toks[2]]
+ indent = ' '
+ for lineno, input_names in lineno_to_inputs.items():
+ data = ''
+ comment = self.postvars.first(f'line_{lineno}_comment')
+ for name in input_names:
+ input_ = self.postvars.first(f'line_{lineno}_{name}'
+ ).strip()
+ if name == 'date':
+ data = input_
+ elif name == 'target':
+ data += f' {input_}'
+ elif name == 'error':
+ data = f'{indent}{input_}'
+ elif name == 'account':
+ data = f'{indent}{input_}'
+ elif name in {'amount', 'currency'}:
+ data += f' {input_}'
+ new_lines += [
+ DatLine(f'{data} ; {comment}' if comment else data)]
+ else: # edit_raw
+ new_lines += [DatLine(line) for line
+ in self.postvars.first('booking').splitlines()]
+ new_id = self.server.ledger.rewrite_booking(booking.id_, new_lines)
+ return Path('/bookings').joinpath(f'{new_id}')
+
+ def post_ledger_action(self) -> Path:
+ """Call .server.ledger.(move|copy|add_empty_new)_booking."""
+ if 'add_booking' in self.postvars.as_dict:
+ id_ = self.server.ledger.add_empty_booking()
+ else:
+ keys_prefixed = self.postvars.keys_prefixed(_PREFIX_LEDGER)
+ action, id_str = keys_prefixed[0].split('_', maxsplit=2)[1:]
+ id_ = int(id_str)
+ if action.startswith('move'):
+ id_ = self.server.ledger.move_booking(id_, action == 'moveup')
+ return Path(self.path).joinpath(f'#{id_}')
+ id_ = self.server.ledger.copy_booking(id_)
+ return Path(_PAGENAME_EDIT_STRUCT).joinpath(f'{id_}')
+
+ def do_GET(self) -> None:
+ """"Route GET requests to respective handlers."""
+ # pylint: disable=invalid-name
+ if self.pagename == 'bookings':
+ self.redirect(
+ Path('/').joinpath(_PAGENAME_EDIT_STRUCT
+ ).joinpath(self.path_toks[2]))
+ return
+ ctx = {'tainted': self.server.ledger.tainted, 'path': self.path}
+ if self.pagename == 'balance':
+ self.get_balance(ctx)
+ elif self.pagename.startswith(_PREFIX_EDIT):
+ self.get_edit(ctx, self.pagename == _PAGENAME_EDIT_RAW)
+ elif self.pagename.startswith(_PREFIX_LEDGER):
+ self.get_ledger(ctx, self.pagename == _PAGENAME_LEDGER_RAW)
+ else:
+ self.get_ledger(ctx, False)
+
+ def get_balance(self, ctx) -> None:
+ """Display tree of calculated Accounts over .bookings[:up_incl+1]."""
+ id_ = int(self.params.first('up_incl') or '-1')
+ ctx['roots'] = [ac for ac in self.server.ledger.accounts.values()
+ if not ac.parent]
+ ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_)
+ ctx['booking'] = self.server.ledger.bookings[id_]
+ ctx['path_up_incl'] = f'{self.path_toks[1]}?up_incl='
+ self._send_rendered('balance', ctx)
+
+ def get_edit(self, ctx, raw: bool) -> None:
+ """Display edit form for individual Booking."""
+ id_ = int(self.path_toks[2])
+ booking = self.server.ledger.bookings[id_]
+ observed_tree: list[dict[str, Any]] = []
+ for full_path in sorted(booking.account_changes.keys()):
+ parent_children: list[dict[str, Any]] = observed_tree
+ for path, _ in Account.path_to_steps(full_path):
+ already_registered = False
+ for child in [n for n in parent_children if path == n['name']]:
+ parent_children = child['children']
+ already_registered = True
+ break
+ if already_registered:
+ continue
+ self.server.ledger.ensure_account(path)
+ before = self.server.ledger.accounts[path].get_wealth(id_ - 1)
+ after = self.server.ledger.accounts[path].get_wealth(id_)
+ direct_target = full_path == path
+ diff = {
+ cur: amt for cur, amt in (after - before).moneys.items()
+ if amt != 0
+ or (direct_target
+ and cur in booking.account_changes[full_path].moneys)}
+ if diff or direct_target:
+ displayed_currencies = set(diff.keys())
+ for wealth in before, after:
+ wealth.ensure_currencies(displayed_currencies)
+ wealth.purge_currencies_except(displayed_currencies)
+ node: dict[str, Any] = {
+ 'name': path,
+ 'direct_target': direct_target,
+ 'wealth_before': before.moneys,
+ 'wealth_diff': diff,
+ 'wealth_after': after.moneys,
+ 'children': []}
+ parent_children += [node]
+ parent_children = node['children']
+ ctx['roots'] = observed_tree
+ ctx['id'] = id_
+ ctx['dat_lines'] = [dl if raw else dl.as_dict
+ for dl in booking.booked_lines]
+ ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_)
+ if not raw:
+ ctx['all_accounts'] = sorted(self.server.ledger.accounts.keys())
+ self._send_rendered(
+ _PAGENAME_EDIT_RAW if raw else _PAGENAME_EDIT_STRUCT, ctx)
+
+ def get_ledger(self, ctx: dict[str, Any], raw: bool) -> None:
+ """Display ledger of all Bookings."""
+ ctx['dat_lines'] = self.server.ledger.dat_lines
+ self._send_rendered(
+ _PAGENAME_LEDGER_RAW if raw else _PAGENAME_LEDGER_STRUCT, ctx)
--- /dev/null
+"""Actual ledger classes."""
+
+# standard libs
+from datetime import date as dt_date
+from decimal import Decimal, InvalidOperation as DecimalInvalidOperation
+from pathlib import Path
+from typing import Any, Iterator, Optional, Self
+
+
+_PREFIX_DEF = 'def '
+
+
+class _Dictable:
+ """Line abstraction featuring .as_dict property."""
+ dictables: set[str] = set()
+
+ @property
+ def as_dict(self) -> dict[str, Any]:
+ """Return as JSON-ready dict attributes listed in .dictables."""
+ d = {}
+ for name in self.dictables:
+ value = getattr(self, name)
+ if hasattr(value, 'as_dict'):
+ value = value.as_dict
+ elif not isinstance(value, (str, int)):
+ value = str(value)
+ d[name] = value
+ return d
+
+
+class _Wealth():
+ """Collects amounts mapped to currencies."""
+
+ def __init__(self, moneys: Optional[dict[str, Decimal]] = None) -> None:
+ self.moneys = moneys if moneys else {}
+ self._sort_with_euro_up()
+
+ def _sort_with_euro_up(self) -> None:
+ if '€' in self.moneys:
+ temp = {'€': self.moneys['€']}
+ for curr in sorted([c for c in self.moneys if c != '€']):
+ temp[curr] = self.moneys[curr]
+ self.moneys = temp
+
+ def ensure_currencies(self, currencies: set[str]) -> None:
+ """Ensure all of currencies have at least a Decimal(0) entry."""
+ for currency in currencies:
+ if currency not in self.moneys:
+ self.moneys[currency] = Decimal(0)
+ self._sort_with_euro_up()
+
+ def purge_currencies_except(self, currencies: set[str]) -> None:
+ """From .moneys remove currencies except those listed."""
+ self.moneys = {curr: amt for curr, amt in self.moneys.items()
+ if curr in currencies}
+
+ def _add(self, other: Self, add=True) -> Self:
+ result = self.__class__(self.moneys.copy())
+ result.ensure_currencies(set(other.moneys.keys()))
+ for currency, amount in other.moneys.items():
+ result.moneys[currency] += amount if add else -amount
+ return result
+
+ def __add__(self, other: Self) -> Self:
+ return self._add(other)
+
+ def __sub__(self, other: Self) -> Self:
+ return self._add(other, add=False)
+
+ @property
+ def sink_empty(self) -> bool:
+ """Return if all evens out to zero."""
+ return not bool(self.as_sink.moneys)
+
+ @property
+ def as_sink(self) -> '_Wealth':
+ """Drop zero amounts, invert non-zero ones."""
+ sink = _Wealth()
+ for moneys in [_Wealth({c: a}) for c, a in self.moneys.items() if a]:
+ sink -= moneys
+ return sink
+
+
+class Account:
+ """Combine name, position in tree of own, and wealth of self + children."""
+
+ def __init__(self, parent: Optional['Account'], basename: str) -> None:
+ self._wealth_diffs: dict[int, _Wealth] = {}
+ self.basename = basename
+ self.desc = ''
+ self.children: list[Self] = []
+ self.parent = parent
+ if self.parent:
+ self.parent.children += [self]
+
+ def _get_local_wealth(self, up_incl: int) -> _Wealth:
+ """Calculate by summing all recorded wealth diffs up+incl. _Booking."""
+ wealth = _Wealth()
+ for wealth_diff in [wd for id_, wd in self._wealth_diffs.items()
+ if id_ <= up_incl]:
+ wealth += wealth_diff
+ return wealth
+
+ def get_wealth(self, up_incl: int) -> _Wealth:
+ """Total of .local_wealth with that of .children up+incl. _Booking."""
+ total = _Wealth()
+ total += self._get_local_wealth(up_incl)
+ for child in self.children:
+ total += child.get_wealth(up_incl)
+ return total
+
+ def add_wealth_diff(self, booking_id: int, wealth_diff: _Wealth) -> None:
+ """Add knowledge that _Booking of booking_add added wealth_diff."""
+ if booking_id in self._wealth_diffs:
+ self._wealth_diffs[booking_id] += wealth_diff
+ else:
+ self._wealth_diffs[booking_id] = wealth_diff
+
+ @staticmethod
+ def path_to_steps(full_path: str) -> Iterator[tuple[str, str]]:
+ """Split full_path into steps, for each return its path, basename."""
+ rebuilt_path = ''
+ for step_name in full_path.split(':'):
+ rebuilt_path += (':' if rebuilt_path else '') + step_name
+ yield rebuilt_path, step_name
+
+
+class DatLine(_Dictable):
+ """Line of .dat file parsed into comments and machine-readable data."""
+ dictables = {'booking_line', 'code', 'comment', 'error', 'is_intro'}
+ prev_line_empty: bool
+
+ def __init__(self, line: str) -> None:
+ self.raw = line[:]
+ halves = [t.rstrip() for t in line.split(';', maxsplit=1)]
+ self.comment = halves[1] if len(halves) > 1 else ''
+ self.code = halves[0]
+ self.booking_line: Optional[_BookingLine] = None
+
+ @property
+ def comment_instructions(self) -> dict[str, str]:
+ """Parse .comment into Account modification instructions."""
+ instructions = {}
+ if self.comment.startswith(_PREFIX_DEF):
+ parts = [part.strip() for part
+ in self.comment[len(_PREFIX_DEF):].split(';')]
+ first_part_parts = parts[0].split(maxsplit=1)
+ account_name = first_part_parts[0]
+ desc = first_part_parts[1] if len(first_part_parts) > 1 else ''
+ instructions[account_name] = desc
+ return instructions
+
+ @property
+ def comment_in_ledger(self) -> str:
+ """What to show in structured ledger view (no instructions)."""
+ return '' if len(self.comment_instructions) > 0 else self.comment
+
+ @property
+ def is_intro(self) -> bool:
+ """Return if intro line of a _Booking."""
+ return isinstance(self.booking_line, _IntroLine)
+
+ @property
+ def booking_id(self) -> int:
+ """If .booking_line, its .booking_id, else -1."""
+ return self.booking.id_ if self.booking else -1
+
+ @property
+ def booking(self) -> Optional['_Booking']:
+ """If .booking_line, matching _Booking, else None."""
+ return self.booking_line.booking if self.booking_line else None
+
+ @property
+ def error(self) -> str:
+ """Return error if registered on attempt to parse into _BookingLine."""
+ return '; '.join(self.booking_line.errors) if self.booking_line else ''
+
+ @property
+ def is_questionable(self) -> bool:
+ """Return whether line be questionable per associated _Booking."""
+ return (self.booking_line.booking.is_questionable if self.booking_line
+ else False)
+
+ @property
+ def raw_nbsp(self) -> str:
+ """Return .raw but ensure whitespace as , and at least one."""
+ if not self.raw:
+ return ' '
+ return self.raw.replace(' ', ' ')
+
+
+class _BookingLine(_Dictable):
+ """Parsed code part of a DatLine belonging to a _Booking."""
+
+ def __init__(self, booking: '_Booking') -> None:
+ self.errors: list[str] = []
+ self.booking = booking
+ self.idx = 0
+
+
+class _IntroLine(_BookingLine):
+ """First line of a _Booking, expected to carry date etc."""
+ dictables = {'date', 'target'}
+
+ def __init__(self, booking: '_Booking', code: str) -> None:
+ super().__init__(booking)
+ if code[0].isspace():
+ self.errors += ['intro line indented']
+ toks = code.lstrip().split(maxsplit=1)
+ self.date = toks[0]
+ self.target = toks[1] if len(toks) > 1 else ''
+ if len(toks) == 1:
+ self.errors += ['illegal number of tokens']
+ try:
+ dt_date.fromisoformat(self.date)
+ except ValueError:
+ self.errors += [f'not properly formatted legal date: {self.date}']
+
+
+class _TransferLine(_BookingLine):
+ """Non-first _Booking line, expected to carry value movement."""
+ dictables = {'amount', 'account', 'currency'}
+
+ def __init__(self, booking: '_Booking', code: str, idx: int) -> None:
+ super().__init__(booking)
+ self.idx = idx
+ self.currency = ''
+ self.amount: Optional[Decimal] = None
+ if not code[0].isspace():
+ self.errors += ['transfer line not indented']
+ toks = code.lstrip().split()
+ self.account = toks[0]
+ if len(toks) > 1:
+ self.currency = toks[2] if 3 == len(toks) else '€'
+ try:
+ self.amount = Decimal(toks[1])
+ except DecimalInvalidOperation:
+ self.errors += [f'improper amount value: {toks[1]}']
+ if len(toks) > 3:
+ self.errors += ['illegal number of tokens']
+
+ @property
+ def amount_short(self) -> str:
+ """If no .amount, '', else printed – but if too long, ellipsized."""
+ if self.amount is not None:
+ exp = self.amount.as_tuple().exponent
+ assert isinstance(exp, int)
+ return f'{self.amount:.1f}…' if exp < -2 else f'{self.amount:.2f}'
+ return ''
+
+
+class _Booking:
+ """Represents lines of individual booking."""
+ next: Optional[Self]
+ prev: Optional[Self]
+
+ def __init__(self,
+ id_: int,
+ booked_lines: list[DatLine],
+ gap_lines: Optional[list[DatLine]] = None
+ ) -> None:
+ self.next, self.prev = None, None
+ self.id_, self.booked_lines = id_, booked_lines[:]
+ self._gap_lines = gap_lines[:] if gap_lines else []
+ # parse booked_lines into Intro- and _TransferLines
+ self.intro_line = _IntroLine(self, self.booked_lines[0].code)
+ self._transfer_lines = [
+ _TransferLine(self, b_line.code, i+1) for i, b_line
+ in enumerate(self.booked_lines[1:])]
+ self.booked_lines[0].booking_line = self.intro_line
+ for i, b_line in enumerate(self._transfer_lines):
+ self.booked_lines[i + 1].booking_line = b_line
+ # calculate .account_changes
+ changes = _Wealth()
+ sink_account = None
+ self.account_changes: dict[str, _Wealth] = {}
+ for transfer_line in [tl for tl in self._transfer_lines
+ if not tl.errors]:
+ if transfer_line.account not in self.account_changes:
+ self.account_changes[transfer_line.account] = _Wealth()
+ if transfer_line.amount is None:
+ if sink_account:
+ transfer_line.errors += ['too many sinks']
+ sink_account = transfer_line.account
+ continue
+ change = _Wealth({transfer_line.currency: transfer_line.amount})
+ self.account_changes[transfer_line.account] += change
+ changes += change
+ if sink_account:
+ self.account_changes[sink_account] += changes.as_sink
+ elif not changes.sink_empty:
+ self._transfer_lines[-1].errors += ['needed sink missing']
+
+ def recalc_prev_next(self, bookings: list[Self]) -> None:
+ """Assuming .id_ to be index in bookings, link prev + next bookings."""
+ self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None
+ if self.prev:
+ self.prev.next = self
+ self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings)
+ else None)
+ if self.next:
+ self.next.prev = self
+
+ @property
+ def gap_lines(self) -> list[DatLine]:
+ """Return ._gap_lines or, if empty, list of one empty DatLine."""
+ return self._gap_lines if self._gap_lines else [DatLine('')]
+
+ @gap_lines.setter
+ def gap_lines(self, gap_lines=list[DatLine]) -> None:
+ self._gap_lines = gap_lines[:]
+
+ @property
+ def gap_lines_copied(self) -> list[DatLine]:
+ """Return new DatLines generated from .raw's of .gap_lines."""
+ return [DatLine(dat_line.raw) for dat_line in self.gap_lines]
+
+ @property
+ def booked_lines_copied(self) -> list[DatLine]:
+ """Return new DatLines generated from .raw's of .booked_lines."""
+ return [DatLine(dat_line.raw) for dat_line in self.booked_lines]
+
+ @property
+ def target(self) -> str:
+ """Return main other party for transaction."""
+ return self.intro_line.target
+
+ @property
+ def date(self) -> str:
+ """Return _Booking's day's date."""
+ return self.intro_line.date
+
+ def can_move(self, up: bool) -> bool:
+ """Whether movement rules would allow self to move up or down."""
+ if up and ((not self.prev) or self.prev.date != self.date):
+ return False
+ if (not up) and ((not self.next) or self.next.date != self.date):
+ return False
+ return True
+
+ @property
+ def is_questionable(self) -> bool:
+ """Whether lines count any errors."""
+ for _ in [bl for bl in [self.intro_line] + self._transfer_lines
+ if bl.errors]:
+ return True
+ return False
+
+
+class Ledger:
+ """Collection of DatLines, and Accounts, _Bookings derived from them."""
+ accounts: dict[str, Account]
+ bookings: list[_Booking]
+ dat_lines: list[DatLine]
+ initial_gap_lines: list[DatLine]
+
+ def __init__(self, path_dat: Path) -> None:
+ self._path_dat = path_dat
+ self.load()
+
+ def load(self) -> None:
+ """(Re-)read ledger from file at ._path_dat."""
+ self.accounts, self.bookings, self.initial_gap_lines = {}, [], []
+ self.dat_lines: list[DatLine] = [
+ DatLine(line)
+ for line in self._path_dat.read_text(encoding='utf8').splitlines()]
+ self.last_save_hash = self._hash_dat_lines()
+ booked: list[DatLine] = []
+ gap_lines: list[DatLine] = []
+ booking: Optional[_Booking] = None
+ for dat_line in self.dat_lines + [DatLine('')]:
+ if dat_line.code:
+ if gap_lines:
+ if booking:
+ booking.gap_lines = gap_lines[:]
+ else:
+ self.initial_gap_lines = gap_lines[:]
+ gap_lines.clear()
+ booked += [dat_line]
+ else:
+ gap_lines += [dat_line]
+ if booked:
+ booking = _Booking(len(self.bookings), booked[:])
+ self.bookings += [booking]
+ booked.clear()
+ if booking:
+ booking.gap_lines = gap_lines[:-1]
+ self._sync(recalc_datlines=False)
+
+ def _sync(self, recalc_datlines=True, check_dates=True):
+ if recalc_datlines:
+ self.dat_lines = self.initial_gap_lines[:]
+ for booking in self.bookings:
+ self.dat_lines += booking.booked_lines
+ self.dat_lines += booking.gap_lines
+ for idx, booking in enumerate(self.bookings[1:]):
+ booking.prev = self.bookings[idx]
+ for idx, booking in enumerate(self.bookings[:-1]):
+ booking.next = self.bookings[idx + 1]
+ self.bookings[-1].next = None
+ if check_dates:
+ last_date = ''
+ err_msg = 'date < previous valid date'
+ for booking in self.bookings:
+ if err_msg in booking.intro_line.errors:
+ booking.intro_line.errors.remove(err_msg)
+ if last_date > booking.date:
+ booking.intro_line.errors += [err_msg]
+ else:
+ last_date = booking.date
+ self._recalc_prev_line_empty()
+ self.accounts = {}
+ for dat_line in self.dat_lines:
+ for acc_name, desc in dat_line.comment_instructions.items():
+ self.ensure_account(acc_name)
+ self.accounts[acc_name].desc = desc
+ for booking in self.bookings:
+ for acc_name, wealth in booking.account_changes.items():
+ self.ensure_account(acc_name)
+ self.accounts[acc_name].add_wealth_diff(booking.id_, wealth)
+
+ def ensure_account(self, full_path: str) -> None:
+ """If full_path not in self.accounts, add its tree with Accounts."""
+ parent_path = ''
+ for path, step_name in Account.path_to_steps(full_path):
+ if path not in self.accounts:
+ self.accounts[path] = Account(
+ self.accounts[parent_path] if parent_path else None,
+ step_name)
+ parent_path = path
+
+ def save(self) -> None:
+ """Save current state to ._path_dat."""
+ self._path_dat.write_text(
+ '\n'.join([line.raw for line in self.dat_lines]), encoding='utf8')
+ self.load()
+
+ def _hash_dat_lines(self) -> int:
+ return hash(tuple(dl.raw for dl in self.dat_lines))
+
+ def bookings_valid_up_incl(self, booking_id: int) -> bool:
+ """If no .is_questionable in self.bookings up to booking_id."""
+ return len([b for b in self.bookings[:booking_id + 1]
+ if b.is_questionable]
+ ) < 1
+
+ @property
+ def tainted(self) -> bool:
+ """If .dat_lines different to those of last .load()."""
+ return self._hash_dat_lines() != self.last_save_hash
+
+ def _recalc_prev_line_empty(self) -> None:
+ prev_line = None
+ for line in self.dat_lines:
+ line.prev_line_empty = False
+ if prev_line:
+ line.prev_line_empty = not (prev_line.code
+ + prev_line.comment_in_ledger)
+ if prev_line or line.code + line.comment_in_ledger: # jump over
+ prev_line = line # empty start
+
+ def _move_booking(self, idx_from: int, idx_to: int):
+ moving = self.bookings[idx_from]
+ if idx_from >= idx_to: # moving upward, deletion must
+ del self.bookings[idx_from] # precede insertion to keep
+ self.bookings[idx_to:idx_to] = [moving] # deletion index, downwards
+ if idx_from < idx_to: # the other way around keeps
+ del self.bookings[idx_from] # insertion index
+ min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to)
+ for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]):
+ booking.id_ = min_idx + idx
+
+ def move_booking(self, idx_from: int, up: bool) -> int:
+ """Move _Booking of old_id one step up or downwards"""
+ new_id = idx_from + (-1 if up else 1)
+ idx_to = new_id + (0 if up else 1) # down-move imlies jump over next
+ self._move_booking(new_id, idx_to)
+ self._sync()
+ return new_id
+
+ def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int:
+ """Rewrite _Booking with new_lines, move if changed date."""
+ old_booking = self.bookings[old_id]
+ booked_start, booked_end, gap_start_found = -1, 0, False
+ for i, line in enumerate(new_lines):
+ if booked_start < 0 and line.code.strip(): # ignore any initial
+ booked_start = i # empty lines
+ elif booked_start >= 0 and not line.code.strip(): # past start,
+ gap_start_found = True # yet empty? gap
+ if not gap_start_found: # end index is always after current line,
+ booked_end += 1 # provided we're not yet in the gap
+ elif line.code.strip():
+ new_lines[i] = DatLine(f'; {line.code}')
+ before_gap = new_lines[:booked_start]
+ new_booked_lines = (new_lines[booked_start:booked_end]
+ if booked_start > -1 else [])
+ after_gap = old_booking.gap_lines_copied # new gap be old gap _plus_
+ after_gap += new_lines[booked_end:] # any new gap lines
+ if not new_booked_lines: # interpret empty posting as deletion request
+ del self.bookings[old_id]
+ for booking in self.bookings[old_id:]:
+ booking.id_ -= 1
+ leftover_gap = before_gap + after_gap
+ if old_id == 0:
+ self.initial_gap_lines += leftover_gap
+ else:
+ self.bookings[old_id - 1].gap_lines += leftover_gap
+ self._sync(check_dates=False)
+ return old_id if old_id < len(self.bookings) else 0
+ if old_id == 0:
+ self.initial_gap_lines += before_gap
+ else:
+ self.bookings[old_id - 1].gap_lines += before_gap
+ new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0]
+ updated = _Booking(old_id, new_booked_lines, after_gap)
+ self.bookings[old_id] = updated
+ if new_date != old_booking.date: # if changed date, move to there
+ if self.bookings[0].date > new_date:
+ new_id = 0
+ elif self.bookings[-1].date < new_date:
+ new_id = self.bookings[-1].id_ + 1
+ else:
+ of_date_1st = i_booking = self.bookings[0]
+ while i_booking.next:
+ if of_date_1st.date != i_booking.date:
+ of_date_1st = i_booking
+ if i_booking.next.date > new_date:
+ break
+ i_booking = i_booking.next
+ # ensure that, if we land in group of like-dated _Bookings, we
+ # land on the edge closest to our last position
+ new_id = (of_date_1st.id_ if old_id < i_booking.id_
+ else i_booking.id_ + 1)
+ self._move_booking(old_id, new_id)
+ self._sync(check_dates=False)
+ return updated.id_
+
+ def _add_new_booking(
+ self,
+ target: str,
+ dat_lines_transaction: list[DatLine],
+ intro_comment: str = ''
+ ) -> int:
+ booking = _Booking(
+ len(self.bookings),
+ [DatLine(f'{dt_date.today().isoformat()} {target}'
+ + ' ; '.join([''] + [s for s in [intro_comment] if s]))
+ ] + dat_lines_transaction)
+ self.bookings += [booking]
+ self._sync()
+ return booking.id_
+
+ def add_empty_booking(self) -> int:
+ """Add new _Booking to end of ledger."""
+ return self._add_new_booking('?', [])
+
+ def copy_booking(self, copied_id: int) -> int:
+ """Add copy of _Booking of copied_id to_end of ledger."""
+ copied = self.bookings[copied_id]
+ return self._add_new_booking(copied.target,
+ copied.booked_lines_copied[1:],
+ copied.booked_lines[0].comment)
"""Viewer and editor for ledger .dat files."""
# standard libs
-from datetime import date as dt_date
-from decimal import Decimal, InvalidOperation as DecimalInvalidOperation
from os import environ
from pathlib import Path
from sys import exit as sys_exit
-from typing import Any, Iterator, Optional, Self
# non-standard libs
try:
- from plomlib.web import PlomHttpHandler, PlomHttpServer, PlomQueryMap
+ from ledgplom.http import Server
except ModuleNotFoundError as e:
print(f"Missing dependency: {e}. Please run with 'install_deps' argument.")
sys_exit(1)
LEDGER_DAT = environ.get('LEDGER_DAT')
-SERVER_PORT = 8084
-SERVER_HOST = '127.0.0.1'
-PATH_TEMPLATES = Path('templates')
-
-PREFIX_DEF = 'def '
-PREFIX_LEDGER = 'ledger_'
-PREFIX_EDIT = 'edit_'
-PREFIX_FILE = 'file_'
-TOK_STRUCT = 'structured'
-TOK_RAW = 'raw'
-EDIT_STRUCT = f'{PREFIX_EDIT}{TOK_STRUCT}'
-EDIT_RAW = f'{PREFIX_EDIT}{TOK_RAW}'
-LEDGER_STRUCT = f'{PREFIX_LEDGER}{TOK_STRUCT}'
-LEDGER_RAW = f'{PREFIX_LEDGER}{TOK_RAW}'
-
-
-class Dictable:
- """Line abstraction featuring .as_dict property."""
- dictables: set[str] = set()
-
- @property
- def as_dict(self) -> dict[str, Any]:
- """Return as JSON-ready dict attributes listed in .dictables."""
- d = {}
- for name in self.dictables:
- value = getattr(self, name)
- if hasattr(value, 'as_dict'):
- value = value.as_dict
- elif not isinstance(value, (str, int)):
- value = str(value)
- d[name] = value
- return d
-
-
-class Wealth():
- """Collects amounts mapped to currencies."""
-
- def __init__(self, moneys: Optional[dict[str, Decimal]] = None) -> None:
- self.moneys = moneys if moneys else {}
- self._sort_with_euro_up()
-
- def _sort_with_euro_up(self) -> None:
- if '€' in self.moneys:
- temp = {'€': self.moneys['€']}
- for curr in sorted([c for c in self.moneys if c != '€']):
- temp[curr] = self.moneys[curr]
- self.moneys = temp
-
- def ensure_currencies(self, currencies: set[str]) -> None:
- """Ensure all of currencies have at least a Decimal(0) entry."""
- for currency in currencies:
- if currency not in self.moneys:
- self.moneys[currency] = Decimal(0)
- self._sort_with_euro_up()
-
- def purge_currencies_except(self, currencies: set[str]) -> None:
- """From .moneys remove currencies except those listed."""
- self.moneys = {curr: amt for curr, amt in self.moneys.items()
- if curr in currencies}
-
- def _add(self, other: Self, add=True) -> Self:
- result = self.__class__(self.moneys.copy())
- result.ensure_currencies(set(other.moneys.keys()))
- for currency, amount in other.moneys.items():
- result.moneys[currency] += amount if add else -amount
- return result
-
- def __add__(self, other: Self) -> Self:
- return self._add(other)
-
- def __sub__(self, other: Self) -> Self:
- return self._add(other, add=False)
-
- @property
- def sink_empty(self) -> bool:
- """Return if all evens out to zero."""
- return not bool(self.as_sink.moneys)
-
- @property
- def as_sink(self) -> 'Wealth':
- """Drop zero amounts, invert non-zero ones."""
- sink = Wealth()
- for moneys in [Wealth({c: a}) for c, a in self.moneys.items() if a]:
- sink -= moneys
- return sink
-
-
-class Account:
- """Combine name, position in tree of own, and wealth of self + children."""
-
- def __init__(self, parent: Optional['Account'], basename: str) -> None:
- self._wealth_diffs: dict[int, Wealth] = {}
- self.basename = basename
- self.desc = ''
- self.children: list[Self] = []
- self.parent = parent
- if self.parent:
- self.parent.children += [self]
-
- def _get_local_wealth(self, up_incl: int) -> Wealth:
- """Calculate by summing all recorded wealth diffs up+incl. Booking."""
- wealth = Wealth()
- for wealth_diff in [wd for id_, wd in self._wealth_diffs.items()
- if id_ <= up_incl]:
- wealth += wealth_diff
- return wealth
-
- def get_wealth(self, up_incl: int) -> Wealth:
- """Total of .local_wealth with that of .children up+incl. Booking."""
- total = Wealth()
- total += self._get_local_wealth(up_incl)
- for child in self.children:
- total += child.get_wealth(up_incl)
- return total
-
- def add_wealth_diff(self, booking_id: int, wealth_diff: Wealth) -> None:
- """Add knowledge that Booking of booking_add added wealth_diff."""
- if booking_id in self._wealth_diffs:
- self._wealth_diffs[booking_id] += wealth_diff
- else:
- self._wealth_diffs[booking_id] = wealth_diff
-
- @staticmethod
- def path_to_steps(full_path: str) -> Iterator[tuple[str, str]]:
- """Split full_path into steps, for each return its path, basename."""
- rebuilt_path = ''
- for step_name in full_path.split(':'):
- rebuilt_path += (':' if rebuilt_path else '') + step_name
- yield rebuilt_path, step_name
-
-
-class DatLine(Dictable):
- """Line of .dat file parsed into comments and machine-readable data."""
- dictables = {'booking_line', 'code', 'comment', 'error', 'is_intro'}
- prev_line_empty: bool
-
- def __init__(self, line: str) -> None:
- self.raw = line[:]
- halves = [t.rstrip() for t in line.split(';', maxsplit=1)]
- self.comment = halves[1] if len(halves) > 1 else ''
- self.code = halves[0]
- self.booking_line: Optional[BookingLine] = None
-
- @property
- def comment_instructions(self) -> dict[str, str]:
- """Parse .comment into Account modification instructions."""
- instructions = {}
- if self.comment.startswith(PREFIX_DEF):
- parts = [part.strip() for part
- in self.comment[len(PREFIX_DEF):].split(';')]
- first_part_parts = parts[0].split(maxsplit=1)
- account_name = first_part_parts[0]
- desc = first_part_parts[1] if len(first_part_parts) > 1 else ''
- instructions[account_name] = desc
- return instructions
-
- @property
- def comment_in_ledger(self) -> str:
- """What to show in structured ledger view (no instructions)."""
- return '' if len(self.comment_instructions) > 0 else self.comment
-
- @property
- def is_intro(self) -> bool:
- """Return if intro line of a Booking."""
- return isinstance(self.booking_line, IntroLine)
-
- @property
- def booking_id(self) -> int:
- """If .booking_line, its .booking_id, else -1."""
- return self.booking.id_ if self.booking else -1
-
- @property
- def booking(self) -> Optional['Booking']:
- """If .booking_line, matching Booking, else None."""
- return self.booking_line.booking if self.booking_line else None
-
- @property
- def error(self) -> str:
- """Return error if registered on attempt to parse into BookingLine."""
- return '; '.join(self.booking_line.errors) if self.booking_line else ''
-
- @property
- def is_questionable(self) -> bool:
- """Return whether line be questionable per associated Booking."""
- return (self.booking_line.booking.is_questionable if self.booking_line
- else False)
-
- @property
- def raw_nbsp(self) -> str:
- """Return .raw but ensure whitespace as , and at least one."""
- if not self.raw:
- return ' '
- return self.raw.replace(' ', ' ')
-
-
-class BookingLine(Dictable):
- """Parsed code part of a DatLine belonging to a Booking."""
-
- def __init__(self, booking: 'Booking') -> None:
- self.errors: list[str] = []
- self.booking = booking
- self.idx = 0
-
-
-class IntroLine(BookingLine):
- """First line of a Booking, expected to carry date etc."""
- dictables = {'date', 'target'}
-
- def __init__(self, booking: 'Booking', code: str) -> None:
- super().__init__(booking)
- if code[0].isspace():
- self.errors += ['intro line indented']
- toks = code.lstrip().split(maxsplit=1)
- self.date = toks[0]
- self.target = toks[1] if len(toks) > 1 else ''
- if len(toks) == 1:
- self.errors += ['illegal number of tokens']
- try:
- dt_date.fromisoformat(self.date)
- except ValueError:
- self.errors += [f'not properly formatted legal date: {self.date}']
-
-
-class TransferLine(BookingLine):
- """Non-first Booking line, expected to carry value movement."""
- dictables = {'amount', 'account', 'currency'}
-
- def __init__(self, booking: 'Booking', code: str, idx: int) -> None:
- super().__init__(booking)
- self.idx = idx
- self.currency = ''
- self.amount: Optional[Decimal] = None
- if not code[0].isspace():
- self.errors += ['transfer line not indented']
- toks = code.lstrip().split()
- self.account = toks[0]
- if len(toks) > 1:
- self.currency = toks[2] if 3 == len(toks) else '€'
- try:
- self.amount = Decimal(toks[1])
- except DecimalInvalidOperation:
- self.errors += [f'improper amount value: {toks[1]}']
- if len(toks) > 3:
- self.errors += ['illegal number of tokens']
-
- @property
- def amount_short(self) -> str:
- """If no .amount, '', else printed – but if too long, ellipsized."""
- if self.amount is not None:
- exp = self.amount.as_tuple().exponent
- assert isinstance(exp, int)
- return f'{self.amount:.1f}…' if exp < -2 else f'{self.amount:.2f}'
- return ''
-
-
-class Booking:
- """Represents lines of individual booking."""
- next: Optional[Self]
- prev: Optional[Self]
-
- def __init__(self,
- id_: int,
- booked_lines: list[DatLine],
- gap_lines: Optional[list[DatLine]] = None
- ) -> None:
- self.next, self.prev = None, None
- self.id_, self.booked_lines = id_, booked_lines[:]
- self._gap_lines = gap_lines[:] if gap_lines else []
- # parse booked_lines into Intro- and TransferLines
- self.intro_line = IntroLine(self, self.booked_lines[0].code)
- self._transfer_lines = [
- TransferLine(self, b_line.code, i+1) for i, b_line
- in enumerate(self.booked_lines[1:])]
- self.booked_lines[0].booking_line = self.intro_line
- for i, b_line in enumerate(self._transfer_lines):
- self.booked_lines[i + 1].booking_line = b_line
- # calculate .account_changes
- changes = Wealth()
- sink_account = None
- self.account_changes: dict[str, Wealth] = {}
- for transfer_line in [tl for tl in self._transfer_lines
- if not tl.errors]:
- if transfer_line.account not in self.account_changes:
- self.account_changes[transfer_line.account] = Wealth()
- if transfer_line.amount is None:
- if sink_account:
- transfer_line.errors += ['too many sinks']
- sink_account = transfer_line.account
- continue
- change = Wealth({transfer_line.currency: transfer_line.amount})
- self.account_changes[transfer_line.account] += change
- changes += change
- if sink_account:
- self.account_changes[sink_account] += changes.as_sink
- elif not changes.sink_empty:
- self._transfer_lines[-1].errors += ['needed sink missing']
-
- def recalc_prev_next(self, bookings: list[Self]) -> None:
- """Assuming .id_ to be index in bookings, link prev + next Bookings."""
- self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None
- if self.prev:
- self.prev.next = self
- self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings)
- else None)
- if self.next:
- self.next.prev = self
-
- @property
- def gap_lines(self) -> list[DatLine]:
- """Return ._gap_lines or, if empty, list of one empty DatLine."""
- return self._gap_lines if self._gap_lines else [DatLine('')]
-
- @gap_lines.setter
- def gap_lines(self, gap_lines=list[DatLine]) -> None:
- self._gap_lines = gap_lines[:]
-
- @property
- def gap_lines_copied(self) -> list[DatLine]:
- """Return new DatLines generated from .raw's of .gap_lines."""
- return [DatLine(dat_line.raw) for dat_line in self.gap_lines]
-
- @property
- def booked_lines_copied(self) -> list[DatLine]:
- """Return new DatLines generated from .raw's of .booked_lines."""
- return [DatLine(dat_line.raw) for dat_line in self.booked_lines]
-
- @property
- def target(self) -> str:
- """Return main other party for transaction."""
- return self.intro_line.target
-
- @property
- def date(self) -> str:
- """Return Booking's day's date."""
- return self.intro_line.date
-
- def can_move(self, up: bool) -> bool:
- """Whether movement rules would allow self to move up or down."""
- if up and ((not self.prev) or self.prev.date != self.date):
- return False
- if (not up) and ((not self.next) or self.next.date != self.date):
- return False
- return True
-
- @property
- def is_questionable(self) -> bool:
- """Whether lines count any errors."""
- for _ in [bl for bl in [self.intro_line] + self._transfer_lines
- if bl.errors]:
- return True
- return False
-
-
-class Handler(PlomHttpHandler):
- """"Handles HTTP requests."""
- mapper = PlomQueryMap
-
- def _send_rendered(self, tmpl_name: str, ctx: dict[str, Any]) -> None:
- self.send_rendered(Path(f'{tmpl_name}.tmpl'), ctx)
-
- def do_POST(self) -> None:
- """"Route POST requests to respective handlers."""
- # pylint: disable=invalid-name
- redir_target = Path(self.path)
- if (file_prefixed := self.postvars.keys_prefixed(PREFIX_FILE)):
- self.post_file_action(file_prefixed[0])
- elif (self.pagename.startswith(PREFIX_EDIT)
- and self.postvars.first('apply')):
- redir_target = self.post_edit()
- elif self.pagename.startswith(PREFIX_LEDGER):
- redir_target = self.post_ledger_action()
- self.redirect(redir_target)
-
- def post_file_action(self, file_prefixed: str) -> None:
- """Based on file_prefixed name, trigger .server.ledger.(load|save)."""
- if file_prefixed == f'{PREFIX_FILE}load':
- self.server.ledger.load()
- elif file_prefixed == f'{PREFIX_FILE}save':
- self.server.ledger.save()
-
- def post_edit(self) -> Path:
- """Based on postvars, edit targeted Booking."""
- booking = self.server.ledger.bookings[int(self.path_toks[2])]
- new_lines = []
- if self.pagename == EDIT_STRUCT:
- line_keys = self.postvars.keys_prefixed('line_')
- lineno_to_inputs: dict[int, list[str]] = {}
- for key in line_keys:
- toks = key.split('_', maxsplit=2)
- lineno = int(toks[1])
- if lineno not in lineno_to_inputs:
- lineno_to_inputs[lineno] = []
- lineno_to_inputs[lineno] += [toks[2]]
- indent = ' '
- for lineno, input_names in lineno_to_inputs.items():
- data = ''
- comment = self.postvars.first(f'line_{lineno}_comment')
- for name in input_names:
- input_ = self.postvars.first(f'line_{lineno}_{name}'
- ).strip()
- if name == 'date':
- data = input_
- elif name == 'target':
- data += f' {input_}'
- elif name == 'error':
- data = f'{indent}{input_}'
- elif name == 'account':
- data = f'{indent}{input_}'
- elif name in {'amount', 'currency'}:
- data += f' {input_}'
- new_lines += [
- DatLine(f'{data} ; {comment}' if comment else data)]
- else: # edit_raw
- new_lines += [DatLine(line) for line
- in self.postvars.first('booking').splitlines()]
- new_id = self.server.ledger.rewrite_booking(booking.id_, new_lines)
- return Path('/bookings').joinpath(f'{new_id}')
-
- def post_ledger_action(self) -> Path:
- """Call .server.ledger.(move|copy|add_empty_new)_booking."""
- if 'add_booking' in self.postvars.as_dict:
- id_ = self.server.ledger.add_empty_booking()
- else:
- keys_prefixed = self.postvars.keys_prefixed(PREFIX_LEDGER)
- action, id_str = keys_prefixed[0].split('_', maxsplit=2)[1:]
- id_ = int(id_str)
- if action.startswith('move'):
- id_ = self.server.ledger.move_booking(id_, action == 'moveup')
- return Path(self.path).joinpath(f'#{id_}')
- id_ = self.server.ledger.copy_booking(id_)
- return Path(EDIT_STRUCT).joinpath(f'{id_}')
-
- def do_GET(self) -> None:
- """"Route GET requests to respective handlers."""
- # pylint: disable=invalid-name
- if self.pagename == 'bookings':
- self.redirect(
- Path('/').joinpath(EDIT_STRUCT).joinpath(self.path_toks[2]))
- return
- ctx = {'tainted': self.server.ledger.tainted, 'path': self.path}
- if self.pagename == 'balance':
- self.get_balance(ctx)
- elif self.pagename.startswith(PREFIX_EDIT):
- self.get_edit(ctx, self.pagename == EDIT_RAW)
- elif self.pagename.startswith(PREFIX_LEDGER):
- self.get_ledger(ctx, self.pagename == LEDGER_RAW)
- else:
- self.get_ledger(ctx, False)
-
- def get_balance(self, ctx) -> None:
- """Display tree of calculated Accounts over .bookings[:up_incl+1]."""
- id_ = int(self.params.first('up_incl') or '-1')
- ctx['roots'] = [ac for ac in self.server.ledger.accounts.values()
- if not ac.parent]
- ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_)
- ctx['booking'] = self.server.ledger.bookings[id_]
- ctx['path_up_incl'] = f'{self.path_toks[1]}?up_incl='
- self._send_rendered('balance', ctx)
-
- def get_edit(self, ctx, raw: bool) -> None:
- """Display edit form for individual Booking."""
- id_ = int(self.path_toks[2])
- booking = self.server.ledger.bookings[id_]
- observed_tree: list[dict[str, Any]] = []
- for full_path in sorted(booking.account_changes.keys()):
- parent_children: list[dict[str, Any]] = observed_tree
- for path, _ in Account.path_to_steps(full_path):
- already_registered = False
- for child in [n for n in parent_children if path == n['name']]:
- parent_children = child['children']
- already_registered = True
- break
- if already_registered:
- continue
- self.server.ledger.ensure_account(path)
- before = self.server.ledger.accounts[path].get_wealth(id_ - 1)
- after = self.server.ledger.accounts[path].get_wealth(id_)
- direct_target = full_path == path
- diff = {
- cur: amt for cur, amt in (after - before).moneys.items()
- if amt != 0
- or (direct_target
- and cur in booking.account_changes[full_path].moneys)}
- if diff or direct_target:
- displayed_currencies = set(diff.keys())
- for wealth in before, after:
- wealth.ensure_currencies(displayed_currencies)
- wealth.purge_currencies_except(displayed_currencies)
- node: dict[str, Any] = {
- 'name': path,
- 'direct_target': direct_target,
- 'wealth_before': before.moneys,
- 'wealth_diff': diff,
- 'wealth_after': after.moneys,
- 'children': []}
- parent_children += [node]
- parent_children = node['children']
- ctx['roots'] = observed_tree
- ctx['id'] = id_
- ctx['dat_lines'] = [dl if raw else dl.as_dict
- for dl in booking.booked_lines]
- ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_)
- if not raw:
- ctx['all_accounts'] = sorted(self.server.ledger.accounts.keys())
- self._send_rendered(EDIT_RAW if raw else EDIT_STRUCT, ctx)
-
- def get_ledger(self, ctx: dict[str, Any], raw: bool) -> None:
- """Display ledger of all Bookings."""
- ctx['dat_lines'] = self.server.ledger.dat_lines
- self._send_rendered(LEDGER_RAW if raw else LEDGER_STRUCT, ctx)
-
-
-class Server(PlomHttpServer):
- """Extends parent by loading .dat file into database for Handler."""
-
- def __init__(self, path_dat: Path, *args, **kwargs) -> None:
- super().__init__(PATH_TEMPLATES, (SERVER_HOST, SERVER_PORT), Handler)
- self.ledger = Ledger(path_dat)
-
-
-class Ledger:
- """Collection of DatLines, and Accounts, Bookings derived from them."""
- accounts: dict[str, Account]
- bookings: list[Booking]
- dat_lines: list[DatLine]
- initial_gap_lines: list[DatLine]
-
- def __init__(self, path_dat: Path) -> None:
- self._path_dat = path_dat
- self.load()
-
- def load(self) -> None:
- """(Re-)read ledger from file at ._path_dat."""
- self.accounts, self.bookings, self.initial_gap_lines = {}, [], []
- self.initial_gap_lines: list[DatLine] = [] # TODO: fix duplicate booking
- self.dat_lines: list[DatLine] = [
- DatLine(line)
- for line in self._path_dat.read_text(encoding='utf8').splitlines()]
- self.last_save_hash = self._hash_dat_lines()
- booked: list[DatLine] = []
- gap_lines: list[DatLine] = []
- booking: Optional[Booking] = None
- for dat_line in self.dat_lines + [DatLine('')]:
- if dat_line.code:
- if gap_lines:
- if booking:
- booking.gap_lines = gap_lines[:]
- else:
- self.initial_gap_lines = gap_lines[:]
- gap_lines.clear()
- booked += [dat_line]
- else:
- gap_lines += [dat_line]
- if booked:
- booking = Booking(len(self.bookings), booked[:])
- self.bookings += [booking]
- booked.clear()
- if booking:
- booking.gap_lines = gap_lines[:-1]
- self._sync(recalc_datlines=False)
-
- def _sync(self, recalc_datlines=True, check_dates=True):
- if recalc_datlines:
- self.dat_lines = self.initial_gap_lines[:]
- for booking in self.bookings:
- self.dat_lines += booking.booked_lines
- self.dat_lines += booking.gap_lines
- for idx, booking in enumerate(self.bookings[1:]):
- booking.prev = self.bookings[idx]
- for idx, booking in enumerate(self.bookings[:-1]):
- booking.next = self.bookings[idx + 1]
- self.bookings[-1].next = None
- if check_dates:
- last_date = ''
- err_msg = 'date < previous valid date'
- for booking in self.bookings:
- if err_msg in booking.intro_line.errors:
- booking.intro_line.errors.remove(err_msg)
- if last_date > booking.date:
- booking.intro_line.errors += [err_msg]
- else:
- last_date = booking.date
- self._recalc_prev_line_empty()
- self.accounts = {}
- for dat_line in self.dat_lines:
- for acc_name, desc in dat_line.comment_instructions.items():
- self.ensure_account(acc_name)
- self.accounts[acc_name].desc = desc
- for booking in self.bookings:
- for acc_name, wealth in booking.account_changes.items():
- self.ensure_account(acc_name)
- self.accounts[acc_name].add_wealth_diff(booking.id_, wealth)
-
- def ensure_account(self, full_path: str) -> None:
- """If full_path not in self.accounts, add its tree with Accounts."""
- parent_path = ''
- for path, step_name in Account.path_to_steps(full_path):
- if path not in self.accounts:
- self.accounts[path] = Account(
- self.accounts[parent_path] if parent_path else None,
- step_name)
- parent_path = path
-
- def save(self) -> None:
- """Save current state to ._path_dat."""
- self._path_dat.write_text(
- '\n'.join([line.raw for line in self.dat_lines]), encoding='utf8')
- self.load()
-
- def _hash_dat_lines(self) -> int:
- return hash(tuple(dl.raw for dl in self.dat_lines))
-
- def bookings_valid_up_incl(self, booking_id: int) -> bool:
- """If no .is_questionable in self.bookings up to booking_id."""
- return len([b for b in self.bookings[:booking_id + 1]
- if b.is_questionable]
- ) < 1
-
- @property
- def tainted(self) -> bool:
- """If .dat_lines different to those of last .load()."""
- return self._hash_dat_lines() != self.last_save_hash
-
- def _recalc_prev_line_empty(self) -> None:
- prev_line = None
- for line in self.dat_lines:
- line.prev_line_empty = False
- if prev_line:
- line.prev_line_empty = not (prev_line.code
- + prev_line.comment_in_ledger)
- if prev_line or line.code + line.comment_in_ledger: # jump over
- prev_line = line # empty start
-
- def _move_booking(self, idx_from: int, idx_to: int):
- moving = self.bookings[idx_from]
- if idx_from >= idx_to: # moving upward, deletion must
- del self.bookings[idx_from] # precede insertion to keep
- self.bookings[idx_to:idx_to] = [moving] # deletion index, downwards
- if idx_from < idx_to: # the other way around keeps
- del self.bookings[idx_from] # insertion index
- min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to)
- for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]):
- booking.id_ = min_idx + idx
-
- def move_booking(self, idx_from: int, up: bool) -> int:
- """Move Booking of old_id one step up or downwards"""
- new_id = idx_from + (-1 if up else 1)
- idx_to = new_id + (0 if up else 1) # down-move imlies jump over next
- self._move_booking(new_id, idx_to)
- self._sync()
- return new_id
-
- def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int:
- """Rewrite Booking with new_lines, move if changed date."""
- old_booking = self.bookings[old_id]
- booked_start, booked_end, gap_start_found = -1, 0, False
- for i, line in enumerate(new_lines):
- if booked_start < 0 and line.code.strip(): # ignore any initial
- booked_start = i # empty lines
- elif booked_start >= 0 and not line.code.strip(): # past start,
- gap_start_found = True # yet empty? gap
- if not gap_start_found: # end index is always after current line,
- booked_end += 1 # provided we're not yet in the gap
- elif line.code.strip():
- new_lines[i] = DatLine(f'; {line.code}')
- before_gap = new_lines[:booked_start]
- new_booked_lines = (new_lines[booked_start:booked_end]
- if booked_start > -1 else [])
- after_gap = old_booking.gap_lines_copied # new gap be old gap _plus_
- after_gap += new_lines[booked_end:] # any new gap lines
- if not new_booked_lines: # interpret empty posting as deletion request
- del self.bookings[old_id]
- for booking in self.bookings[old_id:]:
- booking.id_ -= 1
- leftover_gap = before_gap + after_gap
- if old_id == 0:
- self.initial_gap_lines += leftover_gap
- else:
- self.bookings[old_id - 1].gap_lines += leftover_gap
- self._sync(check_dates=False)
- return old_id if old_id < len(self.bookings) else 0
- if old_id == 0:
- self.initial_gap_lines += before_gap
- else:
- self.bookings[old_id - 1].gap_lines += before_gap
- new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0]
- updated = Booking(old_id, new_booked_lines, after_gap)
- self.bookings[old_id] = updated
- if new_date != old_booking.date: # if changed date, move to there
- if self.bookings[0].date > new_date:
- new_id = 0
- elif self.bookings[-1].date < new_date:
- new_id = self.bookings[-1].id_ + 1
- else:
- of_date_1st = i_booking = self.bookings[0]
- while i_booking.next:
- if of_date_1st.date != i_booking.date:
- of_date_1st = i_booking
- if i_booking.next.date > new_date:
- break
- i_booking = i_booking.next
- # ensure that, if we land in group of like-dated Bookings, we
- # land on the edge closest to our last position
- new_id = (of_date_1st.id_ if old_id < i_booking.id_
- else i_booking.id_ + 1)
- self._move_booking(old_id, new_id)
- self._sync(check_dates=False)
- return updated.id_
-
- def _add_new_booking(
- self,
- target: str,
- dat_lines_transaction: list[DatLine],
- intro_comment: str = ''
- ) -> int:
- booking = Booking(
- len(self.bookings),
- [DatLine(f'{dt_date.today().isoformat()} {target}'
- + ' ; '.join([''] + [s for s in [intro_comment] if s]))
- ] + dat_lines_transaction)
- self.bookings += [booking]
- self._sync()
- return booking.id_
-
- def add_empty_booking(self) -> int:
- """Add new Booking to end of ledger."""
- return self._add_new_booking('?', [])
-
- def copy_booking(self, copied_id: int) -> int:
- """Add copy of Booking of copied_id to_end of ledger."""
- copied = self.bookings[copied_id]
- return self._add_new_booking(copied.target,
- copied.booked_lines_copied[1:],
- copied.booked_lines[0].comment)
if __name__ == "__main__":