home · contact · privacy
Minor refactoring.
authorChristian Heller <c.heller@plomlompom.de>
Fri, 5 Sep 2025 15:18:44 +0000 (17:18 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 5 Sep 2025 15:18:44 +0000 (17:18 +0200)
ircplom/client.py
ircplom/client_tui.py

index 74f868c06e6ccf876ae86106c09d1a4441dec42a..bdab9482510dddc87609554dc5dce1dd22e16734 100644 (file)
@@ -50,6 +50,10 @@ class Dict(Generic[DictItem]):
         self._dict: dict[str, DictItem] = {}
         super().__init__(**kwargs)
 
+    def items(self) -> tuple[tuple[str, DictItem], ...]:
+        'Key-value pairs of item registrations.'
+        return tuple((k, v) for k, v in self._dict.items())
+
     def __getitem__(self, key: str) -> DictItem:
         return self._dict[key]
 
@@ -59,7 +63,9 @@ class Dict(Generic[DictItem]):
 
     @property
     def _item_cls(self):
-        return self.__orig_class__.__args__[0]
+        orig_cls = (self.__orig_class__ if hasattr(self, '__orig_class__')
+                    else self.__orig_bases__[0])
+        return orig_cls.__args__[0]
 
 
 class _Dict(Dict[DictItem]):
@@ -454,13 +460,57 @@ class _UpdatingUser(_UpdatingMixin, _User):
     pass
 
 
+class _UpdatingUsersDict(_UpdatingDict[_UpdatingUser]):
+
+    def id_for_nickuserhost(self,
+                            nickuserhost: NickUserHost,
+                            create_if_none=False,
+                            allow_none=False,
+                            updating=False
+                            ) -> Optional[str]:
+        'Return user_id for nickuserhost.nick, create if none, maybe update.'
+        matches = [id_ for id_, user in self._dict.items()
+                   if user.nick == nickuserhost.nick]
+        assert len(matches) in ({0, 1} if (create_if_none or allow_none)
+                                else {1})
+        if len(matches) == 1:
+            id_ = matches[0]
+            if '?' in {nickuserhost.user, nickuserhost.host}:
+                assert nickuserhost.user == nickuserhost.host  # both are '?'
+                # only provided with .nick, no fields we could update
+                return id_
+            stored = self._dict[id_]
+            # .nick by definition same, check other fields for updatability;
+            # allow where '?', or for set .user only to add "~" prefix, assert
+            # nothing else could have changed
+            if stored.host == '?':
+                assert updating
+                stored.host = nickuserhost.host
+                if stored.user == '?':
+                    stored.user = nickuserhost.user
+                else:
+                    assert nickuserhost.user == stored.user
+            elif nickuserhost.user == f'~{stored.user}':
+                assert updating
+                assert stored.host == nickuserhost.host
+                stored.user = nickuserhost.user
+            else:  # not seen set .host with unset .user yet
+                assert stored.user != '?'
+        elif create_if_none:
+            id_ = str(uuid4())
+            self.set_updating(id_, _User.from_nickuserhost(nickuserhost))
+        else:
+            return None
+        return id_
+
+
 class _ClientDb(_UpdatingMixin, SharedClientDbFields):
     _keep_on_clear = set(IrcConnSetup.__annotations__.keys())
     caps: _UpdatingDict[_UpdatingServerCapability]
     channels: _UpdatingDict[_UpdatingChannel]
     isupport: _UpdatingDict[str]
     motd: _UpdatingCompletableStringsList
-    users: _UpdatingDict[_UpdatingUser]
+    users: _UpdatingUsersDict
 
     def __getattribute__(self, key: str):
         attr = super().__getattribute__(key)
@@ -468,7 +518,7 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
             attr._defaults = ISUPPORT_DEFAULTS
         elif key == 'channels' and not attr._create_if_none:
             attr._create_if_none = {
-                    'userid_for_nickuserhost': self.userid_for_nickuserhost,
+                    'userid_for_nickuserhost': self.users.id_for_nickuserhost,
                     'get_membership_prefixes': self._get_membership_prefixes,
                     'purge_users': self.purge_users}
         return attr
@@ -484,12 +534,9 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
 
     def purge_users(self) -> None:
         'Remove from .users all not linked to by existing channels, except us.'
-        to_keep = {'me'}
-        for chan in self.channels.values():
-            to_keep |= set(chan.user_ids.completed)
-        for user_id in [id_ for id_ in self.users.keys()
-                        if id_ not in to_keep]:
-            del self.users[user_id]
+        for id_, user in self.users.items():
+            if id_ != 'me' and not self.chans_of_user(user):
+                del self.users[id_]
 
     def is_nick(self, nick: str) -> bool:
         'Tests name to match rules for nicknames.'
@@ -512,51 +559,10 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
 
     def chans_of_user(self, user: _User) -> dict[str, _UpdatingChannel]:
         'Return dictionary of channels user is in.'
-        id_ = self.userid_for_nickuserhost(user)
+        id_ = self.users.id_for_nickuserhost(user)
         return {k: self.channels[k] for k in self.channels.keys()
                 if id_ in self.channels[k].user_ids.completed}
 
-    def userid_for_nickuserhost(self,
-                                nickuserhost: NickUserHost,
-                                create_if_none=False,
-                                allow_none=False,
-                                updating=False
-                                ) -> Optional[str]:
-        'Return user_id for nickuserhost.nick, create if none, maybe update.'
-        matches = [id_ for id_ in self.users.keys()
-                   if self.users[id_].nick == nickuserhost.nick]
-        assert len(matches) in ({0, 1} if (create_if_none or allow_none)
-                                else {1})
-        if len(matches) == 1:
-            id_ = matches[0]
-            if '?' in {nickuserhost.user, nickuserhost.host}:
-                assert nickuserhost.user == nickuserhost.host  # both are '?'
-                # only provided with .nick, no fields we could update
-                return id_
-            stored = self.users[id_]
-            # .nick by definition same, check other fields for updatability;
-            # allow where '?', or for set .user only to add "~" prefix, assert
-            # nothing else could have changed
-            if stored.host == '?':
-                assert updating
-                stored.host = nickuserhost.host
-                if stored.user == '?':
-                    stored.user = nickuserhost.user
-                else:
-                    assert nickuserhost.user == stored.user
-            elif nickuserhost.user == f'~{stored.user}':
-                assert updating
-                assert stored.host == nickuserhost.host
-                stored.user = nickuserhost.user
-            else:  # not seen set .host with unset .user yet
-                assert stored.user != '?'
-        elif create_if_none:
-            id_ = str(uuid4())
-            self.users.set_updating(id_, _User.from_nickuserhost(nickuserhost))
-        else:
-            return None
-        return id_
-
 
 class _CapsManager:
 
@@ -722,8 +728,8 @@ class Client(ABC, ClientQueueMixin):
             self._log(f'PLEASE IMPLEMENT HANDLER FOR: {msg.raw}')
             return
         for n_u_h in ret['_nickuserhosts']:  # update, turn into proper users
-            if (id_ := self.db.userid_for_nickuserhost(n_u_h, allow_none=True,
-                                                       updating=True)):
+            if (id_ := self.db.users.id_for_nickuserhost(
+                    n_u_h, allow_none=True, updating=True)):
                 for ret_name in [k for k in ret if ret[k] is n_u_h]:
                     ret[ret_name] = self.db.users[id_]
         for verb in ('setattr', 'do', 'doafter'):
@@ -782,8 +788,8 @@ class Client(ABC, ClientQueueMixin):
         elif ret['_verb'] == 'JOIN' and ret['joiner'] != self.db.users['me']:
             self.db.channels[ret['channel']].append_user(ret['joiner'])
         elif ret['_verb'] == 'NICK':
-            user_id = self.db.userid_for_nickuserhost(ret['named'],
-                                                      updating=True)
+            user_id = self.db.users.id_for_nickuserhost(ret['named'],
+                                                        updating=True)
             assert user_id is not None
             self.db.users[user_id].nick = ret['nick']
             if user_id == 'me':
index dc7c0fa1ed978ef4d53a62904ef19e2e168270e7..6bf296a12fbe48a164c293d952a83b905c651aff 100644 (file)
@@ -80,10 +80,6 @@ class _UpdatingNode(AutoAttrMixin):
 
 class _UpdatingDict(Dict[DictItem], _UpdatingNode):
 
-    def items(self) -> tuple[tuple[str, DictItem], ...]:
-        'Key-value pairs of item registrations.'
-        return tuple((k, v) for k, v in self._dict.items())
-
     def _get(self, key: str):
         if key not in self._dict:
             self._dict[key] = self._item_cls()