From: Christian Heller Date: Thu, 4 Sep 2025 05:14:01 +0000 (+0200) Subject: Tighten user identity knowledge controls. X-Git-Url: https://plomlompom.com/repos/day?a=commitdiff_plain;h=a753e8c8078f560109b66531b38fe608f9f4d191;p=ircplom Tighten user identity knowledge controls. --- diff --git a/ircplom/client.py b/ircplom/client.py index f12ed8e..db59a89 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -359,32 +359,34 @@ class _Channel: topic: _CompletableTopic def __init__(self, - get_id_for_nick: Callable, + id_for_nickuserhost: Callable, get_membership_prefixes: Callable, purge_users: Callable, **kwargs ) -> None: - self._get_id_for_nick = get_id_for_nick + self._id_for_nickuserhost = id_for_nickuserhost self._get_membership_prefixes = get_membership_prefixes - self._purge_users = purge_users + self.purge_users = purge_users super().__init__(**kwargs) def add_from_namreply(self, items: tuple[str, ...]): 'Add to .user_ids items assumed as nicknames with membership prefixes.' for item in items: nickname = item.lstrip(self._get_membership_prefixes()) - self.user_ids.append(self._get_id_for_nick(nickname)) + self.user_ids.append( + self._id_for_nickuserhost(_NickUserHost(nickname), + create_if_none=True)) - def append_nick(self, nickname: str) -> None: + def append_nick(self, nickuserhost: '_NickUserHost') -> None: 'To .user_ids append .nickname and declare .user_ids complete.' - user_id = self._get_id_for_nick(nickname) + user_id = self._id_for_nickuserhost(nickuserhost, create_if_none=True) self.user_ids.append(user_id, complete=True) - def remove_nick(self, nickname: str) -> None: + def remove_nick(self, nickuserhost: '_NickUserHost') -> None: 'From .user_ids remove .nickname and declare .user_ids complete.' - user_id = self._get_id_for_nick(nickname) + user_id = self._id_for_nickuserhost(nickuserhost) self.user_ids.remove(user_id, complete=True) - self._purge_users() + self.purge_users() class _NickUserHost(NickUserHost): @@ -465,9 +467,9 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): attr._defaults = ISUPPORT_DEFAULTS elif key == 'channels' and not attr._create_if_none: attr._create_if_none = { - 'get_id_for_nick': self.user_id, + 'id_for_nickuserhost': self.userid_for_nickuserhost, 'get_membership_prefixes': self._get_membership_prefixes, - 'purge_users': self._purge_users} + 'purge_users': self.purge_users} return attr def clear(self) -> None: @@ -479,7 +481,8 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): elif isinstance(value, str): setattr(self, key, '') - def _purge_users(self) -> None: + def purge_users(self) -> None: + 'Remove from .users all not linked to by existing channels, except us.' to_keep = {'me'} for chan in self.channels.values(): to_keep |= set(chan.user_ids.completed) @@ -506,23 +509,39 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields): assert toks[0][0] == '(' return toks[1] - def chans_of_user(self, nickname: str) -> dict[str, _UpdatingChannel]: + def chans_of_user(self, + nickuserhost: _NickUserHost + ) -> dict[str, _UpdatingChannel]: 'Return dictionary of channels user is in.' - id_ = self.user_id(nickname) + id_ = self.userid_for_nickuserhost(nickuserhost) return {k: self.channels[k] for k in self.channels.keys() if id_ in self.channels[k].user_ids.completed} - def user_id(self, query: str | _NickUserHost) -> str: - 'Return user_id for nickname of entire NickUserHost, create if none.' - nick = query if isinstance(query, str) else query.nick + def userid_for_nickuserhost(self, + nickuserhost: _NickUserHost, + create_if_none=False) -> str: + 'Return user_id for nickuserhost.nick, create if none, maybe update.' matches = [id_ for id_ in self.users.keys() - if self.users[id_].nick == nick] - assert len(matches) < 2 - id_ = matches[0] if matches else str(uuid4()) - if isinstance(query, _NickUserHost): - self.users.set_updating(id_, query) - elif not matches: - self.users.set_updating(id_, _NickUserHost(query)) + if self.users[id_].nick == nickuserhost.nick] + assert len(matches) in ({0, 1} if create_if_none else {1}) + if len(matches) == 1: + id_ = matches[0] + if '?' in {nickuserhost.user, nickuserhost.host}: + assert nickuserhost.user == nickuserhost.host + return id_ + stored = self.users[id_] + if '?' in {stored.user, stored.host}: + assert stored.user == stored.host + self.users.set_updating(id_, nickuserhost) + else: + assert nickuserhost.host == stored.host + if nickuserhost.user == f'~{stored.user}': + self.users.set_updating(id_, nickuserhost) + else: + assert nickuserhost.user == stored.user + else: + id_ = str(uuid4()) + self.users.set_updating(id_, nickuserhost) return id_ @@ -692,8 +711,9 @@ class Client(ABC, ClientQueueMixin): for task, tok_names in [t for t in ret['_tasks'].items() if t[0].verb == 'set']: assert task.path == ('user',) - assert tok_names == ['sender'] - self.db.user_id(ret['sender']) + assert tok_names in (['sender'], ['joiner']) + self.db.userid_for_nickuserhost(ret[tok_names[0]], + create_if_none=True) for verb in ('setattr', 'do', 'doafter'): for task, tok_names in [t for t in ret['_tasks'].items() if t[0].verb == verb]: @@ -751,7 +771,7 @@ class Client(ABC, ClientQueueMixin): and ret['joiner'].nick != self.db.users['me'].nick: self.db.channels[ret['channel']].append_nick(ret['joiner']) elif ret['_verb'] == 'NICK': - user_id = self.db.user_id(ret['named']) + user_id = self.db.userid_for_nickuserhost(ret['named']) self.db.users[user_id].nick = ret['nick'] if user_id == 'me': self.db.nick_wanted = ret['nick'] @@ -770,6 +790,7 @@ class Client(ABC, ClientQueueMixin): LogScope.CHAT, target=ret['channel']) if ret['parter'] == self.db.users['me']: del self.db.channels[ret['channel']] + self.db.purge_users() elif ret['_verb'] == 'PING': self.send(IrcMessage(verb='PONG', params=(ret['reply'],))) elif ret['_verb'] == 'QUIT': diff --git a/ircplom/msg_parse_expectations.py b/ircplom/msg_parse_expectations.py index 585f25b..9983902 100644 --- a/ircplom/msg_parse_expectations.py +++ b/ircplom/msg_parse_expectations.py @@ -458,7 +458,7 @@ MSG_EXPECTATIONS: list[_MsgParseExpectation] = [ _MsgParseExpectation( 'JOIN', - (_MsgTok.NICK_USER_HOST, ':joiner'), + (_MsgTok.NICK_USER_HOST, 'set_user:joiner'), ((_MsgTok.CHANNEL, ':channel'),)), _MsgParseExpectation(