(1, 5),
(251, 255),
(265, 266),
+ 353,
+ 366,
372,
(375, 376),
396,
self._db.append('caps_LS', param, keep_confirmed=True)
case 'DEL':
for param in params[-1].split():
- self._db.caps_LS.remove(param)
+ self._db.remove('caps_LS', param)
case 'ACK' | 'NAK':
for name in params[-1].split():
if params[0] == 'ACK':
password: str
-class ClientDbBase:
+class _Db:
'For values of variable confirmation, and reading in multi-line lists.'
- client_host: str
- isupports: list[str]
- motd: list[str]
- nickname: str
- user_modes: str
def __init__(self) -> None:
self._dict: dict[str, ClientDbType] = {}
self._typecheck(key, value)
return (value, key in self._confirmeds)
-
-class _ClientDb(ClientDbBase):
- caps_LS: list[str]
- caps_LIST: list[str]
- hostname: str
- password: str
- port: int
- realname: str
-
def append(self, key: str, value: str, keep_confirmed=False) -> None:
'To list[str] keyed by key, append value; if non-existant, create it.'
if not keep_confirmed and key in self._confirmeds:
else:
raise CrashingException('called on non-list entry')
+ def remove(self, key, value: str) -> None:
+ 'From list[str] keyed by key, remove value.'
+ targeted = self._dict.get(key, None)
+ if isinstance(targeted, list):
+ targeted.remove(value)
+ else:
+ raise CrashingException('called on non-list entry')
+
+
+class ClientDbBase(_Db):
+ 'DB exposable to TUI.'
+ client_host: str
+ isupports: list[str]
+ motd: list[str]
+ nickname: str
+ user_modes: str
+
+
+class _ChannelDb(_Db):
+ users: list[str]
+ topic: str
+ channel_modes: str
+
+
+class _ClientDb(ClientDbBase):
+ caps_LS: list[str]
+ caps_LIST: list[str]
+ hostname: str
+ password: str
+ port: int
+ realname: str
+ _channels: dict[str, _ChannelDb]
+
@property
def conn_setup(self) -> IrcConnSetup:
'Constructed out of stored entries *including* unconfirmed ones.'
d[name] = _ServerCapability(name in self.caps_LIST, data)
return d
+ def del_channel(self, name: str) -> None:
+ 'Remove DB for channel of name.'
+ del self._channels[name]
+
+ def channel(self, name: str) -> _ChannelDb:
+ 'Produce DB for channel of name – pre-existing, or newly created.'
+ if self._channels is None:
+ self._channels = {}
+ if name not in self._channels:
+ self._channels[name] = _ChannelDb()
+ return self._channels[name]
+
class Client(ABC, ClientQueueMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
case '005' if len(msg.params) > 2:
for param in msg.params[1:-1]:
self._db.append('isupports', param)
+ case '353' if len(msg.params) == 4:
+ for user in msg.params[3].split():
+ self._db.channel(msg.params[2]).append('users', user)
+ case '366' if len(msg.params) == 3:
+ self._db.channel(msg.params[1]).set('users', None, True)
case '372' if len(msg.params) == 2: # RPL_MOTD
self._db.append('motd', msg.params[-1])
case '376' if len(msg.params) == 2: # RPL_ENDOFMOTD
case 'ERROR' if len(msg.params) == 1:
self.close()
case 'JOIN' if len(msg.params) == 1:
+ if msg.nick_from_source != self._db.nickname:
+ self._db.channel(channel).append(
+ 'users', msg.nick_from_source, keep_confirmed=True)
self._log(log_msg, scope=scope, channel=channel)
case 'MODE' if (len(msg.params) == 2
and msg.params[0] == self._db.nickname):
if len(msg.params) > 1:
log_msg += f': {msg.params[1]}'
self._log(log_msg, scope=scope, channel=channel)
+ if msg.nick_from_source == self._db.nickname:
+ self._db.del_channel(channel)
+ else:
+ self._db.channel(channel).remove('users',
+ msg.nick_from_source)
case 'PING' if len(msg.params) == 1:
self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
case '903' | '904' if len(msg.params) == 1: