home · contact · privacy
Add server reply simulation to testing, move into own module.
authorChristian Heller <c.heller@plomlompom.de>
Mon, 15 Sep 2025 05:35:42 +0000 (07:35 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Mon, 15 Sep 2025 05:35:42 +0000 (07:35 +0200)
ircplom.py
ircplom/client.py
ircplom/client_tui.py
ircplom/irc_conn.py
ircplom/testing.py [new file with mode: 0644]
ircplom/tui_base.py

index 0522008d14a83519989e0e6b761518de38f58401..42f9dc91719d0c254e41486213407843757a1439 100755 (executable)
@@ -4,9 +4,9 @@ from queue import SimpleQueue
 from sys import argv
 from ircplom.events import ExceptionEvent, QuitEvent
 from ircplom.client import ClientsDb, ClientEvent, NewClientEvent
-from ircplom.tui_base import (BaseTui, Terminal, TerminalInterface,
-                              TestTerminal, TestTui, TuiEvent)
+from ircplom.tui_base import BaseTui, Terminal, TerminalInterface, TuiEvent
 from ircplom.client_tui import ClientTui
+from ircplom.testing import TestTerminal, TestingClientTui
 
 
 def main_loop(cls_term: type[TerminalInterface], cls_tui: type[BaseTui]
@@ -36,7 +36,7 @@ def main_loop(cls_term: type[TerminalInterface], cls_tui: type[BaseTui]
 
 if __name__ == '__main__':
     if len(argv) > 1 and argv[1] == 'test':
-        main_loop(TestTerminal, TestTui)
+        main_loop(TestTerminal, TestingClientTui)
         print('test finished')
     else:
         main_loop(Terminal, ClientTui)
index 1cd98787a390baedab5a27f62d17ecc90ae656d8..a5fe3e9377f6ddb9467fbf0c5bb12560c61a7547 100644 (file)
@@ -708,10 +708,14 @@ class _CapsManager(_Clearable):
         return False
 
 
-class Client(ABC, ClientQueueMixin):
+_ClientConnClass = TypeVar('_ClientConnClass', bound=_IrcConnection)
+
+
+class Client(ABC, ClientQueueMixin, Generic[_ClientConnClass]):
     'Abstracts socket connection, loop over it, and handling messages from it.'
     _caps: _CapsManager
     conn: Optional[_IrcConnection] = None
+    _cls_conn: _ClientConnClass
 
     def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
         self.client_id = conn_setup.hostname
@@ -730,7 +734,7 @@ class Client(ABC, ClientQueueMixin):
             try:
                 if self.conn:
                     raise IrcConnAbortException('already connected')
-                self.conn = _IrcConnection(
+                self.conn = self._cls_conn(
                     hostname=self.db.hostname, port=self.db.port,
                     _q_out=self._q_out, client_id=self.client_id)
                 self._client_trigger('_on_connect')
index 4b338b5917cd658d2f482123d10191fb51abe15b..8f409697470090f86221b5daa3313be1c659d7d0 100644 (file)
@@ -431,6 +431,9 @@ class ClientTui(BaseTui):
         if getattr(self._client_mngrs[client_id], todo)(**kwargs) is not False:
             self.redraw_affected()
 
+    def _new_client(self, conn_setup: IrcConnSetup) -> 'ClientKnowingTui':
+        return ClientKnowingTui(_q_out=self._q_out, conn_setup=conn_setup)
+
     def cmd__connect(self,
                      host_port: str,
                      nickname_pw: str = '',
@@ -453,15 +456,12 @@ class ClientTui(BaseTui):
         password = split[1] if len(split) > 1 else ''
         if not realname:
             realname = nickname
-        self._put(NewClientEvent(
-            _ClientKnowingTui(
-                _q_out=self._q_out,
-                conn_setup=IrcConnSetup(
-                    hostname, port, nickname, realname, password))))
+        self._put(NewClientEvent(self._new_client(IrcConnSetup(
+            hostname, port, nickname, realname, password))))
         return None
 
 
-class _ClientKnowingTui(Client):
+class ClientKnowingTui(Client):
 
     def _client_tui_trigger(self, todo: str, **kwargs) -> None:
         self._put(TuiEvent.affector('for_client_do').kw(
index add211d631d7fd96634df47d2b8c3bba2991d4be..af04814b4b73254c9775bfd69acfd86b59f3c50d 100644 (file)
@@ -136,8 +136,12 @@ class BaseIrcConnection(QueueMixin, ABC):
 
     def __init__(self, hostname: str, port: int, **kwargs) -> None:
         super().__init__(**kwargs)
-        self._socket = socket()
         self.ssl = port == PORT_SSL
+        self._set_up_socket(hostname, port)
+        self._recv_loop = Loop(iterator=self._read_lines(), _q_out=self._q_out)
+
+    def _set_up_socket(self, hostname: str, port: int) -> None:
+        self._socket = socket()
         if self.ssl:
             self._socket = create_ssl_context().wrap_socket(
                 self._socket, server_hostname=hostname)
@@ -147,7 +151,6 @@ class BaseIrcConnection(QueueMixin, ABC):
         except (TimeoutError, socket_gaierror) as e:
             raise IrcConnAbortException(e) from e
         self._socket.settimeout(_TIMEOUT_RECV_LOOP)
-        self._recv_loop = Loop(iterator=self._read_lines(), _q_out=self._q_out)
 
     def close(self) -> None:
         'Stop recv loop and close socket.'
diff --git a/ircplom/testing.py b/ircplom/testing.py
new file mode 100644 (file)
index 0000000..56afde7
--- /dev/null
@@ -0,0 +1,186 @@
+'Basic testing.'
+from contextlib import contextmanager
+from queue import SimpleQueue, Empty as QueueEmpty
+from typing import Generator, Iterator, Optional
+from ircplom.events import Event, Loop, QueueMixin
+from ircplom.client import _IrcConnection, IrcConnSetup
+from ircplom.client_tui import ClientKnowingTui, ClientTui
+from ircplom.irc_conn import IrcMessage
+from ircplom.tui_base import TerminalInterface, TuiEvent
+
+
+_PLAYBOOK: tuple[str, ...] = (
+    '> /help',
+    '0 # commands available in this window:',
+    '0 #   /connect HOST_PORT [NICKNAME_PW] [REALNAME]',
+    '0 #   /help ',
+    '0 #   /list ',
+    '0 #   /prompt_enter ',
+    '0 #   /quit ',
+    '0 #   /window TOWARDS',
+    '0 #   /window.history.scroll DIRECTION',
+    '0 #   /window.paste ',
+    '0 #   /window.prompt.backspace ',
+    '0 #   /window.prompt.move_cursor DIRECTION',
+    '0 #   /window.prompt.scroll DIRECTION',
+    '> /list',
+    '0 # windows available via /window:',
+    '0 #   0) :start',
+    '> /connect foo.bar.baz foo:bar baz',
+    '1,2 $ caps cleared',
+    '1,2 $ caps cleared',
+    '1,2 $ hostname set to: [foo.bar.baz]',
+    '1,2 $ port set to: [-1]',
+    '1,2 $ nick_wanted set to: [foo]',
+    '1,2 $ realname set to: [baz]',
+    '1,2 $ password set to: [bar]',
+    '1,2 $ port set to: [6697]',
+    '1,2 $ connection_state set to: [connecting]',
+    '1,2 $ channels cleared',
+    '1,2 $ users cleared',
+    '1,2 $ ?!?@? renames ?',
+    '1,2 $ users:me:user set to: [plom]',
+    '1,2 $ connection_state set to: [connected]',
+    '1,2 $ caps cleared',
+    '2 > CAP LS :302',
+    '2 > USER plom 0 * :baz',
+    '2 > NICK :foo',
+    '2 < :*.?.net NOTICE * :*** Looking up your ident...',
+    '1,2 $$$ *** Looking up your ident...',
+    '2 < :*.?.net NOTICE * :*** Looking up your hostname...',
+    '1,2 $$$ *** Looking up your hostname...',
+    '2 < :*.?.net NOTICE * :*** Found your hostname (…) -- cached...',
+    '1,2 $$$ *** Found your hostname (…) -- cached...',
+    '2 < :*.?.net CAP * LS : foo bar sasl=PLAIN,EXTERNAL baz',
+    '1,2 $ isupport cleared',
+    '2 > CAP REQ :sasl',
+    '2 > CAP :LIST',
+    '2 < PING :?',
+    '2 > PONG :?',
+    '> /quit',
+    '2 < :*.?.net CAP ? ACK :sasl',
+    '2 < :*.?.net CAP ? LIST :cap-notify sasl',
+    '0 < ',
+)
+
+
+class TestTerminal(QueueMixin, TerminalInterface):
+    'Collects keypresses from string queue, otherwise mostly dummy.'
+
+    def __init__(self, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self._q_keypresses: SimpleQueue = SimpleQueue()
+
+    @contextmanager
+    def setup(self) -> Generator:
+        with Loop(iterator=self._get_keypresses(), _q_out=self._q_out):
+            yield self
+
+    def flush(self) -> None:
+        pass
+
+    def calc_geometry(self) -> None:
+        self.size = TerminalInterface.__annotations__['size'](0, 0)
+
+    def wrap(self, line: str) -> list[str]:
+        return []
+
+    def write(self,
+              msg: str = '',
+              start_y: Optional[int] = None,
+              attribute: Optional[str] = None,
+              padding: bool = True
+              ) -> None:
+        pass
+
+    def _get_keypresses(self) -> Iterator[Optional[TuiEvent]]:
+        while True:
+            try:
+                to_yield = self._q_keypresses.get(timeout=0.1)
+            except QueueEmpty:
+                break
+            yield TuiEvent.affector('handle_keyboard_event'
+                                    ).kw(typed_in=to_yield)
+
+
+class _FakeIrcConnection(_IrcConnection):
+
+    def __init__(self, **kwargs) -> None:
+        self._q_server_msgs: SimpleQueue = SimpleQueue()
+        super().__init__(**kwargs)
+
+    def put_server_msg(self, msg: str) -> None:
+        'Simulate message coming from server.'
+        self._q_server_msgs.put(msg)
+
+    def _set_up_socket(self, hostname: str, port: int) -> None:
+        pass
+
+    def close(self) -> None:
+        self._recv_loop.stop()
+
+    def send(self, msg: IrcMessage) -> None:
+        pass
+
+    def _read_lines(self) -> Iterator[Optional[Event]]:
+        while True:
+            try:
+                msg = self._q_server_msgs.get(timeout=0.1)
+            except QueueEmpty:
+                break
+            yield self._make_recv_event(IrcMessage.from_raw(msg))
+
+
+class _TestClientKnowingTui(ClientKnowingTui):
+    _cls_conn = _FakeIrcConnection
+
+
+class TestingClientTui(ClientTui):
+    'Collects keypresses via TestTerminal from PLAYBOOK, compares log results.'
+    _client: _TestClientKnowingTui
+
+    def __init__(self, **kwargs) -> None:
+        super().__init__(**kwargs)
+        assert isinstance(self._term, TestTerminal)
+        self._q_keypresses = self._term._q_keypresses
+        self._playbook = _PLAYBOOK
+        self._playbook_idx = -1
+        self._play_till_next_log()
+
+    def _new_client(self, conn_setup: IrcConnSetup) -> _TestClientKnowingTui:
+        self._client = _TestClientKnowingTui(_q_out=self._q_out,
+                                             conn_setup=conn_setup)
+        return self._client
+
+    def _log(self, msg: str, **kwargs) -> tuple[tuple[int, ...], str]:
+        win_ids, logged_msg = super()._log(msg, **kwargs)
+        time_str, msg_sans_time = logged_msg.split(' ', maxsplit=1)
+        assert len(time_str) == 8
+        for c in time_str[:2] + time_str[3:5] + time_str[6:]:
+            assert c.isdigit()
+        assert time_str[2] == ':' and time_str[5] == ':'
+        context, expected_msg = self._playbook[self._playbook_idx
+                                               ].split(maxsplit=1)
+        expected_win_ids = tuple(int(idx) for idx in context.split(','))
+        info = (self._playbook_idx,
+                expected_win_ids, win_ids,
+                expected_msg, msg_sans_time)
+        assert expected_msg == msg_sans_time, info
+        assert expected_win_ids == win_ids, info
+        self._play_till_next_log()
+        return win_ids, logged_msg
+
+    def _play_till_next_log(self) -> None:
+        while True:
+            self._playbook_idx += 1
+            line = self._playbook[self._playbook_idx]
+            context, msg = line.split(' ', maxsplit=1)
+            if context == '>':
+                for c in msg:
+                    self._q_keypresses.put(c)
+                self._q_keypresses.put('KEY_ENTER')
+            else:
+                if context == '2' and msg[:2] == '< ':
+                    assert isinstance(self._client.conn, _FakeIrcConnection)
+                    self._client.conn.put_server_msg(msg[2:])
+                break
index 91ca6e63c245d215c170df979920cb8d43a81efd..41406b813f4099b2543b2102f8c27f1fc347c5a9 100644 (file)
@@ -5,7 +5,6 @@ from base64 import b64decode
 from contextlib import contextmanager
 from datetime import datetime
 from inspect import _empty as inspect_empty, signature, stack
-from queue import SimpleQueue, Empty as QueueEmpty
 from signal import SIGWINCH, signal
 from typing import (Callable, Generator, Iterator, NamedTuple, Optional,
                     Sequence)
@@ -443,15 +442,18 @@ class BaseTui(QueueMixin):
         # separated to serve as hook for subclass window selection
         return [self.window]
 
-    def _log(self, msg: str, **kwargs) -> None:
+    def _log(self, msg: str, **kwargs) -> tuple[tuple[int, ...], str]:
         prefix = kwargs.get('prefix', _LOG_PREFIX_DEFAULT)
         if kwargs.get('alert', False):
             prefix = _LOG_PREFIX_ALERT + prefix
         msg = f'{str(datetime.now())[11:19]} {prefix} {msg}'
+        affected_win_indices = []
         for win in self._log_target_wins(**kwargs):
+            affected_win_indices += [win.idx]
             win.history.append(msg)
             if win != self.window:
                 self._status_line.taint()
+        return tuple(affected_win_indices), msg
 
     def _new_window(self, win_class=Window, **kwargs) -> Window:
         new_idx = len(self._windows)
@@ -717,92 +719,3 @@ class Terminal(QueueMixin, TerminalInterface):
             yield (TuiEvent.affector('handle_keyboard_event'
                                      ).kw(typed_in=to_yield) if to_yield
                    else None)
-
-
-PLAYBOOK: tuple[str, ...] = (
-    '<',
-    '>/help',
-    '<commands available in this window:',
-    '<  /help ',
-    '<  /list ',
-    '<  /prompt_enter ',
-    '<  /quit ',
-    '<  /window TOWARDS',
-    '<  /window.history.scroll DIRECTION',
-    '<  /window.paste ',
-    '<  /window.prompt.backspace ',
-    '<  /window.prompt.move_cursor DIRECTION',
-    '<  /window.prompt.scroll DIRECTION',
-    '>/list',
-    '<windows available via /window:',
-    '<  0) :start',
-    '>/quit',
-    '<'
-)
-
-
-class TestTerminal(QueueMixin, TerminalInterface):
-    'Collects keypresses from string queue, otherwise mostly dummy.'
-
-    def __init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
-        self._q_keypresses: SimpleQueue = SimpleQueue()
-
-    @contextmanager
-    def setup(self) -> Generator:
-        with Loop(iterator=self._get_keypresses(), _q_out=self._q_out):
-            yield self
-
-    def flush(self) -> None:
-        pass
-
-    def calc_geometry(self) -> None:
-        self.size = _YX(0, 0)
-
-    def wrap(self, line: str) -> list[str]:
-        return []
-
-    def write(self,
-              msg: str = '',
-              start_y: Optional[int] = None,
-              attribute: Optional[str] = None,
-              padding: bool = True
-              ) -> None:
-        pass
-
-    def _get_keypresses(self) -> Iterator[Optional[TuiEvent]]:
-        while True:
-            try:
-                to_yield = self._q_keypresses.get(timeout=0.1)
-            except QueueEmpty:
-                break
-            yield TuiEvent.affector('handle_keyboard_event'
-                                    ).kw(typed_in=to_yield)
-
-
-class TestTui(BaseTui):
-    'Collects keypresses via TestTerminal from PLAYBOOK, compares log results.'
-
-    def __init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
-        self._test_idx = 0
-        assert isinstance(self._term, TestTerminal)
-        self._q_keypresses = self._term._q_keypresses
-        self._log('')
-
-    def _log(self, msg: str, **kwargs) -> None:
-        entry = PLAYBOOK[self._test_idx]
-        direction, expected = entry[0], entry[1:]
-        assert direction == '<'
-        assert expected == msg, (self._test_idx, expected, msg)
-        super()._log(msg, **kwargs)
-        while True:
-            self._test_idx += 1
-            entry = PLAYBOOK[self._test_idx]
-            direction, msg = entry[0], entry[1:]
-            if direction == '>':
-                for c in msg:
-                    self._q_keypresses.put(c)
-                self._q_keypresses.put('KEY_ENTER')
-            else:
-                break