#!/usr/bin/env python3
'Attempt at an IRC client.'
-
-from abc import ABC, abstractmethod
-from base64 import b64decode
-from contextlib import contextmanager
-from enum import Enum, auto
-from getpass import getuser as getusername
-from inspect import _empty as inspect_empty, signature, stack
-from queue import SimpleQueue, Empty as QueueEmpty
-from signal import SIGWINCH, signal
-from socket import socket, gaierror as socket_gaierror
-from threading import Thread
-from typing import (
- Any, Callable, Generator, Iterator, Literal, NamedTuple, Optional, Self)
-
-from blessed import Terminal as BlessedTerminal
-
-
-TIMEOUT_CONNECT = 5
-TIMEOUT_LOOP = 0.1
-CONN_RECV_BUFSIZE = 1024
-
-KEYBINDINGS = {
- 'KEY_BACKSPACE': ('window.prompt.backspace',),
- 'KEY_ENTER': ('prompt_enter',),
- 'KEY_LEFT': ('window.prompt.move_cursor', 'left'),
- 'KEY_RIGHT': ('window.prompt.move_cursor', 'right'),
- 'KEY_UP': ('window.prompt.scroll', 'up'),
- 'KEY_DOWN': ('window.prompt.scroll', 'down'),
- 'KEY_PGUP': ('window.log.scroll', 'up'),
- 'KEY_PGDOWN': ('window.log.scroll', 'down'),
- '[91, 49, 59, 51, 68]': ('window', 'left'),
- '[91, 49, 59, 51, 67]': ('window', 'right'),
- 'KEY_F1': ('window.paste',),
-}
-CMD_SHORTCUTS = {
- 'disconnect': 'window.disconnect',
- 'reconnect': 'window.reconnect'
-}
-
-B64_PREFIX = 'b64:'
-OSC52_PREFIX = ']52;c;'
-PASTE_DELIMITER = '\007'
-
-PROMPT_TEMPLATE = '> '
-PROMPT_ELL_IN = '<…'
-PROMPT_ELL_OUT = '…>'
-
-PORT = 6667
-IRCSPEC_LINE_SEPARATOR = b'\r\n'
-IRCSPEC_TAG_ESCAPES = ((r'\:', ';'),
- (r'\s', ' '),
- (r'\n', '\n'),
- (r'\r', '\r'),
- (r'\\', '\\'))
-
-
-class EventType(Enum):
- 'Differentiate Events for different treatment.'
- ALERT = auto()
- CONN_ALERT = auto()
- CONNECTED = auto()
- CONN_WINDOW = auto()
- DISCONNECTED = auto()
- EXCEPTION = auto()
- INIT_CONNECT = auto()
- INIT_RECONNECT = auto()
- KEYBINDING = auto()
- NICK_SET = auto()
- PING = auto()
- PROMPT_ADD = auto()
- QUIT = auto()
- RECV = auto()
- SEND = auto()
- SET_SCREEN = auto()
-
-
-class Event(NamedTuple):
- 'Communication unit between threads.'
- type_: EventType
- payload: Any = None
-
-
-class EventQueue(SimpleQueue):
- 'SimpleQueue wrapper optimized for handling Events.'
-
- def eput(self, type_: EventType, payload: Any = None) -> None:
- 'Construct Event(type_, payload) and .put it onto queue.'
- self.put(Event(type_, payload))
-
-
-class YX(NamedTuple):
- '2-dimensional coordinate.'
- y: int
- x: int
-
-
-# def log(msg):
-# from datetime import datetime
-# with open('log.txt', 'a') as f:
-# f.write(f'{datetime.now()} {msg}\n')
-
-
-class Terminal:
- 'Abstraction of terminal interface.'
- size: YX
- tui: 'TuiLoop'
- _blessed: BlessedTerminal
- _cursor_yx_: YX
-
- @contextmanager
- def context(self, q_to_main: EventQueue) -> Generator:
- 'Combine multiple contexts into one.'
- signal(SIGWINCH, lambda *_: q_to_main.eput(EventType.SET_SCREEN))
- self._blessed = BlessedTerminal()
- with (self._blessed.raw(),
- self._blessed.fullscreen(),
- self._blessed.hidden_cursor(),
- KeyboardLoop(q_to_main, self.get_keypresses())):
- self._cursor_yx = YX(0, 0)
- with TuiLoop(self, q_to_main) as self.tui:
- yield self
-
- @property
- def _cursor_yx(self) -> YX:
- return self._cursor_yx_
-
- @_cursor_yx.setter
- def _cursor_yx(self, yx: YX) -> None:
- print(self._blessed.move_yx(yx.y, yx.x), end='')
- self._cursor_yx_ = yx
-
- def calc_geometry(self) -> None:
- '(Re-)calculate .size..'
- self.size = YX(self._blessed.height, self._blessed.width)
-
- def clear(self) -> None:
- 'Clear terminal.'
- print(self._blessed.clear, end='')
-
- def flush(self) -> None:
- 'Flush terminal.'
- print('', end='', flush=True)
-
- def wrap(self, line: str) -> list[str]:
- 'Wrap line to list of lines fitting into terminal width.'
- return self._blessed.wrap(line, width=self.size.x,
- subsequent_indent=' '*4)
-
- def write(self,
- msg: str = '',
- start_y: Optional[int] = None,
- attribute: Optional[str] = None,
- padding: bool = True
- ) -> None:
- 'Print to terminal, with position, padding to line end, attributes.'
- if start_y:
- self._cursor_yx = YX(start_y, 0)
- # ._blessed.length can slow down things notably: only use where needed!
- end_x = self._cursor_yx.x + (len(msg) if msg.isascii()
- else self._blessed.length(msg))
- len_padding = self.size.x - end_x
- if len_padding < 0:
- msg = self._blessed.truncate(msg, self.size.x - self._cursor_yx.x)
- elif padding:
- msg += ' ' * len_padding
- end_x = self.size.x
- if attribute:
- msg = getattr(self._blessed, attribute)(msg)
- print(msg, end='')
- self._cursor_yx = YX(self._cursor_yx.y, end_x)
-
- def get_keypresses(self) -> Iterator[str]:
- '''Loop through keypresses from terminal, collect what blessed ignores.
-
- (Notably, blessed seems to junk any alt/escape-modifide key events it
- does not explicitly know.
- '''
- n_gotchs_unprocessed = 0
- while True:
- new_gotchs = []
- if not n_gotchs_unprocessed:
- while self._blessed.kbhit(TIMEOUT_LOOP):
- gotch = self._blessed.getch()
- self._blessed.ungetch(gotch)
- new_gotchs += [gotch]
- if not self._blessed.kbhit(0):
- break
- n_gotchs_unprocessed += len(new_gotchs)
- blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
- n_chs_blessed_key = len(blessed_key.encode('utf-8'))
- unhandleds = []
- if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
- for _ in range(len(new_gotchs) - n_chs_blessed_key):
- unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
- unhandleds += list(unhandled.encode('utf-8'))
- n_gotchs_unprocessed -= 1
- n_gotchs_unprocessed -= n_chs_blessed_key
- if unhandleds:
- fused = ''.join([chr(n) for n in unhandleds])
- if fused.startswith(OSC52_PREFIX):
- if not (encoded := fused[len(OSC52_PREFIX):]):
- while True:
- gotch = self._blessed.getch()
- if gotch == PASTE_DELIMITER:
- break
- encoded += gotch
- yield f'{B64_PREFIX}{encoded}'
- continue
- yield str(unhandleds)
- elif blessed_key.name:
- yield blessed_key.name
- else:
- yield str(blessed_key)
-
-
-class LoginNames(NamedTuple):
- 'Collects the names needed on server connect for USER, NICK commands.'
- user: str
- nick: str
- real: str
-
-
-class IrcConnection:
- 'Abstracts socket connection, loop over it, and handling messages from it.'
-
- def __init__(self,
- q_to_main: EventQueue,
- idx: int,
- hostname: str,
- login: LoginNames,
- ) -> None:
- self._idx = idx
- self._q_to_main = q_to_main
- self._hostname = hostname
- self._login = login
- self._socket: Optional[socket] = None
- self._assumed_open = False
- self._loop: Optional[ConnectionLoop] = None
- self._broadcast(EventType.CONN_WINDOW, self._login.nick)
- self._start_connecting()
-
- def _start_connecting(self) -> None:
-
- def connect(self) -> None:
- self._socket = socket()
- self._broadcast(EventType.CONN_ALERT,
- f'Connecting to {self._hostname} …')
- self._socket.settimeout(TIMEOUT_CONNECT)
- try:
- self._socket.connect((self._hostname, PORT))
- except (TimeoutError, socket_gaierror) as e:
- self._broadcast(EventType.CONN_ALERT, str(e))
- return
- self._socket.settimeout(TIMEOUT_LOOP)
- self._assumed_open = True
- self._loop = ConnectionLoop(self._idx, self._q_to_main,
- self._read_lines())
- self._broadcast(EventType.CONNECTED, self._login)
-
- Thread(target=connect, daemon=True, args=(self,)).start()
-
- def close(self):
- 'Close both ConnectionLoop and socket.'
- self._assumed_open = False
- if self._loop:
- self._loop.stop()
- self._loop = None
- if self._socket:
- self._socket.close()
- self._socket = None
-
- def _broadcast(self, type_: EventType, payload: Any = None) -> None:
- 'Send event to main loop via queue, with connection index as 1st arg.'
- self._q_to_main.eput(type_, (self._idx, payload))
-
- def _read_lines(self) -> Iterator[Optional[str]]:
- 'Receive line-separator-delimited messages from socket.'
- assert self._socket is not None
- bytes_total = b''
- buffer_linesep = b''
- while True:
- try:
- bytes_new = self._socket.recv(CONN_RECV_BUFSIZE)
- except TimeoutError:
- yield None
- continue
- except OSError as e:
- if e.errno == 9:
- break
- raise e
- if not bytes_new:
- break
- for c in bytes_new:
- c_byted = c.to_bytes()
- if c not in IRCSPEC_LINE_SEPARATOR:
- bytes_total += c_byted
- buffer_linesep = b''
- elif c == IRCSPEC_LINE_SEPARATOR[0]:
- buffer_linesep = c_byted
- else:
- buffer_linesep += c_byted
- if buffer_linesep == IRCSPEC_LINE_SEPARATOR:
- buffer_linesep = b''
- yield bytes_total.decode('utf-8')
- bytes_total = b''
-
- def _write_line(self, line: str) -> None:
- 'Send line-separator-delimited message over socket.'
- if not (self._socket and self._assumed_open):
- self._broadcast(EventType.CONN_ALERT,
- 'cannot send, assuming connection closed')
- return
- self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR)
-
- def handle(self, event: Event) -> None:
- 'Process connection-directed Event into further steps.'
- if event.type_ == EventType.INIT_RECONNECT:
- if self._assumed_open:
- self._broadcast(EventType.CONN_ALERT,
- 'Reconnect called, but still seem connected, '
- 'so nothing to do.')
- else:
- self._start_connecting()
- elif event.type_ == EventType.CONNECTED:
- assert self._loop is not None
- self._loop.put(event)
- elif event.type_ == EventType.DISCONNECTED:
- self.close()
- elif event.type_ == EventType.SEND:
- msg: IrcMessage = event.payload[1]
- self._write_line(msg.raw)
-
-
-class IrcMessage:
- 'Properly structured representation of IRC message as per IRCv3 spec.'
- _raw: Optional[str] = None
-
- def __init__(self,
- verb: str,
- parameters: Optional[tuple[str, ...]] = None,
- source: str = '',
- tags: Optional[dict[str, str]] = None
- ) -> None:
- self.verb: str = verb
- self.parameters: tuple[str, ...] = parameters or tuple()
- self.source: str = source
- self.tags: dict[str, str] = tags or {}
-
- def __str__(self) -> str:
- return f'[{self.tags}[{self.source}][{self.verb}]]][{self.parameters}]'
-
- @classmethod
- def from_raw(cls, raw_msg: str) -> Self:
- 'Parse raw IRC message line into properly structured IrcMessage.'
-
- class _Stage(NamedTuple):
- name: str
- prefix_char: Optional[str]
- processor: Callable = lambda s: s
-
- def _parse_tags(str_tags: str) -> dict[str, str]:
- tags = {}
- for str_tag in [s for s in str_tags.split(';') if s]:
- if '=' in str_tag:
- key, val = str_tag.split('=', maxsplit=1)
- for to_repl, repl_with in IRCSPEC_TAG_ESCAPES:
- val = val.replace(to_repl, repl_with)
- else:
- key, val = str_tag, ''
- tags[key] = val
- return tags
-
- def _split_params(str_params: str) -> tuple[str, ...]:
- params = []
- params_stage = 0 # 0: gap, 1: non-trailing, 2: trailing
- for char in str_params:
- if char == ' ' and params_stage < 2:
- params_stage = 0
- continue
- if params_stage == 0:
- params += ['']
- params_stage += 1
- if char == ':':
- params_stage += 1
- continue
- params[-1] += char
- return tuple(p for p in params)
-
- stages = [_Stage('tags', '@', _parse_tags),
- _Stage('source', ':'),
- _Stage('verb', None, lambda s: s.upper()),
- _Stage('parameters', None, _split_params)]
- harvest = {s.name: '' for s in stages}
- idx_stage = 0
- stage = None
- for char in raw_msg:
- if char == ' ' and idx_stage < (len(stages) - 1):
- if stage:
- stage = None
- continue
- if not stage:
- while not stage:
- idx_stage += 1
- tested = stages[idx_stage]
- if (not tested.prefix_char) or char == tested.prefix_char:
- stage = tested
- if stage.prefix_char:
- continue
- harvest[stage.name] += char
- msg = cls(**{s.name: s.processor(harvest[s.name]) for s in stages})
- msg._raw = raw_msg
- return msg
-
- @property
- def raw(self) -> str:
- 'Return raw message code – create from known fields if necessary.'
- if not self._raw:
- to_combine = []
- if self.tags:
- tag_strs = []
- for key, val in self.tags.items():
- tag_strs += [key]
- if not val:
- continue
- for repl_with, to_repl in reversed(IRCSPEC_TAG_ESCAPES):
- val = val.replace(to_repl, repl_with)
- tag_strs[-1] += f'={val}'
- to_combine += ['@' + ';'.join(tag_strs)]
- to_combine += [self.verb]
- if self.parameters:
- to_combine += self.parameters[:-1]
- to_combine += [f':{self.parameters[-1]}']
- self._raw = ' '.join(to_combine)
- return self._raw
-
-
-class Loop:
- 'Wraps thread looping over .eput input queue, potential bonus iterator.'
-
- def __init__(self,
- q_to_main: EventQueue,
- bonus_iterator: Optional[Iterator] = None
- ) -> None:
- self._q_to_main = q_to_main
- self._bonus_iterator = bonus_iterator
- self._q_input = EventQueue()
- self._thread = Thread(target=self._loop, daemon=False)
- self._thread.start()
-
- def stop(self) -> None:
- 'Emit "QUIT" signal to break threaded loop, then wait for break.'
- self._q_input.eput(EventType.QUIT)
- self._thread.join()
-
- def __enter__(self) -> Self:
- return self
-
- def __exit__(self, *_) -> Literal[False]:
- self.stop()
- return False # re-raise any exception that above ignored
-
- def put(self, event: Event) -> None:
- 'Send event into thread loop.'
- self._q_input.put(event)
-
- def broadcast(self, type_: EventType, payload: Any = None) -> None:
- 'Send event to main loop via queue.'
- self._q_to_main.eput(type_, payload)
-
- def process_main(self, event: Event) -> bool:
- 'Process event yielded from input queue.'
- if event.type_ == EventType.QUIT:
- return False
- return True
-
- def process_bonus(self, yielded: str) -> None:
- 'Process bonus iterator yield.'
-
- def _loop(self) -> None:
- 'Loop over input queue and, if provided, bonus iterator.'
- try:
- while True:
- try:
- yield_main = self._q_input.get(
- block=True,
- timeout=0 if self._bonus_iterator else None)
- except QueueEmpty:
- pass
- else:
- if self.process_main(yield_main) is False:
- break
- if self._bonus_iterator:
- try:
- yield_bonus = next(self._bonus_iterator)
- except StopIteration:
- break
- if yield_bonus:
- self.process_bonus(yield_bonus)
- except Exception as e: # pylint: disable=broad-exception-caught
- self._q_to_main.eput(EventType.EXCEPTION, e)
-
-
-class Widget(ABC):
- 'Defines most basic TUI object API.'
-
- @abstractmethod
- def set_geometry(self, measurements: YX) -> None:
- 'Update widget\'s measurements, re-generate content where necessary.'
-
- @abstractmethod
- def draw(self) -> None:
- 'Print widget\'s content in shape appropriate to set geometry.'
-
-
-class ScrollableWidget(Widget):
- 'Defines some API shared between PromptWidget and LogWidget.'
- _history_idx: int
-
- def __init__(self, write: Callable[..., None], *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
- self._write = write
- self._history: list[str] = []
-
- @abstractmethod
- def append(self, to_append: str) -> None:
- 'Append to widget content.'
-
- @abstractmethod
- def _scroll(self, up=True) -> None:
- pass
-
- def cmd__scroll(self, direction: str) -> None:
- 'Scroll through stored content/history.'
- self._scroll(up=direction == 'up')
- self.draw()
-
-
-class PromptWidget(ScrollableWidget):
- 'Keyboard-controlled command input field.'
- _y: int
- _width: int
- _prompt: str = PROMPT_TEMPLATE
- _history_idx = 0
- _input_buffer: str
- _cursor_x: int
-
- def __init__(self, *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
- self._reset_buffer('')
-
- def set_geometry(self, measurements: YX) -> None:
- self._y, self._width = measurements
-
- def append(self, to_append: str) -> None:
- self._cursor_x += len(to_append)
- self._input_buffer = (self._input_buffer[:self._cursor_x - 1]
- + to_append
- + self._input_buffer[self._cursor_x - 1:])
- self._history_idx = 0
- self.draw()
-
- def draw(self) -> None:
- prefix = self._prompt[:]
- content = self._input_buffer[:]
- if self._cursor_x == len(self._input_buffer):
- content += ' '
- half_width = (self._width - len(prefix)) // 2
- offset = 0
- if len(prefix) + len(content) > self._width\
- and self._cursor_x > half_width:
- prefix += PROMPT_ELL_IN
- offset = min(len(prefix) + len(content) - self._width,
- self._cursor_x - half_width + len(PROMPT_ELL_IN))
- cursor_x_to_write = len(prefix) + self._cursor_x - offset
- to_write = f'{prefix}{content[offset:]}'
- if len(to_write) > self._width:
- to_write = (to_write[:self._width - len(PROMPT_ELL_OUT)]
- + PROMPT_ELL_OUT)
- self._write(to_write[:cursor_x_to_write], self._y, padding=False)
- self._write(to_write[cursor_x_to_write], attribute='reverse',
- padding=False)
- self._write(to_write[cursor_x_to_write + 1:])
-
- def _scroll(self, up: bool = True) -> None:
- if up and -(self._history_idx) < len(self._history):
- if self._history_idx == 0 and self._input_buffer:
- self._history += [self._input_buffer[:]]
- self._reset_buffer('')
- self._history_idx -= 1
- self._history_idx -= 1
- self._reset_buffer(self._history[self._history_idx])
- elif not up:
- if self._history_idx < 0:
- self._history_idx += 1
- if self._history_idx == 0:
- self._reset_buffer('')
- else:
- self._reset_buffer(self._history[self._history_idx])
- elif self._input_buffer:
- self._history += [self._input_buffer[:]]
- self._reset_buffer('')
-
- def cmd__backspace(self) -> None:
- 'Truncate current content by one character, if possible.'
- if self._cursor_x > 0:
- self._cursor_x -= 1
- self._input_buffer = (self._input_buffer[:self._cursor_x]
- + self._input_buffer[self._cursor_x + 1:])
- self._history_idx = 0
- self.draw()
-
- def cmd__move_cursor(self, direction: str) -> None:
- 'Move cursor one space into direction ("left" or "right") if possible.'
- if direction == 'left' and self._cursor_x > 0:
- self._cursor_x -= 1
- elif direction == 'right'\
- and self._cursor_x <= len(self._input_buffer):
- self._cursor_x += 1
- else:
- return
- self.draw()
-
- def _reset_buffer(self, content: str) -> None:
- self._input_buffer = content
- self._cursor_x = len(self._input_buffer)
-
- def enter(self) -> str:
- 'Return current content while also clearing and then redrawing.'
- to_return = self._input_buffer[:]
- if to_return:
- self._history += [to_return]
- self._reset_buffer('')
- self.draw()
- return to_return
-
-
-class ConnectionPromptWidget(PromptWidget):
- 'PromptWidget with attributes, methods for dealing with an IrcConnection.'
- _nickname: str = ''
-
- def update_prompt(self,
- nick_confirmed=False,
- nick: Optional[str] = None
- ) -> None:
- 'Update nickname-relevant knowledge to go into prompt string.'
- self._prompt = ''
- if nick:
- self._nickname = nick
- if self._nickname:
- self._prompt += ' ' if nick_confirmed else '?'
- self._prompt += self._nickname
- self._prompt += PROMPT_TEMPLATE
-
-
-class LogWidget(ScrollableWidget):
- 'Collects line-shaped messages, scrolls and wraps them for display.'
- _view_size: YX
- _y_pgscroll: int
-
- def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs
- ) -> None:
- super().__init__(*args, **kwargs)
- self._wrap = wrap
- self._wrapped_idx = self._history_idx = -1
- self._wrapped: list[tuple[Optional[int], str]] = []
-
- def _add_wrapped(self, idx_original, line) -> int:
- wrapped_lines = self._wrap(line)
- self._wrapped += [(idx_original, line) for line in wrapped_lines]
- return len(wrapped_lines)
-
- def set_geometry(self, measurements: YX) -> None:
- self._view_size = measurements
- self._y_pgscroll = self._view_size.y // 2
- self._wrapped.clear()
- self._wrapped += [(None, '')] * self._view_size.y
- if not self._history:
- return
- for idx_history, line in enumerate(self._history):
- self._add_wrapped(idx_history, line)
- wrapped_lines_for_history_idx = [
- t for t in self._wrapped
- if t[0] == len(self._history) + self._history_idx]
- idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
- self._wrapped_idx = idx_their_last - len(self._wrapped)
-
- def append(self, to_append: str) -> None:
- self._history += [to_append]
- n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
- if self._wrapped_idx < -1:
- self._history_idx -= 1
- self._wrapped_idx -= n_wrapped_lines
-
- def draw(self) -> None:
- start_idx = self._wrapped_idx - self._view_size.y + 1
- end_idx = self._wrapped_idx
- to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
- if self._wrapped_idx < -1:
- scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
- scroll_info += 'v' * (self._view_size.x - len(scroll_info))
- to_write += [scroll_info]
- else:
- to_write += [self._wrapped[self._wrapped_idx][1]]
- for i, line in enumerate(to_write):
- self._write(line, i)
-
- def _scroll(self, up: bool = True) -> None:
- if up:
- self._wrapped_idx = max(self._view_size.y + 1 - len(self._wrapped),
- self._wrapped_idx - self._y_pgscroll)
- else:
- self._wrapped_idx = min(-1,
- self._wrapped_idx + self._y_pgscroll)
- history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
- if history_idx_to_wrapped_idx is not None:
- self._history_idx = history_idx_to_wrapped_idx - len(self._history)
-
-
-class Window(Widget):
- 'Collects a log and a prompt meant for the same content stream.'
- _y_status: int
- prompt: PromptWidget
-
- def __init__(self, idx: int, term: Terminal) -> None:
- self.idx = idx
- self._term = term
- self.log = LogWidget(self._term.wrap, self._term.write)
- self.prompt = self.__annotations__['prompt'](self._term.write)
- if hasattr(self._term, 'size'):
- self.set_geometry()
-
- def set_geometry(self, _=None) -> None:
- assert _ is None
- self._y_status = self._term.size.y - 2
- self.log.set_geometry(YX(self._y_status, self._term.size.x))
- self.prompt.set_geometry(YX(self._term.size.y - 1, self._term.size.x))
-
- def draw(self) -> None:
- idx_box = f'[{self.idx}]'
- status_line = idx_box + '=' * (self._term.size.x - len(idx_box))
- self._term.clear()
- self.log.draw()
- self._term.write(status_line, self._y_status)
- self.prompt.draw()
-
- def cmd__paste(self) -> None:
- 'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
- self._term.write(f'\033{OSC52_PREFIX}?{PASTE_DELIMITER}',
- self._y_status)
- self.draw()
-
-
-class ConnectionWindow(Window):
- 'Window with attributes and methods for dealing with an IrcConnection.'
- prompt: ConnectionPromptWidget
-
- def __init__(self,
- broadcast: Callable[[EventType, Any], None],
- conn_idx: int,
- *args, **kwargs
- ) -> None:
- self._broadcast = broadcast
- self._conn_idx = conn_idx
- super().__init__(*args, **kwargs)
-
- def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
- 'Send QUIT command to server.'
- self._broadcast(EventType.SEND,
- (self._conn_idx, IrcMessage('QUIT', (quit_msg, ))))
-
- def cmd__reconnect(self) -> None:
- 'Attempt reconnection.'
- self._broadcast(EventType.INIT_RECONNECT, (self._conn_idx,))
-
-
-class TuiLoop(Loop):
- 'Loop for drawing/updating TUI.'
-
- def __init__(self, term: Terminal, *args, **kwargs) -> None:
- self._term = term
- self._windows = [Window(0, self._term)]
- self._window_idx = 0
- self._conn_windows: list[ConnectionWindow] = []
- super().__init__(*args, **kwargs)
- self.put(Event(EventType.SET_SCREEN))
-
- def _cmd_name_to_cmd(self, cmd_name: str) -> Optional[Callable]:
- cmd_name = CMD_SHORTCUTS.get(cmd_name, cmd_name)
- cmd_parent = self
- while True:
- cmd_name_toks = cmd_name.split('.', maxsplit=1)
- if len(cmd_name_toks) == 1:
- break
- if not hasattr(cmd_parent, cmd_name_toks[0]):
- return None
- cmd_parent = getattr(cmd_parent, cmd_name_toks[0])
- cmd_name = cmd_name_toks[1]
- cmd_name = f'cmd__{cmd_name}'
- if not hasattr(cmd_parent, cmd_name):
- return None
- return getattr(cmd_parent, cmd_name)
-
- def process_main(self, event: Event) -> bool:
- if not super().process_main(event):
- return False
- if event.type_ == EventType.SET_SCREEN:
- self._term.calc_geometry()
- for window in self._windows:
- window.set_geometry()
- self.window.draw()
- elif event.type_ == EventType.CONN_WINDOW:
- conn_win = ConnectionWindow(broadcast=self.broadcast,
- conn_idx=event.payload[0],
- idx=len(self._windows),
- term=self._term)
- conn_win.prompt.update_prompt(nick_confirmed=False,
- nick=event.payload[1])
- self._windows += [conn_win]
- self._conn_windows += [conn_win]
- self._switch_window(conn_win.idx)
- elif event.type_ == EventType.ALERT:
- self.window.log.append(f'ALERT {event.payload}')
- self.window.log.draw()
- elif event.type_ in {EventType.RECV, EventType.SEND,
- EventType.CONN_ALERT}:
- conn_win = self._conn_windows[event.payload[0]]
- if event.type_ == EventType.CONN_ALERT:
- msg = f'ALERT {event.payload[1]}'
- else:
- msg = (('<-' if event.type_ == EventType.RECV else '->')
- + event.payload[1].raw)
- conn_win.log.append(msg)
- if conn_win == self.window:
- self.window.log.draw()
- elif event.type_ == EventType.NICK_SET:
- conn_win = self._conn_windows[event.payload[0]]
- conn_win.prompt.update_prompt(nick_confirmed=True,
- nick=event.payload[1])
- if conn_win == self.window:
- self.window.prompt.draw()
- elif event.type_ == EventType.DISCONNECTED:
- conn_win = self._conn_windows[event.payload[0]]
- conn_win.prompt.update_prompt(nick_confirmed=False)
- if conn_win == self.window:
- self.window.prompt.draw()
- elif event.type_ == EventType.KEYBINDING:
- cmd = self._cmd_name_to_cmd(event.payload[0])
- assert cmd is not None
- cmd(*event.payload[1:])
- elif event.type_ == EventType.PROMPT_ADD:
- self.window.prompt.append(event.payload)
- # elif event.type_ == EventType.DEBUG:
- # from traceback import format_exception
- # for line in '\n'.join(format_exception(event.payload)
- # ).split('\n'):
- # self.window.log.append(f'DEBUG {line}')
- # self.window.log.draw()
- else:
- return True
- self._term.flush()
- return True
-
- @property
- def window(self) -> Window:
- 'Currently selected Window.'
- return self._windows[self._window_idx]
-
- def _switch_window(self, idx: int) -> None:
- self._window_idx = idx
- self.window.draw()
-
- def cmd__connect(self, hostname: str, nickname: str, realname: str
- ) -> None:
- 'Send INIT_CONNECT command to main loop.'
- login = LoginNames(user=getusername(), nick=nickname, real=realname)
- self.broadcast(EventType.INIT_CONNECT, (hostname, login))
-
- def cmd__prompt_enter(self) -> None:
- 'Get prompt content from .window.prompt.enter, parse to & run command.'
- to_parse = self.window.prompt.enter()
- if not to_parse:
- return
- alert: Optional[str] = None
- if to_parse[0:1] == '/':
- toks = to_parse[1:].split(maxsplit=1)
- alert = f'{toks[0]} unknown'
- cmd = self._cmd_name_to_cmd(toks[0])
- if cmd and cmd.__name__ != stack()[0].function:
- params = signature(cmd).parameters
- n_args_max = len(params)
- n_args_min = len([p for p in params.values()
- if p.default == inspect_empty])
- alert = f'{cmd.__name__} needs between {n_args_min} and '\
- f'{n_args_max} args'
- if len(toks) == 1 and not n_args_min:
- alert = cmd()
- elif len(toks) > 1 and params\
- and n_args_min <= len(toks[1].split()):
- args = []
- while len(toks) > 1 and n_args_max:
- toks = toks[1].split(maxsplit=1)
- args += [toks[0]]
- n_args_max -= 1
- alert = cmd(*args)
- else:
- alert = 'not prefixed by /'
- if alert:
- self.broadcast(EventType.ALERT, f'invalid prompt command: {alert}')
-
- def cmd__quit(self) -> None:
- 'Send QUIT to all threads.'
- self.broadcast(EventType.QUIT)
-
- def cmd__window(self, towards: str) -> Optional[str]:
- 'Switch window selection.'
- n_windows = len(self._windows)
- if n_windows < 2:
- return 'no alternate window to move into'
- if towards in {'left', 'right'}:
- multiplier = (+1) if towards == 'right' else (-1)
- window_idx = self._window_idx + multiplier
- if not 0 <= window_idx < n_windows:
- window_idx -= multiplier * n_windows
- elif not towards.isdigit():
- return f'neither "left"/"right" nor integer: {towards}'
- else:
- window_idx = int(towards)
- if not 0 <= window_idx < n_windows:
- return f'unavailable window idx: {window_idx}'
- self._switch_window(window_idx)
- return None
-
-
-class ConnectionLoop(Loop):
- 'Loop receiving and translating socket messages towards main loop.'
-
- def __init__(self, connection_idx: int, *args, **kwargs) -> None:
- self._conn_idx = connection_idx
- super().__init__(*args, **kwargs)
-
- def _broadcast_conn(self, type_: EventType, *args) -> None:
- self.broadcast(type_, (self._conn_idx, *args))
-
- def _send(self, verb: str, parameters: tuple[str, ...]) -> None:
- self._broadcast_conn(EventType.SEND, IrcMessage(verb, parameters))
-
- def process_main(self, event: Event) -> bool:
- if not super().process_main(event):
- return False
- if event.type_ == EventType.CONNECTED:
- login = event.payload[1]
- self._send('USER', (login.user, '0', '*', login.real))
- self._send('NICK', (login.nick,))
- return True
-
- def process_bonus(self, yielded: str) -> None:
- msg = IrcMessage.from_raw(yielded)
- self._broadcast_conn(EventType.RECV, msg)
- if msg.verb == 'PING':
- self._send('PONG', (msg.parameters[0],))
- elif msg.verb == 'ERROR'\
- and msg.parameters[0].startswith('Closing link:'):
- self._broadcast_conn(EventType.DISCONNECTED)
- elif msg.verb == '001':
- self._broadcast_conn(EventType.NICK_SET, msg.parameters[0])
-
-
-class KeyboardLoop(Loop):
- 'Loop receiving and translating keyboard events towards main loop.'
-
- def process_bonus(self, yielded: str) -> None:
- if yielded.startswith(B64_PREFIX):
- encoded = yielded[len(B64_PREFIX):]
- to_paste = ''
- for i, c in enumerate(b64decode(encoded).decode('utf-8')):
- if i > 512:
- break
- if c.isprintable():
- to_paste += c
- elif c.isspace():
- to_paste += ' '
- else:
- to_paste += '#'
- self.broadcast(EventType.PROMPT_ADD, to_paste)
- elif yielded in KEYBINDINGS:
- self.broadcast(EventType.KEYBINDING, KEYBINDINGS[yielded])
- elif len(yielded) == 1:
- self.broadcast(EventType.PROMPT_ADD, yielded)
- else:
- self.broadcast(EventType.ALERT,
- f'unknown keyboard input: {yielded}')
+from ircplom.events import EventQueue, EventType
+from ircplom.irc_conn import IrcConnection
+from ircplom.tui import Terminal
def run() -> None: