home · contact · privacy
In testing, have explicit Playbook testing breaks for prompt and server inputs too.
authorChristian Heller <c.heller@plomlompom.de>
Wed, 15 Oct 2025 02:18:16 +0000 (04:18 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Wed, 15 Oct 2025 02:18:16 +0000 (04:18 +0200)
src/ircplom/testing.py
src/ircplom/tui_base.py
src/tests/_timeout_retries.test
src/tests/pingpong.test
src/tests/test.test
src/tests/tui_draw.test

index c0ffd2a7e93637a6eeb55d649482b171a578971f..4b5e77d433602e3b2e9b32f27ee3743dde8870cf 100644 (file)
@@ -120,12 +120,29 @@ class _FakeIrcConnection(IrcConnection):
 
 class _TestClientKnowingTui(ClientKnowingTui):
     _cls_conn = _FakeIrcConnection
+    playbook: '_Playbook'
+    idx: int
 
     def connect(self) -> None:
         super().connect()
         if self.db.port > _FAKE_TIMEOUT_PORTS_BEYOND:
             self.db.port = self.db.port - 1
 
+    def _test_before(self, context: str, msg: str, cmp_msg: str) -> None:
+        assert context.startswith(_CHAR_SERVER_MSG)
+        assert context[len(_CHAR_SERVER_MSG):] == str(self.idx)
+        assert msg == cmp_msg
+
+    def on_handled_loop_exception(self, e: IrcConnException) -> None:
+        self.playbook.test_wrap(
+                lambda ctx, msg: self._test_before(ctx, msg, str(e)), None,
+                super().on_handled_loop_exception, e)
+
+    def handle_msg(self, msg: IrcMessage) -> None:
+        self.playbook.test_wrap(
+                lambda ctx, msg_: self._test_before(ctx, msg_, msg.raw), None,
+                super().handle_msg, msg)
+
 
 _CHAR_ANCHOR = '|'
 _CHAR_COMMENT = '#'
@@ -238,20 +255,27 @@ class _Playbook:
     def ensure_has_started(self) -> None:
         'Check if still at beginning, and if so, play till at next log line.'
         if self._idx == 0:
-            self._play_till_log()
-
-    def next_log(self) -> tuple[tuple[int, ...], str]:
-        'Return index, win IDs, and context of next expected log line.'
+            self._play_till_test()
+
+    def test_wrap(self,
+                  test_before: Optional[Callable],
+                  test_after: Optional[Callable],
+                  f: Callable,
+                  *f_args, **f_kwargs
+                  ) -> None:
+        'Call f with checks before and after against playbook, play it on.'
         context, msg = self._split_by_context_separator(self._current_line[1])
-        if _CHAR_RANGE in context:
-            _, context = context.split(_CHAR_RANGE)
-        expected_win_ids = tuple(
-                int(idx) for idx in context.split(_CHAR_WIN_ID_SEP) if idx)
+        if test_before:
+            test_before(context, msg)
         self._idx += 1
-        self._play_till_log()
-        return expected_win_ids, msg
-
-    def _play_till_log(self) -> None:
+        next_idx_before = self._idx
+        ret = f(*f_args, **f_kwargs)
+        if test_after:
+            test_after(context, msg, ret)
+        if self._idx == next_idx_before:  # f may have called ._play_till_test
+            self._play_till_test()        # so we avoid jumping over next test
+
+    def _play_till_test(self) -> None:
         while True:
             idx_info, line = self._current_line
             context, msg = self._split_by_context_separator(line)
@@ -264,11 +288,13 @@ class _Playbook:
                 for c in msg:
                     self.put_keypress(c)
                 self.put_keypress('KEY_ENTER')
-            elif context.startswith(_CHAR_SERVER_MSG):
+                break
+            if context.startswith(_CHAR_SERVER_MSG):
                 client = self._get_client(int(context[1:]))
                 assert isinstance(client.conn, _FakeIrcConnection), client.conn
                 client.conn.put_server_msg(msg)
-            elif context == _TOK_TUI:
+                break
+            if context == _TOK_TUI:
                 assert self.assert_screen_line is not None
                 assert self.redraw_affected is not None
                 self.redraw_affected()
@@ -323,20 +349,34 @@ class TestingClientTui(ClientTui):
         return _Bound
 
     def _new_client(self, conn_setup: IrcConnSetup, channels: set[str]):
-        self._clients += [super()._new_client(conn_setup, channels)]
-        return self._clients[-1]
-
-    def log(self, msg: str, **kwargs) -> tuple[tuple[int, ...], str]:
-        win_ids, logged_msg = super().log(msg, **kwargs)
-        fmt, time_str, msg_sans_time = logged_msg.split(' ', maxsplit=2)
-        msg_sans_time = fmt + ' ' + msg_sans_time
-        assert len(time_str) == 8
-        for c in time_str[:2] + time_str[3:5] + time_str[6:]:
-            assert c.isdigit()
-        assert time_str[2] == ':' and time_str[5] == ':'
-        expected_win_ids, expected_msg = self._playbook.next_log()
-        info = ('WANTED:', expected_win_ids, expected_msg,
-                'GOT:', win_ids, msg_sans_time)
-        assert expected_msg == msg_sans_time, info
-        assert expected_win_ids == win_ids, info
-        return win_ids, logged_msg
+        client = super()._new_client(conn_setup, channels)
+        client.playbook = self._playbook
+        client.idx = len(self._clients)
+        self._clients += [client]
+        return client
+
+    def log(self, msg: str, **kwargs) -> None:
+        def test_after(context: str, expected_msg: str, ret) -> None:
+            win_ids, logged_msg = ret
+            fmt, time_str, msg_sans_time = logged_msg.split(' ', maxsplit=2)
+            msg_sans_time = fmt + ' ' + msg_sans_time
+            assert len(time_str) == 8
+            for c in time_str[:2] + time_str[3:5] + time_str[6:]:
+                assert c.isdigit()
+            assert time_str[2] == ':' and time_str[5] == ':'
+            if _CHAR_RANGE in context:
+                _, context = context.split(_CHAR_RANGE)
+            expected_win_ids = tuple(
+                    int(idx) for idx in context.split(_CHAR_WIN_ID_SEP) if idx)
+            info = ('WANTED:', expected_win_ids, expected_msg,
+                    'GOT:', win_ids, msg_sans_time)
+            assert expected_msg == msg_sans_time, info
+            assert expected_win_ids == win_ids, info
+
+        self._playbook.test_wrap(None, test_after, super().log, msg, **kwargs)
+
+    def cmd__prompt_enter(self) -> None:
+        def test_before(context: str, msg: str) -> None:
+            assert context.startswith(_CHAR_PROMPT)
+            assert msg == self.window.prompt.input_buffer[:]
+        self._playbook.test_wrap(test_before, None, super().cmd__prompt_enter)
index bcea3c0d17c0abebd2e0a4d7a9b45d74254cadd8..ef3e60dbd3454dffdd31ff87a044b4527e7f0cdf 100644 (file)
@@ -278,18 +278,19 @@ class PromptWidget(_ScrollableWidget):
         return _PROMPT_TEMPLATE[:]
 
     @property
-    def _input_buffer(self) -> str:
+    def input_buffer(self) -> str:
+        'Always only copy of actual input buffer.'
         return self._input_buffer_unsafe[:]
 
-    @_input_buffer.setter
-    def _input_buffer(self, content) -> None:
+    @input_buffer.setter
+    def input_buffer(self, content) -> None:
         self.taint()
-        self._input_buffer_unsafe = content
+        self._input_buffer_unsafe = content[:]
 
     def _draw(self) -> None:
         prefix = self.prefix[:]
-        content = self._input_buffer
-        if self._cursor_x == len(self._input_buffer):
+        content = self.input_buffer
+        if self._cursor_x == len(self.input_buffer):
             content += ' '
         half_width = (self._sizes.x - len(prefix)) // 2
         offset = 0
@@ -310,13 +311,13 @@ class PromptWidget(_ScrollableWidget):
         self._write(to_write[cursor_x_to_write + 1:])
 
     def _archive_prompt(self) -> None:
-        self.append(self._input_buffer)
+        self.append(self.input_buffer)
         self._reset_buffer('')
 
     def _scroll(self, up: bool = True) -> None:
         super()._scroll(up)
         if up and -(self._history_idx_neg) < len(self._history):
-            if self._history_idx_neg == 0 and self._input_buffer:
+            if self._history_idx_neg == 0 and self.input_buffer:
                 self._archive_prompt()
                 self._history_idx_neg -= 1
             self._history_idx_neg -= 1
@@ -328,45 +329,44 @@ class PromptWidget(_ScrollableWidget):
                     self._reset_buffer('')
                 else:
                     self._reset_buffer(self._history[self._history_idx_neg])
-            elif self._input_buffer:
+            elif self.input_buffer:
                 self._archive_prompt()
 
     def insert(self, to_insert: str) -> None:
         'Insert into prompt input buffer.'
         self._cursor_x += len(to_insert)
-        self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
-                              + to_insert
-                              + self._input_buffer[self._cursor_x - 1:])
+        self.input_buffer = (self.input_buffer[:self._cursor_x - 1]
+                             + to_insert
+                             + self.input_buffer[self._cursor_x - 1:])
         self._history_idx_neg = 0
 
     def cmd__backspace(self) -> None:
         'Truncate current content by one character, if possible.'
         if self._cursor_x > 0:
             self._cursor_x -= 1
