from enum import Enum, auto
from getpass import getuser
from threading import Thread
-from typing import Callable, NamedTuple, Optional
+from typing import Callable, Optional
# ourselves
from ircplom.events import (
AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
IrcMessage, PORT_SSL)
ClientsDb = dict[str, 'Client']
-ClientDbType = int | str | list[str]
+CLEAR_WORD = 'CLEAR'
_NAMES_DESIRED_SERVER_CAPS = ('server-time', 'account-tag', 'sasl')
# NB: in below numerics accounting, tuples define inclusive ranges
class _Numerics:
- 'To easen dealing with numeric replies.'
def __init__(self, numerics: tuple[int | tuple[int, int], ...]) -> None:
as_ints = []
@dataclass
class _ServerCapability:
- 'Store data collectable via CAPS LS/LIST/NEW.'
enabled: bool
data: str
class _CapsManager:
- def __init__(self, sender: Callable[[IrcMessage], None], db: '_ClientDb',
+ def __init__(self,
+ sender: Callable[[IrcMessage], None],
+ on_update: Callable
) -> None:
- self._db = db
+ self._on_update = lambda: on_update('caps')
self._send = lambda *params: sender(IrcMessage('CAP', params=params))
+ self.clear()
+
+ def clear(self) -> None:
+ 'Zero internal knowledge.'
+ self._ls = _CompletableStringsList()
+ self._list = _CompletableStringsList()
+ self._list_expectations: dict[str, set[str]] = {
+ 'ACK': set(), 'NAK': set()}
self._sent_challenges: list[str] = []
+
+ def start_negotation(self) -> None:
+ 'Call .clear, send CAPS LS 302, and then on_update.'
+ self.clear()
self._send('LS', '302')
+ self._on_update()
def process_msg(self, params: tuple[str, ...]) -> bool:
'Parse CAP params to negot. steps, DB inputs; return if successful.'
- match params[0]:
- case 'NEW':
- for param in params[-1].split():
- self._db.append('caps_LS', param, keep_confirmed=True)
- case 'DEL':
- for param in params[-1].split():
- self._db.remove('caps_LS', param)
- case 'ACK' | 'NAK':
- for name in params[-1].split():
- if params[0] == 'ACK':
- self._db.append('caps_LIST', name)
- case 'LS' | 'LIST':
- key = f'caps_{params[0]}'
- for item in params[-1].strip().split():
- self._db.append(key, item)
- if params[1] != '*':
- self._db.confirm(key)
- if self._db.confirmed('caps_LIST'):
+ verb = params[0]
+ items = params[-1].strip().split()
+ for item in items:
+ if verb == 'NEW':
+ self._ls.append(item, stay_complete=True)
+ self._on_update()
+ elif verb == 'DEL':
+ self._ls.remove(item, stay_complete=True)
+ for name in [
+ name for name in self._list.visible
+ if _ServerCapability.split_name_data(name)[0] == item]:
+ self._list.remove(name, stay_complete=True)
+ self._on_update()
+ elif verb in {'ACK', 'NACK'}:
+ self._list_expectations[verb].add(item)
+ if verb in {'LS', 'LIST'}:
+ target = getattr(self, f'_{verb.lower()}')
+ for item in items:
+ target.append(item)
+ if params[1] != '*':
+ target.is_complete = True
+ if target == self._list:
+ acks = self._list_expectations['ACK']
+ naks = self._list_expectations['NAK']
+ list_set = set(self._list.visible)
+ assert acks == list_set & acks
+ assert set() == list_set & naks
+ if self._list.is_complete:
+ self._on_update()
return True
- if self._db.confirmed('caps_LS'):
+ if self._ls.is_complete:
availables = [_ServerCapability.split_name_data(item)[0]
- for item in self._db.caps_LS]
+ for item in self._ls.visible]
for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
if n in availables]:
self._challenge('REQ', cap_name)
- self._challenge('LIST')
+ self._challenge('LIST')
return False
def end_negotiation(self) -> None:
self._send('END')
def _challenge(self, *params) -> None:
- 'Ensure CAP message of params has been sent, store sended-ness.'
fused = ' '.join(params)
if fused not in self._sent_challenges:
self._send(*params)
self._sent_challenges.append(' '.join(params))
+ @property
+ def as_caps(self) -> dict[str, _ServerCapability]:
+ 'Interpret ._ls, ._list into proper _ServerCapability listing.'
+ d = {}
+ if self._ls.is_complete and self._list.is_complete:
+ for name, data in [_ServerCapability.split_name_data(item)
+ for item in self._ls.visible]:
+ d[name] = _ServerCapability(name in self._list.visible, data)
+ return d
+
-class IrcConnSetup(NamedTuple):
+@dataclass
+class IrcConnSetup:
'All we need to know to set up a new Client connection.'
- hostname: str
- port: int
- nickname: str
- realname: str
- password: str
+ hostname: str = ''
+ port: int = 0
+ nickname: str = ''
+ realname: str = ''
+ password: str = ''
-class _Db:
- 'For values of variable confirmation, and reading in multi-line lists.'
+class Db:
+ 'Helper with some conveniences around annotated attributes.'
- def __init__(self) -> None:
- annos = {}
+ def __init__(self, **kwargs) -> None:
+ self._types: dict[str, type] = {}
for c in self.__class__.__mro__:
if hasattr(c, '__annotations__'):
- for k in [k for k in c.__annotations__ if k not in annos]:
- annos[k] = c.__annotations__[k]
- for name, type_ in annos.items():
- if type_ is int:
- setattr(self, name, 0)
- elif type_ is str:
- setattr(self, name, '')
- elif hasattr(type_, '__origin__') and type_.__origin__ is list:
- setattr(self, name, [])
- else:
- setattr(self, name, {})
- self._confirmeds: set[str] = set()
-
- def set(self, key: str, value: Optional[ClientDbType], confirm=False
- ) -> tuple[bool, bool]:
- 'Set value at key and its confirmation, return what of either changed.'
- value_changed = False
- if value is not None and value != getattr(self, key, None):
- value_changed = True
- setattr(self, key, value)
- confirm_changed = self.confirm(key, confirm)
- return (value_changed, confirm_changed)
-
- def confirm(self, key, confirm=True) -> bool:
- 'Declare value at key confirmed (or the opposite.'
- if confirm and key not in self._confirmeds:
- self._confirmeds.add(key)
- return True
- if (not confirm) and key in self._confirmeds:
- self._confirmeds.remove(key)
- return True
- return False
+ self._types = c.__annotations__ | self._types
+ self._set_empty_defaults()
+ super().__init__(**kwargs)
- def append(self, key: str, value: str, keep_confirmed=False) -> None:
- 'To list at key add value; if not keep_confirmed, unconfirm value.'
- if (not keep_confirmed) and key in self._confirmeds:
- self._confirmeds.remove(key)
- getattr(self, key).append(value)
+ def _set_empty_defaults(self) -> None:
+ for name, type_ in self._types.items():
+ setattr(self, name, type_())
- def remove(self, key: str, value: str) -> None:
- 'From list at key remove value.'
- getattr(self, key).remove(value)
+ def _typecheck(self, key: str, value) -> None:
+ type_ = self._types[key]
+ if hasattr(type_, '__origin__'):
+ assert isinstance(value, type_.__origin__)
+ if len(value):
+ assert hasattr(type_, '__args__')
+ item_type = type_.__args__[0]
+ for item in value:
+ assert isinstance(item, item_type)
+ else:
+ assert isinstance(value, type_)
-class ClientDbBase(_Db):
- 'DB exposable to TUI.'
- client_host: str
- isupports: list[str]
- motd: list[str]
- nickname: str
- user_modes: str
+class _CompletableStringsList:
+
+ def __init__(self) -> None:
+ self._items: list[str] = []
+ self.is_complete = False
- def confirmed(self, key: str) -> bool:
- 'If value at key be confirmed or not.'
- return key in self._confirmeds
+ @property
+ def visible(self) -> tuple[str, ...]:
+ 'Tuple of collected items if .is_complete, otherwise empty.'
+ return tuple(self._items) if self.is_complete else tuple()
+
+ def _stay_complete(self, assert_complete: bool) -> None:
+ if assert_complete:
+ assert self.is_complete
+ else:
+ if self.is_complete:
+ self._items.clear()
+ self.is_complete = False
+
+ def append(self, item: str, stay_complete=False) -> None:
+ 'Append to list; if .is_complete yet not stay_complete, tabula rasa.'
+ self._stay_complete(stay_complete)
+ self._items.append(item)
+
+ def remove(self, item: str, stay_complete=False) -> None:
+ 'Remove from list; if .is_complete yet not stay_complete, tabula rasa.'
+ self._stay_complete(stay_complete)
+ self._items.remove(item)
+
+
+class _Db(Db):
+
+ def __init__(self, on_update: Callable, **kwargs) -> None:
+ self._still_on_init = True
+ self._on_update = on_update
+ super().__init__(**kwargs)
+ self._still_on_init = False
+
+ def __setattr__(self, key: str, value) -> None:
+ if (not hasattr(self, '_still_on_init')) or self._still_on_init:
+ super().__setattr__(key, value)
+ return
+ self._typecheck(key, value)
+ super().__setattr__(key, value)
+ if key[0] != '_':
+ self._on_update(key)
+
+ def __getattribute__(self, key: str):
+ if key[0] != '_':
+ compl_key = self._completable_key(key)
+ if hasattr(self, compl_key):
+ return getattr(self, compl_key).visible
+ return super().__getattribute__(key)
+
+ @staticmethod
+ def _completable_key(key):
+ return f'_completable_{key}'
+
+ def append_completable(self, key: str, value: str) -> None:
+ 'To completable list of key append value.'
+ getattr(self, self._completable_key(key)).append(value)
+
+ def declare_complete(self, key: str) -> None:
+ 'Declare completable at key complete.'
+ getattr(self, self._completable_key(key)).is_complete = True
+ self._on_update(key)
class _ChannelDb(_Db):
- users: list[str]
- topic: str
- channel_modes: str
-
-
-class _ClientDb(ClientDbBase):
- caps_LS: list[str]
- caps_LIST: list[str]
- hostname: str
- password: str
- port: int
- realname: str
- _channels: dict[str, _ChannelDb]
+ _completable_users: _CompletableStringsList
+ # topic: str
+ # channel_modes: str
- @property
- def caps(self) -> dict[str, _ServerCapability]:
- 'Interpret .caps_LS, .caps_LIST into proper _ServerCapability listing.'
- d: dict[str, _ServerCapability] = {}
- if None not in (self.caps_LS, self.caps_LIST):
- for name, data in [_ServerCapability.split_name_data(item)
- for item in self.caps_LS]:
- d[name] = _ServerCapability(name in self.caps_LIST, data)
- return d
- def del_channel(self, name: str) -> None:
+class _ClientDb(_Db, IrcConnSetup):
+ client_host: str
+ nickname_confirmed: bool
+ user_modes: str
+ _completable_motd: _CompletableStringsList
+ _completable_isupports: _CompletableStringsList
+ _channels: dict[str, _ChannelDb]
+
+ def del_chan(self, name: str) -> None:
'Remove DB for channel of name.'
del self._channels[name]
+ self._on_update(f'{name} {CLEAR_WORD}')
- def channel(self, name: str) -> _ChannelDb:
+ def chan(self, name: str) -> _ChannelDb:
'Produce DB for channel of name – pre-existing, or newly created.'
- if self._channels is None:
- self._channels = {}
if name not in self._channels:
- self._channels[name] = _ChannelDb()
+ self._channels[name] = _ChannelDb(
+ on_update=lambda k: self._on_update(f'{name} {k}'))
return self._channels[name]
conn: Optional[_IrcConnection] = None
def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
- self._db = _ClientDb()
- for k in conn_setup._fields:
+ self.client_id = conn_setup.hostname
+ super().__init__(client_id=self.client_id, **kwargs)
+ self._caps = _CapsManager(self.send, self._on_update)
+ self._db = _ClientDb(on_update=self._on_update)
+ for k in conn_setup.__annotations__:
setattr(self._db, k, getattr(conn_setup, k))
if self._db.port <= 0:
self._db.port = PORT_SSL
- self.client_id = self._db.hostname
self._prev_verb = ''
- super().__init__(client_id=self.client_id, **kwargs)
- for k in IrcConnSetup._fields:
- self._update_db(k, None, confirm=k != 'nickname')
self._start_connecting()
def _start_connecting(self) -> None:
try:
if self.conn:
raise IrcConnAbortException('already connected')
- self.conn = _IrcConnection(hostname=self._db.hostname,
- port=self._db.port,
- _q_out=self._q_out,
- client_id=self.client_id)
+ self.conn = _IrcConnection(
+ hostname=self._db.hostname, port=self._db.port,
+ _q_out=self._q_out, client_id=self.client_id)
self._client_trigger('_on_connect')
except IrcConnAbortException as e:
self._log(f'failed to connect: {e}', alert=True)
self._log('connecting …')
Thread(target=connect, daemon=True, args=(self,)).start()
+ @abstractmethod
+ def _on_update(self, key: str) -> None:
+ pass
+
def _on_connect(self) -> None:
- 'Steps to perform right after connection.'
assert self.conn is not None
self._log('connected to server (SSL: '
f'{"yes" if self.conn.ssl else "no"})',
scope=LogScope.ALL)
- self._caps = _CapsManager(self.send, self._db)
+ self._caps.start_negotation()
self.send(IrcMessage(verb='USER',
params=(getuser(), '0', '*', self._db.realname)))
self.send(IrcMessage(verb='NICK', params=(self._db.nickname,)))
self._log(to_log, scope=log_target)
self._log(msg.raw, scope=LogScope.RAW, out=True)
- def _update_db(self, key: str, value: Optional[ClientDbType], confirm: bool
- ) -> tuple[bool, bool]:
- 'Wrap ._db.set into something accessible to subclass extension.'
- return self._db.set(key, value, confirm)
-
def close(self) -> None:
'Close both recv Loop and socket.'
self._log(msg='disconnecting from server …', scope=LogScope.ALL)
if self.conn:
self.conn.close()
self.conn = None
- self._update_db('nickname', value=None, confirm=False)
+ self._db.nickname_confirmed = False
def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
'Gracefully handle broken connection.'
'Log msg.raw, then process incoming msg into appropriate client steps.'
self._log(msg.raw, scope=LogScope.RAW, out=False)
if msg.verb != self._prev_verb and self._prev_verb == '005':
- self._update_db('isupports', None, True)
+ self._db.declare_complete('isupports')
self._prev_verb = msg.verb
if _NumericsToConfirmNickname.contain(msg.verb):
- self._update_db('nickname', value=msg.params[0], confirm=True)
+ self._db.nickname = msg.params[0]
+ self._db.nickname_confirmed = True
if _NumericsToIgnore.contain(msg.verb):
return
if msg.verb in {'JOIN', 'PART'} and len(msg.params) >= 1:
match msg.verb:
case '005' if len(msg.params) > 2:
for param in msg.params[1:-1]:
- self._db.append('isupports', param)
+ self._db.append_completable('isupports', param)
case '353' if len(msg.params) == 4:
for user in msg.params[3].split():
- self._db.channel(msg.params[2]).append('users', user)
+ self._db.chan(msg.params[2]
+ ).append_completable('users', user)
case '366' if len(msg.params) == 3:
- self._db.channel(msg.params[1]).set('users', None, True)
+ self._db.chan(msg.params[1]).declare_complete('users')
case '372' if len(msg.params) == 2: # RPL_MOTD
- self._db.append('motd', msg.params[-1])
+ self._db.append_completable('motd', msg.params[1])
case '376' if len(msg.params) == 2: # RPL_ENDOFMOTD
- self._update_db('motd', None, True)
+ self._db.declare_complete('motd')
case '396' if len(msg.params) == 3: # RPL_VISIBLEHOST
# '@'-split because <https://defs.ircdocs.horse/defs/numerics>
# claims: "<hostname> can also be in the form <user@hostname>"
- self._update_db('client_host', confirm=True,
- value=msg.params[1].split('@')[-1])
+ self._db.client_host = msg.params[1].split('@')[-1]
case 'AUTHENTICATE' if msg.params == ('+',):
auth = b64encode((self._db.nickname + '\0' +
self._db.nickname + '\0' +
).encode('utf-8')).decode('utf-8')
self.send(IrcMessage('AUTHENTICATE', (auth,)))
case 'CAP' if len(msg.params) > 1:
- if self._caps.process_msg(msg.params[1:]):
- self._log('', caps=self._db.caps)
- if (sasl_caps := self._db.caps.get('sasl', None))\
- and 'PLAIN' in sasl_caps.data.split(','):
- if self._db.password:
- self._log('trying to authenticate via SASL/plain')
- self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
- else:
- self._caps.end_negotiation()
+ if (self._caps.process_msg(msg.params[1:])
+ and (sasl_caps := self._caps.as_caps.get('sasl', None))
+ and ('PLAIN' in sasl_caps.data.split(','))):
+ if self._db.password:
+ self._log('trying to authenticate via SASL/plain')
+ self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
+ else:
+ self._caps.end_negotiation()
case 'ERROR' if len(msg.params) == 1:
self.close()
case 'JOIN' if len(msg.params) == 1:
if msg.nick_from_source != self._db.nickname:
- self._db.channel(channel).append(
- 'users', msg.nick_from_source, keep_confirmed=True)
+ self._db.chan(channel).users.append(msg.nick_from_source)
self._log(log_msg, scope=scope, channel=channel)
case 'MODE' if (len(msg.params) == 2
and msg.params[0] == self._db.nickname):
- self._update_db('user_modes', msg.params[1], True)
+ self._db.user_modes = msg.params[1]
case 'NICK' if (len(msg.params) == 1
and msg.nick_from_source == self._db.nickname):
- self._update_db('nickname', msg.params[0], confirm=True)
+ self._db.nickname = msg.params[0]
+ self._db.nickname_confirmed = True
case 'NOTICE' | 'PRIVMSG' if len(msg.params) == 2:
scope = LogScope.CHAT if '!' in msg.source else LogScope.SERVER
self._log(msg.params[-1], scope=scope, channel=msg.params[0],
log_msg += f': {msg.params[1]}'
self._log(log_msg, scope=scope, channel=channel)
if msg.nick_from_source == self._db.nickname:
- self._db.del_channel(channel)
+ self._db.del_chan(channel)
else:
- self._db.channel(channel).remove('users',
- msg.nick_from_source)
+ self._db.chan(channel).users.remove(msg.nick_from_source)
case 'PING' if len(msg.params) == 1:
self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
case '903' | '904' if len(msg.params) == 2:
from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window,
CMD_SHORTCUTS)
from ircplom.irc_conn import IrcMessage
-from ircplom.client import (IrcConnSetup, Client, ClientDbBase, ClientDbType,
- ClientQueueMixin, LogScope, NewClientEvent)
+from ircplom.client import (Client, ClientQueueMixin, Db, IrcConnSetup,
+ LogScope, NewClientEvent, CLEAR_WORD)
CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
CMD_SHORTCUTS['join'] = 'window.join'
_LOG_PREFIX_OUT = '>'
_LOG_PREFIX_IN = '<'
+_DbType = bool | int | str | tuple[str, ...]
+
class _ClientWindow(Window, ClientQueueMixin):
self._send_msg('PART', (self.chatname,))
+class _Db(Db):
+
+ def set_and_check_for_change(self, key: str, value: _DbType) -> bool:
+ 'To attribute of key set value, reply if that changed anything.'
+ self._typecheck(key, value)
+ old_value = getattr(self, key)
+ setattr(self, key, value)
+ return value != old_value
+
+
+class _ChannelDb(_Db):
+ users: tuple[str, ...]
+
+ def set_and_check_for_change(self, key: str, value: _DbType) -> bool:
+ if key == CLEAR_WORD:
+ self._set_empty_defaults()
+ return True
+ return super().set_and_check_for_change(key, value)
+
+
+class _TuiClientDb(_Db, IrcConnSetup):
+ caps: tuple[str]
+ client_host: str
+ isupports: tuple[str]
+ motd: tuple[str]
+ nickname_confirmed: bool
+ user_modes: str
+ _channels: dict[str, _ChannelDb]
+
+ def chan(self, name: str) -> _ChannelDb:
+ 'Produce DB for channel of name – pre-existing, or newly created.'
+ if name not in self._channels:
+ self._channels[name] = _ChannelDb()
+ return self._channels[name]
+
+
@dataclass
class _ClientWindowsManager:
_tui_log: Callable
_tui_new_window: Callable
def __post_init__(self, *_, **__) -> None:
- self._db = ClientDbBase()
- self._db.set('nickname', '?', confirm=False)
+ self._db = _TuiClientDb()
self.windows: list[_ClientWindow] = []
for scope in (LogScope.SERVER, LogScope.RAW):
self._new_win(scope)
kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#'
else _ChatWindow)
kwargs['chatname'] = chatname
- kwargs['get_nick_data'] = lambda: (
- self._db.nickname, self._db.confirmed('nickname'))
+ kwargs['get_nick_data'] = lambda: (self._db.nickname,
+ self._db.nickname_confirmed)
win = self._tui_new_window(**kwargs)
self.windows += [win]
return win
prefix = f'{first_char}{sender_label}'
self._tui_log(msg, scope=scope, prefix=prefix, **kwargs)
- def update(self,
- key: str,
- value: Optional[ClientDbType],
- confirmed: bool,
- scope: LogScope
- ) -> bool:
- 'Apply settings in kwargs, follow representation update triggers.'
- changes = self._db.set(key, value, confirmed)
- for i, t in enumerate((('', value), ('confirmation of ', confirmed))):
- if changes[i] and (i == 0 # re confirms only care about nickname
- or key == 'nickname'):
- announcement = f'changing {t[0]}{key} to:'
- if isinstance(t[1], list):
- self.log(announcement, scope=scope)
- for item in t[1]:
- self.log(f' {item}', scope=scope)
- continue
- self.log(f'changing {t[0]}{key} to: [{t[1]}]', scope=scope)
+ def update_db(self, key: str, value: _DbType) -> bool:
+ 'Ensure key at value, follow representation update triggers.'
+ db: _TuiClientDb | _ChannelDb = self._db
+ scope = LogScope.SERVER
+ log_kwargs: dict[str, str] = {}
+ if ' ' in key:
+ chan_name, key = key.split()
+ db = self._db.chan(chan_name)
+ scope = LogScope.CHAT
+ log_kwargs |= {'channel': chan_name}
+ if not db.set_and_check_for_change(key, value):
+ return False
+ if key != CLEAR_WORD:
+ announcement = f'changed {key} to:'
+ if isinstance(value, tuple):
+ self.log(announcement, scope=scope, **log_kwargs)
+ for item in value:
+ self.log(f' {item}', scope=scope, **log_kwargs)
+ else:
+ self.log(f'{announcement} [{value}]',
+ scope=scope, **log_kwargs)
for win in [w for w in self.windows
if isinstance(w, _ChatWindow)]:
win.set_prompt_prefix()
to_log = []
if msg:
to_log += [msg]
- if 'caps' in kwargs:
- to_log += ['server capabilities (enabled: "+"):']
- for cap_name, cap in kwargs['caps'].items():
- listing = '+' if cap.enabled else '-'
- listing += f' {cap_name}'
- if cap.data:
- listing += f' ({cap.data})'
- to_log += [listing]
for item in to_log:
self._client_tui_trigger('log', scope=scope, msg=item, **kwargs)
if scope == LogScope.RAW:
with open(f'{self.client_id}.log', 'a', encoding='utf8') as f:
f.write(('>' if kwargs['out'] else '<') + f' {msg}\n')
- def _update_db(self, key: str, value: Optional[ClientDbType], confirm: bool
- ) -> tuple[bool, bool]:
- value_changed, conf_changed = super()._update_db(key, value, confirm)
- if value is None and not value_changed: # local ._db may have fallback
- value = getattr(self._db, key) # values Tui._db doesn't
- self._client_tui_trigger('update', scope=LogScope.SERVER,
- key=key, value=value, confirmed=confirm)
- return (value_changed, conf_changed)
+ def _on_update(self, key: str) -> None:
+ value: _DbType
+ if key == 'caps':
+ lines: list[str] = []
+ for cap_name, cap_entry in self._caps.as_caps.items():
+ line = '[*]' if cap_entry.enabled else '[ ]'
+ line += f' {cap_name}'
+ if cap_entry.data:
+ line += f' ({cap_entry.data})'
+ lines += [line]
+ value = tuple(lines)
+ elif ' ' in key:
+ chan_name, arg = key.split()
+ value = ('' if arg == CLEAR_WORD
+ else getattr(self._db.chan(chan_name), arg))
+ else:
+ value = getattr(self._db, key)
+ self._client_tui_trigger('update_db', key=key, value=value)