IrcMessage, PORT_SSL)
ClientsDb = dict[str, 'Client']
-_NAMES_DESIRED_SERVER_CAPS = ('server-time', 'account-tag', 'sasl')
+_NAMES_DESIRED_SERVER_CAPS = ('sasl',)
# NB: in below numerics accounting, tuples define inclusive ranges
_NUMERICS_TO_CONFIRM_NICKNAME = (
)
+_EXPECTATIONS: dict[str, dict[str, tuple[int, bool] | tuple[str, ...]]]
+_EXPECTATIONS = {
+ '005': {'len_params': (3, True)},
+ '353': {'len_params': (4, False)},
+ '366': {'len_params': (3, False)},
+ '372': {'len_params': (2, False)},
+ '376': {'len_params': (2, False)},
+ '396': {'len_params': (3, False)},
+ '432': {'len_params': (3, False)},
+ '433': {'len_params': (3, False)},
+ '900': {'len_params': (4, False)},
+ '903': {'len_params': (2, False)},
+ '904': {'len_params': (2, False)},
+ 'AUTHENTICATE': {'params': ('+',)},
+ 'CAP': {'len_params': (3, True)},
+ 'JOIN': {'len_params': (1, False)},
+ 'ERROR': {'len_params': (1, False)},
+ 'MODE': {'len_params': (2, False)},
+ 'NICK': {'len_params': (1, False)},
+ 'NOTICE': {'len_params': (2, False)},
+ 'PRIVMSG': {'len_params': (2, False)},
+ 'PART': {'len_params': (1, True)},
+ 'PING': {'len_params': (1, False)},
+}
+
+
class _IrcMsg(IrcMessage):
'Extends IrcMessage with some conveniences.'
- def match(self, verb: str, len_params=1, len_is_min=False) -> bool:
+ def match(self, verb: str) -> bool:
'Test .verb, len(.params).'
+ if not verb == self.verb:
+ return False
+ expectations: dict[str, tuple[int, bool] | tuple[str, ...]]
+ expectations = _EXPECTATIONS[verb]
+ if 'params' in expectations:
+ return self.params == expectations['params']
+ len_params, len_is_min = expectations.get('len_params', (1, False))
+ assert isinstance(len_params, int)
n_msg_params = len(self.params)
- return (self.verb == verb
- and (n_msg_params == len_params
- or len_is_min and n_msg_params > len_params))
+ return bool(self.verb == verb
+ and (n_msg_params == len_params
+ or (len_is_min and n_msg_params > len_params)))
@property
def nick_from_source(self) -> str:
self.set_nick(msg.params[0], confirmed=True)
if _NumericsToIgnore.contain(msg.verb):
return
- if msg.match('005', 2, True): # RPL_ISUPPORT
+ if msg.match('005'): # RPL_ISUPPORT
self._db.process_isupport(msg.params[1:-1])
- elif msg.match('353', 4) and msg.params[1] == '=': # RPL_NAMREPLY
+ elif msg.match('353') and msg.params[1] == '=': # RPL_NAMREPLY
for user in msg.params[3].split():
self._db.chan(msg.params[2]).append_completable('users', user)
- elif msg.match('366', 3): # RPL_ENDOFNAMES
+ elif msg.match('366'): # RPL_ENDOFNAMES
self._db.chan(msg.params[1]).declare_complete('users')
- elif msg.match('372', 2): # RPL_MOTD
+ elif msg.match('372'): # RPL_MOTD
self._db.append_completable('motd', msg.params[1])
- elif msg.match('376', 2): # RPL_ENDOFMOTD
+ elif msg.match('376'): # RPL_ENDOFMOTD
self._db.declare_complete('motd')
- elif msg.match('396', 3): # RPL_VISIBLEHOST
+ elif msg.match('396'): # RPL_VISIBLEHOST
# '@'-split because <https://defs.ircdocs.horse/defs/numerics>
# claims: "<hostname> can also be in the form <user@hostname>"
self._db.client_host = msg.params[1].split('@')[-1]
- elif msg.match('432', 3): # ERR_ERRONEOUSNICKNAME
+ elif msg.match('432'): # ERR_ERRONEOUSNICKNAME
alert = 'nickname refused for bad format'
if msg.params[0] == '*':
alert += ', giving up'
else:
self.set_nick(msg.params[0], confirmed=True)
self._log(alert, alert=True)
- elif msg.match('433', 3): # ERR_NICKNAMEINUSE
+ elif msg.match('433'): # ERR_NICKNAMEINUSE
self._log('nickname already in use, trying increment', alert=True)
self.set_nick(self._db.nick_incremented)
- elif msg.match('900', 4): # RPL_LOGGEDIN
+ elif msg.match('900'): # RPL_LOGGEDIN
self._db.nickname, remainder = msg.params[1].split('!', maxsplit=1)
self._db.username, self._db.client_host = remainder.split('@')
self._db.sasl_account = msg.params[2]
- elif msg.match('903', 2) or msg.match('904', 2): # RPL_SASLSUCCESS, …
+ elif msg.match('903') or msg.match('904'): # RPL_SASLSUCCESS, …
self._db.sasl_auth_state = 'WIN' if msg.verb == '903' else 'FAIL'
self._caps.end_negotiation() # … or ERR_SASLFAIL
- elif msg.match('AUTHENTICATE') and msg.params[0] == '+':
+ elif msg.match('AUTHENTICATE'):
auth = b64encode((self._db.nick_wanted + '\0'
+ self._db.nick_wanted + '\0'
+ self._db.password
).encode('utf-8')).decode('utf-8')
self.send(IrcMessage('AUTHENTICATE', (auth,)))
- elif msg.match('CAP', len_is_min=True):
+ elif msg.match('CAP'):
if (self._caps.process_msg(msg.params[1:])
and self._db.caps.has('sasl')
and 'PLAIN' in self._db.caps['sasl'].data.split(',')):
self._caps.end_negotiation()
elif msg.match('ERROR'):
self.close()
- elif msg.match('MODE', 2) and msg.params[0] == self._db.nickname:
+ elif msg.match('JOIN'):
+ channel = msg.params[0]
+ log_msg = f'{msg.nick_from_source} {msg.verb.lower()}s {channel}'
+ self._log(log_msg, scope=LogScope.CHAT, target=channel)
+ if msg.nick_from_source != self._db.nickname:
+ self._db.chan(channel).append_completable(
+ 'users', msg.nick_from_source, stay_complete=True)
+ elif msg.match('MODE') and msg.params[0] == self._db.nickname:
self._db.user_modes = msg.params[1]
elif msg.match('NICK') and msg.nick_from_source == self._db.nickname:
self.set_nick(msg.params[0], confirmed=True)
- elif msg.match('NOTICE', 2) and (msg.params[0] != '*'
- or not self._db.nickname):
+ elif msg.match('NOTICE') and (msg.params[0] != '*'
+ or not self._db.nickname):
kw: dict[str, str | LogScope] = {}
if '!' in msg.source:
kw |= {'sender': msg.nick_from_source, 'scope': LogScope.CHAT}
self._log(msg.params[-1], out=False, target=msg.params[0],
as_notice=True, **kw)
- elif msg.match('PRIVMSG', 2) and msg.params[0] != '*':
+ elif msg.match('PRIVMSG') and msg.params[0] != '*':
kw = {}
if '!' in msg.source:
kw |= {'sender': msg.nick_from_source, 'scope': LogScope.CHAT}
self._log(msg.params[-1], out=False, target=msg.params[0], **kw)
- elif msg.match('PART', len_is_min=True) or msg.match('JOIN'):
+ elif msg.match('PART'):
channel = msg.params[0]
log_msg = f'{msg.nick_from_source} {msg.verb.lower()}s {channel}'
- if msg.match('PART', 2, True):
+ if len(msg.params) == 2:
log_msg += f': {msg.params[1]}'
self._log(log_msg, scope=LogScope.CHAT, target=channel)
- if msg.verb == 'JOIN':
- if msg.nick_from_source != self._db.nickname:
- self._db.chan(channel).append_completable(
- 'users', msg.nick_from_source, stay_complete=True)
- elif msg.nick_from_source == self._db.nickname:
+ if msg.nick_from_source == self._db.nickname:
self._db.del_chan(channel)
else:
self._db.chan(channel).remove_completable(