home · contact · privacy
Divide .dat into line blocks, of which Bookings are just a sub-class. master
authorChristian Heller <c.heller@plomlompom.de>
Tue, 1 Apr 2025 07:14:21 +0000 (09:14 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Tue, 1 Apr 2025 07:14:21 +0000 (09:14 +0200)
src/ledgplom/http.py
src/ledgplom/ledger.py
src/templates/_macros.tmpl

index 232b427fac4b7359177576cf2ac86a2948785906..65a689426067c817b2ead6f71f5baa6fa3f632d2 100644 (file)
@@ -155,7 +155,6 @@ class _Handler(PlomHttpHandler):
                     break
                 if already_registered:
                     continue
-                self.server.ledger.ensure_account(path)
                 before = self.server.ledger.accounts[path].get_wealth(id_ - 1)
                 after = self.server.ledger.accounts[path].get_wealth(id_)
                 direct_target = full_path == path
@@ -180,8 +179,7 @@ class _Handler(PlomHttpHandler):
                     parent_children = node['children']
         ctx['roots'] = observed_tree
         ctx['id'] = id_
-        ctx['dat_lines'] = [dl if raw else dl.as_dict
-                            for dl in booking.booked_lines]
+        ctx['dat_lines'] = [dl if raw else dl.as_dict for dl in booking.lines]
         ctx['sink_error'] = booking.sink_error
         ctx['valid'] = self.server.ledger.bookings_valid_up_incl(id_)
         if not raw:
index cac18fb4d35cf52b4cfd53ae93ee64b321820703..dda0dec446d2e0180bfc4882619b20fac89cc07a 100644 (file)
@@ -130,12 +130,11 @@ class Account:
 class DatLine(_Dictable):
     """Line of .dat file parsed into comments and machine-readable data."""
     dictables = {'booked', 'code', 'comment', 'error', 'is_intro'}
-    prev_line_empty: bool
 
     def __init__(
             self,
-            code: str,
-            comment: str,
+            code: str = '',
+            comment: str = '',
             add_indent: bool = False
             ) -> None:
         self.comment = comment
@@ -144,14 +143,10 @@ class DatLine(_Dictable):
                                                   if s])
         self.booking: Optional['_Booking'] = None
         self.booked: Optional[BookingLine] = None
-
-    @classmethod
-    def new_empty(cls) -> Self:
-        """Create empty DatLine."""
-        return cls('', '')
+        self.prev: Optional[DatLine] = None
 
     def copy_unbooked(self) -> 'DatLine':
-        """Create DatLine of .code and .comment, but no Booking ties yet."""
+        """Create DatLine of .code and .comment, but no _Booking ties yet."""
         return DatLine(self.code, self.comment)
 
     @classmethod
@@ -203,12 +198,89 @@ class DatLine(_Dictable):
         return self.raw.replace(' ', '&nbsp;')
 
 
