home · contact · privacy
Refactor playbook processing code out of TestingClientTui.
authorChristian Heller <c.heller@plomlompom.de>
Sun, 28 Sep 2025 16:46:40 +0000 (18:46 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Sun, 28 Sep 2025 16:46:40 +0000 (18:46 +0200)
src/ircplom/testing.py

index a8da567bf306bd380dcb50205e76457842c0fd58..5e32835d8279f2a67d9ee9b79e63d32b89e71bf6 100644 (file)
@@ -2,7 +2,7 @@
 from contextlib import contextmanager
 from queue import SimpleQueue, Empty as QueueEmpty
 from pathlib import Path
-from typing import Generator, Iterator, Optional
+from typing import Callable, Generator, Iterator, Optional
 from ircplom.events import Event, Loop, QueueMixin
 from ircplom.client import IrcConnection, IrcConnSetup
 from ircplom.client_tui import ClientKnowingTui, ClientTui
@@ -87,51 +87,78 @@ class _TestClientKnowingTui(ClientKnowingTui):
     _cls_conn = _FakeIrcConnection
 
 
-class TestingClientTui(ClientTui):
-    'Collects keypresses via TestTerminal and test file, compares log results.'
-    _client_cls = _TestClientKnowingTui
-    _clients: list[_TestClientKnowingTui]
-    _path_config: Optional[Path] = None
-    _path_logs = None
-    _path_test: Path
+class _Playbook:
+    put_keypress: Optional[Callable] = None
 
-    def __init__(self, **kwargs) -> None:
-        self._clients = []
-        with self._path_test.open('r', encoding='utf8') as f:
-            self._playbook = [line.rstrip() for line in f.readlines()]
-        self._playbook_anchors: dict[str, int] = {}
+    def __init__(self, path: Path, get_client: Callable) -> None:
+        self._get_client = get_client
+        with path.open('r', encoding='utf8') as f:
+            self._lines = [line.rstrip() for line in f.readlines()]
         while True:
             inserts: list[str] = []
-            self._playbook_anchors.clear()
-            for idx, line in enumerate(self._playbook):
+            anchors: dict[str, int] = {}
+            for idx, line in enumerate(self._lines):
                 if line[:1] == '|':
-                    self._playbook_anchors[line[1:]] = idx
-            for idx, line in enumerate(self._playbook):
+                    anchors[line[1:]] = idx
+            for idx, line in enumerate(self._lines):
                 split = self._split_active_line(line)
                 if (not split) or split[0] != 'repeat':
                     continue
                 range_data = split[1].split(' ', maxsplit=2)
                 start_key, end_key = range_data[:2]
-                start = self._playbook_anchors[start_key] + 1
-                end = self._playbook_anchors[end_key]
-                inserts = self._playbook[int(start):int(end)]
+                start = anchors[start_key] + 1
+                end = anchors[end_key]
+                inserts = self._lines[int(start):int(end)]
                 if len(range_data) == 3:
                     for jdx, insert in enumerate(inserts):
                         if (res := self._split_active_line(insert)):
                             inserts[jdx] = ' '.join([range_data[2]] + [res[1]])
-                self._playbook = (self._playbook[:idx]
-                                  + inserts
-                                  + self._playbook[idx + 1:])
+                self._lines =\
+                    self._lines[:idx] + inserts + self._lines[idx + 1:]
                 break
             if not inserts:
                 break
-        self._playbook_idx = 0
-        super().__init__(**kwargs)
-        assert isinstance(self._term, TestTerminal)
-        self._q_keypresses = self._term._q_keypresses
-        if self._playbook_idx == 0:  # __init__ called no ._play_till_next_log
+        self._idx = 0
+
+    @property
+    def _current_line(self) -> str:
+        return self._lines[self._idx]
+
+    def ensure_has_started(self) -> None:
+        'Check if still at beginning, and if so, play till at next log line.'
+        if self._idx == 0:
             self._play_till_next_log()
 
+    def next_log(self) -> tuple[int, tuple[int, ...], str]:
+        'Return index, win IDs, and context of next expected log line.'
+        context, msg = self._current_line.split(maxsplit=1)
+        if ':' in context:
+            _, context = context.split(':')
+        expected_win_ids = tuple(int(idx) for idx in context.split(',') if idx)
+        used_idx = self._idx
+        self._play_till_next_log()
+        return used_idx, expected_win_ids, msg
+
+    def _play_till_next_log(self) -> None:
+        while True:
+            self._idx += 1
+            if (result := self._split_active_line(self._current_line)):
+                context, msg = result
+                if context == '>':
+                    assert self.put_keypress is not None
+                    for c in msg:
+                        self.put_keypress(c)
+                    self.put_keypress('KEY_ENTER')
+                    continue
+                if ':' in context and msg.startswith('< '):
+                    client_id, win_ids = context.split(':')
+                    client = self._get_client(int(client_id))
+                    assert isinstance(client.conn, _FakeIrcConnection)
+                    client.conn.put_server_msg(msg[2:])
+                    if not win_ids:
+                        continue
+                break
+
     @staticmethod
     def _split_active_line(line: str) -> Optional[tuple[str, ...]]:
         'Return two-items tuple of split line, or None if inactive one.'
@@ -139,6 +166,24 @@ class TestingClientTui(ClientTui):
             return None
         return tuple(line.split(' ', maxsplit=1))
 
+
+class TestingClientTui(ClientTui):
+    'Collects keypresses via TestTerminal and test file, compares log results.'
+    _client_cls = _TestClientKnowingTui
+    _clients: list[_TestClientKnowingTui]
+    _path_config: Optional[Path] = None
+    _path_logs = None
+    _path_test: Path
+
+    def __init__(self, **kwargs) -> None:
+        self._clients = []
+        self._playbook = _Playbook(path=self._path_test,
+                                   get_client=lambda idx: self._clients[idx])
+        super().__init__(**kwargs)
+        assert isinstance(self._term, TestTerminal)
+        self._playbook.put_keypress = self._term._q_keypresses.put
+        self._playbook.ensure_has_started()  # if .__init__ didn't yet by log()
+
     @classmethod
     def on_files(cls, path_test: Path, path_config: Optional[Path] = None):
         'Return cls with ._path_test set.'
@@ -158,35 +203,10 @@ class TestingClientTui(ClientTui):
         for c in time_str[:2] + time_str[3:5] + time_str[6:]:
             assert c.isdigit()
         assert time_str[2] == ':' and time_str[5] == ':'
-        context, expected_msg = self._playbook[self._playbook_idx
-                                               ].split(maxsplit=1)
-        if ':' in context:
-            _, context = context.split(':')
-        expected_win_ids = tuple(int(idx) for idx in context.split(',') if idx)
-        info = (self._playbook_idx + 1,
+        idx, expected_win_ids, expected_msg = self._playbook.next_log()
+        info = (idx,
                 'WANTED:', expected_win_ids, expected_msg,
                 'GOT:', win_ids, msg_sans_time)
         assert expected_msg == msg_sans_time, info
         assert expected_win_ids == win_ids, info
-        self._play_till_next_log()
         return win_ids, logged_msg
-
-    def _play_till_next_log(self) -> None:
-        while True:
-            self._playbook_idx += 1
-            line = self._playbook[self._playbook_idx]
-            if (result := self._split_active_line(line)):
-                context, msg = result
-                if context == '>':
-                    for c in msg:
-                        self._q_keypresses.put(c)
-                    self._q_keypresses.put('KEY_ENTER')
-                    continue
-                if ':' in context and msg.startswith('< '):
-                    client_id, win_ids = context.split(':')
-                    client = self._clients[int(client_id)]
-                    assert isinstance(client.conn, _FakeIrcConnection)
-                    client.conn.put_server_msg(msg[2:])
-                    if not win_ids:
-                        continue
-                break