From: Christian Heller Date: Tue, 1 Apr 2025 07:14:21 +0000 (+0200) Subject: Divide .dat into line blocks, of which Bookings are just a sub-class. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bdb.prefix%7D%7D/tasks?a=commitdiff_plain;h=8cb0000d94ad25a1cdcbf8b1c44bbd7d4abac842;p=ledgplom Divide .dat into line blocks, of which Bookings are just a sub-class. --- diff --git a/src/ledgplom/http.py b/src/ledgplom/http.py index 232b427..65a6894 100644 --- a/src/ledgplom/http.py +++ b/src/ledgplom/http.py @@ -155,7 +155,6 @@ class _Handler(PlomHttpHandler): break if already_registered: continue - self.server.ledger.ensure_account(path) before = self.server.ledger.accounts[path].get_wealth(id_ - 1) after = self.server.ledger.accounts[path].get_wealth(id_) direct_target = full_path == path @@ -180,8 +179,7 @@ class _Handler(PlomHttpHandler): parent_children = node['children'] ctx['roots'] = observed_tree ctx['id'] = id_ - ctx['dat_lines'] = [dl if raw else dl.as_dict - for dl in booking.booked_lines] + ctx['dat_lines'] = [dl if raw else dl.as_dict for dl in booking.lines] ctx['sink_error'] = booking.sink_error ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_) if not raw: diff --git a/src/ledgplom/ledger.py b/src/ledgplom/ledger.py index cac18fb..dda0dec 100644 --- a/src/ledgplom/ledger.py +++ b/src/ledgplom/ledger.py @@ -130,12 +130,11 @@ class Account: class DatLine(_Dictable): """Line of .dat file parsed into comments and machine-readable data.""" dictables = {'booked', 'code', 'comment', 'error', 'is_intro'} - prev_line_empty: bool def __init__( self, - code: str, - comment: str, + code: str = '', + comment: str = '', add_indent: bool = False ) -> None: self.comment = comment @@ -144,14 +143,10 @@ class DatLine(_Dictable): if s]) self.booking: Optional['_Booking'] = None self.booked: Optional[BookingLine] = None - - @classmethod - def new_empty(cls) -> Self: - """Create empty DatLine.""" - return cls('', '') + self.prev: Optional[DatLine] = None def copy_unbooked(self) -> 'DatLine': - """Create DatLine of .code and .comment, but no Booking ties yet.""" + """Create DatLine of .code and .comment, but no _Booking ties yet.""" return DatLine(self.code, self.comment) @classmethod @@ -203,12 +198,89 @@ class DatLine(_Dictable): return self.raw.replace(' ', ' ') +class _LinesBlock: + """Represents either _Booking or a gap between bookings.""" + + def __init__(self, lines: list[DatLine]) -> None: + self.lines = lines + self._next_block: Optional[_LinesBlock] = None + self._prev_block: Optional[_LinesBlock] = None + + @property + def _block_id(self) -> int: + count = -1 + block_iterated: Optional[_LinesBlock] = self + while block_iterated: + block_iterated = block_iterated.prev_block + count += 1 + return count + + @property + def start_block(self) -> '_LinesBlock': + """For chain surrounding self, get first item.""" + block_iterated = self + while True: + if not block_iterated.prev_block: + return block_iterated + block_iterated = block_iterated.prev_block + + @property + def last_block(self) -> '_LinesBlock': + """For chain surrounding self, get last item.""" + block_iterated = self + while True: + if not block_iterated.next_block: + return block_iterated + block_iterated = block_iterated.next_block + + def _neighbor_block( + self, + new_this_block: Optional['_LinesBlock'], + this: str, + that: str, + ) -> None: + if (old_this_block := getattr(self, f'_{this}_block')): + setattr(old_this_block, f'_{that}_block', None) + if new_this_block: + if (new_this_that := getattr(new_this_block, f'_{that}_block')): + setattr(new_this_that, f'_{this}_block', None) + setattr(new_this_block, f'_{that}_block', self) + setattr(self, f'_{this}_block', new_this_block) + + @property + def next_block(self) -> Optional['_LinesBlock']: + """Next block in chain.""" + return self._next_block + + @next_block.setter + def next_block(self, new_next_block: Optional['_LinesBlock']) -> None: + self._neighbor_block(new_next_block, 'next', 'prev') + + @property + def prev_block(self) -> Optional['_LinesBlock']: + """Prev block in chain.""" + return self._prev_block + + @prev_block.setter + def prev_block(self, new_prev_block: Optional['_LinesBlock']) -> None: + self._neighbor_block(new_prev_block, 'prev', 'next') + + class BookingLine(_Dictable, ABC): """Parsed code part of a DatLine belonging to a _Booking.""" def __init__(self, idx: int, errors: Optional[list[str]] = None) -> None: self.idx = idx - self.errors: list[str] = errors if errors else [] + self._errors: list[str] = errors if errors else [] + + @property + def errors(self) -> list[str]: + """Return ._errors, allowing sub-classes to add entries dynamically.""" + return self._errors + + def add_error(self, msg: str) -> None: + """Store message to display with line's dynamic .errors.""" + self._errors += [msg] @abstractmethod def to_code(self) -> str: @@ -227,26 +299,44 @@ class IntroLine(BookingLine): self, date: str, target: str, - errors: Optional[list[str]] = None + errors: Optional[list[str]] = None, + booking: Optional['_Booking'] = None ) -> None: super().__init__(0, errors) self.target = target self.date = date + if not IntroLine._date_valid(self.date): + self._errors += [f'not properly formatted legal date: {self.date}'] + self._booking = booking + + @staticmethod + def _date_valid(date: str) -> bool: try: - dt_date.fromisoformat(self.date) + dt_date.fromisoformat(date) except ValueError: - self.errors += [f'not properly formatted legal date: {self.date}'] + return False + return True + + @property + def errors(self) -> list[str]: + errors = self._errors[:] + if (self._booking and self._booking.prev + and IntroLine._date_valid(self._booking.date) + and IntroLine._date_valid(self._booking.prev.date) + and self._booking.prev.date > self._booking.date): + errors += ['date < previous valid date'] + return errors @classmethod - def from_code(cls, code: str) -> Self: - """Parse from ledger file line code part.""" + def from_code(cls, code: str, booking: '_Booking') -> Self: + """Parse from ledger file line code part, assume booking context.""" errors = [] if code[0].isspace(): errors += ['intro line indented'] toks = code.lstrip().split(maxsplit=1) if len(toks) == 1: errors += ['illegal number of tokens'] - return cls(toks[0], toks[1] if len(toks) > 1 else '', errors) + return cls(toks[0], toks[1] if len(toks) > 1 else '', errors, booking) def to_code(self) -> str: return f'{self.date} {self.target}' @@ -304,31 +394,24 @@ class TransferLine(BookingLine): return '' -class _Booking: +class _Booking(_LinesBlock): """Represents lines of individual booking.""" - next: Optional[Self] - prev: Optional[Self] - - def __init__(self, - id_: int, - booked_lines: list[DatLine], - gap_lines: Optional[list[DatLine]] = None - ) -> None: - self.next, self.prev = None, None - self.id_ = id_ - self.booked_lines = booked_lines[:] - self._gap_lines = gap_lines[:] if gap_lines else [] - # parse booked_lines into Intro- and TransferLines - for line in booked_lines: + + def __init__(self, lines: list[DatLine]) -> None: + super().__init__(lines) + self.parse() + + def parse(self) -> None: + """Parse .lines for BookingLine structures, .account_changes""" + for line in self.lines: line.booking = self - self.intro_line = IntroLine.from_code(self.booked_lines[0].code) + self.intro_line = IntroLine.from_code(self.lines[0].code, self) self._transfer_lines = [ TransferLine.from_code_at_idx(b_line.code, i+1) - for i, b_line in enumerate(self.booked_lines[1:])] - self.booked_lines[0].booked = self.intro_line + for i, b_line in enumerate(self.lines[1:])] + self.lines[0].booked = self.intro_line for i, b_line in enumerate(self._transfer_lines): - self.booked_lines[i + 1].booked = b_line - # calculate .account_changes + self.lines[i + 1].booked = b_line changes = _Wealth() sink_account = None self.sink_error = '' @@ -351,34 +434,39 @@ class _Booking: elif not changes.sink_empty: self.sink_error = 'needed sink missing' - def recalc_prev_next(self, bookings: list[Self]) -> None: - """Assuming .id_ to be index in bookings, link prev + next bookings.""" - self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None - if self.prev: - self.prev.next = self - self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings) - else None) - if self.next: - self.next.prev = self - @property - def gap_lines(self) -> list[DatLine]: - """Return ._gap_lines or, if empty, list of one empty DatLine.""" - return self._gap_lines if self._gap_lines else [DatLine.new_empty()] + def id_(self): + """Index in chain of bookings.""" + return self._block_id // 2 - @gap_lines.setter - def gap_lines(self, gap_lines=list[DatLine]) -> None: - self._gap_lines = gap_lines[:] + @staticmethod + def _cast(item: Optional[_LinesBlock]) -> Optional['_Booking']: + assert isinstance(item, (_Booking, type(None))) + return item + + @property + def next(self) -> Optional['_Booking']: + """Next Booking, assuming it's two block steps away.""" + return self._cast(self.next_block.next_block if self.next_block + else None) @property - def gap_lines_copied(self) -> list[DatLine]: - """Return new DatLines generated from .gap_lines.""" - return [dat_line.copy_unbooked() for dat_line in self.gap_lines] + def prev(self) -> Optional['_Booking']: + """Prev Booking, assuming it's two block steps away.""" + return self._cast(self.prev_block.prev_block if self.prev_block + else None) + + def fix_position(self): + """Move around in bookings chain until properly positioned by .date.""" + while self.prev and self.prev.date > self.date: + self.move(up=True) + while self.next and self.next.date < self.date: + self.move(up=False) @property - def booked_lines_copied(self) -> list[DatLine]: + def lines_copied(self) -> list[DatLine]: """Return new DatLines generated from .booked_lines.""" - return [dat_line.copy_unbooked() for dat_line in self.booked_lines] + return [dat_line.copy_unbooked() for dat_line in self.lines] @property def target(self) -> str: @@ -398,6 +486,32 @@ class _Booking: return False return True + def move(self, up: bool) -> None: + """Move self and following gap up or down in line blocks chain.""" + old_next = self.next + assert old_next is not None + assert self.next_block is not None + if up: + old_prev = self.prev + assert old_prev is not None + assert old_prev.prev_block is not None + assert old_prev.next_block is not None + old_prev.prev_block.next_block = self + self.next_block.next_block = old_prev + old_prev.next_block.next_block = old_next + else: + old_prev_block = self.prev_block + self.next_block.next_block = old_next.next + self.prev_block = old_next.next_block + old_next.prev_block = old_prev_block + + def drop(self) -> None: + """Remove self and following gap from line blocks chain.""" + assert self.prev_block is not None + assert self.next_block is not None + self.prev_block.lines += self.next_block.lines + self.prev_block.next_block = self.next_block.next_block + @property def is_questionable(self) -> bool: """Whether lines count any errors, or add up to a .sink_error.""" @@ -411,85 +525,82 @@ class _Booking: class Ledger: """Collection of DatLines, and Accounts, _Bookings derived from them.""" - accounts: dict[str, Account] - bookings: list[_Booking] - dat_lines: list[DatLine] - initial_gap_lines: list[DatLine] + _blocks_start: Optional[_LinesBlock] def __init__(self, path_dat: Path) -> None: self._path_dat = path_dat self.load() + @property + def _blocks(self) -> list[_LinesBlock]: + blocks = [] + block = self._blocks_start + while block: + blocks += [block] + block = block.next_block + return blocks + def load(self) -> None: """(Re-)read ledger from file at ._path_dat.""" - self.accounts, self.bookings, self.initial_gap_lines = {}, [], [] - self.dat_lines: list[DatLine] = [ + dat_lines: list[DatLine] = [ DatLine.from_raw(line) for line in self._path_dat.read_text(encoding='utf8').splitlines()] - self.last_save_hash = self._hash_dat_lines() - booked: list[DatLine] = [] - gap_lines: list[DatLine] = [] - booking: Optional[_Booking] = None - for dat_line in self.dat_lines + [DatLine.new_empty()]: - if dat_line.code: - if gap_lines: - if booking: - booking.gap_lines = gap_lines[:] - else: - self.initial_gap_lines = gap_lines[:] - gap_lines.clear() - booked += [dat_line] - else: - gap_lines += [dat_line] - if booked: - booking = _Booking(len(self.bookings), booked[:]) - self.bookings += [booking] - booked.clear() - if booking: - booking.gap_lines = gap_lines[:-1] - self._sync(recalc_datlines=False) - - def _sync(self, recalc_datlines=True, check_dates=True): - if recalc_datlines: - self.dat_lines = self.initial_gap_lines[:] - for booking in self.bookings: - self.dat_lines += booking.booked_lines - self.dat_lines += booking.gap_lines - for idx, booking in enumerate(self.bookings[1:]): - booking.prev = self.bookings[idx] - for idx, booking in enumerate(self.bookings[:-1]): - booking.next = self.bookings[idx + 1] - self.bookings[-1].next = None - if check_dates: - last_date = '' - err_msg = 'date < previous valid date' - for booking in self.bookings: - if err_msg in booking.intro_line.errors: - booking.intro_line.errors.remove(err_msg) - if last_date > booking.date: - booking.intro_line.errors += [err_msg] + booked = False + self._blocks_start = None + block_lines: list[DatLine] = [] + for dat_line in dat_lines + [DatLine()]: + if bool(dat_line.code) != booked: + block = (_Booking if booked else _LinesBlock)(block_lines[:]) + if self._blocks_start: + block.prev_block = self._blocks_start.last_block else: - last_date = booking.date - self._recalc_prev_line_empty() - self.accounts = {} + self._blocks_start = block + booked = not booked + block_lines.clear() + block_lines += [dat_line] + if self._blocks_start: + _LinesBlock(block_lines[:]).prev_block =\ + self._blocks_start.last_block + self.last_save_hash = self._hash_dat_lines() + + @property + def dat_lines(self) -> list[DatLine]: + """From ._blocks build list of current DatLines.""" + lines = [] + for block in self._blocks: + lines += block.lines + for i, line in enumerate(lines): + line.prev = lines[i - 1] if i > 0 else None + return lines + + @property + def bookings(self) -> list[_Booking]: + """Build chain of bookings.""" + return [block for block in self._blocks if isinstance(block, _Booking)] + + @property + def accounts(self) -> dict[str, Account]: + """Build mapping of account names to Accounts.""" + accounts: dict[str, Account] = {} + + def ensure_accounts(full_path: str) -> None: + parent_path = '' + for path, step_name in Account.path_to_steps(full_path): + if path not in accounts: + accounts[path] = Account( + accounts[parent_path] if parent_path else None, + step_name) + parent_path = path + for dat_line in self.dat_lines: for acc_name, desc in dat_line.comment_instructions.items(): - self.ensure_account(acc_name) - self.accounts[acc_name].desc = desc + ensure_accounts(acc_name) + accounts[acc_name].desc = desc for booking in self.bookings: for acc_name, wealth in booking.account_changes.items(): - self.ensure_account(acc_name) - self.accounts[acc_name].add_wealth_diff(booking.id_, wealth) - - def ensure_account(self, full_path: str) -> None: - """If full_path not in self.accounts, add its tree with Accounts.""" - parent_path = '' - for path, step_name in Account.path_to_steps(full_path): - if path not in self.accounts: - self.accounts[path] = Account( - self.accounts[parent_path] if parent_path else None, - step_name) - parent_path = path + ensure_accounts(acc_name) + accounts[acc_name].add_wealth_diff(booking.id_, wealth) + return accounts def save(self) -> None: """Save current state to ._path_dat.""" @@ -511,38 +622,15 @@ class Ledger: """If .dat_lines different to those of last .load().""" return self._hash_dat_lines() != self.last_save_hash - def _recalc_prev_line_empty(self) -> None: - prev_line = None - for line in self.dat_lines: - line.prev_line_empty = False - if prev_line: - line.prev_line_empty = not (prev_line.code - + prev_line.comment_in_ledger) - if prev_line or line.code + line.comment_in_ledger: # jump over - prev_line = line # empty start - - def _move_booking(self, idx_from: int, idx_to: int): - moving = self.bookings[idx_from] - if idx_from >= idx_to: # moving upward, deletion must - del self.bookings[idx_from] # precede insertion to keep - self.bookings[idx_to:idx_to] = [moving] # deletion index, downwards - if idx_from < idx_to: # the other way around keeps - del self.bookings[idx_from] # insertion index - min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to) - for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]): - booking.id_ = min_idx + idx - def move_booking(self, idx_from: int, up: bool) -> int: """Move _Booking of old_id one step up or downwards""" - new_id = idx_from + (-1 if up else 1) - idx_to = new_id + (0 if up else 1) # down-move implies jump over next - self._move_booking(idx_from, idx_to) - self._sync() - return new_id + booking = self.bookings[idx_from] + booking.move(up) + return booking.id_ def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int: """Rewrite _Booking with new_lines, move if changed date.""" - old_booking = self.bookings[old_id] + booking = self.bookings[old_id] booked_start, booked_end, gap_start_found = -1, 0, False for i, line in enumerate(new_lines): if booked_start < 0 and line.code.strip(): # ignore any initial @@ -553,49 +641,20 @@ class Ledger: booked_end += 1 # provided we're not yet in the gap elif line.code.strip(): new_lines[i] = DatLine.from_raw(f'; {line.code}') - before_gap = new_lines[:booked_start] - new_booked_lines = (new_lines[booked_start:booked_end] - if booked_start > -1 else []) - after_gap = old_booking.gap_lines_copied # new gap be old gap _plus_ - after_gap += new_lines[booked_end:] # any new gap lines - if not new_booked_lines: # interpret empty posting as deletion request - del self.bookings[old_id] - for booking in self.bookings[old_id:]: - booking.id_ -= 1 - leftover_gap = before_gap + after_gap - if old_id == 0: - self.initial_gap_lines += leftover_gap - else: - self.bookings[old_id - 1].gap_lines += leftover_gap - self._sync(check_dates=False) + assert booking.prev_block is not None + assert booking.next_block is not None + booking.prev_block.lines += new_lines[:booked_start] + booking.next_block.lines = (new_lines[booked_end:] + + booking.next_block.lines) + booking.lines = ( + new_lines[booked_start:booked_end] if booked_start > -1 + else []) + if not booking.lines: # interpret empty posting as deletion request + booking.drop() return old_id if old_id < len(self.bookings) else 0 - if old_id == 0: - self.initial_gap_lines += before_gap - else: - self.bookings[old_id - 1].gap_lines += before_gap - new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0] - updated = _Booking(old_id, new_booked_lines, after_gap) - self.bookings[old_id] = updated - if new_date != old_booking.date: # if changed date, move to there - if self.bookings[0].date > new_date: - new_id = 0 - elif self.bookings[-1].date < new_date: - new_id = self.bookings[-1].id_ + 1 - else: - of_date_1st = i_booking = self.bookings[0] - while i_booking.next: - if of_date_1st.date != i_booking.date: - of_date_1st = i_booking - if i_booking.next.date > new_date: - break - i_booking = i_booking.next - # ensure that, if we land in group of like-dated _Bookings, we - # land on the edge closest to our last position - new_id = (of_date_1st.id_ if old_id < i_booking.id_ - else i_booking.id_ + 1) - self._move_booking(old_id, new_id) - self._sync(check_dates=False) - return updated.id_ + booking.parse() + booking.fix_position() + return booking.id_ def _add_new_booking( self, @@ -605,10 +664,13 @@ class Ledger: ) -> int: intro = IntroLine(dt_date.today().isoformat(), target) booking = _Booking( - len(self.bookings), - [intro.to_dat_line(intro_comment)] + dat_lines_transaction) - self.bookings += [booking] - self._sync() + [intro.to_dat_line(intro_comment)] + dat_lines_transaction) + booking.next_block = _LinesBlock([DatLine()]) + if self._blocks_start: + self._blocks_start.last_block.next_block = booking + else: + self._blocks_start = booking + booking.fix_position() return booking.id_ def add_empty_booking(self) -> int: @@ -619,5 +681,5 @@ class Ledger: """Add copy of _Booking of copied_id to_end of ledger.""" copied = self.bookings[copied_id] return self._add_new_booking(copied.target, - copied.booked_lines_copied[1:], - copied.booked_lines[0].comment) + copied.lines_copied[1:], + copied.lines[0].comment) diff --git a/src/templates/_macros.tmpl b/src/templates/_macros.tmpl index 84be211..9e8ec3e 100644 --- a/src/templates/_macros.tmpl +++ b/src/templates/_macros.tmpl @@ -34,7 +34,7 @@ table.ledger tr > td:first-child { text-align: right; } {% for dat_line in dat_lines %} {% if raw or dat_line.code or dat_line.comment_in_ledger %} - {% if (not raw) and dat_line.prev_line_empty %} + {% if (not raw) and dat_line.prev.raw == "" %}   {% endif %}