From: Christian Heller Date: Tue, 19 Aug 2025 05:26:24 +0000 (+0200) Subject: Add tests for .source = user address, and have tests potentially return values for... X-Git-Url: https://plomlompom.com/repos/error?a=commitdiff_plain;h=430e64fb3d088ff2ffb07cfc4c3456ee8810850f;p=ircplom Add tests for .source = user address, and have tests potentially return values for further processing. --- diff --git a/ircplom/client.py b/ircplom/client.py index 648e284..0a15747 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -41,6 +41,7 @@ _NUMERICS_TO_IGNORE = ( class _MsgSource(Enum): NONE = auto() + USER_ADDRESS = auto() SERVER = auto() @@ -67,34 +68,20 @@ _EXPECTATIONS: dict[str, _MsgParseExpectation] = { 'AUTHENTICATE': _MsgParseExpectation(params=('+',), source=_MsgSource.NONE), 'CAP': _MsgParseExpectation(3, 15, source=_MsgSource.SERVER), 'ERROR': _MsgParseExpectation(1, source=_MsgSource.NONE), - 'JOIN': _MsgParseExpectation(1), - 'MODE': _MsgParseExpectation(2), - 'NICK': _MsgParseExpectation(1), + 'JOIN': _MsgParseExpectation(1, source=_MsgSource.USER_ADDRESS), + 'MODE': _MsgParseExpectation(2, source=_MsgSource.USER_ADDRESS), + 'NICK': _MsgParseExpectation(1, source=_MsgSource.USER_ADDRESS), 'NOTICE': _MsgParseExpectation(2), - 'PART': _MsgParseExpectation(1, 2), + 'PART': _MsgParseExpectation(1, 2, source=_MsgSource.USER_ADDRESS), 'PING': _MsgParseExpectation(1, source=_MsgSource.NONE), 'PRIVMSG': _MsgParseExpectation(2), - 'QUIT': _MsgParseExpectation(1), + 'QUIT': _MsgParseExpectation(1, source=_MsgSource.USER_ADDRESS), } class _IrcMsg(IrcMessage): 'Extends IrcMessage with some conveniences.' - # def match(self, verb: str) -> bool: - # 'Test .verb, .params.' - # if not verb == self.verb: - # return False - # expect = _EXPECTATIONS[verb] - # if expect.source is not None and self.source != expect.source: - # return False - # if expect.params: - # return self.params == expect.params - # n_msg_params = len(self.params) - # if expect.len_max_params <= expect.len_min_params: - # return n_msg_params == expect.len_min_params - # return expect.len_min_params <= n_msg_params <= expect.len_max_params - @property def nick_from_source(self) -> str: 'Parse .source into user nickname.' @@ -572,24 +559,40 @@ class Client(ABC, ClientQueueMixin): self._log(f'connection broken: {e}', alert=True) self.close() - def _match_msg(self, msg: _IrcMsg, verb: str) -> bool: + def _match_msg(self, msg: _IrcMsg, verb: str): 'Test .verb, .params.' + to_return: dict[str, Optional[str]] = {'_': None} if not msg.verb == verb: return False expect = _EXPECTATIONS[verb] - if expect.source is _MsgSource.NONE and msg.source != '': + if expect.source is _MsgSource.NONE: + if msg.source != '': + return False + elif expect.source is _MsgSource.SERVER: + if ('!' in msg.source + or '@' in msg.source + or '.' not in msg.source + or self._db.hostname.split('.')[-2:] + != msg.source.split('.')[-2:]): + return False + elif expect.source is _MsgSource.USER_ADDRESS: + toks = msg.source.split('!') + if len(toks) != 2: + return False + toks = toks[0:1] + toks[1].split('@') + if len(toks) != 3: + return False + to_return['nickname'] = toks[0] + if expect.params and msg.params != expect.params: return False - if expect.source is _MsgSource.SERVER and ( - '!' in msg.source or '@' in msg.source or '.' not in msg.source - or self._db.hostname.split('.')[-2:] - != msg.source.split('.')[-2:]): + n_len_params = len(msg.params) + if expect.len_max_params: + if not (expect.len_min_params <= n_len_params + <= expect.len_max_params): + return False + elif n_len_params != expect.len_min_params: return False - if expect.params: - return msg.params == expect.params - n_msg_params = len(msg.params) - if expect.len_max_params <= expect.len_min_params: - return n_msg_params == expect.len_min_params - return expect.len_min_params <= n_msg_params <= expect.len_max_params + return to_return def handle_msg(self, msg: _IrcMsg) -> None: 'Log msg.raw, then process incoming msg into appropriate client steps.' @@ -654,18 +657,18 @@ class Client(ABC, ClientQueueMixin): self._caps.end_negotiation() elif self._match_msg(msg, 'ERROR'): self.close() - elif self._match_msg(msg, 'JOIN'): + elif (ret := self._match_msg(msg, 'JOIN')): channel = msg.params[0] - log_msg = f'{msg.nick_from_source} {msg.verb.lower()}s {channel}' + log_msg = f'{ret["nickname"]} {msg.verb.lower()}s {channel}' self._log(log_msg, scope=LogScope.CHAT, target=channel) - if msg.nick_from_source != self._db.nickname: + if ret['nickname'] != self._db.nickname: self._db.chan(channel).append_completable( - 'users', msg.nick_from_source, stay_complete=True) + 'users', ret['nickname'], stay_complete=True) elif self._match_msg(msg, 'MODE')\ and msg.params[0] == self._db.nickname: self._db.user_modes = msg.params[1] - elif self._match_msg(msg, 'NICK')\ - and msg.nick_from_source == self._db.nickname: + elif (ret := self._match_msg(msg, 'NICK'))\ + and ret['nickname'] == self._db.nickname: self.set_nick(msg.params[0], confirmed=True) elif self._match_msg(msg, 'NOTICE')\ and (msg.params[0] != '*' or not self._db.nickname): @@ -679,26 +682,26 @@ class Client(ABC, ClientQueueMixin): if '!' in msg.source: kw |= {'sender': msg.nick_from_source, 'scope': LogScope.CHAT} self._log(msg.params[-1], out=False, target=msg.params[0], **kw) - elif self._match_msg(msg, 'PART'): + elif (ret := self._match_msg(msg, 'PART')): channel = msg.params[0] - log_msg = f'{msg.nick_from_source} {msg.verb.lower()}s {channel}' + log_msg = f'{ret["nickname"]} {msg.verb.lower()}s {channel}' if len(msg.params) == 2: log_msg += f': {msg.params[1]}' self._log(log_msg, scope=LogScope.CHAT, target=channel) - if msg.nick_from_source == self._db.nickname: + if ret['nickname'] == self._db.nickname: self._db.del_chan(channel) else: self._db.chan(channel).remove_completable( - 'users', msg.nick_from_source, stay_complete=True) + 'users', ret['nickname'], stay_complete=True) elif self._match_msg(msg, 'PING'): self.send(IrcMessage(verb='PONG', params=(msg.params[0],))) - elif self._match_msg(msg, 'QUIT'): - user = msg.nick_from_source + elif (ret := self._match_msg(msg, 'QUIT')): for chan_name in self._db.chan_names: chan = self._db.chan(chan_name) - if user in chan.users: - chan.remove_completable('users', user, stay_complete=True) - self._log(f'{user} quits: {msg.params[0]}', + if ret['nickname'] in chan.users: + chan.remove_completable('users', ret['nickname'], + stay_complete=True) + self._log(f'{ret["nickname"]} quits: {msg.params[0]}', scope=LogScope.CHAT, target=chan_name) else: self._log(f'PLEASE IMPLEMENT HANDLER FOR: {msg.raw}')