From c86a742f015d09b24883394b6d67f6bf0451b7c2 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Thu, 29 May 2025 07:25:36 +0200 Subject: [PATCH] Initial commit. --- ircplom.py | 418 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 2 files changed, 419 insertions(+) create mode 100755 ircplom.py create mode 100644 requirements.txt diff --git a/ircplom.py b/ircplom.py new file mode 100755 index 0000000..6982da2 --- /dev/null +++ b/ircplom.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +'Attempt at an IRC client.' + +from contextlib import contextmanager +from queue import SimpleQueue, Empty as QueueEmpty +from signal import SIGWINCH, signal +from socket import socket +from threading import Thread +from typing import ( + Callable, Generator, Iterator, Literal, NamedTuple, Optional, Self) + +from blessed import Terminal as BlessedTerminal + + +HOST = 'irc.freenode.net' +PORT = 6667 +USERNAME = 'foo' +NICKNAME = 'bar' +REALNAME = 'debug debugger' +TIMEOUT_FOR_QUIT = 1.0 +INPUT_PROMPT = ':' + +IRCSPEC_LINE_SEPARATOR = b'\r\n' +IRCSPEC_TAG_ESCAPES = ((r'\:', ';'), + (r'\s', ' '), + (r'\n', '\n'), + (r'\r', '\r'), + (r'\\', '\\')) + + +class Event(NamedTuple): + 'Communication unit between threads.' + type_: str + args: tuple = tuple() + + +class YX(NamedTuple): + '2-dimensional coordinate.' + y: int + x: int + + +# def log(msg): +# from datetime import datetime +# with open('log.txt', 'a') as f: +# f.write(f'{datetime.now()} {msg}\n') + + +class Terminal: + 'Abstraction of terminal interface.' + size: YX + tui: 'TuiLoop' + _blessed: BlessedTerminal + + @contextmanager + def context(self, q_to_main: SimpleQueue) -> Generator: + 'Combine multiple contexts into one.' + self._blessed = BlessedTerminal() + with (self._blessed.raw(), + self._blessed.fullscreen(), + self._blessed.hidden_cursor(), + KeyboardLoop(q_to_main, self.get_keypresses())): + with TuiLoop(self, q_to_main) as tui: + self.tui = tui + signal(SIGWINCH, lambda *_: self.tui.put(Event('SIGWINCH'))) + yield self + + def calc_geometry(self) -> None: + '(Re-)calculate .size..' + self.size = YX(self._blessed.height, self._blessed.width) + + def clear(self) -> None: + 'Clear terminal.' + print(self._blessed.clear, end='') + + def flush(self) -> None: + 'Flush terminal.' + print('', end='', flush=True) + + def write_yx(self, yx: YX, msg: str) -> None: + 'Starting at yx, write line with msg, padded at end with spaces.' + len_padding = self.size.x - (yx.x + len(msg)) + print(self._blessed.move_yx(yx.y, yx.x), end='') + print(msg + (' ' * len_padding), end='') + + def get_keypresses(self) -> Iterator[str]: + '''Loop through keypresses from terminal, collect what blessed ignores. + + (Notably, blessed seems to junk any alt/escape-modifide key events it + does not explicitly know. + ''' + n_gotchs_unprocessed = 0 + while True: + new_gotchs = [] + if not n_gotchs_unprocessed: + while self._blessed.kbhit(TIMEOUT_FOR_QUIT): + gotch = self._blessed.getch() + self._blessed.ungetch(gotch) + new_gotchs += [gotch] + if not self._blessed.kbhit(0): + break + n_gotchs_unprocessed += len(new_gotchs) + blessed_key = self._blessed.inkey(timeout=0, esc_delay=0) + n_chs_blessed_key = len(blessed_key.encode('utf-8')) + unhandleds = [] + if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE': + for _ in range(len(new_gotchs) - n_chs_blessed_key): + unhandled = self._blessed.inkey(timeout=0, esc_delay=0) + unhandleds += list(unhandled.encode('utf-8')) + n_gotchs_unprocessed -= 1 + n_gotchs_unprocessed -= n_chs_blessed_key + if unhandleds: + yield str(unhandleds) + elif blessed_key.name: + yield blessed_key.name + else: + yield str(blessed_key) + + +class Connection: + 'Abstraction of Socket connection.' + _bufsize = 1024 + _socket: socket + + @contextmanager + def context(self, + address: tuple[str, int], + q_to_main: SimpleQueue + ) -> Generator: + 'Wrap socket and recv loop context.' + with socket() as sock: + self._socket = sock + self._socket.settimeout(TIMEOUT_FOR_QUIT) + self._socket.connect(address) + with SocketRecvLoop(q_to_main, self.read_lines()): + yield self + + def read_lines(self) -> Iterator[Optional[str]]: + 'Receive line-separator-delimited messages from socket.' + bytes_total = b'' + buffer_linesep = b'' + while True: + try: + bytes_new = self._socket.recv(self._bufsize) + except TimeoutError: + yield None + continue + except OSError as e: + if e.errno == 9: + break + raise e + if not bytes_new: + break + for c in bytes_new: + c_byted = c.to_bytes() + if c not in IRCSPEC_LINE_SEPARATOR: + bytes_total += c_byted + buffer_linesep = b'' + elif c == IRCSPEC_LINE_SEPARATOR[0]: + buffer_linesep = c_byted + else: + buffer_linesep += c_byted + if buffer_linesep == IRCSPEC_LINE_SEPARATOR: + buffer_linesep = b'' + yield bytes_total.decode('utf-8') + bytes_total = b'' + + def write_line(self, line: str) -> None: + 'Send line-separator-delimited message over socket.' + self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR) + + +class IrcMessage(NamedTuple): + 'Properly structured representation of IRC message as per IRCv3 spec.' + tags: dict[str, str] + source: str + verb: str + parameters: list[str] + + @classmethod + def from_raw(cls, raw_msg: str) -> Self: + 'Parse raw IRC message line into properly structured IrcMessage.' + + class _Stage(NamedTuple): + name: str + prefix_char: Optional[str] + processor: Callable = lambda s: s + + def _parse_tags(str_tags: str) -> dict[str, str]: + tags = {} + for str_tag in [s for s in str_tags.split(';') if s]: + if '=' in str_tag: + key, val = str_tag.split('=', maxsplit=1) + for to_replace, replace_with in IRCSPEC_TAG_ESCAPES: + val.replace(to_replace, replace_with) + else: + key, val = str_tag, '' + tags[key] = val + return tags + + def _split_params(str_params: str) -> list[str]: + params = [] + params_stage = 0 # 0: gap, 1: non-trailing, 2: trailing + for char in str_params: + if char == ' ' and params_stage < 2: + params_stage = 0 + continue + if params_stage == 0: + params += [''] + params_stage += 1 + if char == ':': + params_stage += 1 + continue + params[-1] += char + return params + + stages = [_Stage('tags', 'q', _parse_tags), + _Stage('source', ':'), + _Stage('verb', None, lambda s: s.upper()), + _Stage('parameters', None, _split_params)] + harvest = {s.name: '' for s in stages} + idx_stage = 0 + stage = None + for char in raw_msg: + if char == ' ' and idx_stage < (len(stages) - 1): + if stage: + stage = None + continue + if not stage: + while not stage: + idx_stage += 1 + tested = stages[idx_stage] + if (not tested.prefix_char) or char == tested.prefix_char: + stage = tested + if stage.prefix_char: + continue + harvest[stage.name] += char + return cls(*[s.processor(harvest[s.name]) for s in stages]) + + +class Loop: + 'Wraps thread looping over .put input queue, potential bonus iterator.' + + def __init__(self, + q_to_main: SimpleQueue, + bonus_iterator: Optional[Iterator] = None + ) -> None: + self._q_to_main = q_to_main + self._bonus_iterator = bonus_iterator + self._q_input: SimpleQueue[Event] = SimpleQueue() + Thread(target=self._loop, daemon=False).start() + + def __enter__(self) -> Self: + return self + + def __exit__(self, *_) -> Literal[False]: + self._q_input.put(Event('QUIT')) + return False # re-raise any exception that above ignored + + def put(self, event: Event) -> None: + 'Send event into thread loop.' + self._q_input.put(event) + + def broadcast(self, type_, *args) -> None: + 'Send event to main loop via queue.' + self._q_to_main.put(Event(type_, args)) + + def process_main(self, event: Event) -> bool: + 'Process event yielded from input queue.' + if event.type_ == 'QUIT': + return False + return True + + def process_bonus(self, yielded: str) -> None: + 'Process bonus iterator yield.' + + def _loop(self) -> None: + 'Loop over input queue and, if provided, bonus iterator.' + try: + while True: + try: + yield_main = self._q_input.get( + block=True, + timeout=0 if self._bonus_iterator else None) + except QueueEmpty: + pass + else: + if self.process_main(yield_main) is False: + break + if self._bonus_iterator: + try: + yield_bonus = next(self._bonus_iterator) + except StopIteration: + break + if yield_bonus: + self.process_bonus(yield_bonus) + except Exception as e: # pylint: disable=broad-exception-caught + self._q_to_main.put(Event('EXCEPTION', (e,))) + + +class TuiLoop(Loop): + 'Loop for drawing/updating TUI.' + + def __init__(self, term: Terminal, *args, **kwargs) -> None: + self._term = term + self._prompt = '' + self._log_buffer: list[str] = [] + self._calc_and_draw_all() + self._term.flush() + super().__init__(*args, **kwargs) + + def process_main(self, event: Event) -> bool: + if not super().process_main(event): + return False + if event.type_ == 'RECV': + self._log_buffer += [f'<- {event.args[0]}'] + self._draw_log() + elif event.type_ == 'SEND': + self._log_buffer += [f'-> {event.args[0]}'] + self._draw_log() + elif event.type_ == 'INPUT_PROMPT': + if event.args[0] == 'ENTER': + self.broadcast('SEND', self._prompt) + self._prompt = '' + elif event.args[0] == 'BACKSPACE': + self._prompt = self._prompt[:-1] + elif event.args[0] == 'CHARACTER': + self._prompt += event.args[1] + self._draw_prompt() + elif event.type_ == 'SIGWINCH': + self._calc_and_draw_all() + # elif event.type_ == 'DEBUG': + # from traceback import format_exception + # self._log_buffer += [ + # f'DEBUG {line}' for line + # in '\n'.join(format_exception(event.args[0])).split('\n')] + # self._draw_log() + self._term.flush() + return True + + def _calc_and_draw_all(self) -> None: + self._term.clear() + self._term.calc_geometry() + self._y_prompt = self._term.size.y - 1 + self._y_separator = self._term.size.y - 2 + self._draw_frame() + self._draw_log() + self._draw_prompt() + + def _draw_frame(self) -> None: + self._term.write_yx(YX(self._y_separator, 0), '=' * self._term.size.x) + self._term.write_yx(YX(self._y_prompt, 0), INPUT_PROMPT) + + def _draw_log(self) -> None: + temp_buffer = ([''] * self._term.size.y) + self._log_buffer[:] + for i, line in enumerate(temp_buffer[-self._y_separator:]): + self._term.write_yx(YX(i, 0), line[:self._term.size.x]) + + def _draw_prompt(self) -> None: + self._term.write_yx(YX(self._y_prompt, len(INPUT_PROMPT)), + f'{self._prompt}_') + + +class SocketRecvLoop(Loop): + 'Loop receiving and translating socket messages towards main loop.' + + def process_bonus(self, yielded: str) -> None: + msg = IrcMessage.from_raw(yielded) + if msg.verb == 'PING': + self.broadcast('PING', msg.parameters[0]) + # DEBUG = 3 / 0 + self.broadcast('RECV', str(msg)) + + +class KeyboardLoop(Loop): + 'Loop receiving and translating keyboard events towards main loop.' + + def process_bonus(self, yielded: str) -> None: + if yielded == 'KEY_ENTER': + self.broadcast('INPUT_PROMPT', 'ENTER') + elif yielded == 'KEY_BACKSPACE': + self.broadcast('INPUT_PROMPT', 'BACKSPACE') + elif len(yielded) == 1: + self.broadcast('INPUT_PROMPT', 'CHARACTER', yielded) + elif yielded == '[81]': + self.broadcast('QUIT') + else: + self.broadcast('RECV', str(yielded)) + + +def run() -> None: + 'Main execution code / loop.' + q_to_main: SimpleQueue[Event] = SimpleQueue() + with Terminal().context(q_to_main) as term: + with Connection().context((HOST, PORT), q_to_main) as conn: + conn.write_line(f'USER {USERNAME} 0 * :{REALNAME}') + conn.write_line(f'NICK {NICKNAME}') + while True: + event = q_to_main.get() + if event.type_ == 'QUIT': + break + if event.type_ == 'INPUT_PROMPT': + term.tui.put(event) + elif event.type_ == 'SEND': + term.tui.put(event) + conn.write_line(event.args[0]) + elif event.type_ == 'PING': + q_to_main.put(Event('SEND', (f'PONG {event.args[0]}',))) + elif event.type_ == 'RECV': + term.tui.put(event) + elif event.type_ == 'EXCEPTION': + raise event.args[0] + # elif event.type_ == 'DEBUG': + # term.tui.put(event) + + +if __name__ == '__main__': + run() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d43de1b --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +blessed -- 2.30.2