-"""Collect directly HTTP-related elements."""
-
+'Collect directly HTTP-related elements.'
# standard libs
from pathlib import Path
from typing import Any
from ledgplom.ledger import Account, DatBlock, DEFAULT_INDENT, Ledger
-_SERVER_PORT = 8084
-_SERVER_HOST = '127.0.0.1'
_PATH_TEMPLATES = Path('templates')
-_PREFIX_LEDGER = 'ledger_'
_PREFIX_EDIT = 'edit_'
_PREFIX_FILE = 'file_'
-_TOK_STRUCTURED = 'structured'
+_PREFIX_LEDGER = 'ledger_'
+_SERVER_HOST = '127.0.0.1'
+_SERVER_PORT = 8084
_TOK_RAW = 'raw'
-_PAGENAME_EDIT_STRUCTURED = f'{_PREFIX_EDIT}{_TOK_STRUCTURED}'
+_TOK_STRUCTURED = 'structured'
_PAGENAME_EDIT_RAW = f'{_PREFIX_EDIT}{_TOK_RAW}'
-_PAGENAME_LEDGER_STRUCTURED = f'{_PREFIX_LEDGER}{_TOK_STRUCTURED}'
+_PAGENAME_EDIT_STRUCTURED = f'{_PREFIX_EDIT}{_TOK_STRUCTURED}'
_PAGENAME_LEDGER_RAW = f'{_PREFIX_LEDGER}{_TOK_RAW}'
+_PAGENAME_LEDGER_STRUCTURED = f'{_PREFIX_LEDGER}{_TOK_STRUCTURED}'
class Server(PlomHttpServer):
- """Extends parent by loading .dat file into database for Handler."""
+ 'Extends parent by loading .dat file into database for Handler.'
def __init__(self, path_dat: Path, *args, **kwargs) -> None:
- super().__init__(_PATH_TEMPLATES, (_SERVER_HOST, _SERVER_PORT),
+ super().__init__(_PATH_TEMPLATES,
+ (_SERVER_HOST, _SERVER_PORT),
_Handler)
self.ledger = Ledger(path_dat)
class _Handler(PlomHttpHandler):
- """"Handles HTTP requests."""
+ 'Handles HTTP requests.'
mapper = PlomQueryMap
def _send_rendered(self, tmpl_name: str, ctx: dict[str, Any]) -> None:
self.send_rendered(Path(f'{tmpl_name}.tmpl'), ctx)
- def do_POST(self) -> None:
- """"Route POST requests to respective handlers."""
- # pylint: disable=invalid-name
+ def do_POST(self) -> None: # pylint: disable=invalid-name
+ 'Route POST requests to respective handlers.'
redir_target = Path(self.path)
if (file_prefixed := self.postvars.keys_prefixed(_PREFIX_FILE)):
self.post_file_action(file_prefixed[0])
- elif (self.pagename.startswith(_PREFIX_EDIT)
- and self.postvars.first('apply')):
+ elif self.pagename.startswith(_PREFIX_EDIT)\
+ and self.postvars.first('apply'):
redir_target = self.post_edit()
elif self.pagename.startswith(_PREFIX_LEDGER):
redir_target = self.post_ledger_action()
self.redirect(redir_target)
def post_file_action(self, file_prefixed: str) -> None:
- """Based on file_prefixed name, trigger .server.ledger.(load|save)."""
+ 'Based on file_prefixed name, trigger .server.ledger.(load|save).'
if file_prefixed == f'{_PREFIX_FILE}load':
self.server.ledger.load()
elif file_prefixed == f'{_PREFIX_FILE}save':
self.server.ledger.save()
def post_edit(self) -> Path:
- """Based on postvars, edit targeted Booking."""
+ 'Based on postvars, edit targeted Booking.'
old_id = int(self.path_toks[2])
new_lines = []
if self.pagename == _PAGENAME_EDIT_STRUCTURED:
new_lines += [f'{code} ; {inputs["comment"]}']
new_lines += self.postvars.first('raw_lines').splitlines()
new_id = self.server.ledger.rewrite_block(old_id, new_lines)
- return Path('/').joinpath(self.pagename).joinpath(f'{new_id}')
+ return Path('/', self.pagename, f'{new_id}')
def post_ledger_action(self) -> Path:
- """Call .server.ledger.(move|copy|add_empty_new)_booking."""
+ 'Call .server.ledger.(move|copy|add_empty_new)_booking.'
if 'remove_redundant_empty_lines' in self.postvars.as_dict:
self.server.ledger.remove_redundant_empty_lines()
return Path(self.path)
if 'add_booking' in self.postvars.as_dict:
id_ = self.server.ledger.add_empty_block()
- return Path('/').joinpath(_PAGENAME_EDIT_STRUCTURED
- ).joinpath(f'{id_}')
+ return Path('/', _PAGENAME_EDIT_STRUCTURED, f'{id_}')
keys_prefixed = self.postvars.keys_prefixed(_PREFIX_LEDGER)
action, id_str = keys_prefixed[0].split('_', maxsplit=2)[1:]
id_ = int(id_str)
if action.startswith('move'):
id_ = self.server.ledger.move_block(id_, action == 'moveup')
- return Path(self.path).joinpath(f'#block_{id_}')
+ return Path(self.path, f'#block_{id_}')
id_ = self.server.ledger.copy_block(id_)
- return Path(self.path).joinpath(f'#block_{id_}')
+ return Path(self.path, f'#block_{id_}')
- def do_GET(self) -> None:
- """"Route GET requests to respective handlers."""
- # pylint: disable=invalid-name
+ def do_GET(self) -> None: # pylint: disable=invalid-name
+ 'Route GET requests to respective handlers.'
if self.pagename == 'blocks':
self.redirect(
- Path('/').joinpath(_PAGENAME_EDIT_STRUCTURED
- ).joinpath(self.path_toks[2]))
+ Path('/', _PAGENAME_EDIT_STRUCTURED, self.path_toks[2]))
return
ctx = {'unsaved_changes': self.server.ledger.tainted,
'path': self.path}
self.get_ledger(ctx, False)
def get_balance(self, ctx) -> None:
- """Display tree of calculated Accounts over blocks up_incl+1."""
+ 'Display tree of calculated Accounts over blocks up_incl+1.'
id_ = int(self.params.first('up_incl')
or str(len(self.server.ledger.blocks) - 1))
roots = [ac for ac in self.server.ledger.accounts.values()
self._send_rendered('balance', ctx)
def get_edit(self, ctx, raw: bool) -> None:
- """Display edit form for individual Booking."""
+ 'Display edit form for individual Booking.'
def make_balance_roots(b: DatBlock) -> list[dict[str, Any]]:
acc_changes = b.booking.account_changes if b.booking else {}
self._send_rendered(_PAGENAME_EDIT_STRUCTURED, ctx)
def get_ledger(self, ctx: dict[str, Any], raw: bool) -> None:
- """Display ledger of all Bookings."""
+ 'Display ledger of all Bookings.'
ctx['blocks'] = self.server.ledger.blocks
ctx['has_redundant_empty_lines'] =\
self.server.ledger.has_redundant_empty_lines
-"""Actual ledger classes."""
-
+'Actual ledger classes.'
# standard libs
from abc import ABC, abstractmethod
from datetime import date as dt_date
class _Wealth():
- """Collects amounts mapped to currencies."""
+ 'Collects amounts mapped to currencies.'
def __init__(self, moneys: Optional[dict[str, Decimal]] = None) -> None:
self.moneys = moneys if moneys else {}
self.moneys = temp
def ensure_currencies(self, currencies: set[str]) -> None:
- """Ensure all of currencies have at least a Decimal(0) entry."""
+ 'Ensure all of currencies have at least a Decimal(0) entry.'
for currency in currencies:
if currency not in self.moneys:
self.moneys[currency] = Decimal(0)
self._sort_with_euro_up()
def purge_currencies_except(self, currencies: set[str]) -> None:
- """From .moneys remove currencies except those listed."""
+ 'From .moneys remove currencies except those listed.'
self.moneys = {curr: amt for curr, amt in self.moneys.items()
if curr in currencies}
@property
def sink_empty(self) -> bool:
- """Return if all evens out to zero."""
+ 'Return if all evens out to zero.'
return not bool(self.as_sink.moneys)
@property
def as_sink(self) -> '_Wealth':
- """Drop zero amounts, invert non-zero ones."""
+ 'Drop zero amounts, invert non-zero ones.'
sink = _Wealth()
for moneys in [_Wealth({c: a}) for c, a in self.moneys.items() if a]:
sink -= moneys
class Account:
- """Combine name, position in tree of own, and wealth of self + children."""
+ 'Combine name, position in tree of owner, and wealth of self + children.'
def __init__(self, parent: Optional['Account'], basename: str) -> None:
self._wealth_diffs: dict[int, _Wealth] = {}
self.parent.children += [self]
def _get_local_wealth(self, up_incl: int) -> _Wealth:
- """Calculate by summing all recorded wealth diffs up+incl. _Booking."""
+ 'Calculate by summing all recorded wealth diffs up+incl. _Booking.'
wealth = _Wealth()
for wealth_diff in [wd for id_, wd in self._wealth_diffs.items()
if id_ <= up_incl]:
return wealth
def get_wealth(self, up_incl: int) -> _Wealth:
- """Total of .local_wealth with that of .children up+incl. _Booking."""
+ 'Total of .local_wealth with that of .children up+incl. _Booking.'
total = _Wealth()
total += self._get_local_wealth(up_incl)
for child in self.children:
return total
def add_wealth_diff(self, booking_id: int, wealth_diff: _Wealth) -> None:
- """Add knowledge that _Booking of booking_add added wealth_diff."""
+ 'Add knowledge that _Booking of booking_add added wealth_diff.'
if booking_id in self._wealth_diffs:
self._wealth_diffs[booking_id] += wealth_diff
else:
@staticmethod
def path_to_steps(full_path: str) -> Iterator[tuple[str, str]]:
- """Split full_path into steps, for each return its path, basename."""
+ 'Split full_path into steps, for each return its path, basename.'
rebuilt_path = ''
for step_name in full_path.split(':'):
rebuilt_path += (':' if rebuilt_path else '') + step_name
class _DatLine:
- """Line of .dat file parsed into comments and machine-readable data."""
+ 'Line of .dat file parsed into comments and machine-readable data.'
to_copy = ['code', 'comment']
def __init__(
@property
def code(self) -> str:
- """Return collected code (re-generate by subclasses for writing)."""
+ 'Return collected code (re-generate by subclasses for writing).'
return self._code_read
@property
def raw(self) -> str:
- """Return as how to be written in .dat file's text content."""
+ "Return as how to be written in .dat file's text content."
comment_part = ' ; '.join([''] + [s for s in [self.comment] if s])
code_part = f'{self.code} ' if self.code else ''
return f'{code_part}{comment_part.lstrip()}'.rstrip()
def copy(self) -> Self:
- """Create new instance copying the fields named in .to_copy."""
+ 'Create new instance copying the fields named in .to_copy.'
kwargs = {fieldname: getattr(self, fieldname)
for fieldname in self.to_copy}
return self.__class__(**kwargs)
@classmethod
def from_raw(cls, line: str) -> Self:
- """Parse line into new _DatLine."""
+ 'Parse line into new _DatLine.'
halves = [t.rstrip() for t in line.split(';', maxsplit=1)]
comment = halves[1].lstrip() if len(halves) > 1 else ''
code = halves[0]
@classmethod
def from_subclass(cls, line: '_DatLine') -> Self:
- """Devolve from subclassed line into cls."""
+ 'Devolve from subclassed line into cls.'
return cls(line.code, line.comment)
@property
def comment_instructions(self) -> dict[str, str]:
- """Parse .comment into Account modification instructions."""
+ 'Parse .comment into Account modification instructions.'
instructions = {}
if self.comment.startswith(_PREFIX_DEF):
parts = [part.strip() for part
@classmethod
@abstractmethod
def from_dat(cls, dat_line: '_DatLine') -> Self:
- """Evolve from mere dat_line into subclass."""
+ 'Evolve from mere dat_line into subclass.'
class _GapLine(_DatLineSubclass):
class _BookingLine(_DatLineSubclass):
- """Parsed _DatLine belonging to a _Booking."""
+ 'Parsed _DatLine belonging to a _Booking.'
dictables = {'comment', 'errors'}
def __init__(self, comment, errors: Optional[list[str]]) -> None:
@property
def as_dict(self) -> dict[str, Any]:
- """Return as JSON-ready dict attributes listed in .dictables."""
+ 'Return as JSON-ready dict attributes listed in .dictables.'
def to_dictable(value):
if isinstance(value, (str, int)):
return value
@property
def errors(self) -> list[str]:
- """Return collected errors (subclasses may add dynamic ones)."""
+ 'Return collected errors (subclasses may add dynamic ones).'
return self._errors[:]
class _IntroLine(_BookingLine):
- """First line of a _Booking, expected to carry date etc."""
+ 'First line of a _Booking, expected to carry date etc.'
to_copy = ['date', 'target', 'comment']
dictables = {'date', 'target'} | _BookingLine.dictables
class _TransferLine(_BookingLine):
- """Non-first _Booking line, expected to carry value movement."""
+ 'Non-first _Booking line, expected to carry value movement.'
to_copy = ['account', 'amount', 'currency', 'comment']
dictables = {'amount', 'account', 'currency'} | _BookingLine.dictables
@property
def amount(self) -> Optional[Decimal] | str:
- """Decimal if amount known, None if not, str if un-decimable."""
+ 'Decimal if amount known, None if not, str if un-decimable.'
if not self._amount_str:
return None
try:
@property
def amount_short(self) -> str:
- """If decimal .amount, print ellipsized if too long, else directly."""
+ 'If decimal .amount, print ellipsized if too long, else directly.'
if isinstance(self.amount, Decimal):
exp = self.amount.as_tuple().exponent
assert isinstance(exp, int)
@property
def lines(self) -> list[TypeDatLine]:
- """Return collected lines."""
+ 'Return collected lines.'
return self._lines
def copy(self) -> Self:
- """Re-create via .lines' copy()."""
+ "Re-create via .lines' copy()."
return self.__class__([line.copy() for line in self.lines])
class _Gap(_LinesBlock[_GapLine]):
def add(self, lines: list[_GapLine]) -> None:
- """Grow self by lines."""
+ 'Grow self by lines.'
self._lines += lines
@property
def redundant_empty_lines(self):
- """If self has more empty lines than necessary."""
+ 'If self has more empty lines than necessary.'
redundancies = []
prev_line = None
idx_last_non_empty = -1
return redundancies
def remove_redundant_empty_lines(self):
- """From self remove redundant empty lines."""
+ 'From self remove redundant empty lines.'
for line in self.redundant_empty_lines:
self._lines.remove(line)
@property
def intro_line(self) -> _IntroLine:
- """Return collected _IntroLine."""
+ 'Return collected _IntroLine.'
assert isinstance(self._lines[0], _IntroLine)
return self._lines[0]
@property
def transfer_lines(self) -> list[_TransferLine]:
- """Return collected _TransferLines.""" # NB: currently no easy way to
+ 'Return collected _TransferLines.' # NB: currently no easy way to
return self._lines[1:] # type: ignore # assert mypy list be of type
@property
@property
def date(self) -> str:
- """Chronological position as per .booking, or empty string."""
+ 'Chronological position as per .booking, or empty string.'
return self.intro_line.date
@property
def target(self) -> str:
- """Main other party for transaction."""
+ 'Main other party for transaction.'
return self.intro_line.target
def copy_to_current_date(self) -> Self:
- """Make copy of same lines but now as date."""
+ 'Make copy of same lines but now as date.'
copy = self.copy()
copy.intro_line.date = dt_date.today().isoformat()
return copy
class DatBlock:
- """Unit of lines with optional .booking, and possibly empty .gap."""
+ 'Unit of lines with optional .booking, and possibly empty .gap.'
def __init__(
self,
@property
def id_(self) -> int:
- """Return index in chain."""
+ 'Return index in chain.'
count = -1
block_iterated: Optional[DatBlock] = self
while block_iterated:
@property
def date_error(self) -> str:
- """If not empty, notify about .date not matching position in chain."""
+ 'If not empty, notify about .date not matching position in chain.'
if self.prev and self.prev.date > self.date:
return 'date < previous date'
return ''
@property
def lines(self) -> list[_DatLine]:
- """Return .lines of .booking and .gap as list[_DatLine]."""
+ 'Return .lines of .booking and .gap as list[_DatLine].'
lines = (self.booking.lines if self.booking else []) + self.gap.lines
if self.booking and not self.gap.lines:
lines += [_GapLine()]
@property
def next(self) -> Optional['DatBlock']:
- """Successor in chain."""
+ 'Successor in chain.'
return self._next
@next.setter
@property
def prev(self) -> Optional['DatBlock']:
- """Predecessor in chain."""
+ 'Predecessor in chain.'
return self._prev
@prev.setter
@property
def date(self) -> str:
- """Chronological position as per .booking, or empty string."""
+ 'Chronological position as per .booking, or empty string.'
return self.booking.date if self.booking else ''
def can_move(self, up: bool) -> bool:
- """Whether move up/down in chain possible, respecting .date."""
+ 'Whether move up/down in chain possible, respecting .date.'
if up and ((not self.prev) or self.prev.date != self.date):
return False
if (not up) and ((not self.next) or self.next.date != self.date):
return True
def move(self, up: bool) -> None:
- """Move up/down in chain."""
+ 'Move up/down in chain.'
old_prev = self.prev
old_next = self.next
if up:
old_next.prev = old_prev
def drop(self) -> None:
- """Remove from chain."""
+ 'Remove from chain.'
if self.prev:
self.prev.next = self.next
elif self.next:
self.next.prev = self.prev
def fix_position(self):
- """Move around in chain until properly positioned by .date."""
+ 'Move around in chain until properly positioned by .date.'
while self.prev and self.prev.date > self.date:
self.move(up=True)
while self.next and self.next.date < self.date:
self.move(up=False)
def replace_with(self, new_block: Self) -> None:
- """Have new_block take own position."""
+ 'Have new_block take own position.'
if self.prev:
self.prev.next = new_block
if self.next:
new_block.fix_position()
def copy_to_current_date(self) -> 'DatBlock':
- """Make copy of same lines but now as date, position accordingly."""
+ 'Make copy of same lines but now as date, position accordingly.'
copy = DatBlock(
self.booking.copy_to_current_date() if self.booking else None,
self.gap.copy())
class Ledger:
- """Collection of DatBlocks, _Bookings and Accounts derived from them."""
+ 'Collection of DatBlocks, _Bookings and Accounts derived from them.'
_blocks_start: Optional[DatBlock]
def __init__(self, path_dat: Path) -> None:
self.load()
def load(self) -> None:
- """(Re-)read ledger from file at ._path_dat."""
+ 'Re-)read ledger from file at ._path_dat.'
dat_lines: list[_DatLine] = [
_DatLine.from_raw(line)
for line in self._path_dat.read_text(encoding='utf8').splitlines()]
@property
def blocks(self) -> list[DatBlock]:
- """Return blocks chain as list."""
+ 'Return blocks chain as list.'
blocks = []
block = self._blocks_start
while block:
@property
def _dat_lines(self) -> list[_DatLine]:
- """From .blocks build list of current _DatLines."""
+ 'From .blocks build list of current _DatLines.'
lines = []
for block in self.blocks:
lines += block.lines
@property
def accounts(self) -> dict[str, Account]:
- """Build mapping of account names to Accounts."""
+ 'Build mapping of account names to Accounts.'
accounts: dict[str, Account] = {}
def ensure_accounts(full_path: str) -> None:
return accounts
def save(self) -> None:
- """Save current state to ._path_dat."""
+ 'Save current state to ._path_dat.'
text = '\n'.join([line.raw for line in self._dat_lines])
self._path_dat.write_text(text, encoding='utf8')
self.load()
return hash(tuple(dl.raw for dl in self._dat_lines))
def blocks_valid_up_incl(self, block_id: int) -> bool:
- """Whether nothing questionable about blocks until block_id."""
+ 'Whether nothing questionable about blocks until block_id.'
for block in self.blocks[:block_id]:
if block.booking:
if block.booking.sink_error:
@property
def has_redundant_empty_lines(self) -> bool:
- """If any gaps have redunant empty lines."""
+ 'If any gaps have redunant empty lines.'
return bool([b for b in self.blocks if b.gap.redundant_empty_lines])
def remove_redundant_empty_lines(self) -> None:
- """From all .blocks remove redundant empty lines."""
+ 'From all .blocks remove redundant empty lines.'
for gap in [b.gap for b in self.blocks if b.gap.redundant_empty_lines]:
gap.remove_redundant_empty_lines()
@property
def tainted(self) -> bool:
- """If ._dat_lines different to those of last .load()."""
+ 'If ._dat_lines different to those of last .load().'
return self._hash_dat_lines() != self.last_save_hash
def move_block(self, idx_from: int, up: bool) -> int:
- """Move DatBlock of idx_from step up or downwards"""
+ 'Move DatBlock of idx_from step up or downwards.'
block = self.blocks[idx_from]
block.move(up)
return block.id_
def rewrite_block(self, old_id: int, new_lines: list[str]) -> int:
- """Rewrite with new_lines, move if changed date."""
+ 'Rewrite with new_lines, move if changed date.'
lines_gap_pre_booking: list[_GapLine] = []
lines_booking: list[_BookingLine] = []
lines_gap_post_booking: list[_GapLine] = []
return new_block.id_
def add_empty_block(self) -> int:
- """Add new DatBlock of empty _Booking to end of ledger."""
+ 'Add new DatBlock of empty _Booking to end of ledger.'
new_block = DatBlock(
_Booking([_IntroLine(dt_date.today().isoformat(), '?')]))
self.blocks[-1].next = new_block
return new_block.id_
def copy_block(self, id_: int) -> int:
- """Add copy DatBlock of id_ but with current date."""
+ 'Add copy DatBlock of id_ but with current date.'
copy = self.blocks[id_].copy_to_current_date()
return copy.id_