From: Christian Heller Date: Mon, 4 Aug 2025 09:41:28 +0000 (+0200) Subject: Major restructuring. X-Git-Url: https://plomlompom.com/repos/booking/%7B%7B%20web_path%20%7D%7D/static/%7B%7Bdb.prefix%7D%7D/day?a=commitdiff_plain;h=046a819ee817c6735f6fb0e41edcb36c6b143105;p=ircplom Major restructuring. --- diff --git a/ircplom.py b/ircplom.py index 16b6375..b10fb65 100755 --- a/ircplom.py +++ b/ircplom.py @@ -2,8 +2,9 @@ 'Attempt at an IRC client.' from queue import SimpleQueue from ircplom.events import ExceptionEvent, QuitEvent -from ircplom.irc_conn import ClientsDb, ClientEvent, NewClientEvent -from ircplom.tui import Terminal, Tui, TuiEvent +from ircplom.client import ClientsDb, ClientEvent, NewClientEvent +from ircplom.tui_base import Terminal, TuiEvent +from ircplom.client_tui import ClientTui def main_loop() -> None: @@ -12,7 +13,7 @@ def main_loop() -> None: clients_db: ClientsDb = {} try: with Terminal(q_out=q_events).setup() as term: - tui = Tui(q_out=q_events, term=term) + tui = ClientTui(q_out=q_events, term=term) while True: event = q_events.get() if isinstance(event, QuitEvent): diff --git a/ircplom/client.py b/ircplom/client.py new file mode 100644 index 0000000..9ddea32 --- /dev/null +++ b/ircplom/client.py @@ -0,0 +1,277 @@ +'High-level IRC protocol / server connection management.' +# built-ins +from abc import ABC, abstractmethod +from dataclasses import dataclass +from getpass import getuser +from threading import Thread +from typing import Optional +from uuid import UUID, uuid4 +# ourselves +from ircplom.events import (AffectiveEvent, ExceptionEvent, PayloadMixin, + QueueMixin) +from ircplom.irc_conn import (BaseIrcConnection, IrcConnAbortException, + IrcMessage) + +ClientsDb = dict[UUID, 'Client'] +CHAT_GLOB = '*' + + +@dataclass +class ClientIdMixin: + 'Collects a Client\'s ID at .client_id.' + client_id: UUID + + +@dataclass +class ClientEvent(AffectiveEvent, ClientIdMixin): + 'To affect Client identified by ClientIdMixin.' + + +class _IrcConnection(BaseIrcConnection): + + def __init__(self, client_id: UUID, **kwargs) -> None: + # TODO: find out why I can't just ClientIdMixin here + self.client_id = client_id + super().__init__(**kwargs) + + def _make_recv_event(self, msg: IrcMessage) -> ClientEvent: + + @dataclass + class _RecvEvent(ClientEvent, PayloadMixin): + payload: IrcMessage + + def affect(self, target: 'Client') -> None: + target.handle_msg(self.payload) + + return _RecvEvent(client_id=self.client_id, payload=msg) + + +@dataclass +class ClientQueueMixin(QueueMixin): + 'To QueueMixin adds _cput to extend ._put with client_id= setting.' + client_id_name = 'id_' + + def _cput(self, event_class, **kwargs) -> None: + self._put(event_class(client_id=getattr(self, self.client_id_name), + **kwargs)) + + +@dataclass +class _ServerCapability: + 'Store data collectable via CAPS LS/LIST/NEW.' + enabled: bool + data: str + + def str_for_log(self, name: str) -> str: + 'Optimized for Client.log per-line listing.' + listing = '+' if self.enabled else '-' + listing += f' {name}' + if self.data: + listing += f' ({self.data})' + return listing + + +@dataclass +class IrcConnSetup: + 'All we need to know to set up a new Client connection.' + hostname: str + nickname: str + realname: str + + +class Client(ABC, ClientQueueMixin): + 'Abstracts socket connection, loop over it, and handling messages from it.' + nick_confirmed: bool = False + conn: Optional[_IrcConnection] = None + + def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None: + super().__init__(**kwargs) + self.conn_setup = conn_setup + self._cap_neg_states: dict[str, bool] = {} + self.caps: dict[str, _ServerCapability] = {} + self.id_ = uuid4() + self.update_login(nick_confirmed=False, + nickname=self.conn_setup.nickname) + self.start_connecting() + + def start_connecting(self) -> None: + 'Start thread to set up _IrcConnection at .conn.' + + def connect(self) -> None: + + @dataclass + class _ConnectedEvent(ClientEvent): + def affect(self, target: 'Client') -> None: + target.on_connect() + + try: + self.conn = _IrcConnection(hostname=self.conn_setup.hostname, + q_out=self.q_out, + client_id=self.id_) + self._cput(_ConnectedEvent) + except IrcConnAbortException as e: + self.log(f'# ALERT: {e}') + except Exception as e: # pylint: disable=broad-exception-caught + self._put(ExceptionEvent(e)) + + Thread(target=connect, daemon=True, args=(self,)).start() + + def on_connect(self) -> None: + 'Steps to perform right after connection.' + self.log(msg='# connected to server', chat=CHAT_GLOB) + self.try_send_cap('LS', ('302',)) + self.send(IrcMessage(verb='USER', + params=(getuser(), '0', '*', + self.conn_setup.realname))) + self.send(IrcMessage(verb='NICK', params=(self.conn_setup.nickname,))) + + def cap_neg_done(self, negotiation_step: str) -> bool: + 'Whether negotiation_step is registered as finished.' + return self._cap_neg_states.get(negotiation_step, False) + + def cap_neg(self, negotiation_step: str) -> bool: + 'Whether negotiation_step is registered at all (started or finished).' + return negotiation_step in self._cap_neg_states + + def cap_neg_set(self, negotiation_step: str, done: bool = False) -> None: + 'Declare negotiation_step started, or (if done) finished.' + self._cap_neg_states[negotiation_step] = done + + def try_send_cap(self, *params, key_fused: bool = False) -> None: + 'Run CAP command with params, handle cap neg. state.' + neg_state_key = ':'.join(params) if key_fused else params[0] + if self.cap_neg(neg_state_key): + return + self.send(IrcMessage(verb='CAP', params=params)) + self.cap_neg_set(neg_state_key) + + def collect_caps(self, params: tuple[str, ...]) -> None: + 'Record available and enabled server capabilities.' + verb = params[0] + items = params[-1].strip().split() + is_final_line = params[1] != '*' + if self.cap_neg_done(verb): + if verb == 'LS': + self.caps.clear() + else: + for cap in self.caps.values(): + cap.enabled = False + self.cap_neg_set(verb) + for item in items: + if verb == 'LS': + splitted = item.split('=', maxsplit=1) + self.caps[splitted[0]] = _ServerCapability( + enabled=False, data=''.join(splitted[1:])) + else: + self.caps[item].enabled = True + if is_final_line: + self.cap_neg_set(verb, done=True) + + @abstractmethod + def log(self, msg: str, chat: str = '') -> None: + '''Write msg into log of chat, whatever shape that may have. + + Messages to chat=CHAT_GLOB are meant to appear in all widgets mapped to + the client, those to chat="" only in the initial connection window. + ''' + + def send(self, msg: IrcMessage, chat: str = '') -> None: + 'Send line-separator-delimited message over socket.' + if not self.conn: + self.log('# ALERT: cannot send, connection seems closed') + return + self.conn.send(msg) + self.log(msg=f'> {msg.raw}', chat=chat) + self.log(msg=f'=>| {msg.raw}', chat=':raw') + + def update_login(self, nick_confirmed: bool, nickname: str = '') -> None: + '''Manage conn_setup..nickname, .nick_confirmed. + + (Useful for subclass extension.) + ''' + first_run = not hasattr(self.conn_setup, 'nickname') + prefix = '# nickname' + if first_run or (nickname and nickname != self.conn_setup.nickname): + verb = ('set' if first_run + else f'changed from "{self.conn_setup.nickname}"') + self.conn_setup.nickname = nickname + self.log(msg=f'{prefix} {verb} to "{nickname}"', chat=CHAT_GLOB) + if first_run or nick_confirmed != self.nick_confirmed: + self.nick_confirmed = nick_confirmed + if not first_run: + self.log(f'{prefix} {"" if nick_confirmed else "un"}confirmed') + + def close(self) -> None: + 'Close both recv Loop and socket.' + self.log(msg='# disconnecting from server', chat=CHAT_GLOB) + if self.conn: + self.conn.close() + self.conn = None + self.update_login(nick_confirmed=False) + + def handle_msg(self, msg: IrcMessage) -> None: + 'Process incoming msg towards appropriate client steps.' + self.log(f'<-| {msg.raw}', ':raw') + match msg.verb: + case 'PING': + self.send(IrcMessage(verb='PONG', params=(msg.params[0],))) + case 'ERROR': + self.close() + case '001' | 'NICK': + self.update_login(nickname=msg.params[0], nick_confirmed=True) + case 'PRIVMSG': + self.log(msg=str(msg.params), chat=msg.source) + case 'CAP': + match msg.params[1]: + case 'LS' | 'LIST': + self.collect_caps(msg.params[1:]) + case 'ACK' | 'NAK': + cap_names = msg.params[-1].split() + for cap_name in cap_names: + self.cap_neg_set(f'REQ:{cap_name}', done=True) + self.caps[cap_name].enabled = (msg.params[1] + == 'ACK') + if self.cap_neg_done('LIST'): + self.try_send_cap('END') + if not self.cap_neg('printing'): + self.log('# server capabilities (enabled: "+"):') + for cap_name, cap in self.caps.items(): + self.log('# ' + cap.str_for_log(cap_name)) + self.cap_neg_set('printing', done=True) + elif self.cap_neg_done('LS'): + for cap_name in ('server-time', 'account-tag', 'sasl'): + if (cap_name in self.caps + and (not self.caps[cap_name].enabled)): + self.try_send_cap('REQ', cap_name, key_fused=True) + self.try_send_cap('LIST') + + +@dataclass +class NewClientEvent(AffectiveEvent, PayloadMixin): + 'Put Client .payload into ClientsDb target.' + payload: 'Client' + + def affect(self, target: ClientsDb) -> None: + target[self.payload.id_] = self.payload + + +@dataclass +class InitReconnectEvent(ClientEvent): + 'To trigger re-opening of connection.' + + def affect(self, target: 'Client') -> None: + if target.conn: + target.log('# ALERT: reconnection called, but still seem ' + 'connected, so nothing to do.') + else: + target.start_connecting() + + +@dataclass +class SendEvent(ClientEvent, PayloadMixin): + 'To trigger sending of payload to server.' + payload: IrcMessage + chat: str = '' + + def affect(self, target: 'Client') -> None: + target.send(msg=self.payload, chat=self.chat) diff --git a/ircplom/client_tui.py b/ircplom/client_tui.py new file mode 100644 index 0000000..7c482ab --- /dev/null +++ b/ircplom/client_tui.py @@ -0,0 +1,135 @@ +'TUI adaptions to Client.' +# built-ins +from dataclasses import dataclass +from uuid import UUID +# ourselves +from ircplom.events import PayloadMixin +from ircplom.tui_base import (BaseTui, Window, TuiEvent, CMD_SHORTCUTS, + PROMPT_TEMPLATE) +from ircplom.irc_conn import IrcMessage +from ircplom.client import (CHAT_GLOB, IrcConnSetup, Client, + ClientIdMixin, ClientQueueMixin, + InitReconnectEvent, NewClientEvent, SendEvent) + + +CMD_SHORTCUTS['disconnect'] = 'window.disconnect' +CMD_SHORTCUTS['nick'] = 'window.nick' +CMD_SHORTCUTS['privmsg'] = 'window.privmsg' +CMD_SHORTCUTS['reconnect'] = 'window.reconnect' + + +class _ClientWindow(Window, ClientQueueMixin): + client_id_name = 'client_id' + + def __init__(self, client_id: UUID, chat: str = '', **kwargs) -> None: + self.client_id = client_id + self.chat = chat + super().__init__(**kwargs) + + def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None: + 'Send QUIT command to server.' + self._cput(SendEvent, + payload=IrcMessage(verb='QUIT', params=(quit_msg,))) + + def cmd__reconnect(self) -> None: + 'Attempt reconnection.' + self._cput(InitReconnectEvent) + + def cmd__nick(self, new_nick: str) -> None: + 'Attempt nickname change.' + self._cput(SendEvent, + payload=IrcMessage(verb='NICK', params=(new_nick,))) + + def cmd__privmsg(self, target: str, msg: str) -> None: + 'Send chat message msg to target.' + self._cput(SendEvent, chat=target, + payload=IrcMessage(verb='PRIVMSG', params=(target, msg))) + + +class ClientTui(BaseTui): + 'TUI expanded towards Client features.' + + def _new_client_window(self, client_id: UUID, chat: str = '' + ) -> _ClientWindow: + new_idx = len(self.windows) + win = _ClientWindow(idx=new_idx, term=self.term, q_out=self.q_out, + client_id=client_id, chat=chat) + self.windows += [win] + self._switch_window(new_idx) + return win + + def client_wins(self, client_id: UUID) -> list[_ClientWindow]: + 'All _ClientWindows matching client_id; if none, create one.' + wins = [win for win in self.windows + if isinstance(win, _ClientWindow) + and win.client_id == client_id] # pylint: disable=no-member + if not wins: + wins = [self._new_client_window(client_id=client_id)] + return wins + + def client_win(self, client_id: UUID, chat: str = '') -> _ClientWindow: + '''That _ClientWindow matching client_id and chat; create if none. + + In case of creation, copy prompt prefix from client's first window. + ''' + client_wins = self.client_wins(client_id) + candidates = [win for win in client_wins if win.chat == chat] + if candidates: + return candidates[0] + win = self._new_client_window(client_id=client_id, chat=chat) + if client_wins: + win.prompt.prefix = client_wins[0].prompt.prefix + return win + + def cmd__connect(self, hostname: str, nickname: str, realname: str + ) -> None: + 'Create Client and pass it via NewClientEvent.' + self._put(NewClientEvent( + _ClientKnowingTui( + q_out=self.q_out, + conn_setup=IrcConnSetup(hostname=hostname, nickname=nickname, + realname=realname)))) + + +@dataclass +class _ClientWindowEvent(TuiEvent, ClientIdMixin): + chat: str = '' + + +@dataclass +class _ClientLogEvent(_ClientWindowEvent, PayloadMixin): + payload: str + + def affect(self, target: ClientTui) -> None: + if self.chat == CHAT_GLOB: + for win in target.client_wins(self.client_id): + win.log.append(self.payload) + else: + target.client_win(self.client_id, self.chat + ).log.append(self.payload) + super().affect(target) + + +@dataclass +class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin): + payload: tuple[str, str] + + def affect(self, target: ClientTui) -> None: + new_prefix = ((' ' if self.payload[0] else '?') + + f'{self.payload[1]}{PROMPT_TEMPLATE}') + for win in target.client_wins(self.client_id): + prompt = win.prompt + prompt.prefix = new_prefix + prompt.tainted = True + super().affect(target) + + +class _ClientKnowingTui(Client): + + def log(self, msg: str, chat: str = '') -> None: + self._cput(_ClientLogEvent, chat=chat, payload=msg) + + def update_login(self, nick_confirmed: bool, nickname: str = '') -> None: + super().update_login(nick_confirmed, nickname) + self._cput(_ClientPromptEvent, payload=(self.nick_confirmed, + self.conn_setup.nickname)) diff --git a/ircplom/irc_conn.py b/ircplom/irc_conn.py index ab9426d..f7f5455 100644 --- a/ircplom/irc_conn.py +++ b/ircplom/irc_conn.py @@ -1,21 +1,12 @@ -'IRC server connection management.' +'Low-level IRC protocol / server connection management.' # built-ins from abc import ABC, abstractmethod -from dataclasses import dataclass -from getpass import getuser from socket import socket, gaierror as socket_gaierror -from threading import Thread from typing import Callable, Iterator, NamedTuple, Optional, Self -from uuid import uuid4, UUID # ourselves -from ircplom.events import ( - AffectiveEvent, ExceptionEvent, Loop, PayloadMixin, QueueMixin) +from ircplom.events import Event, Loop, QueueMixin -ClientsDb = dict[UUID, 'Client'] - -CHAT_GLOB = '*' - _TIMEOUT_RECV_LOOP = 0.1 _TIMEOUT_CONNECT = 5 _CONN_RECV_BUFSIZE = 1024 @@ -129,22 +120,21 @@ class IrcMessage: return self._raw -class _IrcConnAbortException(Exception): - pass +class IrcConnAbortException(Exception): + 'Thrown by BaseIrcConnection on expectable connection failures.' -class _IrcConnection(QueueMixin): +class BaseIrcConnection(QueueMixin, ABC): 'Collects low-level server-client connection management.' - def __init__(self, hostname: str, client_id: UUID, **kwargs) -> None: + def __init__(self, hostname: str, **kwargs) -> None: super().__init__(**kwargs) - self.client_id = client_id self._socket = socket() self._socket.settimeout(_TIMEOUT_CONNECT) try: self._socket.connect((hostname, _PORT)) except (TimeoutError, socket_gaierror) as e: - raise _IrcConnAbortException(e) from e + raise IrcConnAbortException(e) from e self._socket.settimeout(_TIMEOUT_RECV_LOOP) self._recv_loop = Loop(iterator=self._read_lines(), q_out=self.q_out) @@ -157,7 +147,11 @@ class _IrcConnection(QueueMixin): 'Send line-separator-delimited message over socket.' self._socket.sendall(msg.raw.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR) - def _read_lines(self) -> Iterator[Optional['_RecvEvent']]: + @abstractmethod + def _make_recv_event(self, msg: IrcMessage) -> Event: + pass + + def _read_lines(self) -> Iterator[Optional[Event]]: assert self._socket is not None bytes_total = b'' buffer_linesep = b'' @@ -186,255 +180,6 @@ class _IrcConnection(QueueMixin): buffer_linesep += c_byted if buffer_linesep == _IRCSPEC_LINE_SEPARATOR: buffer_linesep = b'' - yield _RecvEvent(client_id=self.client_id, - payload=IrcMessage.from_raw( - bytes_total.decode('utf-8'))) + yield self._make_recv_event( + IrcMessage.from_raw(bytes_total.decode('utf-8'))) bytes_total = b'' - - -@dataclass -class ClientIdMixin: - 'Collects a Client\'s ID at .client_id.' - client_id: UUID - - -@dataclass -class NewClientEvent(AffectiveEvent, PayloadMixin): - 'Put Client .payload into ClientsDb target.' - payload: 'Client' - - def affect(self, target: ClientsDb) -> None: - target[self.payload.id_] = self.payload - - -@dataclass -class ClientEvent(AffectiveEvent, ClientIdMixin): - 'To affect Client identified by ClientIdMixin.' - - -@dataclass -class _ConnectedEvent(ClientEvent): - - def affect(self, target: 'Client') -> None: - target.log(msg='# connected to server', chat=CHAT_GLOB) - target.try_send_cap('LS', ('302',)) - target.send(IrcMessage(verb='USER', - params=(getuser(), '0', '*', - target.conn_setup.realname))) - target.send(IrcMessage(verb='NICK', - params=(target.conn_setup.nickname,))) - - -@dataclass -class InitReconnectEvent(ClientEvent): - 'To trigger re-opening of connection.' - - def affect(self, target: 'Client') -> None: - if target.conn: - target.log('# ALERT: reconnection called, but still seem ' - 'connected, so nothing to do.') - else: - target.start_connecting() - - -@dataclass -class SendEvent(ClientEvent, PayloadMixin): - 'To trigger sending of payload to server.' - payload: IrcMessage - chat: str = '' - - def affect(self, target: 'Client') -> None: - target.send(msg=self.payload, chat=self.chat) - - -@dataclass -class _RecvEvent(ClientEvent, PayloadMixin): - payload: IrcMessage - - def affect(self, target: 'Client') -> None: - target.handle_msg(self.payload) - - -@dataclass -class ClientQueueMixin(QueueMixin): - 'To QueueMixin adds _cput to extend ._put with client_id= setting.' - client_id_name = 'id_' - - def _cput(self, event_class, **kwargs) -> None: - self._put(event_class(client_id=getattr(self, self.client_id_name), - **kwargs)) - - -@dataclass -class ServerCapability: - 'Store data collectable via CAPS LS/LIST/NEW.' - enabled: bool - data: str - - def str_for_log(self, name: str) -> str: - 'Optimized for Client.log per-line listing.' - listing = '+' if self.enabled else '-' - listing += f' {name}' - if self.data: - listing += f' ({self.data})' - return listing - - -@dataclass -class IrcConnSetup: - 'All we need to know to set up a new Client connection.' - hostname: str - nickname: str - realname: str - - -class Client(ABC, ClientQueueMixin): - 'Abstracts socket connection, loop over it, and handling messages from it.' - nick_confirmed: bool = False - conn: Optional[_IrcConnection] = None - - def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None: - super().__init__(**kwargs) - self.conn_setup = conn_setup - self._cap_neg_states: dict[str, bool] = {} - self.caps: dict[str, ServerCapability] = {} - self.id_ = uuid4() - self.update_login(nick_confirmed=False, - nickname=self.conn_setup.nickname) - self.start_connecting() - - def start_connecting(self) -> None: - 'Start thread to set up IrcConnection at .conn.' - - def connect(self) -> None: - try: - self.conn = _IrcConnection(hostname=self.conn_setup.hostname, - q_out=self.q_out, - client_id=self.id_) - self._cput(_ConnectedEvent) - except _IrcConnAbortException as e: - self.log(f'# ALERT: {e}') - except Exception as e: # pylint: disable=broad-exception-caught - self._put(ExceptionEvent(e)) - - Thread(target=connect, daemon=True, args=(self,)).start() - - def cap_neg_done(self, negotiation_step: str) -> bool: - 'Whether negotiation_step is registered as finished.' - return self._cap_neg_states.get(negotiation_step, False) - - def cap_neg(self, negotiation_step: str) -> bool: - 'Whether negotiation_step is registered at all (started or finished).' - return negotiation_step in self._cap_neg_states - - def cap_neg_set(self, negotiation_step: str, done: bool = False) -> None: - 'Declare negotiation_step started, or (if done) finished.' - self._cap_neg_states[negotiation_step] = done - - def try_send_cap(self, *params, key_fused: bool = False) -> None: - 'Run CAP command with params, handle cap neg. state.' - neg_state_key = ':'.join(params) if key_fused else params[0] - if self.cap_neg(neg_state_key): - return - self.send(IrcMessage(verb='CAP', params=params)) - self.cap_neg_set(neg_state_key) - - def collect_caps(self, params: tuple[str, ...]) -> None: - 'Record available and enabled server capabilities.' - verb = params[0] - items = params[-1].strip().split() - is_final_line = params[1] != '*' - if self.cap_neg_done(verb): - if verb == 'LS': - self.caps.clear() - else: - for cap in self.caps.values(): - cap.enabled = False - self.cap_neg_set(verb) - for item in items: - if verb == 'LS': - splitted = item.split('=', maxsplit=1) - self.caps[splitted[0]] = ServerCapability( - enabled=False, data=''.join(splitted[1:])) - else: - self.caps[item].enabled = True - if is_final_line: - self.cap_neg_set(verb, done=True) - - @abstractmethod - def log(self, msg: str, chat: str = '') -> None: - '''Write msg into log of chat, whatever shape that may have. - - Messages to chat=CHAT_GLOB are meant to appear in all widgets mapped to - the client, those to chat="" only in the initial connection window. - ''' - - def send(self, msg: IrcMessage, chat: str = '') -> None: - 'Send line-separator-delimited message over socket.' - if not self.conn: - self.log('# ALERT: cannot send, connection seems closed') - return - self.conn.send(msg) - self.log(msg=f'> {msg.raw}', chat=chat) - self.log(msg=f'=>| {msg.raw}', chat=':raw') - - def update_login(self, nick_confirmed: bool, nickname: str = '') -> None: - '''Manage conn_setup..nickname, .nick_confirmed. - - (Useful for subclass extension.) - ''' - first_run = not hasattr(self.conn_setup, 'nickname') - prefix = '# nickname' - if first_run or (nickname and nickname != self.conn_setup.nickname): - verb = ('set' if first_run - else f'changed from "{self.conn_setup.nickname}"') - self.conn_setup.nickname = nickname - self.log(msg=f'{prefix} {verb} to "{nickname}"', chat=CHAT_GLOB) - if first_run or nick_confirmed != self.nick_confirmed: - self.nick_confirmed = nick_confirmed - if not first_run: - self.log(f'{prefix} {"" if nick_confirmed else "un"}confirmed') - - def close(self) -> None: - 'Close both recv Loop and socket.' - self.log(msg='# disconnecting from server', chat=CHAT_GLOB) - if self.conn: - self.conn.close() - self.conn = None - self.update_login(nick_confirmed=False) - - def handle_msg(self, msg: IrcMessage) -> None: - 'Process incoming msg towards appropriate client steps.' - self.log(f'<-| {msg.raw}', ':raw') - match msg.verb: - case 'PING': - self.send(IrcMessage(verb='PONG', params=(msg.params[0],))) - case 'ERROR': - self.close() - case '001' | 'NICK': - self.update_login(nickname=msg.params[0], nick_confirmed=True) - case 'PRIVMSG': - self.log(msg=str(msg.params), chat=msg.source) - case 'CAP': - match msg.params[1]: - case 'LS' | 'LIST': - self.collect_caps(msg.params[1:]) - case 'ACK' | 'NAK': - cap_names = msg.params[-1].split() - for cap_name in cap_names: - self.cap_neg_set(f'REQ:{cap_name}', done=True) - self.caps[cap_name].enabled = (msg.params[1] - == 'ACK') - if self.cap_neg_done('LIST'): - self.try_send_cap('END') - if not self.cap_neg('printing'): - self.log('# server capabilities (enabled: "+"):') - for cap_name, cap in self.caps.items(): - self.log('# ' + cap.str_for_log(cap_name)) - self.cap_neg_set('printing', done=True) - elif self.cap_neg_done('LS'): - for cap_name in ('server-time', 'account-tag', 'sasl'): - if (cap_name in self.caps - and (not self.caps[cap_name].enabled)): - self.try_send_cap('REQ', cap_name, key_fused=True) - self.try_send_cap('LIST') diff --git a/ircplom/tui.py b/ircplom/tui.py deleted file mode 100644 index 39b51f3..0000000 --- a/ircplom/tui.py +++ /dev/null @@ -1,725 +0,0 @@ -'Terminal and TUI management.' -# built-ins -from abc import ABC, abstractmethod -from base64 import b64decode -from contextlib import contextmanager -from dataclasses import dataclass -from inspect import _empty as inspect_empty, signature, stack -from signal import SIGWINCH, signal -from typing import Callable, Generator, Iterator, NamedTuple, Optional -from uuid import UUID -# requirements.txt -from blessed import Terminal as BlessedTerminal -# ourselves -from ircplom.events import ( - AffectiveEvent, Loop, PayloadMixin, QueueMixin, QuitEvent) -from ircplom.irc_conn import ( - CHAT_GLOB, IrcConnSetup, IrcMessage, Client, ClientIdMixin, - ClientQueueMixin, InitReconnectEvent, NewClientEvent, SendEvent) - -_MIN_HEIGHT = 4 -_MIN_WIDTH = 32 - -_TIMEOUT_KEYPRESS_LOOP = 0.5 -_B64_PREFIX = 'b64:' -_OSC52_PREFIX = b']52;c;' -_PASTE_DELIMITER = '\007' - -_PROMPT_TEMPLATE = '> ' -_PROMPT_ELL_IN = '<…' -_PROMPT_ELL_OUT = '…>' - -_CHAR_RESIZE = chr(12) -_KEYBINDINGS = { - 'KEY_BACKSPACE': ('window.prompt.backspace',), - 'KEY_ENTER': ('prompt_enter',), - 'KEY_LEFT': ('window.prompt.move_cursor', 'left'), - 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'), - 'KEY_UP': ('window.prompt.scroll', 'up'), - 'KEY_DOWN': ('window.prompt.scroll', 'down'), - 'KEY_PGUP': ('window.log.scroll', 'up'), - 'KEY_PGDOWN': ('window.log.scroll', 'down'), - 'esc:91:49:59:51:68': ('window', 'left'), - 'esc:91:49:59:51:67': ('window', 'right'), - 'KEY_F1': ('window.paste',), -} -_CMD_SHORTCUTS = { - 'disconnect': 'window.disconnect', - 'nick': 'window.nick', - 'privmsg': 'window.privmsg', - 'reconnect': 'window.reconnect' -} - - -@dataclass -class TuiEvent(AffectiveEvent): - 'To affect TUI, and trigger flushed .draw_tainted on it.' - - def affect(self, target: 'Tui') -> None: - target.window.draw_tainted() - target.term.flush() - - -@dataclass -class _SetScreenEvent(TuiEvent): - - def affect(self, target: 'Tui') -> None: - target.term.calc_geometry() - for window in target.windows: - window.set_geometry() - super().affect(target) - - -class _YX(NamedTuple): - y: int - x: int - - -class _Widget(ABC): - - @abstractmethod - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.tainted = True - self._drawable = False - - @abstractmethod - def set_geometry(self, measurements: _YX) -> bool: - 'Update widget\'s measurements, re-generate content where necessary.' - self.tainted = True - self._drawable = len([m for m in measurements if m < 0]) == 0 - return self._drawable - - @abstractmethod - def draw(self) -> bool: - 'Print widget\'s content in shape appropriate to set geometry.' - if not self._drawable: - return False - self.tainted = False - return True - - -class _ScrollableWidget(_Widget, ABC): - _history_idx: int - - def __init__(self, write: Callable[..., None], **kwargs) -> None: - super().__init__(**kwargs) - self._write = write - self._history: list[str] = [] - - def append(self, to_append: str) -> None: - 'Append to scrollable history.' - self._history += [to_append] - - @abstractmethod - def _scroll(self, up=True) -> None: - self.tainted = True - - def cmd__scroll(self, direction: str) -> None: - 'Scroll through stored content/history.' - self._scroll(up=direction == 'up') - - -class _LogWidget(_ScrollableWidget): - _view_size: _YX - _y_pgscroll: int - - def __init__(self, wrap: Callable[[str], list[str]], **kwargs - ) -> None: - super().__init__(**kwargs) - self._wrap = wrap - self._wrapped_idx = self._history_idx = -1 - self._wrapped: list[tuple[Optional[int], str]] = [] - - def _add_wrapped(self, idx_original, line) -> int: - wrapped_lines = self._wrap(line) - self._wrapped += [(idx_original, line) for line in wrapped_lines] - return len(wrapped_lines) - - def set_geometry(self, measurements: _YX) -> bool: - if not super().set_geometry(measurements): - return False - self._view_size = measurements - self._y_pgscroll = self._view_size.y // 2 - self._wrapped.clear() - self._wrapped += [(None, '')] * self._view_size.y - if not self._history: - return True - for idx_history, line in enumerate(self._history): - self._add_wrapped(idx_history, line) - wrapped_lines_for_history_idx = [ - t for t in self._wrapped - if t[0] == len(self._history) + self._history_idx] - idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1]) - self._wrapped_idx = idx_their_last - len(self._wrapped) - return True - - def append(self, to_append: str) -> None: - super().append(to_append) - self.tainted = True - if self._history_idx < -1: - self._history_idx -= 1 - if not self._drawable: - return - n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append) - if self._wrapped_idx < -1: - self._wrapped_idx -= n_wrapped_lines - - def draw(self) -> bool: - if not super().draw(): - return False - start_idx = self._wrapped_idx - self._view_size.y + 1 - end_idx = self._wrapped_idx - to_write = [t[1] for t in self._wrapped[start_idx:end_idx]] - if self._wrapped_idx < -1: - scroll_info = f'vvv [{(-1) * self._wrapped_idx}] ' - scroll_info += 'v' * (self._view_size.x - len(scroll_info)) - to_write += [scroll_info] - else: - to_write += [self._wrapped[self._wrapped_idx][1]] - for i, line in enumerate(to_write): - self._write(line, i) - return True - - def _scroll(self, up: bool = True) -> None: - super()._scroll(up) - if not self._drawable: - return - if up: - self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped), - self._wrapped_idx - self._y_pgscroll) - else: - self._wrapped_idx = min(-1, - self._wrapped_idx + self._y_pgscroll) - history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0] - if history_idx_to_wrapped_idx is not None: - self._history_idx = history_idx_to_wrapped_idx - len(self._history) - - -class _PromptWidget(_ScrollableWidget): - _y: int - _width: int - _history_idx: int = 0 - _input_buffer_unsafe: str - _cursor_x: int - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.prefix = _PROMPT_TEMPLATE - self._reset_buffer('') - - @property - def _input_buffer(self) -> str: - return self._input_buffer_unsafe[:] - - @_input_buffer.setter - def _input_buffer(self, content) -> None: - self.tainted = True - self._input_buffer_unsafe = content - - def set_geometry(self, measurements: _YX) -> bool: - if not super().set_geometry(measurements): - return False - self._y, self._width = measurements - return True - - def draw(self) -> bool: - if not super().draw(): - return False - prefix = self.prefix[:] - content = self._input_buffer - if self._cursor_x == len(self._input_buffer): - content += ' ' - half_width = (self._width - len(prefix)) // 2 - offset = 0 - if len(prefix) + len(content) > self._width\ - and self._cursor_x > half_width: - prefix += _PROMPT_ELL_IN - offset = min(len(prefix) + len(content) - self._width, - self._cursor_x - half_width + len(_PROMPT_ELL_IN)) - cursor_x_to_write = len(prefix) + self._cursor_x - offset - to_write = f'{prefix}{content[offset:]}' - if len(to_write) > self._width: - to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)] - + _PROMPT_ELL_OUT) - self._write(to_write[:cursor_x_to_write], self._y, padding=False) - self._write(to_write[cursor_x_to_write], attribute='reverse', - padding=False) - self._write(to_write[cursor_x_to_write + 1:]) - return True - - def _archive_prompt(self) -> None: - self.append(self._input_buffer) - self._reset_buffer('') - - def _scroll(self, up: bool = True) -> None: - super()._scroll(up) - if up and -(self._history_idx) < len(self._history): - if self._history_idx == 0 and self._input_buffer: - self._archive_prompt() - self._history_idx -= 1 - self._history_idx -= 1 - self._reset_buffer(self._history[self._history_idx]) - elif not up: - if self._history_idx < 0: - self._history_idx += 1 - if self._history_idx == 0: - self._reset_buffer('') - else: - self._reset_buffer(self._history[self._history_idx]) - elif self._input_buffer: - self._archive_prompt() - - def insert(self, to_insert: str) -> None: - 'Insert into prompt input buffer.' - self._cursor_x += len(to_insert) - self._input_buffer = (self._input_buffer[:self._cursor_x - 1] - + to_insert - + self._input_buffer[self._cursor_x - 1:]) - self._history_idx = 0 - - def cmd__backspace(self) -> None: - 'Truncate current content by one character, if possible.' - if self._cursor_x > 0: - self._cursor_x -= 1 - self._input_buffer = (self._input_buffer[:self._cursor_x] - + self._input_buffer[self._cursor_x + 1:]) - self._history_idx = 0 - - def cmd__move_cursor(self, direction: str) -> None: - 'Move cursor one space into direction ("left" or "right") if possible.' - if direction == 'left' and self._cursor_x > 0: - self._cursor_x -= 1 - elif direction == 'right'\ - and self._cursor_x < len(self._input_buffer): - self._cursor_x += 1 - else: - return - self.tainted = True - - def _reset_buffer(self, content: str) -> None: - self._input_buffer = content - self._cursor_x = len(self._input_buffer) - - def enter(self) -> str: - 'Return current content while also clearing and then redrawing.' - to_return = self._input_buffer[:] - if to_return: - self._archive_prompt() - return to_return - - -class _Window(_Widget): - _y_status: int - prompt: _PromptWidget - - def __init__(self, idx: int, term: 'Terminal', **kwargs) -> None: - super().__init__(**kwargs) - self.idx = idx - self._term = term - self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write) - self.prompt = self.__annotations__['prompt'](write=self._term.write) - if hasattr(self._term, 'size'): - self.set_geometry() - - def set_geometry(self, _=None) -> bool: - assert _ is None - if self._term.size.y < _MIN_HEIGHT or self._term.size.x < _MIN_WIDTH: - bad_yx = _YX(-1, -1) - super().set_geometry(bad_yx) - self.log.set_geometry(bad_yx) - self.prompt.set_geometry(bad_yx) - return False - super().set_geometry(_YX(0, 0)) - self._y_status = self._term.size.y - 2 - self.log.set_geometry(_YX(self._y_status, self._term.size.x)) - self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x)) - return True - - def draw(self) -> bool: - self._term.clear() - if not super().draw(): - if self._term.size.x > 0: - lines = [''] - for i, c in enumerate('screen too small'): - if i > 0 and 0 == i % self._term.size.x: - lines += [''] - lines[-1] += c - for y, line in enumerate(lines): - self._term.write(line, y) - return False - idx_box = f'[{self.idx}]' - status_line = idx_box + '=' * (self._term.size.x - len(idx_box)) - self.log.draw() - self._term.write(status_line, self._y_status) - self.prompt.draw() - return True - - def cmd__paste(self) -> None: - 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.' - self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}', - self._y_status) - self.tainted = True - - def draw_tainted(self) -> None: - 'Draw tainted parts of self.' - if self.tainted: - self.draw() - return - for widget in [w for w in (self.log, self.prompt) if w.tainted]: - widget.draw() - - -@dataclass -class _KeyboardEvent(TuiEvent, PayloadMixin): - payload: str - - def affect(self, target: 'Tui') -> None: - if self.payload[0] == _CHAR_RESIZE: - _SetScreenEvent().affect(target) - return - if self.payload in _KEYBINDINGS: - cmd_data = _KEYBINDINGS[self.payload] - cmd = target.cmd_name_to_cmd(cmd_data[0]) - if cmd: - cmd(*cmd_data[1:]) - elif self.payload.startswith(_B64_PREFIX): - encoded = self.payload[len(_B64_PREFIX):] - to_paste = '' - for i, c in enumerate(b64decode(encoded).decode('utf-8')): - if i > 512: - break - if c.isprintable(): - to_paste += c - elif c.isspace(): - to_paste += ' ' - else: - to_paste += '#' - target.window.prompt.insert(to_paste) - elif len(self.payload) == 1: - target.window.prompt.insert(self.payload) - else: - target.log(f'# ALERT: unknown keyboard input: {self.payload}') - super().affect(target) - - -class Tui(QueueMixin): - 'Base for graphical user interface elements.' - - def __init__(self, term: 'Terminal', **kwargs) -> None: - super().__init__(**kwargs) - self.term = term - self._window_idx = 0 - self.windows = [_Window(idx=self._window_idx, term=self.term)] - self._put(_SetScreenEvent()) - - def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]: - 'Map cmd_name to executable TUI element method.' - cmd_name = _CMD_SHORTCUTS.get(cmd_name, cmd_name) - cmd_parent = self - while True: - cmd_name_toks = cmd_name.split('.', maxsplit=1) - if len(cmd_name_toks) == 1: - break - if not hasattr(cmd_parent, cmd_name_toks[0]): - return None - cmd_parent = getattr(cmd_parent, cmd_name_toks[0]) - cmd_name = cmd_name_toks[1] - cmd_name = f'cmd__{cmd_name}' - if not hasattr(cmd_parent, cmd_name): - return None - return getattr(cmd_parent, cmd_name) - - @property - def window(self) -> _Window: - 'Currently selected _Window.' - return self.windows[self._window_idx] - - def _new_client_window(self, client_id: UUID, chat: str = '' - ) -> '_ClientWindow': - new_idx = len(self.windows) - win = _ClientWindow(idx=new_idx, term=self.term, q_out=self.q_out, - client_id=client_id, chat=chat) - self.windows += [win] - self._switch_window(new_idx) - return win - - def client_wins(self, client_id: UUID) -> list['_ClientWindow']: - 'All _ClientWindows matching client_id; if none, create one.' - wins = [win for win in self.windows - if isinstance(win, _ClientWindow) - and win.client_id == client_id] # pylint: disable=no-member - if not wins: - wins = [self._new_client_window(client_id=client_id)] - return wins - - def client_win(self, client_id: UUID, chat: str = '') -> '_ClientWindow': - '''That _ClientWindow matching client_id and chat; create if none. - - In case of creation, copy prompt prefix from client's first window. - ''' - client_wins = self.client_wins(client_id) - candidates = [win for win in client_wins if win.chat == chat] - if candidates: - return candidates[0] - win = self._new_client_window(client_id=client_id, chat=chat) - if client_wins: - win.prompt.prefix = client_wins[0].prompt.prefix - return win - - def log(self, msg: str) -> None: - 'Post msg to active window\'s log.' - self.window.log.append(msg) - - def _switch_window(self, idx: int) -> None: - self._window_idx = idx - self.window.draw() - - def cmd__connect(self, hostname: str, nickname: str, realname: str - ) -> None: - 'Create Client and pass it via NewClientEvent.' - self._put(NewClientEvent( - _ClientKnowingTui( - q_out=self.q_out, - conn_setup=IrcConnSetup(hostname=hostname, nickname=nickname, - realname=realname)))) - - def cmd__prompt_enter(self) -> None: - 'Get prompt content from .window.prompt.enter, parse to & run command.' - to_parse = self.window.prompt.enter() - if not to_parse: - return - alert: Optional[str] = None - if to_parse[0:1] == '/': - toks = to_parse[1:].split(maxsplit=1) - alert = f'{toks[0]} unknown' - cmd = self.cmd_name_to_cmd(toks[0]) - if cmd and cmd.__name__ != stack()[0].function: - params = signature(cmd).parameters - n_args_max = len(params) - n_args_min = len([p for p in params.values() - if p.default == inspect_empty]) - alert = f'{cmd.__name__} needs between {n_args_min} and '\ - f'{n_args_max} args' - if len(toks) == 1 and not n_args_min: - alert = cmd() - elif len(toks) > 1 and params\ - and n_args_min <= len(toks[1].split()): - args = [] - while len(toks) > 1 and n_args_max: - toks = toks[1].split(maxsplit=1) - args += [toks[0]] - n_args_max -= 1 - alert = cmd(*args) - else: - alert = 'not prefixed by /' - if alert: - self.log(f'# ALERT: invalid prompt command: {alert}') - - def cmd__quit(self) -> None: - 'Trigger program exit.' - self._put(QuitEvent()) - - def cmd__window(self, towards: str) -> Optional[str]: - 'Switch window selection.' - n_windows = len(self.windows) - if n_windows < 2: - return 'no alternate window to move into' - if towards in {'left', 'right'}: - multiplier = (+1) if towards == 'right' else (-1) - window_idx = self._window_idx + multiplier - if not 0 <= window_idx < n_windows: - window_idx -= multiplier * n_windows - elif not towards.isdigit(): - return f'neither "left"/"right" nor integer: {towards}' - else: - window_idx = int(towards) - if not 0 <= window_idx < n_windows: - return f'unavailable window idx: {window_idx}' - self._switch_window(window_idx) - return None - - -class Terminal(QueueMixin): - 'Abstraction of terminal interface.' - size: _YX - _cursor_yx_: _YX - - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self._blessed = BlessedTerminal() - self._cursor_yx = _YX(0, 0) - - @contextmanager - def setup(self) -> Generator: - 'Combine multiple contexts into one and run keypress loop.' - signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent())) - self.clear() - with (self._blessed.raw(), - self._blessed.fullscreen(), - self._blessed.hidden_cursor(), - Loop(iterator=self._get_keypresses(), q_out=self.q_out)): - yield self - - @property - def _cursor_yx(self) -> _YX: - return self._cursor_yx_ - - @_cursor_yx.setter - def _cursor_yx(self, yx: _YX) -> None: - print(self._blessed.move_yx(yx.y, yx.x), end='') - self._cursor_yx_ = yx - - def calc_geometry(self) -> None: - '(Re-)calculate .size..' - self.size = _YX(self._blessed.height, self._blessed.width) - - def clear(self) -> None: - 'Clear terminal.' - print(self._blessed.clear, end='') - - def flush(self) -> None: - 'Flush terminal.' - print('', end='', flush=True) - - def wrap(self, line: str) -> list[str]: - 'Wrap line to list of lines fitting into terminal width.' - return self._blessed.wrap(line, width=self.size.x, - subsequent_indent=' '*4) - - def write(self, - msg: str = '', - start_y: Optional[int] = None, - attribute: Optional[str] = None, - padding: bool = True - ) -> None: - 'Print to terminal, with position, padding to line end, attributes.' - if start_y is not None: - self._cursor_yx = _YX(start_y, 0) - # ._blessed.length can slow down things notably: only use where needed! - end_x = self._cursor_yx.x + (len(msg) if msg.isascii() - else self._blessed.length(msg)) - len_padding = self.size.x - end_x - if len_padding < 0: - msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x) - elif padding: - msg += ' ' * len_padding - end_x = self.size.x - if attribute: - msg = getattr(self._blessed, attribute)(msg) - print(msg, end='') - self._cursor_yx = _YX(self._cursor_yx.y, end_x) - - def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]: - '''Loop through keypresses from terminal, expand blessed's handling. - - Explicitly collect KEY_ESCAPE-modified key sequences, and recognize - OSC52-prefixed pastables to return the respective base64 code, - prefixed with _B64_PREFIX. - ''' - while True: - to_yield = '' - ks = self._blessed.inkey( - timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None, - esc_delay=0) # incl. until thread dies - if ks.name != 'KEY_ESCAPE': - to_yield = f'{ks.name if ks.name else ks}' - else: - chars = b'' - while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0 - ).encode('utf-8')): - chars += new_chars - len_prefix = len(_OSC52_PREFIX) - if chars[:len_prefix] == _OSC52_PREFIX: - to_yield = _B64_PREFIX[:] - # sometimes, prev .inkey got some or all (including paste - # delimiter) of the paste code (maybe even more), so first - # harvest potential remains of chars post prefix … - caught_delimiter = False - post_prefix_str = chars[len_prefix:].decode('utf-8') - for idx, c in enumerate(post_prefix_str): - if c == _PASTE_DELIMITER: - caught_delimiter = True - if (remains := post_prefix_str[idx + 1:]): - self._blessed.ungetch(remains) - break - to_yield += c - # … before .getch() further until expected delimiter found - if not caught_delimiter: - while (c := self._blessed.getch()) != _PASTE_DELIMITER: - to_yield += c - else: - to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars]) - yield _KeyboardEvent(to_yield) if to_yield else None - - -class _ClientWindow(_Window, ClientQueueMixin): - client_id_name = 'client_id' - - def __init__(self, client_id: UUID, chat: str = '', **kwargs) -> None: - self.client_id = client_id - self.chat = chat - super().__init__(**kwargs) - - def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None: - 'Send QUIT command to server.' - self._cput(SendEvent, - payload=IrcMessage(verb='QUIT', params=(quit_msg,))) - - def cmd__reconnect(self) -> None: - 'Attempt reconnection.' - self._cput(InitReconnectEvent) - - def cmd__nick(self, new_nick: str) -> None: - 'Attempt nickname change.' - self._cput(SendEvent, - payload=IrcMessage(verb='NICK', params=(new_nick,))) - - def cmd__privmsg(self, target: str, msg: str) -> None: - 'Send chat message msg to target.' - self._cput(SendEvent, chat=target, - payload=IrcMessage(verb='PRIVMSG', params=(target, msg))) - - -@dataclass -class _ClientWindowEvent(TuiEvent, ClientIdMixin): - chat: str = '' - - -@dataclass -class _ClientLogEvent(_ClientWindowEvent, PayloadMixin): - payload: str - - def affect(self, target: Tui) -> None: - if self.chat == CHAT_GLOB: - for win in target.client_wins(self.client_id): - win.log.append(self.payload) - else: - target.client_win(self.client_id, self.chat - ).log.append(self.payload) - super().affect(target) - - -@dataclass -class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin): - payload: tuple[str, str] - - def affect(self, target: Tui) -> None: - new_prefix = ((' ' if self.payload[0] else '?') - + f'{self.payload[1]}{_PROMPT_TEMPLATE}') - for win in target.client_wins(self.client_id): - prompt = win.prompt - prompt.prefix = new_prefix - prompt.tainted = True - super().affect(target) - - -class _ClientKnowingTui(Client): - - def log(self, msg: str, chat: str = '') -> None: - self._cput(_ClientLogEvent, chat=chat, payload=msg) - - def update_login(self, nick_confirmed: bool, nickname: str = '') -> None: - super().update_login(nick_confirmed, nickname) - self._cput(_ClientPromptEvent, payload=(self.nick_confirmed, - self.conn_setup.nickname)) diff --git a/ircplom/tui_base.py b/ircplom/tui_base.py new file mode 100644 index 0000000..e19e90f --- /dev/null +++ b/ircplom/tui_base.py @@ -0,0 +1,605 @@ +'Base Terminal and TUI management.' +# built-ins +from abc import ABC, abstractmethod +from base64 import b64decode +from contextlib import contextmanager +from dataclasses import dataclass +from inspect import _empty as inspect_empty, signature, stack +from signal import SIGWINCH, signal +from typing import Callable, Generator, Iterator, NamedTuple, Optional +# requirements.txt +from blessed import Terminal as BlessedTerminal +# ourselves +from ircplom.events import ( + AffectiveEvent, Loop, PayloadMixin, QueueMixin, QuitEvent) +# from ircplom.irc_conn import IrcMessage + +_MIN_HEIGHT = 4 +_MIN_WIDTH = 32 + +_TIMEOUT_KEYPRESS_LOOP = 0.5 +_B64_PREFIX = 'b64:' +_OSC52_PREFIX = b']52;c;' +_PASTE_DELIMITER = '\007' + +PROMPT_TEMPLATE = '> ' +_PROMPT_ELL_IN = '<…' +_PROMPT_ELL_OUT = '…>' + +_CHAR_RESIZE = chr(12) +_KEYBINDINGS = { + 'KEY_BACKSPACE': ('window.prompt.backspace',), + 'KEY_ENTER': ('prompt_enter',), + 'KEY_LEFT': ('window.prompt.move_cursor', 'left'), + 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'), + 'KEY_UP': ('window.prompt.scroll', 'up'), + 'KEY_DOWN': ('window.prompt.scroll', 'down'), + 'KEY_PGUP': ('window.log.scroll', 'up'), + 'KEY_PGDOWN': ('window.log.scroll', 'down'), + 'esc:91:49:59:51:68': ('window', 'left'), + 'esc:91:49:59:51:67': ('window', 'right'), + 'KEY_F1': ('window.paste',), +} +CMD_SHORTCUTS: dict[str, str] = {} + + +@dataclass +class TuiEvent(AffectiveEvent): + 'To affect TUI, and trigger flushed .draw_tainted on it.' + + def affect(self, target) -> None: + target.window.draw_tainted() + target.term.flush() + + +@dataclass +class _SetScreenEvent(TuiEvent): + + def affect(self, target: 'BaseTui') -> None: + target.term.calc_geometry() + for window in target.windows: + window.set_geometry() + super().affect(target) + + +class _YX(NamedTuple): + y: int + x: int + + +class _Widget(ABC): + + @abstractmethod + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.tainted = True + self._drawable = False + + @abstractmethod + def set_geometry(self, measurements: _YX) -> bool: + 'Update widget\'s measurements, re-generate content where necessary.' + self.tainted = True + self._drawable = len([m for m in measurements if m < 0]) == 0 + return self._drawable + + @abstractmethod + def draw(self) -> bool: + 'Print widget\'s content in shape appropriate to set geometry.' + if not self._drawable: + return False + self.tainted = False + return True + + +class _ScrollableWidget(_Widget, ABC): + _history_idx: int + + def __init__(self, write: Callable[..., None], **kwargs) -> None: + super().__init__(**kwargs) + self._write = write + self._history: list[str] = [] + + def append(self, to_append: str) -> None: + 'Append to scrollable history.' + self._history += [to_append] + + @abstractmethod + def _scroll(self, up=True) -> None: + self.tainted = True + + def cmd__scroll(self, direction: str) -> None: + 'Scroll through stored content/history.' + self._scroll(up=direction == 'up') + + +class _LogWidget(_ScrollableWidget): + _view_size: _YX + _y_pgscroll: int + + def __init__(self, wrap: Callable[[str], list[str]], **kwargs + ) -> None: + super().__init__(**kwargs) + self._wrap = wrap + self._wrapped_idx = self._history_idx = -1 + self._wrapped: list[tuple[Optional[int], str]] = [] + + def _add_wrapped(self, idx_original, line) -> int: + wrapped_lines = self._wrap(line) + self._wrapped += [(idx_original, line) for line in wrapped_lines] + return len(wrapped_lines) + + def set_geometry(self, measurements: _YX) -> bool: + if not super().set_geometry(measurements): + return False + self._view_size = measurements + self._y_pgscroll = self._view_size.y // 2 + self._wrapped.clear() + self._wrapped += [(None, '')] * self._view_size.y + if not self._history: + return True + for idx_history, line in enumerate(self._history): + self._add_wrapped(idx_history, line) + wrapped_lines_for_history_idx = [ + t for t in self._wrapped + if t[0] == len(self._history) + self._history_idx] + idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1]) + self._wrapped_idx = idx_their_last - len(self._wrapped) + return True + + def append(self, to_append: str) -> None: + super().append(to_append) + self.tainted = True + if self._history_idx < -1: + self._history_idx -= 1 + if not self._drawable: + return + n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append) + if self._wrapped_idx < -1: + self._wrapped_idx -= n_wrapped_lines + + def draw(self) -> bool: + if not super().draw(): + return False + start_idx = self._wrapped_idx - self._view_size.y + 1 + end_idx = self._wrapped_idx + to_write = [t[1] for t in self._wrapped[start_idx:end_idx]] + if self._wrapped_idx < -1: + scroll_info = f'vvv [{(-1) * self._wrapped_idx}] ' + scroll_info += 'v' * (self._view_size.x - len(scroll_info)) + to_write += [scroll_info] + else: + to_write += [self._wrapped[self._wrapped_idx][1]] + for i, line in enumerate(to_write): + self._write(line, i) + return True + + def _scroll(self, up: bool = True) -> None: + super()._scroll(up) + if not self._drawable: + return + if up: + self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped), + self._wrapped_idx - self._y_pgscroll) + else: + self._wrapped_idx = min(-1, + self._wrapped_idx + self._y_pgscroll) + history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0] + if history_idx_to_wrapped_idx is not None: + self._history_idx = history_idx_to_wrapped_idx - len(self._history) + + +class _PromptWidget(_ScrollableWidget): + _y: int + _width: int + _history_idx: int = 0 + _input_buffer_unsafe: str + _cursor_x: int + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.prefix = PROMPT_TEMPLATE + self._reset_buffer('') + + @property + def _input_buffer(self) -> str: + return self._input_buffer_unsafe[:] + + @_input_buffer.setter + def _input_buffer(self, content) -> None: + self.tainted = True + self._input_buffer_unsafe = content + + def set_geometry(self, measurements: _YX) -> bool: + if not super().set_geometry(measurements): + return False + self._y, self._width = measurements + return True + + def draw(self) -> bool: + if not super().draw(): + return False + prefix = self.prefix[:] + content = self._input_buffer + if self._cursor_x == len(self._input_buffer): + content += ' ' + half_width = (self._width - len(prefix)) // 2 + offset = 0 + if len(prefix) + len(content) > self._width\ + and self._cursor_x > half_width: + prefix += _PROMPT_ELL_IN + offset = min(len(prefix) + len(content) - self._width, + self._cursor_x - half_width + len(_PROMPT_ELL_IN)) + cursor_x_to_write = len(prefix) + self._cursor_x - offset + to_write = f'{prefix}{content[offset:]}' + if len(to_write) > self._width: + to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)] + + _PROMPT_ELL_OUT) + self._write(to_write[:cursor_x_to_write], self._y, padding=False) + self._write(to_write[cursor_x_to_write], attribute='reverse', + padding=False) + self._write(to_write[cursor_x_to_write + 1:]) + return True + + def _archive_prompt(self) -> None: + self.append(self._input_buffer) + self._reset_buffer('') + + def _scroll(self, up: bool = True) -> None: + super()._scroll(up) + if up and -(self._history_idx) < len(self._history): + if self._history_idx == 0 and self._input_buffer: + self._archive_prompt() + self._history_idx -= 1 + self._history_idx -= 1 + self._reset_buffer(self._history[self._history_idx]) + elif not up: + if self._history_idx < 0: + self._history_idx += 1 + if self._history_idx == 0: + self._reset_buffer('') + else: + self._reset_buffer(self._history[self._history_idx]) + elif self._input_buffer: + self._archive_prompt() + + def insert(self, to_insert: str) -> None: + 'Insert into prompt input buffer.' + self._cursor_x += len(to_insert) + self._input_buffer = (self._input_buffer[:self._cursor_x - 1] + + to_insert + + self._input_buffer[self._cursor_x - 1:]) + self._history_idx = 0 + + def cmd__backspace(self) -> None: + 'Truncate current content by one character, if possible.' + if self._cursor_x > 0: + self._cursor_x -= 1 + self._input_buffer = (self._input_buffer[:self._cursor_x] + + self._input_buffer[self._cursor_x + 1:]) + self._history_idx = 0 + + def cmd__move_cursor(self, direction: str) -> None: + 'Move cursor one space into direction ("left" or "right") if possible.' + if direction == 'left' and self._cursor_x > 0: + self._cursor_x -= 1 + elif direction == 'right'\ + and self._cursor_x < len(self._input_buffer): + self._cursor_x += 1 + else: + return + self.tainted = True + + def _reset_buffer(self, content: str) -> None: + self._input_buffer = content + self._cursor_x = len(self._input_buffer) + + def enter(self) -> str: + 'Return current content while also clearing and then redrawing.' + to_return = self._input_buffer[:] + if to_return: + self._archive_prompt() + return to_return + + +class Window(_Widget): + 'Widget filling entire screen with sub-widgets like .prompt, .log.' + _y_status: int + prompt: _PromptWidget + + def __init__(self, idx: int, term: 'Terminal', **kwargs) -> None: + super().__init__(**kwargs) + self.idx = idx + self._term = term + self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write) + self.prompt = self.__annotations__['prompt'](write=self._term.write) + if hasattr(self._term, 'size'): + self.set_geometry() + + def set_geometry(self, _=None) -> bool: + assert _ is None + if self._term.size.y < _MIN_HEIGHT or self._term.size.x < _MIN_WIDTH: + bad_yx = _YX(-1, -1) + super().set_geometry(bad_yx) + self.log.set_geometry(bad_yx) + self.prompt.set_geometry(bad_yx) + return False + super().set_geometry(_YX(0, 0)) + self._y_status = self._term.size.y - 2 + self.log.set_geometry(_YX(self._y_status, self._term.size.x)) + self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x)) + return True + + def draw(self) -> bool: + self._term.clear() + if not super().draw(): + if self._term.size.x > 0: + lines = [''] + for i, c in enumerate('screen too small'): + if i > 0 and 0 == i % self._term.size.x: + lines += [''] + lines[-1] += c + for y, line in enumerate(lines): + self._term.write(line, y) + return False + idx_box = f'[{self.idx}]' + status_line = idx_box + '=' * (self._term.size.x - len(idx_box)) + self.log.draw() + self._term.write(status_line, self._y_status) + self.prompt.draw() + return True + + def cmd__paste(self) -> None: + 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.' + self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}', + self._y_status) + self.tainted = True + + def draw_tainted(self) -> None: + 'Draw tainted parts of self.' + if self.tainted: + self.draw() + return + for widget in [w for w in (self.log, self.prompt) if w.tainted]: + widget.draw() + + +@dataclass +class _KeyboardEvent(TuiEvent, PayloadMixin): + payload: str + + def affect(self, target: 'BaseTui') -> None: + if self.payload[0] == _CHAR_RESIZE: + _SetScreenEvent().affect(target) + return + if self.payload in _KEYBINDINGS: + cmd_data = _KEYBINDINGS[self.payload] + cmd = target.cmd_name_to_cmd(cmd_data[0]) + if cmd: + cmd(*cmd_data[1:]) + elif self.payload.startswith(_B64_PREFIX): + encoded = self.payload[len(_B64_PREFIX):] + to_paste = '' + for i, c in enumerate(b64decode(encoded).decode('utf-8')): + if i > 512: + break + if c.isprintable(): + to_paste += c + elif c.isspace(): + to_paste += ' ' + else: + to_paste += '#' + target.window.prompt.insert(to_paste) + elif len(self.payload) == 1: + target.window.prompt.insert(self.payload) + else: + target.log(f'# ALERT: unknown keyboard input: {self.payload}') + super().affect(target) + + +class BaseTui(QueueMixin): + 'Base for graphical user interface elements.' + + def __init__(self, term: 'Terminal', **kwargs) -> None: + super().__init__(**kwargs) + self.term = term + self._window_idx = 0 + self.windows = [Window(idx=self._window_idx, term=self.term)] + self._put(_SetScreenEvent()) + + def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]: + 'Map cmd_name to executable TUI element method.' + cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name) + cmd_parent = self + while True: + cmd_name_toks = cmd_name.split('.', maxsplit=1) + if len(cmd_name_toks) == 1: + break + if not hasattr(cmd_parent, cmd_name_toks[0]): + return None + cmd_parent = getattr(cmd_parent, cmd_name_toks[0]) + cmd_name = cmd_name_toks[1] + cmd_name = f'cmd__{cmd_name}' + if not hasattr(cmd_parent, cmd_name): + return None + return getattr(cmd_parent, cmd_name) + + @property + def window(self) -> Window: + 'Currently selected Window.' + return self.windows[self._window_idx] + + def log(self, msg: str) -> None: + 'Post msg to active window\'s log.' + self.window.log.append(msg) + + def _switch_window(self, idx: int) -> None: + self._window_idx = idx + self.window.draw() + + def cmd__prompt_enter(self) -> None: + 'Get prompt content from .window.prompt.enter, parse to & run command.' + to_parse = self.window.prompt.enter() + if not to_parse: + return + alert: Optional[str] = None + if to_parse[0:1] == '/': + toks = to_parse[1:].split(maxsplit=1) + alert = f'{toks[0]} unknown' + cmd = self.cmd_name_to_cmd(toks[0]) + if cmd and cmd.__name__ != stack()[0].function: + params = signature(cmd).parameters + n_args_max = len(params) + n_args_min = len([p for p in params.values() + if p.default == inspect_empty]) + alert = f'{cmd.__name__} needs between {n_args_min} and '\ + f'{n_args_max} args' + if len(toks) == 1 and not n_args_min: + alert = cmd() + elif len(toks) > 1 and params\ + and n_args_min <= len(toks[1].split()): + args = [] + while len(toks) > 1 and n_args_max: + toks = toks[1].split(maxsplit=1) + args += [toks[0]] + n_args_max -= 1 + alert = cmd(*args) + else: + alert = 'not prefixed by /' + if alert: + self.log(f'# ALERT: invalid prompt command: {alert}') + + def cmd__quit(self) -> None: + 'Trigger program exit.' + self._put(QuitEvent()) + + def cmd__window(self, towards: str) -> Optional[str]: + 'Switch window selection.' + n_windows = len(self.windows) + if n_windows < 2: + return 'no alternate window to move into' + if towards in {'left', 'right'}: + multiplier = (+1) if towards == 'right' else (-1) + window_idx = self._window_idx + multiplier + if not 0 <= window_idx < n_windows: + window_idx -= multiplier * n_windows + elif not towards.isdigit(): + return f'neither "left"/"right" nor integer: {towards}' + else: + window_idx = int(towards) + if not 0 <= window_idx < n_windows: + return f'unavailable window idx: {window_idx}' + self._switch_window(window_idx) + return None + + +class Terminal(QueueMixin): + 'Abstraction of terminal interface.' + size: _YX + _cursor_yx_: _YX + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._blessed = BlessedTerminal() + self._cursor_yx = _YX(0, 0) + + @contextmanager + def setup(self) -> Generator: + 'Combine multiple contexts into one and run keypress loop.' + signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent())) + self.clear() + with (self._blessed.raw(), + self._blessed.fullscreen(), + self._blessed.hidden_cursor(), + Loop(iterator=self._get_keypresses(), q_out=self.q_out)): + yield self + + @property + def _cursor_yx(self) -> _YX: + return self._cursor_yx_ + + @_cursor_yx.setter + def _cursor_yx(self, yx: _YX) -> None: + print(self._blessed.move_yx(yx.y, yx.x), end='') + self._cursor_yx_ = yx + + def calc_geometry(self) -> None: + '(Re-)calculate .size..' + self.size = _YX(self._blessed.height, self._blessed.width) + + def clear(self) -> None: + 'Clear terminal.' + print(self._blessed.clear, end='') + + def flush(self) -> None: + 'Flush terminal.' + print('', end='', flush=True) + + def wrap(self, line: str) -> list[str]: + 'Wrap line to list of lines fitting into terminal width.' + return self._blessed.wrap(line, width=self.size.x, + subsequent_indent=' '*4) + + def write(self, + msg: str = '', + start_y: Optional[int] = None, + attribute: Optional[str] = None, + padding: bool = True + ) -> None: + 'Print to terminal, with position, padding to line end, attributes.' + if start_y is not None: + self._cursor_yx = _YX(start_y, 0) + # ._blessed.length can slow down things notably: only use where needed! + end_x = self._cursor_yx.x + (len(msg) if msg.isascii() + else self._blessed.length(msg)) + len_padding = self.size.x - end_x + if len_padding < 0: + msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x) + elif padding: + msg += ' ' * len_padding + end_x = self.size.x + if attribute: + msg = getattr(self._blessed, attribute)(msg) + print(msg, end='') + self._cursor_yx = _YX(self._cursor_yx.y, end_x) + + def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]: + '''Loop through keypresses from terminal, expand blessed's handling. + + Explicitly collect KEY_ESCAPE-modified key sequences, and recognize + OSC52-prefixed pastables to return the respective base64 code, + prefixed with _B64_PREFIX. + ''' + while True: + to_yield = '' + ks = self._blessed.inkey( + timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None, + esc_delay=0) # incl. until thread dies + if ks.name != 'KEY_ESCAPE': + to_yield = f'{ks.name if ks.name else ks}' + else: + chars = b'' + while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0 + ).encode('utf-8')): + chars += new_chars + len_prefix = len(_OSC52_PREFIX) + if chars[:len_prefix] == _OSC52_PREFIX: + to_yield = _B64_PREFIX[:] + # sometimes, prev .inkey got some or all (including paste + # delimiter) of the paste code (maybe even more), so first + # harvest potential remains of chars post prefix … + caught_delimiter = False + post_prefix_str = chars[len_prefix:].decode('utf-8') + for idx, c in enumerate(post_prefix_str): + if c == _PASTE_DELIMITER: + caught_delimiter = True + if (remains := post_prefix_str[idx + 1:]): + self._blessed.ungetch(remains) + break + to_yield += c + # … before .getch() further until expected delimiter found + if not caught_delimiter: + while (c := self._blessed.getch()) != _PASTE_DELIMITER: + to_yield += c + else: + to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars]) + yield _KeyboardEvent(to_yield) if to_yield else None