From eefed881e7939398557c3376f254f859974602eb Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 5 Sep 2025 17:18:44 +0200 Subject: [PATCH] Minor refactoring. --- ircplom/client.py | 116 ++++++++++++++++++++++-------------------- ircplom/client_tui.py | 4 -- 2 files changed, 61 insertions(+), 59 deletions(-) diff --git a/ircplom/client.py b/ircplom/client.py index 74f868c..bdab948 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -50,6 +50,10 @@ class Dict(Generic[DictItem]): self._dict: dict[str, DictItem] = {} super().__init__(**kwargs) + def items(self) -> tuple[tuple[str, DictItem], ...]: + 'Key-value pairs of item registrations.' + return tuple((k, v) for k, v in self._dict.items()) + def __getitem__(self, key: str) -> DictItem: return self._dict[key] @@ -59,7 +63,9 @@ class Dict(Generic[DictItem]): @property def _item_cls(self): - return self.__orig_class__.__args__[0] + orig_cls = (self.__orig_class__ if hasattr(self, '__orig_class__') + else self.__orig_bases__[0]) + return orig_cls.__args__[0] class _Dict(Dict[DictItem]): @@ -454,13 +460,57 @@ class _UpdatingUser(_UpdatingMixin, _User): pass +class _UpdatingUsersDict(_UpdatingDict[_UpdatingUser]): + + def id_for_nickuserhost(self, + nickuserhost: NickUserHost, + create_if_none=False, + allow_none=False, + updating=False + ) -> Optional[str]: + 'Return user_id for nickuserhost.nick, create if none, maybe update.' + matches = [id_ for id_, user in self._dict.items() + if user.nick == nickuserhost.nick] + assert len(matches) in ({0, 1} if (create_if_none or allow_none) + else {1}) + if len(matches) == 1: + id_ = matches[0] + if '?' in {nickuserhost.user, nickuserhost.host}: + assert nickuserhost.user == nickuserhost.host # both are '?' + # only provided with .nick, no fields we could update + return id_ + stored = self._dict[id_] + # .nick by definition same, check other fields for updatability; + # allow where '?', or for set .user only to add "~" prefix, assert + # nothing else could have changed + if stored.host == '?': + assert updating + stored.host = nickuserhost.host + if stored.user == '?': + stored.user = nickuserhost.user + else: + assert nickuserhost.user == stored.user + elif nickuserhost.user == f'~{stored.user}': + assert updating + assert stored.host == nickuserhost.host + stored.user = nickuserhost.user + else: # not seen set .host with unset .user yet + assert stored.user != '?' + elif create_if_none: + id_ = str(uuid4()) + self.set_updating(id_, _User.from_nickuserhost(nickuserhost)) + else: + return None + return id_ + + class _ClientDb(_UpdatingMixin, SharedClientDbFields): _keep_on_clear = set(IrcConnSetup.__annotations__.keys()) caps: _UpdatingDict[_UpdatingServerCapability] channels: _UpdatingDict[_UpdatingChannel] isupport: _UpdatingDict[str] motd: _UpdatingCompletableStringsList - users: _UpdatingDict[_UpdatingUser] + users: _UpdatingUsersDict def __getattribute__(self, key: str): attr = super().__getattribute__(key) @@ -468,7 +518,7 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): attr._defaults = ISUPPORT_DEFAULTS elif key == 'channels' and not attr._create_if_none: attr._create_if_none = { - 'userid_for_nickuserhost': self.userid_for_nickuserhost, + 'userid_for_nickuserhost': self.users.id_for_nickuserhost, 'get_membership_prefixes': self._get_membership_prefixes, 'purge_users': self.purge_users} return attr @@ -484,12 +534,9 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): def purge_users(self) -> None: 'Remove from .users all not linked to by existing channels, except us.' - to_keep = {'me'} - for chan in self.channels.values(): - to_keep |= set(chan.user_ids.completed) - for user_id in [id_ for id_ in self.users.keys() - if id_ not in to_keep]: - del self.users[user_id] + for id_, user in self.users.items(): + if id_ != 'me' and not self.chans_of_user(user): + del self.users[id_] def is_nick(self, nick: str) -> bool: 'Tests name to match rules for nicknames.' @@ -512,51 +559,10 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): def chans_of_user(self, user: _User) -> dict[str, _UpdatingChannel]: 'Return dictionary of channels user is in.' - id_ = self.userid_for_nickuserhost(user) + id_ = self.users.id_for_nickuserhost(user) return {k: self.channels[k] for k in self.channels.keys() if id_ in self.channels[k].user_ids.completed} - def userid_for_nickuserhost(self, - nickuserhost: NickUserHost, - create_if_none=False, - allow_none=False, - updating=False - ) -> Optional[str]: - 'Return user_id for nickuserhost.nick, create if none, maybe update.' - matches = [id_ for id_ in self.users.keys() - if self.users[id_].nick == nickuserhost.nick] - assert len(matches) in ({0, 1} if (create_if_none or allow_none) - else {1}) - if len(matches) == 1: - id_ = matches[0] - if '?' in {nickuserhost.user, nickuserhost.host}: - assert nickuserhost.user == nickuserhost.host # both are '?' - # only provided with .nick, no fields we could update - return id_ - stored = self.users[id_] - # .nick by definition same, check other fields for updatability; - # allow where '?', or for set .user only to add "~" prefix, assert - # nothing else could have changed - if stored.host == '?': - assert updating - stored.host = nickuserhost.host - if stored.user == '?': - stored.user = nickuserhost.user - else: - assert nickuserhost.user == stored.user - elif nickuserhost.user == f'~{stored.user}': - assert updating - assert stored.host == nickuserhost.host - stored.user = nickuserhost.user - else: # not seen set .host with unset .user yet - assert stored.user != '?' - elif create_if_none: - id_ = str(uuid4()) - self.users.set_updating(id_, _User.from_nickuserhost(nickuserhost)) - else: - return None - return id_ - class _CapsManager: @@ -722,8 +728,8 @@ class Client(ABC, ClientQueueMixin): self._log(f'PLEASE IMPLEMENT HANDLER FOR: {msg.raw}') return for n_u_h in ret['_nickuserhosts']: # update, turn into proper users - if (id_ := self.db.userid_for_nickuserhost(n_u_h, allow_none=True, - updating=True)): + if (id_ := self.db.users.id_for_nickuserhost( + n_u_h, allow_none=True, updating=True)): for ret_name in [k for k in ret if ret[k] is n_u_h]: ret[ret_name] = self.db.users[id_] for verb in ('setattr', 'do', 'doafter'): @@ -782,8 +788,8 @@ class Client(ABC, ClientQueueMixin): elif ret['_verb'] == 'JOIN' and ret['joiner'] != self.db.users['me']: self.db.channels[ret['channel']].append_user(ret['joiner']) elif ret['_verb'] == 'NICK': - user_id = self.db.userid_for_nickuserhost(ret['named'], - updating=True) + user_id = self.db.users.id_for_nickuserhost(ret['named'], + updating=True) assert user_id is not None self.db.users[user_id].nick = ret['nick'] if user_id == 'me': diff --git a/ircplom/client_tui.py b/ircplom/client_tui.py index dc7c0fa..6bf296a 100644 --- a/ircplom/client_tui.py +++ b/ircplom/client_tui.py @@ -80,10 +80,6 @@ class _UpdatingNode(AutoAttrMixin): class _UpdatingDict(Dict[DictItem], _UpdatingNode): - def items(self) -> tuple[tuple[str, DictItem], ...]: - 'Key-value pairs of item registrations.' - return tuple((k, v) for k, v in self._dict.items()) - def _get(self, key: str): if key not in self._dict: self._dict[key] = self._item_cls() -- 2.30.2