+'Terminal and TUI management.'
+# built-ins
+from abc import ABC, abstractmethod
+from base64 import b64decode
+from contextlib import contextmanager
+from getpass import getuser as getusername
+from inspect import _empty as inspect_empty, signature, stack
+from signal import SIGWINCH, signal
+from typing import Any, Callable, Generator, Iterator, NamedTuple, Optional
+# requirements.txt
+from blessed import Terminal as BlessedTerminal
+# ourselves
+from ircplom.events import Event, EventType, EventQueue, Loop
+from ircplom.irc_conn import IrcMessage, LoginNames, TIMEOUT_LOOP
+
+
+_B64_PREFIX = 'b64:'
+_OSC52_PREFIX = ']52;c;'
+_PASTE_DELIMITER = '\007'
+
+_PROMPT_TEMPLATE = '> '
+_PROMPT_ELL_IN = '<…'
+_PROMPT_ELL_OUT = '…>'
+
+_KEYBINDINGS = {
+ 'KEY_BACKSPACE': ('window.prompt.backspace',),
+ 'KEY_ENTER': ('prompt_enter',),
+ 'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
+ 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
+ 'KEY_UP': ('window.prompt.scroll', 'up'),
+ 'KEY_DOWN': ('window.prompt.scroll', 'down'),
+ 'KEY_PGUP': ('window.log.scroll', 'up'),
+ 'KEY_PGDOWN': ('window.log.scroll', 'down'),
+ '[91, 49, 59, 51, 68]': ('window', 'left'),
+ '[91, 49, 59, 51, 67]': ('window', 'right'),
+ 'KEY_F1': ('window.paste',),
+}
+CMD_SHORTCUTS = {
+ 'disconnect': 'window.disconnect',
+ 'reconnect': 'window.reconnect'
+}
+
+
+class _YX(NamedTuple):
+ '2-dimensional coordinate.'
+ y: int
+ x: int
+
+
+class _Widget(ABC):
+ 'Defines most basic TUI object API.'
+
+ @abstractmethod
+ def set_geometry(self, measurements: _YX) -> None:
+ 'Update widget\'s measurements, re-generate content where necessary.'
+
+ @abstractmethod
+ def draw(self) -> None:
+ 'Print widget\'s content in shape appropriate to set geometry.'
+
+
+class _ScrollableWidget(_Widget, ABC):
+ 'Defines some API shared between _PromptWidget and _LogWidget.'
+ _history_idx: int
+
+ def __init__(self, write: Callable[..., None], *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self._write = write
+ self._history: list[str] = []
+
+ @abstractmethod
+ def append(self, to_append: str) -> None:
+ 'Append to widget content.'
+
+ @abstractmethod
+ def _scroll(self, up=True) -> None:
+ pass
+
+ def cmd__scroll(self, direction: str) -> None:
+ 'Scroll through stored content/history.'
+ self._scroll(up=direction == 'up')
+ self.draw()
+
+
+class _LogWidget(_ScrollableWidget):
+ 'Collects line-shaped messages, scrolls and wraps them for display.'
+ _view_size: _YX
+ _y_pgscroll: int
+
+ def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self._wrap = wrap
+ self._wrapped_idx = self._history_idx = -1
+ self._wrapped: list[tuple[Optional[int], str]] = []
+
+ def _add_wrapped(self, idx_original, line) -> int:
+ wrapped_lines = self._wrap(line)
+ self._wrapped += [(idx_original, line) for line in wrapped_lines]
+ return len(wrapped_lines)
+
+ def set_geometry(self, measurements: _YX) -> None:
+ self._view_size = measurements
+ self._y_pgscroll = self._view_size.y // 2
+ self._wrapped.clear()
+ self._wrapped += [(None, '')] * self._view_size.y
+ if not self._history:
+ return
+ for idx_history, line in enumerate(self._history):
+ self._add_wrapped(idx_history, line)
+ wrapped_lines_for_history_idx = [
+ t for t in self._wrapped
+ if t[0] == len(self._history) + self._history_idx]
+ idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
+ self._wrapped_idx = idx_their_last - len(self._wrapped)
+
+ def append(self, to_append: str) -> None:
+ self._history += [to_append]
+ n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
+ if self._wrapped_idx < -1:
+ self._history_idx -= 1
+ self._wrapped_idx -= n_wrapped_lines
+
+ def draw(self) -> None:
+ start_idx = self._wrapped_idx - self._view_size.y + 1
+ end_idx = self._wrapped_idx
+ to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
+ if self._wrapped_idx < -1:
+ scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
+ scroll_info += 'v' * (self._view_size.x - len(scroll_info))
+ to_write += [scroll_info]
+ else:
+ to_write += [self._wrapped[self._wrapped_idx][1]]
+ for i, line in enumerate(to_write):
+ self._write(line, i)
+
+ def _scroll(self, up: bool = True) -> None:
+ if up:
+ self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
+ self._wrapped_idx - self._y_pgscroll)
+ else:
+ self._wrapped_idx = min(-1,
+ self._wrapped_idx + self._y_pgscroll)
+ history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
+ if history_idx_to_wrapped_idx is not None:
+ self._history_idx = history_idx_to_wrapped_idx - len(self._history)
+
+
+class _PromptWidget(_ScrollableWidget):
+ 'Keyboard-controlled command input field.'
+ _y: int
+ _width: int
+ _prompt: str = _PROMPT_TEMPLATE
+ _history_idx = 0
+ _input_buffer: str
+ _cursor_x: int
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self._reset_buffer('')
+
+ def set_geometry(self, measurements: _YX) -> None:
+ self._y, self._width = measurements
+
+ def append(self, to_append: str) -> None:
+ self._cursor_x += len(to_append)
+ self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
+ + to_append
+ + self._input_buffer[self._cursor_x - 1:])
+ self._history_idx = 0
+ self.draw()
+
+ def draw(self) -> None:
+ prefix = self._prompt[:]
+ content = self._input_buffer[:]
+ if self._cursor_x == len(self._input_buffer):
+ content += ' '
+ half_width = (self._width - len(prefix)) // 2
+ offset = 0
+ if len(prefix) + len(content) > self._width\
+ and self._cursor_x > half_width:
+ prefix += _PROMPT_ELL_IN
+ offset = min(len(prefix) + len(content) - self._width,
+ self._cursor_x - half_width + len(_PROMPT_ELL_IN))
+ cursor_x_to_write = len(prefix) + self._cursor_x - offset
+ to_write = f'{prefix}{content[offset:]}'
+ if len(to_write) > self._width:
+ to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)]
+ + _PROMPT_ELL_OUT)
+ self._write(to_write[:cursor_x_to_write], self._y, padding=False)
+ self._write(to_write[cursor_x_to_write], attribute='reverse',
+ padding=False)
+ self._write(to_write[cursor_x_to_write + 1:])
+
+ def _scroll(self, up: bool = True) -> None:
+ if up and -(self._history_idx) < len(self._history):
+ if self._history_idx == 0 and self._input_buffer:
+ self._history += [self._input_buffer[:]]
+ self._reset_buffer('')
+ self._history_idx -= 1
+ self._history_idx -= 1
+ self._reset_buffer(self._history[self._history_idx])
+ elif not up:
+ if self._history_idx < 0:
+ self._history_idx += 1
+ if self._history_idx == 0:
+ self._reset_buffer('')
+ else:
+ self._reset_buffer(self._history[self._history_idx])
+ elif self._input_buffer:
+ self._history += [self._input_buffer[:]]
+ self._reset_buffer('')
+
+ def cmd__backspace(self) -> None:
+ 'Truncate current content by one character, if possible.'
+ if self._cursor_x > 0:
+ self._cursor_x -= 1
+ self._input_buffer = (self._input_buffer[:self._cursor_x]
+ + self._input_buffer[self._cursor_x + 1:])
+ self._history_idx = 0
+ self.draw()
+
+ def cmd__move_cursor(self, direction: str) -> None:
+ 'Move cursor one space into direction ("left" or "right") if possible.'
+ if direction == 'left' and self._cursor_x > 0:
+ self._cursor_x -= 1
+ elif direction == 'right'\
+ and self._cursor_x <= len(self._input_buffer):
+ self._cursor_x += 1
+ else:
+ return
+ self.draw()
+
+ def _reset_buffer(self, content: str) -> None:
+ self._input_buffer = content
+ self._cursor_x = len(self._input_buffer)
+
+ def enter(self) -> str:
+ 'Return current content while also clearing and then redrawing.'
+ to_return = self._input_buffer[:]
+ if to_return:
+ self._history += [to_return]
+ self._reset_buffer('')
+ self.draw()
+ return to_return
+
+
+class _ConnectionPromptWidget(_PromptWidget):
+ 'PromptWidget with attributes, methods for dealing with an IrcConnection.'
+ _nickname: str = ''
+
+ def update_prompt(self,
+ nick_confirmed=False,
+ nick: Optional[str] = None
+ ) -> None:
+ 'Update nickname-relevant knowledge to go into prompt string.'
+ self._prompt = ''
+ if nick:
+ self._nickname = nick
+ if self._nickname:
+ self._prompt += ' ' if nick_confirmed else '?'
+ self._prompt += self._nickname
+ self._prompt += _PROMPT_TEMPLATE
+
+
+class _Window(_Widget):
+ 'Collects a log and a prompt meant for the same content stream.'
+ _y_status: int
+ prompt: _PromptWidget
+
+ def __init__(self, idx: int, term: 'Terminal') -> None:
+ self.idx = idx
+ self._term = term
+ self.log = _LogWidget(self._term.wrap, self._term.write)
+ self.prompt = self.__annotations__['prompt'](self._term.write)
+ if hasattr(self._term, 'size'):
+ self.set_geometry()
+
+ def set_geometry(self, _=None) -> None:
+ assert _ is None
+ self._y_status = self._term.size.y - 2
+ self.log.set_geometry(_YX(self._y_status, self._term.size.x))
+ self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x))
+
+ def draw(self) -> None:
+ idx_box = f'[{self.idx}]'
+ status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
+ self._term.clear()
+ self.log.draw()
+ self._term.write(status_line, self._y_status)
+ self.prompt.draw()
+
+ def cmd__paste(self) -> None:
+ 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
+ self._term.write(f'\033{_OSC52_PREFIX}?{_PASTE_DELIMITER}',
+ self._y_status)
+ self.draw()
+
+
+class _ConnectionWindow(_Window):
+ 'Window with attributes and methods for dealing with an IrcConnection.'
+ prompt: _ConnectionPromptWidget
+
+ def __init__(self,
+ broadcast: Callable[[EventType, Any], None],
+ conn_idx: int,
+ *args, **kwargs
+ ) -> None:
+ self._broadcast = broadcast
+ self._conn_idx = conn_idx
+ super().__init__(*args, **kwargs)
+
+ def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
+ 'Send QUIT command to server.'
+ self._broadcast(EventType.SEND,
+ (self._conn_idx, IrcMessage('QUIT', (quit_msg, ))))
+
+ def cmd__reconnect(self) -> None:
+ 'Attempt reconnection.'
+ self._broadcast(EventType.INIT_RECONNECT, (self._conn_idx,))
+
+
+class _KeyboardLoop(Loop):
+ 'Loop receiving and translating keyboard events towards main loop.'
+
+ def process_bonus(self, yielded: str) -> None:
+ if yielded.startswith(_B64_PREFIX):
+ encoded = yielded[len(_B64_PREFIX):]
+ to_paste = ''
+ for i, c in enumerate(b64decode(encoded).decode('utf-8')):
+ if i > 512:
+ break
+ if c.isprintable():
+ to_paste += c
+ elif c.isspace():
+ to_paste += ' '
+ else:
+ to_paste += '#'
+ self.broadcast(EventType.PROMPT_ADD, to_paste)
+ elif yielded in _KEYBINDINGS:
+ self.broadcast(EventType.KEYBINDING, _KEYBINDINGS[yielded])
+ elif len(yielded) == 1:
+ self.broadcast(EventType.PROMPT_ADD, yielded)
+ else:
+ self.broadcast(EventType.ALERT,
+ f'unknown keyboard input: {yielded}')
+
+
+class _TuiLoop(Loop):
+ 'Loop for drawing/updating TUI.'
+
+ def __init__(self, term: 'Terminal', *args, **kwargs) -> None:
+ self._term = term
+ self._windows = [_Window(0, self._term)]
+ self._window_idx = 0
+ self._conn_windows: list[_ConnectionWindow] = []
+ super().__init__(*args, **kwargs)
+ self.put(Event(EventType.SET_SCREEN))
+
+ def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
+ cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name)
+ cmd_parent = self
+ while True:
+ cmd_name_toks = cmd_name.split('.', maxsplit=1)
+ if len(cmd_name_toks) == 1:
+ break
+ if not hasattr(cmd_parent, cmd_name_toks[0]):
+ return None
+ cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
+ cmd_name = cmd_name_toks[1]
+ cmd_name = f'cmd__{cmd_name}'
+ if not hasattr(cmd_parent, cmd_name):
+ return None
+ return getattr(cmd_parent, cmd_name)
+
+ def process_main(self, event: Event) -> bool:
+ if not super().process_main(event):
+ return False
+ if event.type_ == EventType.SET_SCREEN:
+ self._term.calc_geometry()
+ for window in self._windows:
+ window.set_geometry()
+ self.window.draw()
+ elif event.type_ == EventType.CONN_WINDOW:
+ conn_win = _ConnectionWindow(broadcast=self.broadcast,
+ conn_idx=event.payload[0],
+ idx=len(self._windows),
+ term=self._term)
+ conn_win.prompt.update_prompt(nick_confirmed=False,
+ nick=event.payload[1])
+ self._windows += [conn_win]
+ self._conn_windows += [conn_win]
+ self._switch_window(conn_win.idx)
+ elif event.type_ == EventType.ALERT:
+ self.window.log.append(f'ALERT {event.payload}')
+ self.window.log.draw()
+ elif event.type_ in {EventType.RECV, EventType.SEND,
+ EventType.CONN_ALERT}:
+ conn_win = self._conn_windows[event.payload[0]]
+ if event.type_ == EventType.CONN_ALERT:
+ msg = f'ALERT {event.payload[1]}'
+ else:
+ msg = (('<-' if event.type_ == EventType.RECV else '->')
+ + event.payload[1].raw)
+ conn_win.log.append(msg)
+ if conn_win == self.window:
+ self.window.log.draw()
+ elif event.type_ == EventType.NICK_SET:
+ conn_win = self._conn_windows[event.payload[0]]
+ conn_win.prompt.update_prompt(nick_confirmed=True,
+ nick=event.payload[1])
+ if conn_win == self.window:
+ self.window.prompt.draw()
+ elif event.type_ == EventType.DISCONNECTED:
+ conn_win = self._conn_windows[event.payload[0]]
+ conn_win.prompt.update_prompt(nick_confirmed=False)
+ if conn_win == self.window:
+ self.window.prompt.draw()
+ elif event.type_ == EventType.KEYBINDING:
+ cmd = self._cmd_name_to_cmd(event.payload[0])
+ assert cmd is not None
+ cmd(*event.payload[1:])
+ elif event.type_ == EventType.PROMPT_ADD:
+ self.window.prompt.append(event.payload)
+ # elif event.type_ == EventType.DEBUG:
+ # from traceback import format_exception
+ # for line in '\n'.join(format_exception(event.payload)
+ # ).split('\n'):
+ # self.window.log.append(f'DEBUG {line}')
+ # self.window.log.draw()
+ else:
+ return True
+ self._term.flush()
+ return True
+
+ @property
+ def window(self) -> _Window:
+ 'Currently selected Window.'
+ return self._windows[self._window_idx]
+
+ def _switch_window(self, idx: int) -> None:
+ self._window_idx = idx
+ self.window.draw()
+
+ def cmd__connect(self, hostname: str, nickname: str, realname: str
+ ) -> None:
+ 'Send INIT_CONNECT command to main loop.'
+ login = LoginNames(user=getusername(), nick=nickname, real=realname)
+ self.broadcast(EventType.INIT_CONNECT, (hostname, login))
+
+ def cmd__prompt_enter(self) -> None:
+ 'Get prompt content from .window.prompt.enter, parse to & run command.'
+ to_parse = self.window.prompt.enter()
+ if not to_parse:
+ return
+ alert: Optional[str] = None
+ if to_parse[0:1] == '/':
+ toks = to_parse[1:].split(maxsplit=1)
+ alert = f'{toks[0]} unknown'
+ cmd = self._cmd_name_to_cmd(toks[0])
+ if cmd and cmd.__name__ != stack()[0].function:
+ params = signature(cmd).parameters
+ n_args_max = len(params)
+ n_args_min = len([p for p in params.values()
+ if p.default == inspect_empty])
+ alert = f'{cmd.__name__} needs between {n_args_min} and '\
+ f'{n_args_max} args'
+ if len(toks) == 1 and not n_args_min:
+ alert = cmd()
+ elif len(toks) > 1 and params\
+ and n_args_min <= len(toks[1].split()):
+ args = []
+ while len(toks) > 1 and n_args_max:
+ toks = toks[1].split(maxsplit=1)
+ args += [toks[0]]
+ n_args_max -= 1
+ alert = cmd(*args)
+ else:
+ alert = 'not prefixed by /'
+ if alert:
+ self.broadcast(EventType.ALERT, f'invalid prompt command: {alert}')
+
+ def cmd__quit(self) -> None:
+ 'Send QUIT to all threads.'
+ self.broadcast(EventType.QUIT)
+
+ def cmd__window(self, towards: str) -> Optional[str]:
+ 'Switch window selection.'
+ n_windows = len(self._windows)
+ if n_windows < 2:
+ return 'no alternate window to move into'
+ if towards in {'left', 'right'}:
+ multiplier = (+1) if towards == 'right' else (-1)
+ window_idx = self._window_idx + multiplier
+ if not 0 <= window_idx < n_windows:
+ window_idx -= multiplier * n_windows
+ elif not towards.isdigit():
+ return f'neither "left"/"right" nor integer: {towards}'
+ else:
+ window_idx = int(towards)
+ if not 0 <= window_idx < n_windows:
+ return f'unavailable window idx: {window_idx}'
+ self._switch_window(window_idx)
+ return None
+
+
+class Terminal:
+ 'Abstraction of terminal interface.'
+ size: _YX
+ tui: _TuiLoop
+ _blessed: BlessedTerminal
+ _cursor_yx_: _YX
+
+ @contextmanager
+ def context(self, q_to_main: EventQueue) -> Generator:
+ 'Combine multiple contexts into one.'
+ signal(SIGWINCH, lambda *_: q_to_main.eput(EventType.SET_SCREEN))
+ self._blessed = BlessedTerminal()
+ with (self._blessed.raw(),
+ self._blessed.fullscreen(),
+ self._blessed.hidden_cursor(),
+ _KeyboardLoop(q_to_main, self.get_keypresses())):
+ self._cursor_yx = _YX(0, 0)
+ with _TuiLoop(self, q_to_main) as self.tui:
+ yield self
+
+ @property
+ def _cursor_yx(self) -> _YX:
+ return self._cursor_yx_
+
+ @_cursor_yx.setter
+ def _cursor_yx(self, yx: _YX) -> None:
+ print(self._blessed.move_yx(yx.y, yx.x), end='')
+ self._cursor_yx_ = yx
+
+ def calc_geometry(self) -> None:
+ '(Re-)calculate .size..'
+ self.size = _YX(self._blessed.height, self._blessed.width)
+
+ def clear(self) -> None:
+ 'Clear terminal.'
+ print(self._blessed.clear, end='')
+
+ def flush(self) -> None:
+ 'Flush terminal.'
+ print('', end='', flush=True)
+
+ def wrap(self, line: str) -> list[str]:
+ 'Wrap line to list of lines fitting into terminal width.'
+ return self._blessed.wrap(line, width=self.size.x,
+ subsequent_indent=' '*4)
+
+ def write(self,
+ msg: str = '',
+ start_y: Optional[int] = None,
+ attribute: Optional[str] = None,
+ padding: bool = True
+ ) -> None:
+ 'Print to terminal, with position, padding to line end, attributes.'
+ if start_y:
+ self._cursor_yx = _YX(start_y, 0)
+ # ._blessed.length can slow down things notably: only use where needed!
+ end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
+ else self._blessed.length(msg))
+ len_padding = self.size.x - end_x
+ if len_padding < 0:
+ msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
+ elif padding:
+ msg += ' ' * len_padding
+ end_x = self.size.x
+ if attribute:
+ msg = getattr(self._blessed, attribute)(msg)
+ print(msg, end='')
+ self._cursor_yx = _YX(self._cursor_yx.y, end_x)
+
+ def get_keypresses(self) -> Iterator[str]:
+ '''Loop through keypresses from terminal, collect what blessed ignores.
+
+ (Notably, blessed seems to junk any alt/escape-modifide key events it
+ does not explicitly know.
+ '''
+ n_gotchs_unprocessed = 0
+ while True:
+ new_gotchs = []
+ if not n_gotchs_unprocessed:
+ while self._blessed.kbhit(TIMEOUT_LOOP):
+ gotch = self._blessed.getch()
+ self._blessed.ungetch(gotch)
+ new_gotchs += [gotch]
+ if not self._blessed.kbhit(0):
+ break
+ n_gotchs_unprocessed += len(new_gotchs)
+ blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
+ n_chs_blessed_key = len(blessed_key.encode('utf-8'))
+ unhandleds = []
+ if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
+ for _ in range(len(new_gotchs) - n_chs_blessed_key):
+ unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
+ unhandleds += list(unhandled.encode('utf-8'))
+ n_gotchs_unprocessed -= 1
+ n_gotchs_unprocessed -= n_chs_blessed_key
+ if unhandleds:
+ fused = ''.join([chr(n) for n in unhandleds])
+ if fused.startswith(_OSC52_PREFIX):
+ if not (encoded := fused[len(_OSC52_PREFIX):]):
+ while True:
+ gotch = self._blessed.getch()
+ if gotch == _PASTE_DELIMITER:
+ break
+ encoded += gotch
+ yield f'{_B64_PREFIX}{encoded}'
+ continue
+ yield str(unhandleds)
+ elif blessed_key.name:
+ yield blessed_key.name
+ else:
+ yield str(blessed_key)