home · contact · privacy
Add tests for .source = user address, and have tests potentially return values for...
authorChristian Heller <c.heller@plomlompom.de>
Tue, 19 Aug 2025 05:26:24 +0000 (07:26 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Tue, 19 Aug 2025 05:26:24 +0000 (07:26 +0200)
ircplom/client.py

index 648e284fe4000373214250e90615950be8c4e133..0a1574740eb72681dd8fdb7f32ee95296c93d96c 100644 (file)
@@ -41,6 +41,7 @@ _NUMERICS_TO_IGNORE = (
 
 class _MsgSource(Enum):
     NONE = auto()
+    USER_ADDRESS = auto()
     SERVER = auto()
 
 
@@ -67,34 +68,20 @@ _EXPECTATIONS: dict[str, _MsgParseExpectation] = {
    'AUTHENTICATE': _MsgParseExpectation(params=('+',), source=_MsgSource.NONE),
    'CAP': _MsgParseExpectation(3, 15, source=_MsgSource.SERVER),
    'ERROR': _MsgParseExpectation(1, source=_MsgSource.NONE),
-   'JOIN': _MsgParseExpectation(1),
-   'MODE': _MsgParseExpectation(2),
-   'NICK': _MsgParseExpectation(1),
+   'JOIN': _MsgParseExpectation(1, source=_MsgSource.USER_ADDRESS),
+   'MODE': _MsgParseExpectation(2, source=_MsgSource.USER_ADDRESS),
+   'NICK': _MsgParseExpectation(1, source=_MsgSource.USER_ADDRESS),
    'NOTICE': _MsgParseExpectation(2),
-   'PART': _MsgParseExpectation(1, 2),
+   'PART': _MsgParseExpectation(1, 2, source=_MsgSource.USER_ADDRESS),
    'PING': _MsgParseExpectation(1, source=_MsgSource.NONE),
    'PRIVMSG': _MsgParseExpectation(2),
-   'QUIT': _MsgParseExpectation(1),
+   'QUIT': _MsgParseExpectation(1, source=_MsgSource.USER_ADDRESS),
 }
 
 
 class _IrcMsg(IrcMessage):
     'Extends IrcMessage with some conveniences.'
 
-    # def match(self, verb: str) -> bool:
-    #     'Test .verb, .params.'
-    #     if not verb == self.verb:
-    #         return False
-    #     expect = _EXPECTATIONS[verb]
-    #     if expect.source is not None and self.source != expect.source:
-    #         return False
-    #     if expect.params:
-    #         return self.params == expect.params
-    #     n_msg_params = len(self.params)
-    #     if expect.len_max_params <= expect.len_min_params:
-    #         return n_msg_params == expect.len_min_params
-    #     return expect.len_min_params <= n_msg_params <= expect.len_max_params
-
     @property
     def nick_from_source(self) -> str:
         'Parse .source into user nickname.'
@@ -572,24 +559,40 @@ class Client(ABC, ClientQueueMixin):
         self._log(f'connection broken: {e}', alert=True)
         self.close()
 
-    def _match_msg(self, msg: _IrcMsg, verb: str) -> bool:
+    def _match_msg(self, msg: _IrcMsg, verb: str):
         'Test .verb, .params.'
+        to_return: dict[str, Optional[str]] = {'_': None}
         if not msg.verb == verb:
             return False
         expect = _EXPECTATIONS[verb]
-        if expect.source is _MsgSource.NONE and msg.source != '':
+        if expect.source is _MsgSource.NONE:
+            if msg.source != '':
+                return False
+        elif expect.source is _MsgSource.SERVER:
+            if ('!' in msg.source
+                    or '@' in msg.source
+                    or '.' not in msg.source
+                    or self._db.hostname.split('.')[-2:]
+                    != msg.source.split('.')[-2:]):
+                return False
+        elif expect.source is _MsgSource.USER_ADDRESS:
+            toks = msg.source.split('!')
+            if len(toks) != 2:
+                return False
+            toks = toks[0:1] + toks[1].split('@')
+            if len(toks) != 3:
+                return False
+            to_return['nickname'] = toks[0]
+        if expect.params and msg.params != expect.params:
             return False
-        if expect.source is _MsgSource.SERVER and (
-                '!' in msg.source or '@' in msg.source or '.' not in msg.source
-                or self._db.hostname.split('.')[-2:]
-                != msg.source.split('.')[-2:]):
+        n_len_params = len(msg.params)
+        if expect.len_max_params:
+            if not (expect.len_min_params <= n_len_params
+                    <= expect.len_max_params):
+                return False
+        elif n_len_params != expect.len_min_params:
             return False
-        if expect.params:
-            return msg.params == expect.params
-        n_msg_params = len(msg.params)
-        if expect.len_max_params <= expect.len_min_params:
-            return n_msg_params == expect.len_min_params
-        return expect.len_min_params <= n_msg_params <= expect.len_max_params
+        return to_return
 
     def handle_msg(self, msg: _IrcMsg) -> None:
         'Log msg.raw, then process incoming msg into appropriate client steps.'
@@ -654,18 +657,18 @@ class Client(ABC, ClientQueueMixin):
                     self._caps.end_negotiation()
         elif self._match_msg(msg, 'ERROR'):
             self.close()
-        elif self._match_msg(msg, 'JOIN'):
+        elif (ret := self._match_msg(msg, 'JOIN')):
             channel = msg.params[0]
-            log_msg = f'{msg.nick_from_source} {msg.verb.lower()}s {channel}'
+            log_msg = f'{ret["nickname"]} {msg.verb.lower()}s {channel}'
             self._log(log_msg, scope=LogScope.CHAT, target=channel)
-            if msg.nick_from_source != self._db.nickname:
+            if ret['nickname'] != self._db.nickname:
                 self._db.chan(channel).append_completable(
-                        'users', msg.nick_from_source, stay_complete=True)
+                        'users', ret['nickname'], stay_complete=True)
         elif self._match_msg(msg, 'MODE')\
                 and msg.params[0] == self._db.nickname:
             self._db.user_modes = msg.params[1]
-        elif self._match_msg(msg, 'NICK')\
-                and msg.nick_from_source == self._db.nickname:
+        elif (ret := self._match_msg(msg, 'NICK'))\
+                and ret['nickname'] == self._db.nickname:
             self.set_nick(msg.params[0], confirmed=True)
         elif self._match_msg(msg, 'NOTICE')\
                 and (msg.params[0] != '*' or not self._db.nickname):
@@ -679,26 +682,26 @@ class Client(ABC, ClientQueueMixin):
             if '!' in msg.source:
                 kw |= {'sender': msg.nick_from_source, 'scope': LogScope.CHAT}
             self._log(msg.params[-1], out=False, target=msg.params[0], **kw)
-        elif self._match_msg(msg, 'PART'):
+        elif (ret := self._match_msg(msg, 'PART')):
             channel = msg.params[0]
-            log_msg = f'{msg.nick_from_source} {msg.verb.lower()}s {channel}'
+            log_msg = f'{ret["nickname"]} {msg.verb.lower()}s {channel}'
             if len(msg.params) == 2:
                 log_msg += f': {msg.params[1]}'
             self._log(log_msg, scope=LogScope.CHAT, target=channel)
-            if msg.nick_from_source == self._db.nickname:
+            if ret['nickname'] == self._db.nickname:
                 self._db.del_chan(channel)
             else:
                 self._db.chan(channel).remove_completable(
-                        'users', msg.nick_from_source, stay_complete=True)
+                        'users', ret['nickname'], stay_complete=True)
         elif self._match_msg(msg, 'PING'):
             self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
-        elif self._match_msg(msg, 'QUIT'):
-            user = msg.nick_from_source
+        elif (ret := self._match_msg(msg, 'QUIT')):
             for chan_name in self._db.chan_names:
                 chan = self._db.chan(chan_name)
-                if user in chan.users:
-                    chan.remove_completable('users', user, stay_complete=True)
-                    self._log(f'{user} quits: {msg.params[0]}',
+                if ret['nickname'] in chan.users:
+                    chan.remove_completable('users', ret['nickname'],
+                                            stay_complete=True)
+                    self._log(f'{ret["nickname"]} quits: {msg.params[0]}',
                               scope=LogScope.CHAT, target=chan_name)
         else:
             self._log(f'PLEASE IMPLEMENT HANDLER FOR: {msg.raw}')