#!/usr/bin/env python3
'Attempt at an IRC client.'
-from ircplom.events import EventQueue, ExceptionEvent, QuitEvent
-from ircplom.irc_conn import ConnEvent, InitConnectEvent, IrcConnection
-from ircplom.tui import Terminal
+from queue import SimpleQueue
+from ircplom.events import ExceptionEvent, QuitEvent
+from ircplom.irc_conn import ClientsDb, ClientEvent, NewClientEvent
+from ircplom.tui import Terminal, Tui, TuiEvent
def main_loop() -> None:
'Main execution code / loop.'
- q_to_main: EventQueue = EventQueue()
- connections: set[IrcConnection] = set()
+ q_events: SimpleQueue = SimpleQueue()
+ clients_db: ClientsDb = {}
try:
- with Terminal().context(q_to_main) as term:
+ with Terminal(q_out=q_events).setup() as term:
+ tui = Tui(q_out=q_events, term=term)
while True:
- event = q_to_main.get()
- term.tui.put(event)
+ event = q_events.get()
if isinstance(event, QuitEvent):
break
if isinstance(event, ExceptionEvent):
raise event.payload
- if isinstance(event, InitConnectEvent):
- connections.add(IrcConnection(q_to_main=q_to_main,
- *event.payload))
- elif isinstance(event, ConnEvent):
- event.conn.handle(event)
+ if isinstance(event, TuiEvent):
+ event.affect(tui)
+ elif isinstance(event, NewClientEvent):
+ event.affect(clients_db)
+ elif isinstance(event, ClientEvent):
+ event.affect(clients_db[event.client_id])
finally:
- for conn in connections:
- conn.close()
+ for client in clients_db.values():
+ client.close()
if __name__ == '__main__':
'Event system with event loop.'
-from queue import SimpleQueue as EventQueue, Empty as QueueEmpty
+from abc import abstractmethod, ABC
+from queue import SimpleQueue, Empty as QueueEmpty
from threading import Thread
-from typing import Iterator, Literal, Optional, Self
+from typing import Any, Iterator, Literal, Self
class Event:
'Communication unit between threads.'
+class AffectiveEvent(Event, ABC):
+ 'For Events that are to affect other objects.'
+
+ @abstractmethod
+ def affect(self, target: Any) -> None:
+ 'To be run by main loop on target.'
+
+
class PayloadMixin:
- 'To extend Event with .payload= passed as first argument.'
+ 'Extends with .payload= passed as first argument.'
def __init__(self, payload, **kwargs) -> None:
super().__init__(**kwargs)
class ExceptionEvent(Event, PayloadMixin):
- 'To signal Exception to be handled by receiver.'
+ 'To deliver Exception to main loop for handling.'
payload: Exception
class QuitEvent(Event):
- 'To signal any receiver to exit.'
+ 'To break main loop towards.'
+
+
+class QueueMixin:
+ 'Adds SimpleQueue addressable via ._put(Event).'
+
+ def __init__(self, q_out: SimpleQueue, **kwargs) -> None:
+ self._q_out = q_out
+ super().__init__(**kwargs)
+
+ def _put(self, event: Event) -> None:
+ self._q_out.put(event)
-class BroadcastMixin:
- 'To provide .broadcast via newly assigned ._q_to_main.'
+class Loop(QueueMixin):
+ 'Wraps thread looping over iterator, communicating back via q_out.'
- def __init__(self, q_to_main: EventQueue, **kwargs) -> None:
+ def __init__(self, iterator: Iterator, **kwargs) -> None:
super().__init__(**kwargs)
- self._q_to_main = q_to_main
-
- def broadcast[E: Event](self,
- event_class: type[E],
- *args, **kwargs
- ) -> None:
- "Put event to main loop; if event_class PayloadMixin'd, payload=args."
- if PayloadMixin in event_class.__mro__:
- kwargs['payload'] = args if len(args) > 1 else args[0]
- args = tuple()
- self._q_to_main.put(event_class(*args, **kwargs))
-
-
-class Loop(BroadcastMixin):
- 'Wraps thread looping over input queue, potential bonus iterator.'
-
- def __init__(self,
- q_to_main: EventQueue,
- bonus_iterator: Optional[Iterator] = None,
- **kwargs
- ) -> None:
- super().__init__(q_to_main=q_to_main, **kwargs)
- self._bonus_iterator = bonus_iterator
- self._q_input: EventQueue = EventQueue()
+ self._q_quit: SimpleQueue = SimpleQueue()
+ self._iterator = iterator
self._thread = Thread(target=self._loop, daemon=False)
self._thread.start()
def stop(self) -> None:
- 'Emit QuitEvent to break threaded loop, then wait for break.'
- self.put(QuitEvent())
+ 'Break threaded loop, but wait for it to finish properly.'
+ self._q_quit.put(None)
self._thread.join()
def __enter__(self) -> Self:
self.stop()
return False # re-raise any exception that above ignored
- def put(self, event: Event) -> None:
- 'Send event into thread loop.'
- self._q_input.put(event)
-
- def process_main(self, event: Event) -> bool:
- 'Process event yielded from input queue.'
- if isinstance(event, QuitEvent):
- return False
- return True
-
- def process_bonus(self, yielded: str) -> None:
- 'Process bonus iterator yield.'
-
def _loop(self) -> None:
- 'Loop over input queue and, if provided, bonus iterator.'
try:
while True:
try:
- yield_main = self._q_input.get(
- block=True,
- timeout=0 if self._bonus_iterator else None)
+ self._q_quit.get(block=True, timeout=0)
except QueueEmpty:
pass
else:
- if self.process_main(yield_main) is False:
- break
- if self._bonus_iterator:
- try:
- yield_bonus = next(self._bonus_iterator)
- except StopIteration:
- break
- if yield_bonus:
- self.process_bonus(yield_bonus)
+ break
+ try:
+ it_yield = next(self._iterator)
+ except StopIteration:
+ break
+ if it_yield is not None:
+ self._put(it_yield)
except Exception as e: # pylint: disable=broad-exception-caught
- self.broadcast(ExceptionEvent, e)
+ self._put(ExceptionEvent(e))
'IRC server connection management.'
# built-ins
-from dataclasses import dataclass
+from abc import ABC, abstractmethod
+from getpass import getuser
from socket import socket, gaierror as socket_gaierror
from threading import Thread
from typing import Callable, Iterator, NamedTuple, Optional, Self
+from uuid import uuid4, UUID
# ourselves
-from ircplom.events import BroadcastMixin, Event, Loop, PayloadMixin
+from ircplom.events import (
+ AffectiveEvent, ExceptionEvent, Loop, PayloadMixin, QueueMixin)
-TIMEOUT_LOOP = 0.1
-
+_TIMEOUT_RECV_LOOP = 0.1
_TIMEOUT_CONNECT = 5
_CONN_RECV_BUFSIZE = 1024
_PORT = 6667
(r'\r', '\r'),
(r'\\', '\\'))
+ClientsDb = dict[UUID, 'Client']
+
-class _IrcMessage:
+class IrcMessage:
'Properly structured representation of IRC message as per IRCv3 spec.'
_raw: Optional[str] = None
def __init__(self,
verb: str,
- parameters: Optional[tuple[str, ...]] = None,
+ params: Optional[tuple[str, ...]] = None,
source: str = '',
tags: Optional[dict[str, str]] = None
) -> None:
self.verb: str = verb
- self.parameters: tuple[str, ...] = parameters or tuple()
+ self.params: tuple[str, ...] = params or tuple()
self.source: str = source
self.tags: dict[str, str] = tags or {}
@classmethod
def from_raw(cls, raw_msg: str) -> Self:
- 'Parse raw IRC message line into properly structured _IrcMessage.'
+ 'Parse raw IRC message line into properly structured IrcMessage.'
class _Stage(NamedTuple):
name: str
stages = [_Stage('tags', '@', _parse_tags),
_Stage('source', ':'),
_Stage('verb', None, lambda s: s.upper()),
- _Stage('parameters', None, _split_params)]
+ _Stage('params', None, _split_params)]
harvest = {s.name: '' for s in stages}
idx_stage = 0
stage = None
tag_strs[-1] += f'={val}'
to_combine += ['@' + ';'.join(tag_strs)]
to_combine += [self.verb]
- if self.parameters:
- to_combine += self.parameters[:-1]
- to_combine += [f':{self.parameters[-1]}']
+ if self.params:
+ to_combine += self.params[:-1]
+ to_combine += [f':{self.params[-1]}']
self._raw = ' '.join(to_combine)
return self._raw
-@dataclass
-class LoginNames:
- 'Collects the names needed on server connect for USER, NICK commands.'
- user: str
- nick: str
- real: str
- nick_confirmed: bool = False
-
-
-class ConnMixin:
- 'Collects an IrcConnection at .conn.'
+class ClientIdMixin:
+ 'Collects a Client\'s ID at .client_id.'
- def __init__(self, conn: 'IrcConnection', **kwargs) -> None:
+ def __init__(self, client_id: UUID, **kwargs) -> None:
super().__init__(**kwargs)
- self.conn = conn
+ self.client_id = client_id
-class InitConnectEvent(Event, PayloadMixin):
- 'Event to trigger connection, with payload (host, LoginNames).'
- payload: tuple[str, LoginNames]
+class NewClientEvent(AffectiveEvent, PayloadMixin):
+ 'Put Client .payload into ClientsDb target.'
+ payload: 'Client'
+ def affect(self, target: ClientsDb) -> None:
+ target[self.payload.id_] = self.payload
-class ConnEvent(Event, ConnMixin):
- 'Event with .conn.'
+class ClientEvent(AffectiveEvent, ClientIdMixin):
+ 'To affect Client identified by ClientIdMixin.'
-class _ConnectedEvent(ConnEvent):
- 'Event to signal opening of connection.'
+class _ConnectedEvent(ClientEvent):
-class _DisconnectedEvent(ConnEvent):
- 'Event to signal closing of connection'
+ def affect(self, target: 'Client') -> None:
+ target.send(IrcMessage(verb='USER', params=(getuser(), '0', '*',
+ target.realname)))
+ target.send(IrcMessage(verb='NICK', params=(target.nickname,)))
-class InitReconnectEvent(ConnEvent):
- 'Event to trigger re-opening of connection.'
+class InitReconnectEvent(ClientEvent):
+ 'To trigger re-opening of connection.'
+ def affect(self, target: 'Client') -> None:
+ if target.assumed_open:
+ target.log('ALERT: Reconnect called, but still seem connected, '
+ 'so nothing to do.')
+ else:
+ target.start_connecting()
-class LogConnEvent(ConnEvent, PayloadMixin):
- 'Event to log payload into connection window.'
- payload: str
+class SendEvent(ClientEvent, PayloadMixin):
+ 'To trigger sending of payload to server.'
+ payload: IrcMessage
+
+ def affect(self, target: 'Client') -> None:
+ target.send(self.payload)
-class NickSetEvent(ConnEvent):
- 'Event to signal nickname having been set server-side.'
+class ClientQueueMixin(QueueMixin):
+ 'To QueueMixin adds _cput to extend ._put with client_id= setting.'
+ client_id_name = 'id_'
-class _SendEvent(ConnEvent, PayloadMixin):
- 'Event to trigger sending of payload to server.'
- payload: _IrcMessage
+ def _cput(self, event_class, **kwargs) -> None:
+ self._put(event_class(client_id=getattr(self, self.client_id_name),
+ **kwargs))
-class IrcConnection(BroadcastMixin):
+class Client(ABC, ClientQueueMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
- def __init__(self,
- hostname: str,
- login: LoginNames,
- **kwargs
+ def __init__(self, hostname: str, nickname: str, realname: str, **kwargs
) -> None:
super().__init__(**kwargs)
+ self.id_ = uuid4()
self._hostname = hostname
- self.login = login
self._socket: Optional[socket] = None
- self._assumed_open = False
- self._recv_loop: Optional[_RecvLoop] = None
- self._start_connecting()
+ self.assumed_open = False
+ self._recv_loop: Optional[Loop] = None
+ self.realname = realname
+ self.update_login(nick_confirmed=False, nickname=nickname)
+ self.start_connecting()
- def _start_connecting(self) -> None:
+ def start_connecting(self) -> None:
+ 'Start thread to initiate connection, from socket to recv loop.'
def connect(self) -> None:
- self._socket = socket()
- self.broadcast(LogConnEvent,
- f'Connecting to {self._hostname} …')
- self._socket.settimeout(_TIMEOUT_CONNECT)
try:
- self._socket.connect((self._hostname, _PORT))
- except (TimeoutError, socket_gaierror) as e:
- self.broadcast(LogConnEvent, f'ALERT: {e}')
- return
- self._socket.settimeout(TIMEOUT_LOOP)
- self._assumed_open = True
- self._recv_loop = _RecvLoop(conn=self,
- q_to_main=self._q_to_main,
- bonus_iterator=self._read_lines())
- self.broadcast(_ConnectedEvent)
+ self._socket = socket()
+ self.log(f'Connecting to {self._hostname} …')
+ self._socket.settimeout(_TIMEOUT_CONNECT)
+ try:
+ self._socket.connect((self._hostname, _PORT))
+ except (TimeoutError, socket_gaierror) as e:
+ self.log(f'ALERT: {e}')
+ return
+ self._socket.settimeout(_TIMEOUT_RECV_LOOP)
+ self.assumed_open = True
+ self._recv_loop = Loop(iterator=self._read_lines(),
+ q_out=self._q_out)
+ self._cput(_ConnectedEvent)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._put(ExceptionEvent(e))
Thread(target=connect, daemon=True, args=(self,)).start()
- def broadcast[E: Event](self,
- event_class: type[E],
- *args, **kwargs
- ) -> None:
- 'Broadcast event subclassing ConnEvent, with self as its .conn.'
- super().broadcast(event_class, conn=self, *args, **kwargs)
+ @abstractmethod
+ def log(self, msg: str) -> None:
+ 'Write msg into log, whatever shape that may have.'
- def send(self, verb: str, parameters: tuple[str, ...]) -> None:
- 'Broadcast _SendEvent for _IrcMessage(verb, parameters).'
- self.broadcast(_SendEvent, _IrcMessage(verb, parameters))
+ def send(self, msg: IrcMessage) -> None:
+ 'Send line-separator-delimited message over socket.'
+ if not (self._socket and self.assumed_open):
+ self.log('ALERT: cannot send, assuming connection closed.')
+ return
+ self._socket.sendall(msg.raw.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR)
+ self.log(f'->: {msg.raw}')
- def update_login(self, **kwargs) -> None:
- 'Adapt .login attributes to kwargs, broadcast NickSetEvent.'
- for key, val in kwargs.items():
- setattr(self.login, key, val)
- self.broadcast(NickSetEvent)
+ def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
+ 'Manage .nickname, .nick_confirmed – useful for subclass extension.'
+ if nickname:
+ self.nickname = nickname
+ self.nick_confirmed = nick_confirmed
def close(self) -> None:
- 'Close both RecvLoop and socket.'
- self._assumed_open = False
+ 'Close both recv Loop and socket.'
+ self.assumed_open = False
self.update_login(nick_confirmed=False)
if self._recv_loop:
self._recv_loop.stop()
self._socket.close()
self._socket = None
- def _read_lines(self) -> Iterator[Optional[str]]:
- 'Receive line-separator-delimited messages from socket.'
+ def _read_lines(self) -> Iterator[Optional['_RecvEvent']]:
assert self._socket is not None
bytes_total = b''
buffer_linesep = b''
buffer_linesep += c_byted
if buffer_linesep == _IRCSPEC_LINE_SEPARATOR:
buffer_linesep = b''
- yield bytes_total.decode('utf-8')
+ yield _RecvEvent(client_id=self.id_,
+ payload=bytes_total.decode('utf-8'))
bytes_total = b''
- def _write_line(self, line: str) -> None:
- 'Send line-separator-delimited message over socket.'
- if not (self._socket and self._assumed_open):
- self.broadcast(LogConnEvent,
- 'ALERT: cannot send, assuming connection closed.')
- return
- self._socket.sendall(line.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR)
-
- def handle(self, event: ConnEvent) -> None:
- 'Process connection-directed Event into further steps.'
- if isinstance(event, InitReconnectEvent):
- if self._assumed_open:
- self.broadcast(LogConnEvent,
- 'ALERT: Reconnect called, but still seem '
- 'connected, so nothing to do.')
- else:
- self._start_connecting()
- elif isinstance(event, _ConnectedEvent):
- # self.send('CAP', ('LS', '302'))
- self.send('USER', (self.login.user, '0', '*', self.login.real))
- self.send('NICK', (self.login.nick,))
- # self.send('CAP', ('LIST',))
- # self.send('CAP', ('END',))
- elif isinstance(event, _DisconnectedEvent):
- self.close()
- elif isinstance(event, _SendEvent):
- self.broadcast(LogConnEvent, f'->: {event.payload.raw}')
- self._write_line(event.payload.raw)
-
-
-class _RecvLoop(Loop, ConnMixin):
- 'Loop to react on messages from server.'
-
- def process_bonus(self, yielded: str) -> None:
- msg = _IrcMessage.from_raw(yielded)
- self.conn.broadcast(LogConnEvent, f'<-: {msg.raw}')
+
+class _RecvEvent(ClientEvent, PayloadMixin):
+ payload: str
+
+ def affect(self, target: Client) -> None:
+ msg = IrcMessage.from_raw(self.payload)
+ target.log(f'<-: {self.payload}')
if msg.verb == 'PING':
- self.conn.send('PONG', (msg.parameters[0],))
- elif msg.verb == 'ERROR'\
- and msg.parameters[0].startswith('Closing link:'):
- self.conn.broadcast(_DisconnectedEvent)
+ target.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
+ if msg.verb == 'ERROR' and msg.params[0].startswith('Closing link:'):
+ target.close()
elif msg.verb in {'001', 'NICK'}:
- self.conn.update_login(nick=msg.parameters[0],
- nick_confirmed=True)
+ target.update_login(nickname=msg.params[0], nick_confirmed=True)
from abc import ABC, abstractmethod
from base64 import b64decode
from contextlib import contextmanager
-from getpass import getuser as getusername
from inspect import _empty as inspect_empty, signature, stack
from signal import SIGWINCH, signal
from typing import Callable, Generator, Iterator, NamedTuple, Optional
+from uuid import UUID
# requirements.txt
from blessed import Terminal as BlessedTerminal
# ourselves
-from ircplom.events import (BroadcastMixin, Event, EventQueue, Loop,
- PayloadMixin, QuitEvent)
+from ircplom.events import (AffectiveEvent, Loop, PayloadMixin, QueueMixin,
+ QuitEvent)
from ircplom.irc_conn import (
- ConnEvent, ConnMixin, InitConnectEvent, InitReconnectEvent,
- LoginNames, LogConnEvent, NickSetEvent, TIMEOUT_LOOP)
+ IrcMessage, Client, ClientIdMixin, ClientQueueMixin,
+ InitReconnectEvent, NewClientEvent, SendEvent)
_MIN_HEIGHT = 4
_MIN_WIDTH = 32
+_TIMEOUT_KEYPRESS_LOOP = 0.5
_B64_PREFIX = 'b64:'
-_OSC52_PREFIX = ']52;c;'
+_OSC52_PREFIX = b']52;c;'
_PASTE_DELIMITER = '\007'
_PROMPT_TEMPLATE = '> '
'KEY_DOWN': ('window.prompt.scroll', 'down'),
'KEY_PGUP': ('window.log.scroll', 'up'),
'KEY_PGDOWN': ('window.log.scroll', 'down'),
- '[91, 49, 59, 51, 68]': ('window', 'left'),
- '[91, 49, 59, 51, 67]': ('window', 'right'),
+ 'esc:91:49:59:51:68': ('window', 'left'),
+ 'esc:91:49:59:51:67': ('window', 'right'),
'KEY_F1': ('window.paste',),
}
_CMD_SHORTCUTS = {
}
-class _LogEvent(Event, PayloadMixin):
- 'Event to trigger writing to current Window\'s LogWidget.'
- payload: str
+class TuiEvent(AffectiveEvent):
+ 'To affect TUI, and trigger flushed .draw_tainted on it.'
+ def affect(self, target: 'Tui') -> None:
+ target.window.draw_tainted()
+ target.term.flush()
-class _SetScreenEvent(Event):
- 'Event to trigger re-configuration of screen sizes.'
+class _SetScreenEvent(TuiEvent):
-class _TuiCmdEvent(Event, PayloadMixin):
- 'Event to trigger call of .cmd__ method in TUI tree.'
- payload: str
+ def affect(self, target: 'Tui') -> None:
+ target.term.calc_geometry()
+ for window in target.windows:
+ window.set_geometry()
+ super().affect(target)
class _YX(NamedTuple):
- '2-dimensional coordinate.'
y: int
x: int
class _Widget(ABC):
- 'Defines most basic TUI object API.'
@abstractmethod
def __init__(self, **kwargs) -> None:
class _ScrollableWidget(_Widget, ABC):
- 'Defines some API shared between _PromptWidget and _LogWidget.'
_history_idx: int
def __init__(self, write: Callable[..., None], **kwargs) -> None:
class _LogWidget(_ScrollableWidget):
- 'Collects line-shaped messages, scrolls and wraps them for display.'
_view_size: _YX
_y_pgscroll: int
class _PromptWidget(_ScrollableWidget):
- 'Keyboard-controlled command input field.'
_y: int
_width: int
_history_idx: int = 0
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
+ self.prefix = _PROMPT_TEMPLATE
self._reset_buffer('')
@property
def draw(self) -> bool:
if not super().draw():
return False
- prefix = self._prompt[:]
+ prefix = self.prefix[:]
content = self._input_buffer
if self._cursor_x == len(self._input_buffer):
content += ' '
self._write(to_write[cursor_x_to_write + 1:])
return True
- @property
- def _prompt(self) -> str:
- return _PROMPT_TEMPLATE
-
def _archive_prompt(self) -> None:
self.append(self._input_buffer)
self._reset_buffer('')
elif self._input_buffer:
self._archive_prompt()
- def cmd__append(self, to_append: str) -> None:
- 'Append to prompt input buffer.'
- self._cursor_x += len(to_append)
+ def insert(self, to_insert: str) -> None:
+ 'Insert into prompt input buffer.'
+ self._cursor_x += len(to_insert)
self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
- + to_append
+ + to_insert
+ self._input_buffer[self._cursor_x - 1:])
self._history_idx = 0
if direction == 'left' and self._cursor_x > 0:
self._cursor_x -= 1
elif direction == 'right'\
- and self._cursor_x <= len(self._input_buffer):
+ and self._cursor_x < len(self._input_buffer):
self._cursor_x += 1
else:
return
return to_return
-class _ConnectionPromptWidget(_PromptWidget, ConnMixin):
- 'PromptWidget with attributes, methods for dealing with an IrcConnection.'
-
- @property
- def _prompt(self) -> str:
- return ((' ' if self.conn.login.nick_confirmed else '?')
- + self.conn.login.nick
- + super()._prompt)
-
-
class _Window(_Widget):
- 'Collects a log and a prompt meant for the same content stream.'
_y_status: int
prompt: _PromptWidget
self.idx = idx
self._term = term
self.log = _LogWidget(wrap=self._term.wrap, write=self._term.write)
- self.prompt = self.__annotations__['prompt'](write=self._term.write,
- **kwargs)
+ self.prompt = self.__annotations__['prompt'](write=self._term.write)
if hasattr(self._term, 'size'):
self.set_geometry()
def cmd__paste(self) -> None:
'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
- self._term.write(f'\033{_OSC52_PREFIX}?{_PASTE_DELIMITER}',
+ self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}',
self._y_status)
self.tainted = True
widget.draw()
-class _ConnectionWindow(_Window, ConnMixin):
- 'Window with attributes and methods for dealing with an IrcConnection.'
- prompt: _ConnectionPromptWidget
+class _ClientWindow(_Window, ClientQueueMixin):
+ client_id_name = 'client_id'
+
+ def __init__(self, client_id: UUID, **kwargs) -> None:
+ self.client_id = client_id
+ super().__init__(**kwargs)
def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
'Send QUIT command to server.'
- self.conn.send('QUIT', (quit_msg, ))
+ self._cput(SendEvent, payload=IrcMessage(verb='QUIT',
+ params=(quit_msg,)))
def cmd__reconnect(self) -> None:
'Attempt reconnection.'
- self.conn.broadcast(InitReconnectEvent)
+ self._cput(InitReconnectEvent)
def cmd__nick(self, new_nick: str) -> None:
'Attempt nickname change.'
- self.conn.send('NICK', (new_nick, ))
+ self._cput(SendEvent, payload=IrcMessage(verb='NICK',
+ params=(new_nick,)))
-class _KeyboardLoop(Loop, BroadcastMixin):
- 'Loop receiving and translating keyboard events towards main loop.'
+class _KeyboardEvent(TuiEvent, PayloadMixin):
+ payload: str
- def process_bonus(self, yielded: str) -> None:
- if yielded and ord(yielded[0]) == _ORD_CHAR_RESIZE:
- self.broadcast(_SetScreenEvent)
- elif yielded.startswith(_B64_PREFIX):
- encoded = yielded[len(_B64_PREFIX):]
+ def affect(self, target: 'Tui') -> None:
+ if self.payload[0] == _ORD_CHAR_RESIZE:
+ _SetScreenEvent().affect(target)
+ if self.payload in _KEYBINDINGS:
+ cmd_data = _KEYBINDINGS[self.payload]
+ cmd = target.cmd_name_to_cmd(cmd_data[0])
+ if cmd:
+ cmd(*cmd_data[1:])
+ elif self.payload.startswith(_B64_PREFIX):
+ encoded = self.payload[len(_B64_PREFIX):]
to_paste = ''
for i, c in enumerate(b64decode(encoded).decode('utf-8')):
if i > 512:
to_paste += ' '
else:
to_paste += '#'
- self.broadcast(_TuiCmdEvent, ('window.prompt.append', to_paste))
- elif yielded in _KEYBINDINGS:
- self.broadcast(_TuiCmdEvent, _KEYBINDINGS[yielded])
- elif len(yielded) == 1:
- self.broadcast(_TuiCmdEvent, ('window.prompt.append', yielded))
+ target.window.prompt.insert(to_paste)
+ elif len(self.payload) == 1:
+ target.window.prompt.insert(self.payload)
else:
- self.broadcast(_LogEvent,
- f'ALERT: unknown keyboard input: {yielded}')
+ target.log(f'ALERT: unknown keyboard input: {self.payload}')
+ super().affect(target)
-class _TuiLoop(Loop, BroadcastMixin):
- 'Loop for drawing/updating TUI.'
+class Tui(QueueMixin):
+ 'Base for graphical user interface elements.'
def __init__(self, term: 'Terminal', **kwargs) -> None:
- self._term = term
- self._windows = [_Window(0, self._term)]
- self._window_idx = 0
- self._conn_windows: list[_ConnectionWindow] = []
super().__init__(**kwargs)
- self.put(_SetScreenEvent())
+ self.term = term
+ self._window_idx = 0
+ self.windows = [_Window(idx=self._window_idx, term=self.term)]
+ self._put(_SetScreenEvent())
- def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
+ def cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
+ 'Map cmd_name to executable TUI element method.'
cmd_name = _CMD_SHORTCUTS.get(cmd_name, cmd_name)
cmd_parent = self
while True:
return None
return getattr(cmd_parent, cmd_name)
- def process_main(self, event: Event) -> bool:
- if not super().process_main(event):
- return False
- if isinstance(event, _SetScreenEvent):
- self._term.calc_geometry()
- for window in self._windows:
- window.set_geometry()
- elif isinstance(event, _LogEvent):
- self.window.log.append(event.payload)
- elif isinstance(event, _TuiCmdEvent):
- cmd = self._cmd_name_to_cmd(event.payload[0])
- assert cmd is not None
- cmd(*event.payload[1:])
- elif isinstance(event, ConnEvent):
- matching_wins = [cw for cw in self._conn_windows
- if cw.conn == event.conn]
- if matching_wins:
- conn_win = matching_wins[0]
- else:
- conn_win = _ConnectionWindow(idx=len(self._windows),
- conn=event.conn,
- term=self._term)
- self._windows += [conn_win]
- self._conn_windows += [conn_win]
- self._switch_window(conn_win.idx)
- if isinstance(event, LogConnEvent):
- conn_win.log.append(event.payload)
- elif isinstance(event, NickSetEvent):
- conn_win.prompt.tainted = True
- else:
- return True
- self.window.draw_tainted()
- self._term.flush()
- return True
-
@property
def window(self) -> _Window:
- 'Currently selected Window.'
- return self._windows[self._window_idx]
+ 'Currently selected _Window.'
+ return self.windows[self._window_idx]
+
+ def log(self, msg: str) -> None:
+ 'Post msg to active window\'s log.'
+ self.window.log.append(msg)
def _switch_window(self, idx: int) -> None:
self._window_idx = idx
def cmd__connect(self, hostname: str, nickname: str, realname: str
) -> None:
- 'Broadcast InitConnectEvent.'
- login = LoginNames(user=getusername(), nick=nickname, real=realname)
- self.broadcast(InitConnectEvent, (hostname, login))
+ 'Create Client and pass it via NewClientEvent.'
+ client = _ClientKnowingTui(q_out=self._q_out, hostname=hostname,
+ nickname=nickname, realname=realname)
+ new_idx = len(self.windows)
+ self.windows += [_ClientWindow(idx=new_idx, term=self.term,
+ q_out=self._q_out,
+ client_id=client.id_)]
+ self._switch_window(new_idx)
+ self._put(NewClientEvent(client))
def cmd__prompt_enter(self) -> None:
'Get prompt content from .window.prompt.enter, parse to & run command.'
if to_parse[0:1] == '/':
toks = to_parse[1:].split(maxsplit=1)
alert = f'{toks[0]} unknown'
- cmd = self._cmd_name_to_cmd(toks[0])
+ cmd = self.cmd_name_to_cmd(toks[0])
if cmd and cmd.__name__ != stack()[0].function:
params = signature(cmd).parameters
n_args_max = len(params)
else:
alert = 'not prefixed by /'
if alert:
- self.broadcast(_LogEvent, f'invalid prompt command: {alert}')
+ self.log(f'invalid prompt command: {alert}')
def cmd__quit(self) -> None:
- 'Send QUIT to all threads.'
- self.broadcast(QuitEvent)
+ 'Trigger program exit.'
+ self._put(QuitEvent())
def cmd__window(self, towards: str) -> Optional[str]:
'Switch window selection.'
- n_windows = len(self._windows)
+ n_windows = len(self.windows)
if n_windows < 2:
return 'no alternate window to move into'
if towards in {'left', 'right'}:
return None
-class Terminal:
+class Terminal(QueueMixin):
'Abstraction of terminal interface.'
size: _YX
- tui: _TuiLoop
- _blessed: BlessedTerminal
_cursor_yx_: _YX
- @contextmanager
- def context(self, q_to_main: EventQueue) -> Generator:
- 'Combine multiple contexts into one.'
- signal(SIGWINCH, lambda *_: q_to_main.put(_SetScreenEvent()))
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
self._blessed = BlessedTerminal()
+ self._cursor_yx = _YX(0, 0)
+
+ @contextmanager
+ def setup(self) -> Generator:
+ 'Combine multiple contexts into one and run keypress loop.'
+ signal(SIGWINCH, lambda *_: self._put(_SetScreenEvent()))
+ self.clear()
with (self._blessed.raw(),
self._blessed.fullscreen(),
self._blessed.hidden_cursor(),
- _KeyboardLoop(q_to_main, self.get_keypresses())):
- self._cursor_yx = _YX(0, 0)
- with _TuiLoop(self, q_to_main=q_to_main) as self.tui:
- yield self
+ Loop(iterator=self._get_keypresses(), q_out=self._q_out)):
+ yield self
@property
def _cursor_yx(self) -> _YX:
print(msg, end='')
self._cursor_yx = _YX(self._cursor_yx.y, end_x)
- def get_keypresses(self) -> Iterator[str]:
- '''Loop through keypresses from terminal, collect what blessed ignores.
+ def _get_keypresses(self) -> Iterator[Optional[_KeyboardEvent]]:
+ '''Loop through keypresses from terminal, expand blessed's handling.
- (Notably, blessed seems to junk any alt/escape-modifide key events it
- does not explicitly know.
+ Explicitly collect KEY_ESCAPE-modified key sequences, and recognize
+ OSC52-prefixed pastables to return the respective base64 code,
+ prefixed with _B64_PREFIX.
'''
- n_gotchs_unprocessed = 0
while True:
- new_gotchs = []
- if not n_gotchs_unprocessed:
- while self._blessed.kbhit(TIMEOUT_LOOP):
- gotch = self._blessed.getch()
- self._blessed.ungetch(gotch)
- new_gotchs += [gotch]
- if not self._blessed.kbhit(0):
- break
- n_gotchs_unprocessed += len(new_gotchs)
- blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
- n_chs_blessed_key = len(blessed_key.encode('utf-8'))
- unhandleds = []
- if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
- for _ in range(len(new_gotchs) - n_chs_blessed_key):
- unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
- unhandleds += list(unhandled.encode('utf-8'))
- n_gotchs_unprocessed -= 1
- n_gotchs_unprocessed -= n_chs_blessed_key
- if unhandleds:
- fused = ''.join([chr(n) for n in unhandleds])
- if fused.startswith(_OSC52_PREFIX):
- if not (encoded := fused[len(_OSC52_PREFIX):]):
- while True:
- gotch = self._blessed.getch()
- if gotch == _PASTE_DELIMITER:
- break
- encoded += gotch
- yield f'{_B64_PREFIX}{encoded}'
- continue
- yield str(unhandleds)
- elif blessed_key.name:
- yield blessed_key.name
+ to_yield = ''
+ ks = self._blessed.inkey(
+ timeout=_TIMEOUT_KEYPRESS_LOOP, # how long until yield None,
+ esc_delay=0) # incl. until thread dies
+ if ks.name != 'KEY_ESCAPE':
+ to_yield = f'{ks.name if ks.name else ks}'
else:
- yield str(blessed_key)
+ chars = b''
+ while (new_chars := self._blessed.inkey(timeout=0, esc_delay=0
+ ).encode('utf-8')):
+ chars += new_chars
+ if chars[:len(_OSC52_PREFIX)] == _OSC52_PREFIX:
+ to_yield = _B64_PREFIX[:]
+ while (gotch := self._blessed.getch()) != _PASTE_DELIMITER:
+ to_yield += gotch
+ else:
+ to_yield = 'esc:' + ':'.join([str(int(b)) for b in chars])
+ yield _KeyboardEvent(to_yield) if to_yield else None
+
+
+class _ClientWindowEvent(TuiEvent, ClientIdMixin):
+
+ def client_win(self, target: Tui) -> _ClientWindow:
+ 'Identifies proper _ClientWindow in target TUI.'
+ return [win for win in target.windows
+ if isinstance(win, _ClientWindow)
+ and win.client_id == self.client_id][0]
+
+
+class _ClientLogEvent(_ClientWindowEvent, PayloadMixin):
+ payload: str
+
+ def affect(self, target: Tui) -> None:
+ self.client_win(target).log.append(self.payload)
+ super().affect(target)
+
+
+class _ClientPromptEvent(_ClientWindowEvent, PayloadMixin):
+ payload: tuple[str, str]
+
+ def affect(self, target: Tui) -> None:
+ prompt = self.client_win(target).prompt
+ prompt.prefix = ((' ' if self.payload[0] else '?')
+ + f'{self.payload[1]}{_PROMPT_TEMPLATE}')
+ prompt.tainted = True
+ super().affect(target)
+
+
+class _ClientKnowingTui(Client):
+
+ def log(self, msg: str) -> None:
+ self._cput(_ClientLogEvent, payload=msg)
+
+ def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
+ super().update_login(nick_confirmed, nickname)
+ self._cput(_ClientPromptEvent, payload=(self.nick_confirmed,
+ self.nickname))