home · contact · privacy
Split up code into module files. master
authorChristian Heller <c.heller@plomlompom.de>
Fri, 13 Jun 2025 15:53:23 +0000 (17:53 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 13 Jun 2025 15:53:23 +0000 (17:53 +0200)
ircplom/__init__.py [new file with mode: 0644]
ircplom/events.py [new file with mode: 0644]
ircplom/irc_conn.py [new file with mode: 0644]
ircplom/tui.py [new file with mode: 0644]

diff --git a/ircplom/__init__.py b/ircplom/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/ircplom/events.py b/ircplom/events.py
new file mode 100644 (file)
index 0000000..ae0f33c
--- /dev/null
@@ -0,0 +1,105 @@
+'Event system with event loop.'
+from enum import Enum, auto
+from queue import SimpleQueue, Empty as QueueEmpty
+from threading import Thread
+from typing import Any, Iterator, Literal, NamedTuple, Optional, Self
+
+
+class EventType(Enum):
+    'Differentiate Events for different treatment.'
+    ALERT = auto()
+    CONN_ALERT = auto()
+    CONNECTED = auto()
+    CONN_WINDOW = auto()
+    DISCONNECTED = auto()
+    EXCEPTION = auto()
+    INIT_CONNECT = auto()
+    INIT_RECONNECT = auto()
+    KEYBINDING = auto()
+    NICK_SET = auto()
+    PING = auto()
+    PROMPT_ADD = auto()
+    QUIT = auto()
+    RECV = auto()
+    SEND = auto()
+    SET_SCREEN = auto()
+
+
+class Event(NamedTuple):
+    'Communication unit between threads.'
+    type_: EventType
+    payload: Any = None
+
+
+class EventQueue(SimpleQueue):
+    'SimpleQueue wrapper optimized for handling Events.'
+
+    def eput(self, type_: EventType, payload: Any = None) -> None:
+        'Construct Event(type_, payload) and .put it onto queue.'
+        self.put(Event(type_, payload))
+
+
+class Loop:
+    'Wraps thread looping over .eput input queue, potential bonus iterator.'
+
+    def __init__(self,
+                 q_to_main: EventQueue,
+                 bonus_iterator: Optional[Iterator] = None
+                 ) -> None:
+        self._q_to_main = q_to_main
+        self._bonus_iterator = bonus_iterator
+        self._q_input = EventQueue()
+        self._thread = Thread(target=self._loop, daemon=False)
+        self._thread.start()
+
+    def stop(self) -> None:
+        'Emit "QUIT" signal to break threaded loop, then wait for break.'
+        self._q_input.eput(EventType.QUIT)
+        self._thread.join()
+
+    def __enter__(self) -> Self:
+        return self
+
+    def __exit__(self, *_) -> Literal[False]:
+        self.stop()
+        return False  # re-raise any exception that above ignored
+
+    def put(self, event: Event) -> None:
+        'Send event into thread loop.'
+        self._q_input.put(event)
+
+    def broadcast(self, type_: EventType, payload: Any = None) -> None:
+        'Send event to main loop via queue.'
+        self._q_to_main.eput(type_, payload)
+
+    def process_main(self, event: Event) -> bool:
+        'Process event yielded from input queue.'
+        if event.type_ == EventType.QUIT:
+            return False
+        return True
+
+    def process_bonus(self, yielded: str) -> None:
+        'Process bonus iterator yield.'
+
+    def _loop(self) -> None:
+        'Loop over input queue and, if provided, bonus iterator.'
+        try:
+            while True:
+                try:
+                    yield_main = self._q_input.get(
+                        block=True,
+                        timeout=0 if self._bonus_iterator else None)
+                except QueueEmpty:
+                    pass
+                else:
+                    if self.process_main(yield_main) is False:
+                        break
+                if self._bonus_iterator:
+                    try:
+                        yield_bonus = next(self._bonus_iterator)
+                    except StopIteration:
+                        break
+                    if yield_bonus:
+                        self.process_bonus(yield_bonus)
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            self._q_to_main.eput(EventType.EXCEPTION, e)
diff --git a/ircplom/irc_conn.py b/ircplom/irc_conn.py
new file mode 100644 (file)
index 0000000..eb3c575
--- /dev/null
@@ -0,0 +1,273 @@
+'IRC server connection management.'
+# built-ins
+from socket import socket, gaierror as socket_gaierror
+from threading import Thread
+from typing import Any, Callable, Iterator, NamedTuple, Optional, Self
+# ourselves
+from ircplom.events import Event, EventQueue, EventType, Loop
+
+
+TIMEOUT_LOOP = 0.1
+
+_TIMEOUT_CONNECT = 5
+_CONN_RECV_BUFSIZE = 1024
+_PORT = 6667
+
+_IRCSPEC_LINE_SEPARATOR = b'\r\n'
+_IRCSPEC_TAG_ESCAPES = ((r'\:', ';'),
+                        (r'\s', ' '),
+                        (r'\n', '\n'),
+                        (r'\r', '\r'),
+                        (r'\\', '\\'))
+
+
+class LoginNames(NamedTuple):
+    'Collects the names needed on server connect for USER, NICK commands.'
+    user: str
+    nick: str
+    real: str
+
+
+class IrcConnection:
+    'Abstracts socket connection, loop over it, and handling messages from it.'
+
+    def __init__(self,
+                 q_to_main: EventQueue,
+                 idx: int,
+                 hostname: str,
+                 login: LoginNames,
+                 ) -> None:
+        self._idx = idx
+        self._q_to_main = q_to_main
+        self._hostname = hostname
+        self._login = login
+        self._socket: Optional[socket] = None
+        self._assumed_open = False
+        self._loop: Optional[_ConnectionLoop] = None
+        self._broadcast(EventType.CONN_WINDOW, self._login.nick)
+        self._start_connecting()
+
+    def _start_connecting(self) -> None:
+
+        def connect(self) -> None:
+            self._socket = socket()
+            self._broadcast(EventType.CONN_ALERT,
+                            f'Connecting to {self._hostname} …')
+            self._socket.settimeout(_TIMEOUT_CONNECT)
+            try:
+                self._socket.connect((self._hostname, _PORT))
+            except (TimeoutError, socket_gaierror) as e:
+                self._broadcast(EventType.CONN_ALERT, str(e))
+                return
+            self._socket.settimeout(TIMEOUT_LOOP)
+            self._assumed_open = True
+            self._loop = _ConnectionLoop(self._idx, self._q_to_main,
+                                         self._read_lines())
+            self._broadcast(EventType.CONNECTED, self._login)
+
+        Thread(target=connect, daemon=True, args=(self,)).start()
+
+    def close(self) -> None:
+        'Close both _ConnectionLoop and socket.'
+        self._assumed_open = False
+        if self._loop:
+            self._loop.stop()
+        self._loop = None
+        if self._socket:
+            self._socket.close()
+        self._socket = None
+
+    def _broadcast(self, type_: EventType, payload: Any = None) -> None:
+        'Send event to main loop via queue, with connection index as 1st arg.'
+        self._q_to_main.eput(type_, (self._idx, payload))
+
+    def _read_lines(self) -> Iterator[Optional[str]]:
+        'Receive line-separator-delimited messages from socket.'
+        assert self._socket is not None
+        bytes_total = b''
+        buffer_linesep = b''
+        while True:
+            try:
+                bytes_new = self._socket.recv(_CONN_RECV_BUFSIZE)
+            except TimeoutError:
+                yield None
+                continue
+            except OSError as e:
+                if e.errno == 9:
+                    break
+                raise e
+            if not bytes_new:
+                break
+            for c in bytes_new:
+                c_byted = c.to_bytes()
+                if c not in _IRCSPEC_LINE_SEPARATOR:
+                    bytes_total += c_byted
+                    buffer_linesep = b''
+                elif c == _IRCSPEC_LINE_SEPARATOR[0]:
+                    buffer_linesep = c_byted
+                else:
+                    buffer_linesep += c_byted
+                if buffer_linesep == _IRCSPEC_LINE_SEPARATOR:
+                    buffer_linesep = b''
+                    yield bytes_total.decode('utf-8')
+                    bytes_total = b''
+
+    def _write_line(self, line: str) -> None:
+        'Send line-separator-delimited message over socket.'
+        if not (self._socket and self._assumed_open):
+            self._broadcast(EventType.CONN_ALERT,
+                            'cannot send, assuming connection closed')
+            return
+        self._socket.sendall(line.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR)
+
+    def handle(self, event: Event) -> None:
+        'Process connection-directed Event into further steps.'
+        if event.type_ == EventType.INIT_RECONNECT:
+            if self._assumed_open:
+                self._broadcast(EventType.CONN_ALERT,
+                                'Reconnect called, but still seem connected, '
+                                'so nothing to do.')
+            else:
+                self._start_connecting()
+        elif event.type_ == EventType.CONNECTED:
+            assert self._loop is not None
+            self._loop.put(event)
+        elif event.type_ == EventType.DISCONNECTED:
+            self.close()
+        elif event.type_ == EventType.SEND:
+            msg: IrcMessage = event.payload[1]
+            self._write_line(msg.raw)
+
+
+class IrcMessage:
+    'Properly structured representation of IRC message as per IRCv3 spec.'
+    _raw: Optional[str] = None
+
+    def __init__(self,
+                 verb: str,
+                 parameters: Optional[tuple[str, ...]] = None,
+                 source: str = '',
+                 tags: Optional[dict[str, str]] = None
+                 ) -> None:
+        self.verb: str = verb
+        self.parameters: tuple[str, ...] = parameters or tuple()
+        self.source: str = source
+        self.tags: dict[str, str] = tags or {}
+
+    @classmethod
+    def from_raw(cls, raw_msg: str) -> Self:
+        'Parse raw IRC message line into properly structured IrcMessage.'
+
+        class _Stage(NamedTuple):
+            name: str
+            prefix_char: Optional[str]
+            processor: Callable = lambda s: s
+
+        def _parse_tags(str_tags: str) -> dict[str, str]:
+            tags = {}
+            for str_tag in [s for s in str_tags.split(';') if s]:
+                if '=' in str_tag:
+                    key, val = str_tag.split('=', maxsplit=1)
+                    for to_repl, repl_with in _IRCSPEC_TAG_ESCAPES:
+                        val = val.replace(to_repl, repl_with)
+                else:
+                    key, val = str_tag, ''
+                tags[key] = val
+            return tags
+
+        def _split_params(str_params: str) -> tuple[str, ...]:
+            params = []
+            params_stage = 0  # 0: gap, 1: non-trailing, 2: trailing
+            for char in str_params:
+                if char == ' ' and params_stage < 2:
+                    params_stage = 0
+                    continue
+                if params_stage == 0:
+                    params += ['']
+                    params_stage += 1
+                    if char == ':':
+                        params_stage += 1
+                        continue
+                params[-1] += char
+            return tuple(p for p in params)
+
+        stages = [_Stage('tags', '@', _parse_tags),
+                  _Stage('source', ':'),
+                  _Stage('verb', None, lambda s: s.upper()),
+                  _Stage('parameters', None, _split_params)]
+        harvest = {s.name: '' for s in stages}
+        idx_stage = 0
+        stage = None
+        for char in raw_msg:
+            if char == ' ' and idx_stage < (len(stages) - 1):
+                if stage:
+                    stage = None
+                continue
+            if not stage:
+                while not stage:
+                    idx_stage += 1
+                    tested = stages[idx_stage]
+                    if (not tested.prefix_char) or char == tested.prefix_char:
+                        stage = tested
+                if stage.prefix_char:
+                    continue
+            harvest[stage.name] += char
+        msg = cls(**{s.name: s.processor(harvest[s.name]) for s in stages})
+        msg._raw = raw_msg
+        return msg
+
+    @property
+    def raw(self) -> str:
+        'Return raw message code – create from known fields if necessary.'
+        if not self._raw:
+            to_combine = []
+            if self.tags:
+                tag_strs = []
+                for key, val in self.tags.items():
+                    tag_strs += [key]
+                    if not val:
+                        continue
+                    for repl_with, to_repl in reversed(_IRCSPEC_TAG_ESCAPES):
+                        val = val.replace(to_repl, repl_with)
+                    tag_strs[-1] += f'={val}'
+                to_combine += ['@' + ';'.join(tag_strs)]
+            to_combine += [self.verb]
+            if self.parameters:
+                to_combine += self.parameters[:-1]
+                to_combine += [f':{self.parameters[-1]}']
+            self._raw = ' '.join(to_combine)
+        return self._raw
+
+
+class _ConnectionLoop(Loop):
+    'Loop receiving and translating socket messages towards main loop.'
+
+    def __init__(self, connection_idx: int, *args, **kwargs) -> None:
+        self._conn_idx = connection_idx
+        super().__init__(*args, **kwargs)
+
+    def _broadcast_conn(self, type_: EventType, *args) -> None:
+        self.broadcast(type_, (self._conn_idx, *args))
+
+    def _send(self, verb: str, parameters: tuple[str, ...]) -> None:
+        self._broadcast_conn(EventType.SEND, IrcMessage(verb, parameters))
+
+    def process_main(self, event: Event) -> bool:
+        if not super().process_main(event):
+            return False
+        if event.type_ == EventType.CONNECTED:
+            login = event.payload[1]
+            self._send('USER', (login.user, '0', '*', login.real))
+            self._send('NICK', (login.nick,))
+        return True
+
+    def process_bonus(self, yielded: str) -> None:
+        msg = IrcMessage.from_raw(yielded)
+        self._broadcast_conn(EventType.RECV, msg)
+        if msg.verb == 'PING':
+            self._send('PONG', (msg.parameters[0],))
+        elif msg.verb == 'ERROR'\
+                and msg.parameters[0].startswith('Closing link:'):
+            self._broadcast_conn(EventType.DISCONNECTED)
+        elif msg.verb == '001':
+            self._broadcast_conn(EventType.NICK_SET, msg.parameters[0])
diff --git a/ircplom/tui.py b/ircplom/tui.py
new file mode 100644 (file)
index 0000000..b7fcbd3
--- /dev/null
@@ -0,0 +1,617 @@
+'Terminal and TUI management.'
+# built-ins
+from abc import ABC, abstractmethod
+from base64 import b64decode
+from contextlib import contextmanager
+from getpass import getuser as getusername
+from inspect import _empty as inspect_empty, signature, stack
+from signal import SIGWINCH, signal
+from typing import Any, Callable, Generator, Iterator, NamedTuple, Optional
+# requirements.txt
+from blessed import Terminal as BlessedTerminal
+# ourselves
+from ircplom.events import Event, EventType, EventQueue, Loop
+from ircplom.irc_conn import IrcMessage, LoginNames, TIMEOUT_LOOP
+
+
+_B64_PREFIX = 'b64:'
+_OSC52_PREFIX = ']52;c;'
+_PASTE_DELIMITER = '\007'
+
+_PROMPT_TEMPLATE = '> '
+_PROMPT_ELL_IN = '<…'
+_PROMPT_ELL_OUT = '…>'
+
+_KEYBINDINGS = {
+    'KEY_BACKSPACE': ('window.prompt.backspace',),
+    'KEY_ENTER': ('prompt_enter',),
+    'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
+    'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
+    'KEY_UP': ('window.prompt.scroll', 'up'),
+    'KEY_DOWN': ('window.prompt.scroll', 'down'),
+    'KEY_PGUP': ('window.log.scroll', 'up'),
+    'KEY_PGDOWN': ('window.log.scroll', 'down'),
+    '[91, 49, 59, 51, 68]': ('window', 'left'),
+    '[91, 49, 59, 51, 67]': ('window', 'right'),
+    'KEY_F1': ('window.paste',),
+}
+CMD_SHORTCUTS = {
+    'disconnect': 'window.disconnect',
+    'reconnect': 'window.reconnect'
+}
+
+
+class _YX(NamedTuple):
+    '2-dimensional coordinate.'
+    y: int
+    x: int
+
+
+class _Widget(ABC):
+    'Defines most basic TUI object API.'
+
+    @abstractmethod
+    def set_geometry(self, measurements: _YX) -> None:
+        'Update widget\'s measurements, re-generate content where necessary.'
+
+    @abstractmethod
+    def draw(self) -> None:
+        'Print widget\'s content in shape appropriate to set geometry.'
+
+
+class _ScrollableWidget(_Widget, ABC):
+    'Defines some API shared between _PromptWidget and _LogWidget.'
+    _history_idx: int
+
+    def __init__(self, write: Callable[..., None], *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self._write = write
+        self._history: list[str] = []
+
+    @abstractmethod
+    def append(self, to_append: str) -> None:
+        'Append to widget content.'
+
+    @abstractmethod
+    def _scroll(self, up=True) -> None:
+        pass
+
+    def cmd__scroll(self, direction: str) -> None:
+        'Scroll through stored content/history.'
+        self._scroll(up=direction == 'up')
+        self.draw()
+
+
+class _LogWidget(_ScrollableWidget):
+    'Collects line-shaped messages, scrolls and wraps them for display.'
+    _view_size: _YX
+    _y_pgscroll: int
+
+    def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs
+                 ) -> None:
+        super().__init__(*args, **kwargs)
+        self._wrap = wrap
+        self._wrapped_idx = self._history_idx = -1
+        self._wrapped: list[tuple[Optional[int], str]] = []
+
+    def _add_wrapped(self, idx_original, line) -> int:
+        wrapped_lines = self._wrap(line)
+        self._wrapped += [(idx_original, line) for line in wrapped_lines]
+        return len(wrapped_lines)
+
+    def set_geometry(self, measurements: _YX) -> None:
+        self._view_size = measurements
+        self._y_pgscroll = self._view_size.y // 2
+        self._wrapped.clear()
+        self._wrapped += [(None, '')] * self._view_size.y
+        if not self._history:
+            return
+        for idx_history, line in enumerate(self._history):
+            self._add_wrapped(idx_history, line)
+        wrapped_lines_for_history_idx = [
+                t for t in self._wrapped
+                if t[0] == len(self._history) + self._history_idx]
+        idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
+        self._wrapped_idx = idx_their_last - len(self._wrapped)
+
+    def append(self, to_append: str) -> None:
+        self._history += [to_append]
+        n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
+        if self._wrapped_idx < -1:
+            self._history_idx -= 1
+            self._wrapped_idx -= n_wrapped_lines
+
+    def draw(self) -> None:
+        start_idx = self._wrapped_idx - self._view_size.y + 1
+        end_idx = self._wrapped_idx
+        to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
+        if self._wrapped_idx < -1:
+            scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
+            scroll_info += 'v' * (self._view_size.x - len(scroll_info))
+            to_write += [scroll_info]
+        else:
+            to_write += [self._wrapped[self._wrapped_idx][1]]
+        for i, line in enumerate(to_write):
+            self._write(line, i)
+
+    def _scroll(self, up: bool = True) -> None:
+        if up:
+            self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
+                                    self._wrapped_idx - self._y_pgscroll)
+        else:
+            self._wrapped_idx = min(-1,
+                                    self._wrapped_idx + self._y_pgscroll)
+        history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
+        if history_idx_to_wrapped_idx is not None:
+            self._history_idx = history_idx_to_wrapped_idx - len(self._history)
+
+
+class _PromptWidget(_ScrollableWidget):
+    'Keyboard-controlled command input field.'
+    _y: int
+    _width: int
+    _prompt: str = _PROMPT_TEMPLATE
+    _history_idx = 0
+    _input_buffer: str
+    _cursor_x: int
+
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self._reset_buffer('')
+
+    def set_geometry(self, measurements: _YX) -> None:
+        self._y, self._width = measurements
+
+    def append(self, to_append: str) -> None:
+        self._cursor_x += len(to_append)
+        self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
+                              + to_append
+                              + self._input_buffer[self._cursor_x - 1:])
+        self._history_idx = 0
+        self.draw()
+
+    def draw(self) -> None:
+        prefix = self._prompt[:]
+        content = self._input_buffer[:]
+        if self._cursor_x == len(self._input_buffer):
+            content += ' '
+        half_width = (self._width - len(prefix)) // 2
+        offset = 0
+        if len(prefix) + len(content) > self._width\
+                and self._cursor_x > half_width:
+            prefix += _PROMPT_ELL_IN
+            offset = min(len(prefix) + len(content) - self._width,
+                         self._cursor_x - half_width + len(_PROMPT_ELL_IN))
+        cursor_x_to_write = len(prefix) + self._cursor_x - offset
+        to_write = f'{prefix}{content[offset:]}'
+        if len(to_write) > self._width:
+            to_write = (to_write[:self._width - len(_PROMPT_ELL_OUT)]
+                        + _PROMPT_ELL_OUT)
+        self._write(to_write[:cursor_x_to_write], self._y, padding=False)
+        self._write(to_write[cursor_x_to_write], attribute='reverse',
+                    padding=False)
+        self._write(to_write[cursor_x_to_write + 1:])
+
+    def _scroll(self, up: bool = True) -> None:
+        if up and -(self._history_idx) < len(self._history):
+            if self._history_idx == 0 and self._input_buffer:
+                self._history += [self._input_buffer[:]]
+                self._reset_buffer('')
+                self._history_idx -= 1
+            self._history_idx -= 1
+            self._reset_buffer(self._history[self._history_idx])
+        elif not up:
+            if self._history_idx < 0:
+                self._history_idx += 1
+                if self._history_idx == 0:
+                    self._reset_buffer('')
+                else:
+                    self._reset_buffer(self._history[self._history_idx])
+            elif self._input_buffer:
+                self._history += [self._input_buffer[:]]
+                self._reset_buffer('')
+
+    def cmd__backspace(self) -> None:
+        'Truncate current content by one character, if possible.'
+        if self._cursor_x > 0:
+            self._cursor_x -= 1
+            self._input_buffer = (self._input_buffer[:self._cursor_x]
+                                  + self._input_buffer[self._cursor_x + 1:])
+            self._history_idx = 0
+            self.draw()
+
+    def cmd__move_cursor(self, direction: str) -> None:
+        'Move cursor one space into direction ("left" or "right") if possible.'
+        if direction == 'left' and self._cursor_x > 0:
+            self._cursor_x -= 1
+        elif direction == 'right'\
+                and self._cursor_x <= len(self._input_buffer):
+            self._cursor_x += 1
+        else:
+            return
+        self.draw()
+
+    def _reset_buffer(self, content: str) -> None:
+        self._input_buffer = content
+        self._cursor_x = len(self._input_buffer)
+
+    def enter(self) -> str:
+        'Return current content while also clearing and then redrawing.'
+        to_return = self._input_buffer[:]
+        if to_return:
+            self._history += [to_return]
+            self._reset_buffer('')
+            self.draw()
+        return to_return
+
+
+class _ConnectionPromptWidget(_PromptWidget):
+    'PromptWidget with attributes, methods for dealing with an IrcConnection.'
+    _nickname: str = ''
+
+    def update_prompt(self,
+                      nick_confirmed=False,
+                      nick: Optional[str] = None
+                      ) -> None:
+        'Update nickname-relevant knowledge to go into prompt string.'
+        self._prompt = ''
+        if nick:
+            self._nickname = nick
+        if self._nickname:
+            self._prompt += ' ' if nick_confirmed else '?'
+            self._prompt += self._nickname
+        self._prompt += _PROMPT_TEMPLATE
+
+
+class _Window(_Widget):
+    'Collects a log and a prompt meant for the same content stream.'
+    _y_status: int
+    prompt: _PromptWidget
+
+    def __init__(self, idx: int, term: 'Terminal') -> None:
+        self.idx = idx
+        self._term = term
+        self.log = _LogWidget(self._term.wrap, self._term.write)
+        self.prompt = self.__annotations__['prompt'](self._term.write)
+        if hasattr(self._term, 'size'):
+            self.set_geometry()
+
+    def set_geometry(self, _=None) -> None:
+        assert _ is None
+        self._y_status = self._term.size.y - 2
+        self.log.set_geometry(_YX(self._y_status, self._term.size.x))
+        self.prompt.set_geometry(_YX(self._term.size.y - 1, self._term.size.x))
+
+    def draw(self) -> None:
+        idx_box = f'[{self.idx}]'
+        status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
+        self._term.clear()
+        self.log.draw()
+        self._term.write(status_line, self._y_status)
+        self.prompt.draw()
+
+    def cmd__paste(self) -> None:
+        'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
+        self._term.write(f'\033{_OSC52_PREFIX}?{_PASTE_DELIMITER}',
+                         self._y_status)
+        self.draw()
+
+
+class _ConnectionWindow(_Window):
+    'Window with attributes and methods for dealing with an IrcConnection.'
+    prompt: _ConnectionPromptWidget
+
+    def __init__(self,
+                 broadcast: Callable[[EventType, Any], None],
+                 conn_idx: int,
+                 *args, **kwargs
+                 ) -> None:
+        self._broadcast = broadcast
+        self._conn_idx = conn_idx
+        super().__init__(*args, **kwargs)
+
+    def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
+        'Send QUIT command to server.'
+        self._broadcast(EventType.SEND,
+                        (self._conn_idx, IrcMessage('QUIT', (quit_msg, ))))
+
+    def cmd__reconnect(self) -> None:
+        'Attempt reconnection.'
+        self._broadcast(EventType.INIT_RECONNECT, (self._conn_idx,))
+
+
+class _KeyboardLoop(Loop):
+    'Loop receiving and translating keyboard events towards main loop.'
+
+    def process_bonus(self, yielded: str) -> None:
+        if yielded.startswith(_B64_PREFIX):
+            encoded = yielded[len(_B64_PREFIX):]
+            to_paste = ''
+            for i, c in enumerate(b64decode(encoded).decode('utf-8')):
+                if i > 512:
+                    break
+                if c.isprintable():
+                    to_paste += c
+                elif c.isspace():
+                    to_paste += ' '
+                else:
+                    to_paste += '#'
+            self.broadcast(EventType.PROMPT_ADD, to_paste)
+        elif yielded in _KEYBINDINGS:
+            self.broadcast(EventType.KEYBINDING, _KEYBINDINGS[yielded])
+        elif len(yielded) == 1:
+            self.broadcast(EventType.PROMPT_ADD, yielded)
+        else:
+            self.broadcast(EventType.ALERT,
+                           f'unknown keyboard input: {yielded}')
+
+
+class _TuiLoop(Loop):
+    'Loop for drawing/updating TUI.'
+
+    def __init__(self, term: 'Terminal', *args, **kwargs) -> None:
+        self._term = term
+        self._windows = [_Window(0, self._term)]
+        self._window_idx = 0
+        self._conn_windows: list[_ConnectionWindow] = []
+        super().__init__(*args, **kwargs)
+        self.put(Event(EventType.SET_SCREEN))
+
+    def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
+        cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name)
+        cmd_parent = self
+        while True:
+            cmd_name_toks = cmd_name.split('.', maxsplit=1)
+            if len(cmd_name_toks) == 1:
+                break
+            if not hasattr(cmd_parent, cmd_name_toks[0]):
+                return None
+            cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
+            cmd_name = cmd_name_toks[1]
+        cmd_name = f'cmd__{cmd_name}'
+        if not hasattr(cmd_parent, cmd_name):
+            return None
+        return getattr(cmd_parent, cmd_name)
+
+    def process_main(self, event: Event) -> bool:
+        if not super().process_main(event):
+            return False
+        if event.type_ == EventType.SET_SCREEN:
+            self._term.calc_geometry()
+            for window in self._windows:
+                window.set_geometry()
+            self.window.draw()
+        elif event.type_ == EventType.CONN_WINDOW:
+            conn_win = _ConnectionWindow(broadcast=self.broadcast,
+                                         conn_idx=event.payload[0],
+                                         idx=len(self._windows),
+                                         term=self._term)
+            conn_win.prompt.update_prompt(nick_confirmed=False,
+                                          nick=event.payload[1])
+            self._windows += [conn_win]
+            self._conn_windows += [conn_win]
+            self._switch_window(conn_win.idx)
+        elif event.type_ == EventType.ALERT:
+            self.window.log.append(f'ALERT {event.payload}')
+            self.window.log.draw()
+        elif event.type_ in {EventType.RECV, EventType.SEND,
+                             EventType.CONN_ALERT}:
+            conn_win = self._conn_windows[event.payload[0]]
+            if event.type_ == EventType.CONN_ALERT:
+                msg = f'ALERT {event.payload[1]}'
+            else:
+                msg = (('<-' if event.type_ == EventType.RECV else '->')
+                       + event.payload[1].raw)
+            conn_win.log.append(msg)
+            if conn_win == self.window:
+                self.window.log.draw()
+        elif event.type_ == EventType.NICK_SET:
+            conn_win = self._conn_windows[event.payload[0]]
+            conn_win.prompt.update_prompt(nick_confirmed=True,
+                                          nick=event.payload[1])
+            if conn_win == self.window:
+                self.window.prompt.draw()
+        elif event.type_ == EventType.DISCONNECTED:
+            conn_win = self._conn_windows[event.payload[0]]
+            conn_win.prompt.update_prompt(nick_confirmed=False)
+            if conn_win == self.window:
+                self.window.prompt.draw()
+        elif event.type_ == EventType.KEYBINDING:
+            cmd = self._cmd_name_to_cmd(event.payload[0])
+            assert cmd is not None
+            cmd(*event.payload[1:])
+        elif event.type_ == EventType.PROMPT_ADD:
+            self.window.prompt.append(event.payload)
+        # elif event.type_ == EventType.DEBUG:
+        #     from traceback import format_exception
+        #     for line in '\n'.join(format_exception(event.payload)
+        #                           ).split('\n'):
+        #         self.window.log.append(f'DEBUG {line}')
+        #     self.window.log.draw()
+        else:
+            return True
+        self._term.flush()
+        return True
+
+    @property
+    def window(self) -> _Window:
+        'Currently selected Window.'
+        return self._windows[self._window_idx]
+
+    def _switch_window(self, idx: int) -> None:
+        self._window_idx = idx
+        self.window.draw()
+
+    def cmd__connect(self, hostname: str, nickname: str, realname: str
+                     ) -> None:
+        'Send INIT_CONNECT command to main loop.'
+        login = LoginNames(user=getusername(), nick=nickname, real=realname)
+        self.broadcast(EventType.INIT_CONNECT, (hostname, login))
+
+    def cmd__prompt_enter(self) -> None:
+        'Get prompt content from .window.prompt.enter, parse to & run command.'
+        to_parse = self.window.prompt.enter()
+        if not to_parse:
+            return
+        alert: Optional[str] = None
+        if to_parse[0:1] == '/':
+            toks = to_parse[1:].split(maxsplit=1)
+            alert = f'{toks[0]} unknown'
+            cmd = self._cmd_name_to_cmd(toks[0])
+            if cmd and cmd.__name__ != stack()[0].function:
+                params = signature(cmd).parameters
+                n_args_max = len(params)
+                n_args_min = len([p for p in params.values()
+                                  if p.default == inspect_empty])
+                alert = f'{cmd.__name__} needs between {n_args_min} and '\
+                        f'{n_args_max} args'
+                if len(toks) == 1 and not n_args_min:
+                    alert = cmd()
+                elif len(toks) > 1 and params\
+                        and n_args_min <= len(toks[1].split()):
+                    args = []
+                    while len(toks) > 1 and n_args_max:
+                        toks = toks[1].split(maxsplit=1)
+                        args += [toks[0]]
+                        n_args_max -= 1
+                    alert = cmd(*args)
+        else:
+            alert = 'not prefixed by /'
+        if alert:
+            self.broadcast(EventType.ALERT, f'invalid prompt command: {alert}')
+
+    def cmd__quit(self) -> None:
+        'Send QUIT to all threads.'
+        self.broadcast(EventType.QUIT)
+
+    def cmd__window(self, towards: str) -> Optional[str]:
+        'Switch window selection.'
+        n_windows = len(self._windows)
+        if n_windows < 2:
+            return 'no alternate window to move into'
+        if towards in {'left', 'right'}:
+            multiplier = (+1) if towards == 'right' else (-1)
+            window_idx = self._window_idx + multiplier
+            if not 0 <= window_idx < n_windows:
+                window_idx -= multiplier * n_windows
+        elif not towards.isdigit():
+            return f'neither "left"/"right" nor integer: {towards}'
+        else:
+            window_idx = int(towards)
+            if not 0 <= window_idx < n_windows:
+                return f'unavailable window idx: {window_idx}'
+        self._switch_window(window_idx)
+        return None
+
+
+class Terminal:
+    'Abstraction of terminal interface.'
+    size: _YX
+    tui: _TuiLoop
+    _blessed: BlessedTerminal
+    _cursor_yx_: _YX
+
+    @contextmanager
+    def context(self, q_to_main: EventQueue) -> Generator:
+        'Combine multiple contexts into one.'
+        signal(SIGWINCH, lambda *_: q_to_main.eput(EventType.SET_SCREEN))
+        self._blessed = BlessedTerminal()
+        with (self._blessed.raw(),
+              self._blessed.fullscreen(),
+              self._blessed.hidden_cursor(),
+              _KeyboardLoop(q_to_main, self.get_keypresses())):
+            self._cursor_yx = _YX(0, 0)
+            with _TuiLoop(self, q_to_main) as self.tui:
+                yield self
+
+    @property
+    def _cursor_yx(self) -> _YX:
+        return self._cursor_yx_
+
+    @_cursor_yx.setter
+    def _cursor_yx(self, yx: _YX) -> None:
+        print(self._blessed.move_yx(yx.y, yx.x), end='')
+        self._cursor_yx_ = yx
+
+    def calc_geometry(self) -> None:
+        '(Re-)calculate .size..'
+        self.size = _YX(self._blessed.height, self._blessed.width)
+
+    def clear(self) -> None:
+        'Clear terminal.'
+        print(self._blessed.clear, end='')
+
+    def flush(self) -> None:
+        'Flush terminal.'
+        print('', end='', flush=True)
+
+    def wrap(self, line: str) -> list[str]:
+        'Wrap line to list of lines fitting into terminal width.'
+        return self._blessed.wrap(line, width=self.size.x,
+                                  subsequent_indent=' '*4)
+
+    def write(self,
+              msg: str = '',
+              start_y: Optional[int] = None,
+              attribute: Optional[str] = None,
+              padding: bool = True
+              ) -> None:
+        'Print to terminal, with position, padding to line end, attributes.'
+        if start_y:
+            self._cursor_yx = _YX(start_y, 0)
+        # ._blessed.length can slow down things notably: only use where needed!
+        end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
+                                     else self._blessed.length(msg))
+        len_padding = self.size.x - end_x
+        if len_padding < 0:
+            msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
+        elif padding:
+            msg += ' ' * len_padding
+            end_x = self.size.x
+        if attribute:
+            msg = getattr(self._blessed, attribute)(msg)
+        print(msg, end='')
+        self._cursor_yx = _YX(self._cursor_yx.y, end_x)
+
+    def get_keypresses(self) -> Iterator[str]:
+        '''Loop through keypresses from terminal, collect what blessed ignores.
+
+        (Notably, blessed seems to junk any alt/escape-modifide key events it
+        does not explicitly know.
+        '''
+        n_gotchs_unprocessed = 0
+        while True:
+            new_gotchs = []
+            if not n_gotchs_unprocessed:
+                while self._blessed.kbhit(TIMEOUT_LOOP):
+                    gotch = self._blessed.getch()
+                    self._blessed.ungetch(gotch)
+                    new_gotchs += [gotch]
+                    if not self._blessed.kbhit(0):
+                        break
+                n_gotchs_unprocessed += len(new_gotchs)
+            blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
+            n_chs_blessed_key = len(blessed_key.encode('utf-8'))
+            unhandleds = []
+            if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
+                for _ in range(len(new_gotchs) - n_chs_blessed_key):
+                    unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
+                    unhandleds += list(unhandled.encode('utf-8'))
+                    n_gotchs_unprocessed -= 1
+            n_gotchs_unprocessed -= n_chs_blessed_key
+            if unhandleds:
+                fused = ''.join([chr(n) for n in unhandleds])
+                if fused.startswith(_OSC52_PREFIX):
+                    if not (encoded := fused[len(_OSC52_PREFIX):]):
+                        while True:
+                            gotch = self._blessed.getch()
+                            if gotch == _PASTE_DELIMITER:
+                                break
+                            encoded += gotch
+                    yield f'{_B64_PREFIX}{encoded}'
+                    continue
+                yield str(unhandleds)
+            elif blessed_key.name:
+                yield blessed_key.name
+            else:
+                yield str(blessed_key)