-'Terminal and TUI management.'
-# built-ins
-from abc import ABC, abstractmethod
-from base64 import b64decode
-from contextlib import contextmanager
-from dataclasses import dataclass
-from inspect import _empty as inspect_empty, signature, stack
-from signal import SIGWINCH, signal
-from typing import Callable, Generator, Iterator, NamedTuple, Optional
-from uuid import UUID
-# requirements.txt
-from blessed import Terminal as BlessedTerminal
-# ourselves
-from ircplom.events import (
- AffectiveEvent, Loop, PayloadMixin, QueueMixin, QuitEvent)
-from ircplom.irc_conn import (
- CHAT_GLOB, IrcConnSetup, IrcMessage, Client, ClientIdMixin,
- ClientQueueMixin, InitReconnectEvent, NewClientEvent, SendEvent)
-
-_MIN_HEIGHT = 4
-_MIN_WIDTH = 32
-
-_TIMEOUT_KEYPRESS_LOOP = 0.5
-_B64_PREFIX = 'b64:'
-_OSC52_PREFIX = b']52;c;'
-_PASTE_DELIMITER = '\007'
-
-_PROMPT_TEMPLATE = '> '
-_PROMPT_ELL_IN = '<…'
-_PROMPT_ELL_OUT = '…>'
-
-_CHAR_RESIZE = chr(12)
-_KEYBINDINGS = {
- 'KEY_BACKSPACE': ('window.prompt.backspace',),
- 'KEY_ENTER': ('prompt_enter',),
- 'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
- 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
- 'KEY_UP': ('window.prompt.scroll', 'up'),
- 'KEY_DOWN': ('window.prompt.scroll', 'down'),
- 'KEY_PGUP': ('window.log.scroll', 'up'),
- 'KEY_PGDOWN': ('window.log.scroll', 'down'),
- 'esc:91:49:59:51:68': ('window', 'left'),
- 'esc:91:49:59:51:67': ('window', 'right'),
- 'KEY_F1': ('window.paste',),
-}
-_CMD_SHORTCUTS = {
- 'disconnect': 'window.disconnect',
- 'nick': 'window.nick',
- 'privmsg': 'window.privmsg',
- 'reconnect': 'window.reconnect'
-}
-
-
-@dataclass
-class TuiEvent(AffectiveEvent):
- 'To affect TUI, and trigger flushed .draw_tainted on it.'
-
- def affect(self, target: 'Tui') -> None:
- target.window.draw_tainted()
- target.term.flush()
-
-
-@dataclass
-class _SetScreenEvent(TuiEvent):
-
- def affect(self, target: 'Tui') -> None:
- target.term.calc_geometry()
- for window in target.windows:
- window.set_geometry()
- super().affect(target)
-
-
-class _YX(NamedTuple):
- y: int
- x: int
-
-
-class _Widget(ABC):
-
- @abstractmethod
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self.tainted = True
- self._drawable = False
-
- @abstractmethod
- def set_geometry(self, measurements: _YX) -> bool:
- 'Update widget\'s measurements, re-generate content where necessary.'
- self.tainted = True
- self._drawable = len([m for m in measurements if m < 0]) == 0
- return self._drawable
-
- @abstractmethod
- def draw(self) -> bool:
- 'Print widget\'s content in shape appropriate to set geometry.'
- if not self._drawable:
- return False
- self.tainted = False
- return True
-
-
-class _ScrollableWidget(_Widget, ABC):
- _history_idx: int
-
- def __init__(self, write: Callable[..., None], **kwargs) -> None:
- super().__init__(**kwargs)
- self._write = write
- self._history: list[str] = []
-
- def append(self, to_append: str) -> None:
- 'Append to scrollable history.'
- self._history += [to_append]
-
- @abstractmethod
- def _scroll(self, up=True) -> None:
- self.tainted = True
-
- def cmd__scroll(self, direction: str) -> None:
- 'Scroll through stored content/history.'
- self._scroll(up=direction == 'up')
-
-
-class _LogWidget(_ScrollableWidget):
- _view_size: _YX
- _y_pgscroll: int
-
- def __init__(self, wrap: Callable[[str], list[str]], **kwargs
- ) -> None:
- super().__init__(**kwargs)
- self._wrap = wrap
- self._wrapped_idx = self._history_idx = -1
- self._wrapped: list[tuple[Optional[int], str]] = []
-
- def _add_wrapped(self, idx_original, line) -> int:
- wrapped_lines = self._wrap(line)
- self._wrapped += [(idx_original, line) for line in wrapped_lines]
- return len(wrapped_lines)
-
- def set_geometry(self, measurements: _YX) -> bool:
- if not super().set_geometry(measurements):
- return False
- self._view_size = measurements
- self._y_pgscroll = self._view_size.y // 2
- self._wrapped.clear()
- self._wrapped += [(None, '')] * self._view_size.y
- if not self._history:
- return True
- for idx_history, line in enumerate(self._history):
- self._add_wrapped(idx_history, line)
- wrapped_lines_for_history_idx = [
- t for t in self._wrapped
- if t[0] == len(self._history) + self._history_idx]
- idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
- self._wrapped_idx = idx_their_last - len(self._wrapped)
- return True
-
- def append(self, to_append: str) -> None:
- super().append(to_append)
- self.tainted = True
- if self._history_idx < -1:
- self._history_idx -= 1
- if not self._drawable:
- return
- n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
- if self._wrapped_idx < -1:
- self._wrapped_idx -= n_wrapped_lines
-
- def draw(self) -> bool:
- if not super().draw():
- return False
- start_idx = self._wrapped_idx - self._view_size.y + 1
- end_idx = self._wrapped_idx
- to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
- if self._wrapped_idx < -1:
- scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
- scroll_info += 'v' * (self._view_size.x - len(scroll_info))
- to_write += [scroll_info]
- else:
- to_write += [self._wrapped[self._wrapped_idx][1]]
- for i, line in enumerate(to_write):
- self._write(line, i)
- return True
-
- def _scroll(self, up: bool = True) -> None:
- super()._scroll(up)
- if not self._drawable:
- return
- if up:
- self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
- self._wrapped_idx - self._y_pgscroll)
- else:
- self._wrapped_idx = min(-1,
- self._wrapped_idx + self._y_pgscroll)
- history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
- if history_idx_to_wrapped_idx is not None:
- self._history_idx = history_idx_to_wrapped_idx - len(self._history)
-
-
-class _PromptWidget(_ScrollableWidget):
- _y: int
- _width: int
- _history_idx: int = 0
- _input_buffer_unsafe: str
- _cursor_x: int
-
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self.prefix = _PROMPT_TEMPLATE
- self._reset_buffer('')
-
- @property
- def _input_buffer(self) -> str:
- return self._input_buffer_unsafe[:]
-
- @_input_buffer.setter
- def _input_buffer(self, content) -> None:
- self.tainted = True
- self._input_buffer_unsafe = content
-
- def set_geometry(self, measurements: _YX) -> bool:
- if not super().set_geometry(measurements):
- return False
- self._y, self._width = measurements
- return True
-
- def draw(self) -> bool:
- if not super().draw():
- return False
- prefix = self.prefix[:]
- content = self._input_buffer
- if self._cursor_x == len(self._input_buffer):
- content += ' '
- half_width = (self._width - len(prefix)) // 2
- offset = 0
- if len(prefix) + len(content) > self._width\
- and self._cursor_x > half_width:
- prefix += _PROMPT_ELL_IN
- offset = min(len(prefix) + len(content) - self._width,
- self._cursor_x - half_width + len(_PROMPT_ELL_IN))
- cursor_x_to_write = len(prefix) + self._cursor_x - offset
- to_write = f'{prefix}{content[offset:]}'
- if len(to_write) > self._width:
- to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)]
- + _PROMPT_ELL_OUT)
- self._write(to_write[:cursor_x_to_write], self._y, padding=False)
- self._write(to_write[cursor_x_to_write], attribute='reverse',
- padding=False)
- self._write(to_write[cursor_x_to_write + 1:])
- return True
-
- def _archive_prompt(self) -> None:
- self.append(self._input_buffer)
- self._reset_buffer('')
-
- def _scroll(self, up: bool = True) -> None:
- super()._scroll(up)
- if up and -(self._history_idx) < len(self._history):
- if self._history_idx == 0 and self._input_buffer:
- self._archive_prompt()
- self._history_idx -= 1
- self._history_idx -= 1
- self._reset_buffer(self._history[self._history_idx])
- elif not up:
- if self._history_idx < 0:
- self._history_idx += 1
- if self._history_idx == 0:
- self._reset_buffer('')
- else:
- self._reset_buffer(self._history[self._history_idx])
- elif self._input_buffer:
- self._archive_prompt()
-
- def insert(self, to_insert: str) -> None:
- 'Insert into prompt input buffer.'
- self._cursor_x += len(to_insert)
- self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
- + to_insert
- + self._input_buffer[self._cursor_x - 1:])
- self._history_idx = 0
-
- def cmd__backspace(self) -> None:
- 'Truncate current content by one character, if possible.'
- if self._cursor_x > 0:
- self._cursor_x -= 1
- self._input_buffer = (self._input_buffer[:self._cursor_x]
- + self._input_buffer[self._cursor_x + 1:])
- self._history_idx = 0
-
- def cmd__move_cursor(self, direction: str) -> None:
- 'Move cursor one space into direction ("left" or "right") if possible.'
- if direction == 'left' and self._cursor_x > 0:
- self._cursor_x -= 1
- elif direction == 'right'\
- and self._cursor_x < len(self._input_buffer):
- self._cursor_x += 1
- else:
- return
- self.tainted = True
-
- def _reset_buffer(self, content: str) -> None:
- self._input_buffer = content
- self._cursor_x = len(self._input_buffer)
-
- def enter(self) -> str:
- 'Return current content while also clearing and then redrawing.'
- to_return = self._input_buffer[:]
- if to_return:
- self._archive_prompt()
- return to_return
-
-
-class _Window(_Widget):
- _y_status: int
- prompt: _PromptWidget
-
- def __init__(self, idx: int, term: 'Terminal', **kwargs) -> None:
- super().__init__(**kwargs)
- self.idx = idx
- self._term = term
- self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write)
- self.prompt = self.__annotations__['prompt'](write=self._term.write)
- if hasattr(self._term, 'size'):
- self.set_geometry()
-
- def set_geometry(self, _=None) -> bool:
- assert _ is None
- if self._term.size.y < _MIN_HEIGHT or self._term.size.x < _MIN_WIDTH:
- bad_yx = _YX(-1, -1)
- super().set_geometry(bad_yx)
- self.log.set_geometry(bad_yx)
- self.prompt.set_geometry(bad_yx)
- return False
- super().set_geometry(_YX(0, 0))
- self._y_status = self._term.size.y - 2
- self.log.set_geometry(_YX(self._y_status, self._term.size.x))
- self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x))
- return True
-
- def draw(self) -> bool:
- self._term.clear()
- if not super().draw():
- if self._term.size.x > 0:
- lines = ['']
- for i, c in enumerate('screen too small'):
- if i > 0 and 0 == i % self._term.size.x:
- lines += ['']
- lines[-1] += c
- for y, line in enumerate(lines):
- self._term.write(line, y)
- return False
- idx_box = f'[{self.idx}]'
- status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
- self.log.draw()
- self._term.write(status_line, self._y_status)
- self.prompt.draw()
- return True
-
- def cmd__paste(self) -> None:
- 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
- self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}',
- self._y_status)
- self.tainted = True
-
- def draw_tainted(self) -> None:
- 'Draw tainted parts of self.'
- if self.tainted:
- self.draw()
- return
- for widget in [w for w in (self.log, self.prompt) if w.tainted]:
- widget.draw()
-
-
-@dataclass
-class _KeyboardEvent(TuiEvent, PayloadMixin):
- payload: str
-
- def affect(self, target: 'Tui') -> None:
- if self.payload[0] == _CHAR_RESIZE:
- _SetScreenEvent().affect(target)
- return
- if self.payload in _KEYBINDINGS:
- cmd_data = _KEYBINDINGS[self.payload]
- cmd = target.cmd_name_to_cmd(cmd_data[0])
- if cmd:
- cmd(*cmd_data[1:])
- elif self.payload.startswith(_B64_PREFIX):
- encoded = self.payload[len(_B64_PREFIX):]
- to_paste = ''
- for i, c in enumerate(b64decode(encoded).decode('utf-8')):
- if i > 512:
- break
- if c.isprintable():
- to_paste += c
- elif c.isspace():
- to_paste += ' '
- else:
- to_paste += '#'
- target.window.prompt.insert(to_paste)
- elif len(self.payload) == 1:
- target.window.prompt.insert(self.payload)
- else:
- target.log(f'# ALERT: unknown keyboard input: {self.payload}')
- super().affect(target)
-
-
-class Tui(QueueMixin):
- 'Base for graphical user interface elements.'
-
- def __init__(self, term: 'Terminal', **kwargs) -> None:
- super().__init__(**kwargs)
- self.term = term
- self._window_idx = 0
- self.windows = [_Window(idx=self._window_idx, term=self.term)]
- self._put(_SetScreenEvent())
-
- def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
- 'Map cmd_name to executable TUI element method.'
- cmd_name = _CMD_SHORTCUTS.get(cmd_name, cmd_name)
- cmd_parent = self
- while True:
- cmd_name_toks = cmd_name.split('.', maxsplit=1)
- if len(cmd_name_toks) == 1:
- break
- if not hasattr(cmd_parent, cmd_name_toks[0]):
- return None
- cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
- cmd_name = cmd_name_toks[1]
- cmd_name = f'cmd__{cmd_name}'
- if not hasattr(cmd_parent, cmd_name):
- return None
- return getattr(cmd_parent, cmd_name)
-
- @property
- def window(self) -> _Window:
- 'Currently selected _Window.'
- return self.windows[self._window_idx]
-
- def _new_client_window(self, client_id: UUID, chat: str = ''
- ) -> '_ClientWindow':
- new_idx = len(self.windows)
- win = _ClientWindow(idx=new_idx, term=self.term, q_out=self.q_out,
- client_id=client_id, chat=chat)
- self.windows += [win]
- self._switch_window(new_idx)
- return win
-
- def client_wins(self, client_id: UUID) -> list['_ClientWindow']:
- 'All _ClientWindows matching client_id; if none, create one.'
- wins = [win for win in self.windows
- if isinstance(win, _ClientWindow)
- and win.client_id == client_id] # pylint: disable=no-member
- if not wins:
- wins = [self._new_client_window(client_id=client_id)]
- return wins
-
- def client_win(self, client_id: UUID, chat: str = '') -> '_ClientWindow':
- '''That _ClientWindow matching client_id and chat; create if none.
-
- In case of creation, copy prompt prefix from client's first window.
- '''
- client_wins = self.client_wins(client_id)
- candidates = [win for win in client_wins if win.chat == chat]
- if candidates:
- return candidates[0]
- win = self._new_client_window(client_id=client_id, chat=chat)
- if client_wins:
- win.prompt.prefix = client_wins[0].prompt.prefix
- return win
-
- def log(self, msg: str) -> None:
- 'Post msg to active window\'s log.'
- self.window.log.append(msg)
-
- def _switch_window(self, idx: int) -> None:
- self._window_idx = idx
- self.window.draw()
-
- def cmd__connect(self, hostname: str, nickname: str, realname: str
- ) -> None:
- 'Create Client and pass it via NewClientEvent.'
- self._put(NewClientEvent(
- _ClientKnowingTui(
- q_out=self.q_out,
- conn_setup=IrcConnSetup(hostname=hostname, nickname=nickname,
- realname=realname))))
-
- def cmd__prompt_enter(self) -> None:
- 'Get prompt content from .window.prompt.enter, parse to & run command.'
- to_parse = self.window.prompt.enter()
- if not to_parse:
- return
- alert: Optional[str] = None
- if to_parse[0:1] == '/':
- toks = to_parse[1:].split(maxsplit=1)
- alert = f'{toks[0]} unknown'
- cmd = self.cmd_name_to_cmd(toks[0])
- if cmd and cmd.__name__ != stack()[0].function:
- params = signature(cmd).parameters
- n_args_max = len(params)
- n_args_min = len([p for p in params.values()
- if p.default == inspect_empty])
- alert = f'{cmd.__name__} needs between {n_args_min} and '\
- f'{n_args_max} args'
- if len(toks) == 1 and not n_args_min:
- alert = cmd()
- elif len(toks) > 1 and params\
- and n_args_min <= len(toks[1].split()):
- args = []
- while len(toks) > 1 and n_args_max:
- toks = toks[1].split(maxsplit=1)
- args += [toks[0]]
- n_args_max -= 1
- alert = cmd(*args)
- else:
- alert = 'not prefixed by /'
- if alert:
- self.log(f'# ALERT: invalid prompt command: {alert}')
-
- def cmd__quit(self) -> None:
- 'Trigger program exit.'
- self._put(QuitEvent())
-
- def cmd__window(self, towards: str) -> Optional[str]:
- 'Switch window selection.'
- n_windows = len(self.windows)
- if n_windows < 2:
- return 'no alternate window to move into'
- if towards in {'left', 'right'}:
- multiplier = (+1) if towards == 'right' else (-1)
- window_idx = self._window_idx + multiplier
- if not 0 <= window_idx < n_windows:
- window_idx -= multiplier * n_windows
- elif not towards.isdigit():
- return f'neither "left"/"right" nor integer: {towards}'
- else:
- window_idx = int(towards)
- if not 0 <= window_idx < n_windows:
- return f'unavailable window idx: {window_idx}'
- self._switch_window(window_idx)
- return None
-
-
-class Terminal(QueueMixin):
- 'Abstraction of terminal interface.'
- size: _YX
- _cursor_yx_: _YX
-
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self._blessed = BlessedTerminal()
- self._cursor_yx = _YX(0, 0)
-
- @contextmanager
- def setup(self) -> Generator:
- 'Combine multiple contexts into one and run keypress loop.'
- signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent()))
- self.clear()
- with (self._blessed.raw(),
- self._blessed.fullscreen(),
- self._blessed.hidden_cursor(),
- Loop(iterator=self._get_keypresses(), q_out=self.q_out)):
- yield self
-
- @property
- def _cursor_yx(self) -> _YX:
- return self._cursor_yx_
-
- @_cursor_yx.setter
- def _cursor_yx(self, yx: _YX) -> None:
- print(self._blessed.move_yx(yx.y, yx.x), end='')
- self._cursor_yx_ = yx
-
- def calc_geometry(self) -> None:
- '(Re-)calculate .size..'
- self.size = _YX(self._blessed.height, self._blessed.width)
-
- def clear(self) -> None:
- 'Clear terminal.'
- print(self._blessed.clear, end='')
-
- def flush(self) -> None:
- 'Flush terminal.'
- print('', end='', flush=True)
-
- def wrap(self, line: str) -> list[str]:
- 'Wrap line to list of lines fitting into terminal width.'
- return self._blessed.wrap(line, width=self.size.x,
- subsequent_indent=' '*4)
-
- def write(self,
- msg: str = '',
- start_y: Optional[int] = None,
- attribute: Optional[str] = None,
- padding: bool = True
- ) -> None:
- 'Print to terminal, with position, padding to line end, attributes.'
- if start_y is not None:
- self._cursor_yx = _YX(start_y, 0)
- # ._blessed.length can slow down things notably: only use where needed!
- end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
- else self._blessed.length(msg))
- len_padding = self.size.x - end_x
- if len_padding < 0:
- msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
- elif padding:
- msg += ' ' * len_padding
- end_x = self.size.x
- if attribute:
- msg = getattr(self._blessed, attribute)(msg)
- print(msg, end='')
- self._cursor_yx = _YX(self._cursor_yx.y, end_x)
-
- def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]:
- '''Loop through keypresses from terminal, expand blessed's handling.
-
- Explicitly collect KEY_ESCAPE-modified key sequences, and recognize
- OSC52-prefixed pastables to return the respective base64 code,
- prefixed with _B64_PREFIX.
- '''
- while True:
- to_yield = ''
- ks = self._blessed.inkey(
- timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None,
- esc_delay=0) # incl. until thread dies
- if ks.name != 'KEY_ESCAPE':
- to_yield = f'{ks.name if ks.name else ks}'
- else:
- chars = b''
- while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0
- ).encode('utf-8')):
- chars += new_chars
- len_prefix = len(_OSC52_PREFIX)
- if chars[:len_prefix] == _OSC52_PREFIX:
- to_yield = _B64_PREFIX[:]
- # sometimes, prev .inkey got some or all (including paste
- # delimiter) of the paste code (maybe even more), so first
- # harvest potential remains of chars post prefix …
- caught_delimiter = False
- post_prefix_str = chars[len_prefix:].decode('utf-8')
- for idx, c in enumerate(post_prefix_str):
- if c == _PASTE_DELIMITER:
- caught_delimiter = True
- if (remains := post_prefix_str[idx + 1:]):
- self._blessed.ungetch(remains)
- break
- to_yield += c
- # … before .getch() further until expected delimiter found
- if not caught_delimiter:
- while (c := self._blessed.getch()) != _PASTE_DELIMITER:
- to_yield += c
- else:
- to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars])
- yield _KeyboardEvent(to_yield) if to_yield else None
-
-
-class _ClientWindow(_Window, ClientQueueMixin):
- client_id_name = 'client_id'
-
- def __init__(self, client_id: UUID, chat: str = '', **kwargs) -> None:
- self.client_id = client_id
- self.chat = chat
- super().__init__(**kwargs)
-
- def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
- 'Send QUIT command to server.'
- self._cput(SendEvent,
- payload=IrcMessage(verb='QUIT', params=(quit_msg,)))
-
- def cmd__reconnect(self) -> None:
- 'Attempt reconnection.'
- self._cput(InitReconnectEvent)
-
- def cmd__nick(self, new_nick: str) -> None:
- 'Attempt nickname change.'
- self._cput(SendEvent,
- payload=IrcMessage(verb='NICK', params=(new_nick,)))
-
- def cmd__privmsg(self, target: str, msg: str) -> None:
- 'Send chat message msg to target.'
- self._cput(SendEvent, chat=target,
- payload=IrcMessage(verb='PRIVMSG', params=(target, msg)))
-
-
-@dataclass
-class _ClientWindowEvent(TuiEvent, ClientIdMixin):
- chat: str = ''
-
-
-@dataclass
-class _ClientLogEvent(_ClientWindowEvent, PayloadMixin):
- payload: str
-
- def affect(self, target: Tui) -> None:
- if self.chat == CHAT_GLOB:
- for win in target.client_wins(self.client_id):
- win.log.append(self.payload)
- else:
- target.client_win(self.client_id, self.chat
- ).log.append(self.payload)
- super().affect(target)
-
-
-@dataclass
-class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin):
- payload: tuple[str, str]
-
- def affect(self, target: Tui) -> None:
- new_prefix = ((' ' if self.payload[0] else '?')
- + f'{self.payload[1]}{_PROMPT_TEMPLATE}')
- for win in target.client_wins(self.client_id):
- prompt = win.prompt
- prompt.prefix = new_prefix
- prompt.tainted = True
- super().affect(target)
-
-
-class _ClientKnowingTui(Client):
-
- def log(self, msg: str, chat: str = '') -> None:
- self._cput(_ClientLogEvent, chat=chat, payload=msg)
-
- def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
- super().update_login(nick_confirmed, nickname)
- self._cput(_ClientPromptEvent, payload=(self.nick_confirmed,
- self.conn_setup.nickname))