+class _LinesBlock:
+    """Represents either _Booking or a gap between bookings."""
+
+    def __init__(self, lines: list[DatLine]) -> None:
+        self.lines = lines
+        self._next_block: Optional[_LinesBlock] = None
+        self._prev_block: Optional[_LinesBlock] = None
+
+    @property
+    def _block_id(self) -> int:
+        count = -1
+        block_iterated: Optional[_LinesBlock] = self
+        while block_iterated:
+            block_iterated = block_iterated.prev_block
+            count += 1
+        return count
+
+    @property
+    def start_block(self) -> '_LinesBlock':
+        """For chain surrounding self, get first item."""
+        block_iterated = self
+        while True:
+            if not block_iterated.prev_block:
+                return block_iterated
+            block_iterated = block_iterated.prev_block
+
+    @property
+    def last_block(self) -> '_LinesBlock':
+        """For chain surrounding self, get last item."""
+        block_iterated = self
+        while True:
+            if not block_iterated.next_block:
+                return block_iterated
+            block_iterated = block_iterated.next_block
+
+    def _neighbor_block(
+            self,
+            new_this_block: Optional['_LinesBlock'],
+            this: str,
+            that: str,
+            ) -> None:
+        if (old_this_block := getattr(self, f'_{this}_block')):
+            setattr(old_this_block, f'_{that}_block', None)
+        if new_this_block:
+            if (new_this_that := getattr(new_this_block, f'_{that}_block')):
+                setattr(new_this_that, f'_{this}_block', None)
+            setattr(new_this_block, f'_{that}_block', self)
+        setattr(self, f'_{this}_block', new_this_block)
+
+    @property
+    def next_block(self) -> Optional['_LinesBlock']:
+        """Next block in chain."""
+        return self._next_block
+
+    @next_block.setter
+    def next_block(self, new_next_block: Optional['_LinesBlock']) -> None:
+        self._neighbor_block(new_next_block, 'next', 'prev')
+
+    @property
+    def prev_block(self) -> Optional['_LinesBlock']:
+        """Prev block in chain."""
+        return self._prev_block
+
+    @prev_block.setter
+    def prev_block(self, new_prev_block: Optional['_LinesBlock']) -> None:
+        self._neighbor_block(new_prev_block, 'prev', 'next')
+
+
 class BookingLine(_Dictable, ABC):
     """Parsed code part of a DatLine belonging to a _Booking."""
 
     def __init__(self, idx: int, errors: Optional[list[str]] = None) -> None:
         self.idx = idx
-        self.errors: list[str] = errors if errors else []
+        self._errors: list[str] = errors if errors else []
+
+    @property
+    def errors(self) -> list[str]:
+        """Return ._errors, allowing sub-classes to add entries dynamically."""
+        return self._errors
+
+    def add_error(self, msg: str) -> None:
+        """Store message to display with line's dynamic .errors."""
+        self._errors += [msg]
 
     @abstractmethod
     def to_code(self) -> str:
@@ -227,26 +299,44 @@ class IntroLine(BookingLine):
             self,
             date: str,
             target: str,
-            errors: Optional[list[str]] = None
+            errors: Optional[list[str]] = None,
+            booking: Optional['_Booking'] = None
             ) -> None:
         super().__init__(0, errors)
         self.target = target
         self.date = date
+        if not IntroLine._date_valid(self.date):
+            self._errors += [f'not properly formatted legal date: {self.date}']
+        self._booking = booking
+
+    @staticmethod
+    def _date_valid(date: str) -> bool:
         try:
-            dt_date.fromisoformat(self.date)
+            dt_date.fromisoformat(date)
         except ValueError:
-            self.errors += [f'not properly formatted legal date: {self.date}']
+            return False
+        return True
+
+    @property
+    def errors(self) -> list[str]:
+        errors = self._errors[:]
+        if (self._booking and self._booking.prev
+                and IntroLine._date_valid(self._booking.date)
+                and IntroLine._date_valid(self._booking.prev.date)
+                and self._booking.prev.date > self._booking.date):
+            errors += ['date < previous valid date']
+        return errors
 
     @classmethod
-    def from_code(cls, code: str) -> Self:
-        """Parse from ledger file line code part."""
+    def from_code(cls, code: str, booking: '_Booking') -> Self:
+        """Parse from ledger file line code part, assume booking context."""
         errors = []
         if code[0].isspace():
             errors += ['intro line indented']
         toks = code.lstrip().split(maxsplit=1)
         if len(toks) == 1:
             errors += ['illegal number of tokens']
-        return cls(toks[0], toks[1] if len(toks) > 1 else '', errors)
+        return cls(toks[0], toks[1] if len(toks) > 1 else '', errors, booking)
 
     def to_code(self) -> str:
         return f'{self.date} {self.target}'
@@ -304,31 +394,24 @@ class TransferLine(BookingLine):
         return ''
 
 
-class _Booking:
+class _Booking(_LinesBlock):
     """Represents lines of individual booking."""
