home · contact · privacy
Tighten user identity knowledge controls.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 4 Sep 2025 05:14:01 +0000 (07:14 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 4 Sep 2025 05:14:01 +0000 (07:14 +0200)
ircplom/client.py
ircplom/msg_parse_expectations.py

index f12ed8ea13efdc63afa8ece4216ad4a95e0b570e..db59a8946405babf53a989e5e4e8b63b376df8c1 100644 (file)
@@ -359,32 +359,34 @@ class _Channel:
     topic: _CompletableTopic
 
     def __init__(self,
-                 get_id_for_nick: Callable,
+                 id_for_nickuserhost: Callable,
                  get_membership_prefixes: Callable,
                  purge_users: Callable,
                  **kwargs
                  ) -> None:
-        self._get_id_for_nick = get_id_for_nick
+        self._id_for_nickuserhost = id_for_nickuserhost
         self._get_membership_prefixes = get_membership_prefixes
-        self._purge_users = purge_users
+        self.purge_users = purge_users
         super().__init__(**kwargs)
 
     def add_from_namreply(self, items: tuple[str, ...]):
         'Add to .user_ids items assumed as nicknames with membership prefixes.'
         for item in items:
             nickname = item.lstrip(self._get_membership_prefixes())
-            self.user_ids.append(self._get_id_for_nick(nickname))
+            self.user_ids.append(
+                    self._id_for_nickuserhost(_NickUserHost(nickname),
+                                              create_if_none=True))
 
-    def append_nick(self, nickname: str) -> None:
+    def append_nick(self, nickuserhost: '_NickUserHost') -> None:
         'To .user_ids append .nickname and declare .user_ids complete.'
-        user_id = self._get_id_for_nick(nickname)
+        user_id = self._id_for_nickuserhost(nickuserhost, create_if_none=True)
         self.user_ids.append(user_id, complete=True)
 
-    def remove_nick(self, nickname: str) -> None:
+    def remove_nick(self, nickuserhost: '_NickUserHost') -> None:
         'From .user_ids remove .nickname and declare .user_ids complete.'
-        user_id = self._get_id_for_nick(nickname)
+        user_id = self._id_for_nickuserhost(nickuserhost)
         self.user_ids.remove(user_id, complete=True)
-        self._purge_users()
+        self.purge_users()
 
 
 class _NickUserHost(NickUserHost):
@@ -465,9 +467,9 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
             attr._defaults = ISUPPORT_DEFAULTS
         elif key == 'channels' and not attr._create_if_none:
             attr._create_if_none = {
-                    'get_id_for_nick': self.user_id,
+                    'id_for_nickuserhost': self.userid_for_nickuserhost,
                     'get_membership_prefixes': self._get_membership_prefixes,
-                    'purge_users': self._purge_users}
+                    'purge_users': self.purge_users}
         return attr
 
     def clear(self) -> None:
@@ -479,7 +481,8 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
             elif isinstance(value, str):
                 setattr(self, key, '')
 
-    def _purge_users(self) -> None:
+    def purge_users(self) -> None:
+        'Remove from .users all not linked to by existing channels, except us.'
         to_keep = {'me'}
         for chan in self.channels.values():
             to_keep |= set(chan.user_ids.completed)
@@ -506,23 +509,39 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
         assert toks[0][0] == '('
         return toks[1]
 
-    def chans_of_user(self, nickname: str) -> dict[str, _UpdatingChannel]:
+    def chans_of_user(self,
+                      nickuserhost: _NickUserHost
+                      ) -> dict[str, _UpdatingChannel]:
         'Return dictionary of channels user is in.'
-        id_ = self.user_id(nickname)
+        id_ = self.userid_for_nickuserhost(nickuserhost)
         return {k: self.channels[k] for k in self.channels.keys()
                 if id_ in self.channels[k].user_ids.completed}
 
-    def user_id(self, query: str | _NickUserHost) -> str:
-        'Return user_id for nickname of entire NickUserHost, create if none.'
-        nick = query if isinstance(query, str) else query.nick
+    def userid_for_nickuserhost(self,
+                                nickuserhost: _NickUserHost,
+                                create_if_none=False) -> str:
+        'Return user_id for nickuserhost.nick, create if none, maybe update.'
         matches = [id_ for id_ in self.users.keys()
-                   if self.users[id_].nick == nick]
-        assert len(matches) < 2
-        id_ = matches[0] if matches else str(uuid4())
-        if isinstance(query, _NickUserHost):
-            self.users.set_updating(id_, query)
-        elif not matches:
-            self.users.set_updating(id_, _NickUserHost(query))
+                   if self.users[id_].nick == nickuserhost.nick]
+        assert len(matches) in ({0, 1} if create_if_none else {1})
+        if len(matches) == 1:
+            id_ = matches[0]
+            if '?' in {nickuserhost.user, nickuserhost.host}:
+                assert nickuserhost.user == nickuserhost.host
+                return id_
+            stored = self.users[id_]
+            if '?' in {stored.user, stored.host}:
+                assert stored.user == stored.host
+                self.users.set_updating(id_, nickuserhost)
+            else:
+                assert nickuserhost.host == stored.host
+                if nickuserhost.user == f'~{stored.user}':
+                    self.users.set_updating(id_, nickuserhost)
+                else:
+                    assert nickuserhost.user == stored.user
+        else:
+            id_ = str(uuid4())
+            self.users.set_updating(id_, nickuserhost)
         return id_
 
 
@@ -692,8 +711,9 @@ class Client(ABC, ClientQueueMixin):
         for task, tok_names in [t for t in ret['_tasks'].items()
                                 if t[0].verb == 'set']:
             assert task.path == ('user',)
-            assert tok_names == ['sender']
-            self.db.user_id(ret['sender'])
+            assert tok_names in (['sender'], ['joiner'])
+            self.db.userid_for_nickuserhost(ret[tok_names[0]],
+                                            create_if_none=True)
         for verb in ('setattr', 'do', 'doafter'):
             for task, tok_names in [t for t in ret['_tasks'].items()
                                     if t[0].verb == verb]:
@@ -751,7 +771,7 @@ class Client(ABC, ClientQueueMixin):
                 and ret['joiner'].nick != self.db.users['me'].nick:
             self.db.channels[ret['channel']].append_nick(ret['joiner'])
         elif ret['_verb'] == 'NICK':
-            user_id = self.db.user_id(ret['named'])
+            user_id = self.db.userid_for_nickuserhost(ret['named'])
             self.db.users[user_id].nick = ret['nick']
             if user_id == 'me':
                 self.db.nick_wanted = ret['nick']
@@ -770,6 +790,7 @@ class Client(ABC, ClientQueueMixin):
                           LogScope.CHAT, target=ret['channel'])
             if ret['parter'] == self.db.users['me']:
                 del self.db.channels[ret['channel']]
+                self.db.purge_users()
         elif ret['_verb'] == 'PING':
             self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
         elif ret['_verb'] == 'QUIT':
index 585f25bce376525facb316ca6c525d81057d591f..99839023ce299555e62bd788a24e321b6dd274c3 100644 (file)
@@ -458,7 +458,7 @@ MSG_EXPECTATIONS: list[_MsgParseExpectation] = [
 
     _MsgParseExpectation(
         'JOIN',
-        (_MsgTok.NICK_USER_HOST, ':joiner'),
+        (_MsgTok.NICK_USER_HOST, 'set_user:joiner'),
         ((_MsgTok.CHANNEL, ':channel'),)),
 
     _MsgParseExpectation(