From: Christian Heller Date: Sun, 17 Aug 2025 14:07:15 +0000 (+0200) Subject: Improve CAPS processing into database and display. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bdb.prefix%7D%7D/tasks?a=commitdiff_plain;h=9d0df1d4e1cb7e4d4996babf8330603afbffbc8d;p=ircplom Improve CAPS processing into database and display. --- diff --git a/ircplom/client.py b/ircplom/client.py index b376ad7..1a81847 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -6,7 +6,7 @@ from dataclasses import dataclass, InitVar from enum import Enum, auto from getpass import getuser from threading import Thread -from typing import Callable, Optional +from typing import Any, Callable, Optional # ourselves from ircplom.events import ( AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin) @@ -101,11 +101,46 @@ class ClientQueueMixin(QueueMixin, ClientIdMixin): ).kw(**kwargs)) +class _UpdatingDict: + _on_update: Callable + + def __init__(self) -> None: + self._dict: dict[str, Any] = {} + + def set_on_update(self, name: str, on_update: Callable) -> None: + 'Caller of on_update with path= set to name.' + self._on_update = lambda k: on_update(name, k) + + def clear(self) -> None: + 'Zero dict and send clearance update.' + self._dict.clear() + self._on_update('') + + def has(self, key: str) -> bool: + 'Test if entry of name in dictionary.' + return key in self._dict + + def __getitem__(self, key: str): + return self._dict[key] + + def __setitem__(self, key: str, val: Any) -> None: + self._dict[key] = val + self._on_update(key) + + def __delitem__(self, key: str) -> None: + del self._dict[key] + self._on_update(key) + + @dataclass -class _ServerCapability: +class ServerCapability: + 'Public API for CAP data.' enabled: bool data: str + +class _ServerCapability(ServerCapability): + @staticmethod def split_name_data(raw: str) -> tuple[str, str]: 'Parse version 302 LS listing into cap name and metadata.' @@ -117,14 +152,15 @@ class _CapsManager: def __init__(self, sender: Callable[[IrcMessage], None], - on_update: Callable + caps_dict: _UpdatingDict ) -> None: - self._on_update = lambda: on_update('caps') + self._dict = caps_dict self._send = lambda *params: sender(IrcMessage('CAP', params=params)) self.clear() def clear(self) -> None: 'Zero internal knowledge.' + self._dict.clear() self._ls = _CompletableStringsList() self._list = _CompletableStringsList() self._list_expectations: dict[str, set[str]] = { @@ -132,10 +168,9 @@ class _CapsManager: self._sent_challenges: list[str] = [] def start_negotation(self) -> None: - 'Call .clear, send CAPS LS 302, and then on_update.' + 'Call .clear, send CAPS LS 302.' self.clear() self._send('LS', '302') - self._on_update() def process_msg(self, params: tuple[str, ...]) -> bool: 'Parse CAP params to negot. steps, DB inputs; return if successful.' @@ -144,14 +179,14 @@ class _CapsManager: for item in items: if verb == 'NEW': self._ls.append(item, stay_complete=True) - self._on_update() + self._update_cap(item) elif verb == 'DEL': self._ls.remove(item, stay_complete=True) for name in [ name for name in self._list.visible if _ServerCapability.split_name_data(name)[0] == item]: self._list.remove(name, stay_complete=True) - self._on_update() + self._del_cap(item) elif verb in {'ACK', 'NACK'}: self._list_expectations[verb].add(item) if verb in {'LS', 'LIST'}: @@ -167,7 +202,10 @@ class _CapsManager: assert acks == list_set & acks assert set() == list_set & naks if self._list.is_complete: - self._on_update() + for name, data in [_ServerCapability.split_name_data(item) + for item in self._ls.visible]: + self._dict[name] = _ServerCapability( + name in self._list.visible, data) return True if self._ls.is_complete: availables = [_ServerCapability.split_name_data(item)[0] @@ -188,15 +226,14 @@ class _CapsManager: self._send(*params) self._sent_challenges.append(' '.join(params)) - @property - def as_caps(self) -> dict[str, _ServerCapability]: - 'Interpret ._ls, ._list into proper _ServerCapability listing.' - d = {} - if self._ls.is_complete and self._list.is_complete: - for name, data in [_ServerCapability.split_name_data(item) - for item in self._ls.visible]: - d[name] = _ServerCapability(name in self._list.visible, data) - return d + def _update_cap(self, full_ls_item: str) -> None: + name, data = _ServerCapability.split_name_data(full_ls_item) + is_enabled = name in self._list.visible + self._dict[name] = _ServerCapability(is_enabled, data) + + def _del_cap(self, full_ls_item) -> None: + name, _ = _ServerCapability.split_name_data(full_ls_item) + del self._dict[name] @dataclass @@ -310,38 +347,8 @@ class _ChannelDb(_Db): # channel_modes: str -class _UpdatingDict: - _on_update: Callable - - def __init__(self) -> None: - self._dict: dict[str, str] = {} - - def set_on_update(self, name: str, on_update: Callable) -> None: - 'Caller of on_update with path= set to name.' - self._on_update = lambda k: on_update(name, k) - - def clear(self) -> None: - 'Zero dict and send clearance update.' - self._dict.clear() - self._on_update('') - - def has(self, key: str) -> bool: - 'Test if entry of name in dictionary.' - return key in self._dict - - def __getitem__(self, key: str) -> str: - return self._dict[key] - - def __setitem__(self, key: str, val: str) -> None: - self._dict[key] = val - self._on_update(key) - - def __delitem__(self, key: str) -> None: - del self._dict[key] - self._on_update(key) - - class _ClientDb(_Db, IrcConnSetup): + caps: _UpdatingDict connection_state: str client_host: str isupports: _UpdatingDict @@ -380,7 +387,7 @@ class Client(ABC, ClientQueueMixin): self.client_id = conn_setup.hostname super().__init__(client_id=self.client_id, **kwargs) self._db = _ClientDb(on_update=self._on_update) - self._caps = _CapsManager(self.send, self._on_update) + self._caps = _CapsManager(self.send, self._db.caps) for k in conn_setup.__annotations__: setattr(self._db, k, getattr(conn_setup, k)) if self._db.port <= 0: @@ -503,7 +510,8 @@ class Client(ABC, ClientQueueMixin): self.send(IrcMessage('AUTHENTICATE', (auth,))) case 'CAP' if len(msg.params) > 1: if (self._caps.process_msg(msg.params[1:]) - and (sasl_caps := self._caps.as_caps.get('sasl', None)) + and self._db.caps.has('sasl') + and (sasl_caps := self._db.caps['sasl']) and ('PLAIN' in sasl_caps.data.split(','))): if self._db.password: self._log('trying to authenticate via SASL/plain') diff --git a/ircplom/client_tui.py b/ircplom/client_tui.py index 0d86c95..f82535d 100644 --- a/ircplom/client_tui.py +++ b/ircplom/client_tui.py @@ -8,7 +8,7 @@ from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window, CMD_SHORTCUTS) from ircplom.irc_conn import IrcMessage from ircplom.client import (Client, ClientQueueMixin, Db, IrcConnSetup, - LogScope, NewClientEvent) + LogScope, NewClientEvent, ServerCapability) CMD_SHORTCUTS['disconnect'] = 'window.disconnect' CMD_SHORTCUTS['join'] = 'window.join' @@ -126,6 +126,12 @@ class _Update: path: str arg: str = '' value: Optional[_DbType] = None + display: str = '' + + def __post_init__(self, **kwargs) -> None: + super().__init__(**kwargs) + if not self.display: + self.display = str(self.value) @property def is_chan(self) -> bool: @@ -165,7 +171,7 @@ class _ChannelDb(_Db): class _TuiClientDb(_Db, IrcConnSetup): - caps: tuple[str] + caps: dict[str, str] client_host: str connection_state: str isupports: dict[str, str] @@ -258,7 +264,7 @@ class _ClientWindowsManager: for item in update.value: self.log(f' {item}', scope=scope, **log_kwargs) elif announcement[-1] == ':': - self.log(f'{announcement} [{update.value}]', + self.log(f'{announcement} [{update.display}]', scope=scope, **log_kwargs) for win in [w for w in self.windows if isinstance(w, _ChatWindow)]: win.set_prompt_prefix() @@ -355,16 +361,8 @@ class _ClientKnowingTui(Client): def _on_update(self, path: str, arg: str = '') -> None: value: Optional[_DbType] = None is_chan = path[0] == '#' - if path == 'caps': - lines: list[str] = [] - for cap_name, cap_entry in self._caps.as_caps.items(): - line = '[*]' if cap_entry.enabled else '[ ]' - line += f' {cap_name}' - if cap_entry.data: - line += f' ({cap_entry.data})' - lines += [line] - value = tuple(lines) - elif arg: + display = '' + if arg: if is_chan and self._db.has_chan(path): is_chan = True if (chan := self._db.chan(path)) and hasattr(chan, arg): @@ -373,6 +371,11 @@ class _ClientKnowingTui(Client): d = getattr(self._db, path) if d.has(arg): value = d[arg] + if isinstance(value, ServerCapability): + display = 'ENABLED' if value.enabled else 'available' + if value.data: + display += f' ({value.data})' elif (not is_chan) and not self._db.needs_arg(path): value = getattr(self._db, path) - self._client_tui_trigger('update_db', update=_Update(path, arg, value)) + self._client_tui_trigger('update_db', update=_Update( + path, arg, value, display))