self._dict: dict[str, DictItem] = {}
super().__init__(**kwargs)
+ def items(self) -> tuple[tuple[str, DictItem], ...]:
+ 'Key-value pairs of item registrations.'
+ return tuple((k, v) for k, v in self._dict.items())
+
def __getitem__(self, key: str) -> DictItem:
return self._dict[key]
@property
def _item_cls(self):
- return self.__orig_class__.__args__[0]
+ orig_cls = (self.__orig_class__ if hasattr(self, '__orig_class__')
+ else self.__orig_bases__[0])
+ return orig_cls.__args__[0]
class _Dict(Dict[DictItem]):
pass
+class _UpdatingUsersDict(_UpdatingDict[_UpdatingUser]):
+
+ def id_for_nickuserhost(self,
+ nickuserhost: NickUserHost,
+ create_if_none=False,
+ allow_none=False,
+ updating=False
+ ) -> Optional[str]:
+ 'Return user_id for nickuserhost.nick, create if none, maybe update.'
+ matches = [id_ for id_, user in self._dict.items()
+ if user.nick == nickuserhost.nick]
+ assert len(matches) in ({0, 1} if (create_if_none or allow_none)
+ else {1})
+ if len(matches) == 1:
+ id_ = matches[0]
+ if '?' in {nickuserhost.user, nickuserhost.host}:
+ assert nickuserhost.user == nickuserhost.host # both are '?'
+ # only provided with .nick, no fields we could update
+ return id_
+ stored = self._dict[id_]
+ # .nick by definition same, check other fields for updatability;
+ # allow where '?', or for set .user only to add "~" prefix, assert
+ # nothing else could have changed
+ if stored.host == '?':
+ assert updating
+ stored.host = nickuserhost.host
+ if stored.user == '?':
+ stored.user = nickuserhost.user
+ else:
+ assert nickuserhost.user == stored.user
+ elif nickuserhost.user == f'~{stored.user}':
+ assert updating
+ assert stored.host == nickuserhost.host
+ stored.user = nickuserhost.user
+ else: # not seen set .host with unset .user yet
+ assert stored.user != '?'
+ elif create_if_none:
+ id_ = str(uuid4())
+ self.set_updating(id_, _User.from_nickuserhost(nickuserhost))
+ else:
+ return None
+ return id_
+
+
class _ClientDb(_UpdatingMixin, SharedClientDbFields):
_keep_on_clear = set(IrcConnSetup.__annotations__.keys())
caps: _UpdatingDict[_UpdatingServerCapability]
channels: _UpdatingDict[_UpdatingChannel]
isupport: _UpdatingDict[str]
motd: _UpdatingCompletableStringsList
- users: _UpdatingDict[_UpdatingUser]
+ users: _UpdatingUsersDict
def __getattribute__(self, key: str):
attr = super().__getattribute__(key)
attr._defaults = ISUPPORT_DEFAULTS
elif key == 'channels' and not attr._create_if_none:
attr._create_if_none = {
- 'userid_for_nickuserhost': self.userid_for_nickuserhost,
+ 'userid_for_nickuserhost': self.users.id_for_nickuserhost,
'get_membership_prefixes': self._get_membership_prefixes,
'purge_users': self.purge_users}
return attr
def purge_users(self) -> None:
'Remove from .users all not linked to by existing channels, except us.'
- to_keep = {'me'}
- for chan in self.channels.values():
- to_keep |= set(chan.user_ids.completed)
- for user_id in [id_ for id_ in self.users.keys()
- if id_ not in to_keep]:
- del self.users[user_id]
+ for id_, user in self.users.items():
+ if id_ != 'me' and not self.chans_of_user(user):
+ del self.users[id_]
def is_nick(self, nick: str) -> bool:
'Tests name to match rules for nicknames.'
def chans_of_user(self, user: _User) -> dict[str, _UpdatingChannel]:
'Return dictionary of channels user is in.'
- id_ = self.userid_for_nickuserhost(user)
+ id_ = self.users.id_for_nickuserhost(user)
return {k: self.channels[k] for k in self.channels.keys()
if id_ in self.channels[k].user_ids.completed}
- def userid_for_nickuserhost(self,
- nickuserhost: NickUserHost,
- create_if_none=False,
- allow_none=False,
- updating=False
- ) -> Optional[str]:
- 'Return user_id for nickuserhost.nick, create if none, maybe update.'
- matches = [id_ for id_ in self.users.keys()
- if self.users[id_].nick == nickuserhost.nick]
- assert len(matches) in ({0, 1} if (create_if_none or allow_none)
- else {1})
- if len(matches) == 1:
- id_ = matches[0]
- if '?' in {nickuserhost.user, nickuserhost.host}:
- assert nickuserhost.user == nickuserhost.host # both are '?'
- # only provided with .nick, no fields we could update
- return id_
- stored = self.users[id_]
- # .nick by definition same, check other fields for updatability;
- # allow where '?', or for set .user only to add "~" prefix, assert
- # nothing else could have changed
- if stored.host == '?':
- assert updating
- stored.host = nickuserhost.host
- if stored.user == '?':
- stored.user = nickuserhost.user
- else:
- assert nickuserhost.user == stored.user
- elif nickuserhost.user == f'~{stored.user}':
- assert updating
- assert stored.host == nickuserhost.host
- stored.user = nickuserhost.user
- else: # not seen set .host with unset .user yet
- assert stored.user != '?'
- elif create_if_none:
- id_ = str(uuid4())
- self.users.set_updating(id_, _User.from_nickuserhost(nickuserhost))
- else:
- return None
- return id_
-
class _CapsManager:
self._log(f'PLEASE IMPLEMENT HANDLER FOR: {msg.raw}')
return
for n_u_h in ret['_nickuserhosts']: # update, turn into proper users
- if (id_ := self.db.userid_for_nickuserhost(n_u_h, allow_none=True,
- updating=True)):
+ if (id_ := self.db.users.id_for_nickuserhost(
+ n_u_h, allow_none=True, updating=True)):
for ret_name in [k for k in ret if ret[k] is n_u_h]:
ret[ret_name] = self.db.users[id_]
for verb in ('setattr', 'do', 'doafter'):
elif ret['_verb'] == 'JOIN' and ret['joiner'] != self.db.users['me']:
self.db.channels[ret['channel']].append_user(ret['joiner'])
elif ret['_verb'] == 'NICK':
- user_id = self.db.userid_for_nickuserhost(ret['named'],
- updating=True)
+ user_id = self.db.users.id_for_nickuserhost(ret['named'],
+ updating=True)
assert user_id is not None
self.db.users[user_id].nick = ret['nick']
if user_id == 'me':