From: Christian Heller Date: Sat, 16 Aug 2025 23:11:38 +0000 (+0200) Subject: Clean up database transfers between Client and TUI. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bprefix%7D%7D/template?a=commitdiff_plain;h=5d66fc813bb42ef3260b5db1a59487faa7480658;p=ircplom Clean up database transfers between Client and TUI. --- diff --git a/ircplom/client.py b/ircplom/client.py index 3874574..834104c 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -6,7 +6,7 @@ from dataclasses import dataclass, InitVar from enum import Enum, auto from getpass import getuser from threading import Thread -from typing import Callable, NamedTuple, Optional +from typing import Callable, Optional # ourselves from ircplom.events import ( AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin) @@ -14,7 +14,7 @@ from ircplom.irc_conn import (BaseIrcConnection, IrcConnAbortException, IrcMessage, PORT_SSL) ClientsDb = dict[str, 'Client'] -ClientDbType = int | str | list[str] +CLEAR_WORD = 'CLEAR' _NAMES_DESIRED_SERVER_CAPS = ('server-time', 'account-tag', 'sasl') # NB: in below numerics accounting, tuples define inclusive ranges @@ -46,7 +46,6 @@ class LogScope(Enum): class _Numerics: - 'To easen dealing with numeric replies.' def __init__(self, numerics: tuple[int | tuple[int, int], ...]) -> None: as_ints = [] @@ -105,7 +104,6 @@ class ClientQueueMixin(QueueMixin, ClientIdMixin): @dataclass class _ServerCapability: - 'Store data collectable via CAPS LS/LIST/NEW.' enabled: bool data: str @@ -118,41 +116,67 @@ class _ServerCapability: class _CapsManager: - def __init__(self, sender: Callable[[IrcMessage], None], db: '_ClientDb', + def __init__(self, + sender: Callable[[IrcMessage], None], + on_update: Callable ) -> None: - self._db = db + self._on_update = lambda: on_update('caps') self._send = lambda *params: sender(IrcMessage('CAP', params=params)) + self.clear() + + def clear(self) -> None: + 'Zero internal knowledge.' + self._ls = _CompletableStringsList() + self._list = _CompletableStringsList() + self._list_expectations: dict[str, set[str]] = { + 'ACK': set(), 'NAK': set()} self._sent_challenges: list[str] = [] + + def start_negotation(self) -> None: + 'Call .clear, send CAPS LS 302, and then on_update.' + self.clear() self._send('LS', '302') + self._on_update() def process_msg(self, params: tuple[str, ...]) -> bool: 'Parse CAP params to negot. steps, DB inputs; return if successful.' - match params[0]: - case 'NEW': - for param in params[-1].split(): - self._db.append('caps_LS', param, keep_confirmed=True) - case 'DEL': - for param in params[-1].split(): - self._db.remove('caps_LS', param) - case 'ACK' | 'NAK': - for name in params[-1].split(): - if params[0] == 'ACK': - self._db.append('caps_LIST', name) - case 'LS' | 'LIST': - key = f'caps_{params[0]}' - for item in params[-1].strip().split(): - self._db.append(key, item) - if params[1] != '*': - self._db.confirm(key) - if self._db.confirmed('caps_LIST'): + verb = params[0] + items = params[-1].strip().split() + for item in items: + if verb == 'NEW': + self._ls.append(item, stay_complete=True) + self._on_update() + elif verb == 'DEL': + self._ls.remove(item, stay_complete=True) + for name in [ + name for name in self._list.visible + if _ServerCapability.split_name_data(name)[0] == item]: + self._list.remove(name, stay_complete=True) + self._on_update() + elif verb in {'ACK', 'NACK'}: + self._list_expectations[verb].add(item) + if verb in {'LS', 'LIST'}: + target = getattr(self, f'_{verb.lower()}') + for item in items: + target.append(item) + if params[1] != '*': + target.is_complete = True + if target == self._list: + acks = self._list_expectations['ACK'] + naks = self._list_expectations['NAK'] + list_set = set(self._list.visible) + assert acks == list_set & acks + assert set() == list_set & naks + if self._list.is_complete: + self._on_update() return True - if self._db.confirmed('caps_LS'): + if self._ls.is_complete: availables = [_ServerCapability.split_name_data(item)[0] - for item in self._db.caps_LS] + for item in self._ls.visible] for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS if n in availables]: self._challenge('REQ', cap_name) - self._challenge('LIST') + self._challenge('LIST') return False def end_negotiation(self) -> None: @@ -160,121 +184,152 @@ class _CapsManager: self._send('END') def _challenge(self, *params) -> None: - 'Ensure CAP message of params has been sent, store sended-ness.' fused = ' '.join(params) if fused not in self._sent_challenges: self._send(*params) self._sent_challenges.append(' '.join(params)) + @property + def as_caps(self) -> dict[str, _ServerCapability]: + 'Interpret ._ls, ._list into proper _ServerCapability listing.' + d = {} + if self._ls.is_complete and self._list.is_complete: + for name, data in [_ServerCapability.split_name_data(item) + for item in self._ls.visible]: + d[name] = _ServerCapability(name in self._list.visible, data) + return d + -class IrcConnSetup(NamedTuple): +@dataclass +class IrcConnSetup: 'All we need to know to set up a new Client connection.' - hostname: str - port: int - nickname: str - realname: str - password: str + hostname: str = '' + port: int = 0 + nickname: str = '' + realname: str = '' + password: str = '' -class _Db: - 'For values of variable confirmation, and reading in multi-line lists.' +class Db: + 'Helper with some conveniences around annotated attributes.' - def __init__(self) -> None: - annos = {} + def __init__(self, **kwargs) -> None: + self._types: dict[str, type] = {} for c in self.__class__.__mro__: if hasattr(c, '__annotations__'): - for k in [k for k in c.__annotations__ if k not in annos]: - annos[k] = c.__annotations__[k] - for name, type_ in annos.items(): - if type_ is int: - setattr(self, name, 0) - elif type_ is str: - setattr(self, name, '') - elif hasattr(type_, '__origin__') and type_.__origin__ is list: - setattr(self, name, []) - else: - setattr(self, name, {}) - self._confirmeds: set[str] = set() - - def set(self, key: str, value: Optional[ClientDbType], confirm=False - ) -> tuple[bool, bool]: - 'Set value at key and its confirmation, return what of either changed.' - value_changed = False - if value is not None and value != getattr(self, key, None): - value_changed = True - setattr(self, key, value) - confirm_changed = self.confirm(key, confirm) - return (value_changed, confirm_changed) - - def confirm(self, key, confirm=True) -> bool: - 'Declare value at key confirmed (or the opposite.' - if confirm and key not in self._confirmeds: - self._confirmeds.add(key) - return True - if (not confirm) and key in self._confirmeds: - self._confirmeds.remove(key) - return True - return False + self._types = c.__annotations__ | self._types + self._set_empty_defaults() + super().__init__(**kwargs) - def append(self, key: str, value: str, keep_confirmed=False) -> None: - 'To list at key add value; if not keep_confirmed, unconfirm value.' - if (not keep_confirmed) and key in self._confirmeds: - self._confirmeds.remove(key) - getattr(self, key).append(value) + def _set_empty_defaults(self) -> None: + for name, type_ in self._types.items(): + setattr(self, name, type_()) - def remove(self, key: str, value: str) -> None: - 'From list at key remove value.' - getattr(self, key).remove(value) + def _typecheck(self, key: str, value) -> None: + type_ = self._types[key] + if hasattr(type_, '__origin__'): + assert isinstance(value, type_.__origin__) + if len(value): + assert hasattr(type_, '__args__') + item_type = type_.__args__[0] + for item in value: + assert isinstance(item, item_type) + else: + assert isinstance(value, type_) -class ClientDbBase(_Db): - 'DB exposable to TUI.' - client_host: str - isupports: list[str] - motd: list[str] - nickname: str - user_modes: str +class _CompletableStringsList: + + def __init__(self) -> None: + self._items: list[str] = [] + self.is_complete = False - def confirmed(self, key: str) -> bool: - 'If value at key be confirmed or not.' - return key in self._confirmeds + @property + def visible(self) -> tuple[str, ...]: + 'Tuple of collected items if .is_complete, otherwise empty.' + return tuple(self._items) if self.is_complete else tuple() + + def _stay_complete(self, assert_complete: bool) -> None: + if assert_complete: + assert self.is_complete + else: + if self.is_complete: + self._items.clear() + self.is_complete = False + + def append(self, item: str, stay_complete=False) -> None: + 'Append to list; if .is_complete yet not stay_complete, tabula rasa.' + self._stay_complete(stay_complete) + self._items.append(item) + + def remove(self, item: str, stay_complete=False) -> None: + 'Remove from list; if .is_complete yet not stay_complete, tabula rasa.' + self._stay_complete(stay_complete) + self._items.remove(item) + + +class _Db(Db): + + def __init__(self, on_update: Callable, **kwargs) -> None: + self._still_on_init = True + self._on_update = on_update + super().__init__(**kwargs) + self._still_on_init = False + + def __setattr__(self, key: str, value) -> None: + if (not hasattr(self, '_still_on_init')) or self._still_on_init: + super().__setattr__(key, value) + return + self._typecheck(key, value) + super().__setattr__(key, value) + if key[0] != '_': + self._on_update(key) + + def __getattribute__(self, key: str): + if key[0] != '_': + compl_key = self._completable_key(key) + if hasattr(self, compl_key): + return getattr(self, compl_key).visible + return super().__getattribute__(key) + + @staticmethod + def _completable_key(key): + return f'_completable_{key}' + + def append_completable(self, key: str, value: str) -> None: + 'To completable list of key append value.' + getattr(self, self._completable_key(key)).append(value) + + def declare_complete(self, key: str) -> None: + 'Declare completable at key complete.' + getattr(self, self._completable_key(key)).is_complete = True + self._on_update(key) class _ChannelDb(_Db): - users: list[str] - topic: str - channel_modes: str - - -class _ClientDb(ClientDbBase): - caps_LS: list[str] - caps_LIST: list[str] - hostname: str - password: str - port: int - realname: str - _channels: dict[str, _ChannelDb] + _completable_users: _CompletableStringsList + # topic: str + # channel_modes: str - @property - def caps(self) -> dict[str, _ServerCapability]: - 'Interpret .caps_LS, .caps_LIST into proper _ServerCapability listing.' - d: dict[str, _ServerCapability] = {} - if None not in (self.caps_LS, self.caps_LIST): - for name, data in [_ServerCapability.split_name_data(item) - for item in self.caps_LS]: - d[name] = _ServerCapability(name in self.caps_LIST, data) - return d - def del_channel(self, name: str) -> None: +class _ClientDb(_Db, IrcConnSetup): + client_host: str + nickname_confirmed: bool + user_modes: str + _completable_motd: _CompletableStringsList + _completable_isupports: _CompletableStringsList + _channels: dict[str, _ChannelDb] + + def del_chan(self, name: str) -> None: 'Remove DB for channel of name.' del self._channels[name] + self._on_update(f'{name} {CLEAR_WORD}') - def channel(self, name: str) -> _ChannelDb: + def chan(self, name: str) -> _ChannelDb: 'Produce DB for channel of name – pre-existing, or newly created.' - if self._channels is None: - self._channels = {} if name not in self._channels: - self._channels[name] = _ChannelDb() + self._channels[name] = _ChannelDb( + on_update=lambda k: self._on_update(f'{name} {k}')) return self._channels[name] @@ -284,16 +339,15 @@ class Client(ABC, ClientQueueMixin): conn: Optional[_IrcConnection] = None def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None: - self._db = _ClientDb() - for k in conn_setup._fields: + self.client_id = conn_setup.hostname + super().__init__(client_id=self.client_id, **kwargs) + self._caps = _CapsManager(self.send, self._on_update) + self._db = _ClientDb(on_update=self._on_update) + for k in conn_setup.__annotations__: setattr(self._db, k, getattr(conn_setup, k)) if self._db.port <= 0: self._db.port = PORT_SSL - self.client_id = self._db.hostname self._prev_verb = '' - super().__init__(client_id=self.client_id, **kwargs) - for k in IrcConnSetup._fields: - self._update_db(k, None, confirm=k != 'nickname') self._start_connecting() def _start_connecting(self) -> None: @@ -302,10 +356,9 @@ class Client(ABC, ClientQueueMixin): try: if self.conn: raise IrcConnAbortException('already connected') - self.conn = _IrcConnection(hostname=self._db.hostname, - port=self._db.port, - _q_out=self._q_out, - client_id=self.client_id) + self.conn = _IrcConnection( + hostname=self._db.hostname, port=self._db.port, + _q_out=self._q_out, client_id=self.client_id) self._client_trigger('_on_connect') except IrcConnAbortException as e: self._log(f'failed to connect: {e}', alert=True) @@ -316,13 +369,16 @@ class Client(ABC, ClientQueueMixin): self._log('connecting …') Thread(target=connect, daemon=True, args=(self,)).start() + @abstractmethod + def _on_update(self, key: str) -> None: + pass + def _on_connect(self) -> None: - 'Steps to perform right after connection.' assert self.conn is not None self._log('connected to server (SSL: ' f'{"yes" if self.conn.ssl else "no"})', scope=LogScope.ALL) - self._caps = _CapsManager(self.send, self._db) + self._caps.start_negotation() self.send(IrcMessage(verb='USER', params=(getuser(), '0', '*', self._db.realname))) self.send(IrcMessage(verb='NICK', params=(self._db.nickname,))) @@ -350,18 +406,13 @@ class Client(ABC, ClientQueueMixin): self._log(to_log, scope=log_target) self._log(msg.raw, scope=LogScope.RAW, out=True) - def _update_db(self, key: str, value: Optional[ClientDbType], confirm: bool - ) -> tuple[bool, bool]: - 'Wrap ._db.set into something accessible to subclass extension.' - return self._db.set(key, value, confirm) - def close(self) -> None: 'Close both recv Loop and socket.' self._log(msg='disconnecting from server …', scope=LogScope.ALL) if self.conn: self.conn.close() self.conn = None - self._update_db('nickname', value=None, confirm=False) + self._db.nickname_confirmed = False def on_handled_loop_exception(self, e: IrcConnAbortException) -> None: 'Gracefully handle broken connection.' @@ -372,10 +423,11 @@ class Client(ABC, ClientQueueMixin): 'Log msg.raw, then process incoming msg into appropriate client steps.' self._log(msg.raw, scope=LogScope.RAW, out=False) if msg.verb != self._prev_verb and self._prev_verb == '005': - self._update_db('isupports', None, True) + self._db.declare_complete('isupports') self._prev_verb = msg.verb if _NumericsToConfirmNickname.contain(msg.verb): - self._update_db('nickname', value=msg.params[0], confirm=True) + self._db.nickname = msg.params[0] + self._db.nickname_confirmed = True if _NumericsToIgnore.contain(msg.verb): return if msg.verb in {'JOIN', 'PART'} and len(msg.params) >= 1: @@ -385,21 +437,21 @@ class Client(ABC, ClientQueueMixin): match msg.verb: case '005' if len(msg.params) > 2: for param in msg.params[1:-1]: - self._db.append('isupports', param) + self._db.append_completable('isupports', param) case '353' if len(msg.params) == 4: for user in msg.params[3].split(): - self._db.channel(msg.params[2]).append('users', user) + self._db.chan(msg.params[2] + ).append_completable('users', user) case '366' if len(msg.params) == 3: - self._db.channel(msg.params[1]).set('users', None, True) + self._db.chan(msg.params[1]).declare_complete('users') case '372' if len(msg.params) == 2: # RPL_MOTD - self._db.append('motd', msg.params[-1]) + self._db.append_completable('motd', msg.params[1]) case '376' if len(msg.params) == 2: # RPL_ENDOFMOTD - self._update_db('motd', None, True) + self._db.declare_complete('motd') case '396' if len(msg.params) == 3: # RPL_VISIBLEHOST # '@'-split because # claims: " can also be in the form " - self._update_db('client_host', confirm=True, - value=msg.params[1].split('@')[-1]) + self._db.client_host = msg.params[1].split('@')[-1] case 'AUTHENTICATE' if msg.params == ('+',): auth = b64encode((self._db.nickname + '\0' + self._db.nickname + '\0' + @@ -407,28 +459,27 @@ class Client(ABC, ClientQueueMixin): ).encode('utf-8')).decode('utf-8') self.send(IrcMessage('AUTHENTICATE', (auth,))) case 'CAP' if len(msg.params) > 1: - if self._caps.process_msg(msg.params[1:]): - self._log('', caps=self._db.caps) - if (sasl_caps := self._db.caps.get('sasl', None))\ - and 'PLAIN' in sasl_caps.data.split(','): - if self._db.password: - self._log('trying to authenticate via SASL/plain') - self.send(IrcMessage('AUTHENTICATE', ('PLAIN',))) - else: - self._caps.end_negotiation() + if (self._caps.process_msg(msg.params[1:]) + and (sasl_caps := self._caps.as_caps.get('sasl', None)) + and ('PLAIN' in sasl_caps.data.split(','))): + if self._db.password: + self._log('trying to authenticate via SASL/plain') + self.send(IrcMessage('AUTHENTICATE', ('PLAIN',))) + else: + self._caps.end_negotiation() case 'ERROR' if len(msg.params) == 1: self.close() case 'JOIN' if len(msg.params) == 1: if msg.nick_from_source != self._db.nickname: - self._db.channel(channel).append( - 'users', msg.nick_from_source, keep_confirmed=True) + self._db.chan(channel).users.append(msg.nick_from_source) self._log(log_msg, scope=scope, channel=channel) case 'MODE' if (len(msg.params) == 2 and msg.params[0] == self._db.nickname): - self._update_db('user_modes', msg.params[1], True) + self._db.user_modes = msg.params[1] case 'NICK' if (len(msg.params) == 1 and msg.nick_from_source == self._db.nickname): - self._update_db('nickname', msg.params[0], confirm=True) + self._db.nickname = msg.params[0] + self._db.nickname_confirmed = True case 'NOTICE' | 'PRIVMSG' if len(msg.params) == 2: scope = LogScope.CHAT if '!' in msg.source else LogScope.SERVER self._log(msg.params[-1], scope=scope, channel=msg.params[0], @@ -439,10 +490,9 @@ class Client(ABC, ClientQueueMixin): log_msg += f': {msg.params[1]}' self._log(log_msg, scope=scope, channel=channel) if msg.nick_from_source == self._db.nickname: - self._db.del_channel(channel) + self._db.del_chan(channel) else: - self._db.channel(channel).remove('users', - msg.nick_from_source) + self._db.chan(channel).users.remove(msg.nick_from_source) case 'PING' if len(msg.params) == 1: self.send(IrcMessage(verb='PONG', params=(msg.params[0],))) case '903' | '904' if len(msg.params) == 2: diff --git a/ircplom/client_tui.py b/ircplom/client_tui.py index 0e1a7d5..cfe7ed4 100644 --- a/ircplom/client_tui.py +++ b/ircplom/client_tui.py @@ -7,8 +7,8 @@ from typing import Callable, Optional, Sequence from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window, CMD_SHORTCUTS) from ircplom.irc_conn import IrcMessage -from ircplom.client import (IrcConnSetup, Client, ClientDbBase, ClientDbType, - ClientQueueMixin, LogScope, NewClientEvent) +from ircplom.client import (Client, ClientQueueMixin, Db, IrcConnSetup, + LogScope, NewClientEvent, CLEAR_WORD) CMD_SHORTCUTS['disconnect'] = 'window.disconnect' CMD_SHORTCUTS['join'] = 'window.join' @@ -22,6 +22,8 @@ _LOG_PREFIX_SERVER = '$' _LOG_PREFIX_OUT = '>' _LOG_PREFIX_IN = '<' +_DbType = bool | int | str | tuple[str, ...] + class _ClientWindow(Window, ClientQueueMixin): @@ -119,14 +121,49 @@ class _ChannelWindow(_ChatWindow): self._send_msg('PART', (self.chatname,)) +class _Db(Db): + + def set_and_check_for_change(self, key: str, value: _DbType) -> bool: + 'To attribute of key set value, reply if that changed anything.' + self._typecheck(key, value) + old_value = getattr(self, key) + setattr(self, key, value) + return value != old_value + + +class _ChannelDb(_Db): + users: tuple[str, ...] + + def set_and_check_for_change(self, key: str, value: _DbType) -> bool: + if key == CLEAR_WORD: + self._set_empty_defaults() + return True + return super().set_and_check_for_change(key, value) + + +class _TuiClientDb(_Db, IrcConnSetup): + caps: tuple[str] + client_host: str + isupports: tuple[str] + motd: tuple[str] + nickname_confirmed: bool + user_modes: str + _channels: dict[str, _ChannelDb] + + def chan(self, name: str) -> _ChannelDb: + 'Produce DB for channel of name – pre-existing, or newly created.' + if name not in self._channels: + self._channels[name] = _ChannelDb() + return self._channels[name] + + @dataclass class _ClientWindowsManager: _tui_log: Callable _tui_new_window: Callable def __post_init__(self, *_, **__) -> None: - self._db = ClientDbBase() - self._db.set('nickname', '?', confirm=False) + self._db = _TuiClientDb() self.windows: list[_ClientWindow] = [] for scope in (LogScope.SERVER, LogScope.RAW): self._new_win(scope) @@ -137,8 +174,8 @@ class _ClientWindowsManager: kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#' else _ChatWindow) kwargs['chatname'] = chatname - kwargs['get_nick_data'] = lambda: ( - self._db.nickname, self._db.confirmed('nickname')) + kwargs['get_nick_data'] = lambda: (self._db.nickname, + self._db.nickname_confirmed) win = self._tui_new_window(**kwargs) self.windows += [win] return win @@ -167,24 +204,27 @@ class _ClientWindowsManager: prefix = f'{first_char}{sender_label}' self._tui_log(msg, scope=scope, prefix=prefix, **kwargs) - def update(self, - key: str, - value: Optional[ClientDbType], - confirmed: bool, - scope: LogScope - ) -> bool: - 'Apply settings in kwargs, follow representation update triggers.' - changes = self._db.set(key, value, confirmed) - for i, t in enumerate((('', value), ('confirmation of ', confirmed))): - if changes[i] and (i == 0 # re confirms only care about nickname - or key == 'nickname'): - announcement = f'changing {t[0]}{key} to:' - if isinstance(t[1], list): - self.log(announcement, scope=scope) - for item in t[1]: - self.log(f' {item}', scope=scope) - continue - self.log(f'changing {t[0]}{key} to: [{t[1]}]', scope=scope) + def update_db(self, key: str, value: _DbType) -> bool: + 'Ensure key at value, follow representation update triggers.' + db: _TuiClientDb | _ChannelDb = self._db + scope = LogScope.SERVER + log_kwargs: dict[str, str] = {} + if ' ' in key: + chan_name, key = key.split() + db = self._db.chan(chan_name) + scope = LogScope.CHAT + log_kwargs |= {'channel': chan_name} + if not db.set_and_check_for_change(key, value): + return False + if key != CLEAR_WORD: + announcement = f'changed {key} to:' + if isinstance(value, tuple): + self.log(announcement, scope=scope, **log_kwargs) + for item in value: + self.log(f' {item}', scope=scope, **log_kwargs) + else: + self.log(f'{announcement} [{value}]', + scope=scope, **log_kwargs) for win in [w for w in self.windows if isinstance(w, _ChatWindow)]: win.set_prompt_prefix() @@ -272,25 +312,27 @@ class _ClientKnowingTui(Client): to_log = [] if msg: to_log += [msg] - if 'caps' in kwargs: - to_log += ['server capabilities (enabled: "+"):'] - for cap_name, cap in kwargs['caps'].items(): - listing = '+' if cap.enabled else '-' - listing += f' {cap_name}' - if cap.data: - listing += f' ({cap.data})' - to_log += [listing] for item in to_log: self._client_tui_trigger('log', scope=scope, msg=item, **kwargs) if scope == LogScope.RAW: with open(f'{self.client_id}.log', 'a', encoding='utf8') as f: f.write(('>' if kwargs['out'] else '<') + f' {msg}\n') - def _update_db(self, key: str, value: Optional[ClientDbType], confirm: bool - ) -> tuple[bool, bool]: - value_changed, conf_changed = super()._update_db(key, value, confirm) - if value is None and not value_changed: # local ._db may have fallback - value = getattr(self._db, key) # values Tui._db doesn't - self._client_tui_trigger('update', scope=LogScope.SERVER, - key=key, value=value, confirmed=confirm) - return (value_changed, conf_changed) + def _on_update(self, key: str) -> None: + value: _DbType + if key == 'caps': + lines: list[str] = [] + for cap_name, cap_entry in self._caps.as_caps.items(): + line = '[*]' if cap_entry.enabled else '[ ]' + line += f' {cap_name}' + if cap_entry.data: + line += f' ({cap_entry.data})' + lines += [line] + value = tuple(lines) + elif ' ' in key: + chan_name, arg = key.split() + value = ('' if arg == CLEAR_WORD + else getattr(self._db.chan(chan_name), arg)) + else: + value = getattr(self._db, key) + self._client_tui_trigger('update_db', key=key, value=value)