from enum import Enum, auto
from getpass import getuser
from threading import Thread
-from typing import Callable, Optional
+from typing import Any, Callable, Optional
# ourselves
from ircplom.events import (
AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
).kw(**kwargs))
+class _UpdatingDict:
+ _on_update: Callable
+
+ def __init__(self) -> None:
+ self._dict: dict[str, Any] = {}
+
+ def set_on_update(self, name: str, on_update: Callable) -> None:
+ 'Caller of on_update with path= set to name.'
+ self._on_update = lambda k: on_update(name, k)
+
+ def clear(self) -> None:
+ 'Zero dict and send clearance update.'
+ self._dict.clear()
+ self._on_update('')
+
+ def has(self, key: str) -> bool:
+ 'Test if entry of name in dictionary.'
+ return key in self._dict
+
+ def __getitem__(self, key: str):
+ return self._dict[key]
+
+ def __setitem__(self, key: str, val: Any) -> None:
+ self._dict[key] = val
+ self._on_update(key)
+
+ def __delitem__(self, key: str) -> None:
+ del self._dict[key]
+ self._on_update(key)
+
+
@dataclass
-class _ServerCapability:
+class ServerCapability:
+ 'Public API for CAP data.'
enabled: bool
data: str
+
+class _ServerCapability(ServerCapability):
+
@staticmethod
def split_name_data(raw: str) -> tuple[str, str]:
'Parse version 302 LS listing into cap name and metadata.'
def __init__(self,
sender: Callable[[IrcMessage], None],
- on_update: Callable
+ caps_dict: _UpdatingDict
) -> None:
- self._on_update = lambda: on_update('caps')
+ self._dict = caps_dict
self._send = lambda *params: sender(IrcMessage('CAP', params=params))
self.clear()
def clear(self) -> None:
'Zero internal knowledge.'
+ self._dict.clear()
self._ls = _CompletableStringsList()
self._list = _CompletableStringsList()
self._list_expectations: dict[str, set[str]] = {
self._sent_challenges: list[str] = []
def start_negotation(self) -> None:
- 'Call .clear, send CAPS LS 302, and then on_update.'
+ 'Call .clear, send CAPS LS 302.'
self.clear()
self._send('LS', '302')
- self._on_update()
def process_msg(self, params: tuple[str, ...]) -> bool:
'Parse CAP params to negot. steps, DB inputs; return if successful.'
for item in items:
if verb == 'NEW':
self._ls.append(item, stay_complete=True)
- self._on_update()
+ self._update_cap(item)
elif verb == 'DEL':
self._ls.remove(item, stay_complete=True)
for name in [
name for name in self._list.visible
if _ServerCapability.split_name_data(name)[0] == item]:
self._list.remove(name, stay_complete=True)
- self._on_update()
+ self._del_cap(item)
elif verb in {'ACK', 'NACK'}:
self._list_expectations[verb].add(item)
if verb in {'LS', 'LIST'}:
assert acks == list_set & acks
assert set() == list_set & naks
if self._list.is_complete:
- self._on_update()
+ for name, data in [_ServerCapability.split_name_data(item)
+ for item in self._ls.visible]:
+ self._dict[name] = _ServerCapability(
+ name in self._list.visible, data)
return True
if self._ls.is_complete:
availables = [_ServerCapability.split_name_data(item)[0]
self._send(*params)
self._sent_challenges.append(' '.join(params))
- @property
- def as_caps(self) -> dict[str, _ServerCapability]:
- 'Interpret ._ls, ._list into proper _ServerCapability listing.'
- d = {}
- if self._ls.is_complete and self._list.is_complete:
- for name, data in [_ServerCapability.split_name_data(item)
- for item in self._ls.visible]:
- d[name] = _ServerCapability(name in self._list.visible, data)
- return d
+ def _update_cap(self, full_ls_item: str) -> None:
+ name, data = _ServerCapability.split_name_data(full_ls_item)
+ is_enabled = name in self._list.visible
+ self._dict[name] = _ServerCapability(is_enabled, data)
+
+ def _del_cap(self, full_ls_item) -> None:
+ name, _ = _ServerCapability.split_name_data(full_ls_item)
+ del self._dict[name]
@dataclass
# channel_modes: str
-class _UpdatingDict:
- _on_update: Callable
-
- def __init__(self) -> None:
- self._dict: dict[str, str] = {}
-
- def set_on_update(self, name: str, on_update: Callable) -> None:
- 'Caller of on_update with path= set to name.'
- self._on_update = lambda k: on_update(name, k)
-
- def clear(self) -> None:
- 'Zero dict and send clearance update.'
- self._dict.clear()
- self._on_update('')
-
- def has(self, key: str) -> bool:
- 'Test if entry of name in dictionary.'
- return key in self._dict
-
- def __getitem__(self, key: str) -> str:
- return self._dict[key]
-
- def __setitem__(self, key: str, val: str) -> None:
- self._dict[key] = val
- self._on_update(key)
-
- def __delitem__(self, key: str) -> None:
- del self._dict[key]
- self._on_update(key)
-
-
class _ClientDb(_Db, IrcConnSetup):
+ caps: _UpdatingDict
connection_state: str
client_host: str
isupports: _UpdatingDict
self.client_id = conn_setup.hostname
super().__init__(client_id=self.client_id, **kwargs)
self._db = _ClientDb(on_update=self._on_update)
- self._caps = _CapsManager(self.send, self._on_update)
+ self._caps = _CapsManager(self.send, self._db.caps)
for k in conn_setup.__annotations__:
setattr(self._db, k, getattr(conn_setup, k))
if self._db.port <= 0:
self.send(IrcMessage('AUTHENTICATE', (auth,)))
case 'CAP' if len(msg.params) > 1:
if (self._caps.process_msg(msg.params[1:])
- and (sasl_caps := self._caps.as_caps.get('sasl', None))
+ and self._db.caps.has('sasl')
+ and (sasl_caps := self._db.caps['sasl'])
and ('PLAIN' in sasl_caps.data.split(','))):
if self._db.password:
self._log('trying to authenticate via SASL/plain')
CMD_SHORTCUTS)
from ircplom.irc_conn import IrcMessage
from ircplom.client import (Client, ClientQueueMixin, Db, IrcConnSetup,
- LogScope, NewClientEvent)
+ LogScope, NewClientEvent, ServerCapability)
CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
CMD_SHORTCUTS['join'] = 'window.join'
path: str
arg: str = ''
value: Optional[_DbType] = None
+ display: str = ''
+
+ def __post_init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ if not self.display:
+ self.display = str(self.value)
@property
def is_chan(self) -> bool:
class _TuiClientDb(_Db, IrcConnSetup):
- caps: tuple[str]
+ caps: dict[str, str]
client_host: str
connection_state: str
isupports: dict[str, str]
for item in update.value:
self.log(f' {item}', scope=scope, **log_kwargs)
elif announcement[-1] == ':':
- self.log(f'{announcement} [{update.value}]',
+ self.log(f'{announcement} [{update.display}]',
scope=scope, **log_kwargs)
for win in [w for w in self.windows if isinstance(w, _ChatWindow)]:
win.set_prompt_prefix()
def _on_update(self, path: str, arg: str = '') -> None:
value: Optional[_DbType] = None
is_chan = path[0] == '#'
- if path == 'caps':
- lines: list[str] = []
- for cap_name, cap_entry in self._caps.as_caps.items():
- line = '[*]' if cap_entry.enabled else '[ ]'
- line += f' {cap_name}'
- if cap_entry.data:
- line += f' ({cap_entry.data})'
- lines += [line]
- value = tuple(lines)
- elif arg:
+ display = ''
+ if arg:
if is_chan and self._db.has_chan(path):
is_chan = True
if (chan := self._db.chan(path)) and hasattr(chan, arg):
d = getattr(self._db, path)
if d.has(arg):
value = d[arg]
+ if isinstance(value, ServerCapability):
+ display = 'ENABLED' if value.enabled else 'available'
+ if value.data:
+ display += f' ({value.data})'
elif (not is_chan) and not self._db.needs_arg(path):
value = getattr(self._db, path)
- self._client_tui_trigger('update_db', update=_Update(path, arg, value))
+ self._client_tui_trigger('update_db', update=_Update(
+ path, arg, value, display))