-    next: Optional[Self]
-    prev: Optional[Self]
-
-    def __init__(self,
-                 id_: int,
-                 booked_lines: list[DatLine],
-                 gap_lines: Optional[list[DatLine]] = None
-                 ) -> None:
-        self.next, self.prev = None, None
-        self.id_ = id_
-        self.booked_lines = booked_lines[:]
-        self._gap_lines = gap_lines[:] if gap_lines else []
-        # parse booked_lines into Intro- and TransferLines
-        for line in booked_lines:
+
+    def __init__(self, lines: list[DatLine]) -> None:
+        super().__init__(lines)
+        self.parse()
+
+    def parse(self) -> None:
+        """Parse .lines for BookingLine structures, .account_changes"""
+        for line in self.lines:
             line.booking = self
-        self.intro_line = IntroLine.from_code(self.booked_lines[0].code)
+        self.intro_line = IntroLine.from_code(self.lines[0].code, self)
         self._transfer_lines = [
                 TransferLine.from_code_at_idx(b_line.code, i+1)
-                for i, b_line in enumerate(self.booked_lines[1:])]
-        self.booked_lines[0].booked = self.intro_line
+                for i, b_line in enumerate(self.lines[1:])]
+        self.lines[0].booked = self.intro_line
         for i, b_line in enumerate(self._transfer_lines):
-            self.booked_lines[i + 1].booked = b_line
-        # calculate .account_changes
+            self.lines[i + 1].booked = b_line
         changes = _Wealth()
         sink_account = None
         self.sink_error = ''
@@ -351,34 +434,39 @@ class _Booking:
         elif not changes.sink_empty:
             self.sink_error = 'needed sink missing'
 
-    def recalc_prev_next(self, bookings: list[Self]) -> None:
-        """Assuming .id_ to be index in bookings, link prev + next bookings."""
-        self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None
-        if self.prev:
-            self.prev.next = self
-        self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings)
-                     else None)
-        if self.next:
-            self.next.prev = self
-
     @property
-    def gap_lines(self) -> list[DatLine]:
-        """Return ._gap_lines or, if empty, list of one empty DatLine."""
-        return self._gap_lines if self._gap_lines else [DatLine.new_empty()]
+    def id_(self):
+        """Index in chain of bookings."""
+        return self._block_id // 2
 
-    @gap_lines.setter
-    def gap_lines(self, gap_lines=list[DatLine]) -> None:
-        self._gap_lines = gap_lines[:]
+    @staticmethod
+    def _cast(item: Optional[_LinesBlock]) -> Optional['_Booking']:
+        assert isinstance(item, (_Booking, type(None)))
+        return item
+
+    @property
+    def next(self) -> Optional['_Booking']:
+        """Next Booking, assuming it's two block steps away."""
+        return self._cast(self.next_block.next_block if self.next_block
+                          else None)
 
     @property
-    def gap_lines_copied(self) -> list[DatLine]:
-        """Return new DatLines generated from .gap_lines."""
-        return [dat_line.copy_unbooked() for dat_line in self.gap_lines]
+    def prev(self) -> Optional['_Booking']:
+        """Prev Booking, assuming it's two block steps away."""
+        return self._cast(self.prev_block.prev_block if self.prev_block
+                          else None)
+
+    def fix_position(self):
+        """Move around in bookings chain until properly positioned by .date."""
+        while self.prev and self.prev.date > self.date:
+            self.move(up=True)
+        while self.next and self.next.date < self.date:
+            self.move(up=False)
 
     @property
-    def booked_lines_copied(self) -> list[DatLine]:
+    def lines_copied(self) -> list[DatLine]:
         """Return new DatLines generated from .booked_lines."""
-        return [dat_line.copy_unbooked() for dat_line in self.booked_lines]
+        return [dat_line.copy_unbooked() for dat_line in self.lines]
 
     @property
     def target(self) -> str:
@@ -398,6 +486,32 @@ class _Booking:
             return False
         return True
 
+    def move(self, up: bool) -> None:
+        """Move self and following gap up or down in line blocks chain."""
+        old_next = self.next
+        assert old_next is not None
+        assert self.next_block is not None
+        if up:
+            old_prev = self.prev
+            assert old_prev is not None
+            assert old_prev.prev_block is not None
+            assert old_prev.next_block is not None
+            old_prev.prev_block.next_block = self
+            self.next_block.next_block = old_prev
+            old_prev.next_block.next_block = old_next
+        else:
+            old_prev_block = self.prev_block
+            self.next_block.next_block = old_next.next
+            self.prev_block = old_next.next_block
+            old_next.prev_block = old_prev_block
+
+    def drop(self) -> None:
+        """Remove self and following gap from line blocks chain."""
+        assert self.prev_block is not None
+        assert self.next_block is not None
+        self.prev_block.lines += self.next_block.lines
+        self.prev_block.next_block = self.next_block.next_block
+
     @property
     def is_questionable(self) -> bool:
         """Whether lines count any errors, or add up to a .sink_error."""