-            self._input_buffer = (self._input_buffer[:self._cursor_x]
-                                  + self._input_buffer[self._cursor_x + 1:])
+            self.input_buffer = (self.input_buffer[:self._cursor_x]
+                                 + self.input_buffer[self._cursor_x + 1:])
             self._history_idx_neg = 0
 
     def cmd__move_cursor(self, direction: str) -> None:
         'Move cursor one space into direction ("left" or "right") if possible.'
         if direction == 'left' and self._cursor_x > 0:
             self._cursor_x -= 1
-        elif direction == 'right'\
-                and self._cursor_x < len(self._input_buffer):
+        elif direction == 'right' and self._cursor_x < len(self.input_buffer):
             self._cursor_x += 1
         else:
             return
         self.taint()
 
     def _reset_buffer(self, content: str) -> None:
-        self._input_buffer = content
-        self._cursor_x = len(self._input_buffer)
+        self.input_buffer = content
+        self._cursor_x = len(self.input_buffer)
 
     def enter(self) -> str:
         'Return current content while also clearing and then redrawing.'
-        to_return = self._input_buffer[:]
+        to_return = self.input_buffer
         if to_return:
-            self._archive_prompt()
+            self._archive_prompt()  # empties .input_buffer, thus use to_return
         return to_return
 
 
