self._dict.clear()
self._on_update('')
+ @staticmethod
+ def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
+ 'Split eq_str by "=", and if none, into eq_str and "".'
+ toks = eq_str.split('=', maxsplit=1)
+ return toks[0], '' if len(toks) == 1 else toks[1]
+
+ def set_from_eq_str(self, eq_str: str, cls=str) -> None:
+ 'Set from .key_val_from_eq_str result, to cls(val).'
+ key, value = self.key_val_from_eq_str(eq_str)
+ self[key] = cls(value)
+
def __getitem__(self, key: str):
return self._dict[key]
@dataclass
class ServerCapability:
'Public API for CAP data.'
- enabled: bool
data: str
-
-
-class _ServerCapability(ServerCapability):
-
- @staticmethod
- def split_name_data(raw: str) -> tuple[str, str]:
- 'Parse version 302 LS listing into cap name and metadata.'
- toks = raw.split('=', maxsplit=1)
- return (toks[0], '' if len(toks) == 1 else toks[1])
+ enabled: bool = False
class _CapsManager:
'Parse CAP message to negot. steps, DB inputs; return if successful.'
for item in items:
if verb == 'NEW':
- name, data = _ServerCapability.split_name_data(item)
- self._dict[name] = _ServerCapability(False, data)
+ self._dict.set_from_eq_str(item, ServerCapability)
elif verb == 'DEL':
- name, _ = _ServerCapability.split_name_data(item)
- del self._dict[name]
+ del self._dict[item]
elif verb in {'ACK', 'NACK'}:
self._list_expectations[verb].add(item)
if verb in {'LS', 'LIST'}:
if complete:
target.complete()
if target == self._ls:
- availables = [_ServerCapability.split_name_data(item)[0]
- for item in self._ls.completed]
- for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
- if n in availables]:
+ for cap_name in _NAMES_DESIRED_SERVER_CAPS:
self._send('REQ', cap_name)
self._send('LIST')
elif target == self._list:
list_set = set(target.completed)
assert acks == list_set & acks
assert set() == list_set & naks
- for name, data in [_ServerCapability.split_name_data(item)
- for item in self._ls.completed]:
- self._dict[name] = _ServerCapability(
- name in self._list.completed, data)
+ for key, data in [_UpdatingDict.key_val_from_eq_str(entry)
+ for entry in self._ls.completed]:
+ self._dict[key] = ServerCapability(
+ data=data, enabled=key in self._list.completed)
return True
return False
self._db.user_id(ret[arg])
if ret['verb'] == '005': # RPL_ISUPPORT
for item in ret['isupports']:
- toks = item.split('=', maxsplit=1)
- if toks[0][0] == '-':
- del self._db.isupports[toks[0][1:]]
+ if item[0] == '-':
+ del self._db.isupports[item[1:]]
else:
- self._db.isupports[toks[0]] = (toks[1] if len(toks) > 1
- else '')
+ self._db.isupports.set_from_eq_str(str(item))
elif ret['verb'] == '353': # RPL_NAMREPLY
for user_id in [
self._db.user_id(name.lstrip(_ILLEGAL_NICK_FIRSTCHARS))