put_keypress: Optional[Callable] = None
assert_screen_line: Optional[Callable] = None
redraw_affected: Optional[Callable] = None
+ _lines_t: list[tuple[str, str]]
def __init__(self, path: Path, get_client: Callable, verbose: bool
) -> None:
self._get_client = get_client
self._verbose = verbose
- with path.open('r', encoding='utf8') as f:
- self._lines_t = [(str(idx + 1), line.rstrip('\n'))
- for idx, line in enumerate(f.readlines())]
+ self._idx = 0
def expand_parsed(marker: str, parse_into: Callable, **kwargs) -> None:
new_lines_t: list[tuple[str, str]] = []
**__
) -> list[tuple[str, str]]:
candidates = [
- (_SEP_2.join((index_str, _TOK_IDX_REPEAT, lt[0])), lt[1])
- for lt in fragments[args[0]]]
+ (_SEP_2.join((index_str, _TOK_IDX_REPEAT, sub_index_str)),
+ line)
+ for sub_index_str, line in fragments[args[0]]]
idx_in, idx_out = ((int(val) if val else None)
for val in args[1].split(_SEP_2))
candidates = candidates[idx_in:idx_out]
- rep_offset = int(args[2])
lowest_int: Optional[int] = None
inserts = []
for candidate in candidates:
inserts += [(candidate[0], cmd_name, cmd_args)]
line_offset = 0 - (lowest_int or 0)
return [
- (insert_t[0],
+ (new_index_str,
_SEP_0.join(
- [insert_t[1]]
- + [arg if isinstance(arg, str)
- else (_SEP_1.join(str(n + line_offset + rep_offset)
- for n in arg
- ) or ',')
- for arg in insert_t[2]]
- )
- ) for insert_t in inserts]
-
- self._lines_t = [line_t for line_t in self._lines_t
- if line_t[1].rstrip()
- and not line_t[1].startswith(_MARK_COMMENT)]
-
- fragments: dict[str, tuple[tuple[str, str], ...]] = {}
- anchor = ''
- fragment: list[tuple[str, str]] = []
- fragments_cutoff = 0
- for idx, line_t in enumerate(self._lines_t[:] + [('', '')]):
- if line_t[1].startswith(_MARK_FRAGMENT):
- if anchor:
- fragments[anchor] = tuple(fragment)
- if not line_t[1][len(_MARK_FRAGMENT):].rstrip():
- fragments_cutoff = idx + 1
+ (cmd_name, )
+ + tuple(
+ cmd_arg if isinstance(cmd_arg, str)
+ else (_SEP_1.join(str(n + line_offset + int(args[2]))
+ for n in cmd_arg)
+ or ',')
+ for cmd_arg in cmd_args)))
+ for new_index_str, cmd_name, cmd_args in inserts]
+
+ def expand_inserts() -> None:
+ fragments: dict[str, tuple[tuple[str, str], ...]] = {}
+ anchor = ''
+ fragment: list[tuple[str, str]] = []
+ fragments_cutoff = 0
+ for idx, line_t in enumerate(self._lines_t[:] + [('', '')]):
+ if line_t[1].startswith(_MARK_FRAGMENT):
+ if anchor:
+ fragments[anchor] = tuple(fragment)
+ if not line_t[1][len(_MARK_FRAGMENT):].rstrip():
+ fragments_cutoff = idx + 1
+ break
+ anchor = line_t[1].split(_SEP_0, maxsplit=1)[1]
+ fragment.clear()
+ continue
+ fragment += [line_t]
+ self._lines_t = self._lines_t[fragments_cutoff:]
+ while True:
+ snapshot_before = self._lines_t[:]
+ expand_parsed(_MARK_INSERT, insert, fragments=fragments)
+ if self._lines_t == snapshot_before:
break
- anchor = line_t[1].split(_SEP_0, maxsplit=1)[1]
- fragment.clear()
- continue
- fragment += [line_t]
- self._lines_t = self._lines_t[fragments_cutoff:]
- while True:
- snapshot_before = self._lines_t[:]
- expand_parsed(_MARK_INSERT, insert, fragments=fragments)
- if self._lines_t == snapshot_before:
- break
+ with path.open('r', encoding='utf8') as f:
+ self._lines_t = [(str(idx + 1), line.rstrip('\n'))
+ for idx, line in enumerate(f.readlines())
+ if line.rstrip()
+ and not line.startswith(_MARK_COMMENT)]
+ expand_inserts()
expand_parsed(_MARK_LOGSRVRMSG, split_server_put_and_log)
-
if self._verbose:
title_idx = 'line number(s)'
title_cmdname = 'command'
- self._max_len_idx = max(len(line_t[0])
- for line_t in self._lines_t)
- self._max_len_idx = max(self._max_len_idx, len(title_idx))
- self._max_len_cmdname = len(title_cmdname)
- self._max_len_midargs = 0
- for line_t in self._lines_t:
- cmd_name, args = self._cmdname_and_args_from(line_t[1])
- self._max_len_cmdname = max(len(cmd_name),
- self._max_len_cmdname)
- self._max_len_midargs = max(len(self._args_verbose(args[:-1])),
- self._max_len_midargs)
- print(self._str_padded_to(title_idx, self._max_len_idx),
- self._str_padded_to(title_cmdname, self._max_len_cmdname),
- 'arguments')
- self._idx = 0
+ self._colwidths: dict[str, int] = {}
+ for idx_str, line in self._lines_t:
+ self._colwidths['idx'] = max(
+ len(title_idx),
+ len(idx_str),
+ self._colwidths.get('idx', 0))
+ cmd_name, args = self._cmdname_and_args_from(line)
+ self._colwidths['cmdname'] = max(
+ len(title_cmdname),
+ len(cmd_name),
+ self._colwidths.get('cmdname', 0))
+ self._colwidths['midargs'] = max(
+ len(self._args_verbose(args[:-1])),
+ self._colwidths.get('midargs', 0))
+ self._print_verbose_columns(
+ (title_idx, title_cmdname, 'arguments'))
@staticmethod
def _args_verbose(args: tuple[str, ...]) -> str:
cmd_name, remains = line.split(_SEP_0, maxsplit=1)
return cmd_name, cls._args_for_cmd(cmd_name, remains)
- @staticmethod
- def _str_padded_to(msg: str, length: int) -> str:
- return msg + ' ' * (length - len(msg))
-
@property
def _current_line(self) -> tuple[str, str]:
return self._lines_t[self._idx]
+ def _print_verbose_columns(self, items: tuple[str, ...]) -> None:
+ to_print = []
+ for idx, colname in enumerate(('idx', 'cmdname', 'midargs'
+ )[:len(items) - 1]):
+ msg = items[idx]
+ to_print += [msg + ' ' * (self._colwidths[colname] - len(msg))]
+ to_print += [items[-1]]
+ print(*to_print)
+
def ensure_has_started(self) -> None:
'Check if still at beginning, and if so, play till at next log line.'
if self._idx == 0:
idx_info, line = self._current_line
cmd_name, args = self._cmdname_and_args_from(line)
if self._verbose:
- print(self._str_padded_to(idx_info, self._max_len_idx),
- self._str_padded_to(cmd_name, self._max_len_cmdname),
- self._str_padded_to(self._args_verbose(args[:-1]),
- self._max_len_midargs),
- self._args_verbose(args[-1:]))
+ self._print_verbose_columns((idx_info, cmd_name,
+ self._args_verbose(args[:-1]),
+ self._args_verbose(args[-1:])))
if cmd_name == _MARK_PROMPT:
assert self.put_keypress is not None
for c in args[0]: