From: Christian Heller Date: Fri, 13 Jun 2025 16:15:25 +0000 (+0200) Subject: Also update main file (forgotten in prev commit). X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/static/%7B%7Bprefix%7D%7D/template?a=commitdiff_plain;h=ddad21a0f5a2fc7127a18673148db2242ea8b828;p=ircplom Also update main file (forgotten in prev commit). --- diff --git a/ircplom.py b/ircplom.py index 6afe9dd..cec2ccb 100755 --- a/ircplom.py +++ b/ircplom.py @@ -1,996 +1,8 @@ #!/usr/bin/env python3 'Attempt at an IRC client.' - -from abc import ABC, abstractmethod -from base64 import b64decode -from contextlib import contextmanager -from enum import Enum, auto -from getpass import getuser as getusername -from inspect import _empty as inspect_empty, signature, stack -from queue import SimpleQueue, Empty as QueueEmpty -from signal import SIGWINCH, signal -from socket import socket, gaierror as socket_gaierror -from threading import Thread -from typing import ( - Any, Callable, Generator, Iterator, Literal, NamedTuple, Optional, Self) - -from blessed import Terminal as BlessedTerminal - - -TIMEOUT_CONNECT = 5 -TIMEOUT_LOOP = 0.1 -CONN_RECV_BUFSIZE = 1024 - -KEYBINDINGS = { - 'KEY_BACKSPACE': ('window.prompt.backspace',), - 'KEY_ENTER': ('prompt_enter',), - 'KEY_LEFT': ('window.prompt.move_cursor', 'left'), - 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'), - 'KEY_UP': ('window.prompt.scroll', 'up'), - 'KEY_DOWN': ('window.prompt.scroll', 'down'), - 'KEY_PGUP': ('window.log.scroll', 'up'), - 'KEY_PGDOWN': ('window.log.scroll', 'down'), - '[91, 49, 59, 51, 68]': ('window', 'left'), - '[91, 49, 59, 51, 67]': ('window', 'right'), - 'KEY_F1': ('window.paste',), -} -CMD_SHORTCUTS = { - 'disconnect': 'window.disconnect', - 'reconnect': 'window.reconnect' -} - -B64_PREFIX = 'b64:' -OSC52_PREFIX = ']52;c;' -PASTE_DELIMITER = '\007' - -PROMPT_TEMPLATE = '> ' -PROMPT_ELL_IN = '<…' -PROMPT_ELL_OUT = '…>' - -PORT = 6667 -IRCSPEC_LINE_SEPARATOR = b'\r\n' -IRCSPEC_TAG_ESCAPES = ((r'\:', ';'), - (r'\s', ' '), - (r'\n', '\n'), - (r'\r', '\r'), - (r'\\', '\\')) - - -class EventType(Enum): - 'Differentiate Events for different treatment.' - ALERT = auto() - CONN_ALERT = auto() - CONNECTED = auto() - CONN_WINDOW = auto() - DISCONNECTED = auto() - EXCEPTION = auto() - INIT_CONNECT = auto() - INIT_RECONNECT = auto() - KEYBINDING = auto() - NICK_SET = auto() - PING = auto() - PROMPT_ADD = auto() - QUIT = auto() - RECV = auto() - SEND = auto() - SET_SCREEN = auto() - - -class Event(NamedTuple): - 'Communication unit between threads.' - type_: EventType - payload: Any = None - - -class EventQueue(SimpleQueue): - 'SimpleQueue wrapper optimized for handling Events.' - - def eput(self, type_: EventType, payload: Any = None) -> None: - 'Construct Event(type_, payload) and .put it onto queue.' - self.put(Event(type_, payload)) - - -class YX(NamedTuple): - '2-dimensional coordinate.' - y: int - x: int - - -# def log(msg): -# from datetime import datetime -# with open('log.txt', 'a') as f: -# f.write(f'{datetime.now()} {msg}\n') - - -class Terminal: - 'Abstraction of terminal interface.' - size: YX - tui: 'TuiLoop' - _blessed: BlessedTerminal - _cursor_yx_: YX - - @contextmanager - def context(self, q_to_main: EventQueue) -> Generator: - 'Combine multiple contexts into one.' - signal(SIGWINCH, lambda *_: q_to_main.eput(EventType.SET_SCREEN)) - self._blessed = BlessedTerminal() - with (self._blessed.raw(), - self._blessed.fullscreen(), - self._blessed.hidden_cursor(), - KeyboardLoop(q_to_main, self.get_keypresses())): - self._cursor_yx = YX(0, 0) - with TuiLoop(self, q_to_main) as self.tui: - yield self - - @property - def _cursor_yx(self) -> YX: - return self._cursor_yx_ - - @_cursor_yx.setter - def _cursor_yx(self, yx: YX) -> None: - print(self._blessed.move_yx(yx.y, yx.x), end='') - self._cursor_yx_ = yx - - def calc_geometry(self) -> None: - '(Re-)calculate .size..' - self.size = YX(self._blessed.height, self._blessed.width) - - def clear(self) -> None: - 'Clear terminal.' - print(self._blessed.clear, end='') - - def flush(self) -> None: - 'Flush terminal.' - print('', end='', flush=True) - - def wrap(self, line: str) -> list[str]: - 'Wrap line to list of lines fitting into terminal width.' - return self._blessed.wrap(line, width=self.size.x, - subsequent_indent=' '*4) - - def write(self, - msg: str = '', - start_y: Optional[int] = None, - attribute: Optional[str] = None, - padding: bool = True - ) -> None: - 'Print to terminal, with position, padding to line end, attributes.' - if start_y: - self._cursor_yx = YX(start_y, 0) - # ._blessed.length can slow down things notably: only use where needed! - end_x = self._cursor_yx.x + (len(msg) if msg.isascii() - else self._blessed.length(msg)) - len_padding = self.size.x - end_x - if len_padding < 0: - msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x) - elif padding: - msg += ' ' * len_padding - end_x = self.size.x - if attribute: - msg = getattr(self._blessed, attribute)(msg) - print(msg, end='') - self._cursor_yx = YX(self._cursor_yx.y, end_x) - - def get_keypresses(self) -> Iterator[str]: - '''Loop through keypresses from terminal, collect what blessed ignores. - - (Notably, blessed seems to junk any alt/escape-modifide key events it - does not explicitly know. - ''' - n_gotchs_unprocessed = 0 - while True: - new_gotchs = [] - if not n_gotchs_unprocessed: - while self._blessed.kbhit(TIMEOUT_LOOP): - gotch = self._blessed.getch() - self._blessed.ungetch(gotch) - new_gotchs += [gotch] - if not self._blessed.kbhit(0): - break - n_gotchs_unprocessed += len(new_gotchs) - blessed_key = self._blessed.inkey(timeout=0, esc_delay=0) - n_chs_blessed_key = len(blessed_key.encode('utf-8')) - unhandleds = [] - if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE': - for _ in range(len(new_gotchs) - n_chs_blessed_key): - unhandled = self._blessed.inkey(timeout=0, esc_delay=0) - unhandleds += list(unhandled.encode('utf-8')) - n_gotchs_unprocessed -= 1 - n_gotchs_unprocessed -= n_chs_blessed_key - if unhandleds: - fused = ''.join([chr(n) for n in unhandleds]) - if fused.startswith(OSC52_PREFIX): - if not (encoded := fused[len(OSC52_PREFIX):]): - while True: - gotch = self._blessed.getch() - if gotch == PASTE_DELIMITER: - break - encoded += gotch - yield f'{B64_PREFIX}{encoded}' - continue - yield str(unhandleds) - elif blessed_key.name: - yield blessed_key.name - else: - yield str(blessed_key) - - -class LoginNames(NamedTuple): - 'Collects the names needed on server connect for USER, NICK commands.' - user: str - nick: str - real: str - - -class IrcConnection: - 'Abstracts socket connection, loop over it, and handling messages from it.' - - def __init__(self, - q_to_main: EventQueue, - idx: int, - hostname: str, - login: LoginNames, - ) -> None: - self._idx = idx - self._q_to_main = q_to_main - self._hostname = hostname - self._login = login - self._socket: Optional[socket] = None - self._assumed_open = False - self._loop: Optional[ConnectionLoop] = None - self._broadcast(EventType.CONN_WINDOW, self._login.nick) - self._start_connecting() - - def _start_connecting(self) -> None: - - def connect(self) -> None: - self._socket = socket() - self._broadcast(EventType.CONN_ALERT, - f'Connecting to {self._hostname} …') - self._socket.settimeout(TIMEOUT_CONNECT) - try: - self._socket.connect((self._hostname, PORT)) - except (TimeoutError, socket_gaierror) as e: - self._broadcast(EventType.CONN_ALERT, str(e)) - return - self._socket.settimeout(TIMEOUT_LOOP) - self._assumed_open = True - self._loop = ConnectionLoop(self._idx, self._q_to_main, - self._read_lines()) - self._broadcast(EventType.CONNECTED, self._login) - - Thread(target=connect, daemon=True, args=(self,)).start() - - def close(self): - 'Close both ConnectionLoop and socket.' - self._assumed_open = False - if self._loop: - self._loop.stop() - self._loop = None - if self._socket: - self._socket.close() - self._socket = None - - def _broadcast(self, type_: EventType, payload: Any = None) -> None: - 'Send event to main loop via queue, with connection index as 1st arg.' - self._q_to_main.eput(type_, (self._idx, payload)) - - def _read_lines(self) -> Iterator[Optional[str]]: - 'Receive line-separator-delimited messages from socket.' - assert self._socket is not None - bytes_total = b'' - buffer_linesep = b'' - while True: - try: - bytes_new = self._socket.recv(CONN_RECV_BUFSIZE) - except TimeoutError: - yield None - continue - except OSError as e: - if e.errno == 9: - break - raise e - if not bytes_new: - break - for c in bytes_new: - c_byted = c.to_bytes() - if c not in IRCSPEC_LINE_SEPARATOR: - bytes_total += c_byted - buffer_linesep = b'' - elif c == IRCSPEC_LINE_SEPARATOR[0]: - buffer_linesep = c_byted - else: - buffer_linesep += c_byted - if buffer_linesep == IRCSPEC_LINE_SEPARATOR: - buffer_linesep = b'' - yield bytes_total.decode('utf-8') - bytes_total = b'' - - def _write_line(self, line: str) -> None: - 'Send line-separator-delimited message over socket.' - if not (self._socket and self._assumed_open): - self._broadcast(EventType.CONN_ALERT, - 'cannot send, assuming connection closed') - return - self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR) - - def handle(self, event: Event) -> None: - 'Process connection-directed Event into further steps.' - if event.type_ == EventType.INIT_RECONNECT: - if self._assumed_open: - self._broadcast(EventType.CONN_ALERT, - 'Reconnect called, but still seem connected, ' - 'so nothing to do.') - else: - self._start_connecting() - elif event.type_ == EventType.CONNECTED: - assert self._loop is not None - self._loop.put(event) - elif event.type_ == EventType.DISCONNECTED: - self.close() - elif event.type_ == EventType.SEND: - msg: IrcMessage = event.payload[1] - self._write_line(msg.raw) - - -class IrcMessage: - 'Properly structured representation of IRC message as per IRCv3 spec.' - _raw: Optional[str] = None - - def __init__(self, - verb: str, - parameters: Optional[tuple[str, ...]] = None, - source: str = '', - tags: Optional[dict[str, str]] = None - ) -> None: - self.verb: str = verb - self.parameters: tuple[str, ...] = parameters or tuple() - self.source: str = source - self.tags: dict[str, str] = tags or {} - - def __str__(self) -> str: - return f'[{self.tags}[{self.source}][{self.verb}]]][{self.parameters}]' - - @classmethod - def from_raw(cls, raw_msg: str) -> Self: - 'Parse raw IRC message line into properly structured IrcMessage.' - - class _Stage(NamedTuple): - name: str - prefix_char: Optional[str] - processor: Callable = lambda s: s - - def _parse_tags(str_tags: str) -> dict[str, str]: - tags = {} - for str_tag in [s for s in str_tags.split(';') if s]: - if '=' in str_tag: - key, val = str_tag.split('=', maxsplit=1) - for to_repl, repl_with in IRCSPEC_TAG_ESCAPES: - val = val.replace(to_repl, repl_with) - else: - key, val = str_tag, '' - tags[key] = val - return tags - - def _split_params(str_params: str) -> tuple[str, ...]: - params = [] - params_stage = 0 # 0: gap, 1: non-trailing, 2: trailing - for char in str_params: - if char == ' ' and params_stage < 2: - params_stage = 0 - continue - if params_stage == 0: - params += [''] - params_stage += 1 - if char == ':': - params_stage += 1 - continue - params[-1] += char - return tuple(p for p in params) - - stages = [_Stage('tags', '@', _parse_tags), - _Stage('source', ':'), - _Stage('verb', None, lambda s: s.upper()), - _Stage('parameters', None, _split_params)] - harvest = {s.name: '' for s in stages} - idx_stage = 0 - stage = None - for char in raw_msg: - if char == ' ' and idx_stage < (len(stages) - 1): - if stage: - stage = None - continue - if not stage: - while not stage: - idx_stage += 1 - tested = stages[idx_stage] - if (not tested.prefix_char) or char == tested.prefix_char: - stage = tested - if stage.prefix_char: - continue - harvest[stage.name] += char - msg = cls(**{s.name: s.processor(harvest[s.name]) for s in stages}) - msg._raw = raw_msg - return msg - - @property - def raw(self) -> str: - 'Return raw message code – create from known fields if necessary.' - if not self._raw: - to_combine = [] - if self.tags: - tag_strs = [] - for key, val in self.tags.items(): - tag_strs += [key] - if not val: - continue - for repl_with, to_repl in reversed(IRCSPEC_TAG_ESCAPES): - val = val.replace(to_repl, repl_with) - tag_strs[-1] += f'={val}' - to_combine += ['@' + ';'.join(tag_strs)] - to_combine += [self.verb] - if self.parameters: - to_combine += self.parameters[:-1] - to_combine += [f':{self.parameters[-1]}'] - self._raw = ' '.join(to_combine) - return self._raw - - -class Loop: - 'Wraps thread looping over .eput input queue, potential bonus iterator.' - - def __init__(self, - q_to_main: EventQueue, - bonus_iterator: Optional[Iterator] = None - ) -> None: - self._q_to_main = q_to_main - self._bonus_iterator = bonus_iterator - self._q_input = EventQueue() - self._thread = Thread(target=self._loop, daemon=False) - self._thread.start() - - def stop(self) -> None: - 'Emit "QUIT" signal to break threaded loop, then wait for break.' - self._q_input.eput(EventType.QUIT) - self._thread.join() - - def __enter__(self) -> Self: - return self - - def __exit__(self, *_) -> Literal[False]: - self.stop() - return False # re-raise any exception that above ignored - - def put(self, event: Event) -> None: - 'Send event into thread loop.' - self._q_input.put(event) - - def broadcast(self, type_: EventType, payload: Any = None) -> None: - 'Send event to main loop via queue.' - self._q_to_main.eput(type_, payload) - - def process_main(self, event: Event) -> bool: - 'Process event yielded from input queue.' - if event.type_ == EventType.QUIT: - return False - return True - - def process_bonus(self, yielded: str) -> None: - 'Process bonus iterator yield.' - - def _loop(self) -> None: - 'Loop over input queue and, if provided, bonus iterator.' - try: - while True: - try: - yield_main = self._q_input.get( - block=True, - timeout=0 if self._bonus_iterator else None) - except QueueEmpty: - pass - else: - if self.process_main(yield_main) is False: - break - if self._bonus_iterator: - try: - yield_bonus = next(self._bonus_iterator) - except StopIteration: - break - if yield_bonus: - self.process_bonus(yield_bonus) - except Exception as e: # pylint: disable=broad-exception-caught - self._q_to_main.eput(EventType.EXCEPTION, e) - - -class Widget(ABC): - 'Defines most basic TUI object API.' - - @abstractmethod - def set_geometry(self, measurements: YX) -> None: - 'Update widget\'s measurements, re-generate content where necessary.' - - @abstractmethod - def draw(self) -> None: - 'Print widget\'s content in shape appropriate to set geometry.' - - -class ScrollableWidget(Widget): - 'Defines some API shared between PromptWidget and LogWidget.' - _history_idx: int - - def __init__(self, write: Callable[..., None], *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self._write = write - self._history: list[str] = [] - - @abstractmethod - def append(self, to_append: str) -> None: - 'Append to widget content.' - - @abstractmethod - def _scroll(self, up=True) -> None: - pass - - def cmd__scroll(self, direction: str) -> None: - 'Scroll through stored content/history.' - self._scroll(up=direction == 'up') - self.draw() - - -class PromptWidget(ScrollableWidget): - 'Keyboard-controlled command input field.' - _y: int - _width: int - _prompt: str = PROMPT_TEMPLATE - _history_idx = 0 - _input_buffer: str - _cursor_x: int - - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self._reset_buffer('') - - def set_geometry(self, measurements: YX) -> None: - self._y, self._width = measurements - - def append(self, to_append: str) -> None: - self._cursor_x += len(to_append) - self._input_buffer = (self._input_buffer[:self._cursor_x - 1] - + to_append - + self._input_buffer[self._cursor_x - 1:]) - self._history_idx = 0 - self.draw() - - def draw(self) -> None: - prefix = self._prompt[:] - content = self._input_buffer[:] - if self._cursor_x == len(self._input_buffer): - content += ' ' - half_width = (self._width - len(prefix)) // 2 - offset = 0 - if len(prefix) + len(content) > self._width\ - and self._cursor_x > half_width: - prefix += PROMPT_ELL_IN - offset = min(len(prefix) + len(content) - self._width, - self._cursor_x - half_width + len(PROMPT_ELL_IN)) - cursor_x_to_write = len(prefix) + self._cursor_x - offset - to_write = f'{prefix}{content[offset:]}' - if len(to_write) > self._width: - to_write = (to_write[:self._width - len(PROMPT_ELL_OUT)] - + PROMPT_ELL_OUT) - self._write(to_write[:cursor_x_to_write], self._y, padding=False) - self._write(to_write[cursor_x_to_write], attribute='reverse', - padding=False) - self._write(to_write[cursor_x_to_write + 1:]) - - def _scroll(self, up: bool = True) -> None: - if up and -(self._history_idx) < len(self._history): - if self._history_idx == 0 and self._input_buffer: - self._history += [self._input_buffer[:]] - self._reset_buffer('') - self._history_idx -= 1 - self._history_idx -= 1 - self._reset_buffer(self._history[self._history_idx]) - elif not up: - if self._history_idx < 0: - self._history_idx += 1 - if self._history_idx == 0: - self._reset_buffer('') - else: - self._reset_buffer(self._history[self._history_idx]) - elif self._input_buffer: - self._history += [self._input_buffer[:]] - self._reset_buffer('') - - def cmd__backspace(self) -> None: - 'Truncate current content by one character, if possible.' - if self._cursor_x > 0: - self._cursor_x -= 1 - self._input_buffer = (self._input_buffer[:self._cursor_x] - + self._input_buffer[self._cursor_x + 1:]) - self._history_idx = 0 - self.draw() - - def cmd__move_cursor(self, direction: str) -> None: - 'Move cursor one space into direction ("left" or "right") if possible.' - if direction == 'left' and self._cursor_x > 0: - self._cursor_x -= 1 - elif direction == 'right'\ - and self._cursor_x <= len(self._input_buffer): - self._cursor_x += 1 - else: - return - self.draw() - - def _reset_buffer(self, content: str) -> None: - self._input_buffer = content - self._cursor_x = len(self._input_buffer) - - def enter(self) -> str: - 'Return current content while also clearing and then redrawing.' - to_return = self._input_buffer[:] - if to_return: - self._history += [to_return] - self._reset_buffer('') - self.draw() - return to_return - - -class ConnectionPromptWidget(PromptWidget): - 'PromptWidget with attributes, methods for dealing with an IrcConnection.' - _nickname: str = '' - - def update_prompt(self, - nick_confirmed=False, - nick: Optional[str] = None - ) -> None: - 'Update nickname-relevant knowledge to go into prompt string.' - self._prompt = '' - if nick: - self._nickname = nick - if self._nickname: - self._prompt += ' ' if nick_confirmed else '?' - self._prompt += self._nickname - self._prompt += PROMPT_TEMPLATE - - -class LogWidget(ScrollableWidget): - 'Collects line-shaped messages, scrolls and wraps them for display.' - _view_size: YX - _y_pgscroll: int - - def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs - ) -> None: - super().__init__(*args, **kwargs) - self._wrap = wrap - self._wrapped_idx = self._history_idx = -1 - self._wrapped: list[tuple[Optional[int], str]] = [] - - def _add_wrapped(self, idx_original, line) -> int: - wrapped_lines = self._wrap(line) - self._wrapped += [(idx_original, line) for line in wrapped_lines] - return len(wrapped_lines) - - def set_geometry(self, measurements: YX) -> None: - self._view_size = measurements - self._y_pgscroll = self._view_size.y // 2 - self._wrapped.clear() - self._wrapped += [(None, '')] * self._view_size.y - if not self._history: - return - for idx_history, line in enumerate(self._history): - self._add_wrapped(idx_history, line) - wrapped_lines_for_history_idx = [ - t for t in self._wrapped - if t[0] == len(self._history) + self._history_idx] - idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1]) - self._wrapped_idx = idx_their_last - len(self._wrapped) - - def append(self, to_append: str) -> None: - self._history += [to_append] - n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append) - if self._wrapped_idx < -1: - self._history_idx -= 1 - self._wrapped_idx -= n_wrapped_lines - - def draw(self) -> None: - start_idx = self._wrapped_idx - self._view_size.y + 1 - end_idx = self._wrapped_idx - to_write = [t[1] for t in self._wrapped[start_idx:end_idx]] - if self._wrapped_idx < -1: - scroll_info = f'vvv [{(-1) * self._wrapped_idx}] ' - scroll_info += 'v' * (self._view_size.x - len(scroll_info)) - to_write += [scroll_info] - else: - to_write += [self._wrapped[self._wrapped_idx][1]] - for i, line in enumerate(to_write): - self._write(line, i) - - def _scroll(self, up: bool = True) -> None: - if up: - self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped), - self._wrapped_idx - self._y_pgscroll) - else: - self._wrapped_idx = min(-1, - self._wrapped_idx + self._y_pgscroll) - history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0] - if history_idx_to_wrapped_idx is not None: - self._history_idx = history_idx_to_wrapped_idx - len(self._history) - - -class Window(Widget): - 'Collects a log and a prompt meant for the same content stream.' - _y_status: int - prompt: PromptWidget - - def __init__(self, idx: int, term: Terminal) -> None: - self.idx = idx - self._term = term - self.log = LogWidget(self._term.wrap, self._term.write) - self.prompt = self.__annotations__['prompt'](self._term.write) - if hasattr(self._term, 'size'): - self.set_geometry() - - def set_geometry(self, _=None) -> None: - assert _ is None - self._y_status = self._term.size.y - 2 - self.log.set_geometry(YX(self._y_status, self._term.size.x)) - self.prompt.set_geometry(YX(self._term.size.y - 1, self._term.size.x)) - - def draw(self) -> None: - idx_box = f'[{self.idx}]' - status_line = idx_box + '=' * (self._term.size.x - len(idx_box)) - self._term.clear() - self.log.draw() - self._term.write(status_line, self._y_status) - self.prompt.draw() - - def cmd__paste(self) -> None: - 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.' - self._term.write(f'\033{OSC52_PREFIX}?{PASTE_DELIMITER}', - self._y_status) - self.draw() - - -class ConnectionWindow(Window): - 'Window with attributes and methods for dealing with an IrcConnection.' - prompt: ConnectionPromptWidget - - def __init__(self, - broadcast: Callable[[EventType, Any], None], - conn_idx: int, - *args, **kwargs - ) -> None: - self._broadcast = broadcast - self._conn_idx = conn_idx - super().__init__(*args, **kwargs) - - def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None: - 'Send QUIT command to server.' - self._broadcast(EventType.SEND, - (self._conn_idx, IrcMessage('QUIT', (quit_msg, )))) - - def cmd__reconnect(self) -> None: - 'Attempt reconnection.' - self._broadcast(EventType.INIT_RECONNECT, (self._conn_idx,)) - - -class TuiLoop(Loop): - 'Loop for drawing/updating TUI.' - - def __init__(self, term: Terminal, *args, **kwargs) -> None: - self._term = term - self._windows = [Window(0, self._term)] - self._window_idx = 0 - self._conn_windows: list[ConnectionWindow] = [] - super().__init__(*args, **kwargs) - self.put(Event(EventType.SET_SCREEN)) - - def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]: - cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name) - cmd_parent = self - while True: - cmd_name_toks = cmd_name.split('.', maxsplit=1) - if len(cmd_name_toks) == 1: - break - if not hasattr(cmd_parent, cmd_name_toks[0]): - return None - cmd_parent = getattr(cmd_parent, cmd_name_toks[0]) - cmd_name = cmd_name_toks[1] - cmd_name = f'cmd__{cmd_name}' - if not hasattr(cmd_parent, cmd_name): - return None - return getattr(cmd_parent, cmd_name) - - def process_main(self, event: Event) -> bool: - if not super().process_main(event): - return False - if event.type_ == EventType.SET_SCREEN: - self._term.calc_geometry() - for window in self._windows: - window.set_geometry() - self.window.draw() - elif event.type_ == EventType.CONN_WINDOW: - conn_win = ConnectionWindow(broadcast=self.broadcast, - conn_idx=event.payload[0], - idx=len(self._windows), - term=self._term) - conn_win.prompt.update_prompt(nick_confirmed=False, - nick=event.payload[1]) - self._windows += [conn_win] - self._conn_windows += [conn_win] - self._switch_window(conn_win.idx) - elif event.type_ == EventType.ALERT: - self.window.log.append(f'ALERT {event.payload}') - self.window.log.draw() - elif event.type_ in {EventType.RECV, EventType.SEND, - EventType.CONN_ALERT}: - conn_win = self._conn_windows[event.payload[0]] - if event.type_ == EventType.CONN_ALERT: - msg = f'ALERT {event.payload[1]}' - else: - msg = (('<-' if event.type_ == EventType.RECV else '->') - + event.payload[1].raw) - conn_win.log.append(msg) - if conn_win == self.window: - self.window.log.draw() - elif event.type_ == EventType.NICK_SET: - conn_win = self._conn_windows[event.payload[0]] - conn_win.prompt.update_prompt(nick_confirmed=True, - nick=event.payload[1]) - if conn_win == self.window: - self.window.prompt.draw() - elif event.type_ == EventType.DISCONNECTED: - conn_win = self._conn_windows[event.payload[0]] - conn_win.prompt.update_prompt(nick_confirmed=False) - if conn_win == self.window: - self.window.prompt.draw() - elif event.type_ == EventType.KEYBINDING: - cmd = self._cmd_name_to_cmd(event.payload[0]) - assert cmd is not None - cmd(*event.payload[1:]) - elif event.type_ == EventType.PROMPT_ADD: - self.window.prompt.append(event.payload) - # elif event.type_ == EventType.DEBUG: - # from traceback import format_exception - # for line in '\n'.join(format_exception(event.payload) - # ).split('\n'): - # self.window.log.append(f'DEBUG {line}') - # self.window.log.draw() - else: - return True - self._term.flush() - return True - - @property - def window(self) -> Window: - 'Currently selected Window.' - return self._windows[self._window_idx] - - def _switch_window(self, idx: int) -> None: - self._window_idx = idx - self.window.draw() - - def cmd__connect(self, hostname: str, nickname: str, realname: str - ) -> None: - 'Send INIT_CONNECT command to main loop.' - login = LoginNames(user=getusername(), nick=nickname, real=realname) - self.broadcast(EventType.INIT_CONNECT, (hostname, login)) - - def cmd__prompt_enter(self) -> None: - 'Get prompt content from .window.prompt.enter, parse to & run command.' - to_parse = self.window.prompt.enter() - if not to_parse: - return - alert: Optional[str] = None - if to_parse[0:1] == '/': - toks = to_parse[1:].split(maxsplit=1) - alert = f'{toks[0]} unknown' - cmd = self._cmd_name_to_cmd(toks[0]) - if cmd and cmd.__name__ != stack()[0].function: - params = signature(cmd).parameters - n_args_max = len(params) - n_args_min = len([p for p in params.values() - if p.default == inspect_empty]) - alert = f'{cmd.__name__} needs between {n_args_min} and '\ - f'{n_args_max} args' - if len(toks) == 1 and not n_args_min: - alert = cmd() - elif len(toks) > 1 and params\ - and n_args_min <= len(toks[1].split()): - args = [] - while len(toks) > 1 and n_args_max: - toks = toks[1].split(maxsplit=1) - args += [toks[0]] - n_args_max -= 1 - alert = cmd(*args) - else: - alert = 'not prefixed by /' - if alert: - self.broadcast(EventType.ALERT, f'invalid prompt command: {alert}') - - def cmd__quit(self) -> None: - 'Send QUIT to all threads.' - self.broadcast(EventType.QUIT) - - def cmd__window(self, towards: str) -> Optional[str]: - 'Switch window selection.' - n_windows = len(self._windows) - if n_windows < 2: - return 'no alternate window to move into' - if towards in {'left', 'right'}: - multiplier = (+1) if towards == 'right' else (-1) - window_idx = self._window_idx + multiplier - if not 0 <= window_idx < n_windows: - window_idx -= multiplier * n_windows - elif not towards.isdigit(): - return f'neither "left"/"right" nor integer: {towards}' - else: - window_idx = int(towards) - if not 0 <= window_idx < n_windows: - return f'unavailable window idx: {window_idx}' - self._switch_window(window_idx) - return None - - -class ConnectionLoop(Loop): - 'Loop receiving and translating socket messages towards main loop.' - - def __init__(self, connection_idx: int, *args, **kwargs) -> None: - self._conn_idx = connection_idx - super().__init__(*args, **kwargs) - - def _broadcast_conn(self, type_: EventType, *args) -> None: - self.broadcast(type_, (self._conn_idx, *args)) - - def _send(self, verb: str, parameters: tuple[str, ...]) -> None: - self._broadcast_conn(EventType.SEND, IrcMessage(verb, parameters)) - - def process_main(self, event: Event) -> bool: - if not super().process_main(event): - return False - if event.type_ == EventType.CONNECTED: - login = event.payload[1] - self._send('USER', (login.user, '0', '*', login.real)) - self._send('NICK', (login.nick,)) - return True - - def process_bonus(self, yielded: str) -> None: - msg = IrcMessage.from_raw(yielded) - self._broadcast_conn(EventType.RECV, msg) - if msg.verb == 'PING': - self._send('PONG', (msg.parameters[0],)) - elif msg.verb == 'ERROR'\ - and msg.parameters[0].startswith('Closing link:'): - self._broadcast_conn(EventType.DISCONNECTED) - elif msg.verb == '001': - self._broadcast_conn(EventType.NICK_SET, msg.parameters[0]) - - -class KeyboardLoop(Loop): - 'Loop receiving and translating keyboard events towards main loop.' - - def process_bonus(self, yielded: str) -> None: - if yielded.startswith(B64_PREFIX): - encoded = yielded[len(B64_PREFIX):] - to_paste = '' - for i, c in enumerate(b64decode(encoded).decode('utf-8')): - if i > 512: - break - if c.isprintable(): - to_paste += c - elif c.isspace(): - to_paste += ' ' - else: - to_paste += '#' - self.broadcast(EventType.PROMPT_ADD, to_paste) - elif yielded in KEYBINDINGS: - self.broadcast(EventType.KEYBINDING, KEYBINDINGS[yielded]) - elif len(yielded) == 1: - self.broadcast(EventType.PROMPT_ADD, yielded) - else: - self.broadcast(EventType.ALERT, - f'unknown keyboard input: {yielded}') +from ircplom.events import EventQueue, EventType +from ircplom.irc_conn import IrcConnection +from ircplom.tui import Terminal def run() -> None: