'Attempt at an IRC client.'
from queue import SimpleQueue
from ircplom.events import ExceptionEvent, QuitEvent
-from ircplom.irc_conn import ClientsDb, ClientEvent, NewClientEvent
-from ircplom.tui import Terminal, Tui, TuiEvent
+from ircplom.client import ClientsDb, ClientEvent, NewClientEvent
+from ircplom.tui_base import Terminal, TuiEvent
+from ircplom.client_tui import ClientTui
def main_loop() -> None:
clients_db: ClientsDb = {}
try:
with Terminal(q_out=q_events).setup() as term:
- tui = Tui(q_out=q_events, term=term)
+ tui = ClientTui(q_out=q_events, term=term)
while True:
event = q_events.get()
if isinstance(event, QuitEvent):
--- /dev/null
+'High-level IRC protocol / server connection management.'
+# built-ins
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from getpass import getuser
+from threading import Thread
+from typing import Optional
+from uuid import UUID, uuid4
+# ourselves
+from ircplom.events import (AffectiveEvent, ExceptionEvent, PayloadMixin,
+ QueueMixin)
+from ircplom.irc_conn import (BaseIrcConnection, IrcConnAbortException,
+ IrcMessage)
+
+ClientsDb = dict[UUID, 'Client']
+CHAT_GLOB = '*'
+
+
+@dataclass
+class ClientIdMixin:
+ 'Collects a Client\'s ID at .client_id.'
+ client_id: UUID
+
+
+@dataclass
+class ClientEvent(AffectiveEvent, ClientIdMixin):
+ 'To affect Client identified by ClientIdMixin.'
+
+
+class _IrcConnection(BaseIrcConnection):
+
+ def __init__(self, client_id: UUID, **kwargs) -> None:
+ # TODO: find out why I can't just ClientIdMixin here
+ self.client_id = client_id
+ super().__init__(**kwargs)
+
+ def _make_recv_event(self, msg: IrcMessage) -> ClientEvent:
+
+ @dataclass
+ class _RecvEvent(ClientEvent, PayloadMixin):
+ payload: IrcMessage
+
+ def affect(self, target: 'Client') -> None:
+ target.handle_msg(self.payload)
+
+ return _RecvEvent(client_id=self.client_id, payload=msg)
+
+
+@dataclass
+class ClientQueueMixin(QueueMixin):
+ 'To QueueMixin adds _cput to extend ._put with client_id= setting.'
+ client_id_name = 'id_'
+
+ def _cput(self, event_class, **kwargs) -> None:
+ self._put(event_class(client_id=getattr(self, self.client_id_name),
+ **kwargs))
+
+
+@dataclass
+class _ServerCapability:
+ 'Store data collectable via CAPS LS/LIST/NEW.'
+ enabled: bool
+ data: str
+
+ def str_for_log(self, name: str) -> str:
+ 'Optimized for Client.log per-line listing.'
+ listing = '+' if self.enabled else '-'
+ listing += f' {name}'
+ if self.data:
+ listing += f' ({self.data})'
+ return listing
+
+
+@dataclass
+class IrcConnSetup:
+ 'All we need to know to set up a new Client connection.'
+ hostname: str
+ nickname: str
+ realname: str
+
+
+class Client(ABC, ClientQueueMixin):
+ 'Abstracts socket connection, loop over it, and handling messages from it.'
+ nick_confirmed: bool = False
+ conn: Optional[_IrcConnection] = None
+
+ def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.conn_setup = conn_setup
+ self._cap_neg_states: dict[str, bool] = {}
+ self.caps: dict[str, _ServerCapability] = {}
+ self.id_ = uuid4()
+ self.update_login(nick_confirmed=False,
+ nickname=self.conn_setup.nickname)
+ self.start_connecting()
+
+ def start_connecting(self) -> None:
+ 'Start thread to set up _IrcConnection at .conn.'
+
+ def connect(self) -> None:
+
+ @dataclass
+ class _ConnectedEvent(ClientEvent):
+ def affect(self, target: 'Client') -> None:
+ target.on_connect()
+
+ try:
+ self.conn = _IrcConnection(hostname=self.conn_setup.hostname,
+ q_out=self.q_out,
+ client_id=self.id_)
+ self._cput(_ConnectedEvent)
+ except IrcConnAbortException as e:
+ self.log(f'# ALERT: {e}')
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._put(ExceptionEvent(e))
+
+ Thread(target=connect, daemon=True, args=(self,)).start()
+
+ def on_connect(self) -> None:
+ 'Steps to perform right after connection.'
+ self.log(msg='# connected to server', chat=CHAT_GLOB)
+ self.try_send_cap('LS', ('302',))
+ self.send(IrcMessage(verb='USER',
+ params=(getuser(), '0', '*',
+ self.conn_setup.realname)))
+ self.send(IrcMessage(verb='NICK', params=(self.conn_setup.nickname,)))
+
+ def cap_neg_done(self, negotiation_step: str) -> bool:
+ 'Whether negotiation_step is registered as finished.'
+ return self._cap_neg_states.get(negotiation_step, False)
+
+ def cap_neg(self, negotiation_step: str) -> bool:
+ 'Whether negotiation_step is registered at all (started or finished).'
+ return negotiation_step in self._cap_neg_states
+
+ def cap_neg_set(self, negotiation_step: str, done: bool = False) -> None:
+ 'Declare negotiation_step started, or (if done) finished.'
+ self._cap_neg_states[negotiation_step] = done
+
+ def try_send_cap(self, *params, key_fused: bool = False) -> None:
+ 'Run CAP command with params, handle cap neg. state.'
+ neg_state_key = ':'.join(params) if key_fused else params[0]
+ if self.cap_neg(neg_state_key):
+ return
+ self.send(IrcMessage(verb='CAP', params=params))
+ self.cap_neg_set(neg_state_key)
+
+ def collect_caps(self, params: tuple[str, ...]) -> None:
+ 'Record available and enabled server capabilities.'
+ verb = params[0]
+ items = params[-1].strip().split()
+ is_final_line = params[1] != '*'
+ if self.cap_neg_done(verb):
+ if verb == 'LS':
+ self.caps.clear()
+ else:
+ for cap in self.caps.values():
+ cap.enabled = False
+ self.cap_neg_set(verb)
+ for item in items:
+ if verb == 'LS':
+ splitted = item.split('=', maxsplit=1)
+ self.caps[splitted[0]] = _ServerCapability(
+ enabled=False, data=''.join(splitted[1:]))
+ else:
+ self.caps[item].enabled = True
+ if is_final_line:
+ self.cap_neg_set(verb, done=True)
+
+ @abstractmethod
+ def log(self, msg: str, chat: str = '') -> None:
+ '''Write msg into log of chat, whatever shape that may have.
+
+ Messages to chat=CHAT_GLOB are meant to appear in all widgets mapped to
+ the client, those to chat="" only in the initial connection window.
+ '''
+
+ def send(self, msg: IrcMessage, chat: str = '') -> None:
+ 'Send line-separator-delimited message over socket.'
+ if not self.conn:
+ self.log('# ALERT: cannot send, connection seems closed')
+ return
+ self.conn.send(msg)
+ self.log(msg=f'> {msg.raw}', chat=chat)
+ self.log(msg=f'=>| {msg.raw}', chat=':raw')
+
+ def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
+ '''Manage conn_setup..nickname, .nick_confirmed.
+
+ (Useful for subclass extension.)
+ '''
+ first_run = not hasattr(self.conn_setup, 'nickname')
+ prefix = '# nickname'
+ if first_run or (nickname and nickname != self.conn_setup.nickname):
+ verb = ('set' if first_run
+ else f'changed from "{self.conn_setup.nickname}"')
+ self.conn_setup.nickname = nickname
+ self.log(msg=f'{prefix} {verb} to "{nickname}"', chat=CHAT_GLOB)
+ if first_run or nick_confirmed != self.nick_confirmed:
+ self.nick_confirmed = nick_confirmed
+ if not first_run:
+ self.log(f'{prefix} {"" if nick_confirmed else "un"}confirmed')
+
+ def close(self) -> None:
+ 'Close both recv Loop and socket.'
+ self.log(msg='# disconnecting from server', chat=CHAT_GLOB)
+ if self.conn:
+ self.conn.close()
+ self.conn = None
+ self.update_login(nick_confirmed=False)
+
+ def handle_msg(self, msg: IrcMessage) -> None:
+ 'Process incoming msg towards appropriate client steps.'
+ self.log(f'<-| {msg.raw}', ':raw')
+ match msg.verb:
+ case 'PING':
+ self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
+ case 'ERROR':
+ self.close()
+ case '001' | 'NICK':
+ self.update_login(nickname=msg.params[0], nick_confirmed=True)
+ case 'PRIVMSG':
+ self.log(msg=str(msg.params), chat=msg.source)
+ case 'CAP':
+ match msg.params[1]:
+ case 'LS' | 'LIST':
+ self.collect_caps(msg.params[1:])
+ case 'ACK' | 'NAK':
+ cap_names = msg.params[-1].split()
+ for cap_name in cap_names:
+ self.cap_neg_set(f'REQ:{cap_name}', done=True)
+ self.caps[cap_name].enabled = (msg.params[1]
+ == 'ACK')
+ if self.cap_neg_done('LIST'):
+ self.try_send_cap('END')
+ if not self.cap_neg('printing'):
+ self.log('# server capabilities (enabled: "+"):')
+ for cap_name, cap in self.caps.items():
+ self.log('# ' + cap.str_for_log(cap_name))
+ self.cap_neg_set('printing', done=True)
+ elif self.cap_neg_done('LS'):
+ for cap_name in ('server-time', 'account-tag', 'sasl'):
+ if (cap_name in self.caps
+ and (not self.caps[cap_name].enabled)):
+ self.try_send_cap('REQ', cap_name, key_fused=True)
+ self.try_send_cap('LIST')
+
+
+@dataclass
+class NewClientEvent(AffectiveEvent, PayloadMixin):
+ 'Put Client .payload into ClientsDb target.'
+ payload: 'Client'
+
+ def affect(self, target: ClientsDb) -> None:
+ target[self.payload.id_] = self.payload
+
+
+@dataclass
+class InitReconnectEvent(ClientEvent):
+ 'To trigger re-opening of connection.'
+
+ def affect(self, target: 'Client') -> None:
+ if target.conn:
+ target.log('# ALERT: reconnection called, but still seem '
+ 'connected, so nothing to do.')
+ else:
+ target.start_connecting()
+
+
+@dataclass
+class SendEvent(ClientEvent, PayloadMixin):
+ 'To trigger sending of payload to server.'
+ payload: IrcMessage
+ chat: str = ''
+
+ def affect(self, target: 'Client') -> None:
+ target.send(msg=self.payload, chat=self.chat)
--- /dev/null
+'TUI adaptions to Client.'
+# built-ins
+from dataclasses import dataclass
+from uuid import UUID
+# ourselves
+from ircplom.events import PayloadMixin
+from ircplom.tui_base import (BaseTui, Window, TuiEvent, CMD_SHORTCUTS,
+ PROMPT_TEMPLATE)
+from ircplom.irc_conn import IrcMessage
+from ircplom.client import (CHAT_GLOB, IrcConnSetup, Client,
+ ClientIdMixin, ClientQueueMixin,
+ InitReconnectEvent, NewClientEvent, SendEvent)
+
+
+CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
+CMD_SHORTCUTS['nick'] = 'window.nick'
+CMD_SHORTCUTS['privmsg'] = 'window.privmsg'
+CMD_SHORTCUTS['reconnect'] = 'window.reconnect'
+
+
+class _ClientWindow(Window, ClientQueueMixin):
+ client_id_name = 'client_id'
+
+ def __init__(self, client_id: UUID, chat: str = '', **kwargs) -> None:
+ self.client_id = client_id
+ self.chat = chat
+ super().__init__(**kwargs)
+
+ def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
+ 'Send QUIT command to server.'
+ self._cput(SendEvent,
+ payload=IrcMessage(verb='QUIT', params=(quit_msg,)))
+
+ def cmd__reconnect(self) -> None:
+ 'Attempt reconnection.'
+ self._cput(InitReconnectEvent)
+
+ def cmd__nick(self, new_nick: str) -> None:
+ 'Attempt nickname change.'
+ self._cput(SendEvent,
+ payload=IrcMessage(verb='NICK', params=(new_nick,)))
+
+ def cmd__privmsg(self, target: str, msg: str) -> None:
+ 'Send chat message msg to target.'
+ self._cput(SendEvent, chat=target,
+ payload=IrcMessage(verb='PRIVMSG', params=(target, msg)))
+
+
+class ClientTui(BaseTui):
+ 'TUI expanded towards Client features.'
+
+ def _new_client_window(self, client_id: UUID, chat: str = ''
+ ) -> _ClientWindow:
+ new_idx = len(self.windows)
+ win = _ClientWindow(idx=new_idx, term=self.term, q_out=self.q_out,
+ client_id=client_id, chat=chat)
+ self.windows += [win]
+ self._switch_window(new_idx)
+ return win
+
+ def client_wins(self, client_id: UUID) -> list[_ClientWindow]:
+ 'All _ClientWindows matching client_id; if none, create one.'
+ wins = [win for win in self.windows
+ if isinstance(win, _ClientWindow)
+ and win.client_id == client_id] # pylint: disable=no-member
+ if not wins:
+ wins = [self._new_client_window(client_id=client_id)]
+ return wins
+
+ def client_win(self, client_id: UUID, chat: str = '') -> _ClientWindow:
+ '''That _ClientWindow matching client_id and chat; create if none.
+
+ In case of creation, copy prompt prefix from client's first window.
+ '''
+ client_wins = self.client_wins(client_id)
+ candidates = [win for win in client_wins if win.chat == chat]
+ if candidates:
+ return candidates[0]
+ win = self._new_client_window(client_id=client_id, chat=chat)
+ if client_wins:
+ win.prompt.prefix = client_wins[0].prompt.prefix
+ return win
+
+ def cmd__connect(self, hostname: str, nickname: str, realname: str
+ ) -> None:
+ 'Create Client and pass it via NewClientEvent.'
+ self._put(NewClientEvent(
+ _ClientKnowingTui(
+ q_out=self.q_out,
+ conn_setup=IrcConnSetup(hostname=hostname, nickname=nickname,
+ realname=realname))))
+
+
+@dataclass
+class _ClientWindowEvent(TuiEvent, ClientIdMixin):
+ chat: str = ''
+
+
+@dataclass
+class _ClientLogEvent(_ClientWindowEvent, PayloadMixin):
+ payload: str
+
+ def affect(self, target: ClientTui) -> None:
+ if self.chat == CHAT_GLOB:
+ for win in target.client_wins(self.client_id):
+ win.log.append(self.payload)
+ else:
+ target.client_win(self.client_id, self.chat
+ ).log.append(self.payload)
+ super().affect(target)
+
+
+@dataclass
+class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin):
+ payload: tuple[str, str]
+
+ def affect(self, target: ClientTui) -> None:
+ new_prefix = ((' ' if self.payload[0] else '?')
+ + f'{self.payload[1]}{PROMPT_TEMPLATE}')
+ for win in target.client_wins(self.client_id):
+ prompt = win.prompt
+ prompt.prefix = new_prefix
+ prompt.tainted = True
+ super().affect(target)
+
+
+class _ClientKnowingTui(Client):
+
+ def log(self, msg: str, chat: str = '') -> None:
+ self._cput(_ClientLogEvent, chat=chat, payload=msg)
+
+ def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
+ super().update_login(nick_confirmed, nickname)
+ self._cput(_ClientPromptEvent, payload=(self.nick_confirmed,
+ self.conn_setup.nickname))
-'IRC server connection management.'
+'Low-level IRC protocol / server connection management.'
# built-ins
from abc import ABC, abstractmethod
-from dataclasses import dataclass
-from getpass import getuser
from socket import socket, gaierror as socket_gaierror
-from threading import Thread
from typing import Callable, Iterator, NamedTuple, Optional, Self
-from uuid import uuid4, UUID
# ourselves
-from ircplom.events import (
- AffectiveEvent, ExceptionEvent, Loop, PayloadMixin, QueueMixin)
+from ircplom.events import Event, Loop, QueueMixin
-ClientsDb = dict[UUID, 'Client']
-
-CHAT_GLOB = '*'
-
_TIMEOUT_RECV_LOOP = 0.1
_TIMEOUT_CONNECT = 5
_CONN_RECV_BUFSIZE = 1024
return self._raw
-class _IrcConnAbortException(Exception):
- pass
+class IrcConnAbortException(Exception):
+ 'Thrown by BaseIrcConnection on expectable connection failures.'
-class _IrcConnection(QueueMixin):
+class BaseIrcConnection(QueueMixin, ABC):
'Collects low-level server-client connection management.'
- def __init__(self, hostname: str, client_id: UUID, **kwargs) -> None:
+ def __init__(self, hostname: str, **kwargs) -> None:
super().__init__(**kwargs)
- self.client_id = client_id
self._socket = socket()
self._socket.settimeout(_TIMEOUT_CONNECT)
try:
self._socket.connect((hostname, _PORT))
except (TimeoutError, socket_gaierror) as e:
- raise _IrcConnAbortException(e) from e
+ raise IrcConnAbortException(e) from e
self._socket.settimeout(_TIMEOUT_RECV_LOOP)
self._recv_loop = Loop(iterator=self._read_lines(), q_out=self.q_out)
'Send line-separator-delimited message over socket.'
self._socket.sendall(msg.raw.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR)
- def _read_lines(self) -> Iterator[Optional['_RecvEvent']]:
+ @abstractmethod
+ def _make_recv_event(self, msg: IrcMessage) -> Event:
+ pass
+
+ def _read_lines(self) -> Iterator[Optional[Event]]:
assert self._socket is not None
bytes_total = b''
buffer_linesep = b''
buffer_linesep += c_byted
if buffer_linesep == _IRCSPEC_LINE_SEPARATOR:
buffer_linesep = b''
- yield _RecvEvent(client_id=self.client_id,
- payload=IrcMessage.from_raw(
- bytes_total.decode('utf-8')))
+ yield self._make_recv_event(
+ IrcMessage.from_raw(bytes_total.decode('utf-8')))
bytes_total = b''
-
-
-@dataclass
-class ClientIdMixin:
- 'Collects a Client\'s ID at .client_id.'
- client_id: UUID
-
-
-@dataclass
-class NewClientEvent(AffectiveEvent, PayloadMixin):
- 'Put Client .payload into ClientsDb target.'
- payload: 'Client'
-
- def affect(self, target: ClientsDb) -> None:
- target[self.payload.id_] = self.payload
-
-
-@dataclass
-class ClientEvent(AffectiveEvent, ClientIdMixin):
- 'To affect Client identified by ClientIdMixin.'
-
-
-@dataclass
-class _ConnectedEvent(ClientEvent):
-
- def affect(self, target: 'Client') -> None:
- target.log(msg='# connected to server', chat=CHAT_GLOB)
- target.try_send_cap('LS', ('302',))
- target.send(IrcMessage(verb='USER',
- params=(getuser(), '0', '*',
- target.conn_setup.realname)))
- target.send(IrcMessage(verb='NICK',
- params=(target.conn_setup.nickname,)))
-
-
-@dataclass
-class InitReconnectEvent(ClientEvent):
- 'To trigger re-opening of connection.'
-
- def affect(self, target: 'Client') -> None:
- if target.conn:
- target.log('# ALERT: reconnection called, but still seem '
- 'connected, so nothing to do.')
- else:
- target.start_connecting()
-
-
-@dataclass
-class SendEvent(ClientEvent, PayloadMixin):
- 'To trigger sending of payload to server.'
- payload: IrcMessage
- chat: str = ''
-
- def affect(self, target: 'Client') -> None:
- target.send(msg=self.payload, chat=self.chat)
-
-
-@dataclass
-class _RecvEvent(ClientEvent, PayloadMixin):
- payload: IrcMessage
-
- def affect(self, target: 'Client') -> None:
- target.handle_msg(self.payload)
-
-
-@dataclass
-class ClientQueueMixin(QueueMixin):
- 'To QueueMixin adds _cput to extend ._put with client_id= setting.'
- client_id_name = 'id_'
-
- def _cput(self, event_class, **kwargs) -> None:
- self._put(event_class(client_id=getattr(self, self.client_id_name),
- **kwargs))
-
-
-@dataclass
-class ServerCapability:
- 'Store data collectable via CAPS LS/LIST/NEW.'
- enabled: bool
- data: str
-
- def str_for_log(self, name: str) -> str:
- 'Optimized for Client.log per-line listing.'
- listing = '+' if self.enabled else '-'
- listing += f' {name}'
- if self.data:
- listing += f' ({self.data})'
- return listing
-
-
-@dataclass
-class IrcConnSetup:
- 'All we need to know to set up a new Client connection.'
- hostname: str
- nickname: str
- realname: str
-
-
-class Client(ABC, ClientQueueMixin):
- 'Abstracts socket connection, loop over it, and handling messages from it.'
- nick_confirmed: bool = False
- conn: Optional[_IrcConnection] = None
-
- def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
- super().__init__(**kwargs)
- self.conn_setup = conn_setup
- self._cap_neg_states: dict[str, bool] = {}
- self.caps: dict[str, ServerCapability] = {}
- self.id_ = uuid4()
- self.update_login(nick_confirmed=False,
- nickname=self.conn_setup.nickname)
- self.start_connecting()
-
- def start_connecting(self) -> None:
- 'Start thread to set up IrcConnection at .conn.'
-
- def connect(self) -> None:
- try:
- self.conn = _IrcConnection(hostname=self.conn_setup.hostname,
- q_out=self.q_out,
- client_id=self.id_)
- self._cput(_ConnectedEvent)
- except _IrcConnAbortException as e:
- self.log(f'# ALERT: {e}')
- except Exception as e: # pylint: disable=broad-exception-caught
- self._put(ExceptionEvent(e))
-
- Thread(target=connect, daemon=True, args=(self,)).start()
-
- def cap_neg_done(self, negotiation_step: str) -> bool:
- 'Whether negotiation_step is registered as finished.'
- return self._cap_neg_states.get(negotiation_step, False)
-
- def cap_neg(self, negotiation_step: str) -> bool:
- 'Whether negotiation_step is registered at all (started or finished).'
- return negotiation_step in self._cap_neg_states
-
- def cap_neg_set(self, negotiation_step: str, done: bool = False) -> None:
- 'Declare negotiation_step started, or (if done) finished.'
- self._cap_neg_states[negotiation_step] = done
-
- def try_send_cap(self, *params, key_fused: bool = False) -> None:
- 'Run CAP command with params, handle cap neg. state.'
- neg_state_key = ':'.join(params) if key_fused else params[0]
- if self.cap_neg(neg_state_key):
- return
- self.send(IrcMessage(verb='CAP', params=params))
- self.cap_neg_set(neg_state_key)
-
- def collect_caps(self, params: tuple[str, ...]) -> None:
- 'Record available and enabled server capabilities.'
- verb = params[0]
- items = params[-1].strip().split()
- is_final_line = params[1] != '*'
- if self.cap_neg_done(verb):
- if verb == 'LS':
- self.caps.clear()
- else:
- for cap in self.caps.values():
- cap.enabled = False
- self.cap_neg_set(verb)
- for item in items:
- if verb == 'LS':
- splitted = item.split('=', maxsplit=1)
- self.caps[splitted[0]] = ServerCapability(
- enabled=False, data=''.join(splitted[1:]))
- else:
- self.caps[item].enabled = True
- if is_final_line:
- self.cap_neg_set(verb, done=True)
-
- @abstractmethod
- def log(self, msg: str, chat: str = '') -> None:
- '''Write msg into log of chat, whatever shape that may have.
-
- Messages to chat=CHAT_GLOB are meant to appear in all widgets mapped to
- the client, those to chat="" only in the initial connection window.
- '''
-
- def send(self, msg: IrcMessage, chat: str = '') -> None:
- 'Send line-separator-delimited message over socket.'
- if not self.conn:
- self.log('# ALERT: cannot send, connection seems closed')
- return
- self.conn.send(msg)
- self.log(msg=f'> {msg.raw}', chat=chat)
- self.log(msg=f'=>| {msg.raw}', chat=':raw')
-
- def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
- '''Manage conn_setup..nickname, .nick_confirmed.
-
- (Useful for subclass extension.)
- '''
- first_run = not hasattr(self.conn_setup, 'nickname')
- prefix = '# nickname'
- if first_run or (nickname and nickname != self.conn_setup.nickname):
- verb = ('set' if first_run
- else f'changed from "{self.conn_setup.nickname}"')
- self.conn_setup.nickname = nickname
- self.log(msg=f'{prefix} {verb} to "{nickname}"', chat=CHAT_GLOB)
- if first_run or nick_confirmed != self.nick_confirmed:
- self.nick_confirmed = nick_confirmed
- if not first_run:
- self.log(f'{prefix} {"" if nick_confirmed else "un"}confirmed')
-
- def close(self) -> None:
- 'Close both recv Loop and socket.'
- self.log(msg='# disconnecting from server', chat=CHAT_GLOB)
- if self.conn:
- self.conn.close()
- self.conn = None
- self.update_login(nick_confirmed=False)
-
- def handle_msg(self, msg: IrcMessage) -> None:
- 'Process incoming msg towards appropriate client steps.'
- self.log(f'<-| {msg.raw}', ':raw')
- match msg.verb:
- case 'PING':
- self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
- case 'ERROR':
- self.close()
- case '001' | 'NICK':
- self.update_login(nickname=msg.params[0], nick_confirmed=True)
- case 'PRIVMSG':
- self.log(msg=str(msg.params), chat=msg.source)
- case 'CAP':
- match msg.params[1]:
- case 'LS' | 'LIST':
- self.collect_caps(msg.params[1:])
- case 'ACK' | 'NAK':
- cap_names = msg.params[-1].split()
- for cap_name in cap_names:
- self.cap_neg_set(f'REQ:{cap_name}', done=True)
- self.caps[cap_name].enabled = (msg.params[1]
- == 'ACK')
- if self.cap_neg_done('LIST'):
- self.try_send_cap('END')
- if not self.cap_neg('printing'):
- self.log('# server capabilities (enabled: "+"):')
- for cap_name, cap in self.caps.items():
- self.log('# ' + cap.str_for_log(cap_name))
- self.cap_neg_set('printing', done=True)
- elif self.cap_neg_done('LS'):
- for cap_name in ('server-time', 'account-tag', 'sasl'):
- if (cap_name in self.caps
- and (not self.caps[cap_name].enabled)):
- self.try_send_cap('REQ', cap_name, key_fused=True)
- self.try_send_cap('LIST')
+++ /dev/null
-'Terminal and TUI management.'
-# built-ins
-from abc import ABC, abstractmethod
-from base64 import b64decode
-from contextlib import contextmanager
-from dataclasses import dataclass
-from inspect import _empty as inspect_empty, signature, stack
-from signal import SIGWINCH, signal
-from typing import Callable, Generator, Iterator, NamedTuple, Optional
-from uuid import UUID
-# requirements.txt
-from blessed import Terminal as BlessedTerminal
-# ourselves
-from ircplom.events import (
- AffectiveEvent, Loop, PayloadMixin, QueueMixin, QuitEvent)
-from ircplom.irc_conn import (
- CHAT_GLOB, IrcConnSetup, IrcMessage, Client, ClientIdMixin,
- ClientQueueMixin, InitReconnectEvent, NewClientEvent, SendEvent)
-
-_MIN_HEIGHT = 4
-_MIN_WIDTH = 32
-
-_TIMEOUT_KEYPRESS_LOOP = 0.5
-_B64_PREFIX = 'b64:'
-_OSC52_PREFIX = b']52;c;'
-_PASTE_DELIMITER = '\007'
-
-_PROMPT_TEMPLATE = '> '
-_PROMPT_ELL_IN = '<…'
-_PROMPT_ELL_OUT = '…>'
-
-_CHAR_RESIZE = chr(12)
-_KEYBINDINGS = {
- 'KEY_BACKSPACE': ('window.prompt.backspace',),
- 'KEY_ENTER': ('prompt_enter',),
- 'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
- 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
- 'KEY_UP': ('window.prompt.scroll', 'up'),
- 'KEY_DOWN': ('window.prompt.scroll', 'down'),
- 'KEY_PGUP': ('window.log.scroll', 'up'),
- 'KEY_PGDOWN': ('window.log.scroll', 'down'),
- 'esc:91:49:59:51:68': ('window', 'left'),
- 'esc:91:49:59:51:67': ('window', 'right'),
- 'KEY_F1': ('window.paste',),
-}
-_CMD_SHORTCUTS = {
- 'disconnect': 'window.disconnect',
- 'nick': 'window.nick',
- 'privmsg': 'window.privmsg',
- 'reconnect': 'window.reconnect'
-}
-
-
-@dataclass
-class TuiEvent(AffectiveEvent):
- 'To affect TUI, and trigger flushed .draw_tainted on it.'
-
- def affect(self, target: 'Tui') -> None:
- target.window.draw_tainted()
- target.term.flush()
-
-
-@dataclass
-class _SetScreenEvent(TuiEvent):
-
- def affect(self, target: 'Tui') -> None:
- target.term.calc_geometry()
- for window in target.windows:
- window.set_geometry()
- super().affect(target)
-
-
-class _YX(NamedTuple):
- y: int
- x: int
-
-
-class _Widget(ABC):
-
- @abstractmethod
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self.tainted = True
- self._drawable = False
-
- @abstractmethod
- def set_geometry(self, measurements: _YX) -> bool:
- 'Update widget\'s measurements, re-generate content where necessary.'
- self.tainted = True
- self._drawable = len([m for m in measurements if m < 0]) == 0
- return self._drawable
-
- @abstractmethod
- def draw(self) -> bool:
- 'Print widget\'s content in shape appropriate to set geometry.'
- if not self._drawable:
- return False
- self.tainted = False
- return True
-
-
-class _ScrollableWidget(_Widget, ABC):
- _history_idx: int
-
- def __init__(self, write: Callable[..., None], **kwargs) -> None:
- super().__init__(**kwargs)
- self._write = write
- self._history: list[str] = []
-
- def append(self, to_append: str) -> None:
- 'Append to scrollable history.'
- self._history += [to_append]
-
- @abstractmethod
- def _scroll(self, up=True) -> None:
- self.tainted = True
-
- def cmd__scroll(self, direction: str) -> None:
- 'Scroll through stored content/history.'
- self._scroll(up=direction == 'up')
-
-
-class _LogWidget(_ScrollableWidget):
- _view_size: _YX
- _y_pgscroll: int
-
- def __init__(self, wrap: Callable[[str], list[str]], **kwargs
- ) -> None:
- super().__init__(**kwargs)
- self._wrap = wrap
- self._wrapped_idx = self._history_idx = -1
- self._wrapped: list[tuple[Optional[int], str]] = []
-
- def _add_wrapped(self, idx_original, line) -> int:
- wrapped_lines = self._wrap(line)
- self._wrapped += [(idx_original, line) for line in wrapped_lines]
- return len(wrapped_lines)
-
- def set_geometry(self, measurements: _YX) -> bool:
- if not super().set_geometry(measurements):
- return False
- self._view_size = measurements
- self._y_pgscroll = self._view_size.y // 2
- self._wrapped.clear()
- self._wrapped += [(None, '')] * self._view_size.y
- if not self._history:
- return True
- for idx_history, line in enumerate(self._history):
- self._add_wrapped(idx_history, line)
- wrapped_lines_for_history_idx = [
- t for t in self._wrapped
- if t[0] == len(self._history) + self._history_idx]
- idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
- self._wrapped_idx = idx_their_last - len(self._wrapped)
- return True
-
- def append(self, to_append: str) -> None:
- super().append(to_append)
- self.tainted = True
- if self._history_idx < -1:
- self._history_idx -= 1
- if not self._drawable:
- return
- n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
- if self._wrapped_idx < -1:
- self._wrapped_idx -= n_wrapped_lines
-
- def draw(self) -> bool:
- if not super().draw():
- return False
- start_idx = self._wrapped_idx - self._view_size.y + 1
- end_idx = self._wrapped_idx
- to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
- if self._wrapped_idx < -1:
- scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
- scroll_info += 'v' * (self._view_size.x - len(scroll_info))
- to_write += [scroll_info]
- else:
- to_write += [self._wrapped[self._wrapped_idx][1]]
- for i, line in enumerate(to_write):
- self._write(line, i)
- return True
-
- def _scroll(self, up: bool = True) -> None:
- super()._scroll(up)
- if not self._drawable:
- return
- if up:
- self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
- self._wrapped_idx - self._y_pgscroll)
- else:
- self._wrapped_idx = min(-1,
- self._wrapped_idx + self._y_pgscroll)
- history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
- if history_idx_to_wrapped_idx is not None:
- self._history_idx = history_idx_to_wrapped_idx - len(self._history)
-
-
-class _PromptWidget(_ScrollableWidget):
- _y: int
- _width: int
- _history_idx: int = 0
- _input_buffer_unsafe: str
- _cursor_x: int
-
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self.prefix = _PROMPT_TEMPLATE
- self._reset_buffer('')
-
- @property
- def _input_buffer(self) -> str:
- return self._input_buffer_unsafe[:]
-
- @_input_buffer.setter
- def _input_buffer(self, content) -> None:
- self.tainted = True
- self._input_buffer_unsafe = content
-
- def set_geometry(self, measurements: _YX) -> bool:
- if not super().set_geometry(measurements):
- return False
- self._y, self._width = measurements
- return True
-
- def draw(self) -> bool:
- if not super().draw():
- return False
- prefix = self.prefix[:]
- content = self._input_buffer
- if self._cursor_x == len(self._input_buffer):
- content += ' '
- half_width = (self._width - len(prefix)) // 2
- offset = 0
- if len(prefix) + len(content) > self._width\
- and self._cursor_x > half_width:
- prefix += _PROMPT_ELL_IN
- offset = min(len(prefix) + len(content) - self._width,
- self._cursor_x - half_width + len(_PROMPT_ELL_IN))
- cursor_x_to_write = len(prefix) + self._cursor_x - offset
- to_write = f'{prefix}{content[offset:]}'
- if len(to_write) > self._width:
- to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)]
- + _PROMPT_ELL_OUT)
- self._write(to_write[:cursor_x_to_write], self._y, padding=False)
- self._write(to_write[cursor_x_to_write], attribute='reverse',
- padding=False)
- self._write(to_write[cursor_x_to_write + 1:])
- return True
-
- def _archive_prompt(self) -> None:
- self.append(self._input_buffer)
- self._reset_buffer('')
-
- def _scroll(self, up: bool = True) -> None:
- super()._scroll(up)
- if up and -(self._history_idx) < len(self._history):
- if self._history_idx == 0 and self._input_buffer:
- self._archive_prompt()
- self._history_idx -= 1
- self._history_idx -= 1
- self._reset_buffer(self._history[self._history_idx])
- elif not up:
- if self._history_idx < 0:
- self._history_idx += 1
- if self._history_idx == 0:
- self._reset_buffer('')
- else:
- self._reset_buffer(self._history[self._history_idx])
- elif self._input_buffer:
- self._archive_prompt()
-
- def insert(self, to_insert: str) -> None:
- 'Insert into prompt input buffer.'
- self._cursor_x += len(to_insert)
- self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
- + to_insert
- + self._input_buffer[self._cursor_x - 1:])
- self._history_idx = 0
-
- def cmd__backspace(self) -> None:
- 'Truncate current content by one character, if possible.'
- if self._cursor_x > 0:
- self._cursor_x -= 1
- self._input_buffer = (self._input_buffer[:self._cursor_x]
- + self._input_buffer[self._cursor_x + 1:])
- self._history_idx = 0
-
- def cmd__move_cursor(self, direction: str) -> None:
- 'Move cursor one space into direction ("left" or "right") if possible.'
- if direction == 'left' and self._cursor_x > 0:
- self._cursor_x -= 1
- elif direction == 'right'\
- and self._cursor_x < len(self._input_buffer):
- self._cursor_x += 1
- else:
- return
- self.tainted = True
-
- def _reset_buffer(self, content: str) -> None:
- self._input_buffer = content
- self._cursor_x = len(self._input_buffer)
-
- def enter(self) -> str:
- 'Return current content while also clearing and then redrawing.'
- to_return = self._input_buffer[:]
- if to_return:
- self._archive_prompt()
- return to_return
-
-
-class _Window(_Widget):
- _y_status: int
- prompt: _PromptWidget
-
- def __init__(self, idx: int, term: 'Terminal', **kwargs) -> None:
- super().__init__(**kwargs)
- self.idx = idx
- self._term = term
- self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write)
- self.prompt = self.__annotations__['prompt'](write=self._term.write)
- if hasattr(self._term, 'size'):
- self.set_geometry()
-
- def set_geometry(self, _=None) -> bool:
- assert _ is None
- if self._term.size.y < _MIN_HEIGHT or self._term.size.x < _MIN_WIDTH:
- bad_yx = _YX(-1, -1)
- super().set_geometry(bad_yx)
- self.log.set_geometry(bad_yx)
- self.prompt.set_geometry(bad_yx)
- return False
- super().set_geometry(_YX(0, 0))
- self._y_status = self._term.size.y - 2
- self.log.set_geometry(_YX(self._y_status, self._term.size.x))
- self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x))
- return True
-
- def draw(self) -> bool:
- self._term.clear()
- if not super().draw():
- if self._term.size.x > 0:
- lines = ['']
- for i, c in enumerate('screen too small'):
- if i > 0 and 0 == i % self._term.size.x:
- lines += ['']
- lines[-1] += c
- for y, line in enumerate(lines):
- self._term.write(line, y)
- return False
- idx_box = f'[{self.idx}]'
- status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
- self.log.draw()
- self._term.write(status_line, self._y_status)
- self.prompt.draw()
- return True
-
- def cmd__paste(self) -> None:
- 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
- self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}',
- self._y_status)
- self.tainted = True
-
- def draw_tainted(self) -> None:
- 'Draw tainted parts of self.'
- if self.tainted:
- self.draw()
- return
- for widget in [w for w in (self.log, self.prompt) if w.tainted]:
- widget.draw()
-
-
-@dataclass
-class _KeyboardEvent(TuiEvent, PayloadMixin):
- payload: str
-
- def affect(self, target: 'Tui') -> None:
- if self.payload[0] == _CHAR_RESIZE:
- _SetScreenEvent().affect(target)
- return
- if self.payload in _KEYBINDINGS:
- cmd_data = _KEYBINDINGS[self.payload]
- cmd = target.cmd_name_to_cmd(cmd_data[0])
- if cmd:
- cmd(*cmd_data[1:])
- elif self.payload.startswith(_B64_PREFIX):
- encoded = self.payload[len(_B64_PREFIX):]
- to_paste = ''
- for i, c in enumerate(b64decode(encoded).decode('utf-8')):
- if i > 512:
- break
- if c.isprintable():
- to_paste += c
- elif c.isspace():
- to_paste += ' '
- else:
- to_paste += '#'
- target.window.prompt.insert(to_paste)
- elif len(self.payload) == 1:
- target.window.prompt.insert(self.payload)
- else:
- target.log(f'# ALERT: unknown keyboard input: {self.payload}')
- super().affect(target)
-
-
-class Tui(QueueMixin):
- 'Base for graphical user interface elements.'
-
- def __init__(self, term: 'Terminal', **kwargs) -> None:
- super().__init__(**kwargs)
- self.term = term
- self._window_idx = 0
- self.windows = [_Window(idx=self._window_idx, term=self.term)]
- self._put(_SetScreenEvent())
-
- def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
- 'Map cmd_name to executable TUI element method.'
- cmd_name = _CMD_SHORTCUTS.get(cmd_name, cmd_name)
- cmd_parent = self
- while True:
- cmd_name_toks = cmd_name.split('.', maxsplit=1)
- if len(cmd_name_toks) == 1:
- break
- if not hasattr(cmd_parent, cmd_name_toks[0]):
- return None
- cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
- cmd_name = cmd_name_toks[1]
- cmd_name = f'cmd__{cmd_name}'
- if not hasattr(cmd_parent, cmd_name):
- return None
- return getattr(cmd_parent, cmd_name)
-
- @property
- def window(self) -> _Window:
- 'Currently selected _Window.'
- return self.windows[self._window_idx]
-
- def _new_client_window(self, client_id: UUID, chat: str = ''
- ) -> '_ClientWindow':
- new_idx = len(self.windows)
- win = _ClientWindow(idx=new_idx, term=self.term, q_out=self.q_out,
- client_id=client_id, chat=chat)
- self.windows += [win]
- self._switch_window(new_idx)
- return win
-
- def client_wins(self, client_id: UUID) -> list['_ClientWindow']:
- 'All _ClientWindows matching client_id; if none, create one.'
- wins = [win for win in self.windows
- if isinstance(win, _ClientWindow)
- and win.client_id == client_id] # pylint: disable=no-member
- if not wins:
- wins = [self._new_client_window(client_id=client_id)]
- return wins
-
- def client_win(self, client_id: UUID, chat: str = '') -> '_ClientWindow':
- '''That _ClientWindow matching client_id and chat; create if none.
-
- In case of creation, copy prompt prefix from client's first window.
- '''
- client_wins = self.client_wins(client_id)
- candidates = [win for win in client_wins if win.chat == chat]
- if candidates:
- return candidates[0]
- win = self._new_client_window(client_id=client_id, chat=chat)
- if client_wins:
- win.prompt.prefix = client_wins[0].prompt.prefix
- return win
-
- def log(self, msg: str) -> None:
- 'Post msg to active window\'s log.'
- self.window.log.append(msg)
-
- def _switch_window(self, idx: int) -> None:
- self._window_idx = idx
- self.window.draw()
-
- def cmd__connect(self, hostname: str, nickname: str, realname: str
- ) -> None:
- 'Create Client and pass it via NewClientEvent.'
- self._put(NewClientEvent(
- _ClientKnowingTui(
- q_out=self.q_out,
- conn_setup=IrcConnSetup(hostname=hostname, nickname=nickname,
- realname=realname))))
-
- def cmd__prompt_enter(self) -> None:
- 'Get prompt content from .window.prompt.enter, parse to & run command.'
- to_parse = self.window.prompt.enter()
- if not to_parse:
- return
- alert: Optional[str] = None
- if to_parse[0:1] == '/':
- toks = to_parse[1:].split(maxsplit=1)
- alert = f'{toks[0]} unknown'
- cmd = self.cmd_name_to_cmd(toks[0])
- if cmd and cmd.__name__ != stack()[0].function:
- params = signature(cmd).parameters
- n_args_max = len(params)
- n_args_min = len([p for p in params.values()
- if p.default == inspect_empty])
- alert = f'{cmd.__name__} needs between {n_args_min} and '\
- f'{n_args_max} args'
- if len(toks) == 1 and not n_args_min:
- alert = cmd()
- elif len(toks) > 1 and params\
- and n_args_min <= len(toks[1].split()):
- args = []
- while len(toks) > 1 and n_args_max:
- toks = toks[1].split(maxsplit=1)
- args += [toks[0]]
- n_args_max -= 1
- alert = cmd(*args)
- else:
- alert = 'not prefixed by /'
- if alert:
- self.log(f'# ALERT: invalid prompt command: {alert}')
-
- def cmd__quit(self) -> None:
- 'Trigger program exit.'
- self._put(QuitEvent())
-
- def cmd__window(self, towards: str) -> Optional[str]:
- 'Switch window selection.'
- n_windows = len(self.windows)
- if n_windows < 2:
- return 'no alternate window to move into'
- if towards in {'left', 'right'}:
- multiplier = (+1) if towards == 'right' else (-1)
- window_idx = self._window_idx + multiplier
- if not 0 <= window_idx < n_windows:
- window_idx -= multiplier * n_windows
- elif not towards.isdigit():
- return f'neither "left"/"right" nor integer: {towards}'
- else:
- window_idx = int(towards)
- if not 0 <= window_idx < n_windows:
- return f'unavailable window idx: {window_idx}'
- self._switch_window(window_idx)
- return None
-
-
-class Terminal(QueueMixin):
- 'Abstraction of terminal interface.'
- size: _YX
- _cursor_yx_: _YX
-
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self._blessed = BlessedTerminal()
- self._cursor_yx = _YX(0, 0)
-
- @contextmanager
- def setup(self) -> Generator:
- 'Combine multiple contexts into one and run keypress loop.'
- signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent()))
- self.clear()
- with (self._blessed.raw(),
- self._blessed.fullscreen(),
- self._blessed.hidden_cursor(),
- Loop(iterator=self._get_keypresses(), q_out=self.q_out)):
- yield self
-
- @property
- def _cursor_yx(self) -> _YX:
- return self._cursor_yx_
-
- @_cursor_yx.setter
- def _cursor_yx(self, yx: _YX) -> None:
- print(self._blessed.move_yx(yx.y, yx.x), end='')
- self._cursor_yx_ = yx
-
- def calc_geometry(self) -> None:
- '(Re-)calculate .size..'
- self.size = _YX(self._blessed.height, self._blessed.width)
-
- def clear(self) -> None:
- 'Clear terminal.'
- print(self._blessed.clear, end='')
-
- def flush(self) -> None:
- 'Flush terminal.'
- print('', end='', flush=True)
-
- def wrap(self, line: str) -> list[str]:
- 'Wrap line to list of lines fitting into terminal width.'
- return self._blessed.wrap(line, width=self.size.x,
- subsequent_indent=' '*4)
-
- def write(self,
- msg: str = '',
- start_y: Optional[int] = None,
- attribute: Optional[str] = None,
- padding: bool = True
- ) -> None:
- 'Print to terminal, with position, padding to line end, attributes.'
- if start_y is not None:
- self._cursor_yx = _YX(start_y, 0)
- # ._blessed.length can slow down things notably: only use where needed!
- end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
- else self._blessed.length(msg))
- len_padding = self.size.x - end_x
- if len_padding < 0:
- msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
- elif padding:
- msg += ' ' * len_padding
- end_x = self.size.x
- if attribute:
- msg = getattr(self._blessed, attribute)(msg)
- print(msg, end='')
- self._cursor_yx = _YX(self._cursor_yx.y, end_x)
-
- def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]:
- '''Loop through keypresses from terminal, expand blessed's handling.
-
- Explicitly collect KEY_ESCAPE-modified key sequences, and recognize
- OSC52-prefixed pastables to return the respective base64 code,
- prefixed with _B64_PREFIX.
- '''
- while True:
- to_yield = ''
- ks = self._blessed.inkey(
- timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None,
- esc_delay=0) # incl. until thread dies
- if ks.name != 'KEY_ESCAPE':
- to_yield = f'{ks.name if ks.name else ks}'
- else:
- chars = b''
- while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0
- ).encode('utf-8')):
- chars += new_chars
- len_prefix = len(_OSC52_PREFIX)
- if chars[:len_prefix] == _OSC52_PREFIX:
- to_yield = _B64_PREFIX[:]
- # sometimes, prev .inkey got some or all (including paste
- # delimiter) of the paste code (maybe even more), so first
- # harvest potential remains of chars post prefix …
- caught_delimiter = False
- post_prefix_str = chars[len_prefix:].decode('utf-8')
- for idx, c in enumerate(post_prefix_str):
- if c == _PASTE_DELIMITER:
- caught_delimiter = True
- if (remains := post_prefix_str[idx + 1:]):
- self._blessed.ungetch(remains)
- break
- to_yield += c
- # … before .getch() further until expected delimiter found
- if not caught_delimiter:
- while (c := self._blessed.getch()) != _PASTE_DELIMITER:
- to_yield += c
- else:
- to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars])
- yield _KeyboardEvent(to_yield) if to_yield else None
-
-
-class _ClientWindow(_Window, ClientQueueMixin):
- client_id_name = 'client_id'
-
- def __init__(self, client_id: UUID, chat: str = '', **kwargs) -> None:
- self.client_id = client_id
- self.chat = chat
- super().__init__(**kwargs)
-
- def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
- 'Send QUIT command to server.'
- self._cput(SendEvent,
- payload=IrcMessage(verb='QUIT', params=(quit_msg,)))
-
- def cmd__reconnect(self) -> None:
- 'Attempt reconnection.'
- self._cput(InitReconnectEvent)
-
- def cmd__nick(self, new_nick: str) -> None:
- 'Attempt nickname change.'
- self._cput(SendEvent,
- payload=IrcMessage(verb='NICK', params=(new_nick,)))
-
- def cmd__privmsg(self, target: str, msg: str) -> None:
- 'Send chat message msg to target.'
- self._cput(SendEvent, chat=target,
- payload=IrcMessage(verb='PRIVMSG', params=(target, msg)))
-
-
-@dataclass
-class _ClientWindowEvent(TuiEvent, ClientIdMixin):
- chat: str = ''
-
-
-@dataclass
-class _ClientLogEvent(_ClientWindowEvent, PayloadMixin):
- payload: str
-
- def affect(self, target: Tui) -> None:
- if self.chat == CHAT_GLOB:
- for win in target.client_wins(self.client_id):
- win.log.append(self.payload)
- else:
- target.client_win(self.client_id, self.chat
- ).log.append(self.payload)
- super().affect(target)
-
-
-@dataclass
-class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin):
- payload: tuple[str, str]
-
- def affect(self, target: Tui) -> None:
- new_prefix = ((' ' if self.payload[0] else '?')
- + f'{self.payload[1]}{_PROMPT_TEMPLATE}')
- for win in target.client_wins(self.client_id):
- prompt = win.prompt
- prompt.prefix = new_prefix
- prompt.tainted = True
- super().affect(target)
-
-
-class _ClientKnowingTui(Client):
-
- def log(self, msg: str, chat: str = '') -> None:
- self._cput(_ClientLogEvent, chat=chat, payload=msg)
-
- def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
- super().update_login(nick_confirmed, nickname)
- self._cput(_ClientPromptEvent, payload=(self.nick_confirmed,
- self.conn_setup.nickname))
--- /dev/null
+'Base Terminal and TUI management.'
+# built-ins
+from abc import ABC, abstractmethod
+from base64 import b64decode
+from contextlib import contextmanager
+from dataclasses import dataclass
+from inspect import _empty as inspect_empty, signature, stack
+from signal import SIGWINCH, signal
+from typing import Callable, Generator, Iterator, NamedTuple, Optional
+# requirements.txt
+from blessed import Terminal as BlessedTerminal
+# ourselves
+from ircplom.events import (
+ AffectiveEvent, Loop, PayloadMixin, QueueMixin, QuitEvent)
+# from ircplom.irc_conn import IrcMessage
+
+_MIN_HEIGHT = 4
+_MIN_WIDTH = 32
+
+_TIMEOUT_KEYPRESS_LOOP = 0.5
+_B64_PREFIX = 'b64:'
+_OSC52_PREFIX = b']52;c;'
+_PASTE_DELIMITER = '\007'
+
+PROMPT_TEMPLATE = '> '
+_PROMPT_ELL_IN = '<…'
+_PROMPT_ELL_OUT = '…>'
+
+_CHAR_RESIZE = chr(12)
+_KEYBINDINGS = {
+ 'KEY_BACKSPACE': ('window.prompt.backspace',),
+ 'KEY_ENTER': ('prompt_enter',),
+ 'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
+ 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
+ 'KEY_UP': ('window.prompt.scroll', 'up'),
+ 'KEY_DOWN': ('window.prompt.scroll', 'down'),
+ 'KEY_PGUP': ('window.log.scroll', 'up'),
+ 'KEY_PGDOWN': ('window.log.scroll', 'down'),
+ 'esc:91:49:59:51:68': ('window', 'left'),
+ 'esc:91:49:59:51:67': ('window', 'right'),
+ 'KEY_F1': ('window.paste',),
+}
+CMD_SHORTCUTS: dict[str, str] = {}
+
+
+@dataclass
+class TuiEvent(AffectiveEvent):
+ 'To affect TUI, and trigger flushed .draw_tainted on it.'
+
+ def affect(self, target) -> None:
+ target.window.draw_tainted()
+ target.term.flush()
+
+
+@dataclass
+class _SetScreenEvent(TuiEvent):
+
+ def affect(self, target: 'BaseTui') -> None:
+ target.term.calc_geometry()
+ for window in target.windows:
+ window.set_geometry()
+ super().affect(target)
+
+
+class _YX(NamedTuple):
+ y: int
+ x: int
+
+
+class _Widget(ABC):
+
+ @abstractmethod
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.tainted = True
+ self._drawable = False
+
+ @abstractmethod
+ def set_geometry(self, measurements: _YX) -> bool:
+ 'Update widget\'s measurements, re-generate content where necessary.'
+ self.tainted = True
+ self._drawable = len([m for m in measurements if m < 0]) == 0
+ return self._drawable
+
+ @abstractmethod
+ def draw(self) -> bool:
+ 'Print widget\'s content in shape appropriate to set geometry.'
+ if not self._drawable:
+ return False
+ self.tainted = False
+ return True
+
+
+class _ScrollableWidget(_Widget, ABC):
+ _history_idx: int
+
+ def __init__(self, write: Callable[..., None], **kwargs) -> None:
+ super().__init__(**kwargs)
+ self._write = write
+ self._history: list[str] = []
+
+ def append(self, to_append: str) -> None:
+ 'Append to scrollable history.'
+ self._history += [to_append]
+
+ @abstractmethod
+ def _scroll(self, up=True) -> None:
+ self.tainted = True
+
+ def cmd__scroll(self, direction: str) -> None:
+ 'Scroll through stored content/history.'
+ self._scroll(up=direction == 'up')
+
+
+class _LogWidget(_ScrollableWidget):
+ _view_size: _YX
+ _y_pgscroll: int
+
+ def __init__(self, wrap: Callable[[str], list[str]], **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self._wrap = wrap
+ self._wrapped_idx = self._history_idx = -1
+ self._wrapped: list[tuple[Optional[int], str]] = []
+
+ def _add_wrapped(self, idx_original, line) -> int:
+ wrapped_lines = self._wrap(line)
+ self._wrapped += [(idx_original, line) for line in wrapped_lines]
+ return len(wrapped_lines)
+
+ def set_geometry(self, measurements: _YX) -> bool:
+ if not super().set_geometry(measurements):
+ return False
+ self._view_size = measurements
+ self._y_pgscroll = self._view_size.y // 2
+ self._wrapped.clear()
+ self._wrapped += [(None, '')] * self._view_size.y
+ if not self._history:
+ return True
+ for idx_history, line in enumerate(self._history):
+ self._add_wrapped(idx_history, line)
+ wrapped_lines_for_history_idx = [
+ t for t in self._wrapped
+ if t[0] == len(self._history) + self._history_idx]
+ idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
+ self._wrapped_idx = idx_their_last - len(self._wrapped)
+ return True
+
+ def append(self, to_append: str) -> None:
+ super().append(to_append)
+ self.tainted = True
+ if self._history_idx < -1:
+ self._history_idx -= 1
+ if not self._drawable:
+ return
+ n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
+ if self._wrapped_idx < -1:
+ self._wrapped_idx -= n_wrapped_lines
+
+ def draw(self) -> bool:
+ if not super().draw():
+ return False
+ start_idx = self._wrapped_idx - self._view_size.y + 1
+ end_idx = self._wrapped_idx
+ to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
+ if self._wrapped_idx < -1:
+ scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
+ scroll_info += 'v' * (self._view_size.x - len(scroll_info))
+ to_write += [scroll_info]
+ else:
+ to_write += [self._wrapped[self._wrapped_idx][1]]
+ for i, line in enumerate(to_write):
+ self._write(line, i)
+ return True
+
+ def _scroll(self, up: bool = True) -> None:
+ super()._scroll(up)
+ if not self._drawable:
+ return
+ if up:
+ self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
+ self._wrapped_idx - self._y_pgscroll)
+ else:
+ self._wrapped_idx = min(-1,
+ self._wrapped_idx + self._y_pgscroll)
+ history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
+ if history_idx_to_wrapped_idx is not None:
+ self._history_idx = history_idx_to_wrapped_idx - len(self._history)
+
+
+class _PromptWidget(_ScrollableWidget):
+ _y: int
+ _width: int
+ _history_idx: int = 0
+ _input_buffer_unsafe: str
+ _cursor_x: int
+
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.prefix = PROMPT_TEMPLATE
+ self._reset_buffer('')
+
+ @property
+ def _input_buffer(self) -> str:
+ return self._input_buffer_unsafe[:]
+
+ @_input_buffer.setter
+ def _input_buffer(self, content) -> None:
+ self.tainted = True
+ self._input_buffer_unsafe = content
+
+ def set_geometry(self, measurements: _YX) -> bool:
+ if not super().set_geometry(measurements):
+ return False
+ self._y, self._width = measurements
+ return True
+
+ def draw(self) -> bool:
+ if not super().draw():
+ return False
+ prefix = self.prefix[:]
+ content = self._input_buffer
+ if self._cursor_x == len(self._input_buffer):
+ content += ' '
+ half_width = (self._width - len(prefix)) // 2
+ offset = 0
+ if len(prefix) + len(content) > self._width\
+ and self._cursor_x > half_width:
+ prefix += _PROMPT_ELL_IN
+ offset = min(len(prefix) + len(content) - self._width,
+ self._cursor_x - half_width + len(_PROMPT_ELL_IN))
+ cursor_x_to_write = len(prefix) + self._cursor_x - offset
+ to_write = f'{prefix}{content[offset:]}'
+ if len(to_write) > self._width:
+ to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)]
+ + _PROMPT_ELL_OUT)
+ self._write(to_write[:cursor_x_to_write], self._y, padding=False)
+ self._write(to_write[cursor_x_to_write], attribute='reverse',
+ padding=False)
+ self._write(to_write[cursor_x_to_write + 1:])
+ return True
+
+ def _archive_prompt(self) -> None:
+ self.append(self._input_buffer)
+ self._reset_buffer('')
+
+ def _scroll(self, up: bool = True) -> None:
+ super()._scroll(up)
+ if up and -(self._history_idx) < len(self._history):
+ if self._history_idx == 0 and self._input_buffer:
+ self._archive_prompt()
+ self._history_idx -= 1
+ self._history_idx -= 1
+ self._reset_buffer(self._history[self._history_idx])
+ elif not up:
+ if self._history_idx < 0:
+ self._history_idx += 1
+ if self._history_idx == 0:
+ self._reset_buffer('')
+ else:
+ self._reset_buffer(self._history[self._history_idx])
+ elif self._input_buffer:
+ self._archive_prompt()
+
+ def insert(self, to_insert: str) -> None:
+ 'Insert into prompt input buffer.'
+ self._cursor_x += len(to_insert)
+ self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
+ + to_insert
+ + self._input_buffer[self._cursor_x - 1:])
+ self._history_idx = 0
+
+ def cmd__backspace(self) -> None:
+ 'Truncate current content by one character, if possible.'
+ if self._cursor_x > 0:
+ self._cursor_x -= 1
+ self._input_buffer = (self._input_buffer[:self._cursor_x]
+ + self._input_buffer[self._cursor_x + 1:])
+ self._history_idx = 0
+
+ def cmd__move_cursor(self, direction: str) -> None:
+ 'Move cursor one space into direction ("left" or "right") if possible.'
+ if direction == 'left' and self._cursor_x > 0:
+ self._cursor_x -= 1
+ elif direction == 'right'\
+ and self._cursor_x < len(self._input_buffer):
+ self._cursor_x += 1
+ else:
+ return
+ self.tainted = True
+
+ def _reset_buffer(self, content: str) -> None:
+ self._input_buffer = content
+ self._cursor_x = len(self._input_buffer)
+
+ def enter(self) -> str:
+ 'Return current content while also clearing and then redrawing.'
+ to_return = self._input_buffer[:]
+ if to_return:
+ self._archive_prompt()
+ return to_return
+
+
+class Window(_Widget):
+ 'Widget filling entire screen with sub-widgets like .prompt, .log.'
+ _y_status: int
+ prompt: _PromptWidget
+
+ def __init__(self, idx: int, term: 'Terminal', **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.idx = idx
+ self._term = term
+ self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write)
+ self.prompt = self.__annotations__['prompt'](write=self._term.write)
+ if hasattr(self._term, 'size'):
+ self.set_geometry()
+
+ def set_geometry(self, _=None) -> bool:
+ assert _ is None
+ if self._term.size.y < _MIN_HEIGHT or self._term.size.x < _MIN_WIDTH:
+ bad_yx = _YX(-1, -1)
+ super().set_geometry(bad_yx)
+ self.log.set_geometry(bad_yx)
+ self.prompt.set_geometry(bad_yx)
+ return False
+ super().set_geometry(_YX(0, 0))
+ self._y_status = self._term.size.y - 2
+ self.log.set_geometry(_YX(self._y_status, self._term.size.x))
+ self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x))
+ return True
+
+ def draw(self) -> bool:
+ self._term.clear()
+ if not super().draw():
+ if self._term.size.x > 0:
+ lines = ['']
+ for i, c in enumerate('screen too small'):
+ if i > 0 and 0 == i % self._term.size.x:
+ lines += ['']
+ lines[-1] += c
+ for y, line in enumerate(lines):
+ self._term.write(line, y)
+ return False
+ idx_box = f'[{self.idx}]'
+ status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
+ self.log.draw()
+ self._term.write(status_line, self._y_status)
+ self.prompt.draw()
+ return True
+
+ def cmd__paste(self) -> None:
+ 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
+ self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}',
+ self._y_status)
+ self.tainted = True
+
+ def draw_tainted(self) -> None:
+ 'Draw tainted parts of self.'
+ if self.tainted:
+ self.draw()
+ return
+ for widget in [w for w in (self.log, self.prompt) if w.tainted]:
+ widget.draw()
+
+
+@dataclass
+class _KeyboardEvent(TuiEvent, PayloadMixin):
+ payload: str
+
+ def affect(self, target: 'BaseTui') -> None:
+ if self.payload[0] == _CHAR_RESIZE:
+ _SetScreenEvent().affect(target)
+ return
+ if self.payload in _KEYBINDINGS:
+ cmd_data = _KEYBINDINGS[self.payload]
+ cmd = target.cmd_name_to_cmd(cmd_data[0])
+ if cmd:
+ cmd(*cmd_data[1:])
+ elif self.payload.startswith(_B64_PREFIX):
+ encoded = self.payload[len(_B64_PREFIX):]
+ to_paste = ''
+ for i, c in enumerate(b64decode(encoded).decode('utf-8')):
+ if i > 512:
+ break
+ if c.isprintable():
+ to_paste += c
+ elif c.isspace():
+ to_paste += ' '
+ else:
+ to_paste += '#'
+ target.window.prompt.insert(to_paste)
+ elif len(self.payload) == 1:
+ target.window.prompt.insert(self.payload)
+ else:
+ target.log(f'# ALERT: unknown keyboard input: {self.payload}')
+ super().affect(target)
+
+
+class BaseTui(QueueMixin):
+ 'Base for graphical user interface elements.'
+
+ def __init__(self, term: 'Terminal', **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.term = term
+ self._window_idx = 0
+ self.windows = [Window(idx=self._window_idx, term=self.term)]
+ self._put(_SetScreenEvent())
+
+ def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
+ 'Map cmd_name to executable TUI element method.'
+ cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name)
+ cmd_parent = self
+ while True:
+ cmd_name_toks = cmd_name.split('.', maxsplit=1)
+ if len(cmd_name_toks) == 1:
+ break
+ if not hasattr(cmd_parent, cmd_name_toks[0]):
+ return None
+ cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
+ cmd_name = cmd_name_toks[1]
+ cmd_name = f'cmd__{cmd_name}'
+ if not hasattr(cmd_parent, cmd_name):
+ return None
+ return getattr(cmd_parent, cmd_name)
+
+ @property
+ def window(self) -> Window:
+ 'Currently selected Window.'
+ return self.windows[self._window_idx]
+
+ def log(self, msg: str) -> None:
+ 'Post msg to active window\'s log.'
+ self.window.log.append(msg)
+
+ def _switch_window(self, idx: int) -> None:
+ self._window_idx = idx
+ self.window.draw()
+
+ def cmd__prompt_enter(self) -> None:
+ 'Get prompt content from .window.prompt.enter, parse to & run command.'
+ to_parse = self.window.prompt.enter()
+ if not to_parse:
+ return
+ alert: Optional[str] = None
+ if to_parse[0:1] == '/':
+ toks = to_parse[1:].split(maxsplit=1)
+ alert = f'{toks[0]} unknown'
+ cmd = self.cmd_name_to_cmd(toks[0])
+ if cmd and cmd.__name__ != stack()[0].function:
+ params = signature(cmd).parameters
+ n_args_max = len(params)
+ n_args_min = len([p for p in params.values()
+ if p.default == inspect_empty])
+ alert = f'{cmd.__name__} needs between {n_args_min} and '\
+ f'{n_args_max} args'
+ if len(toks) == 1 and not n_args_min:
+ alert = cmd()
+ elif len(toks) > 1 and params\
+ and n_args_min <= len(toks[1].split()):
+ args = []
+ while len(toks) > 1 and n_args_max:
+ toks = toks[1].split(maxsplit=1)
+ args += [toks[0]]
+ n_args_max -= 1
+ alert = cmd(*args)
+ else:
+ alert = 'not prefixed by /'
+ if alert:
+ self.log(f'# ALERT: invalid prompt command: {alert}')
+
+ def cmd__quit(self) -> None:
+ 'Trigger program exit.'
+ self._put(QuitEvent())
+
+ def cmd__window(self, towards: str) -> Optional[str]:
+ 'Switch window selection.'
+ n_windows = len(self.windows)
+ if n_windows < 2:
+ return 'no alternate window to move into'
+ if towards in {'left', 'right'}:
+ multiplier = (+1) if towards == 'right' else (-1)
+ window_idx = self._window_idx + multiplier
+ if not 0 <= window_idx < n_windows:
+ window_idx -= multiplier * n_windows
+ elif not towards.isdigit():
+ return f'neither "left"/"right" nor integer: {towards}'
+ else:
+ window_idx = int(towards)
+ if not 0 <= window_idx < n_windows:
+ return f'unavailable window idx: {window_idx}'
+ self._switch_window(window_idx)
+ return None
+
+
+class Terminal(QueueMixin):
+ 'Abstraction of terminal interface.'
+ size: _YX
+ _cursor_yx_: _YX
+
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self._blessed = BlessedTerminal()
+ self._cursor_yx = _YX(0, 0)
+
+ @contextmanager
+ def setup(self) -> Generator:
+ 'Combine multiple contexts into one and run keypress loop.'
+ signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent()))
+ self.clear()
+ with (self._blessed.raw(),
+ self._blessed.fullscreen(),
+ self._blessed.hidden_cursor(),
+ Loop(iterator=self._get_keypresses(), q_out=self.q_out)):
+ yield self
+
+ @property
+ def _cursor_yx(self) -> _YX:
+ return self._cursor_yx_
+
+ @_cursor_yx.setter
+ def _cursor_yx(self, yx: _YX) -> None:
+ print(self._blessed.move_yx(yx.y, yx.x), end='')
+ self._cursor_yx_ = yx
+
+ def calc_geometry(self) -> None:
+ '(Re-)calculate .size..'
+ self.size = _YX(self._blessed.height, self._blessed.width)
+
+ def clear(self) -> None:
+ 'Clear terminal.'
+ print(self._blessed.clear, end='')
+
+ def flush(self) -> None:
+ 'Flush terminal.'
+ print('', end='', flush=True)
+
+ def wrap(self, line: str) -> list[str]:
+ 'Wrap line to list of lines fitting into terminal width.'
+ return self._blessed.wrap(line, width=self.size.x,
+ subsequent_indent=' '*4)
+
+ def write(self,
+ msg: str = '',
+ start_y: Optional[int] = None,
+ attribute: Optional[str] = None,
+ padding: bool = True
+ ) -> None:
+ 'Print to terminal, with position, padding to line end, attributes.'
+ if start_y is not None:
+ self._cursor_yx = _YX(start_y, 0)
+ # ._blessed.length can slow down things notably: only use where needed!
+ end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
+ else self._blessed.length(msg))
+ len_padding = self.size.x - end_x
+ if len_padding < 0:
+ msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
+ elif padding:
+ msg += ' ' * len_padding
+ end_x = self.size.x
+ if attribute:
+ msg = getattr(self._blessed, attribute)(msg)
+ print(msg, end='')
+ self._cursor_yx = _YX(self._cursor_yx.y, end_x)
+
+ def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]:
+ '''Loop through keypresses from terminal, expand blessed's handling.
+
+ Explicitly collect KEY_ESCAPE-modified key sequences, and recognize
+ OSC52-prefixed pastables to return the respective base64 code,
+ prefixed with _B64_PREFIX.
+ '''
+ while True:
+ to_yield = ''
+ ks = self._blessed.inkey(
+ timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None,
+ esc_delay=0) # incl. until thread dies
+ if ks.name != 'KEY_ESCAPE':
+ to_yield = f'{ks.name if ks.name else ks}'
+ else:
+ chars = b''
+ while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0
+ ).encode('utf-8')):
+ chars += new_chars
+ len_prefix = len(_OSC52_PREFIX)
+ if chars[:len_prefix] == _OSC52_PREFIX:
+ to_yield = _B64_PREFIX[:]
+ # sometimes, prev .inkey got some or all (including paste
+ # delimiter) of the paste code (maybe even more), so first
+ # harvest potential remains of chars post prefix …
+ caught_delimiter = False
+ post_prefix_str = chars[len_prefix:].decode('utf-8')
+ for idx, c in enumerate(post_prefix_str):
+ if c == _PASTE_DELIMITER:
+ caught_delimiter = True
+ if (remains := post_prefix_str[idx + 1:]):
+ self._blessed.ungetch(remains)
+ break
+ to_yield += c
+ # … before .getch() further until expected delimiter found
+ if not caught_delimiter:
+ while (c := self._blessed.getch()) != _PASTE_DELIMITER:
+ to_yield += c
+ else:
+ to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars])
+ yield _KeyboardEvent(to_yield) if to_yield else None