class _TestClientKnowingTui(ClientKnowingTui):
_cls_conn = _FakeIrcConnection
+ playbook: '_Playbook'
+ idx: int
def connect(self) -> None:
super().connect()
if self.db.port > _FAKE_TIMEOUT_PORTS_BEYOND:
self.db.port = self.db.port - 1
+ def _test_before(self, context: str, msg: str, cmp_msg: str) -> None:
+ assert context.startswith(_CHAR_SERVER_MSG)
+ assert context[len(_CHAR_SERVER_MSG):] == str(self.idx)
+ assert msg == cmp_msg
+
+ def on_handled_loop_exception(self, e: IrcConnException) -> None:
+ self.playbook.test_wrap(
+ lambda ctx, msg: self._test_before(ctx, msg, str(e)), None,
+ super().on_handled_loop_exception, e)
+
+ def handle_msg(self, msg: IrcMessage) -> None:
+ self.playbook.test_wrap(
+ lambda ctx, msg_: self._test_before(ctx, msg_, msg.raw), None,
+ super().handle_msg, msg)
+
_CHAR_ANCHOR = '|'
_CHAR_COMMENT = '#'
def ensure_has_started(self) -> None:
'Check if still at beginning, and if so, play till at next log line.'
if self._idx == 0:
- self._play_till_log()
-
- def next_log(self) -> tuple[tuple[int, ...], str]:
- 'Return index, win IDs, and context of next expected log line.'
+ self._play_till_test()
+
+ def test_wrap(self,
+ test_before: Optional[Callable],
+ test_after: Optional[Callable],
+ f: Callable,
+ *f_args, **f_kwargs
+ ) -> None:
+ 'Call f with checks before and after against playbook, play it on.'
context, msg = self._split_by_context_separator(self._current_line[1])
- if _CHAR_RANGE in context:
- _, context = context.split(_CHAR_RANGE)
- expected_win_ids = tuple(
- int(idx) for idx in context.split(_CHAR_WIN_ID_SEP) if idx)
+ if test_before:
+ test_before(context, msg)
self._idx += 1
- self._play_till_log()
- return expected_win_ids, msg
-
- def _play_till_log(self) -> None:
+ next_idx_before = self._idx
+ ret = f(*f_args, **f_kwargs)
+ if test_after:
+ test_after(context, msg, ret)
+ if self._idx == next_idx_before: # f may have called ._play_till_test
+ self._play_till_test() # so we avoid jumping over next test
+
+ def _play_till_test(self) -> None:
while True:
idx_info, line = self._current_line
context, msg = self._split_by_context_separator(line)
for c in msg:
self.put_keypress(c)
self.put_keypress('KEY_ENTER')
- elif context.startswith(_CHAR_SERVER_MSG):
+ break
+ if context.startswith(_CHAR_SERVER_MSG):
client = self._get_client(int(context[1:]))
assert isinstance(client.conn, _FakeIrcConnection), client.conn
client.conn.put_server_msg(msg)
- elif context == _TOK_TUI:
+ break
+ if context == _TOK_TUI:
assert self.assert_screen_line is not None
assert self.redraw_affected is not None
self.redraw_affected()
return _Bound
def _new_client(self, conn_setup: IrcConnSetup, channels: set[str]):
- self._clients += [super()._new_client(conn_setup, channels)]
- return self._clients[-1]
-
- def log(self, msg: str, **kwargs) -> tuple[tuple[int, ...], str]:
- win_ids, logged_msg = super().log(msg, **kwargs)
- fmt, time_str, msg_sans_time = logged_msg.split(' ', maxsplit=2)
- msg_sans_time = fmt + ' ' + msg_sans_time
- assert len(time_str) == 8
- for c in time_str[:2] + time_str[3:5] + time_str[6:]:
- assert c.isdigit()
- assert time_str[2] == ':' and time_str[5] == ':'
- expected_win_ids, expected_msg = self._playbook.next_log()
- info = ('WANTED:', expected_win_ids, expected_msg,
- 'GOT:', win_ids, msg_sans_time)
- assert expected_msg == msg_sans_time, info
- assert expected_win_ids == win_ids, info
- return win_ids, logged_msg
+ client = super()._new_client(conn_setup, channels)
+ client.playbook = self._playbook
+ client.idx = len(self._clients)
+ self._clients += [client]
+ return client
+
+ def log(self, msg: str, **kwargs) -> None:
+ def test_after(context: str, expected_msg: str, ret) -> None:
+ win_ids, logged_msg = ret
+ fmt, time_str, msg_sans_time = logged_msg.split(' ', maxsplit=2)
+ msg_sans_time = fmt + ' ' + msg_sans_time
+ assert len(time_str) == 8
+ for c in time_str[:2] + time_str[3:5] + time_str[6:]:
+ assert c.isdigit()
+ assert time_str[2] == ':' and time_str[5] == ':'
+ if _CHAR_RANGE in context:
+ _, context = context.split(_CHAR_RANGE)
+ expected_win_ids = tuple(
+ int(idx) for idx in context.split(_CHAR_WIN_ID_SEP) if idx)
+ info = ('WANTED:', expected_win_ids, expected_msg,
+ 'GOT:', win_ids, msg_sans_time)
+ assert expected_msg == msg_sans_time, info
+ assert expected_win_ids == win_ids, info
+
+ self._playbook.test_wrap(None, test_after, super().log, msg, **kwargs)
+
+ def cmd__prompt_enter(self) -> None:
+ def test_before(context: str, msg: str) -> None:
+ assert context.startswith(_CHAR_PROMPT)
+ assert msg == self.window.prompt.input_buffer[:]
+ self._playbook.test_wrap(test_before, None, super().cmd__prompt_enter)
return _PROMPT_TEMPLATE[:]
@property
- def _input_buffer(self) -> str:
+ def input_buffer(self) -> str:
+ 'Always only copy of actual input buffer.'
return self._input_buffer_unsafe[:]
- @_input_buffer.setter
- def _input_buffer(self, content) -> None:
+ @input_buffer.setter
+ def input_buffer(self, content) -> None:
self.taint()
- self._input_buffer_unsafe = content
+ self._input_buffer_unsafe = content[:]
def _draw(self) -> None:
prefix = self.prefix[:]
- content = self._input_buffer
- if self._cursor_x == len(self._input_buffer):
+ content = self.input_buffer
+ if self._cursor_x == len(self.input_buffer):
content += ' '
half_width = (self._sizes.x - len(prefix)) // 2
offset = 0
self._write(to_write[cursor_x_to_write + 1:])
def _archive_prompt(self) -> None:
- self.append(self._input_buffer)
+ self.append(self.input_buffer)
self._reset_buffer('')
def _scroll(self, up: bool = True) -> None:
super()._scroll(up)
if up and -(self._history_idx_neg) < len(self._history):
- if self._history_idx_neg == 0 and self._input_buffer:
+ if self._history_idx_neg == 0 and self.input_buffer:
self._archive_prompt()
self._history_idx_neg -= 1
self._history_idx_neg -= 1
self._reset_buffer('')
else:
self._reset_buffer(self._history[self._history_idx_neg])
- elif self._input_buffer:
+ elif self.input_buffer:
self._archive_prompt()
def insert(self, to_insert: str) -> None:
'Insert into prompt input buffer.'
self._cursor_x += len(to_insert)
- self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
- + to_insert
- + self._input_buffer[self._cursor_x - 1:])
+ self.input_buffer = (self.input_buffer[:self._cursor_x - 1]
+ + to_insert
+ + self.input_buffer[self._cursor_x - 1:])
self._history_idx_neg = 0
def cmd__backspace(self) -> None:
'Truncate current content by one character, if possible.'
if self._cursor_x > 0:
self._cursor_x -= 1
- self._input_buffer = (self._input_buffer[:self._cursor_x]
- + self._input_buffer[self._cursor_x + 1:])
+ self.input_buffer = (self.input_buffer[:self._cursor_x]
+ + self.input_buffer[self._cursor_x + 1:])
self._history_idx_neg = 0
def cmd__move_cursor(self, direction: str) -> None:
'Move cursor one space into direction ("left" or "right") if possible.'
if direction == 'left' and self._cursor_x > 0:
self._cursor_x -= 1
- elif direction == 'right'\
- and self._cursor_x < len(self._input_buffer):
+ elif direction == 'right' and self._cursor_x < len(self.input_buffer):
self._cursor_x += 1
else:
return
self.taint()
def _reset_buffer(self, content: str) -> None:
- self._input_buffer = content
- self._cursor_x = len(self._input_buffer)
+ self.input_buffer = content
+ self._cursor_x = len(self.input_buffer)
def enter(self) -> str:
'Return current content while also clearing and then redrawing.'
- to_return = self._input_buffer[:]
+ to_return = self.input_buffer
if to_return:
- self._archive_prompt()
+ self._archive_prompt() # empties .input_buffer, thus use to_return
return to_return
# separated to serve as hook for subclass window selection
return [self.window]
- def log(self, msg: str, **kwargs) -> tuple[tuple[int, ...], str]:
+ def log(self, msg: str, **kwargs) -> Optional[tuple[tuple[int, ...], str]]:
'Write with timestamp, prefix to what window ._log_target_wins offers.'
prefix = kwargs.get('prefix', _LOG_PREFIX_DEFAULT)
now = str(datetime.now())