class DatLine(_Dictable):
"""Line of .dat file parsed into comments and machine-readable data."""
dictables = {'booked', 'code', 'comment', 'error', 'is_intro'}
- prev_line_empty: bool
def __init__(
self,
- code: str,
- comment: str,
+ code: str = '',
+ comment: str = '',
add_indent: bool = False
) -> None:
self.comment = comment
if s])
self.booking: Optional['_Booking'] = None
self.booked: Optional[BookingLine] = None
-
- @classmethod
- def new_empty(cls) -> Self:
- """Create empty DatLine."""
- return cls('', '')
+ self.prev: Optional[DatLine] = None
def copy_unbooked(self) -> 'DatLine':
- """Create DatLine of .code and .comment, but no Booking ties yet."""
+ """Create DatLine of .code and .comment, but no _Booking ties yet."""
return DatLine(self.code, self.comment)
@classmethod
return self.raw.replace(' ', ' ')
+class _LinesBlock:
+ """Represents either _Booking or a gap between bookings."""
+
+ def __init__(self, lines: list[DatLine]) -> None:
+ self.lines = lines
+ self._next_block: Optional[_LinesBlock] = None
+ self._prev_block: Optional[_LinesBlock] = None
+
+ @property
+ def _block_id(self) -> int:
+ count = -1
+ block_iterated: Optional[_LinesBlock] = self
+ while block_iterated:
+ block_iterated = block_iterated.prev_block
+ count += 1
+ return count
+
+ @property
+ def start_block(self) -> '_LinesBlock':
+ """For chain surrounding self, get first item."""
+ block_iterated = self
+ while True:
+ if not block_iterated.prev_block:
+ return block_iterated
+ block_iterated = block_iterated.prev_block
+
+ @property
+ def last_block(self) -> '_LinesBlock':
+ """For chain surrounding self, get last item."""
+ block_iterated = self
+ while True:
+ if not block_iterated.next_block:
+ return block_iterated
+ block_iterated = block_iterated.next_block
+
+ def _neighbor_block(
+ self,
+ new_this_block: Optional['_LinesBlock'],
+ this: str,
+ that: str,
+ ) -> None:
+ if (old_this_block := getattr(self, f'_{this}_block')):
+ setattr(old_this_block, f'_{that}_block', None)
+ if new_this_block:
+ if (new_this_that := getattr(new_this_block, f'_{that}_block')):
+ setattr(new_this_that, f'_{this}_block', None)
+ setattr(new_this_block, f'_{that}_block', self)
+ setattr(self, f'_{this}_block', new_this_block)
+
+ @property
+ def next_block(self) -> Optional['_LinesBlock']:
+ """Next block in chain."""
+ return self._next_block
+
+ @next_block.setter
+ def next_block(self, new_next_block: Optional['_LinesBlock']) -> None:
+ self._neighbor_block(new_next_block, 'next', 'prev')
+
+ @property
+ def prev_block(self) -> Optional['_LinesBlock']:
+ """Prev block in chain."""
+ return self._prev_block
+
+ @prev_block.setter
+ def prev_block(self, new_prev_block: Optional['_LinesBlock']) -> None:
+ self._neighbor_block(new_prev_block, 'prev', 'next')
+
+
class BookingLine(_Dictable, ABC):
"""Parsed code part of a DatLine belonging to a _Booking."""
def __init__(self, idx: int, errors: Optional[list[str]] = None) -> None:
self.idx = idx
- self.errors: list[str] = errors if errors else []
+ self._errors: list[str] = errors if errors else []
+
+ @property
+ def errors(self) -> list[str]:
+ """Return ._errors, allowing sub-classes to add entries dynamically."""
+ return self._errors
+
+ def add_error(self, msg: str) -> None:
+ """Store message to display with line's dynamic .errors."""
+ self._errors += [msg]
@abstractmethod
def to_code(self) -> str:
self,
date: str,
target: str,
- errors: Optional[list[str]] = None
+ errors: Optional[list[str]] = None,
+ booking: Optional['_Booking'] = None
) -> None:
super().__init__(0, errors)
self.target = target
self.date = date
+ if not IntroLine._date_valid(self.date):
+ self._errors += [f'not properly formatted legal date: {self.date}']
+ self._booking = booking
+
+ @staticmethod
+ def _date_valid(date: str) -> bool:
try:
- dt_date.fromisoformat(self.date)
+ dt_date.fromisoformat(date)
except ValueError:
- self.errors += [f'not properly formatted legal date: {self.date}']
+ return False
+ return True
+
+ @property
+ def errors(self) -> list[str]:
+ errors = self._errors[:]
+ if (self._booking and self._booking.prev
+ and IntroLine._date_valid(self._booking.date)
+ and IntroLine._date_valid(self._booking.prev.date)
+ and self._booking.prev.date > self._booking.date):
+ errors += ['date < previous valid date']
+ return errors
@classmethod
- def from_code(cls, code: str) -> Self:
- """Parse from ledger file line code part."""
+ def from_code(cls, code: str, booking: '_Booking') -> Self:
+ """Parse from ledger file line code part, assume booking context."""
errors = []
if code[0].isspace():
errors += ['intro line indented']
toks = code.lstrip().split(maxsplit=1)
if len(toks) == 1:
errors += ['illegal number of tokens']
- return cls(toks[0], toks[1] if len(toks) > 1 else '', errors)
+ return cls(toks[0], toks[1] if len(toks) > 1 else '', errors, booking)
def to_code(self) -> str:
return f'{self.date} {self.target}'
return ''
-class _Booking:
+class _Booking(_LinesBlock):
"""Represents lines of individual booking."""
- next: Optional[Self]
- prev: Optional[Self]
-
- def __init__(self,
- id_: int,
- booked_lines: list[DatLine],
- gap_lines: Optional[list[DatLine]] = None
- ) -> None:
- self.next, self.prev = None, None
- self.id_ = id_
- self.booked_lines = booked_lines[:]
- self._gap_lines = gap_lines[:] if gap_lines else []
- # parse booked_lines into Intro- and TransferLines
- for line in booked_lines:
+
+ def __init__(self, lines: list[DatLine]) -> None:
+ super().__init__(lines)
+ self.parse()
+
+ def parse(self) -> None:
+ """Parse .lines for BookingLine structures, .account_changes"""
+ for line in self.lines:
line.booking = self
- self.intro_line = IntroLine.from_code(self.booked_lines[0].code)
+ self.intro_line = IntroLine.from_code(self.lines[0].code, self)
self._transfer_lines = [
TransferLine.from_code_at_idx(b_line.code, i+1)
- for i, b_line in enumerate(self.booked_lines[1:])]
- self.booked_lines[0].booked = self.intro_line
+ for i, b_line in enumerate(self.lines[1:])]
+ self.lines[0].booked = self.intro_line
for i, b_line in enumerate(self._transfer_lines):
- self.booked_lines[i + 1].booked = b_line
- # calculate .account_changes
+ self.lines[i + 1].booked = b_line
changes = _Wealth()
sink_account = None
self.sink_error = ''
elif not changes.sink_empty:
self.sink_error = 'needed sink missing'
- def recalc_prev_next(self, bookings: list[Self]) -> None:
- """Assuming .id_ to be index in bookings, link prev + next bookings."""
- self.prev = bookings[self.id_ - 1] if self.id_ > 0 else None
- if self.prev:
- self.prev.next = self
- self.next = (bookings[self.id_ + 1] if self.id_ + 1 < len(bookings)
- else None)
- if self.next:
- self.next.prev = self
-
@property
- def gap_lines(self) -> list[DatLine]:
- """Return ._gap_lines or, if empty, list of one empty DatLine."""
- return self._gap_lines if self._gap_lines else [DatLine.new_empty()]
+ def id_(self):
+ """Index in chain of bookings."""
+ return self._block_id // 2
- @gap_lines.setter
- def gap_lines(self, gap_lines=list[DatLine]) -> None:
- self._gap_lines = gap_lines[:]
+ @staticmethod
+ def _cast(item: Optional[_LinesBlock]) -> Optional['_Booking']:
+ assert isinstance(item, (_Booking, type(None)))
+ return item
+
+ @property
+ def next(self) -> Optional['_Booking']:
+ """Next Booking, assuming it's two block steps away."""
+ return self._cast(self.next_block.next_block if self.next_block
+ else None)
@property
- def gap_lines_copied(self) -> list[DatLine]:
- """Return new DatLines generated from .gap_lines."""
- return [dat_line.copy_unbooked() for dat_line in self.gap_lines]
+ def prev(self) -> Optional['_Booking']:
+ """Prev Booking, assuming it's two block steps away."""
+ return self._cast(self.prev_block.prev_block if self.prev_block
+ else None)
+
+ def fix_position(self):
+ """Move around in bookings chain until properly positioned by .date."""
+ while self.prev and self.prev.date > self.date:
+ self.move(up=True)
+ while self.next and self.next.date < self.date:
+ self.move(up=False)
@property
- def booked_lines_copied(self) -> list[DatLine]:
+ def lines_copied(self) -> list[DatLine]:
"""Return new DatLines generated from .booked_lines."""
- return [dat_line.copy_unbooked() for dat_line in self.booked_lines]
+ return [dat_line.copy_unbooked() for dat_line in self.lines]
@property
def target(self) -> str:
return False
return True
+ def move(self, up: bool) -> None:
+ """Move self and following gap up or down in line blocks chain."""
+ old_next = self.next
+ assert old_next is not None
+ assert self.next_block is not None
+ if up:
+ old_prev = self.prev
+ assert old_prev is not None
+ assert old_prev.prev_block is not None
+ assert old_prev.next_block is not None
+ old_prev.prev_block.next_block = self
+ self.next_block.next_block = old_prev
+ old_prev.next_block.next_block = old_next
+ else:
+ old_prev_block = self.prev_block
+ self.next_block.next_block = old_next.next
+ self.prev_block = old_next.next_block
+ old_next.prev_block = old_prev_block
+
+ def drop(self) -> None:
+ """Remove self and following gap from line blocks chain."""
+ assert self.prev_block is not None
+ assert self.next_block is not None
+ self.prev_block.lines += self.next_block.lines
+ self.prev_block.next_block = self.next_block.next_block
+
@property
def is_questionable(self) -> bool:
"""Whether lines count any errors, or add up to a .sink_error."""
class Ledger:
"""Collection of DatLines, and Accounts, _Bookings derived from them."""
- accounts: dict[str, Account]
- bookings: list[_Booking]
- dat_lines: list[DatLine]
- initial_gap_lines: list[DatLine]
+ _blocks_start: Optional[_LinesBlock]
def __init__(self, path_dat: Path) -> None:
self._path_dat = path_dat
self.load()
+ @property
+ def _blocks(self) -> list[_LinesBlock]:
+ blocks = []
+ block = self._blocks_start
+ while block:
+ blocks += [block]
+ block = block.next_block
+ return blocks
+
def load(self) -> None:
"""(Re-)read ledger from file at ._path_dat."""
- self.accounts, self.bookings, self.initial_gap_lines = {}, [], []
- self.dat_lines: list[DatLine] = [
+ dat_lines: list[DatLine] = [
DatLine.from_raw(line)
for line in self._path_dat.read_text(encoding='utf8').splitlines()]
- self.last_save_hash = self._hash_dat_lines()
- booked: list[DatLine] = []
- gap_lines: list[DatLine] = []
- booking: Optional[_Booking] = None
- for dat_line in self.dat_lines + [DatLine.new_empty()]:
- if dat_line.code:
- if gap_lines:
- if booking:
- booking.gap_lines = gap_lines[:]
- else:
- self.initial_gap_lines = gap_lines[:]
- gap_lines.clear()
- booked += [dat_line]
- else:
- gap_lines += [dat_line]
- if booked:
- booking = _Booking(len(self.bookings), booked[:])
- self.bookings += [booking]
- booked.clear()
- if booking:
- booking.gap_lines = gap_lines[:-1]
- self._sync(recalc_datlines=False)
-
- def _sync(self, recalc_datlines=True, check_dates=True):
- if recalc_datlines:
- self.dat_lines = self.initial_gap_lines[:]
- for booking in self.bookings:
- self.dat_lines += booking.booked_lines
- self.dat_lines += booking.gap_lines
- for idx, booking in enumerate(self.bookings[1:]):
- booking.prev = self.bookings[idx]
- for idx, booking in enumerate(self.bookings[:-1]):
- booking.next = self.bookings[idx + 1]
- self.bookings[-1].next = None
- if check_dates:
- last_date = ''
- err_msg = 'date < previous valid date'
- for booking in self.bookings:
- if err_msg in booking.intro_line.errors:
- booking.intro_line.errors.remove(err_msg)
- if last_date > booking.date:
- booking.intro_line.errors += [err_msg]
+ booked = False
+ self._blocks_start = None
+ block_lines: list[DatLine] = []
+ for dat_line in dat_lines + [DatLine()]:
+ if bool(dat_line.code) != booked:
+ block = (_Booking if booked else _LinesBlock)(block_lines[:])
+ if self._blocks_start:
+ block.prev_block = self._blocks_start.last_block
else:
- last_date = booking.date
- self._recalc_prev_line_empty()
- self.accounts = {}
+ self._blocks_start = block
+ booked = not booked
+ block_lines.clear()
+ block_lines += [dat_line]
+ if self._blocks_start:
+ _LinesBlock(block_lines[:]).prev_block =\
+ self._blocks_start.last_block
+ self.last_save_hash = self._hash_dat_lines()
+
+ @property
+ def dat_lines(self) -> list[DatLine]:
+ """From ._blocks build list of current DatLines."""
+ lines = []
+ for block in self._blocks:
+ lines += block.lines
+ for i, line in enumerate(lines):
+ line.prev = lines[i - 1] if i > 0 else None
+ return lines
+
+ @property
+ def bookings(self) -> list[_Booking]:
+ """Build chain of bookings."""
+ return [block for block in self._blocks if isinstance(block, _Booking)]
+
+ @property
+ def accounts(self) -> dict[str, Account]:
+ """Build mapping of account names to Accounts."""
+ accounts: dict[str, Account] = {}
+
+ def ensure_accounts(full_path: str) -> None:
+ parent_path = ''
+ for path, step_name in Account.path_to_steps(full_path):
+ if path not in accounts:
+ accounts[path] = Account(
+ accounts[parent_path] if parent_path else None,
+ step_name)
+ parent_path = path
+
for dat_line in self.dat_lines:
for acc_name, desc in dat_line.comment_instructions.items():
- self.ensure_account(acc_name)
- self.accounts[acc_name].desc = desc
+ ensure_accounts(acc_name)
+ accounts[acc_name].desc = desc
for booking in self.bookings:
for acc_name, wealth in booking.account_changes.items():
- self.ensure_account(acc_name)
- self.accounts[acc_name].add_wealth_diff(booking.id_, wealth)
-
- def ensure_account(self, full_path: str) -> None:
- """If full_path not in self.accounts, add its tree with Accounts."""
- parent_path = ''
- for path, step_name in Account.path_to_steps(full_path):
- if path not in self.accounts:
- self.accounts[path] = Account(
- self.accounts[parent_path] if parent_path else None,
- step_name)
- parent_path = path
+ ensure_accounts(acc_name)
+ accounts[acc_name].add_wealth_diff(booking.id_, wealth)
+ return accounts
def save(self) -> None:
"""Save current state to ._path_dat."""
"""If .dat_lines different to those of last .load()."""
return self._hash_dat_lines() != self.last_save_hash
- def _recalc_prev_line_empty(self) -> None:
- prev_line = None
- for line in self.dat_lines:
- line.prev_line_empty = False
- if prev_line:
- line.prev_line_empty = not (prev_line.code
- + prev_line.comment_in_ledger)
- if prev_line or line.code + line.comment_in_ledger: # jump over
- prev_line = line # empty start
-
- def _move_booking(self, idx_from: int, idx_to: int):
- moving = self.bookings[idx_from]
- if idx_from >= idx_to: # moving upward, deletion must
- del self.bookings[idx_from] # precede insertion to keep
- self.bookings[idx_to:idx_to] = [moving] # deletion index, downwards
- if idx_from < idx_to: # the other way around keeps
- del self.bookings[idx_from] # insertion index
- min_idx, max_idx = min(idx_from, idx_to), max(idx_from, idx_to)
- for idx, booking in enumerate(self.bookings[min_idx:max_idx + 1]):
- booking.id_ = min_idx + idx
-
def move_booking(self, idx_from: int, up: bool) -> int:
"""Move _Booking of old_id one step up or downwards"""
- new_id = idx_from + (-1 if up else 1)
- idx_to = new_id + (0 if up else 1) # down-move implies jump over next
- self._move_booking(idx_from, idx_to)
- self._sync()
- return new_id
+ booking = self.bookings[idx_from]
+ booking.move(up)
+ return booking.id_
def rewrite_booking(self, old_id: int, new_lines: list[DatLine]) -> int:
"""Rewrite _Booking with new_lines, move if changed date."""
- old_booking = self.bookings[old_id]
+ booking = self.bookings[old_id]
booked_start, booked_end, gap_start_found = -1, 0, False
for i, line in enumerate(new_lines):
if booked_start < 0 and line.code.strip(): # ignore any initial
booked_end += 1 # provided we're not yet in the gap
elif line.code.strip():
new_lines[i] = DatLine.from_raw(f'; {line.code}')
- before_gap = new_lines[:booked_start]
- new_booked_lines = (new_lines[booked_start:booked_end]
- if booked_start > -1 else [])
- after_gap = old_booking.gap_lines_copied # new gap be old gap _plus_
- after_gap += new_lines[booked_end:] # any new gap lines
- if not new_booked_lines: # interpret empty posting as deletion request
- del self.bookings[old_id]
- for booking in self.bookings[old_id:]:
- booking.id_ -= 1
- leftover_gap = before_gap + after_gap
- if old_id == 0:
- self.initial_gap_lines += leftover_gap
- else:
- self.bookings[old_id - 1].gap_lines += leftover_gap
- self._sync(check_dates=False)
+ assert booking.prev_block is not None
+ assert booking.next_block is not None
+ booking.prev_block.lines += new_lines[:booked_start]
+ booking.next_block.lines = (new_lines[booked_end:]
+ + booking.next_block.lines)
+ booking.lines = (
+ new_lines[booked_start:booked_end] if booked_start > -1
+ else [])
+ if not booking.lines: # interpret empty posting as deletion request
+ booking.drop()
return old_id if old_id < len(self.bookings) else 0
- if old_id == 0:
- self.initial_gap_lines += before_gap
- else:
- self.bookings[old_id - 1].gap_lines += before_gap
- new_date = new_booked_lines[0].code.lstrip().split(maxsplit=1)[0]
- updated = _Booking(old_id, new_booked_lines, after_gap)
- self.bookings[old_id] = updated
- if new_date != old_booking.date: # if changed date, move to there
- if self.bookings[0].date > new_date:
- new_id = 0
- elif self.bookings[-1].date < new_date:
- new_id = self.bookings[-1].id_ + 1
- else:
- of_date_1st = i_booking = self.bookings[0]
- while i_booking.next:
- if of_date_1st.date != i_booking.date:
- of_date_1st = i_booking
- if i_booking.next.date > new_date:
- break
- i_booking = i_booking.next
- # ensure that, if we land in group of like-dated _Bookings, we
- # land on the edge closest to our last position
- new_id = (of_date_1st.id_ if old_id < i_booking.id_
- else i_booking.id_ + 1)
- self._move_booking(old_id, new_id)
- self._sync(check_dates=False)
- return updated.id_
+ booking.parse()
+ booking.fix_position()
+ return booking.id_
def _add_new_booking(
self,
) -> int:
intro = IntroLine(dt_date.today().isoformat(), target)
booking = _Booking(
- len(self.bookings),
- [intro.to_dat_line(intro_comment)] + dat_lines_transaction)
- self.bookings += [booking]
- self._sync()
+ [intro.to_dat_line(intro_comment)] + dat_lines_transaction)
+ booking.next_block = _LinesBlock([DatLine()])
+ if self._blocks_start:
+ self._blocks_start.last_block.next_block = booking
+ else:
+ self._blocks_start = booking
+ booking.fix_position()
return booking.id_
def add_empty_booking(self) -> int:
"""Add copy of _Booking of copied_id to_end of ledger."""
copied = self.bookings[copied_id]
return self._add_new_booking(copied.target,
- copied.booked_lines_copied[1:],
- copied.booked_lines[0].comment)
+ copied.lines_copied[1:],
+ copied.lines[0].comment)