self._send = sender
self._challenges: dict[str, bool] = {}
self._dict: dict[str, _ServerCapability] = {}
- self.auth_wait = False
+ self.sasl_wait = False
def clear(self) -> None:
'Reset all negotiation knowledge to zero.'
- self.auth_wait = False
+ self.sasl_wait = False
self._challenges.clear()
self._dict.clear()
self.clear()
self.challenge('LS', '302')
return []
- if self._challenged('END') or self.auth_wait:
+ if self._challenged('END') or self.sasl_wait:
return [f'ignoring post-END CAP message not NEW, DEL: {params}']
match params[0]:
case 'LS' | 'LIST':
self._challenge_set(f'REQ:{cap_name}', done=True)
self._dict[cap_name].enabled = params[0] == 'ACK'
if self._challenge_met('LIST'):
- if self.could_sasl_plain:
- self.auth_wait = True
- else:
+ self.sasl_wait = (
+ 'sasl' in self._dict
+ and 'PLAIN' in self._dict['sasl'].data.split(','))
+ if not self.sasl_wait:
self.challenge('END')
return (['server capabilities (enabled: "+"):']
+ [cap.str_for_log(cap_name)
self._send(IrcMessage(verb='CAP', params=params))
self._challenge_set(challenge_key)
- @property
- def could_sasl_plain(self) -> bool:
- 'Whether opportunity for some SASL AUTHENTICATE PLAIN attempt.'
- return ('sasl' in self._dict
- and 'PLAIN' in self._dict['sasl'].data.split(','))
-
def _challenge_met(self, step: str) -> bool:
return self._challenges.get(step, False)
case 'CAP':
for to_log in self._caps.process_msg(msg.params[1:]):
self.log.add(to_log)
- if self._caps.auth_wait and self._caps.could_sasl_plain:
+ if self._caps.sasl_wait:
if self.conn_setup.password:
self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
else:
self.conn_setup.password
).encode('utf-8')).decode('utf-8')
self.send(IrcMessage('AUTHENTICATE', (auth,)))
- case '904':
- self.log.alert('SASL authentication failed')
case '903' | '904':
+ if msg.verb == '904':
+ self.log.alert('SASL authentication failed')
self._caps.challenge('END')