class _Playbook:
put_keypress: Optional[Callable] = None
- def __init__(self, path: Path, get_client: Callable) -> None:
+ def __init__(self, path: Path, get_client: Callable, verbose: bool
+ ) -> None:
self._get_client = get_client
+ self._verbose = verbose
with path.open('r', encoding='utf8') as f:
- self._lines = [line.rstrip('\n') for line in f.readlines()]
+ self._lines_t = [(str(idx + 1), line.rstrip('\n'))
+ for idx, line in enumerate(f.readlines())]
def expand_parsed(marker, parse_into, **kwargs) -> bool:
- inserts: list[str] = []
- for idx, line in enumerate(self._lines):
- if not line.startswith(marker):
+ inserts: list[tuple[str, str]] = []
+ for idx, line_t in enumerate(self._lines_t):
+ if not line_t[1].startswith(marker):
continue
- inserts = parse_into(line, **kwargs)
- self._lines =\
- self._lines[:idx] + inserts + self._lines[idx + 1:]
+ inserts = parse_into(line_t, **kwargs)
+ self._lines_t = (self._lines_t[:idx]
+ + inserts
+ + self._lines_t[idx + 1:])
break
return bool(inserts)
- def split_server_put_and_log(line: str, **_) -> list[str]:
- context, msg = self._split_by_context_separator(line)
+ def split_server_put_and_log(line_t: tuple[str, str], **_
+ ) -> list[tuple[str, str]]:
+ index_str = line_t[0]
+ context, msg = self._split_by_context_separator(line_t[1])
fmt, msg_no_fmt = msg.split(LOG_FMT_SEP, maxsplit=1)
for c in fmt[:-1]:
assert c in LOG_FMT_ATTRS
assert fmt[-1] == LOG_PREFIX_IN
c_id, win_ids = context[1:].split(_CHAR_ID_TYPE_SEP, maxsplit=1)
- return [f'{_CHAR_SERVER_MSG}{c_id}{_CHAR_CONTEXT_SEP}{msg_no_fmt}',
- win_ids + _CHAR_CONTEXT_SEP + msg]
-
- def repeat(line: str, anchors: dict[str, int], **__) -> list[str]:
- range_data = line[len(_TOK_REPEAT) + 1:].split(
+ return [
+ (index_str + ':s:0',
+ f'{_CHAR_SERVER_MSG}{c_id}{_CHAR_CONTEXT_SEP}{msg_no_fmt}'),
+ (index_str + ':s:1',
+ win_ids + _CHAR_CONTEXT_SEP + msg)]
+
+ def repeat(line_t: tuple[str, str], anchors: dict[str, int], **__
+ ) -> list[tuple[str, str]]:
+ index_str = line_t[0]
+ range_data = line_t[1][len(_TOK_REPEAT) + 1:].split(
_CHAR_RANGE_DATA_SEP, maxsplit=2)
start_key, end_key = range_data[:2]
start = anchors[start_key] + 1
end = anchors[end_key]
- inserts = self._lines[int(start):int(end)]
- if len(range_data) == 2:
- return inserts
- for jdx, insert in enumerate(inserts):
- if (not insert) or insert.startswith(_CHAR_ANCHOR):
- continue
- _, msg = self._split_by_context_separator(insert)
- inserts[jdx] = _CHAR_CONTEXT_SEP.join([range_data[2]] + [msg])
+ inserts: list[tuple[str, str]] = []
+ for inserted_t in self._lines_t[int(start):int(end)]:
+ insert = inserted_t[1]
+ if len(range_data) != 2\
+ and insert and not insert.startswith(_CHAR_ANCHOR):
+ insert = _CHAR_CONTEXT_SEP.join(
+ [range_data[2]]
+ + [self._split_by_context_separator(insert)[1]])
+ inserts += [(index_str + ':r:' + inserted_t[0], insert)]
return inserts
while expand_parsed(_CHAR_ID_TYPE_SEP, split_server_put_and_log):
pass
while True:
anchors: dict[str, int] = {}
- for idx, line in enumerate(self._lines):
- if line[:1] == _CHAR_ANCHOR:
- anchors[line[2:]] = idx
+ for idx, line_t in enumerate(self._lines_t):
+ if line_t[1][:1] == _CHAR_ANCHOR:
+ anchors[line_t[1][2:]] = idx
if not expand_parsed(_TOK_REPEAT, repeat, anchors=anchors):
break
- self._lines = [ln for ln in self._lines
- if ln and ln[:1] not in {_CHAR_ANCHOR, _CHAR_COMMENT}]
+ self._lines_t = [
+ line_t for line_t in self._lines_t
+ if line_t[1]
+ and line_t[1][:1] not in {_CHAR_ANCHOR, _CHAR_COMMENT}]
+ if self._verbose:
+ self._max_len_idx = max(len(line_t[0]) for line_t in self._lines_t)
+ self._max_len_ctx = max(
+ len(self._split_by_context_separator(line_t[1])[0])
+ for line_t in self._lines_t)
+ title_idx = 'line number(s)'
+ title_ctx = 'context'
+ self._max_len_idx = max(self._max_len_idx, len(title_idx))
+ self._max_len_ctx = max(self._max_len_ctx, len(title_ctx))
+ print(self._str_padded_to(title_idx, self._max_len_idx),
+ self._str_padded_to(title_ctx, self._max_len_ctx),
+ 'content')
self._idx = 0
+ @staticmethod
+ def _str_padded_to(msg: str, length: int) -> str:
+ return msg + ' ' * (length - len(msg))
+
@property
- def _current_line(self) -> str:
- return self._lines[self._idx]
+ def _current_line(self) -> tuple[str, str]:
+ return self._lines_t[self._idx]
def ensure_has_started(self) -> None:
'Check if still at beginning, and if so, play till at next log line.'
def next_log(self) -> tuple[int, tuple[int, ...], str]:
'Return index, win IDs, and context of next expected log line.'
- context, msg = self._split_by_context_separator(self._current_line)
+ context, msg = self._split_by_context_separator(self._current_line[1])
if _CHAR_RANGE in context:
_, context = context.split(_CHAR_RANGE)
expected_win_ids = tuple(
def _play_till_log(self) -> None:
while True:
- context, msg = self._split_by_context_separator(self._current_line)
+ idx_info, line = self._current_line
+ context, msg = self._split_by_context_separator(line)
+ if self._verbose:
+ print(self._str_padded_to(idx_info, self._max_len_idx),
+ self._str_padded_to(context, self._max_len_ctx),
+ msg)
if context == _CHAR_PROMPT:
assert self.put_keypress is not None
for c in msg:
_path_config: Optional[Path] = None
_path_logs = None
_path_test: Path
+ _verbose: bool
def __init__(self, **kwargs) -> None:
path_config = PATH_TESTS.joinpath(self._path_test.stem + '.toml')
assert path_config.is_file()
self._path_config = path_config
self._clients = []
- self._playbook = _Playbook(path=self._path_test,
+ self._playbook = _Playbook(path=self._path_test, verbose=self._verbose,
get_client=lambda idx: self._clients[idx])
super().__init__(**kwargs)
assert isinstance(self._term, TestTerminal)
self._window_idx = idx
@classmethod
- def on_file(cls, path_test: Path):
+ def on_file(cls, path_test: Path, verbose: bool):
'Return cls with ._path_test set.'
class _Bound(TestingClientTui):
_path_test = path_test
+ _verbose = verbose
return _Bound
def _new_client(self, conn_setup: IrcConnSetup, channels: set[str]):