From: Christian Heller Date: Tue, 2 Sep 2025 05:37:14 +0000 (+0200) Subject: Some code re-organization. X-Git-Url: https://plomlompom.com/repos/day?a=commitdiff_plain;h=c1705be632500dee6fbf854d6070fed02dc2dcfb;p=ircplom Some code re-organization. --- diff --git a/ircplom/client.py b/ircplom/client.py index c0c352e..4c8130b 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -16,11 +16,12 @@ from ircplom.irc_conn import ( ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL) from ircplom.msg_parse_expectations import MsgTok, MSG_EXPECTATIONS -ClientsDb = dict[str, 'Client'] + +_NAMES_DESIRED_SERVER_CAPS = ('sasl',) -class _DeepAnnotationsMixin: - 'Provide ._deep_annotations() of non-underscored annotations of whole MRO.' +class AutoAttrMixin: + 'Ensures attribute as defined by annotations along MRO' @classmethod def _deep_annotations(cls) -> dict[str, type]: @@ -30,10 +31,6 @@ class _DeepAnnotationsMixin: types = c.__annotations__ | types return {k: v for k, v in types.items() if k[0] != '_'} - -class AutoAttrMixin(_DeepAnnotationsMixin): - 'Ensures attribute as defined by annotations, and ._make_attr method.' - def __getattribute__(self, key: str): if key[0] != '_' and (cls := self._deep_annotations().get(key, None)): try: @@ -65,114 +62,6 @@ class Dict(Generic[DictItem]): return self.__orig_class__.__args__[0] -@dataclass -class IrcConnSetup: - 'All we need to know to set up a new Client connection.' - hostname: str = '' - port: int = 0 - nick_wanted: str = '' - realname: str = '' - password: str = '' - - -class SharedClientDbFields(IrcConnSetup): - 'API for fields shared directly in name and type with TUI.' - connection_state: str = '' - isupport: Dict[str] - sasl_account: str = '' - sasl_auth_state: str = '' - user_modes: str = '' - - def is_chan_name(self, name: str) -> bool: - 'Tests name to match CHANTYPES prefixes.' - return name[0] in self.isupport['CHANTYPES'] - - -@dataclass -class NickUserHost: - 'Combination of nickname, username on host, and host.' - nick: str = '?' - user: str = '?' - host: str = '?' - - -@dataclass -class ServerCapability: - 'Public API for CAP data.' - data: str = '' - enabled: bool = False - - -class LogScope(Enum): - 'Where log messages should go.' - ALL = auto() - SERVER = auto() - RAW = auto() - CHAT = auto() - SAME = auto() - - -@dataclass -class _ClientIdMixin: - 'Collects a Client\'s ID at .client_id.' - client_id: str - - -@dataclass -class ClientEvent(AffectiveEvent, _ClientIdMixin): - 'To affect Client identified by ClientIdMixin.' - - -@dataclass -class NewClientEvent(AffectiveEvent): - 'Put Client .payload into ClientsDb target.' - payload: 'Client' - - def affect(self, target: ClientsDb) -> None: - target[self.payload.client_id] = self.payload - - -@dataclass -class ClientQueueMixin(QueueMixin, _ClientIdMixin): - 'To QueueMixin adds _cput to send ClientEvent for self.' - - def _client_trigger(self, t_method: str, **kwargs) -> None: - self._put(ClientEvent.affector(t_method, client_id=self.client_id - ).kw(**kwargs)) - - -_NAMES_DESIRED_SERVER_CAPS = ('sasl',) - - -def _nick_incremented(nickname: str) -> str: - 'Return nickname with number suffix incremented, or "0" if none.' - name, digits = ([(nickname, '')] - + [(nickname[:i], nickname[i:]) - for i in range(len(nickname), 0, -1) - if nickname[i:].isdigit()] - )[-1] - return name + str(0 if not digits else (int(digits) + 1)) - - -@dataclass -class _IrcConnection(BaseIrcConnection, _ClientIdMixin): - hostname: InitVar[str] # needed by BaseIrcConnection, but not desired as - port: InitVar[int] # dataclass fields, only for __post_init__ call - - def __post_init__(self, hostname, port, **kwargs) -> None: - super().__init__(hostname=hostname, port=port, _q_out=self._q_out, - **kwargs) - - def _make_recv_event(self, msg: IrcMessage) -> ClientEvent: - return ClientEvent.affector('handle_msg', client_id=self.client_id - ).kw(msg=msg) - - def _on_handled_loop_exception(self, e: IrcConnAbortException - ) -> ClientEvent: - return ClientEvent.affector('on_handled_loop_exception', - client_id=self.client_id).kw(e=e) - - class _Dict(Dict[DictItem]): _defaults: dict[str, DictItem] @@ -229,7 +118,6 @@ class _CompletableStringsList: def complete(self) -> None: 'Declare list done.' self.completed = tuple(self._incomplete) - # self._incomplete.clear() def clear(self) -> None: 'Wipe content and declare new emptiness as complete.' @@ -237,130 +125,6 @@ class _CompletableStringsList: self.complete() -class _CapsManager: - - def __init__(self, - sender: Callable[[IrcMessage], None], - caps_dict: '_UpdatingDict[_UpdatingServerCapability]' - ) -> None: - self._dict = caps_dict - self._send = lambda *params: sender(IrcMessage('CAP', params=params)) - self.clear() - - def clear(self) -> None: - 'Zero internal knowledge.' - self._dict.clear() - self._ls = _CompletableStringsList() - self._list = _CompletableStringsList() - self._list_expectations: dict[str, set[str]] = { - 'ACK': set(), 'NAK': set()} - - def start_negotation(self) -> None: - 'Call .clear, send CAPS LS 302.' - self.clear() - self._send('LS', '302') - - def end_negotiation(self) -> None: - 'Stop negotation, without emptying caps DB.' - self._send('END') - - def process_msg(self, verb: str, items: tuple[str, ...], complete: bool - ) -> bool: - 'Parse CAP message to negot. steps, DB inputs; return if successful.' - for item in items: - if verb == 'NEW': - key, data = _Dict.key_val_from_eq_str(item) - self._dict.set_updating(key, ServerCapability(data=data)) - elif verb == 'DEL': - del self._dict[item] - elif verb in {'ACK', 'NACK'}: - self._list_expectations[verb].add(item) - if verb in {'LS', 'LIST'}: - target = getattr(self, f'_{verb.lower()}') - for item in items: - target.append(item) - if complete: - target.complete() - if target == self._ls: - for cap_name in _NAMES_DESIRED_SERVER_CAPS: - self._send('REQ', cap_name) - self._send('LIST') - elif target == self._list: - acks = self._list_expectations['ACK'] - naks = self._list_expectations['NAK'] - list_set = set(target.completed) - assert acks == list_set & acks - assert set() == list_set & naks - for key, data in [_Dict.key_val_from_eq_str(entry) - for entry in self._ls.completed]: - self._dict.set_updating(key, ServerCapability( - data=data, enabled=key in self._list.completed)) - return True - return False - - -class _Channel: - user_ids: _CompletableStringsList - - def __init__(self, - get_id_for_nick: Callable, - get_membership_prefixes: Callable, - purge_users: Callable, - **kwargs - ) -> None: - self._get_id_for_nick = get_id_for_nick - self._get_membership_prefixes = get_membership_prefixes - self._purge_users = purge_users - super().__init__(**kwargs) - - def add_from_namreply(self, items: tuple[str, ...]): - 'Add to .user_ids items assumed as nicknames with membership prefixes.' - for item in items: - nickname = item.lstrip(self._get_membership_prefixes()) - self.user_ids.append(self._get_id_for_nick(nickname)) - - def append_nick(self, nickname: str) -> None: - 'To .user_ids append .nickname and declare .user_ids complete.' - user_id = self._get_id_for_nick(nickname) - self.user_ids.append(user_id, complete=True) - - def remove_nick(self, nickname: str) -> None: - 'From .user_ids remove .nickname and declare .user_ids complete.' - user_id = self._get_id_for_nick(nickname) - self.user_ids.remove(user_id, complete=True) - self._purge_users() - - -class _NickUserHost(NickUserHost, _DeepAnnotationsMixin): - - def __str__(self) -> str: - return f'{self.nick}!{self.user}@{self.host}' - - def __eq__(self, other) -> bool: - if not isinstance(other, NickUserHost): - return False - for key in self._deep_annotations().keys(): - if getattr(self, key) != getattr(other, key): - return False - return True - - def __setattr__(self, key: str, value: NickUserHost | str) -> None: - if key == 'nickuserhost' and isinstance(value, _NickUserHost): - for annotated_key in self._deep_annotations().keys(): - setattr(self, annotated_key, getattr(value, annotated_key)) - else: - super().__setattr__(key, value) - - @classmethod - def from_str(cls, value: str) -> Self: - 'Produce from string assumed to fit _!_@_ pattern.' - toks = value.split('!') - assert len(toks) == 2 - toks = toks[0:1] + toks[1].split('@') - assert len(toks) == 3 - return cls(*toks) - - class _UpdatingMixin(AutoAttrMixin): _on_update: Callable @@ -375,11 +139,10 @@ class _UpdatingMixin(AutoAttrMixin): return None if isinstance(self, _CompletableStringsList): return self.completed - for cls in self.__class__.__mro__: - if cls != _DeepAnnotationsMixin\ - and AutoAttrMixin not in cls.__mro__: - obj = cls() - break + for cls in [cls for cls in self.__class__.__mro__ + if AutoAttrMixin not in cls.__mro__]: + obj = cls() + break for key in self._deep_annotations(): attr_val = getattr(self, key) setattr(obj, key, @@ -457,6 +220,159 @@ class _UpdatingCompletableStringsList(_UpdatingMixin, _CompletableStringsList): self._on_update() +@dataclass +class IrcConnSetup: + 'All we need to know to set up a new Client connection.' + hostname: str = '' + port: int = 0 + nick_wanted: str = '' + realname: str = '' + password: str = '' + + +class SharedClientDbFields(IrcConnSetup): + 'API for fields shared directly in name and type with TUI.' + connection_state: str = '' + isupport: Dict[str] + sasl_account: str = '' + sasl_auth_state: str = '' + user_modes: str = '' + + def is_chan_name(self, name: str) -> bool: + 'Tests name to match CHANTYPES prefixes.' + return name[0] in self.isupport['CHANTYPES'] + + +@dataclass +class NickUserHost: + 'Combination of nickname, username on host, and host.' + nick: str = '?' + user: str = '?' + host: str = '?' + + +@dataclass +class ServerCapability: + 'Public API for CAP data.' + data: str = '' + enabled: bool = False + + +class LogScope(Enum): + 'Where log messages should go.' + ALL = auto() + SERVER = auto() + RAW = auto() + CHAT = auto() + SAME = auto() + + +@dataclass +class _ClientIdMixin: + 'Collects a Client\'s ID at .client_id.' + client_id: str + + +@dataclass +class ClientQueueMixin(QueueMixin, _ClientIdMixin): + 'To QueueMixin adds _cput to send ClientEvent for self.' + + def _client_trigger(self, t_method: str, **kwargs) -> None: + self._put(ClientEvent.affector(t_method, client_id=self.client_id + ).kw(**kwargs)) + + +@dataclass +class _IrcConnection(BaseIrcConnection, _ClientIdMixin): + hostname: InitVar[str] # needed by BaseIrcConnection, but not desired as + port: InitVar[int] # dataclass fields, only for __post_init__ call + + def __post_init__(self, hostname, port, **kwargs) -> None: + super().__init__(hostname=hostname, port=port, _q_out=self._q_out, + **kwargs) + + def _make_recv_event(self, msg: IrcMessage) -> 'ClientEvent': + return ClientEvent.affector('handle_msg', client_id=self.client_id + ).kw(msg=msg) + + def _on_handled_loop_exception(self, e: IrcConnAbortException + ) -> 'ClientEvent': + return ClientEvent.affector('on_handled_loop_exception', + client_id=self.client_id).kw(e=e) + + +class _Channel: + user_ids: _CompletableStringsList + + def __init__(self, + get_id_for_nick: Callable, + get_membership_prefixes: Callable, + purge_users: Callable, + **kwargs + ) -> None: + self._get_id_for_nick = get_id_for_nick + self._get_membership_prefixes = get_membership_prefixes + self._purge_users = purge_users + super().__init__(**kwargs) + + def add_from_namreply(self, items: tuple[str, ...]): + 'Add to .user_ids items assumed as nicknames with membership prefixes.' + for item in items: + nickname = item.lstrip(self._get_membership_prefixes()) + self.user_ids.append(self._get_id_for_nick(nickname)) + + def append_nick(self, nickname: str) -> None: + 'To .user_ids append .nickname and declare .user_ids complete.' + user_id = self._get_id_for_nick(nickname) + self.user_ids.append(user_id, complete=True) + + def remove_nick(self, nickname: str) -> None: + 'From .user_ids remove .nickname and declare .user_ids complete.' + user_id = self._get_id_for_nick(nickname) + self.user_ids.remove(user_id, complete=True) + self._purge_users() + + +class _NickUserHost(NickUserHost): + + def __str__(self) -> str: + return f'{self.nick}!{self.user}@{self.host}' + + def __eq__(self, other) -> bool: + if not isinstance(other, NickUserHost): + return False + for key in NickUserHost.__annotations__: + if getattr(self, key) != getattr(other, key): + return False + return True + + def __setattr__(self, key: str, value: NickUserHost | str) -> None: + if key == 'nickuserhost' and isinstance(value, _NickUserHost): + for annotated_key in NickUserHost.__annotations__: + setattr(self, annotated_key, getattr(value, annotated_key)) + else: + super().__setattr__(key, value) + + @classmethod + def from_str(cls, value: str) -> Self: + 'Produce from string assumed to fit _!_@_ pattern.' + toks = value.split('!') + assert len(toks) == 2 + toks = toks[0:1] + toks[1].split('@') + assert len(toks) == 3 + return cls(*toks) + + @property + def incremented(self) -> str: + 'Return .nick with number suffix incremented, or "0" if none.' + name, digits = ([(self.nick, '')] + + [(self.nick[:i], self.nick[i:]) + for i in range(len(self.nick), 0, -1) + if self.nick[i:].isdigit()] + )[-1] + return name + str(0 if not digits else (int(digits) + 1)) + + class _UpdatingServerCapability(_UpdatingMixin, ServerCapability): pass @@ -538,6 +454,68 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): return id_ +class _CapsManager: + + def __init__(self, + sender: Callable[[IrcMessage], None], + caps_dict: _UpdatingDict[_UpdatingServerCapability] + ) -> None: + self._dict = caps_dict + self._send = lambda *params: sender(IrcMessage('CAP', params=params)) + self.clear() + + def clear(self) -> None: + 'Zero internal knowledge.' + self._dict.clear() + self._ls = _CompletableStringsList() + self._list = _CompletableStringsList() + self._list_expectations: dict[str, set[str]] = { + 'ACK': set(), 'NAK': set()} + + def start_negotation(self) -> None: + 'Call .clear, send CAPS LS 302.' + self.clear() + self._send('LS', '302') + + def end_negotiation(self) -> None: + 'Stop negotation, without emptying caps DB.' + self._send('END') + + def process_msg(self, verb: str, items: tuple[str, ...], complete: bool + ) -> bool: + 'Parse CAP message to negot. steps, DB inputs; return if successful.' + for item in items: + if verb == 'NEW': + key, data = _Dict.key_val_from_eq_str(item) + self._dict.set_updating(key, ServerCapability(data=data)) + elif verb == 'DEL': + del self._dict[item] + elif verb in {'ACK', 'NACK'}: + self._list_expectations[verb].add(item) + if verb in {'LS', 'LIST'}: + target = getattr(self, f'_{verb.lower()}') + for item in items: + target.append(item) + if complete: + target.complete() + if target == self._ls: + for cap_name in _NAMES_DESIRED_SERVER_CAPS: + self._send('REQ', cap_name) + self._send('LIST') + elif target == self._list: + acks = self._list_expectations['ACK'] + naks = self._list_expectations['NAK'] + list_set = set(target.completed) + assert acks == list_set & acks + assert set() == list_set & naks + for key, data in [_Dict.key_val_from_eq_str(entry) + for entry in self._ls.completed]: + self._dict.set_updating(key, ServerCapability( + data=data, enabled=key in self._list.completed)) + return True + return False + + class Client(ABC, ClientQueueMixin): 'Abstracts socket connection, loop over it, and handling messages from it.' _caps: _CapsManager @@ -732,7 +710,8 @@ class Client(ABC, ClientQueueMixin): self._log(alert, alert=True) elif ret['verb'] == '433': # ERR_NICKNAMEINUSE self._log('nickname already in use, trying increment', alert=True) - self.send(IrcMessage('NICK', (_nick_incremented(ret['used']),))) + self.send(IrcMessage( + 'NICK', (_NickUserHost(nick=ret['used']).incremented,))) elif ret['verb'] in {'903', '904'}: # RPL_SASLSUCCESS, ERR_SASLFAIL self._caps.end_negotiation() elif ret['verb'] == 'AUTHENTICATE': @@ -780,3 +759,20 @@ class Client(ABC, ClientQueueMixin): ch.remove_nick(ret['quitter']) self._log(f'{ret["quitter"]} quits: {ret["message"]}', LogScope.CHAT, target=ch_name) + + +ClientsDb = dict[str, Client] + + +@dataclass +class NewClientEvent(AffectiveEvent): + 'Put Client .payload into ClientsDb target.' + payload: 'Client' + + def affect(self, target: ClientsDb) -> None: + target[self.payload.client_id] = self.payload + + +@dataclass +class ClientEvent(AffectiveEvent, _ClientIdMixin): + 'To affect Client identified by ClientIdMixin.' diff --git a/ircplom/client_tui.py b/ircplom/client_tui.py index 1512535..b7cc445 100644 --- a/ircplom/client_tui.py +++ b/ircplom/client_tui.py @@ -26,6 +26,75 @@ _LOG_PREFIX_OUT = '>' _LOG_PREFIX_IN = '<' +@dataclass +class _Update: + path: tuple[str, ...] + value: Optional[Any] = None + + +class _UpdatingNode(AutoAttrMixin): + log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER} + + def _make_attr(self, cls: Callable, key: str): + return cls() + + @classmethod + def _scope(cls, path: tuple[str, ...]) -> LogScope: + scopes: dict[tuple[str, ...], LogScope] = {} + for c in cls.__mro__: + if hasattr(c, 'log_scopes'): + scopes = c.log_scopes | scopes + return scopes.get(path, scopes[tuple()]) + + def set_and_check_for_change(self, update: _Update + ) -> Optional[tuple[LogScope, Any]]: + 'Apply update, return if that actually made a difference.' + key = update.path[0] + node = self._get(key) + scope = self._scope(update.path) + if len(update.path) == 1: + if update.value is None: + if not self._is_set(key): + return None + self._unset(key) + return (scope, None) + if node == update.value: + return None + self._set(key, update.value) + return (scope, update.value) + return node.set_and_check_for_change(_Update(update.path[1:], + update.value)) + + def _get(self, key: str): + return getattr(self, key) + + def _set(self, key: str, value) -> None: + setattr(self, key, value) + + def _unset(self, key: str) -> None: + getattr(self, key).clear() + + def _is_set(self, key: str) -> bool: + return hasattr(self, key) + + +class _UpdatingDict(Dict[DictItem], _UpdatingNode): + + def _get(self, key: str): + if key not in self._dict: + self._dict[key] = self._item_cls() + return self._dict[key] + + def _set(self, key: str, value) -> None: + self._dict[key] = value + + def _unset(self, key: str) -> None: + del self._dict[key] + + def _is_set(self, key: str) -> bool: + return key in self._dict + + class _ClientWindow(Window, ClientQueueMixin): def __init__(self, scope: LogScope, log: Callable, **kwargs) -> None: @@ -117,75 +186,6 @@ class _ChannelWindow(_ChatWindow): self._send_msg('PART', (self.chatname,)) -@dataclass -class _Update: - path: tuple[str, ...] - value: Optional[Any] = None - - -class _UpdatingNode(AutoAttrMixin): - log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER} - - def _make_attr(self, cls: Callable, key: str): - return cls() - - @classmethod - def _scope(cls, path: tuple[str, ...]) -> LogScope: - scopes: dict[tuple[str, ...], LogScope] = {} - for c in cls.__mro__: - if hasattr(c, 'log_scopes'): - scopes = c.log_scopes | scopes - return scopes.get(path, scopes[tuple()]) - - def set_and_check_for_change(self, update: _Update - ) -> Optional[tuple[LogScope, Any]]: - 'Apply update, return if that actually made a difference.' - key = update.path[0] - node = self._get(key) - scope = self._scope(update.path) - if len(update.path) == 1: - if update.value is None: - if not self._is_set(key): - return None - self._unset(key) - return (scope, None) - if node == update.value: - return None - self._set(key, update.value) - return (scope, update.value) - return node.set_and_check_for_change(_Update(update.path[1:], - update.value)) - - def _get(self, key: str): - return getattr(self, key) - - def _set(self, key: str, value) -> None: - setattr(self, key, value) - - def _unset(self, key: str) -> None: - getattr(self, key).clear() - - def _is_set(self, key: str) -> bool: - return hasattr(self, key) - - -class _UpdatingDict(Dict[DictItem], _UpdatingNode): - - def _get(self, key: str): - if key not in self._dict: - self._dict[key] = self._item_cls() - return self._dict[key] - - def _set(self, key: str, value) -> None: - self._dict[key] = value - - def _unset(self, key: str) -> None: - del self._dict[key] - - def _is_set(self, key: str) -> bool: - return key in self._dict - - class _UpdatingChannel(_UpdatingNode): user_ids: tuple[str, ...] = tuple() log_scopes = {tuple(): LogScope.CHAT}