@@ -411,85 +525,82 @@ class _Booking:
 
 class Ledger:
     """Collection of DatLines, and Accounts, _Bookings derived from them."""
-    accounts: dict[str, Account]
-    bookings: list[_Booking]
-    dat_lines: list[DatLine]
-    initial_gap_lines: list[DatLine]
+    _blocks_start: Optional[_LinesBlock]
 
     def __init__(self, path_dat: Path) -> None:
         self._path_dat = path_dat
         self.load()
 
+    @property
+    def _blocks(self) -> list[_LinesBlock]:
+        blocks = []
+        block = self._blocks_start
+        while block:
+            blocks += [block]
+            block = block.next_block
+        return blocks
+
     def load(self) -> None:
         """(Re-)read ledger from file at ._path_dat."""
-        self.accounts, self.bookings, self.initial_gap_lines = {}, [], []
-        self.dat_lines: list[DatLine] = [
+        dat_lines: list[DatLine] = [
             DatLine.from_raw(line)
             for line in self._path_dat.read_text(encoding='utf8').splitlines()]
-        self.last_save_hash = self._hash_dat_lines()
-        booked: list[DatLine] = []
-        gap_lines: list[DatLine] = []
-        booking: Optional[_Booking] = None
-        for dat_line in self.dat_lines + [DatLine.new_empty()]:
-            if dat_line.code:
-                if gap_lines:
-                    if booking:
-                        booking.gap_lines = gap_lines[:]
-                    else:
-                        self.initial_gap_lines = gap_lines[:]
-                    gap_lines.clear()
-                booked += [dat_line]
-            else:
-                gap_lines += [dat_line]
-                if booked:
-                    booking = _Booking(len(self.bookings), booked[:])
-                    self.bookings += [booking]
-                    booked.clear()
-        if booking:
-            booking.gap_lines = gap_lines[:-1]
-        self._sync(recalc_datlines=False)
-
-    def _sync(self, recalc_datlines=True, check_dates=True):
-        if recalc_datlines:
-            self.dat_lines = self.initial_gap_lines[:]
-            for booking in self.bookings:
-                self.dat_lines += booking.booked_lines
-                self.dat_lines += booking.gap_lines
-        for idx, booking in enumerate(self.bookings[1:]):
-            booking.prev = self.bookings[idx]
-        for idx, booking in enumerate(self.bookings[:-1]):
-            booking.next = self.bookings[idx + 1]
-        self.bookings[-1].next = None
-        if check_dates:
-            last_date = ''
-            err_msg = 'date < previous valid date'
-            for booking in self.bookings:
-                if err_msg in booking.intro_line.errors:
-                    booking.intro_line.errors.remove(err_msg)
-                if last_date > booking.date:
-                    booking.intro_line.errors += [err_msg]
+        booked = False
+        self._blocks_start = None
+        block_lines: list[DatLine] = []
+        for dat_line in dat_lines + [DatLine()]:
+            if bool(dat_line.code) != booked:
+                block = (_Booking if booked else _LinesBlock)(block_lines[:])
+                if self._blocks_start:
+                    block.prev_block = self._blocks_start.last_block
                 else:
-                    last_date = booking.date
-        self._recalc_prev_line_empty()
-        self.accounts = {}
+                    self._blocks_start = block
+                booked = not booked
+                block_lines.clear()
+            block_lines += [dat_line]
+        if self._blocks_start:
+            _LinesBlock(block_lines[:]).prev_block =\
+                    self._blocks_start.last_block
+        self.last_save_hash = self._hash_dat_lines()
+
+    @property
+    def dat_lines(self) -> list[DatLine]:
+        """From ._blocks build list of current DatLines."""
+        lines = []
+        for block in self._blocks:
+            lines += block.lines
+        for i, line in enumerate(lines):
+            line.prev = lines[i - 1] if i > 0 else None
+        return lines
+
+    @property
+    def bookings(self) -> list[_Booking]:
+        """Build chain of bookings."""
+        return [block for block in self._blocks if isinstance(block, _Booking)]
+
+    @property
+    def accounts(self) -> dict[str, Account]:
+        """Build mapping of account names to Accounts."""
+        accounts: dict[str, Account] = {}
+
+        def ensure_accounts(full_path: str) -> None:
+            parent_path = ''
+            for path, step_name in Account.path_to_steps(full_path):
+                if path not in accounts:
+                    accounts[path] = Account(
+                        accounts[parent_path] if parent_path else None,
+                        step_name)
+                parent_path = path
+
         for dat_line in self.dat_lines:
             for acc_name, desc in dat_line.comment_instructions.items():
-                self.ensure_account(acc_name)
-                self.accounts[acc_name].desc = desc
+                ensure_accounts(acc_name)
+                accounts[acc_name].desc = desc
         for booking in self.bookings:
             for acc_name, wealth in booking.account_changes.items():
-                self.ensure_account(acc_name)
-                self.accounts[acc_name].add_wealth_diff(booking.id_, wealth)
-
-    def ensure_account(self, full_path: str) -> None:
-        """If full_path not in self.accounts, add its tree with Accounts."""
-        parent_path = ''
-        for path, step_name in Account.path_to_steps(full_path):
-            if path not in self.accounts:
-                self.accounts[path] = Account(
-                        self.accounts[parent_path] if parent_path else None,
-                        step_name)
-            parent_path = path
+                ensure_accounts(acc_name)
+                accounts[acc_name].add_wealth_diff(booking.id_, wealth)
+        return accounts
 
     def save(self) -> None:
         """Save current state to ._path_dat."""
@@ -511,38 +622,15 @@ class Ledger:
         """If .dat_lines different to those of last .load()."""
         return self._hash_dat_lines() != self.last_save_hash
 
-    def _recalc_prev_line_empty(self) -> None:
-        prev_line = None
-        for line in self.dat_lines:
-            line.prev_line_empty = False
-            if prev_line:
-                line.prev_line_empty = not (prev_line.code
-                                            + prev_line.comment_in_ledger)
-            if prev_line or line.code + line.comment_in_ledger:  # jump over
-                prev_line = line                                 # empty start
-
-    def _move_booking(self, idx_from: int, idx_to: int):
-        moving = self.bookings[idx_from]
-        if idx_from >= idx_to:                   # moving upward, deletion must
-            del self.bookings[idx_from]          # precede insertion to keep
-        self.bookings[idx_to:idx_to] = [moving]  # deletion index, downwards
-        if idx_from < idx_to:                    # the other way around keeps
-            del self.bookings[idx_from]          # insertion index
-        min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to)
-        for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]):
-            booking.id_ = min_idx + idx
-
     def move_booking(self, idx_from: int, up: bool) -> int:
         """Move _Booking of old_id one step up or downwards"""
