From: Christian Heller Date: Fri, 13 Jun 2025 15:53:23 +0000 (+0200) Subject: Split up code into module files. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/static/%7B%7Bdb.prefix%7D%7D/tasks?a=commitdiff_plain;h=9413b624259d4ca59c09eccadf128c06e5b52e9f;p=ircplom Split up code into module files. --- diff --git a/ircplom/__init__.py b/ircplom/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ircplom/events.py b/ircplom/events.py new file mode 100644 index 0000000..ae0f33c --- /dev/null +++ b/ircplom/events.py @@ -0,0 +1,105 @@ +'Event system with event loop.' +from enum import Enum, auto +from queue import SimpleQueue, Empty as QueueEmpty +from threading import Thread +from typing import Any, Iterator, Literal, NamedTuple, Optional, Self + + +class EventType(Enum): + 'Differentiate Events for different treatment.' + ALERT = auto() + CONN_ALERT = auto() + CONNECTED = auto() + CONN_WINDOW = auto() + DISCONNECTED = auto() + EXCEPTION = auto() + INIT_CONNECT = auto() + INIT_RECONNECT = auto() + KEYBINDING = auto() + NICK_SET = auto() + PING = auto() + PROMPT_ADD = auto() + QUIT = auto() + RECV = auto() + SEND = auto() + SET_SCREEN = auto() + + +class Event(NamedTuple): + 'Communication unit between threads.' + type_: EventType + payload: Any = None + + +class EventQueue(SimpleQueue): + 'SimpleQueue wrapper optimized for handling Events.' + + def eput(self, type_: EventType, payload: Any = None) -> None: + 'Construct Event(type_, payload) and .put it onto queue.' + self.put(Event(type_, payload)) + + +class Loop: + 'Wraps thread looping over .eput input queue, potential bonus iterator.' + + def __init__(self, + q_to_main: EventQueue, + bonus_iterator: Optional[Iterator] = None + ) -> None: + self._q_to_main = q_to_main + self._bonus_iterator = bonus_iterator + self._q_input = EventQueue() + self._thread = Thread(target=self._loop, daemon=False) + self._thread.start() + + def stop(self) -> None: + 'Emit "QUIT" signal to break threaded loop, then wait for break.' + self._q_input.eput(EventType.QUIT) + self._thread.join() + + def __enter__(self) -> Self: + return self + + def __exit__(self, *_) -> Literal[False]: + self.stop() + return False # re-raise any exception that above ignored + + def put(self, event: Event) -> None: + 'Send event into thread loop.' + self._q_input.put(event) + + def broadcast(self, type_: EventType, payload: Any = None) -> None: + 'Send event to main loop via queue.' + self._q_to_main.eput(type_, payload) + + def process_main(self, event: Event) -> bool: + 'Process event yielded from input queue.' + if event.type_ == EventType.QUIT: + return False + return True + + def process_bonus(self, yielded: str) -> None: + 'Process bonus iterator yield.' + + def _loop(self) -> None: + 'Loop over input queue and, if provided, bonus iterator.' + try: + while True: + try: + yield_main = self._q_input.get( + block=True, + timeout=0 if self._bonus_iterator else None) + except QueueEmpty: + pass + else: + if self.process_main(yield_main) is False: + break + if self._bonus_iterator: + try: + yield_bonus = next(self._bonus_iterator) + except StopIteration: + break + if yield_bonus: + self.process_bonus(yield_bonus) + except Exception as e: # pylint: disable=broad-exception-caught + self._q_to_main.eput(EventType.EXCEPTION, e) diff --git a/ircplom/irc_conn.py b/ircplom/irc_conn.py new file mode 100644 index 0000000..eb3c575 --- /dev/null +++ b/ircplom/irc_conn.py @@ -0,0 +1,273 @@ +'IRC server connection management.' +# built-ins +from socket import socket, gaierror as socket_gaierror +from threading import Thread +from typing import Any, Callable, Iterator, NamedTuple, Optional, Self +# ourselves +from ircplom.events import Event, EventQueue, EventType, Loop + + +TIMEOUT_LOOP = 0.1 + +_TIMEOUT_CONNECT = 5 +_CONN_RECV_BUFSIZE = 1024 +_PORT = 6667 + +_IRCSPEC_LINE_SEPARATOR = b'\r\n' +_IRCSPEC_TAG_ESCAPES = ((r'\:', ';'), + (r'\s', ' '), + (r'\n', '\n'), + (r'\r', '\r'), + (r'\\', '\\')) + + +class LoginNames(NamedTuple): + 'Collects the names needed on server connect for USER, NICK commands.' + user: str + nick: str + real: str + + +class IrcConnection: + 'Abstracts socket connection, loop over it, and handling messages from it.' + + def __init__(self, + q_to_main: EventQueue, + idx: int, + hostname: str, + login: LoginNames, + ) -> None: + self._idx = idx + self._q_to_main = q_to_main + self._hostname = hostname + self._login = login + self._socket: Optional[socket] = None + self._assumed_open = False + self._loop: Optional[_ConnectionLoop] = None + self._broadcast(EventType.CONN_WINDOW, self._login.nick) + self._start_connecting() + + def _start_connecting(self) -> None: + + def connect(self) -> None: + self._socket = socket() + self._broadcast(EventType.CONN_ALERT, + f'Connecting to {self._hostname} …') + self._socket.settimeout(_TIMEOUT_CONNECT) + try: + self._socket.connect((self._hostname, _PORT)) + except (TimeoutError, socket_gaierror) as e: + self._broadcast(EventType.CONN_ALERT, str(e)) + return + self._socket.settimeout(TIMEOUT_LOOP) + self._assumed_open = True + self._loop = _ConnectionLoop(self._idx, self._q_to_main, + self._read_lines()) + self._broadcast(EventType.CONNECTED, self._login) + + Thread(target=connect, daemon=True, args=(self,)).start() + + def close(self) -> None: + 'Close both _ConnectionLoop and socket.' + self._assumed_open = False + if self._loop: + self._loop.stop() + self._loop = None + if self._socket: + self._socket.close() + self._socket = None + + def _broadcast(self, type_: EventType, payload: Any = None) -> None: + 'Send event to main loop via queue, with connection index as 1st arg.' + self._q_to_main.eput(type_, (self._idx, payload)) + + def _read_lines(self) -> Iterator[Optional[str]]: + 'Receive line-separator-delimited messages from socket.' + assert self._socket is not None + bytes_total = b'' + buffer_linesep = b'' + while True: + try: + bytes_new = self._socket.recv(_CONN_RECV_BUFSIZE) + except TimeoutError: + yield None + continue + except OSError as e: + if e.errno == 9: + break + raise e + if not bytes_new: + break + for c in bytes_new: + c_byted = c.to_bytes() + if c not in _IRCSPEC_LINE_SEPARATOR: + bytes_total += c_byted + buffer_linesep = b'' + elif c == _IRCSPEC_LINE_SEPARATOR[0]: + buffer_linesep = c_byted + else: + buffer_linesep += c_byted + if buffer_linesep == _IRCSPEC_LINE_SEPARATOR: + buffer_linesep = b'' + yield bytes_total.decode('utf-8') + bytes_total = b'' + + def _write_line(self, line: str) -> None: + 'Send line-separator-delimited message over socket.' + if not (self._socket and self._assumed_open): + self._broadcast(EventType.CONN_ALERT, + 'cannot send, assuming connection closed') + return + self._socket.sendall(line.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR) + + def handle(self, event: Event) -> None: + 'Process connection-directed Event into further steps.' + if event.type_ == EventType.INIT_RECONNECT: + if self._assumed_open: + self._broadcast(EventType.CONN_ALERT, + 'Reconnect called, but still seem connected, ' + 'so nothing to do.') + else: + self._start_connecting() + elif event.type_ == EventType.CONNECTED: + assert self._loop is not None + self._loop.put(event) + elif event.type_ == EventType.DISCONNECTED: + self.close() + elif event.type_ == EventType.SEND: + msg: IrcMessage = event.payload[1] + self._write_line(msg.raw) + + +class IrcMessage: + 'Properly structured representation of IRC message as per IRCv3 spec.' + _raw: Optional[str] = None + + def __init__(self, + verb: str, + parameters: Optional[tuple[str, ...]] = None, + source: str = '', + tags: Optional[dict[str, str]] = None + ) -> None: + self.verb: str = verb + self.parameters: tuple[str, ...] = parameters or tuple() + self.source: str = source + self.tags: dict[str, str] = tags or {} + + @classmethod + def from_raw(cls, raw_msg: str) -> Self: + 'Parse raw IRC message line into properly structured IrcMessage.' + + class _Stage(NamedTuple): + name: str + prefix_char: Optional[str] + processor: Callable = lambda s: s + + def _parse_tags(str_tags: str) -> dict[str, str]: + tags = {} + for str_tag in [s for s in str_tags.split(';') if s]: + if '=' in str_tag: + key, val = str_tag.split('=', maxsplit=1) + for to_repl, repl_with in _IRCSPEC_TAG_ESCAPES: + val = val.replace(to_repl, repl_with) + else: + key, val = str_tag, '' + tags[key] = val + return tags + + def _split_params(str_params: str) -> tuple[str, ...]: + params = [] + params_stage = 0 # 0: gap, 1: non-trailing, 2: trailing + for char in str_params: + if char == ' ' and params_stage < 2: + params_stage = 0 + continue + if params_stage == 0: + params += [''] + params_stage += 1 + if char == ':': + params_stage += 1 + continue + params[-1] += char + return tuple(p for p in params) + + stages = [_Stage('tags', '@', _parse_tags), + _Stage('source', ':'), + _Stage('verb', None, lambda s: s.upper()), + _Stage('parameters', None, _split_params)] + harvest = {s.name: '' for s in stages} + idx_stage = 0 + stage = None + for char in raw_msg: + if char == ' ' and idx_stage < (len(stages) - 1): + if stage: + stage = None + continue + if not stage: + while not stage: + idx_stage += 1 + tested = stages[idx_stage] + if (not tested.prefix_char) or char == tested.prefix_char: + stage = tested + if stage.prefix_char: + continue + harvest[stage.name] += char + msg = cls(**{s.name: s.processor(harvest[s.name]) for s in stages}) + msg._raw = raw_msg + return msg + + @property + def raw(self) -> str: + 'Return raw message code – create from known fields if necessary.' + if not self._raw: + to_combine = [] + if self.tags: + tag_strs = [] + for key, val in self.tags.items(): + tag_strs += [key] + if not val: + continue + for repl_with, to_repl in reversed(_IRCSPEC_TAG_ESCAPES): + val = val.replace(to_repl, repl_with) + tag_strs[-1] += f'={val}' + to_combine += ['@' + ';'.join(tag_strs)] + to_combine += [self.verb] + if self.parameters: + to_combine += self.parameters[:-1] + to_combine += [f':{self.parameters[-1]}'] + self._raw = ' '.join(to_combine) + return self._raw + + +class _ConnectionLoop(Loop): + 'Loop receiving and translating socket messages towards main loop.' + + def __init__(self, connection_idx: int, *args, **kwargs) -> None: + self._conn_idx = connection_idx + super().__init__(*args, **kwargs) + + def _broadcast_conn(self, type_: EventType, *args) -> None: + self.broadcast(type_, (self._conn_idx, *args)) + + def _send(self, verb: str, parameters: tuple[str, ...]) -> None: + self._broadcast_conn(EventType.SEND, IrcMessage(verb, parameters)) + + def process_main(self, event: Event) -> bool: + if not super().process_main(event): + return False + if event.type_ == EventType.CONNECTED: + login = event.payload[1] + self._send('USER', (login.user, '0', '*', login.real)) + self._send('NICK', (login.nick,)) + return True + + def process_bonus(self, yielded: str) -> None: + msg = IrcMessage.from_raw(yielded) + self._broadcast_conn(EventType.RECV, msg) + if msg.verb == 'PING': + self._send('PONG', (msg.parameters[0],)) + elif msg.verb == 'ERROR'\ + and msg.parameters[0].startswith('Closing link:'): + self._broadcast_conn(EventType.DISCONNECTED) + elif msg.verb == '001': + self._broadcast_conn(EventType.NICK_SET, msg.parameters[0]) diff --git a/ircplom/tui.py b/ircplom/tui.py new file mode 100644 index 0000000..b7fcbd3 --- /dev/null +++ b/ircplom/tui.py @@ -0,0 +1,617 @@ +'Terminal and TUI management.' +# built-ins +from abc import ABC, abstractmethod +from base64 import b64decode +from contextlib import contextmanager +from getpass import getuser as getusername +from inspect import _empty as inspect_empty, signature, stack +from signal import SIGWINCH, signal +from typing import Any, Callable, Generator, Iterator, NamedTuple, Optional +# requirements.txt +from blessed import Terminal as BlessedTerminal +# ourselves +from ircplom.events import Event, EventType, EventQueue, Loop +from ircplom.irc_conn import IrcMessage, LoginNames, TIMEOUT_LOOP + + +_B64_PREFIX = 'b64:' +_OSC52_PREFIX = ']52;c;' +_PASTE_DELIMITER = '\007' + +_PROMPT_TEMPLATE = '> ' +_PROMPT_ELL_IN = '<…' +_PROMPT_ELL_OUT = '…>' + +_KEYBINDINGS = { + 'KEY_BACKSPACE': ('window.prompt.backspace',), + 'KEY_ENTER': ('prompt_enter',), + 'KEY_LEFT': ('window.prompt.move_cursor', 'left'), + 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'), + 'KEY_UP': ('window.prompt.scroll', 'up'), + 'KEY_DOWN': ('window.prompt.scroll', 'down'), + 'KEY_PGUP': ('window.log.scroll', 'up'), + 'KEY_PGDOWN': ('window.log.scroll', 'down'), + '[91, 49, 59, 51, 68]': ('window', 'left'), + '[91, 49, 59, 51, 67]': ('window', 'right'), + 'KEY_F1': ('window.paste',), +} +CMD_SHORTCUTS = { + 'disconnect': 'window.disconnect', + 'reconnect': 'window.reconnect' +} + + +class _YX(NamedTuple): + '2-dimensional coordinate.' + y: int + x: int + + +class _Widget(ABC): + 'Defines most basic TUI object API.' + + @abstractmethod + def set_geometry(self, measurements: _YX) -> None: + 'Update widget\'s measurements, re-generate content where necessary.' + + @abstractmethod + def draw(self) -> None: + 'Print widget\'s content in shape appropriate to set geometry.' + + +class _ScrollableWidget(_Widget, ABC): + 'Defines some API shared between _PromptWidget and _LogWidget.' + _history_idx: int + + def __init__(self, write: Callable[..., None], *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._write = write + self._history: list[str] = [] + + @abstractmethod + def append(self, to_append: str) -> None: + 'Append to widget content.' + + @abstractmethod + def _scroll(self, up=True) -> None: + pass + + def cmd__scroll(self, direction: str) -> None: + 'Scroll through stored content/history.' + self._scroll(up=direction == 'up') + self.draw() + + +class _LogWidget(_ScrollableWidget): + 'Collects line-shaped messages, scrolls and wraps them for display.' + _view_size: _YX + _y_pgscroll: int + + def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs + ) -> None: + super().__init__(*args, **kwargs) + self._wrap = wrap + self._wrapped_idx = self._history_idx = -1 + self._wrapped: list[tuple[Optional[int], str]] = [] + + def _add_wrapped(self, idx_original, line) -> int: + wrapped_lines = self._wrap(line) + self._wrapped += [(idx_original, line) for line in wrapped_lines] + return len(wrapped_lines) + + def set_geometry(self, measurements: _YX) -> None: + self._view_size = measurements + self._y_pgscroll = self._view_size.y // 2 + self._wrapped.clear() + self._wrapped += [(None, '')] * self._view_size.y + if not self._history: + return + for idx_history, line in enumerate(self._history): + self._add_wrapped(idx_history, line) + wrapped_lines_for_history_idx = [ + t for t in self._wrapped + if t[0] == len(self._history) + self._history_idx] + idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1]) + self._wrapped_idx = idx_their_last - len(self._wrapped) + + def append(self, to_append: str) -> None: + self._history += [to_append] + n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append) + if self._wrapped_idx < -1: + self._history_idx -= 1 + self._wrapped_idx -= n_wrapped_lines + + def draw(self) -> None: + start_idx = self._wrapped_idx - self._view_size.y + 1 + end_idx = self._wrapped_idx + to_write = [t[1] for t in self._wrapped[start_idx:end_idx]] + if self._wrapped_idx < -1: + scroll_info = f'vvv [{(-1) * self._wrapped_idx}] ' + scroll_info += 'v' * (self._view_size.x - len(scroll_info)) + to_write += [scroll_info] + else: + to_write += [self._wrapped[self._wrapped_idx][1]] + for i, line in enumerate(to_write): + self._write(line, i) + + def _scroll(self, up: bool = True) -> None: + if up: + self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped), + self._wrapped_idx - self._y_pgscroll) + else: + self._wrapped_idx = min(-1, + self._wrapped_idx + self._y_pgscroll) + history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0] + if history_idx_to_wrapped_idx is not None: + self._history_idx = history_idx_to_wrapped_idx - len(self._history) + + +class _PromptWidget(_ScrollableWidget): + 'Keyboard-controlled command input field.' + _y: int + _width: int + _prompt: str = _PROMPT_TEMPLATE + _history_idx = 0 + _input_buffer: str + _cursor_x: int + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._reset_buffer('') + + def set_geometry(self, measurements: _YX) -> None: + self._y, self._width = measurements + + def append(self, to_append: str) -> None: + self._cursor_x += len(to_append) + self._input_buffer = (self._input_buffer[:self._cursor_x - 1] + + to_append + + self._input_buffer[self._cursor_x - 1:]) + self._history_idx = 0 + self.draw() + + def draw(self) -> None: + prefix = self._prompt[:] + content = self._input_buffer[:] + if self._cursor_x == len(self._input_buffer): + content += ' ' + half_width = (self._width - len(prefix)) // 2 + offset = 0 + if len(prefix) + len(content) > self._width\ + and self._cursor_x > half_width: + prefix += _PROMPT_ELL_IN + offset = min(len(prefix) + len(content) - self._width, + self._cursor_x - half_width + len(_PROMPT_ELL_IN)) + cursor_x_to_write = len(prefix) + self._cursor_x - offset + to_write = f'{prefix}{content[offset:]}' + if len(to_write) > self._width: + to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)] + + _PROMPT_ELL_OUT) + self._write(to_write[:cursor_x_to_write], self._y, padding=False) + self._write(to_write[cursor_x_to_write], attribute='reverse', + padding=False) + self._write(to_write[cursor_x_to_write + 1:]) + + def _scroll(self, up: bool = True) -> None: + if up and -(self._history_idx) < len(self._history): + if self._history_idx == 0 and self._input_buffer: + self._history += [self._input_buffer[:]] + self._reset_buffer('') + self._history_idx -= 1 + self._history_idx -= 1 + self._reset_buffer(self._history[self._history_idx]) + elif not up: + if self._history_idx < 0: + self._history_idx += 1 + if self._history_idx == 0: + self._reset_buffer('') + else: + self._reset_buffer(self._history[self._history_idx]) + elif self._input_buffer: + self._history += [self._input_buffer[:]] + self._reset_buffer('') + + def cmd__backspace(self) -> None: + 'Truncate current content by one character, if possible.' + if self._cursor_x > 0: + self._cursor_x -= 1 + self._input_buffer = (self._input_buffer[:self._cursor_x] + + self._input_buffer[self._cursor_x + 1:]) + self._history_idx = 0 + self.draw() + + def cmd__move_cursor(self, direction: str) -> None: + 'Move cursor one space into direction ("left" or "right") if possible.' + if direction == 'left' and self._cursor_x > 0: + self._cursor_x -= 1 + elif direction == 'right'\ + and self._cursor_x <= len(self._input_buffer): + self._cursor_x += 1 + else: + return + self.draw() + + def _reset_buffer(self, content: str) -> None: + self._input_buffer = content + self._cursor_x = len(self._input_buffer) + + def enter(self) -> str: + 'Return current content while also clearing and then redrawing.' + to_return = self._input_buffer[:] + if to_return: + self._history += [to_return] + self._reset_buffer('') + self.draw() + return to_return + + +class _ConnectionPromptWidget(_PromptWidget): + 'PromptWidget with attributes, methods for dealing with an IrcConnection.' + _nickname: str = '' + + def update_prompt(self, + nick_confirmed=False, + nick: Optional[str] = None + ) -> None: + 'Update nickname-relevant knowledge to go into prompt string.' + self._prompt = '' + if nick: + self._nickname = nick + if self._nickname: + self._prompt += ' ' if nick_confirmed else '?' + self._prompt += self._nickname + self._prompt += _PROMPT_TEMPLATE + + +class _Window(_Widget): + 'Collects a log and a prompt meant for the same content stream.' + _y_status: int + prompt: _PromptWidget + + def __init__(self, idx: int, term: 'Terminal') -> None: + self.idx = idx + self._term = term + self.log = _LogWidget(self._term.wrap, self._term.write) + self.prompt = self.__annotations__['prompt'](self._term.write) + if hasattr(self._term, 'size'): + self.set_geometry() + + def set_geometry(self, _=None) -> None: + assert _ is None + self._y_status = self._term.size.y - 2 + self.log.set_geometry(_YX(self._y_status, self._term.size.x)) + self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x)) + + def draw(self) -> None: + idx_box = f'[{self.idx}]' + status_line = idx_box + '=' * (self._term.size.x - len(idx_box)) + self._term.clear() + self.log.draw() + self._term.write(status_line, self._y_status) + self.prompt.draw() + + def cmd__paste(self) -> None: + 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.' + self._term.write(f'\033{_OSC52_PREFIX}?{_PASTE_DELIMITER}', + self._y_status) + self.draw() + + +class _ConnectionWindow(_Window): + 'Window with attributes and methods for dealing with an IrcConnection.' + prompt: _ConnectionPromptWidget + + def __init__(self, + broadcast: Callable[[EventType, Any], None], + conn_idx: int, + *args, **kwargs + ) -> None: + self._broadcast = broadcast + self._conn_idx = conn_idx + super().__init__(*args, **kwargs) + + def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None: + 'Send QUIT command to server.' + self._broadcast(EventType.SEND, + (self._conn_idx, IrcMessage('QUIT', (quit_msg, )))) + + def cmd__reconnect(self) -> None: + 'Attempt reconnection.' + self._broadcast(EventType.INIT_RECONNECT, (self._conn_idx,)) + + +class _KeyboardLoop(Loop): + 'Loop receiving and translating keyboard events towards main loop.' + + def process_bonus(self, yielded: str) -> None: + if yielded.startswith(_B64_PREFIX): + encoded = yielded[len(_B64_PREFIX):] + to_paste = '' + for i, c in enumerate(b64decode(encoded).decode('utf-8')): + if i > 512: + break + if c.isprintable(): + to_paste += c + elif c.isspace(): + to_paste += ' ' + else: + to_paste += '#' + self.broadcast(EventType.PROMPT_ADD, to_paste) + elif yielded in _KEYBINDINGS: + self.broadcast(EventType.KEYBINDING, _KEYBINDINGS[yielded]) + elif len(yielded) == 1: + self.broadcast(EventType.PROMPT_ADD, yielded) + else: + self.broadcast(EventType.ALERT, + f'unknown keyboard input: {yielded}') + + +class _TuiLoop(Loop): + 'Loop for drawing/updating TUI.' + + def __init__(self, term: 'Terminal', *args, **kwargs) -> None: + self._term = term + self._windows = [_Window(0, self._term)] + self._window_idx = 0 + self._conn_windows: list[_ConnectionWindow] = [] + super().__init__(*args, **kwargs) + self.put(Event(EventType.SET_SCREEN)) + + def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]: + cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name) + cmd_parent = self + while True: + cmd_name_toks = cmd_name.split('.', maxsplit=1) + if len(cmd_name_toks) == 1: + break + if not hasattr(cmd_parent, cmd_name_toks[0]): + return None + cmd_parent = getattr(cmd_parent, cmd_name_toks[0]) + cmd_name = cmd_name_toks[1] + cmd_name = f'cmd__{cmd_name}' + if not hasattr(cmd_parent, cmd_name): + return None + return getattr(cmd_parent, cmd_name) + + def process_main(self, event: Event) -> bool: + if not super().process_main(event): + return False + if event.type_ == EventType.SET_SCREEN: + self._term.calc_geometry() + for window in self._windows: + window.set_geometry() + self.window.draw() + elif event.type_ == EventType.CONN_WINDOW: + conn_win = _ConnectionWindow(broadcast=self.broadcast, + conn_idx=event.payload[0], + idx=len(self._windows), + term=self._term) + conn_win.prompt.update_prompt(nick_confirmed=False, + nick=event.payload[1]) + self._windows += [conn_win] + self._conn_windows += [conn_win] + self._switch_window(conn_win.idx) + elif event.type_ == EventType.ALERT: + self.window.log.append(f'ALERT {event.payload}') + self.window.log.draw() + elif event.type_ in {EventType.RECV, EventType.SEND, + EventType.CONN_ALERT}: + conn_win = self._conn_windows[event.payload[0]] + if event.type_ == EventType.CONN_ALERT: + msg = f'ALERT {event.payload[1]}' + else: + msg = (('<-' if event.type_ == EventType.RECV else '->') + + event.payload[1].raw) + conn_win.log.append(msg) + if conn_win == self.window: + self.window.log.draw() + elif event.type_ == EventType.NICK_SET: + conn_win = self._conn_windows[event.payload[0]] + conn_win.prompt.update_prompt(nick_confirmed=True, + nick=event.payload[1]) + if conn_win == self.window: + self.window.prompt.draw() + elif event.type_ == EventType.DISCONNECTED: + conn_win = self._conn_windows[event.payload[0]] + conn_win.prompt.update_prompt(nick_confirmed=False) + if conn_win == self.window: + self.window.prompt.draw() + elif event.type_ == EventType.KEYBINDING: + cmd = self._cmd_name_to_cmd(event.payload[0]) + assert cmd is not None + cmd(*event.payload[1:]) + elif event.type_ == EventType.PROMPT_ADD: + self.window.prompt.append(event.payload) + # elif event.type_ == EventType.DEBUG: + # from traceback import format_exception + # for line in '\n'.join(format_exception(event.payload) + # ).split('\n'): + # self.window.log.append(f'DEBUG {line}') + # self.window.log.draw() + else: + return True + self._term.flush() + return True + + @property + def window(self) -> _Window: + 'Currently selected Window.' + return self._windows[self._window_idx] + + def _switch_window(self, idx: int) -> None: + self._window_idx = idx + self.window.draw() + + def cmd__connect(self, hostname: str, nickname: str, realname: str + ) -> None: + 'Send INIT_CONNECT command to main loop.' + login = LoginNames(user=getusername(), nick=nickname, real=realname) + self.broadcast(EventType.INIT_CONNECT, (hostname, login)) + + def cmd__prompt_enter(self) -> None: + 'Get prompt content from .window.prompt.enter, parse to & run command.' + to_parse = self.window.prompt.enter() + if not to_parse: + return + alert: Optional[str] = None + if to_parse[0:1] == '/': + toks = to_parse[1:].split(maxsplit=1) + alert = f'{toks[0]} unknown' + cmd = self._cmd_name_to_cmd(toks[0]) + if cmd and cmd.__name__ != stack()[0].function: + params = signature(cmd).parameters + n_args_max = len(params) + n_args_min = len([p for p in params.values() + if p.default == inspect_empty]) + alert = f'{cmd.__name__} needs between {n_args_min} and '\ + f'{n_args_max} args' + if len(toks) == 1 and not n_args_min: + alert = cmd() + elif len(toks) > 1 and params\ + and n_args_min <= len(toks[1].split()): + args = [] + while len(toks) > 1 and n_args_max: + toks = toks[1].split(maxsplit=1) + args += [toks[0]] + n_args_max -= 1 + alert = cmd(*args) + else: + alert = 'not prefixed by /' + if alert: + self.broadcast(EventType.ALERT, f'invalid prompt command: {alert}') + + def cmd__quit(self) -> None: + 'Send QUIT to all threads.' + self.broadcast(EventType.QUIT) + + def cmd__window(self, towards: str) -> Optional[str]: + 'Switch window selection.' + n_windows = len(self._windows) + if n_windows < 2: + return 'no alternate window to move into' + if towards in {'left', 'right'}: + multiplier = (+1) if towards == 'right' else (-1) + window_idx = self._window_idx + multiplier + if not 0 <= window_idx < n_windows: + window_idx -= multiplier * n_windows + elif not towards.isdigit(): + return f'neither "left"/"right" nor integer: {towards}' + else: + window_idx = int(towards) + if not 0 <= window_idx < n_windows: + return f'unavailable window idx: {window_idx}' + self._switch_window(window_idx) + return None + + +class Terminal: + 'Abstraction of terminal interface.' + size: _YX + tui: _TuiLoop + _blessed: BlessedTerminal + _cursor_yx_: _YX + + @contextmanager + def context(self, q_to_main: EventQueue) -> Generator: + 'Combine multiple contexts into one.' + signal(SIGWINCH, lambda *_: q_to_main.eput(EventType.SET_SCREEN)) + self._blessed = BlessedTerminal() + with (self._blessed.raw(), + self._blessed.fullscreen(), + self._blessed.hidden_cursor(), + _KeyboardLoop(q_to_main, self.get_keypresses())): + self._cursor_yx = _YX(0, 0) + with _TuiLoop(self, q_to_main) as self.tui: + yield self + + @property + def _cursor_yx(self) -> _YX: + return self._cursor_yx_ + + @_cursor_yx.setter + def _cursor_yx(self, yx: _YX) -> None: + print(self._blessed.move_yx(yx.y, yx.x), end='') + self._cursor_yx_ = yx + + def calc_geometry(self) -> None: + '(Re-)calculate .size..' + self.size = _YX(self._blessed.height, self._blessed.width) + + def clear(self) -> None: + 'Clear terminal.' + print(self._blessed.clear, end='') + + def flush(self) -> None: + 'Flush terminal.' + print('', end='', flush=True) + + def wrap(self, line: str) -> list[str]: + 'Wrap line to list of lines fitting into terminal width.' + return self._blessed.wrap(line, width=self.size.x, + subsequent_indent=' '*4) + + def write(self, + msg: str = '', + start_y: Optional[int] = None, + attribute: Optional[str] = None, + padding: bool = True + ) -> None: + 'Print to terminal, with position, padding to line end, attributes.' + if start_y: + self._cursor_yx = _YX(start_y, 0) + # ._blessed.length can slow down things notably: only use where needed! + end_x = self._cursor_yx.x + (len(msg) if msg.isascii() + else self._blessed.length(msg)) + len_padding = self.size.x - end_x + if len_padding < 0: + msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x) + elif padding: + msg += ' ' * len_padding + end_x = self.size.x + if attribute: + msg = getattr(self._blessed, attribute)(msg) + print(msg, end='') + self._cursor_yx = _YX(self._cursor_yx.y, end_x) + + def get_keypresses(self) -> Iterator[str]: + '''Loop through keypresses from terminal, collect what blessed ignores. + + (Notably, blessed seems to junk any alt/escape-modifide key events it + does not explicitly know. + ''' + n_gotchs_unprocessed = 0 + while True: + new_gotchs = [] + if not n_gotchs_unprocessed: + while self._blessed.kbhit(TIMEOUT_LOOP): + gotch = self._blessed.getch() + self._blessed.ungetch(gotch) + new_gotchs += [gotch] + if not self._blessed.kbhit(0): + break + n_gotchs_unprocessed += len(new_gotchs) + blessed_key = self._blessed.inkey(timeout=0, esc_delay=0) + n_chs_blessed_key = len(blessed_key.encode('utf-8')) + unhandleds = [] + if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE': + for _ in range(len(new_gotchs) - n_chs_blessed_key): + unhandled = self._blessed.inkey(timeout=0, esc_delay=0) + unhandleds += list(unhandled.encode('utf-8')) + n_gotchs_unprocessed -= 1 + n_gotchs_unprocessed -= n_chs_blessed_key + if unhandleds: + fused = ''.join([chr(n) for n in unhandleds]) + if fused.startswith(_OSC52_PREFIX): + if not (encoded := fused[len(_OSC52_PREFIX):]): + while True: + gotch = self._blessed.getch() + if gotch == _PASTE_DELIMITER: + break + encoded += gotch + yield f'{_B64_PREFIX}{encoded}' + continue + yield str(unhandleds) + elif blessed_key.name: + yield blessed_key.name + else: + yield str(blessed_key)