from blessed import Terminal as BlessedTerminal
-HOST = 'irc.freenode.net'
PORT = 6667
-USERNAME = 'foo'
-NICKNAME = 'bar'
-REALNAME = 'debug debugger'
TIMEOUT_CONNECT = 5
TIMEOUT_LOOP = 0.1
+CONN_RECV_BUFSIZE = 1024
INPUT_PROMPT = '> '
KEYBINDINGS = {
yield str(blessed_key)
-class Connection:
- 'Abstraction of Socket connection.'
- _bufsize = 1024
+class IrcConnection:
+ 'Abstracts socket connection, loop over it, and handling messages from it.'
_socket: socket
- @contextmanager
- def context(self,
- address: tuple[str, int],
- q_to_main: EventQueue
- ) -> Generator:
- 'Wrap socket and recv loop context.'
- with socket() as self._socket:
- self._socket.settimeout(TIMEOUT_CONNECT)
- self._socket.connect(address)
- self._socket.settimeout(TIMEOUT_LOOP)
- with SocketRecvLoop(q_to_main, self.read_lines()):
- yield self
-
- def read_lines(self) -> Iterator[Optional[str]]:
+ def __init__(self,
+ q_to_main: EventQueue,
+ idx: int,
+ hostname: str,
+ login: tuple[str, str, str]
+ ) -> None:
+ self._idx = idx
+ self._q_to_main = q_to_main
+ self._socket = socket()
+ self._socket.settimeout(TIMEOUT_CONNECT)
+ self._socket.connect((hostname, PORT))
+ self._socket.settimeout(TIMEOUT_LOOP)
+ self._recv_loop = SocketRecvLoop(self._idx, self._q_to_main,
+ self._read_lines())
+ self._broadcast('CONNECTION_WINDOW', self._idx)
+ self._broadcast('SEND', IrcMessage('USER', [login[0], '0', '*',
+ login[2]]))
+ self._broadcast('SEND', IrcMessage('NICK', [login[1]]))
+
+ def close(self):
+ 'Close both SocketRecvLoop and socket.'
+ self._recv_loop.stop()
+ self._socket.close()
+
+ def _broadcast(self, type_: str, payload: Any = None) -> None:
+ 'Send event to main loop via queue, with connection index as 1st arg.'
+ self._q_to_main.eput(type_, (self._idx, payload))
+
+ def _read_lines(self) -> Iterator[Optional[str]]:
'Receive line-separator-delimited messages from socket.'
bytes_total = b''
buffer_linesep = b''
while True:
try:
- bytes_new = self._socket.recv(self._bufsize)
+ bytes_new = self._socket.recv(CONN_RECV_BUFSIZE)
except TimeoutError:
yield None
continue
yield bytes_total.decode('utf-8')
bytes_total = b''
- def write_line(self, line: str) -> None:
+ def _write_line(self, line: str) -> None:
'Send line-separator-delimited message over socket.'
self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR)
+ def handle(self, event: Event) -> None:
+ 'Process connection-directed Event into further steps.'
+ msg: IrcMessage = event.payload[1]
+ if event.type_ == 'SEND':
+ self._write_line(msg.raw)
+ if event.type_ == 'RECV' and msg.verb == 'PING':
+ self._broadcast('SEND', IrcMessage('PONG', [msg.parameters[0]]))
+
class IrcMessage:
'Properly structured representation of IRC message as per IRCv3 spec.'
self._raw = ' '.join(to_combine)
return self._raw
- def send(self, conn: Connection) -> None:
- 'Send self to conn encoded into line.'
- conn.write_line(self.raw)
-
class Loop:
'Wraps thread looping over .eput input queue, potential bonus iterator.'
self._thread = Thread(target=self._loop, daemon=False)
self._thread.start()
+ def stop(self) -> None:
+ 'Emit "QUIT" signal to break threaded loop, then wait for break.'
+ self._q_input.eput('QUIT')
+ self._thread.join()
+
def __enter__(self) -> Self:
return self
def __exit__(self, *_) -> Literal[False]:
- self._q_input.eput('QUIT')
- self._thread.join()
+ self.stop()
return False # re-raise any exception that above ignored
def put(self, event: Event) -> None:
self._wrapped_idx = min(-1,
self._wrapped_idx + self._y_pgscroll)
history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
- assert history_idx_to_wrapped_idx is not None
+ assert if history_idx_to_wrapped_idx is not None:
self._history_idx = history_idx_to_wrapped_idx - len(self._history)
def __init__(self, term: Terminal, *args, **kwargs) -> None:
self._term = term
- self._windows = [Window(self._term) for i in range(2)]
+ self._windows = [Window(self._term)]
+ self._conn_windows: list[Window] = []
self._window_idx = 0
self._calc_and_draw_all()
self._term.flush()
def process_main(self, event: Event) -> bool:
if not super().process_main(event):
return False
- if event.type_ in {'ALERT', 'RECV', 'SEND'}:
- self._windows[0].log.append(f'{event.type_} {event.payload}')
- if event.type_ == 'RECV':
- self._windows[1].log.append(f'<- {event.payload.raw}')
- elif event.type_ == 'SEND':
- self._windows[1].log.append(f'-> {event.payload.raw}')
+ if event.type_ == 'CONNECTION_WINDOW':
+ conn_win = Window(self._term)
+ conn_win.set_geometry((self._y_separator, self._y_prompt))
+ self._windows += [conn_win]
+ self._conn_windows += [conn_win]
+ elif event.type_ == 'ALERT':
+ self.window.log.append(f'{event.type_} {event.payload}')
self.window.log.draw()
+ elif event.type_ in {'RECV', 'SEND'}:
+ conn_win = self._conn_windows[event.payload[0]]
+ prefix = '<-' if event.type_ == 'RECV' else '->'
+ conn_win.log.append(f'{prefix} {event.payload[1].raw}')
+ if conn_win == self.window:
+ self.window.log.draw()
elif event.type_ == 'KEYBINDING':
cmd = self._cmd_name_to_cmd(event.payload[0])
assert cmd is not None
def _calc_and_draw_all(self) -> None:
self._term.clear()
self._term.calc_geometry()
- y_prompt = self._term.size.y - 1
- y_separator = self._term.size.y - 2
- self._term.write_yx(YX(y_separator, 0), '=' * self._term.size.x)
- self._term.write_yx(YX(y_prompt, 0), INPUT_PROMPT)
+ self._y_prompt = self._term.size.y - 1
+ self._y_separator = self._term.size.y - 2
+ self._term.write_yx(YX(self._y_separator, 0), '=' * self._term.size.x)
+ self._term.write_yx(YX(self._y_prompt, 0), INPUT_PROMPT)
for window in self._windows:
- window.set_geometry((y_separator, y_prompt))
+ window.set_geometry((self._y_separator, self._y_prompt))
self.window.draw()
+ def cmd__connect(self,
+ hostname: str,
+ username: str,
+ nickname: str,
+ realname: str
+ ) -> None:
+ 'Send INIT_CONNECTION command to main loop.'
+ self.broadcast('INIT_CONNECTION',
+ (hostname, (username, nickname, realname)))
+
def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
'Send QUIT command to server.'
self.broadcast('SEND', IrcMessage('QUIT', [quit_msg]))
class SocketRecvLoop(Loop):
'Loop receiving and translating socket messages towards main loop.'
+ def __init__(self, connection_idx: int, *args, **kwargs) -> None:
+ self._conn_idx = connection_idx
+ super().__init__(*args, **kwargs)
+
def process_bonus(self, yielded: str) -> None:
- self.broadcast('RECV', IrcMessage.from_raw(yielded))
+ self.broadcast('RECV', (self._conn_idx, IrcMessage.from_raw(yielded)))
class KeyboardLoop(Loop):
def run() -> None:
'Main execution code / loop.'
q_to_main = EventQueue()
- with Terminal().context(q_to_main) as term:
- with Connection().context((HOST, PORT), q_to_main) as conn:
- q_to_main.eput('SEND', IrcMessage('USER', [USERNAME, '0', '*',
- REALNAME]))
- q_to_main.eput('SEND', IrcMessage('NICK', [NICKNAME]))
+ connections: list[IrcConnection] = []
+ try:
+ with Terminal().context(q_to_main) as term:
while True:
event = q_to_main.get()
term.tui.put(event)
break
if event.type_ == 'EXCEPTION':
raise event.payload
- if event.type_ == 'SEND':
- event.payload.send(conn)
- elif event.type_ == 'RECV':
- msg: IrcMessage = event.payload
- if msg.verb == 'PING':
- q_to_main.eput('SEND',
- IrcMessage('PONG', [msg.parameters[0]]))
+ if event.type_ == 'INIT_CONNECTION':
+ connections += [IrcConnection(q_to_main, len(connections),
+ *event.payload)]
+ elif event.type_ in {'RECV', 'SEND'}:
+ connections[event.payload[0]].handle(event)
+ finally:
+ for conn in connections:
+ conn.close()
if __name__ == '__main__':