--- /dev/null
+#!/usr/bin/env python3
+'Attempt at an IRC client.'
+
+from contextlib import contextmanager
+from queue import SimpleQueue, Empty as QueueEmpty
+from signal import SIGWINCH, signal
+from socket import socket
+from threading import Thread
+from typing import (
+ Callable, Generator, Iterator, Literal, NamedTuple, Optional, Self)
+
+from blessed import Terminal as BlessedTerminal
+
+
+HOST = 'irc.freenode.net'
+PORT = 6667
+USERNAME = 'foo'
+NICKNAME = 'bar'
+REALNAME = 'debug debugger'
+TIMEOUT_FOR_QUIT = 1.0
+INPUT_PROMPT = ':'
+
+IRCSPEC_LINE_SEPARATOR = b'\r\n'
+IRCSPEC_TAG_ESCAPES = ((r'\:', ';'),
+ (r'\s', ' '),
+ (r'\n', '\n'),
+ (r'\r', '\r'),
+ (r'\\', '\\'))
+
+
+class Event(NamedTuple):
+ 'Communication unit between threads.'
+ type_: str
+ args: tuple = tuple()
+
+
+class YX(NamedTuple):
+ '2-dimensional coordinate.'
+ y: int
+ x: int
+
+
+# def log(msg):
+# from datetime import datetime
+# with open('log.txt', 'a') as f:
+# f.write(f'{datetime.now()} {msg}\n')
+
+
+class Terminal:
+ 'Abstraction of terminal interface.'
+ size: YX
+ tui: 'TuiLoop'
+ _blessed: BlessedTerminal
+
+ @contextmanager
+ def context(self, q_to_main: SimpleQueue) -> Generator:
+ 'Combine multiple contexts into one.'
+ self._blessed = BlessedTerminal()
+ with (self._blessed.raw(),
+ self._blessed.fullscreen(),
+ self._blessed.hidden_cursor(),
+ KeyboardLoop(q_to_main, self.get_keypresses())):
+ with TuiLoop(self, q_to_main) as tui:
+ self.tui = tui
+ signal(SIGWINCH, lambda *_: self.tui.put(Event('SIGWINCH')))
+ yield self
+
+ def calc_geometry(self) -> None:
+ '(Re-)calculate .size..'
+ self.size = YX(self._blessed.height, self._blessed.width)
+
+ def clear(self) -> None:
+ 'Clear terminal.'
+ print(self._blessed.clear, end='')
+
+ def flush(self) -> None:
+ 'Flush terminal.'
+ print('', end='', flush=True)
+
+ def write_yx(self, yx: YX, msg: str) -> None:
+ 'Starting at yx, write line with msg, padded at end with spaces.'
+ len_padding = self.size.x - (yx.x + len(msg))
+ print(self._blessed.move_yx(yx.y, yx.x), end='')
+ print(msg + (' ' * len_padding), end='')
+
+ def get_keypresses(self) -> Iterator[str]:
+ '''Loop through keypresses from terminal, collect what blessed ignores.
+
+ (Notably, blessed seems to junk any alt/escape-modifide key events it
+ does not explicitly know.
+ '''
+ n_gotchs_unprocessed = 0
+ while True:
+ new_gotchs = []
+ if not n_gotchs_unprocessed:
+ while self._blessed.kbhit(TIMEOUT_FOR_QUIT):
+ gotch = self._blessed.getch()
+ self._blessed.ungetch(gotch)
+ new_gotchs += [gotch]
+ if not self._blessed.kbhit(0):
+ break
+ n_gotchs_unprocessed += len(new_gotchs)
+ blessed_key = self._blessed.inkey(timeout=0, esc_delay=0)
+ n_chs_blessed_key = len(blessed_key.encode('utf-8'))
+ unhandleds = []
+ if len(new_gotchs) > 1 and blessed_key.name == 'KEY_ESCAPE':
+ for _ in range(len(new_gotchs) - n_chs_blessed_key):
+ unhandled = self._blessed.inkey(timeout=0, esc_delay=0)
+ unhandleds += list(unhandled.encode('utf-8'))
+ n_gotchs_unprocessed -= 1
+ n_gotchs_unprocessed -= n_chs_blessed_key
+ if unhandleds:
+ yield str(unhandleds)
+ elif blessed_key.name:
+ yield blessed_key.name
+ else:
+ yield str(blessed_key)
+
+
+class Connection:
+ 'Abstraction of Socket connection.'
+ _bufsize = 1024
+ _socket: socket
+
+ @contextmanager
+ def context(self,
+ address: tuple[str, int],
+ q_to_main: SimpleQueue
+ ) -> Generator:
+ 'Wrap socket and recv loop context.'
+ with socket() as sock:
+ self._socket = sock
+ self._socket.settimeout(TIMEOUT_FOR_QUIT)
+ self._socket.connect(address)
+ with SocketRecvLoop(q_to_main, self.read_lines()):
+ yield self
+
+ def read_lines(self) -> Iterator[Optional[str]]:
+ 'Receive line-separator-delimited messages from socket.'
+ bytes_total = b''
+ buffer_linesep = b''
+ while True:
+ try:
+ bytes_new = self._socket.recv(self._bufsize)
+ except TimeoutError:
+ yield None
+ continue
+ except OSError as e:
+ if e.errno == 9:
+ break
+ raise e
+ if not bytes_new:
+ break
+ for c in bytes_new:
+ c_byted = c.to_bytes()
+ if c not in IRCSPEC_LINE_SEPARATOR:
+ bytes_total += c_byted
+ buffer_linesep = b''
+ elif c == IRCSPEC_LINE_SEPARATOR[0]:
+ buffer_linesep = c_byted
+ else:
+ buffer_linesep += c_byted
+ if buffer_linesep == IRCSPEC_LINE_SEPARATOR:
+ buffer_linesep = b''
+ yield bytes_total.decode('utf-8')
+ bytes_total = b''
+
+ def write_line(self, line: str) -> None:
+ 'Send line-separator-delimited message over socket.'
+ self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR)
+
+
+class IrcMessage(NamedTuple):
+ 'Properly structured representation of IRC message as per IRCv3 spec.'
+ tags: dict[str, str]
+ source: str
+ verb: str
+ parameters: list[str]
+
+ @classmethod
+ def from_raw(cls, raw_msg: str) -> Self:
+ 'Parse raw IRC message line into properly structured IrcMessage.'
+
+ class _Stage(NamedTuple):
+ name: str
+ prefix_char: Optional[str]
+ processor: Callable = lambda s: s
+
+ def _parse_tags(str_tags: str) -> dict[str, str]:
+ tags = {}
+ for str_tag in [s for s in str_tags.split(';') if s]:
+ if '=' in str_tag:
+ key, val = str_tag.split('=', maxsplit=1)
+ for to_replace, replace_with in IRCSPEC_TAG_ESCAPES:
+ val.replace(to_replace, replace_with)
+ else:
+ key, val = str_tag, ''
+ tags[key] = val
+ return tags
+
+ def _split_params(str_params: str) -> list[str]:
+ params = []
+ params_stage = 0 # 0: gap, 1: non-trailing, 2: trailing
+ for char in str_params:
+ if char == ' ' and params_stage < 2:
+ params_stage = 0
+ continue
+ if params_stage == 0:
+ params += ['']
+ params_stage += 1
+ if char == ':':
+ params_stage += 1
+ continue
+ params[-1] += char
+ return params
+
+ stages = [_Stage('tags', 'q', _parse_tags),
+ _Stage('source', ':'),
+ _Stage('verb', None, lambda s: s.upper()),
+ _Stage('parameters', None, _split_params)]
+ harvest = {s.name: '' for s in stages}
+ idx_stage = 0
+ stage = None
+ for char in raw_msg:
+ if char == ' ' and idx_stage < (len(stages) - 1):
+ if stage:
+ stage = None
+ continue
+ if not stage:
+ while not stage:
+ idx_stage += 1
+ tested = stages[idx_stage]
+ if (not tested.prefix_char) or char == tested.prefix_char:
+ stage = tested
+ if stage.prefix_char:
+ continue
+ harvest[stage.name] += char
+ return cls(*[s.processor(harvest[s.name]) for s in stages])
+
+
+class Loop:
+ 'Wraps thread looping over .put input queue, potential bonus iterator.'
+
+ def __init__(self,
+ q_to_main: SimpleQueue,
+ bonus_iterator: Optional[Iterator] = None
+ ) -> None:
+ self._q_to_main = q_to_main
+ self._bonus_iterator = bonus_iterator
+ self._q_input: SimpleQueue[Event] = SimpleQueue()
+ Thread(target=self._loop, daemon=False).start()
+
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, *_) -> Literal[False]:
+ self._q_input.put(Event('QUIT'))
+ return False # re-raise any exception that above ignored
+
+ def put(self, event: Event) -> None:
+ 'Send event into thread loop.'
+ self._q_input.put(event)
+
+ def broadcast(self, type_, *args) -> None:
+ 'Send event to main loop via queue.'
+ self._q_to_main.put(Event(type_, args))
+
+ def process_main(self, event: Event) -> bool:
+ 'Process event yielded from input queue.'
+ if event.type_ == 'QUIT':
+ return False
+ return True
+
+ def process_bonus(self, yielded: str) -> None:
+ 'Process bonus iterator yield.'
+
+ def _loop(self) -> None:
+ 'Loop over input queue and, if provided, bonus iterator.'
+ try:
+ while True:
+ try:
+ yield_main = self._q_input.get(
+ block=True,
+ timeout=0 if self._bonus_iterator else None)
+ except QueueEmpty:
+ pass
+ else:
+ if self.process_main(yield_main) is False:
+ break
+ if self._bonus_iterator:
+ try:
+ yield_bonus = next(self._bonus_iterator)
+ except StopIteration:
+ break
+ if yield_bonus:
+ self.process_bonus(yield_bonus)
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self._q_to_main.put(Event('EXCEPTION', (e,)))
+
+
+class TuiLoop(Loop):
+ 'Loop for drawing/updating TUI.'
+
+ def __init__(self, term: Terminal, *args, **kwargs) -> None:
+ self._term = term
+ self._prompt = ''
+ self._log_buffer: list[str] = []
+ self._calc_and_draw_all()
+ self._term.flush()
+ super().__init__(*args, **kwargs)
+
+ def process_main(self, event: Event) -> bool:
+ if not super().process_main(event):
+ return False
+ if event.type_ == 'RECV':
+ self._log_buffer += [f'<- {event.args[0]}']
+ self._draw_log()
+ elif event.type_ == 'SEND':
+ self._log_buffer += [f'-> {event.args[0]}']
+ self._draw_log()
+ elif event.type_ == 'INPUT_PROMPT':
+ if event.args[0] == 'ENTER':
+ self.broadcast('SEND', self._prompt)
+ self._prompt = ''
+ elif event.args[0] == 'BACKSPACE':
+ self._prompt = self._prompt[:-1]
+ elif event.args[0] == 'CHARACTER':
+ self._prompt += event.args[1]
+ self._draw_prompt()
+ elif event.type_ == 'SIGWINCH':
+ self._calc_and_draw_all()
+ # elif event.type_ == 'DEBUG':
+ # from traceback import format_exception
+ # self._log_buffer += [
+ # f'DEBUG {line}' for line
+ # in '\n'.join(format_exception(event.args[0])).split('\n')]
+ # self._draw_log()
+ self._term.flush()
+ return True
+
+ def _calc_and_draw_all(self) -> None:
+ self._term.clear()
+ self._term.calc_geometry()
+ self._y_prompt = self._term.size.y - 1
+ self._y_separator = self._term.size.y - 2
+ self._draw_frame()
+ self._draw_log()
+ self._draw_prompt()
+
+ def _draw_frame(self) -> None:
+ self._term.write_yx(YX(self._y_separator, 0), '=' * self._term.size.x)
+ self._term.write_yx(YX(self._y_prompt, 0), INPUT_PROMPT)
+
+ def _draw_log(self) -> None:
+ temp_buffer = ([''] * self._term.size.y) + self._log_buffer[:]
+ for i, line in enumerate(temp_buffer[-self._y_separator:]):
+ self._term.write_yx(YX(i, 0), line[:self._term.size.x])
+
+ def _draw_prompt(self) -> None:
+ self._term.write_yx(YX(self._y_prompt, len(INPUT_PROMPT)),
+ f'{self._prompt}_')
+
+
+class SocketRecvLoop(Loop):
+ 'Loop receiving and translating socket messages towards main loop.'
+
+ def process_bonus(self, yielded: str) -> None:
+ msg = IrcMessage.from_raw(yielded)
+ if msg.verb == 'PING':
+ self.broadcast('PING', msg.parameters[0])
+ # DEBUG = 3 / 0
+ self.broadcast('RECV', str(msg))
+
+
+class KeyboardLoop(Loop):
+ 'Loop receiving and translating keyboard events towards main loop.'
+
+ def process_bonus(self, yielded: str) -> None:
+ if yielded == 'KEY_ENTER':
+ self.broadcast('INPUT_PROMPT', 'ENTER')
+ elif yielded == 'KEY_BACKSPACE':
+ self.broadcast('INPUT_PROMPT', 'BACKSPACE')
+ elif len(yielded) == 1:
+ self.broadcast('INPUT_PROMPT', 'CHARACTER', yielded)
+ elif yielded == '[81]':
+ self.broadcast('QUIT')
+ else:
+ self.broadcast('RECV', str(yielded))
+
+
+def run() -> None:
+ 'Main execution code / loop.'
+ q_to_main: SimpleQueue[Event] = SimpleQueue()
+ with Terminal().context(q_to_main) as term:
+ with Connection().context((HOST, PORT), q_to_main) as conn:
+ conn.write_line(f'USER {USERNAME} 0 * :{REALNAME}')
+ conn.write_line(f'NICK {NICKNAME}')
+ while True:
+ event = q_to_main.get()
+ if event.type_ == 'QUIT':
+ break
+ if event.type_ == 'INPUT_PROMPT':
+ term.tui.put(event)
+ elif event.type_ == 'SEND':
+ term.tui.put(event)
+ conn.write_line(event.args[0])
+ elif event.type_ == 'PING':
+ q_to_main.put(Event('SEND', (f'PONG {event.args[0]}',)))
+ elif event.type_ == 'RECV':
+ term.tui.put(event)
+ elif event.type_ == 'EXCEPTION':
+ raise event.args[0]
+ # elif event.type_ == 'DEBUG':
+ # term.tui.put(event)
+
+
+if __name__ == '__main__':
+ run()