enabled: bool
data: str
+ @staticmethod
+ def split_name_data(raw: str) -> tuple[str, str]:
+ 'Parse version 302 LS listing into cap name and metadata.'
+ toks = raw.split('=', maxsplit=1)
+ return (toks[0], '' if len(toks) == 1 else toks[1])
+
class _CapsManager:
def __init__(self, sender: Callable[[IrcMessage], None], db: 'ClientDb',
) -> None:
self._db = db
- self._send = sender
+ self._send = lambda *params: sender(IrcMessage('CAP', params=params))
self._sent_challenges: list[str] = []
- self.sasl_wait = False
-
- def clear(self) -> None:
- 'Reset all negotiation knowledge to zero.'
- self._reset_negotation_steps()
-
- @property
- def asdict(self) -> Optional[dict[str, _ServerCapability]]:
- 'Return acquired knowledge in optimized format.'
- if (self._db.caps_LS is None) or (self._db.caps_LIST is None):
- return None
- d: dict[str, _ServerCapability] = {}
- for cap_name, data in self._availables.items():
- d[cap_name] = _ServerCapability(
- enabled=cap_name in self._db.caps_LIST,
- data=data)
- return d
-
- @property
- def _availables(self) -> dict[str, str]:
- avs: dict[str, str] = {}
- for item in self._db.caps_LS:
- toks = item.split('=', maxsplit=1)
- avs[toks[0]] = '' if len(toks) == 1 else toks[1]
- return avs
+ self._send('LS', '302')
def process_msg(self, params: tuple[str, ...]) -> bool:
'Parse CAP params to negot. steps, DB inputs; return if successful.'
self._db.append(key, item)
if params[1] != '*':
self._db.set(key, None, confirm=True)
- if self.asdict is not None:
- self.sasl_wait = (
- 'sasl' in self.asdict
- and 'PLAIN' in self.asdict['sasl'].data.split(','))
- if not self.sasl_wait:
- self.challenge('END')
- return True
- if self._db.caps_LS is not None:
- for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
- if n in self._availables]:
- self.challenge('REQ', cap_name)
- self.challenge('LIST')
+ if self._db.caps_LIST is not None:
+ return True
+ if self._db.caps_LS is not None:
+ availables = [_ServerCapability.split_name_data(item)[0]
+ for item in self._db.caps_LS]
+ for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
+ if n in availables]:
+ self._challenge('REQ', cap_name)
+ self._challenge('LIST')
return False
def end_negotiation(self) -> None:
'Stop negotation, without emptying caps DB.'
- self.challenge('END')
- self._reset_negotation_steps()
+ self._send('END')
- def _reset_negotation_steps(self) -> None:
- self.sasl_wait = False
- self._sent_challenges.clear()
-
- def challenge(self, *params) -> None:
+ def _challenge(self, *params) -> None:
'Ensure CAP message of params has been sent, store sended-ness.'
fused = ' '.join(params)
if fused not in self._sent_challenges:
- self._send(IrcMessage('CAP', params=params))
+ self._send(*params)
self._sent_challenges.append(' '.join(params))
kwargs[field_name] = self._dict.get(field_name, None)
return IrcConnSetup(**kwargs)
+ @property
+ def caps(self) -> dict[str, _ServerCapability]:
+ 'Interpret .caps_LS, .caps_LIST into proper _ServerCapability listing.'
+ d: dict[str, _ServerCapability] = {}
+ if None not in (self.caps_LS, self.caps_LIST):
+ for name, data in [_ServerCapability.split_name_data(item)
+ for item in self.caps_LS]:
+ d[name] = _ServerCapability(name in self.caps_LIST, data)
+ return d
+
class Client(ABC, ClientQueueMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
- nick_confirmed: bool = False
+ _caps: _CapsManager
conn: Optional[_IrcConnection] = None
def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
if self._db.port <= 0:
self._db.set('port', PORT_SSL, confirm=True)
self.client_id = self._db.hostname
- self._caps = _CapsManager(self.send, self._db)
self._prev_verb = ''
super().__init__(client_id=self.client_id, **kwargs)
self._start_connecting()
self._log('connected to server (SSL: '
f'{"yes" if self.conn.ssl else "no"})',
scope=LogScope.ALL)
- self._caps.challenge('LS', '302')
+ self._caps = _CapsManager(self.send, self._db)
conn_setup = self._db.conn_setup # for type-checks and to include
self.send(IrcMessage( # … unconfirmed .nickname
verb='USER', params=(getuser(), '0', '*', conn_setup.realname)))
def close(self) -> None:
'Close both recv Loop and socket.'
self._log(msg='disconnecting from server …', scope=LogScope.ALL)
- self._caps.clear()
if self.conn:
self.conn.close()
self.conn = None
sender=msg.nick_from_source, channel=msg.params[0])
case 'CAP':
if self._caps.process_msg(msg.params[1:]):
- self._log('', caps=self._caps.asdict)
- if self._caps.sasl_wait:
- if self._db.password:
- self._log('trying to authenticate via SASL/plain')
- self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
- else:
- self._caps.end_negotiation()
+ self._log('', caps=self._db.caps)
+ if (sasl_caps := self._db.caps.get('sasl', None))\
+ and 'PLAIN' in sasl_caps.data.split(','):
+ if self._db.password:
+ self._log('trying to authenticate via SASL/plain')
+ self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
+ else:
+ self._caps.end_negotiation()
case 'AUTHENTICATE':
if msg.params == ('+',):
auth = b64encode((self._db.conn_setup.nickname + '\0' +