home · contact · privacy
Add processing log display to single-file testing. master
authorChristian Heller <c.heller@plomlompom.de>
Wed, 8 Oct 2025 19:27:58 +0000 (21:27 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Wed, 8 Oct 2025 19:27:58 +0000 (21:27 +0200)
src/ircplom/testing.py
src/run.py

index fd63e6dd8f9f724f5e33f25b23275b2220a71679..253a1ebf235c38c3555e40b61e9145c84af6d632 100644 (file)
@@ -120,64 +120,94 @@ _TOK_WAIT = 'wait'
 class _Playbook:
     put_keypress: Optional[Callable] = None
 
 class _Playbook:
     put_keypress: Optional[Callable] = None
 
-    def __init__(self, path: Path, get_client: Callable) -> None:
+    def __init__(self, path: Path, get_client: Callable, verbose: bool
+                 ) -> None:
         self._get_client = get_client
         self._get_client = get_client
+        self._verbose = verbose
         with path.open('r', encoding='utf8') as f:
         with path.open('r', encoding='utf8') as f:
-            self._lines = [line.rstrip('\n') for line in f.readlines()]
+            self._lines_t = [(str(idx + 1), line.rstrip('\n'))
+                             for idx, line in enumerate(f.readlines())]
 
         def expand_parsed(marker, parse_into, **kwargs) -> bool:
 
         def expand_parsed(marker, parse_into, **kwargs) -> bool:
-            inserts: list[str] = []
-            for idx, line in enumerate(self._lines):
-                if not line.startswith(marker):
+            inserts: list[tuple[str, str]] = []
+            for idx, line_t in enumerate(self._lines_t):
+                if not line_t[1].startswith(marker):
                     continue
                     continue
-                inserts = parse_into(line, **kwargs)
-                self._lines =\
-                    self._lines[:idx] + inserts + self._lines[idx + 1:]
+                inserts = parse_into(line_t, **kwargs)
+                self._lines_t = (self._lines_t[:idx]
+                                 + inserts
+                                 + self._lines_t[idx + 1:])
                 break
             return bool(inserts)
 
                 break
             return bool(inserts)
 
-        def split_server_put_and_log(line: str, **_) -> list[str]:
-            context, msg = self._split_by_context_separator(line)
+        def split_server_put_and_log(line_t: tuple[str, str], **_
+                                     ) -> list[tuple[str, str]]:
+            index_str = line_t[0]
+            context, msg = self._split_by_context_separator(line_t[1])
             fmt, msg_no_fmt = msg.split(LOG_FMT_SEP, maxsplit=1)
             for c in fmt[:-1]:
                 assert c in LOG_FMT_ATTRS
             assert fmt[-1] == LOG_PREFIX_IN
             c_id, win_ids = context[1:].split(_CHAR_ID_TYPE_SEP, maxsplit=1)
             fmt, msg_no_fmt = msg.split(LOG_FMT_SEP, maxsplit=1)
             for c in fmt[:-1]:
                 assert c in LOG_FMT_ATTRS
             assert fmt[-1] == LOG_PREFIX_IN
             c_id, win_ids = context[1:].split(_CHAR_ID_TYPE_SEP, maxsplit=1)
-            return [f'{_CHAR_SERVER_MSG}{c_id}{_CHAR_CONTEXT_SEP}{msg_no_fmt}',
-                    win_ids + _CHAR_CONTEXT_SEP + msg]
-
-        def repeat(line: str, anchors: dict[str, int], **__) -> list[str]:
-            range_data = line[len(_TOK_REPEAT) + 1:].split(
+            return [
+                (index_str + ':s:0',
+                 f'{_CHAR_SERVER_MSG}{c_id}{_CHAR_CONTEXT_SEP}{msg_no_fmt}'),
+                (index_str + ':s:1',
+                 win_ids + _CHAR_CONTEXT_SEP + msg)]
+
+        def repeat(line_t: tuple[str, str], anchors: dict[str, int], **__
+                   ) -> list[tuple[str, str]]:
+            index_str = line_t[0]
+            range_data = line_t[1][len(_TOK_REPEAT) + 1:].split(
                     _CHAR_RANGE_DATA_SEP, maxsplit=2)
             start_key, end_key = range_data[:2]
             start = anchors[start_key] + 1
             end = anchors[end_key]
                     _CHAR_RANGE_DATA_SEP, maxsplit=2)
             start_key, end_key = range_data[:2]
             start = anchors[start_key] + 1
             end = anchors[end_key]
-            inserts = self._lines[int(start):int(end)]
-            if len(range_data) == 2:
-                return inserts
-            for jdx, insert in enumerate(inserts):
-                if (not insert) or insert.startswith(_CHAR_ANCHOR):
-                    continue
-                _, msg = self._split_by_context_separator(insert)
-                inserts[jdx] = _CHAR_CONTEXT_SEP.join([range_data[2]] + [msg])
+            inserts: list[tuple[str, str]] = []
+            for inserted_t in self._lines_t[int(start):int(end)]:
+                insert = inserted_t[1]
+                if len(range_data) != 2\
+                        and insert and not insert.startswith(_CHAR_ANCHOR):
+                    insert = _CHAR_CONTEXT_SEP.join(
+                        [range_data[2]]
+                        + [self._split_by_context_separator(insert)[1]])
+                inserts += [(index_str + ':r:' + inserted_t[0], insert)]
             return inserts
 
         while expand_parsed(_CHAR_ID_TYPE_SEP, split_server_put_and_log):
             pass
         while True:
             anchors: dict[str, int] = {}
             return inserts
 
         while expand_parsed(_CHAR_ID_TYPE_SEP, split_server_put_and_log):
             pass
         while True:
             anchors: dict[str, int] = {}
-            for idx, line in enumerate(self._lines):
-                if line[:1] == _CHAR_ANCHOR:
-                    anchors[line[2:]] = idx
+            for idx, line_t in enumerate(self._lines_t):
+                if line_t[1][:1] == _CHAR_ANCHOR:
+                    anchors[line_t[1][2:]] = idx
             if not expand_parsed(_TOK_REPEAT, repeat, anchors=anchors):
                 break
             if not expand_parsed(_TOK_REPEAT, repeat, anchors=anchors):
                 break
-        self._lines = [ln for ln in self._lines
-                       if ln and ln[:1] not in {_CHAR_ANCHOR, _CHAR_COMMENT}]
+        self._lines_t = [
+                line_t for line_t in self._lines_t
+                if line_t[1]
+                and line_t[1][:1] not in {_CHAR_ANCHOR, _CHAR_COMMENT}]
+        if self._verbose:
+            self._max_len_idx = max(len(line_t[0]) for line_t in self._lines_t)
+            self._max_len_ctx = max(
+                    len(self._split_by_context_separator(line_t[1])[0])
+                    for line_t in self._lines_t)
+            title_idx = 'line number(s)'
+            title_ctx = 'context'
+            self._max_len_idx = max(self._max_len_idx, len(title_idx))
+            self._max_len_ctx = max(self._max_len_ctx, len(title_ctx))
+            print(self._str_padded_to(title_idx, self._max_len_idx),
+                  self._str_padded_to(title_ctx, self._max_len_ctx),
+                  'content')
         self._idx = 0
 
         self._idx = 0
 
+    @staticmethod
+    def _str_padded_to(msg: str, length: int) -> str:
+        return msg + ' ' * (length - len(msg))
+
     @property
     @property
-    def _current_line(self) -> str:
-        return self._lines[self._idx]
+    def _current_line(self) -> tuple[str, str]:
+        return self._lines_t[self._idx]
 
     def ensure_has_started(self) -> None:
         'Check if still at beginning, and if so, play till at next log line.'
 
     def ensure_has_started(self) -> None:
         'Check if still at beginning, and if so, play till at next log line.'
@@ -186,7 +216,7 @@ class _Playbook:
 
     def next_log(self) -> tuple[int, tuple[int, ...], str]:
         'Return index, win IDs, and context of next expected log line.'
 
     def next_log(self) -> tuple[int, tuple[int, ...], str]:
         'Return index, win IDs, and context of next expected log line.'
-        context, msg = self._split_by_context_separator(self._current_line)
+        context, msg = self._split_by_context_separator(self._current_line[1])
         if _CHAR_RANGE in context:
             _, context = context.split(_CHAR_RANGE)
         expected_win_ids = tuple(
         if _CHAR_RANGE in context:
             _, context = context.split(_CHAR_RANGE)
         expected_win_ids = tuple(
@@ -198,7 +228,12 @@ class _Playbook:
 
     def _play_till_log(self) -> None:
         while True:
 
     def _play_till_log(self) -> None:
         while True:
-            context, msg = self._split_by_context_separator(self._current_line)
+            idx_info, line = self._current_line
+            context, msg = self._split_by_context_separator(line)
+            if self._verbose:
+                print(self._str_padded_to(idx_info, self._max_len_idx),
+                      self._str_padded_to(context, self._max_len_ctx),
+                      msg)
             if context == _CHAR_PROMPT:
                 assert self.put_keypress is not None
                 for c in msg:
             if context == _CHAR_PROMPT:
                 assert self.put_keypress is not None
                 for c in msg:
@@ -228,6 +263,7 @@ class TestingClientTui(ClientTui):
     _path_config: Optional[Path] = None
     _path_logs = None
     _path_test: Path
     _path_config: Optional[Path] = None
     _path_logs = None
     _path_test: Path
+    _verbose: bool
 
     def __init__(self, **kwargs) -> None:
         path_config = PATH_TESTS.joinpath(self._path_test.stem + '.toml')
 
     def __init__(self, **kwargs) -> None:
         path_config = PATH_TESTS.joinpath(self._path_test.stem + '.toml')
@@ -235,7 +271,7 @@ class TestingClientTui(ClientTui):
             assert path_config.is_file()
             self._path_config = path_config
         self._clients = []
             assert path_config.is_file()
             self._path_config = path_config
         self._clients = []
-        self._playbook = _Playbook(path=self._path_test,
+        self._playbook = _Playbook(path=self._path_test, verbose=self._verbose,
                                    get_client=lambda idx: self._clients[idx])
         super().__init__(**kwargs)
         assert isinstance(self._term, TestTerminal)
                                    get_client=lambda idx: self._clients[idx])
         super().__init__(**kwargs)
         assert isinstance(self._term, TestTerminal)
@@ -246,10 +282,11 @@ class TestingClientTui(ClientTui):
         self._window_idx = idx
 
     @classmethod
         self._window_idx = idx
 
     @classmethod
-    def on_file(cls, path_test: Path):
+    def on_file(cls, path_test: Path, verbose: bool):
         'Return cls with ._path_test set.'
         class _Bound(TestingClientTui):
             _path_test = path_test
         'Return cls with ._path_test set.'
         class _Bound(TestingClientTui):
             _path_test = path_test
+            _verbose = verbose
         return _Bound
 
     def _new_client(self, conn_setup: IrcConnSetup, channels: set[str]):
         return _Bound
 
     def _new_client(self, conn_setup: IrcConnSetup, channels: set[str]):
index 0c54b38a5c5f15421fff89947a8ce8a4d3077e8c..336203d90c97fb612d31ca1ba8bdd8c93ebc3816 100755 (executable)
@@ -79,7 +79,9 @@ if __name__ == '__main__':
                         f'no test files in {PATH_TESTS} matching [{selector}]')
             for path in collected_paths:
                 print(f'running test: {path}')
                         f'no test files in {PATH_TESTS} matching [{selector}]')
             for path in collected_paths:
                 print(f'running test: {path}')
-                main_loop(TestTerminal, TestingClientTui.on_file(path))
+                main_loop(TestTerminal,
+                          TestingClientTui.on_file(path,
+                                                   len(collected_paths) == 1))
                 print('(success!)')
         else:
             raise _HandledException(f'unrecognized argument(s): {argv[1:]}')
                 print('(success!)')
         else:
             raise _HandledException(f'unrecognized argument(s): {argv[1:]}')