From: Christian Heller Date: Wed, 23 Jul 2025 06:40:07 +0000 (+0200) Subject: Major re-write esp. of Event system, but lots of other stuff too. X-Git-Url: https://plomlompom.com/repos/booking/%22https:/validator.w3.org/%7B%7Bprefix%7D%7D/templates?a=commitdiff_plain;p=ircplom Major re-write esp. of Event system, but lots of other stuff too. --- diff --git a/ircplom.py b/ircplom.py index 3fcceb6..16b6375 100755 --- a/ircplom.py +++ b/ircplom.py @@ -1,31 +1,33 @@ #!/usr/bin/env python3 'Attempt at an IRC client.' -from ircplom.events import EventQueue, ExceptionEvent, QuitEvent -from ircplom.irc_conn import ConnEvent, InitConnectEvent, IrcConnection -from ircplom.tui import Terminal +from queue import SimpleQueue +from ircplom.events import ExceptionEvent, QuitEvent +from ircplom.irc_conn import ClientsDb, ClientEvent, NewClientEvent +from ircplom.tui import Terminal, Tui, TuiEvent def main_loop() -> None: 'Main execution code / loop.' - q_to_main: EventQueue = EventQueue() - connections: set[IrcConnection] = set() + q_events: SimpleQueue = SimpleQueue() + clients_db: ClientsDb = {} try: - with Terminal().context(q_to_main) as term: + with Terminal(q_out=q_events).setup() as term: + tui = Tui(q_out=q_events, term=term) while True: - event = q_to_main.get() - term.tui.put(event) + event = q_events.get() if isinstance(event, QuitEvent): break if isinstance(event, ExceptionEvent): raise event.payload - if isinstance(event, InitConnectEvent): - connections.add(IrcConnection(q_to_main=q_to_main, - *event.payload)) - elif isinstance(event, ConnEvent): - event.conn.handle(event) + if isinstance(event, TuiEvent): + event.affect(tui) + elif isinstance(event, NewClientEvent): + event.affect(clients_db) + elif isinstance(event, ClientEvent): + event.affect(clients_db[event.client_id]) finally: - for conn in connections: - conn.close() + for client in clients_db.values(): + client.close() if __name__ == '__main__': diff --git a/ircplom/events.py b/ircplom/events.py index e912edc..339b1fc 100644 --- a/ircplom/events.py +++ b/ircplom/events.py @@ -1,15 +1,24 @@ 'Event system with event loop.' -from queue import SimpleQueue as EventQueue, Empty as QueueEmpty +from abc import abstractmethod, ABC +from queue import SimpleQueue, Empty as QueueEmpty from threading import Thread -from typing import Iterator, Literal, Optional, Self +from typing import Any, Iterator, Literal, Self class Event: 'Communication unit between threads.' +class AffectiveEvent(Event, ABC): + 'For Events that are to affect other objects.' + + @abstractmethod + def affect(self, target: Any) -> None: + 'To be run by main loop on target.' + + class PayloadMixin: - 'To extend Event with .payload= passed as first argument.' + 'Extends with .payload= passed as first argument.' def __init__(self, payload, **kwargs) -> None: super().__init__(**kwargs) @@ -17,49 +26,38 @@ class PayloadMixin: class ExceptionEvent(Event, PayloadMixin): - 'To signal Exception to be handled by receiver.' + 'To deliver Exception to main loop for handling.' payload: Exception class QuitEvent(Event): - 'To signal any receiver to exit.' + 'To break main loop towards.' + + +class QueueMixin: + 'Adds SimpleQueue addressable via ._put(Event).' + + def __init__(self, q_out: SimpleQueue, **kwargs) -> None: + self._q_out = q_out + super().__init__(**kwargs) + + def _put(self, event: Event) -> None: + self._q_out.put(event) -class BroadcastMixin: - 'To provide .broadcast via newly assigned ._q_to_main.' +class Loop(QueueMixin): + 'Wraps thread looping over iterator, communicating back via q_out.' - def __init__(self, q_to_main: EventQueue, **kwargs) -> None: + def __init__(self, iterator: Iterator, **kwargs) -> None: super().__init__(**kwargs) - self._q_to_main = q_to_main - - def broadcast[E: Event](self, - event_class: type[E], - *args, **kwargs - ) -> None: - "Put event to main loop; if event_class PayloadMixin'd, payload=args." - if PayloadMixin in event_class.__mro__: - kwargs['payload'] = args if len(args) > 1 else args[0] - args = tuple() - self._q_to_main.put(event_class(*args, **kwargs)) - - -class Loop(BroadcastMixin): - 'Wraps thread looping over input queue, potential bonus iterator.' - - def __init__(self, - q_to_main: EventQueue, - bonus_iterator: Optional[Iterator] = None, - **kwargs - ) -> None: - super().__init__(q_to_main=q_to_main, **kwargs) - self._bonus_iterator = bonus_iterator - self._q_input: EventQueue = EventQueue() + self._q_quit: SimpleQueue = SimpleQueue() + self._iterator = iterator self._thread = Thread(target=self._loop, daemon=False) self._thread.start() def stop(self) -> None: - 'Emit QuitEvent to break threaded loop, then wait for break.' - self.put(QuitEvent()) + 'Break threaded loop, but wait for it to finish properly.' + self._q_quit.put(None) self._thread.join() def __enter__(self) -> Self: @@ -69,38 +67,20 @@ class Loop(BroadcastMixin): self.stop() return False # re-raise any exception that above ignored - def put(self, event: Event) -> None: - 'Send event into thread loop.' - self._q_input.put(event) - - def process_main(self, event: Event) -> bool: - 'Process event yielded from input queue.' - if isinstance(event, QuitEvent): - return False - return True - - def process_bonus(self, yielded: str) -> None: - 'Process bonus iterator yield.' - def _loop(self) -> None: - 'Loop over input queue and, if provided, bonus iterator.' try: while True: try: - yield_main = self._q_input.get( - block=True, - timeout=0 if self._bonus_iterator else None) + self._q_quit.get(block=True, timeout=0) except QueueEmpty: pass else: - if self.process_main(yield_main) is False: - break - if self._bonus_iterator: - try: - yield_bonus = next(self._bonus_iterator) - except StopIteration: - break - if yield_bonus: - self.process_bonus(yield_bonus) + break + try: + it_yield = next(self._iterator) + except StopIteration: + break + if it_yield is not None: + self._put(it_yield) except Exception as e: # pylint: disable=broad-exception-caught - self.broadcast(ExceptionEvent, e) + self._put(ExceptionEvent(e)) diff --git a/ircplom/irc_conn.py b/ircplom/irc_conn.py index 289ecfd..6522e58 100644 --- a/ircplom/irc_conn.py +++ b/ircplom/irc_conn.py @@ -1,15 +1,17 @@ 'IRC server connection management.' # built-ins -from dataclasses import dataclass +from abc import ABC, abstractmethod +from getpass import getuser from socket import socket, gaierror as socket_gaierror from threading import Thread from typing import Callable, Iterator, NamedTuple, Optional, Self +from uuid import uuid4, UUID # ourselves -from ircplom.events import BroadcastMixin, Event, Loop, PayloadMixin +from ircplom.events import ( + AffectiveEvent, ExceptionEvent, Loop, PayloadMixin, QueueMixin) -TIMEOUT_LOOP = 0.1 - +_TIMEOUT_RECV_LOOP = 0.1 _TIMEOUT_CONNECT = 5 _CONN_RECV_BUFSIZE = 1024 _PORT = 6667 @@ -21,25 +23,27 @@ _IRCSPEC_TAG_ESCAPES = ((r'\:', ';'), (r'\r', '\r'), (r'\\', '\\')) +ClientsDb = dict[UUID, 'Client'] + -class _IrcMessage: +class IrcMessage: 'Properly structured representation of IRC message as per IRCv3 spec.' _raw: Optional[str] = None def __init__(self, verb: str, - parameters: Optional[tuple[str, ...]] = None, + params: Optional[tuple[str, ...]] = None, source: str = '', tags: Optional[dict[str, str]] = None ) -> None: self.verb: str = verb - self.parameters: tuple[str, ...] = parameters or tuple() + self.params: tuple[str, ...] = params or tuple() self.source: str = source self.tags: dict[str, str] = tags or {} @classmethod def from_raw(cls, raw_msg: str) -> Self: - 'Parse raw IRC message line into properly structured _IrcMessage.' + 'Parse raw IRC message line into properly structured IrcMessage.' class _Stage(NamedTuple): name: str @@ -77,7 +81,7 @@ class _IrcMessage: stages = [_Stage('tags', '@', _parse_tags), _Stage('source', ':'), _Stage('verb', None, lambda s: s.upper()), - _Stage('parameters', None, _split_params)] + _Stage('params', None, _split_params)] harvest = {s.name: '' for s in stages} idx_stage = 0 stage = None @@ -115,122 +119,128 @@ class _IrcMessage: tag_strs[-1] += f'={val}' to_combine += ['@' + ';'.join(tag_strs)] to_combine += [self.verb] - if self.parameters: - to_combine += self.parameters[:-1] - to_combine += [f':{self.parameters[-1]}'] + if self.params: + to_combine += self.params[:-1] + to_combine += [f':{self.params[-1]}'] self._raw = ' '.join(to_combine) return self._raw -@dataclass -class LoginNames: - 'Collects the names needed on server connect for USER, NICK commands.' - user: str - nick: str - real: str - nick_confirmed: bool = False - - -class ConnMixin: - 'Collects an IrcConnection at .conn.' +class ClientIdMixin: + 'Collects a Client\'s ID at .client_id.' - def __init__(self, conn: 'IrcConnection', **kwargs) -> None: + def __init__(self, client_id: UUID, **kwargs) -> None: super().__init__(**kwargs) - self.conn = conn + self.client_id = client_id -class InitConnectEvent(Event, PayloadMixin): - 'Event to trigger connection, with payload (host, LoginNames).' - payload: tuple[str, LoginNames] +class NewClientEvent(AffectiveEvent, PayloadMixin): + 'Put Client .payload into ClientsDb target.' + payload: 'Client' + def affect(self, target: ClientsDb) -> None: + target[self.payload.id_] = self.payload -class ConnEvent(Event, ConnMixin): - 'Event with .conn.' +class ClientEvent(AffectiveEvent, ClientIdMixin): + 'To affect Client identified by ClientIdMixin.' -class _ConnectedEvent(ConnEvent): - 'Event to signal opening of connection.' +class _ConnectedEvent(ClientEvent): -class _DisconnectedEvent(ConnEvent): - 'Event to signal closing of connection' + def affect(self, target: 'Client') -> None: + target.send(IrcMessage(verb='USER', params=(getuser(), '0', '*', + target.realname))) + target.send(IrcMessage(verb='NICK', params=(target.nickname,))) -class InitReconnectEvent(ConnEvent): - 'Event to trigger re-opening of connection.' +class InitReconnectEvent(ClientEvent): + 'To trigger re-opening of connection.' + def affect(self, target: 'Client') -> None: + if target.assumed_open: + target.log('ALERT: Reconnect called, but still seem connected, ' + 'so nothing to do.') + else: + target.start_connecting() -class LogConnEvent(ConnEvent, PayloadMixin): - 'Event to log payload into connection window.' - payload: str +class SendEvent(ClientEvent, PayloadMixin): + 'To trigger sending of payload to server.' + payload: IrcMessage + + def affect(self, target: 'Client') -> None: + target.send(self.payload) -class NickSetEvent(ConnEvent): - 'Event to signal nickname having been set server-side.' +class ClientQueueMixin(QueueMixin): + 'To QueueMixin adds _cput to extend ._put with client_id= setting.' + client_id_name = 'id_' -class _SendEvent(ConnEvent, PayloadMixin): - 'Event to trigger sending of payload to server.' - payload: _IrcMessage + def _cput(self, event_class, **kwargs) -> None: + self._put(event_class(client_id=getattr(self, self.client_id_name), + **kwargs)) -class IrcConnection(BroadcastMixin): +class Client(ABC, ClientQueueMixin): 'Abstracts socket connection, loop over it, and handling messages from it.' - def __init__(self, - hostname: str, - login: LoginNames, - **kwargs + def __init__(self, hostname: str, nickname: str, realname: str, **kwargs ) -> None: super().__init__(**kwargs) + self.id_ = uuid4() self._hostname = hostname - self.login = login self._socket: Optional[socket] = None - self._assumed_open = False - self._recv_loop: Optional[_RecvLoop] = None - self._start_connecting() + self.assumed_open = False + self._recv_loop: Optional[Loop] = None + self.realname = realname + self.update_login(nick_confirmed=False, nickname=nickname) + self.start_connecting() - def _start_connecting(self) -> None: + def start_connecting(self) -> None: + 'Start thread to initiate connection, from socket to recv loop.' def connect(self) -> None: - self._socket = socket() - self.broadcast(LogConnEvent, - f'Connecting to {self._hostname} …') - self._socket.settimeout(_TIMEOUT_CONNECT) try: - self._socket.connect((self._hostname, _PORT)) - except (TimeoutError, socket_gaierror) as e: - self.broadcast(LogConnEvent, f'ALERT: {e}') - return - self._socket.settimeout(TIMEOUT_LOOP) - self._assumed_open = True - self._recv_loop = _RecvLoop(conn=self, - q_to_main=self._q_to_main, - bonus_iterator=self._read_lines()) - self.broadcast(_ConnectedEvent) + self._socket = socket() + self.log(f'Connecting to {self._hostname} …') + self._socket.settimeout(_TIMEOUT_CONNECT) + try: + self._socket.connect((self._hostname, _PORT)) + except (TimeoutError, socket_gaierror) as e: + self.log(f'ALERT: {e}') + return + self._socket.settimeout(_TIMEOUT_RECV_LOOP) + self.assumed_open = True + self._recv_loop = Loop(iterator=self._read_lines(), + q_out=self._q_out) + self._cput(_ConnectedEvent) + except Exception as e: # pylint: disable=broad-exception-caught + self._put(ExceptionEvent(e)) Thread(target=connect, daemon=True, args=(self,)).start() - def broadcast[E: Event](self, - event_class: type[E], - *args, **kwargs - ) -> None: - 'Broadcast event subclassing ConnEvent, with self as its .conn.' - super().broadcast(event_class, conn=self, *args, **kwargs) + @abstractmethod + def log(self, msg: str) -> None: + 'Write msg into log, whatever shape that may have.' - def send(self, verb: str, parameters: tuple[str, ...]) -> None: - 'Broadcast _SendEvent for _IrcMessage(verb, parameters).' - self.broadcast(_SendEvent, _IrcMessage(verb, parameters)) + def send(self, msg: IrcMessage) -> None: + 'Send line-separator-delimited message over socket.' + if not (self._socket and self.assumed_open): + self.log('ALERT: cannot send, assuming connection closed.') + return + self._socket.sendall(msg.raw.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR) + self.log(f'->: {msg.raw}') - def update_login(self, **kwargs) -> None: - 'Adapt .login attributes to kwargs, broadcast NickSetEvent.' - for key, val in kwargs.items(): - setattr(self.login, key, val) - self.broadcast(NickSetEvent) + def update_login(self, nick_confirmed: bool, nickname: str = '') -> None: + 'Manage .nickname, .nick_confirmed – useful for subclass extension.' + if nickname: + self.nickname = nickname + self.nick_confirmed = nick_confirmed def close(self) -> None: - 'Close both RecvLoop and socket.' - self._assumed_open = False + 'Close both recv Loop and socket.' + self.assumed_open = False self.update_login(nick_confirmed=False) if self._recv_loop: self._recv_loop.stop() @@ -239,8 +249,7 @@ class IrcConnection(BroadcastMixin): self._socket.close() self._socket = None - def _read_lines(self) -> Iterator[Optional[str]]: - 'Receive line-separator-delimited messages from socket.' + def _read_lines(self) -> Iterator[Optional['_RecvEvent']]: assert self._socket is not None bytes_total = b'' buffer_linesep = b'' @@ -267,50 +276,20 @@ class IrcConnection(BroadcastMixin): buffer_linesep += c_byted if buffer_linesep == _IRCSPEC_LINE_SEPARATOR: buffer_linesep = b'' - yield bytes_total.decode('utf-8') + yield _RecvEvent(client_id=self.id_, + payload=bytes_total.decode('utf-8')) bytes_total = b'' - def _write_line(self, line: str) -> None: - 'Send line-separator-delimited message over socket.' - if not (self._socket and self._assumed_open): - self.broadcast(LogConnEvent, - 'ALERT: cannot send, assuming connection closed.') - return - self._socket.sendall(line.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR) - - def handle(self, event: ConnEvent) -> None: - 'Process connection-directed Event into further steps.' - if isinstance(event, InitReconnectEvent): - if self._assumed_open: - self.broadcast(LogConnEvent, - 'ALERT: Reconnect called, but still seem ' - 'connected, so nothing to do.') - else: - self._start_connecting() - elif isinstance(event, _ConnectedEvent): - # self.send('CAP', ('LS', '302')) - self.send('USER', (self.login.user, '0', '*', self.login.real)) - self.send('NICK', (self.login.nick,)) - # self.send('CAP', ('LIST',)) - # self.send('CAP', ('END',)) - elif isinstance(event, _DisconnectedEvent): - self.close() - elif isinstance(event, _SendEvent): - self.broadcast(LogConnEvent, f'->: {event.payload.raw}') - self._write_line(event.payload.raw) - - -class _RecvLoop(Loop, ConnMixin): - 'Loop to react on messages from server.' - - def process_bonus(self, yielded: str) -> None: - msg = _IrcMessage.from_raw(yielded) - self.conn.broadcast(LogConnEvent, f'<-: {msg.raw}') + +class _RecvEvent(ClientEvent, PayloadMixin): + payload: str + + def affect(self, target: Client) -> None: + msg = IrcMessage.from_raw(self.payload) + target.log(f'<-: {self.payload}') if msg.verb == 'PING': - self.conn.send('PONG', (msg.parameters[0],)) - elif msg.verb == 'ERROR'\ - and msg.parameters[0].startswith('Closing link:'): - self.conn.broadcast(_DisconnectedEvent) + target.send(IrcMessage(verb='PONG', params=(msg.params[0],))) + if msg.verb == 'ERROR' and msg.params[0].startswith('Closing link:'): + target.close() elif msg.verb in {'001', 'NICK'}: - self.conn.update_login(nick=msg.parameters[0], - nick_confirmed=True) + target.update_login(nickname=msg.params[0], nick_confirmed=True) diff --git a/ircplom/tui.py b/ircplom/tui.py index d05db64..27e2342 100644 --- a/ircplom/tui.py +++ b/ircplom/tui.py @@ -3,24 +3,25 @@ from abc import ABC, abstractmethod from base64 import b64decode from contextlib import contextmanager -from getpass import getuser as getusername from inspect import _empty as inspect_empty, signature, stack from signal import SIGWINCH, signal from typing import Callable, Generator, Iterator, NamedTuple, Optional +from uuid import UUID # requirements.txt from blessed import Terminal as BlessedTerminal # ourselves -from ircplom.events import (BroadcastMixin, Event, EventQueue, Loop, - PayloadMixin, QuitEvent) +from ircplom.events import (AffectiveEvent, Loop, PayloadMixin, QueueMixin, + QuitEvent) from ircplom.irc_conn import ( - ConnEvent, ConnMixin, InitConnectEvent, InitReconnectEvent, - LoginNames, LogConnEvent, NickSetEvent, TIMEOUT_LOOP) + IrcMessage, Client, ClientIdMixin, ClientQueueMixin, + InitReconnectEvent, NewClientEvent, SendEvent) _MIN_HEIGHT = 4 _MIN_WIDTH = 32 +_TIMEOUT_KEYPRESS_LOOP = 0.5 _B64_PREFIX = 'b64:' -_OSC52_PREFIX = ']52;c;' +_OSC52_PREFIX = b']52;c;' _PASTE_DELIMITER = '\007' _PROMPT_TEMPLATE = '> ' @@ -37,8 +38,8 @@ _KEYBINDINGS = { 'KEY_DOWN': ('window.prompt.scroll', 'down'), 'KEY_PGUP': ('window.log.scroll', 'up'), 'KEY_PGDOWN': ('window.log.scroll', 'down'), - '[91, 49, 59, 51, 68]': ('window', 'left'), - '[91, 49, 59, 51, 67]': ('window', 'right'), + 'esc:91:49:59:51:68': ('window', 'left'), + 'esc:91:49:59:51:67': ('window', 'right'), 'KEY_F1': ('window.paste',), } _CMD_SHORTCUTS = { @@ -48,28 +49,29 @@ _CMD_SHORTCUTS = { } -class _LogEvent(Event, PayloadMixin): - 'Event to trigger writing to current Window\'s LogWidget.' - payload: str +class TuiEvent(AffectiveEvent): + 'To affect TUI, and trigger flushed .draw_tainted on it.' + def affect(self, target: 'Tui') -> None: + target.window.draw_tainted() + target.term.flush() -class _SetScreenEvent(Event): - 'Event to trigger re-configuration of screen sizes.' +class _SetScreenEvent(TuiEvent): -class _TuiCmdEvent(Event, PayloadMixin): - 'Event to trigger call of .cmd__ method in TUI tree.' - payload: str + def affect(self, target: 'Tui') -> None: + target.term.calc_geometry() + for window in target.windows: + window.set_geometry() + super().affect(target) class _YX(NamedTuple): - '2-dimensional coordinate.' y: int x: int class _Widget(ABC): - 'Defines most basic TUI object API.' @abstractmethod def __init__(self, **kwargs) -> None: @@ -94,7 +96,6 @@ class _Widget(ABC): class _ScrollableWidget(_Widget, ABC): - 'Defines some API shared between _PromptWidget and _LogWidget.' _history_idx: int def __init__(self, write: Callable[..., None], **kwargs) -> None: @@ -116,7 +117,6 @@ class _ScrollableWidget(_Widget, ABC): class _LogWidget(_ScrollableWidget): - 'Collects line-shaped messages, scrolls and wraps them for display.' _view_size: _YX _y_pgscroll: int @@ -193,7 +193,6 @@ class _LogWidget(_ScrollableWidget): class _PromptWidget(_ScrollableWidget): - 'Keyboard-controlled command input field.' _y: int _width: int _history_idx: int = 0 @@ -202,6 +201,7 @@ class _PromptWidget(_ScrollableWidget): def __init__(self, **kwargs) -> None: super().__init__(**kwargs) + self.prefix = _PROMPT_TEMPLATE self._reset_buffer('') @property @@ -222,7 +222,7 @@ class _PromptWidget(_ScrollableWidget): def draw(self) -> bool: if not super().draw(): return False - prefix = self._prompt[:] + prefix = self.prefix[:] content = self._input_buffer if self._cursor_x == len(self._input_buffer): content += ' ' @@ -244,10 +244,6 @@ class _PromptWidget(_ScrollableWidget): self._write(to_write[cursor_x_to_write + 1:]) return True - @property - def _prompt(self) -> str: - return _PROMPT_TEMPLATE - def _archive_prompt(self) -> None: self.append(self._input_buffer) self._reset_buffer('') @@ -270,11 +266,11 @@ class _PromptWidget(_ScrollableWidget): elif self._input_buffer: self._archive_prompt() - def cmd__append(self, to_append: str) -> None: - 'Append to prompt input buffer.' - self._cursor_x += len(to_append) + def insert(self, to_insert: str) -> None: + 'Insert into prompt input buffer.' + self._cursor_x += len(to_insert) self._input_buffer = (self._input_buffer[:self._cursor_x - 1] - + to_append + + to_insert + self._input_buffer[self._cursor_x - 1:]) self._history_idx = 0 @@ -291,7 +287,7 @@ class _PromptWidget(_ScrollableWidget): if direction == 'left' and self._cursor_x > 0: self._cursor_x -= 1 elif direction == 'right'\ - and self._cursor_x <= len(self._input_buffer): + and self._cursor_x < len(self._input_buffer): self._cursor_x += 1 else: return @@ -309,18 +305,7 @@ class _PromptWidget(_ScrollableWidget): return to_return -class _ConnectionPromptWidget(_PromptWidget, ConnMixin): - 'PromptWidget with attributes, methods for dealing with an IrcConnection.' - - @property - def _prompt(self) -> str: - return ((' ' if self.conn.login.nick_confirmed else '?') - + self.conn.login.nick - + super()._prompt) - - class _Window(_Widget): - 'Collects a log and a prompt meant for the same content stream.' _y_status: int prompt: _PromptWidget @@ -329,8 +314,7 @@ class _Window(_Widget): self.idx = idx self._term = term self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write) - self.prompt = self.__annotations__['prompt'](write=self._term.write, - **kwargs) + self.prompt = self.__annotations__['prompt'](write=self._term.write) if hasattr(self._term, 'size'): self.set_geometry() @@ -369,7 +353,7 @@ class _Window(_Widget): def cmd__paste(self) -> None: 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.' - self._term.write(f'\033{_OSC52_PREFIX}?{_PASTE_DELIMITER}', + self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}', self._y_status) self.tainted = True @@ -382,31 +366,41 @@ class _Window(_Widget): widget.draw() -class _ConnectionWindow(_Window, ConnMixin): - 'Window with attributes and methods for dealing with an IrcConnection.' - prompt: _ConnectionPromptWidget +class _ClientWindow(_Window, ClientQueueMixin): + client_id_name = 'client_id' + + def __init__(self, client_id: UUID, **kwargs) -> None: + self.client_id = client_id + super().__init__(**kwargs) def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None: 'Send QUIT command to server.' - self.conn.send('QUIT', (quit_msg, )) + self._cput(SendEvent, payload=IrcMessage(verb='QUIT', + params=(quit_msg,))) def cmd__reconnect(self) -> None: 'Attempt reconnection.' - self.conn.broadcast(InitReconnectEvent) + self._cput(InitReconnectEvent) def cmd__nick(self, new_nick: str) -> None: 'Attempt nickname change.' - self.conn.send('NICK', (new_nick, )) + self._cput(SendEvent, payload=IrcMessage(verb='NICK', + params=(new_nick,))) -class _KeyboardLoop(Loop, BroadcastMixin): - 'Loop receiving and translating keyboard events towards main loop.' +class _KeyboardEvent(TuiEvent, PayloadMixin): + payload: str - def process_bonus(self, yielded: str) -> None: - if yielded and ord(yielded[0]) == _ORD_CHAR_RESIZE: - self.broadcast(_SetScreenEvent) - elif yielded.startswith(_B64_PREFIX): - encoded = yielded[len(_B64_PREFIX):] + def affect(self, target: 'Tui') -> None: + if self.payload[0] == _ORD_CHAR_RESIZE: + _SetScreenEvent().affect(target) + if self.payload in _KEYBINDINGS: + cmd_data = _KEYBINDINGS[self.payload] + cmd = target.cmd_name_to_cmd(cmd_data[0]) + if cmd: + cmd(*cmd_data[1:]) + elif self.payload.startswith(_B64_PREFIX): + encoded = self.payload[len(_B64_PREFIX):] to_paste = '' for i, c in enumerate(b64decode(encoded).decode('utf-8')): if i > 512: @@ -417,28 +411,26 @@ class _KeyboardLoop(Loop, BroadcastMixin): to_paste += ' ' else: to_paste += '#' - self.broadcast(_TuiCmdEvent, ('window.prompt.append', to_paste)) - elif yielded in _KEYBINDINGS: - self.broadcast(_TuiCmdEvent, _KEYBINDINGS[yielded]) - elif len(yielded) == 1: - self.broadcast(_TuiCmdEvent, ('window.prompt.append', yielded)) + target.window.prompt.insert(to_paste) + elif len(self.payload) == 1: + target.window.prompt.insert(self.payload) else: - self.broadcast(_LogEvent, - f'ALERT: unknown keyboard input: {yielded}') + target.log(f'ALERT: unknown keyboard input: {self.payload}') + super().affect(target) -class _TuiLoop(Loop, BroadcastMixin): - 'Loop for drawing/updating TUI.' +class Tui(QueueMixin): + 'Base for graphical user interface elements.' def __init__(self, term: 'Terminal', **kwargs) -> None: - self._term = term - self._windows = [_Window(0, self._term)] - self._window_idx = 0 - self._conn_windows: list[_ConnectionWindow] = [] super().__init__(**kwargs) - self.put(_SetScreenEvent()) + self.term = term + self._window_idx = 0 + self.windows = [_Window(idx=self._window_idx, term=self.term)] + self._put(_SetScreenEvent()) - def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]: + def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]: + 'Map cmd_name to executable TUI element method.' cmd_name = _CMD_SHORTCUTS.get(cmd_name, cmd_name) cmd_parent = self while True: @@ -454,45 +446,14 @@ class _TuiLoop(Loop, BroadcastMixin): return None return getattr(cmd_parent, cmd_name) - def process_main(self, event: Event) -> bool: - if not super().process_main(event): - return False - if isinstance(event, _SetScreenEvent): - self._term.calc_geometry() - for window in self._windows: - window.set_geometry() - elif isinstance(event, _LogEvent): - self.window.log.append(event.payload) - elif isinstance(event, _TuiCmdEvent): - cmd = self._cmd_name_to_cmd(event.payload[0]) - assert cmd is not None - cmd(*event.payload[1:]) - elif isinstance(event, ConnEvent): - matching_wins = [cw for cw in self._conn_windows - if cw.conn == event.conn] - if matching_wins: - conn_win = matching_wins[0] - else: - conn_win = _ConnectionWindow(idx=len(self._windows), - conn=event.conn, - term=self._term) - self._windows += [conn_win] - self._conn_windows += [conn_win] - self._switch_window(conn_win.idx) - if isinstance(event, LogConnEvent): - conn_win.log.append(event.payload) - elif isinstance(event, NickSetEvent): - conn_win.prompt.tainted = True - else: - return True - self.window.draw_tainted() - self._term.flush() - return True - @property def window(self) -> _Window: - 'Currently selected Window.' - return self._windows[self._window_idx] + 'Currently selected _Window.' + return self.windows[self._window_idx] + + def log(self, msg: str) -> None: + 'Post msg to active window\'s log.' + self.window.log.append(msg) def _switch_window(self, idx: int) -> None: self._window_idx = idx @@ -500,9 +461,15 @@ class _TuiLoop(Loop, BroadcastMixin): def cmd__connect(self, hostname: str, nickname: str, realname: str ) -> None: - 'Broadcast InitConnectEvent.' - login = LoginNames(user=getusername(), nick=nickname, real=realname) - self.broadcast(InitConnectEvent, (hostname, login)) + 'Create Client and pass it via NewClientEvent.' + client = _ClientKnowingTui(q_out=self._q_out, hostname=hostname, + nickname=nickname, realname=realname) + new_idx = len(self.windows) + self.windows += [_ClientWindow(idx=new_idx, term=self.term, + q_out=self._q_out, + client_id=client.id_)] + self._switch_window(new_idx) + self._put(NewClientEvent(client)) def cmd__prompt_enter(self) -> None: 'Get prompt content from .window.prompt.enter, parse to & run command.' @@ -513,7 +480,7 @@ class _TuiLoop(Loop, BroadcastMixin): if to_parse[0:1] == '/': toks = to_parse[1:].split(maxsplit=1) alert = f'{toks[0]} unknown' - cmd = self._cmd_name_to_cmd(toks[0]) + cmd = self.cmd_name_to_cmd(toks[0]) if cmd and cmd.__name__ != stack()[0].function: params = signature(cmd).parameters n_args_max = len(params) @@ -534,15 +501,15 @@ class _TuiLoop(Loop, BroadcastMixin): else: alert = 'not prefixed by /' if alert: - self.broadcast(_LogEvent, f'invalid prompt command: {alert}') + self.log(f'invalid prompt command: {alert}') def cmd__quit(self) -> None: - 'Send QUIT to all threads.' - self.broadcast(QuitEvent) + 'Trigger program exit.' + self._put(QuitEvent()) def cmd__window(self, towards: str) -> Optional[str]: 'Switch window selection.' - n_windows = len(self._windows) + n_windows = len(self.windows) if n_windows < 2: return 'no alternate window to move into' if towards in {'left', 'right'}: @@ -560,25 +527,26 @@ class _TuiLoop(Loop, BroadcastMixin): return None -class Terminal: +class Terminal(QueueMixin): 'Abstraction of terminal interface.' size: _YX - tui: _TuiLoop - _blessed: BlessedTerminal _cursor_yx_: _YX - @contextmanager - def context(self, q_to_main: EventQueue) -> Generator: - 'Combine multiple contexts into one.' - signal(SIGWINCH, lambda *_: q_to_main.put(_SetScreenEvent())) + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) self._blessed = BlessedTerminal() + self._cursor_yx = _YX(0, 0) + + @contextmanager + def setup(self) -> Generator: + 'Combine multiple contexts into one and run keypress loop.' + signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent())) + self.clear() with (self._blessed.raw(), self._blessed.fullscreen(), self._blessed.hidden_cursor(), - _KeyboardLoop(q_to_main, self.get_keypresses())): - self._cursor_yx = _YX(0, 0) - with _TuiLoop(self, q_to_main=q_to_main) as self.tui: - yield self + Loop(iterator=self._get_keypresses(), q_out=self._q_out)): + yield self @property def _cursor_yx(self) -> _YX: @@ -629,45 +597,68 @@ class Terminal: print(msg, end='') self._cursor_yx = _YX(self._cursor_yx.y, end_x) - def get_keypresses(self) -> Iterator[str]: - '''Loop through keypresses from terminal, collect what blessed ignores. + def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]: + '''Loop through keypresses from terminal, expand blessed's handling. - (Notably, blessed seems to junk any alt/escape-modifide key events it - does not explicitly know. + Explicitly collect KEY_ESCAPE-modified key sequences, and recognize + OSC52-prefixed pastables to return the respective base64 code, + prefixed with _B64_PREFIX. ''' - n_gotchs_unprocessed = 0 while True: - new_gotchs = [] - if not n_gotchs_unprocessed: - while self._blessed.kbhit(TIMEOUT_LOOP): - gotch = self._blessed.getch() - self._blessed.ungetch(gotch) - new_gotchs += [gotch] - if not self._blessed.kbhit(0): - break - n_gotchs_unprocessed += len(new_gotchs) - blessed_key = self._blessed.inkey(timeout=0, esc_delay=0) - n_chs_blessed_key = len(blessed_key.encode('utf-8')) - unhandleds = [] - if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE': - for _ in range(len(new_gotchs) - n_chs_blessed_key): - unhandled = self._blessed.inkey(timeout=0, esc_delay=0) - unhandleds += list(unhandled.encode('utf-8')) - n_gotchs_unprocessed -= 1 - n_gotchs_unprocessed -= n_chs_blessed_key - if unhandleds: - fused = ''.join([chr(n) for n in unhandleds]) - if fused.startswith(_OSC52_PREFIX): - if not (encoded := fused[len(_OSC52_PREFIX):]): - while True: - gotch = self._blessed.getch() - if gotch == _PASTE_DELIMITER: - break - encoded += gotch - yield f'{_B64_PREFIX}{encoded}' - continue - yield str(unhandleds) - elif blessed_key.name: - yield blessed_key.name + to_yield = '' + ks = self._blessed.inkey( + timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None, + esc_delay=0) # incl. until thread dies + if ks.name != 'KEY_ESCAPE': + to_yield = f'{ks.name if ks.name else ks}' else: - yield str(blessed_key) + chars = b'' + while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0 + ).encode('utf-8')): + chars += new_chars + if chars[:len(_OSC52_PREFIX)] == _OSC52_PREFIX: + to_yield = _B64_PREFIX[:] + while (gotch := self._blessed.getch()) != _PASTE_DELIMITER: + to_yield += gotch + else: + to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars]) + yield _KeyboardEvent(to_yield) if to_yield else None + + +class _ClientWindowEvent(TuiEvent, ClientIdMixin): + + def client_win(self, target: Tui) -> _ClientWindow: + 'Identifies proper _ClientWindow in target TUI.' + return [win for win in target.windows + if isinstance(win, _ClientWindow) + and win.client_id == self.client_id][0] + + +class _ClientLogEvent(_ClientWindowEvent, PayloadMixin): + payload: str + + def affect(self, target: Tui) -> None: + self.client_win(target).log.append(self.payload) + super().affect(target) + + +class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin): + payload: tuple[str, str] + + def affect(self, target: Tui) -> None: + prompt = self.client_win(target).prompt + prompt.prefix = ((' ' if self.payload[0] else '?') + + f'{self.payload[1]}{_PROMPT_TEMPLATE}') + prompt.tainted = True + super().affect(target) + + +class _ClientKnowingTui(Client): + + def log(self, msg: str) -> None: + self._cput(_ClientLogEvent, payload=msg) + + def update_login(self, nick_confirmed: bool, nickname: str = '') -> None: + super().update_login(nick_confirmed, nickname) + self._cput(_ClientPromptEvent, payload=(self.nick_confirmed, + self.nickname))