home · contact · privacy
Initial commit.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 29 May 2025 05:25:36 +0000 (07:25 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 29 May 2025 05:25:36 +0000 (07:25 +0200)
ircplom.py [new file with mode: 0755]
requirements.txt [new file with mode: 0644]

diff --git a/ircplom.py b/ircplom.py
new file mode 100755 (executable)
index 0000000..6982da2
--- /dev/null
@@ -0,0 +1,418 @@
+#!/usr/bin/env python3
+'Attempt at an IRC client.'
+
+from contextlib import contextmanager
+from queue import SimpleQueue, Empty as QueueEmpty
+from signal import SIGWINCH, signal
+from socket import socket
+from threading import Thread
+from typing import (
+        Callable, Generator, Iterator, Literal, NamedTuple, Optional, Self)
+
+from blessed import Terminal as BlessedTerminal
+
+
+HOST = 'irc.freenode.net'
+PORT = 6667
+USERNAME = 'foo'
+NICKNAME = 'bar'
+REALNAME = 'debug debugger'
+TIMEOUT_FOR_QUIT = 1.0
+INPUT_PROMPT = ':'
+
+IRCSPEC_LINE_SEPARATOR = b'\r\n'
+IRCSPEC_TAG_ESCAPES = ((r'\:', ';'),
+                       (r'\s', ' '),
+                       (r'\n', '\n'),
+                       (r'\r', '\r'),
+                       (r'\\', '\\'))
+
+
+class Event(NamedTuple):
+    'Communication unit between threads.'
+    type_: str
+    args: tuple = tuple()
+
+
+class YX(NamedTuple):
+    '2-dimensional coordinate.'
+    y: int
+    x: int
+
+
+# def log(msg):
+#     from datetime import datetime
+#     with open('log.txt', 'a') as f:
+#         f.write(f'{datetime.now()} {msg}\n')
+
+
+class Terminal:
+    'Abstraction of terminal interface.'
+    size: YX
+    tui: 'TuiLoop'
+    _blessed: BlessedTerminal
+
+    @contextmanager
+    def context(self, q_to_main: SimpleQueue) -> Generator:
+        'Combine multiple contexts into one.'
+        self._blessed = BlessedTerminal()
+        with (self._blessed.raw(),
+              self._blessed.fullscreen(),
+              self._blessed.hidden_cursor(),
+              KeyboardLoop(q_to_main, self.get_keypresses())):
+            with TuiLoop(self, q_to_main) as tui:
+                self.tui = tui
+                signal(SIGWINCH, lambda *_: self.tui.put(Event('SIGWINCH')))
+                yield self
+
+    def calc_geometry(self) -> None:
+        '(Re-)calculate .size..'
+        self.size = YX(self._blessed.height, self._blessed.width)
+
+    def clear(self) -> None:
+        'Clear terminal.'
+        print(self._blessed.clear, end='')
+
+    def flush(self) -> None:
+        'Flush terminal.'
+        print('', end='', flush=True)
+
+    def write_yx(self, yx: YX, msg: str) -> None:
+        'Starting at yx, write line with msg, padded at end with spaces.'
+        len_padding = self.size.x - (yx.x + len(msg))
+        print(self._blessed.move_yx(yx.y, yx.x), end='')
+        print(msg + (' ' * len_padding), end='')
+
+    def get_keypresses(self) -> Iterator[str]:
+        '''Loop through keypresses from terminal, collect what blessed ignores.
+
+        (Notably, blessed seems to junk any alt/escape-modifide key events it
+        does not explicitly know.
+        '''
+        n_gotchs_unprocessed = 0
+        while True:
+            new_gotchs = []
+            if not n_gotchs_unprocessed:
+                while self._blessed.kbhit(TIMEOUT_FOR_QUIT):
+                    gotch = self._blessed.getch()
+                    self._blessed.ungetch(gotch)
+                    new_gotchs += [gotch]
+                    if not self._blessed.kbhit(0):
+                        break
+                n_gotchs_unprocessed += len(new_gotchs)
+            blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
+            n_chs_blessed_key = len(blessed_key.encode('utf-8'))
+            unhandleds = []
+            if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
+                for _ in range(len(new_gotchs) - n_chs_blessed_key):
+                    unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
+                    unhandleds += list(unhandled.encode('utf-8'))
+                    n_gotchs_unprocessed -= 1
+            n_gotchs_unprocessed -= n_chs_blessed_key
+            if unhandleds:
+                yield str(unhandleds)
+            elif blessed_key.name:
+                yield blessed_key.name
+            else:
+                yield str(blessed_key)
+
+
+class Connection:
+    'Abstraction of Socket connection.'
+    _bufsize = 1024
+    _socket: socket
+
+    @contextmanager
+    def context(self,
+                address: tuple[str, int],
+                q_to_main: SimpleQueue
+                ) -> Generator:
+        'Wrap socket and recv loop context.'
+        with socket() as sock:
+            self._socket = sock
+            self._socket.settimeout(TIMEOUT_FOR_QUIT)
+            self._socket.connect(address)
+            with SocketRecvLoop(q_to_main, self.read_lines()):
+                yield self
+
+    def read_lines(self) -> Iterator[Optional[str]]:
+        'Receive line-separator-delimited messages from socket.'
+        bytes_total = b''
+        buffer_linesep = b''
+        while True:
+            try:
+                bytes_new = self._socket.recv(self._bufsize)
+            except TimeoutError:
+                yield None
+                continue
+            except OSError as e:
+                if e.errno == 9:
+                    break
+                raise e
+            if not bytes_new:
+                break
+            for c in bytes_new:
+                c_byted = c.to_bytes()
+                if c not in IRCSPEC_LINE_SEPARATOR:
+                    bytes_total += c_byted
+                    buffer_linesep = b''
+                elif c == IRCSPEC_LINE_SEPARATOR[0]:
+                    buffer_linesep = c_byted
+                else:
+                    buffer_linesep += c_byted
+                if buffer_linesep == IRCSPEC_LINE_SEPARATOR:
+                    buffer_linesep = b''
+                    yield bytes_total.decode('utf-8')
+                    bytes_total = b''
+
+    def write_line(self, line: str) -> None:
+        'Send line-separator-delimited message over socket.'
+        self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR)
+
+
+class IrcMessage(NamedTuple):
+    'Properly structured representation of IRC message as per IRCv3 spec.'
+    tags: dict[str, str]
+    source: str
+    verb: str
+    parameters: list[str]
+
+    @classmethod
+    def from_raw(cls, raw_msg: str) -> Self:
+        'Parse raw IRC message line into properly structured IrcMessage.'
+
+        class _Stage(NamedTuple):
+            name: str
+            prefix_char: Optional[str]
+            processor: Callable = lambda s: s
+
+        def _parse_tags(str_tags: str) -> dict[str, str]:
+            tags = {}
+            for str_tag in [s for s in str_tags.split(';') if s]:
+                if '=' in str_tag:
+                    key, val = str_tag.split('=', maxsplit=1)
+                    for to_replace, replace_with in IRCSPEC_TAG_ESCAPES:
+                        val.replace(to_replace, replace_with)
+                else:
+                    key, val = str_tag, ''
+                tags[key] = val
+            return tags
+
+        def _split_params(str_params: str) -> list[str]:
+            params = []
+            params_stage = 0  # 0: gap, 1: non-trailing, 2: trailing
+            for char in str_params:
+                if char == ' ' and params_stage < 2:
+                    params_stage = 0
+                    continue
+                if params_stage == 0:
+                    params += ['']
+                    params_stage += 1
+                    if char == ':':
+                        params_stage += 1
+                        continue
+                params[-1] += char
+            return params
+
+        stages = [_Stage('tags', 'q', _parse_tags),
+                  _Stage('source', ':'),
+                  _Stage('verb', None, lambda s: s.upper()),
+                  _Stage('parameters', None, _split_params)]
+        harvest = {s.name: '' for s in stages}
+        idx_stage = 0
+        stage = None
+        for char in raw_msg:
+            if char == ' ' and idx_stage < (len(stages) - 1):
+                if stage:
+                    stage = None
+                continue
+            if not stage:
+                while not stage:
+                    idx_stage += 1
+                    tested = stages[idx_stage]
+                    if (not tested.prefix_char) or char == tested.prefix_char:
+                        stage = tested
+                if stage.prefix_char:
+                    continue
+            harvest[stage.name] += char
+        return cls(*[s.processor(harvest[s.name]) for s in stages])
+
+
+class Loop:
+    'Wraps thread looping over .put input queue, potential bonus iterator.'
+
+    def __init__(self,
+                 q_to_main: SimpleQueue,
+                 bonus_iterator: Optional[Iterator] = None
+                 ) -> None:
+        self._q_to_main = q_to_main
+        self._bonus_iterator = bonus_iterator
+        self._q_input: SimpleQueue[Event] = SimpleQueue()
+        Thread(target=self._loop, daemon=False).start()
+
+    def __enter__(self) -> Self:
+        return self
+
+    def __exit__(self, *_) -> Literal[False]:
+        self._q_input.put(Event('QUIT'))
+        return False  # re-raise any exception that above ignored
+
+    def put(self, event: Event) -> None:
+        'Send event into thread loop.'
+        self._q_input.put(event)
+
+    def broadcast(self, type_, *args) -> None:
+        'Send event to main loop via queue.'
+        self._q_to_main.put(Event(type_, args))
+
+    def process_main(self, event: Event) -> bool:
+        'Process event yielded from input queue.'
+        if event.type_ == 'QUIT':
+            return False
+        return True
+
+    def process_bonus(self, yielded: str) -> None:
+        'Process bonus iterator yield.'
+
+    def _loop(self) -> None:
+        'Loop over input queue and, if provided, bonus iterator.'
+        try:
+            while True:
+                try:
+                    yield_main = self._q_input.get(
+                        block=True,
+                        timeout=0 if self._bonus_iterator else None)
+                except QueueEmpty:
+                    pass
+                else:
+                    if self.process_main(yield_main) is False:
+                        break
+                if self._bonus_iterator:
+                    try:
+                        yield_bonus = next(self._bonus_iterator)
+                    except StopIteration:
+                        break
+                    if yield_bonus:
+                        self.process_bonus(yield_bonus)
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            self._q_to_main.put(Event('EXCEPTION', (e,)))
+
+
+class TuiLoop(Loop):
+    'Loop for drawing/updating TUI.'
+
+    def __init__(self, term: Terminal, *args, **kwargs) -> None:
+        self._term = term
+        self._prompt = ''
+        self._log_buffer: list[str] = []
+        self._calc_and_draw_all()
+        self._term.flush()
+        super().__init__(*args, **kwargs)
+
+    def process_main(self, event: Event) -> bool:
+        if not super().process_main(event):
+            return False
+        if event.type_ == 'RECV':
+            self._log_buffer += [f'<- {event.args[0]}']
+            self._draw_log()
+        elif event.type_ == 'SEND':
+            self._log_buffer += [f'-> {event.args[0]}']
+            self._draw_log()
+        elif event.type_ == 'INPUT_PROMPT':
+            if event.args[0] == 'ENTER':
+                self.broadcast('SEND', self._prompt)
+                self._prompt = ''
+            elif event.args[0] == 'BACKSPACE':
+                self._prompt = self._prompt[:-1]
+            elif event.args[0] == 'CHARACTER':
+                self._prompt += event.args[1]
+            self._draw_prompt()
+        elif event.type_ == 'SIGWINCH':
+            self._calc_and_draw_all()
+        # elif event.type_ == 'DEBUG':
+        #     from traceback import format_exception
+        #     self._log_buffer += [
+        #         f'DEBUG {line}' for line
+        #         in '\n'.join(format_exception(event.args[0])).split('\n')]
+        #     self._draw_log()
+        self._term.flush()
+        return True
+
+    def _calc_and_draw_all(self) -> None:
+        self._term.clear()
+        self._term.calc_geometry()
+        self._y_prompt = self._term.size.y - 1
+        self._y_separator = self._term.size.y - 2
+        self._draw_frame()
+        self._draw_log()
+        self._draw_prompt()
+
+    def _draw_frame(self) -> None:
+        self._term.write_yx(YX(self._y_separator, 0), '=' * self._term.size.x)
+        self._term.write_yx(YX(self._y_prompt, 0), INPUT_PROMPT)
+
+    def _draw_log(self) -> None:
+        temp_buffer = ([''] * self._term.size.y) + self._log_buffer[:]
+        for i, line in enumerate(temp_buffer[-self._y_separator:]):
+            self._term.write_yx(YX(i, 0), line[:self._term.size.x])
+
+    def _draw_prompt(self) -> None:
+        self._term.write_yx(YX(self._y_prompt, len(INPUT_PROMPT)),
+                            f'{self._prompt}_')
+
+
+class SocketRecvLoop(Loop):
+    'Loop receiving and translating socket messages towards main loop.'
+
+    def process_bonus(self, yielded: str) -> None:
+        msg = IrcMessage.from_raw(yielded)
+        if msg.verb == 'PING':
+            self.broadcast('PING', msg.parameters[0])
+            # DEBUG = 3 / 0
+        self.broadcast('RECV', str(msg))
+
+
+class KeyboardLoop(Loop):
+    'Loop receiving and translating keyboard events towards main loop.'
+
+    def process_bonus(self, yielded: str) -> None:
+        if yielded == 'KEY_ENTER':
+            self.broadcast('INPUT_PROMPT', 'ENTER')
+        elif yielded == 'KEY_BACKSPACE':
+            self.broadcast('INPUT_PROMPT', 'BACKSPACE')
+        elif len(yielded) == 1:
+            self.broadcast('INPUT_PROMPT', 'CHARACTER', yielded)
+        elif yielded == '[81]':
+            self.broadcast('QUIT')
+        else:
+            self.broadcast('RECV', str(yielded))
+
+
+def run() -> None:
+    'Main execution code / loop.'
+    q_to_main: SimpleQueue[Event] = SimpleQueue()
+    with Terminal().context(q_to_main) as term:
+        with Connection().context((HOST, PORT), q_to_main) as conn:
+            conn.write_line(f'USER {USERNAME} 0 * :{REALNAME}')
+            conn.write_line(f'NICK {NICKNAME}')
+            while True:
+                event = q_to_main.get()
+                if event.type_ == 'QUIT':
+                    break
+                if event.type_ == 'INPUT_PROMPT':
+                    term.tui.put(event)
+                elif event.type_ == 'SEND':
+                    term.tui.put(event)
+                    conn.write_line(event.args[0])
+                elif event.type_ == 'PING':
+                    q_to_main.put(Event('SEND', (f'PONG {event.args[0]}',)))
+                elif event.type_ == 'RECV':
+                    term.tui.put(event)
+                elif event.type_ == 'EXCEPTION':
+                    raise event.args[0]
+                # elif event.type_ == 'DEBUG':
+                #     term.tui.put(event)
+
+
+if __name__ == '__main__':
+    run()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644 (file)
index 0000000..d43de1b
--- /dev/null
@@ -0,0 +1 @@
+blessed