-        new_id = idx_from + (-1 if up else 1)
-        idx_to = new_id + (0 if up else 1)  # down-move implies jump over next
-        self._move_booking(idx_from, idx_to)
-        self._sync()
-        return new_id
+        booking = self.bookings[idx_from]
+        booking.move(up)
+        return booking.id_
 
     def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int:
         """Rewrite _Booking with new_lines, move if changed date."""
-        old_booking = self.bookings[old_id]
+        booking = self.bookings[old_id]
         booked_start, booked_end, gap_start_found = -1, 0, False
         for i, line in enumerate(new_lines):
             if booked_start < 0 and line.code.strip():  # ignore any initial
@@ -553,49 +641,20 @@ class Ledger:
                 booked_end += 1      # provided we're not yet in the gap
             elif line.code.strip():
                 new_lines[i] = DatLine.from_raw(f'; {line.code}')
-        before_gap = new_lines[:booked_start]
-        new_booked_lines = (new_lines[booked_start:booked_end]
-                            if booked_start > -1 else [])
-        after_gap = old_booking.gap_lines_copied  # new gap be old gap _plus_
-        after_gap += new_lines[booked_end:]       # any new gap lines
-        if not new_booked_lines:  # interpret empty posting as deletion request
-            del self.bookings[old_id]
-            for booking in self.bookings[old_id:]:
-                booking.id_ -= 1
-            leftover_gap = before_gap + after_gap
-            if old_id == 0:
-                self.initial_gap_lines += leftover_gap
-            else:
-                self.bookings[old_id - 1].gap_lines += leftover_gap
-            self._sync(check_dates=False)
+        assert booking.prev_block is not None
+        assert booking.next_block is not None
+        booking.prev_block.lines += new_lines[:booked_start]
+        booking.next_block.lines = (new_lines[booked_end:]
+                                    + booking.next_block.lines)
+        booking.lines = (
+                new_lines[booked_start:booked_end] if booked_start > -1
+                else [])
+        if not booking.lines:  # interpret empty posting as deletion request
+            booking.drop()
             return old_id if old_id < len(self.bookings) else 0
