self._send = sender
self._challenges: dict[str, bool] = {}
self._dict: dict[str, _ServerCapability] = {}
+ self.auth_wait = False
def clear(self) -> None:
'Reset all negotiation knowledge to zero.'
+ self.auth_wait = False
self._challenges.clear()
self._dict.clear()
self.clear()
self.challenge('LS', '302')
return []
- if self._challenged('END'):
+ if self._challenged('END') or self.auth_wait:
return [f'ignoring post-END CAP message not NEW, DEL: {params}']
match params[0]:
case 'LS' | 'LIST':
self._challenge_set(f'REQ:{cap_name}', done=True)
self._dict[cap_name].enabled = params[0] == 'ACK'
if self._challenge_met('LIST'):
- self.challenge('END')
+ if self.could_sasl_plain:
+ self.auth_wait = True
+ else:
+ self.challenge('END')
return (['server capabilities (enabled: "+"):']
+ [cap.str_for_log(cap_name)
for cap_name, cap in self._dict.items()])
@property
def could_sasl_plain(self) -> bool:
'Whether opportunity for some SASL AUTHENTICATE PLAIN attempt.'
- return (self._challenged('END')
- and 'sasl' in self._dict
+ return ('sasl' in self._dict
and 'PLAIN' in self._dict['sasl'].data.split(','))
def _challenge_met(self, step: str) -> bool:
case 'CAP':
for to_log in self._caps.process_msg(msg.params[1:]):
self.log.add(to_log)
- if self._caps.could_sasl_plain and self.conn_setup.password:
- # NB: spec recommends to AUTHENTICATE during, rather than
- # after, CAPS negotation; opting for simplicity now instead
- self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
+ if self._caps.auth_wait and self._caps.could_sasl_plain:
+ if self.conn_setup.password:
+ self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
+ else:
+ self._caps.challenge('END')
case 'AUTHENTICATE':
if msg.params == ('+',):
auth = b64encode((self.conn_setup.nickname + '\0' +
self.send(IrcMessage('AUTHENTICATE', (auth,)))
case '904':
self.log.alert('SASL authentication failed')
+ case '903' | '904':
+ self._caps.challenge('END')
@dataclass