'Event to signal nickname having been set server-side.'
-class SendEvent(ConnEvent, PayloadMixin):
+class _SendEvent(ConnEvent, PayloadMixin):
'Event to trigger sending of payload to server.'
- payload: 'IrcMessage'
+ payload: '_IrcMessage'
class BroadcastConnMixin(BroadcastMixin, ConnIdxMixin):
'Broadcast event subclassing ConnEvent, with connection ID.'
self.broadcast(event_class, conn_idx=self.conn_idx, *args, **kwargs)
+ def send(self, verb: str, parameters: tuple[str, ...]) -> None:
+ 'Broadcast _SendEvent for _IrcMessage(verb, parameters).'
+ self.broadcast_conn(_SendEvent, _IrcMessage(verb, parameters))
+
class IrcConnection(BroadcastConnMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
return
self._socket.sendall(line.encode('utf-8') + _IRCSPEC_LINE_SEPARATOR)
- def send(self, verb: str, parameters: tuple[str, ...]) -> None:
- 'Broadcast SendEvent for IrcMessage(verb, parameters).'
- self.broadcast_conn(SendEvent, IrcMessage(verb, parameters))
-
def handle(self, event: ConnEvent) -> None:
'Process connection-directed Event into further steps.'
if isinstance(event, InitReconnectEvent):
# self.send('CAP', ('END',))
elif isinstance(event, _DisconnectedEvent):
self.close()
- elif isinstance(event, SendEvent):
+ elif isinstance(event, _SendEvent):
self.broadcast_conn(LogConnEvent, f'->: {event.payload.raw}')
self._write_line(event.payload.raw)
-class IrcMessage:
+class _IrcMessage:
'Properly structured representation of IRC message as per IRCv3 spec.'
_raw: Optional[str] = None
@classmethod
def from_raw(cls, raw_msg: str) -> Self:
- 'Parse raw IRC message line into properly structured IrcMessage.'
+ 'Parse raw IRC message line into properly structured _IrcMessage.'
class _Stage(NamedTuple):
name: str
self._conn = conn
def process_bonus(self, yielded: str) -> None:
- msg = IrcMessage.from_raw(yielded)
+ msg = _IrcMessage.from_raw(yielded)
self._conn.broadcast_conn(LogConnEvent, f'<-: {msg.raw}')
if msg.verb == 'PING':
self._conn.send('PONG', (msg.parameters[0],))
PayloadMixin, QuitEvent)
from ircplom.irc_conn import (
BroadcastConnMixin, ConnEvent, InitConnectEvent, InitConnWindowEvent,
- InitReconnectEvent, IrcMessage, LoginNames, LogConnEvent, NickSetEvent,
- SendEvent, TIMEOUT_LOOP)
+ InitReconnectEvent, LoginNames, LogConnEvent, NickSetEvent,
+ TIMEOUT_LOOP)
_MIN_HEIGHT = 4
_MIN_WIDTH = 32
def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
'Send QUIT command to server.'
- self.broadcast_conn(SendEvent, IrcMessage('QUIT', (quit_msg, )))
+ self.send('QUIT', (quit_msg, ))
def cmd__reconnect(self) -> None:
'Attempt reconnection.'
def cmd__nick(self, new_nick: str) -> None:
'Attempt nickname change.'
- self.broadcast_conn(SendEvent, IrcMessage('NICK', (new_nick, )))
+ self.send('NICK', (new_nick, ))
class _KeyboardLoop(Loop, BroadcastMixin):