@@ -563,7 +563,7 @@ class BaseTui(QueueMixin):
         # separated to serve as hook for subclass window selection
         return [self.window]
 
-    def log(self, msg: str, **kwargs) -> tuple[tuple[int, ...], str]:
+    def log(self, msg: str, **kwargs) -> Optional[tuple[tuple[int, ...], str]]:
         'Write with timestamp, prefix to what window ._log_target_wins offers.'
         prefix = kwargs.get('prefix', _LOG_PREFIX_DEFAULT)
         now = str(datetime.now())
index 981190fb532eb069a1fe872acfcdbec3651296c5..f2cd61907aa14ab6f83cb2d3044291432f656268 100644 (file)
@@ -57,6 +57,7 @@
 <0 timeout
 1 ..> PING :what's up?
 <0 timeout
+<0 no timely PONG from server
 1 ..$ connection_state set to: [broken: no timely PONG from server]
 repeat isupport-clear-in isupport-clear-out
 1 ..$ connection_state set to: []
index 5f2d8c69b282c8869416f62db914b510d0f937c2..00582a39b9e435c4b37b9b08a4dee9733f696f08 100644 (file)
@@ -44,6 +44,7 @@
 # another timeout instead of pong? disconnect
 repeat trigger-ping-in trigger-ping-out
 <0 timeout
+<0 no timely PONG from server
 1 ..$ connection_state set to: [broken: no timely PONG from server]
 repeat isupport-clear-in isupport-clear-out
 1 ..$ connection_state set to: []
index c9c898b20b06df8cccac9749d477de43ab44682c..d9f6c5a51f1fdc8a472d16524c247e317ea34c3e 100644 (file)
@@ -336,11 +336,11 @@ repeat isupport-clear-in isupport-clear-out
 1 ..$ motd set to:
 1 ..$ sasl_account set to: []
 1 ..$ sasl_auth_state set to: []
+1 ..$ users cleared
 
 # fail to send in disconnect, check alert window is command prompt window
 > /window 6
 > /privmsg barbar test
-1 ..$ users cleared
 6 .!$ cannot send, connection seems closed
 > /window 1
 > /privmsg barbar test
@@ -390,10 +390,10 @@ repeat conn0 conn1 9
 repeat isupport-clear-in isupport-clear-out 9
 9 ..$ connection_state set to: []
 , ..$ DISCONNECTED
+9 .!$ will retry connecting in 1 seconds
 
 # check that (save TUI tests assuming start on window 0, and no 4 yet) on reconnect, all the same effects can be expected
 > /reconnect
-9 .!$ will retry connecting in 1 seconds
 1 ..$ connection_state set to: [connecting]
 1 ..$ connection_state set to: [connected]
 2,3,4,5,6,7 ..$ CONNECTED
index 15587c2f27f9fb7f66ad184e381e867787232d88..10e07ffbadc97918c5f15976d8870fa780562656 100644 (file)
@@ -70,7 +70,7 @@ TUI 21,13.on_black,bold,bright_red,bright_cyan.invalid prompt command: /foo_0123
 TUI 20,13.on_black,bold,bright_red,bright_cyan.invalid prompt command: /foo_0123456789_0123456789_012345678       
 TUI 21,0.on_black,bold,bright_red,bright_cyan.    unknown                                                                     
 # # check scrolling
-# > /window.prompt.scroll up
+# > /window.history.scroll up
 # > /foo_0123456789_0123456789_012345678
 # 0 .!# invalid prompt command: /foo_0123456789_0123456789_012345678 unknown
 # # > /foo_0123456789_0123456789_012345678