--- /dev/null
+'IRC server connection management.'
+# built-ins
+from socket import socket, gaierror as socket_gaierror
+from threading import Thread
+from typing import Any, Callable, Iterator, NamedTuple, Optional, Self
+# ourselves
+from ircplom.events import Event, EventQueue, EventType, Loop
+
+
+TIMEOUT_LOOP = 0.1
+
+_TIMEOUT_CONNECT = 5
+_CONN_RECV_BUFSIZE = 1024
+_PORT = 6667
+
+_IRCSPEC_LINE_SEPARATOR = b'\r\n'
+_IRCSPEC_TAG_ESCAPES = ((r'\:', ';'),
+ (r'\s', ' '),
+ (r'\n', '\n'),
+ (r'\r', '\r'),
+ (r'\\', '\\'))
+
+
+class LoginNames(NamedTuple):
+ 'Collects the names needed on server connect for USER, NICK commands.'
+ user: str
+ nick: str
+ real: str
+
+
+class IrcConnection:
+ 'Abstracts socket connection, loop over it, and handling messages from it.'
+
+ def __init__(self,
+ q_to_main: EventQueue,
+ idx: int,
+ hostname: str,
+ login: LoginNames,
+ ) -> None:
+ self._idx = idx
+ self._q_to_main = q_to_main
+ self._hostname = hostname
+ self._login = login
+ self._socket: Optional[socket] = None
+ self._assumed_open = False
+ self._loop: Optional[_ConnectionLoop] = None
+ self._broadcast(EventType.CONN_WINDOW, self._login.nick)
+ self._start_connecting()
+
+ def _start_connecting(self) -> None:
+
+ def connect(self) -> None:
+ self._socket = socket()
+ self._broadcast(EventType.CONN_ALERT,
+ f'Connecting to {self._hostname} …')
+ self._socket.settimeout(_TIMEOUT_CONNECT)
+ try:
+ self._socket.connect((self._hostname, _PORT))
+ except (TimeoutError, socket_gaierror) as e:
+ self._broadcast(EventType.CONN_ALERT, str(e))
+ return
+ self._socket.settimeout(TIMEOUT_LOOP)
+ self._assumed_open = True
+ self._loop = _ConnectionLoop(self._idx, self._q_to_main,
+ self._read_lines())
+ self._broadcast(EventType.CONNECTED, self._login)
+
+ Thread(target=connect, daemon=True, args=(self,)).start()
+
+ def close(self) -> None:
+ 'Close both _ConnectionLoop and socket.'
+ self._assumed_open = False
+ if self._loop:
+ self._loop.stop()
+ self._loop = None
+ if self._socket:
+ self._socket.close()
+ self._socket = None
+
+ def _broadcast(self, type_: EventType, payload: Any = None) -> None:
+ 'Send event to main loop via queue, with connection index as 1st arg.'
+ self._q_to_main.eput(type_, (self._idx, payload))
+
+ def _read_lines(self) -> Iterator[Optional[str]]:
+ 'Receive line-separator-delimited messages from socket.'
+ assert self._socket is not None
+ bytes_total = b''
+ buffer_linesep = b''
+ while True:
+ try:
+ bytes_new = self._socket.recv(_CONN_RECV_BUFSIZE)
+ except TimeoutError:
+ yield None
+ continue
+ except OSError as e:
+ if e.errno == 9:
+ break
+ raise e
+ if not bytes_new:
+ break
+ for c in bytes_new:
+ c_byted = c.to_bytes()
+ if c not in _IRCSPEC_LINE_SEPARATOR:
+ bytes_total += c_byted
+ buffer_linesep = b''
+ elif c == _IRCSPEC_LINE_SEPARATOR[0]:
+ buffer_linesep = c_byted
+ else:
+ buffer_linesep += c_byted
+ if buffer_linesep == _IRCSPEC_LINE_SEPARATOR:
+ buffer_linesep = b''
+ yield bytes_total.decode('utf-8')
+ bytes_total = b''
+
+ def _write_line(self, line: str) -> None:
+ 'Send line-separator-delimited message over socket.'
+ if not (self._socket and self._assumed_open):
+ self._broadcast(EventType.CONN_ALERT,
+ 'cannot send, assuming connection closed')
+ return
+ self._socket.sendall(line.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR)
+
+ def handle(self, event: Event) -> None:
+ 'Process connection-directed Event into further steps.'
+ if event.type_ == EventType.INIT_RECONNECT:
+ if self._assumed_open:
+ self._broadcast(EventType.CONN_ALERT,
+ 'Reconnect called, but still seem connected, '
+ 'so nothing to do.')
+ else:
+ self._start_connecting()
+ elif event.type_ == EventType.CONNECTED:
+ assert self._loop is not None
+ self._loop.put(event)
+ elif event.type_ == EventType.DISCONNECTED:
+ self.close()
+ elif event.type_ == EventType.SEND:
+ msg: IrcMessage = event.payload[1]
+ self._write_line(msg.raw)
+
+
+class IrcMessage:
+ 'Properly structured representation of IRC message as per IRCv3 spec.'
+ _raw: Optional[str] = None
+
+ def __init__(self,
+ verb: str,
+ parameters: Optional[tuple[str, ...]] = None,
+ source: str = '',
+ tags: Optional[dict[str, str]] = None
+ ) -> None:
+ self.verb: str = verb
+ self.parameters: tuple[str, ...] = parameters or tuple()
+ self.source: str = source
+ self.tags: dict[str, str] = tags or {}
+
+ @classmethod
+ def from_raw(cls, raw_msg: str) -> Self:
+ 'Parse raw IRC message line into properly structured IrcMessage.'
+
+ class _Stage(NamedTuple):
+ name: str
+ prefix_char: Optional[str]
+ processor: Callable = lambda s: s
+
+ def _parse_tags(str_tags: str) -> dict[str, str]:
+ tags = {}
+ for str_tag in [s for s in str_tags.split(';') if s]:
+ if '=' in str_tag:
+ key, val = str_tag.split('=', maxsplit=1)
+ for to_repl, repl_with in _IRCSPEC_TAG_ESCAPES:
+ val = val.replace(to_repl, repl_with)
+ else:
+ key, val = str_tag, ''
+ tags[key] = val
+ return tags
+
+ def _split_params(str_params: str) -> tuple[str, ...]:
+ params = []
+ params_stage = 0 # 0: gap, 1: non-trailing, 2: trailing
+ for char in str_params:
+ if char == ' ' and params_stage < 2:
+ params_stage = 0
+ continue
+ if params_stage == 0:
+ params += ['']
+ params_stage += 1
+ if char == ':':
+ params_stage += 1
+ continue
+ params[-1] += char
+ return tuple(p for p in params)
+
+ stages = [_Stage('tags', '@', _parse_tags),
+ _Stage('source', ':'),
+ _Stage('verb', None, lambda s: s.upper()),
+ _Stage('parameters', None, _split_params)]
+ harvest = {s.name: '' for s in stages}
+ idx_stage = 0
+ stage = None
+ for char in raw_msg:
+ if char == ' ' and idx_stage < (len(stages) - 1):
+ if stage:
+ stage = None
+ continue
+ if not stage:
+ while not stage:
+ idx_stage += 1
+ tested = stages[idx_stage]
+ if (not tested.prefix_char) or char == tested.prefix_char:
+ stage = tested
+ if stage.prefix_char:
+ continue
+ harvest[stage.name] += char
+ msg = cls(**{s.name: s.processor(harvest[s.name]) for s in stages})
+ msg._raw = raw_msg
+ return msg
+
+ @property
+ def raw(self) -> str:
+ 'Return raw message code – create from known fields if necessary.'
+ if not self._raw:
+ to_combine = []
+ if self.tags:
+ tag_strs = []
+ for key, val in self.tags.items():
+ tag_strs += [key]
+ if not val:
+ continue
+ for repl_with, to_repl in reversed(_IRCSPEC_TAG_ESCAPES):
+ val = val.replace(to_repl, repl_with)
+ tag_strs[-1] += f'={val}'
+ to_combine += ['@' + ';'.join(tag_strs)]
+ to_combine += [self.verb]
+ if self.parameters:
+ to_combine += self.parameters[:-1]
+ to_combine += [f':{self.parameters[-1]}']
+ self._raw = ' '.join(to_combine)
+ return self._raw
+
+
+class _ConnectionLoop(Loop):
+ 'Loop receiving and translating socket messages towards main loop.'
+
+ def __init__(self, connection_idx: int, *args, **kwargs) -> None:
+ self._conn_idx = connection_idx
+ super().__init__(*args, **kwargs)
+
+ def _broadcast_conn(self, type_: EventType, *args) -> None:
+ self.broadcast(type_, (self._conn_idx, *args))
+
+ def _send(self, verb: str, parameters: tuple[str, ...]) -> None:
+ self._broadcast_conn(EventType.SEND, IrcMessage(verb, parameters))
+
+ def process_main(self, event: Event) -> bool:
+ if not super().process_main(event):
+ return False
+ if event.type_ == EventType.CONNECTED:
+ login = event.payload[1]
+ self._send('USER', (login.user, '0', '*', login.real))
+ self._send('NICK', (login.nick,))
+ return True
+
+ def process_bonus(self, yielded: str) -> None:
+ msg = IrcMessage.from_raw(yielded)
+ self._broadcast_conn(EventType.RECV, msg)
+ if msg.verb == 'PING':
+ self._send('PONG', (msg.parameters[0],))
+ elif msg.verb == 'ERROR'\
+ and msg.parameters[0].startswith('Closing link:'):
+ self._broadcast_conn(EventType.DISCONNECTED)
+ elif msg.verb == '001':
+ self._broadcast_conn(EventType.NICK_SET, msg.parameters[0])
--- /dev/null
+'Terminal and TUI management.'
+# built-ins
+from abc import ABC, abstractmethod
+from base64 import b64decode
+from contextlib import contextmanager
+from getpass import getuser as getusername
+from inspect import _empty as inspect_empty, signature, stack
+from signal import SIGWINCH, signal
+from typing import Any, Callable, Generator, Iterator, NamedTuple, Optional
+# requirements.txt
+from blessed import Terminal as BlessedTerminal
+# ourselves
+from ircplom.events import Event, EventType, EventQueue, Loop
+from ircplom.irc_conn import IrcMessage, LoginNames, TIMEOUT_LOOP
+
+
+_B64_PREFIX = 'b64:'
+_OSC52_PREFIX = ']52;c;'
+_PASTE_DELIMITER = '\007'
+
+_PROMPT_TEMPLATE = '> '
+_PROMPT_ELL_IN = '<…'
+_PROMPT_ELL_OUT = '…>'
+
+_KEYBINDINGS = {
+ 'KEY_BACKSPACE': ('window.prompt.backspace',),
+ 'KEY_ENTER': ('prompt_enter',),
+ 'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
+ 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
+ 'KEY_UP': ('window.prompt.scroll', 'up'),
+ 'KEY_DOWN': ('window.prompt.scroll', 'down'),
+ 'KEY_PGUP': ('window.log.scroll', 'up'),
+ 'KEY_PGDOWN': ('window.log.scroll', 'down'),
+ '[91, 49, 59, 51, 68]': ('window', 'left'),
+ '[91, 49, 59, 51, 67]': ('window', 'right'),
+ 'KEY_F1': ('window.paste',),
+}
+CMD_SHORTCUTS = {
+ 'disconnect': 'window.disconnect',
+ 'reconnect': 'window.reconnect'
+}
+
+
+class _YX(NamedTuple):
+ '2-dimensional coordinate.'
+ y: int
+ x: int
+
+
+class _Widget(ABC):
+ 'Defines most basic TUI object API.'
+
+ @abstractmethod
+ def set_geometry(self, measurements: _YX) -> None:
+ 'Update widget\'s measurements, re-generate content where necessary.'
+
+ @abstractmethod
+ def draw(self) -> None:
+ 'Print widget\'s content in shape appropriate to set geometry.'
+
+
+class _ScrollableWidget(_Widget, ABC):
+ 'Defines some API shared between _PromptWidget and _LogWidget.'
+ _history_idx: int
+
+ def __init__(self, write: Callable[..., None], *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self._write = write
+ self._history: list[str] = []
+
+ @abstractmethod
+ def append(self, to_append: str) -> None:
+ 'Append to widget content.'
+
+ @abstractmethod
+ def _scroll(self, up=True) -> None:
+ pass
+
+ def cmd__scroll(self, direction: str) -> None:
+ 'Scroll through stored content/history.'
+ self._scroll(up=direction == 'up')
+ self.draw()
+
+
+class _LogWidget(_ScrollableWidget):
+ 'Collects line-shaped messages, scrolls and wraps them for display.'
+ _view_size: _YX
+ _y_pgscroll: int
+
+ def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self._wrap = wrap
+ self._wrapped_idx = self._history_idx = -1
+ self._wrapped: list[tuple[Optional[int], str]] = []
+
+ def _add_wrapped(self, idx_original, line) -> int:
+ wrapped_lines = self._wrap(line)
+ self._wrapped += [(idx_original, line) for line in wrapped_lines]
+ return len(wrapped_lines)
+
+ def set_geometry(self, measurements: _YX) -> None:
+ self._view_size = measurements
+ self._y_pgscroll = self._view_size.y // 2
+ self._wrapped.clear()
+ self._wrapped += [(None, '')] * self._view_size.y
+ if not self._history:
+ return
+ for idx_history, line in enumerate(self._history):
+ self._add_wrapped(idx_history, line)
+ wrapped_lines_for_history_idx = [
+ t for t in self._wrapped
+ if t[0] == len(self._history) + self._history_idx]
+ idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
+ self._wrapped_idx = idx_their_last - len(self._wrapped)
+
+ def append(self, to_append: str) -> None:
+ self._history += [to_append]
+ n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
+ if self._wrapped_idx < -1:
+ self._history_idx -= 1
+ self._wrapped_idx -= n_wrapped_lines
+
+ def draw(self) -> None:
+ start_idx = self._wrapped_idx - self._view_size.y + 1
+ end_idx = self._wrapped_idx
+ to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
+ if self._wrapped_idx < -1:
+ scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
+ scroll_info += 'v' * (self._view_size.x - len(scroll_info))
+ to_write += [scroll_info]
+ else:
+ to_write += [self._wrapped[self._wrapped_idx][1]]
+ for i, line in enumerate(to_write):
+ self._write(line, i)
+
+ def _scroll(self, up: bool = True) -> None:
+ if up:
+ self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
+ self._wrapped_idx - self._y_pgscroll)
+ else:
+ self._wrapped_idx = min(-1,
+ self._wrapped_idx + self._y_pgscroll)
+ history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
+ if history_idx_to_wrapped_idx is not None:
+ self._history_idx = history_idx_to_wrapped_idx - len(self._history)
+
+
+class _PromptWidget(_ScrollableWidget):
+ 'Keyboard-controlled command input field.'
+ _y: int
+ _width: int
+ _prompt: str = _PROMPT_TEMPLATE
+ _history_idx = 0
+ _input_buffer: str
+ _cursor_x: int
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self._reset_buffer('')
+
+ def set_geometry(self, measurements: _YX) -> None:
+ self._y, self._width = measurements
+
+ def append(self, to_append: str) -> None:
+ self._cursor_x += len(to_append)
+ self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
+ + to_append
+ + self._input_buffer[self._cursor_x - 1:])
+ self._history_idx = 0
+ self.draw()
+
+ def draw(self) -> None:
+ prefix = self._prompt[:]
+ content = self._input_buffer[:]
+ if self._cursor_x == len(self._input_buffer):
+ content += ' '
+ half_width = (self._width - len(prefix)) // 2
+ offset = 0
+ if len(prefix) + len(content) > self._width\
+ and self._cursor_x > half_width:
+ prefix += _PROMPT_ELL_IN
+ offset = min(len(prefix) + len(content) - self._width,
+ self._cursor_x - half_width + len(_PROMPT_ELL_IN))
+ cursor_x_to_write = len(prefix) + self._cursor_x - offset
+ to_write = f'{prefix}{content[offset:]}'
+ if len(to_write) > self._width:
+ to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)]
+ + _PROMPT_ELL_OUT)
+ self._write(to_write[:cursor_x_to_write], self._y, padding=False)
+ self._write(to_write[cursor_x_to_write], attribute='reverse',
+ padding=False)
+ self._write(to_write[cursor_x_to_write + 1:])
+
+ def _scroll(self, up: bool = True) -> None:
+ if up and -(self._history_idx) < len(self._history):
+ if self._history_idx == 0 and self._input_buffer:
+ self._history += [self._input_buffer[:]]
+ self._reset_buffer('')
+ self._history_idx -= 1
+ self._history_idx -= 1
+ self._reset_buffer(self._history[self._history_idx])
+ elif not up:
+ if self._history_idx < 0:
+ self._history_idx += 1
+ if self._history_idx == 0:
+ self._reset_buffer('')
+ else:
+ self._reset_buffer(self._history[self._history_idx])
+ elif self._input_buffer:
+ self._history += [self._input_buffer[:]]
+ self._reset_buffer('')
+
+ def cmd__backspace(self) -> None:
+ 'Truncate current content by one character, if possible.'
+ if self._cursor_x > 0:
+ self._cursor_x -= 1
+ self._input_buffer = (self._input_buffer[:self._cursor_x]
+ + self._input_buffer[self._cursor_x + 1:])
+ self._history_idx = 0
+ self.draw()
+
+ def cmd__move_cursor(self, direction: str) -> None:
+ 'Move cursor one space into direction ("left" or "right") if possible.'
+ if direction == 'left' and self._cursor_x > 0:
+ self._cursor_x -= 1
+ elif direction == 'right'\
+ and self._cursor_x <= len(self._input_buffer):
+ self._cursor_x += 1
+ else:
+ return
+ self.draw()
+
+ def _reset_buffer(self, content: str) -> None:
+ self._input_buffer = content
+ self._cursor_x = len(self._input_buffer)
+
+ def enter(self) -> str:
+ 'Return current content while also clearing and then redrawing.'
+ to_return = self._input_buffer[:]
+ if to_return:
+ self._history += [to_return]
+ self._reset_buffer('')
+ self.draw()
+ return to_return
+
+
+class _ConnectionPromptWidget(_PromptWidget):
+ 'PromptWidget with attributes, methods for dealing with an IrcConnection.'
+ _nickname: str = ''
+
+ def update_prompt(self,
+ nick_confirmed=False,
+ nick: Optional[str] = None
+ ) -> None:
+ 'Update nickname-relevant knowledge to go into prompt string.'
+ self._prompt = ''
+ if nick:
+ self._nickname = nick
+ if self._nickname:
+ self._prompt += ' ' if nick_confirmed else '?'
+ self._prompt += self._nickname
+ self._prompt += _PROMPT_TEMPLATE
+
+
+class _Window(_Widget):
+ 'Collects a log and a prompt meant for the same content stream.'
+ _y_status: int
+ prompt: _PromptWidget
+
+ def __init__(self, idx: int, term: 'Terminal') -> None:
+ self.idx = idx
+ self._term = term
+ self.log = _LogWidget(self._term.wrap, self._term.write)
+ self.prompt = self.__annotations__['prompt'](self._term.write)
+ if hasattr(self._term, 'size'):
+ self.set_geometry()
+
+ def set_geometry(self, _=None) -> None:
+ assert _ is None
+ self._y_status = self._term.size.y - 2
+ self.log.set_geometry(_YX(self._y_status, self._term.size.x))
+ self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x))
+
+ def draw(self) -> None:
+ idx_box = f'[{self.idx}]'
+ status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
+ self._term.clear()
+ self.log.draw()
+ self._term.write(status_line, self._y_status)
+ self.prompt.draw()
+
+ def cmd__paste(self) -> None:
+ 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
+ self._term.write(f'\033{_OSC52_PREFIX}?{_PASTE_DELIMITER}',
+ self._y_status)
+ self.draw()
+
+
+class _ConnectionWindow(_Window):
+ 'Window with attributes and methods for dealing with an IrcConnection.'
+ prompt: _ConnectionPromptWidget
+
+ def __init__(self,
+ broadcast: Callable[[EventType, Any], None],
+ conn_idx: int,
+ *args, **kwargs
+ ) -> None:
+ self._broadcast = broadcast
+ self._conn_idx = conn_idx
+ super().__init__(*args, **kwargs)
+
+ def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
+ 'Send QUIT command to server.'
+ self._broadcast(EventType.SEND,
+ (self._conn_idx, IrcMessage('QUIT', (quit_msg, ))))
+
+ def cmd__reconnect(self) -> None:
+ 'Attempt reconnection.'
+ self._broadcast(EventType.INIT_RECONNECT, (self._conn_idx,))
+
+
+class _KeyboardLoop(Loop):
+ 'Loop receiving and translating keyboard events towards main loop.'
+
+ def process_bonus(self, yielded: str) -> None:
+ if yielded.startswith(_B64_PREFIX):
+ encoded = yielded[len(_B64_PREFIX):]
+ to_paste = ''
+ for i, c in enumerate(b64decode(encoded).decode('utf-8')):
+ if i > 512:
+ break
+ if c.isprintable():
+ to_paste += c
+ elif c.isspace():
+ to_paste += ' '
+ else:
+ to_paste += '#'
+ self.broadcast(EventType.PROMPT_ADD, to_paste)
+ elif yielded in _KEYBINDINGS:
+ self.broadcast(EventType.KEYBINDING, _KEYBINDINGS[yielded])
+ elif len(yielded) == 1:
+ self.broadcast(EventType.PROMPT_ADD, yielded)
+ else:
+ self.broadcast(EventType.ALERT,
+ f'unknown keyboard input: {yielded}')
+
+
+class _TuiLoop(Loop):
+ 'Loop for drawing/updating TUI.'
+
+ def __init__(self, term: 'Terminal', *args, **kwargs) -> None:
+ self._term = term
+ self._windows = [_Window(0, self._term)]
+ self._window_idx = 0
+ self._conn_windows: list[_ConnectionWindow] = []
+ super().__init__(*args, **kwargs)
+ self.put(Event(EventType.SET_SCREEN))
+
+ def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
+ cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name)
+ cmd_parent = self
+ while True:
+ cmd_name_toks = cmd_name.split('.', maxsplit=1)
+ if len(cmd_name_toks) == 1:
+ break
+ if not hasattr(cmd_parent, cmd_name_toks[0]):
+ return None
+ cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
+ cmd_name = cmd_name_toks[1]
+ cmd_name = f'cmd__{cmd_name}'
+ if not hasattr(cmd_parent, cmd_name):
+ return None
+ return getattr(cmd_parent, cmd_name)
+
+ def process_main(self, event: Event) -> bool:
+ if not super().process_main(event):
+ return False
+ if event.type_ == EventType.SET_SCREEN:
+ self._term.calc_geometry()
+ for window in self._windows:
+ window.set_geometry()
+ self.window.draw()
+ elif event.type_ == EventType.CONN_WINDOW:
+ conn_win = _ConnectionWindow(broadcast=self.broadcast,
+ conn_idx=event.payload[0],
+ idx=len(self._windows),
+ term=self._term)
+ conn_win.prompt.update_prompt(nick_confirmed=False,
+ nick=event.payload[1])
+ self._windows += [conn_win]
+ self._conn_windows += [conn_win]
+ self._switch_window(conn_win.idx)
+ elif event.type_ == EventType.ALERT:
+ self.window.log.append(f'ALERT {event.payload}')
+ self.window.log.draw()
+ elif event.type_ in {EventType.RECV, EventType.SEND,
+ EventType.CONN_ALERT}:
+ conn_win = self._conn_windows[event.payload[0]]
+ if event.type_ == EventType.CONN_ALERT:
+ msg = f'ALERT {event.payload[1]}'
+ else:
+ msg = (('<-' if event.type_ == EventType.RECV else '->')
+ + event.payload[1].raw)
+ conn_win.log.append(msg)
+ if conn_win == self.window:
+ self.window.log.draw()
+ elif event.type_ == EventType.NICK_SET:
+ conn_win = self._conn_windows[event.payload[0]]
+ conn_win.prompt.update_prompt(nick_confirmed=True,
+ nick=event.payload[1])
+ if conn_win == self.window:
+ self.window.prompt.draw()
+ elif event.type_ == EventType.DISCONNECTED:
+ conn_win = self._conn_windows[event.payload[0]]
+ conn_win.prompt.update_prompt(nick_confirmed=False)
+ if conn_win == self.window:
+ self.window.prompt.draw()
+ elif event.type_ == EventType.KEYBINDING:
+ cmd = self._cmd_name_to_cmd(event.payload[0])
+ assert cmd is not None
+ cmd(*event.payload[1:])
+ elif event.type_ == EventType.PROMPT_ADD:
+ self.window.prompt.append(event.payload)
+ # elif event.type_ == EventType.DEBUG:
+ # from traceback import format_exception
+ # for line in '\n'.join(format_exception(event.payload)
+ # ).split('\n'):
+ # self.window.log.append(f'DEBUG {line}')
+ # self.window.log.draw()
+ else:
+ return True
+ self._term.flush()
+ return True
+
+ @property
+ def window(self) -> _Window:
+ 'Currently selected Window.'
+ return self._windows[self._window_idx]
+
+ def _switch_window(self, idx: int) -> None:
+ self._window_idx = idx
+ self.window.draw()
+
+ def cmd__connect(self, hostname: str, nickname: str, realname: str
+ ) -> None:
+ 'Send INIT_CONNECT command to main loop.'
+ login = LoginNames(user=getusername(), nick=nickname, real=realname)
+ self.broadcast(EventType.INIT_CONNECT, (hostname, login))
+
+ def cmd__prompt_enter(self) -> None:
+ 'Get prompt content from .window.prompt.enter, parse to & run command.'
+ to_parse = self.window.prompt.enter()
+ if not to_parse:
+ return
+ alert: Optional[str] = None
+ if to_parse[0:1] == '/':
+ toks = to_parse[1:].split(maxsplit=1)
+ alert = f'{toks[0]} unknown'
+ cmd = self._cmd_name_to_cmd(toks[0])
+ if cmd and cmd.__name__ != stack()[0].function:
+ params = signature(cmd).parameters
+ n_args_max = len(params)
+ n_args_min = len([p for p in params.values()
+ if p.default == inspect_empty])
+ alert = f'{cmd.__name__} needs between {n_args_min} and '\
+ f'{n_args_max} args'
+ if len(toks) == 1 and not n_args_min:
+ alert = cmd()
+ elif len(toks) > 1 and params\
+ and n_args_min <= len(toks[1].split()):
+ args = []
+ while len(toks) > 1 and n_args_max:
+ toks = toks[1].split(maxsplit=1)
+ args += [toks[0]]
+ n_args_max -= 1
+ alert = cmd(*args)
+ else:
+ alert = 'not prefixed by /'
+ if alert:
+ self.broadcast(EventType.ALERT, f'invalid prompt command: {alert}')
+
+ def cmd__quit(self) -> None:
+ 'Send QUIT to all threads.'
+ self.broadcast(EventType.QUIT)
+
+ def cmd__window(self, towards: str) -> Optional[str]:
+ 'Switch window selection.'
+ n_windows = len(self._windows)
+ if n_windows < 2:
+ return 'no alternate window to move into'
+ if towards in {'left', 'right'}:
+ multiplier = (+1) if towards == 'right' else (-1)
+ window_idx = self._window_idx + multiplier
+ if not 0 <= window_idx < n_windows:
+ window_idx -= multiplier * n_windows
+ elif not towards.isdigit():
+ return f'neither "left"/"right" nor integer: {towards}'
+ else:
+ window_idx = int(towards)
+ if not 0 <= window_idx < n_windows:
+ return f'unavailable window idx: {window_idx}'
+ self._switch_window(window_idx)
+ return None
+
+
+class Terminal:
+ 'Abstraction of terminal interface.'
+ size: _YX
+ tui: _TuiLoop
+ _blessed: BlessedTerminal
+ _cursor_yx_: _YX
+
+ @contextmanager
+ def context(self, q_to_main: EventQueue) -> Generator:
+ 'Combine multiple contexts into one.'
+ signal(SIGWINCH, lambda *_: q_to_main.eput(EventType.SET_SCREEN))
+ self._blessed = BlessedTerminal()
+ with (self._blessed.raw(),
+ self._blessed.fullscreen(),
+ self._blessed.hidden_cursor(),
+ _KeyboardLoop(q_to_main, self.get_keypresses())):
+ self._cursor_yx = _YX(0, 0)
+ with _TuiLoop(self, q_to_main) as self.tui:
+ yield self
+
+ @property
+ def _cursor_yx(self) -> _YX:
+ return self._cursor_yx_
+
+ @_cursor_yx.setter
+ def _cursor_yx(self, yx: _YX) -> None:
+ print(self._blessed.move_yx(yx.y, yx.x), end='')
+ self._cursor_yx_ = yx
+
+ def calc_geometry(self) -> None:
+ '(Re-)calculate .size..'
+ self.size = _YX(self._blessed.height, self._blessed.width)
+
+ def clear(self) -> None:
+ 'Clear terminal.'
+ print(self._blessed.clear, end='')
+
+ def flush(self) -> None:
+ 'Flush terminal.'
+ print('', end='', flush=True)
+
+ def wrap(self, line: str) -> list[str]:
+ 'Wrap line to list of lines fitting into terminal width.'
+ return self._blessed.wrap(line, width=self.size.x,
+ subsequent_indent=' '*4)
+
+ def write(self,
+ msg: str = '',
+ start_y: Optional[int] = None,
+ attribute: Optional[str] = None,
+ padding: bool = True
+ ) -> None:
+ 'Print to terminal, with position, padding to line end, attributes.'
+ if start_y:
+ self._cursor_yx = _YX(start_y, 0)
+ # ._blessed.length can slow down things notably: only use where needed!
+ end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
+ else self._blessed.length(msg))
+ len_padding = self.size.x - end_x
+ if len_padding < 0:
+ msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
+ elif padding:
+ msg += ' ' * len_padding
+ end_x = self.size.x
+ if attribute:
+ msg = getattr(self._blessed, attribute)(msg)
+ print(msg, end='')
+ self._cursor_yx = _YX(self._cursor_yx.y, end_x)
+
+ def get_keypresses(self) -> Iterator[str]:
+ '''Loop through keypresses from terminal, collect what blessed ignores.
+
+ (Notably, blessed seems to junk any alt/escape-modifide key events it
+ does not explicitly know.
+ '''
+ n_gotchs_unprocessed = 0
+ while True:
+ new_gotchs = []
+ if not n_gotchs_unprocessed:
+ while self._blessed.kbhit(TIMEOUT_LOOP):
+ gotch = self._blessed.getch()
+ self._blessed.ungetch(gotch)
+ new_gotchs += [gotch]
+ if not self._blessed.kbhit(0):
+ break
+ n_gotchs_unprocessed += len(new_gotchs)
+ blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
+ n_chs_blessed_key = len(blessed_key.encode('utf-8'))
+ unhandleds = []
+ if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
+ for _ in range(len(new_gotchs) - n_chs_blessed_key):
+ unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
+ unhandleds += list(unhandled.encode('utf-8'))
+ n_gotchs_unprocessed -= 1
+ n_gotchs_unprocessed -= n_chs_blessed_key
+ if unhandleds:
+ fused = ''.join([chr(n) for n in unhandleds])
+ if fused.startswith(_OSC52_PREFIX):
+ if not (encoded := fused[len(_OSC52_PREFIX):]):
+ while True:
+ gotch = self._blessed.getch()
+ if gotch == _PASTE_DELIMITER:
+ break
+ encoded += gotch
+ yield f'{_B64_PREFIX}{encoded}'
+ continue
+ yield str(unhandleds)
+ elif blessed_key.name:
+ yield blessed_key.name
+ else:
+ yield str(blessed_key)