-        if old_id == 0:
-            self.initial_gap_lines += before_gap
-        else:
-            self.bookings[old_id - 1].gap_lines += before_gap
-        new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0]
-        updated = _Booking(old_id, new_booked_lines, after_gap)
-        self.bookings[old_id] = updated
-        if new_date != old_booking.date:  # if changed date, move to there
-            if self.bookings[0].date > new_date:
-                new_id = 0
-            elif self.bookings[-1].date < new_date:
-                new_id = self.bookings[-1].id_ + 1
-            else:
-                of_date_1st = i_booking = self.bookings[0]
-                while i_booking.next:
-                    if of_date_1st.date != i_booking.date:
-                        of_date_1st = i_booking
-                    if i_booking.next.date > new_date:
-                        break
-                    i_booking = i_booking.next
-                # ensure that, if we land in group of like-dated _Bookings, we
-                # land on the edge closest to our last position
-                new_id = (of_date_1st.id_ if old_id < i_booking.id_
-                          else i_booking.id_ + 1)
-            self._move_booking(old_id, new_id)
-        self._sync(check_dates=False)
-        return updated.id_
+        booking.parse()
+        booking.fix_position()
+        return booking.id_
 
     def _add_new_booking(
             self,
@@ -605,10 +664,13 @@ class Ledger:
             ) -> int:
         intro = IntroLine(dt_date.today().isoformat(), target)
         booking = _Booking(
-            len(self.bookings),
-            [intro.to_dat_line(intro_comment)] + dat_lines_transaction)
-        self.bookings += [booking]
-        self._sync()
+                [intro.to_dat_line(intro_comment)] + dat_lines_transaction)
+        booking.next_block = _LinesBlock([DatLine()])
+        if self._blocks_start:
+            self._blocks_start.last_block.next_block = booking
+        else:
+            self._blocks_start = booking
+        booking.fix_position()
         return booking.id_
 
     def add_empty_booking(self) -> int:
@@ -619,5 +681,5 @@ class Ledger:
         """Add copy of _Booking of copied_id to_end of ledger."""
         copied = self.bookings[copied_id]
         return self._add_new_booking(copied.target,
-                                     copied.booked_lines_copied[1:],
-                                     copied.booked_lines[0].comment)
+                                     copied.lines_copied[1:],
+                                     copied.lines[0].comment)
index 84be211e5d2d79c0590f398c2f36bec42752894a..9e8ec3e98fc4e7f9f25d1723fd3dcc83e573b05b 100644 (file)
@@ -34,7 +34,7 @@ table.ledger tr > td:first-child { text-align: right; }
 {% for dat_line in dat_lines %}
   {% if raw or dat_line.code or dat_line.comment_in_ledger %}
 
-    {% if (not raw) and dat_line.prev_line_empty %}
+    {% if (not raw) and dat_line.prev.raw == "" %}
       <tr ><td>&nbsp;</td></tr>
     {% endif %}
     <tr class="alternating">