def clear(self) -> None:
'Reset all negotiation knowledge to zero.'
- self.sasl_wait = False
- self._sent_challenges.clear()
+ self._reset_negotation_steps()
self._db.set('caps_LS', [], confirm=False)
self._db.set('caps_LIST', [], confirm=False)
data=data)
return d
- def _unconfirmed_caps_list(self, key) -> list[str]:
- ret = self._db.get_force(key)[0]
- assert isinstance(ret, list)
- return ret
-
@property
def _availables(self) -> dict[str, str]:
avs: dict[str, str] = {}
- for item in self._unconfirmed_caps_list('caps_LS'):
+ for item in self._db.caps_LS:
toks = item.split('=', maxsplit=1)
avs[toks[0]] = '' if len(toks) == 1 else toks[1]
return avs
match params[0]:
case 'NEW':
for param in params[-1].split():
- self._db.caps_LS.append(param)
+ self._db.append('caps_LS', param)
case 'DEL':
for param in params[-1].split():
del self._db.caps_LS[param]
case 'ACK' | 'NAK':
for name in params[-1].split():
if params[0] == 'ACK':
- self._unconfirmed_caps_list('caps_LIST').append(name)
+ self._db.append('caps_LIST', name)
case 'LS' | 'LIST':
key = f'caps_{params[0]}'
- caps_list, has_finished = self._db.get_force(key)
- assert isinstance(caps_list, list)
- if has_finished:
- caps_list.clear()
- self._db.set(key, caps_list, confirm=False)
+ if getattr(self._db, key) is not None:
+ self._db.set(key, [], confirm=False)
for item in params[-1].strip().split():
- caps_list.append(item)
+ self._db.append(key, item)
if params[1] != '*':
- self._db.set(key, caps_list, confirm=True)
+ self._db.set(key, None, confirm=True)
if self.asdict is not None:
self.sasl_wait = (
'sasl' in self.asdict
self.challenge('LIST')
return False
+ def end_negotiation(self) -> None:
+ 'Stop negotation, without emptying caps DB.'
+ self.challenge('END')
+ self._reset_negotation_steps()
+
+ def _reset_negotation_steps(self) -> None:
+ self.sasl_wait = False
+ self._sent_challenges.clear()
+
def challenge(self, *params) -> None:
'Ensure CAP message of params has been sent, store sended-ness.'
fused = ' '.join(params)
def _unconf_key(self, key) -> str:
return f'_{key}'
- def set(self, key: str, value: int | str | list[str], confirm=False
+ def set(self,
+ key: str,
+ value: Optional[int | str | list[str]],
+ confirm=False
) -> tuple[bool, bool]:
'Ensures setting, returns if changed value or confirmation.'
- retrieval = self.get_force(key)
- confirm_changed = confirm != retrieval[1]
- value_changed = retrieval[0] != value
- if confirm_changed and retrieval[0] is not None:
- del self._dict[self._unconf_key(key) if confirm else key]
- if value_changed or confirm_changed:
- self._dict[key if confirm else self._unconf_key(key)] = value
+ old_value, was_confirmed = self.get_force(key)
+ assert (old_value is not None) or (value is not None)
+ confirm_changed = confirm != was_confirmed
+ key_to_set = key if confirm else self._unconf_key(key)
+ key_to_unset = self._unconf_key(key) if confirm else key
+ value_changed = (value is not None) and value != old_value
+ if value is not None:
+ self._dict[key_to_set] = value
+ if confirm_changed and old_value is not None:
+ if value is None:
+ self._dict[key_to_set] = old_value
+ del self._dict[key_to_unset]
return (value_changed, confirm_changed)
+ def append(self, key, value: str) -> None:
+ 'To list[str] keyed by key, append value.'
+ appendable = self.get_force(key)[0]
+ assert isinstance(appendable, list)
+ appendable.append(value)
+
def get_force(self, key: str) -> tuple[Optional[int | str | list[str]],
bool]:
'Get even if only stored unconfirmed, tell if confirmed was found..'
self._db = ClientDb()
for k in conn_setup._fields:
self._db.set(k, getattr(conn_setup, k), confirm=k != 'nickname')
- if conn_setup.port <= 0:
+ if self._db.port <= 0:
self._db.set('port', PORT_SSL, confirm=True)
- self.client_id = conn_setup.hostname
+ self.client_id = self._db.hostname
self._caps = _CapsManager(self.send, self._db)
super().__init__(client_id=self.client_id, **kwargs)
self._start_connecting()
self._log(to_log, scope=log_target)
self._log(msg.raw, scope=LogScope.RAW, out=True)
- def _update_db(self, key: str, value: int | str, confirm: bool) -> None:
+ def _update_db(self, key: str, value: Optional[int | str], confirm: bool
+ ) -> None:
'Wrap ._db.set into something accessible to subclass extension.'
self._db.set(key, value, confirm)
if self.conn:
self.conn.close()
self.conn = None
- nick_key = 'nickname'
- nickname = self._db.get_force('nickname')[0]
- assert isinstance(nickname, str)
- self._update_db(nick_key, value=nickname, confirm=False)
+ self._update_db('nickname', value=None, confirm=False)
def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
'Gracefully handle broken connection.'
case '375':
self._db.set('motd', [], False)
case '372':
- motd = self._db.get_force('motd')[0]
- assert isinstance(motd, list)
- motd += [msg.params[-1]]
+ self._db.append('motd', msg.params[-1])
case '376':
- motd = self._db.get_force('motd')[0]
- assert isinstance(motd, list)
- self._db.set('motd', motd, True)
+ self._db.set('motd', None, confirm=True)
for line in self._db.motd:
self._log(line, as_notice=True)
case 'PING':
self._log('trying to authenticate via SASL/plain')
self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
else:
- self._caps.challenge('END')
+ self._caps.end_negotiation()
case 'AUTHENTICATE':
if msg.params == ('+',):
auth = b64encode((self._db.conn_setup.nickname + '\0' +
alert = msg.verb == '904'
self._log(f'SASL auth {"failed" if alert else "succeeded"}',
alert=alert)
- self._caps.challenge('END')
+ self._caps.end_negotiation()
case 'JOIN' | 'PART':
user, channel = msg.nick_from_source, msg.params[-1]
self._log(f'{user} {msg.verb.lower()}s {channel}',