From: Christian Heller Date: Sun, 28 Sep 2025 16:46:40 +0000 (+0200) Subject: Refactor playbook processing code out of TestingClientTui. X-Git-Url: https://plomlompom.com/repos/?a=commitdiff_plain;h=d4e0bebdf5552617e67f88319757fa6772105f82;p=ircplom Refactor playbook processing code out of TestingClientTui. --- diff --git a/src/ircplom/testing.py b/src/ircplom/testing.py index a8da567..5e32835 100644 --- a/src/ircplom/testing.py +++ b/src/ircplom/testing.py @@ -2,7 +2,7 @@ from contextlib import contextmanager from queue import SimpleQueue, Empty as QueueEmpty from pathlib import Path -from typing import Generator, Iterator, Optional +from typing import Callable, Generator, Iterator, Optional from ircplom.events import Event, Loop, QueueMixin from ircplom.client import IrcConnection, IrcConnSetup from ircplom.client_tui import ClientKnowingTui, ClientTui @@ -87,51 +87,78 @@ class _TestClientKnowingTui(ClientKnowingTui): _cls_conn = _FakeIrcConnection -class TestingClientTui(ClientTui): - 'Collects keypresses via TestTerminal and test file, compares log results.' - _client_cls = _TestClientKnowingTui - _clients: list[_TestClientKnowingTui] - _path_config: Optional[Path] = None - _path_logs = None - _path_test: Path +class _Playbook: + put_keypress: Optional[Callable] = None - def __init__(self, **kwargs) -> None: - self._clients = [] - with self._path_test.open('r', encoding='utf8') as f: - self._playbook = [line.rstrip() for line in f.readlines()] - self._playbook_anchors: dict[str, int] = {} + def __init__(self, path: Path, get_client: Callable) -> None: + self._get_client = get_client + with path.open('r', encoding='utf8') as f: + self._lines = [line.rstrip() for line in f.readlines()] while True: inserts: list[str] = [] - self._playbook_anchors.clear() - for idx, line in enumerate(self._playbook): + anchors: dict[str, int] = {} + for idx, line in enumerate(self._lines): if line[:1] == '|': - self._playbook_anchors[line[1:]] = idx - for idx, line in enumerate(self._playbook): + anchors[line[1:]] = idx + for idx, line in enumerate(self._lines): split = self._split_active_line(line) if (not split) or split[0] != 'repeat': continue range_data = split[1].split(' ', maxsplit=2) start_key, end_key = range_data[:2] - start = self._playbook_anchors[start_key] + 1 - end = self._playbook_anchors[end_key] - inserts = self._playbook[int(start):int(end)] + start = anchors[start_key] + 1 + end = anchors[end_key] + inserts = self._lines[int(start):int(end)] if len(range_data) == 3: for jdx, insert in enumerate(inserts): if (res := self._split_active_line(insert)): inserts[jdx] = ' '.join([range_data[2]] + [res[1]]) - self._playbook = (self._playbook[:idx] - + inserts - + self._playbook[idx + 1:]) + self._lines =\ + self._lines[:idx] + inserts + self._lines[idx + 1:] break if not inserts: break - self._playbook_idx = 0 - super().__init__(**kwargs) - assert isinstance(self._term, TestTerminal) - self._q_keypresses = self._term._q_keypresses - if self._playbook_idx == 0: # __init__ called no ._play_till_next_log + self._idx = 0 + + @property + def _current_line(self) -> str: + return self._lines[self._idx] + + def ensure_has_started(self) -> None: + 'Check if still at beginning, and if so, play till at next log line.' + if self._idx == 0: self._play_till_next_log() + def next_log(self) -> tuple[int, tuple[int, ...], str]: + 'Return index, win IDs, and context of next expected log line.' + context, msg = self._current_line.split(maxsplit=1) + if ':' in context: + _, context = context.split(':') + expected_win_ids = tuple(int(idx) for idx in context.split(',') if idx) + used_idx = self._idx + self._play_till_next_log() + return used_idx, expected_win_ids, msg + + def _play_till_next_log(self) -> None: + while True: + self._idx += 1 + if (result := self._split_active_line(self._current_line)): + context, msg = result + if context == '>': + assert self.put_keypress is not None + for c in msg: + self.put_keypress(c) + self.put_keypress('KEY_ENTER') + continue + if ':' in context and msg.startswith('< '): + client_id, win_ids = context.split(':') + client = self._get_client(int(client_id)) + assert isinstance(client.conn, _FakeIrcConnection) + client.conn.put_server_msg(msg[2:]) + if not win_ids: + continue + break + @staticmethod def _split_active_line(line: str) -> Optional[tuple[str, ...]]: 'Return two-items tuple of split line, or None if inactive one.' @@ -139,6 +166,24 @@ class TestingClientTui(ClientTui): return None return tuple(line.split(' ', maxsplit=1)) + +class TestingClientTui(ClientTui): + 'Collects keypresses via TestTerminal and test file, compares log results.' + _client_cls = _TestClientKnowingTui + _clients: list[_TestClientKnowingTui] + _path_config: Optional[Path] = None + _path_logs = None + _path_test: Path + + def __init__(self, **kwargs) -> None: + self._clients = [] + self._playbook = _Playbook(path=self._path_test, + get_client=lambda idx: self._clients[idx]) + super().__init__(**kwargs) + assert isinstance(self._term, TestTerminal) + self._playbook.put_keypress = self._term._q_keypresses.put + self._playbook.ensure_has_started() # if .__init__ didn't yet by log() + @classmethod def on_files(cls, path_test: Path, path_config: Optional[Path] = None): 'Return cls with ._path_test set.' @@ -158,35 +203,10 @@ class TestingClientTui(ClientTui): for c in time_str[:2] + time_str[3:5] + time_str[6:]: assert c.isdigit() assert time_str[2] == ':' and time_str[5] == ':' - context, expected_msg = self._playbook[self._playbook_idx - ].split(maxsplit=1) - if ':' in context: - _, context = context.split(':') - expected_win_ids = tuple(int(idx) for idx in context.split(',') if idx) - info = (self._playbook_idx + 1, + idx, expected_win_ids, expected_msg = self._playbook.next_log() + info = (idx, 'WANTED:', expected_win_ids, expected_msg, 'GOT:', win_ids, msg_sans_time) assert expected_msg == msg_sans_time, info assert expected_win_ids == win_ids, info - self._play_till_next_log() return win_ids, logged_msg - - def _play_till_next_log(self) -> None: - while True: - self._playbook_idx += 1 - line = self._playbook[self._playbook_idx] - if (result := self._split_active_line(line)): - context, msg = result - if context == '>': - for c in msg: - self._q_keypresses.put(c) - self._q_keypresses.put('KEY_ENTER') - continue - if ':' in context and msg.startswith('< '): - client_id, win_ids = context.split(':') - client = self._clients[int(client_id)] - assert isinstance(client.conn, _FakeIrcConnection) - client.conn.put_server_msg(msg[2:]) - if not win_ids: - continue - break