from contextlib import contextmanager
from queue import SimpleQueue, Empty as QueueEmpty
from pathlib import Path
-from typing import Generator, Iterator, Optional
+from typing import Callable, Generator, Iterator, Optional
from ircplom.events import Event, Loop, QueueMixin
from ircplom.client import IrcConnection, IrcConnSetup
from ircplom.client_tui import ClientKnowingTui, ClientTui
_cls_conn = _FakeIrcConnection
-class TestingClientTui(ClientTui):
- 'Collects keypresses via TestTerminal and test file, compares log results.'
- _client_cls = _TestClientKnowingTui
- _clients: list[_TestClientKnowingTui]
- _path_config: Optional[Path] = None
- _path_logs = None
- _path_test: Path
+class _Playbook:
+ put_keypress: Optional[Callable] = None
- def __init__(self, **kwargs) -> None:
- self._clients = []
- with self._path_test.open('r', encoding='utf8') as f:
- self._playbook = [line.rstrip() for line in f.readlines()]
- self._playbook_anchors: dict[str, int] = {}
+ def __init__(self, path: Path, get_client: Callable) -> None:
+ self._get_client = get_client
+ with path.open('r', encoding='utf8') as f:
+ self._lines = [line.rstrip() for line in f.readlines()]
while True:
inserts: list[str] = []
- self._playbook_anchors.clear()
- for idx, line in enumerate(self._playbook):
+ anchors: dict[str, int] = {}
+ for idx, line in enumerate(self._lines):
if line[:1] == '|':
- self._playbook_anchors[line[1:]] = idx
- for idx, line in enumerate(self._playbook):
+ anchors[line[1:]] = idx
+ for idx, line in enumerate(self._lines):
split = self._split_active_line(line)
if (not split) or split[0] != 'repeat':
continue
range_data = split[1].split(' ', maxsplit=2)
start_key, end_key = range_data[:2]
- start = self._playbook_anchors[start_key] + 1
- end = self._playbook_anchors[end_key]
- inserts = self._playbook[int(start):int(end)]
+ start = anchors[start_key] + 1
+ end = anchors[end_key]
+ inserts = self._lines[int(start):int(end)]
if len(range_data) == 3:
for jdx, insert in enumerate(inserts):
if (res := self._split_active_line(insert)):
inserts[jdx] = ' '.join([range_data[2]] + [res[1]])
- self._playbook = (self._playbook[:idx]
- + inserts
- + self._playbook[idx + 1:])
+ self._lines =\
+ self._lines[:idx] + inserts + self._lines[idx + 1:]
break
if not inserts:
break
- self._playbook_idx = 0
- super().__init__(**kwargs)
- assert isinstance(self._term, TestTerminal)
- self._q_keypresses = self._term._q_keypresses
- if self._playbook_idx == 0: # __init__ called no ._play_till_next_log
+ self._idx = 0
+
+ @property
+ def _current_line(self) -> str:
+ return self._lines[self._idx]
+
+ def ensure_has_started(self) -> None:
+ 'Check if still at beginning, and if so, play till at next log line.'
+ if self._idx == 0:
self._play_till_next_log()
+ def next_log(self) -> tuple[int, tuple[int, ...], str]:
+ 'Return index, win IDs, and context of next expected log line.'
+ context, msg = self._current_line.split(maxsplit=1)
+ if ':' in context:
+ _, context = context.split(':')
+ expected_win_ids = tuple(int(idx) for idx in context.split(',') if idx)
+ used_idx = self._idx
+ self._play_till_next_log()
+ return used_idx, expected_win_ids, msg
+
+ def _play_till_next_log(self) -> None:
+ while True:
+ self._idx += 1
+ if (result := self._split_active_line(self._current_line)):
+ context, msg = result
+ if context == '>':
+ assert self.put_keypress is not None
+ for c in msg:
+ self.put_keypress(c)
+ self.put_keypress('KEY_ENTER')
+ continue
+ if ':' in context and msg.startswith('< '):
+ client_id, win_ids = context.split(':')
+ client = self._get_client(int(client_id))
+ assert isinstance(client.conn, _FakeIrcConnection)
+ client.conn.put_server_msg(msg[2:])
+ if not win_ids:
+ continue
+ break
+
@staticmethod
def _split_active_line(line: str) -> Optional[tuple[str, ...]]:
'Return two-items tuple of split line, or None if inactive one.'
return None
return tuple(line.split(' ', maxsplit=1))
+
+class TestingClientTui(ClientTui):
+ 'Collects keypresses via TestTerminal and test file, compares log results.'
+ _client_cls = _TestClientKnowingTui
+ _clients: list[_TestClientKnowingTui]
+ _path_config: Optional[Path] = None
+ _path_logs = None
+ _path_test: Path
+
+ def __init__(self, **kwargs) -> None:
+ self._clients = []
+ self._playbook = _Playbook(path=self._path_test,
+ get_client=lambda idx: self._clients[idx])
+ super().__init__(**kwargs)
+ assert isinstance(self._term, TestTerminal)
+ self._playbook.put_keypress = self._term._q_keypresses.put
+ self._playbook.ensure_has_started() # if .__init__ didn't yet by log()
+
@classmethod
def on_files(cls, path_test: Path, path_config: Optional[Path] = None):
'Return cls with ._path_test set.'
for c in time_str[:2] + time_str[3:5] + time_str[6:]:
assert c.isdigit()
assert time_str[2] == ':' and time_str[5] == ':'
- context, expected_msg = self._playbook[self._playbook_idx
- ].split(maxsplit=1)
- if ':' in context:
- _, context = context.split(':')
- expected_win_ids = tuple(int(idx) for idx in context.split(',') if idx)
- info = (self._playbook_idx + 1,
+ idx, expected_win_ids, expected_msg = self._playbook.next_log()
+ info = (idx,
'WANTED:', expected_win_ids, expected_msg,
'GOT:', win_ids, msg_sans_time)
assert expected_msg == msg_sans_time, info
assert expected_win_ids == win_ids, info
- self._play_till_next_log()
return win_ids, logged_msg
-
- def _play_till_next_log(self) -> None:
- while True:
- self._playbook_idx += 1
- line = self._playbook[self._playbook_idx]
- if (result := self._split_active_line(line)):
- context, msg = result
- if context == '>':
- for c in msg:
- self._q_keypresses.put(c)
- self._q_keypresses.put('KEY_ENTER')
- continue
- if ':' in context and msg.startswith('< '):
- client_id, win_ids = context.split(':')
- client = self._clients[int(client_id)]
- assert isinstance(client.conn, _FakeIrcConnection)
- client.conn.put_server_msg(msg[2:])
- if not win_ids:
- continue
- break