from enum import Enum, auto
from getpass import getuser
from threading import Thread
-from typing import Any, Callable, NamedTuple, Optional, Self
+from typing import Any, Callable, Generic, NamedTuple, Optional, Self, TypeVar
from uuid import uuid4
# ourselves
from ircplom.events import (
_MsgTok.ANY,
_MsgTok.ANY,
_MsgTok.ANY)),
+ _MsgParseExpectation(_MsgTok.SERVER,
+ '366', # RPL_ENDOFNAMES
+ ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
+ _MsgTok.CHANNEL,
+ _MsgTok.ANY)), # comment
_MsgParseExpectation(_MsgTok.SERVER,
'375', # RPL_MOTDSTART already implied by 1st 372
((_MsgTok.NICKNAME, 'set_me_attr:nick'),
'=',
(_MsgTok.CHANNEL, ':channel'),
(_MsgTok.LIST, ':names'))),
- _MsgParseExpectation(_MsgTok.SERVER,
- '366', # RPL_ENDOFNAMES
- ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
- (_MsgTok.CHANNEL, ':channel'),
- _MsgTok.ANY)), # comment
_MsgParseExpectation((_MsgTok.NICK_USER_HOST, ':joiner'),
'JOIN',
((_MsgTok.CHANNEL, ':channel'),)),
self._on_update(key)
-class _ChannelDb(_Db):
- _completable_user_ids: _CompletableStringsList
+class SharedChannelDbFields:
+ 'API for fields shared directly in name and type with TUI.'
+ user_ids: tuple[str, ...]
# topic: str
# channel_modes: str
-class SharedClientDbFields(IrcConnSetup):
+_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
+
+
+class _ChannelDb(_Db, SharedChannelDbFields):
+
+ def __init__(self, purge_users: Callable, **kwargs) -> None:
+ self._purge_users = purge_users
+ super().__init__(**kwargs)
+
+ def add_user(self, user_id: str) -> None:
+ 'Add user_id to .user_ids.'
+ self.user_ids = tuple(list(self.user_ids) + [user_id])
+ self._on_update('user_ids')
+
+ def remove_user(self, user_id: str) -> None:
+ 'Remove user_id from .user_ids.'
+ self.user_ids = tuple(id_ for id_ in self.user_ids if id_ != user_id)
+ self._on_update('user_ids')
+ self._purge_users()
+
+
+class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
'API for fields shared directly in name and type with TUI.'
connection_state: str
sasl_account: str
sasl_auth_state: str
user_modes: str
users: Any
- _channels: dict[str, Any]
-
- def _purge_users(self) -> None:
- to_keep = {'me'}
- for chan in self._channels.values():
- to_keep |= set(chan.user_ids)
- for user_id in [id_ for id_ in self.users.keys if id_ not in to_keep]:
- del self.users[user_id]
-
- def chans_of_user(self, user_id: str) -> tuple[str, ...]:
- 'Return names of channels user of user_id currently participates in.'
- return tuple(k for k, v in self._channels.items()
- if user_id in v.user_ids)
+ _channels: dict[str, _ChannelDbFields]
@dataclass
self.users[id_] = _NickUserHost(query)
return id_
- def remove_user_from_channel(self, user_id: str, chan_name: str) -> None:
- 'Remove user from channel, check that user deleted if that was last.'
- self.chan(chan_name).remove_completable('user_ids', user_id, True)
- if user_id == 'me':
- self.del_chan(chan_name)
- self._purge_users()
+ def _purge_users(self) -> None:
+ to_keep = {'me'}
+ for chan in self._channels.values():
+ to_keep |= set(chan.user_ids)
+ for user_id in [id_ for id_ in self.users.keys
+ if id_ not in to_keep]:
+ del self.users[user_id]
def remove_user(self, user_id: str) -> tuple[str, ...]:
'Run remove_user_from_channel on all channels user is in.'
- affected_chans = self.chans_of_user(user_id)
- for chan_name in affected_chans:
- self.remove_user_from_channel(user_id, chan_name)
- return affected_chans
+ affected_chans = []
+ for id_, chan in [(k, v) for k, v in self._channels.items()
+ if user_id in v.user_ids]:
+ chan.remove_user(user_id)
+ affected_chans += [id_]
+ return tuple(affected_chans)
def needs_arg(self, key: str) -> bool:
'Reply if attribute of key may reasonably be addressed without an arg.'
'Produce DB for channel of name – pre-existing, or newly created.'
if name not in self._channels:
self._channels[name] = _ChannelDb(
- on_update=lambda k: self._on_update(name, k))
+ on_update=lambda k: self._on_update(name, k),
+ purge_users=self._purge_users)
return self._channels[name]
setattr(self._db, arg, ret[arg])
if task == 'set_me_attr':
setattr(self._db.users['me'], arg, ret[arg])
- if task == 'set_user':
+ if task == 'set_user' and ret[arg] != self._db.users['me']:
self._db.user_id(ret[arg])
if ret['verb'] == '005': # RPL_ISUPPORT
for item in ret['isupports']:
self._db.isupports[toks[0]] = (toks[1] if len(toks) > 1
else '')
elif ret['verb'] == '353': # RPL_NAMREPLY
- for id_ in [self._db.user_id(name.lstrip(_ILLEGAL_NICK_FIRSTCHARS))
- for name in ret['names']]:
- ret['channel']['db'].append_completable('user_ids', id_)
- elif ret['verb'] == '366': # RPL_ENDOFNAMES
- ret['channel']['db'].declare_complete('user_ids')
+ for user_id in [
+ self._db.user_id(name.lstrip(_ILLEGAL_NICK_FIRSTCHARS))
+ for name in ret['names']]:
+ ret['channel']['db'].add_user(user_id)
elif ret['verb'] == '372': # RPL_MOTD
self._db.append_completable('motd', ret['line'])
elif ret['verb'] == '376': # RPL_ENDOFMOTD
elif ret['verb'] == 'ERROR':
self.close()
elif ret['verb'] == 'JOIN' and ret['joiner'] != self._db.users['me']:
- ret['channel']['db'].append_completable(
- 'user_ids', self._db.user_id(ret['joiner']), True)
+ ret['channel']['db'].add_user(self._db.user_id(ret['joiner']))
elif ret['verb'] == 'NICK':
user_id = self._db.user_id(ret['named'])
self._db.users[user_id].nick = ret['nick']
else ret['channel']['id'])}
self._log(ret['message'], out=False, **kw)
elif ret['verb'] == 'PART':
- self._db.remove_user_from_channel(self._db.user_id(ret['parter']),
- ret['channel']['id'])
+ if ret['parter'] == self._db.users['me']:
+ self._db.del_chan(ret['channel']['id'])
+ else:
+ ret['channel']['db'].remove_user(
+ self._db.user_id(ret['parter']))
elif ret['verb'] == 'PING':
self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
elif ret['verb'] == 'QUIT':