From 14c4bfd4ff6b3750f13749baabd68d6578a03157 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Thu, 21 Aug 2025 08:10:02 +0200 Subject: [PATCH] Differentiate NICK_USER_HOST into actual _NickUserHost structure. --- ircplom/client.py | 85 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/ircplom/client.py b/ircplom/client.py index f63ed5c..73b4e6b 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -6,7 +6,7 @@ from dataclasses import dataclass, InitVar from enum import Enum, auto from getpass import getuser from threading import Thread -from typing import Any, Callable, NamedTuple, Optional +from typing import Any, Callable, NamedTuple, Optional, Self # ourselves from ircplom.events import ( AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin) @@ -142,7 +142,8 @@ _EXPECTATIONS += [ _MsgParseExpectation(_MsgTok.SERVER, '900', # RPL_LOGGEDIN ((_MsgTok.NICKNAME, 'set_db_attr:nickname'), - (_MsgTok.NICK_USER_HOST, ':full_address'), + (_MsgTok.NICK_USER_HOST, + 'set_db_attr:_nick_user_host'), (_MsgTok.ANY, 'set_db_attr:sasl_account'), _MsgTok.ANY)), _MsgParseExpectation(_MsgTok.SERVER, @@ -666,17 +667,49 @@ class SharedClientDbFields(IrcConnSetup): username: str +class _NickUserHost(NamedTuple): + nick: str + user: str + host: str + + def __str__(self) -> str: + return f'{self.nick}!{self.user}@{self.host}' + + @classmethod + def from_str(cls, value: str) -> Self: + 'Produce from string assumed to fit _!_@_ pattern.' + toks = value.split('!') + assert len(toks) == 2 + toks = toks[0:1] + toks[1].split('@') + assert len(toks) == 3 + return cls(*toks) + + class _ClientDb(_Db, SharedClientDbFields): caps: _UpdatingDict isupports: _UpdatingDict _completable_motd: _CompletableStringsList _channels: dict[str, _ChannelDb] + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._types['_nick_user_host'] = _NickUserHost + def __setattr__(self, key: str, value) -> None: super().__setattr__(key, value) if key == 'nickname': self.nick_wanted = value + @property + def _nick_user_host(self) -> _NickUserHost: + return _NickUserHost(self.nickname, self.username, self.client_host) + + @_nick_user_host.setter + def _nick_user_host(self, nick_user_host: _NickUserHost) -> None: + self.nickname = nick_user_host.nick + self.username = nick_user_host.user + self.client_host = nick_user_host.host + def needs_arg(self, key: str) -> bool: 'Reply if attribute of key may reasonably be addressed without an arg.' return not isinstance(getattr(self, key), (bool, int, str, tuple)) @@ -793,7 +826,8 @@ class Client(ABC, ClientQueueMixin): def _match_msg(self, msg: IrcMessage) -> dict[str, Any]: 'Test .source, .verb, .params.' - tok_type = str | tuple[str, ...] | dict[str, str | _ChannelDb] + tok_type = (str | _NickUserHost | tuple[str, ...] + | dict[str, str | _ChannelDb]) def param_match(ex_tok: str | _MsgTok, msg_tok: str | list[str] ) -> Optional[tok_type | tuple[tok_type, ...]]: @@ -818,11 +852,10 @@ class Client(ABC, ClientQueueMixin): if ex_tok is _MsgTok.NICKNAME: return msg_tok if msg_tok[0] not in '~&@%+# ' else None if ex_tok is _MsgTok.NICK_USER_HOST: - toks = msg_tok.split('!') - if len(toks) != 2: + try: + return _NickUserHost.from_str(msg_tok) + except AssertionError: return None - toks = toks[0:1] + toks[1].split('@') - return tuple(toks) if len(toks) == 3 else None if ex_tok is _MsgTok.LIST: return tuple(msg_tok.split()) return msg_tok @@ -899,9 +932,6 @@ class Client(ABC, ClientQueueMixin): elif ret['verb'] == '433': # ERR_NICKNAMEINUSE self._log('nickname already in use, trying increment', alert=True) self.send(IrcMessage('NICK', (_nick_incremented(ret['used']),))) - elif ret['verb'] == '900': # RPL_LOGGEDIN - self._db.nickname = ret['full_address'][0] - self._db.username, self._db.client_host = ret['full_address'][1:] elif ret['verb'] in {'903', '904'}: # RPL_SASLSUCCESS, ERR_SASLFAIL self._caps.end_negotiation() elif ret['verb'] == 'AUTHENTICATE': @@ -923,45 +953,46 @@ class Client(ABC, ClientQueueMixin): elif ret['verb'] == 'ERROR': self.close() elif ret['verb'] == 'JOIN': - self._log(f'{ret["joiner"][0]} {msg.verb.lower()}s ' + self._log(f'{ret["joiner"]} {msg.verb.lower()}s ' + f'{ret["channel"]["id"]}', scope=LogScope.CHAT, target=ret['channel']['id']) - if ret['joiner'][0] != self._db.nickname: - ret['channel']['db'].append_completable('users', - ret['joiner'][0], True) + if ret['joiner'].nick != self._db.nickname: + ret['channel']['db'].append_completable( + 'users', ret['joiner'].nick, True) elif ret['verb'] == 'NICK': - if ret['named'][0] == self._db.nickname: + if ret['named'].nick == self._db.nickname: self._db.nickname = ret['nickname'] else: - for id_, ch in self._db.chans_of_user(ret['named'][0]).items(): - ch.remove_completable('users', ret['named'][0], True) + for id_, ch in self._db.chans_of_user(ret['named'].nick + ).items(): + ch.remove_completable('users', ret['named'].nick, True) ch.append_completable('users', ret['nickname'], True) - self._log(f'{ret["named"][0]} becomes {ret["nickname"]}', + self._log(f'{ret["named"]} becomes {ret["nickname"]}', scope=LogScope.CHAT, target=id_) elif ret['verb'] in {'NOTICE', 'PRIVMSG'}: kw: dict[str, bool | str | LogScope] = { 'as_notice': msg.verb == 'NOTICE'} if 'sender' in ret: # not just server message - kw |= {'sender': ret['sender'][0], 'scope': LogScope.CHAT, - 'target': (ret['sender'][0] if 'nickname' in ret + kw |= {'sender': ret['sender'].nick, 'scope': LogScope.CHAT, + 'target': (ret['sender'].nick if 'nickname' in ret else ret['channel']['id'])} self._log(ret['message'], out=False, **kw) elif ret['verb'] == 'PART': reason = f': {ret["reason"]}' if 'reason' in ret else '' - self._log(f'{ret["parter"][0]} {msg.verb.lower()}s ' + self._log(f'{ret["parter"]} {msg.verb.lower()}s ' + f'{ret["channel"]["id"]}{reason}', scope=LogScope.CHAT, target=ret['channel']['id']) - if ret['parter'][0] == self._db.nickname: + if ret['parter'].nick == self._db.nickname: self._db.del_chan(ret['channel']['id']) else: - ret['channel']['db'].remove_completable('users', - ret['parter'][0], True) + ret['channel']['db'].remove_completable( + 'users', ret['parter'].nick, True) elif ret['verb'] == 'PING': self.send(IrcMessage(verb='PONG', params=(ret['reply'],))) elif ret['verb'] == 'QUIT': - for id_, chan in self._db.chans_of_user(ret['quitter'][0]).items(): - chan.remove_completable('users', ret['quitter'][0], True) - self._log(f'{ret["quitter"][0]} quits: {ret["message"]}', + for id_, ch in self._db.chans_of_user(ret['quitter'].nick).items(): + ch.remove_completable('users', ret['quitter'].nick, True) + self._log(f'{ret["quitter"]} quits: {ret["message"]}', LogScope.CHAT, target=id_) -- 2.30.2