from enum import Enum, auto
from getpass import getuser
from threading import Thread
-from typing import Any, Callable, NamedTuple, Optional
+from typing import Any, Callable, NamedTuple, Optional, Self
# ourselves
from ircplom.events import (
AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
_MsgParseExpectation(_MsgTok.SERVER,
'900', # RPL_LOGGEDIN
((_MsgTok.NICKNAME, 'set_db_attr:nickname'),
- (_MsgTok.NICK_USER_HOST, ':full_address'),
+ (_MsgTok.NICK_USER_HOST,
+ 'set_db_attr:_nick_user_host'),
(_MsgTok.ANY, 'set_db_attr:sasl_account'),
_MsgTok.ANY)),
_MsgParseExpectation(_MsgTok.SERVER,
username: str
+class _NickUserHost(NamedTuple):
+ nick: str
+ user: str
+ host: str
+
+ def __str__(self) -> str:
+ return f'{self.nick}!{self.user}@{self.host}'
+
+ @classmethod
+ def from_str(cls, value: str) -> Self:
+ 'Produce from string assumed to fit _!_@_ pattern.'
+ toks = value.split('!')
+ assert len(toks) == 2
+ toks = toks[0:1] + toks[1].split('@')
+ assert len(toks) == 3
+ return cls(*toks)
+
+
class _ClientDb(_Db, SharedClientDbFields):
caps: _UpdatingDict
isupports: _UpdatingDict
_completable_motd: _CompletableStringsList
_channels: dict[str, _ChannelDb]
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self._types['_nick_user_host'] = _NickUserHost
+
def __setattr__(self, key: str, value) -> None:
super().__setattr__(key, value)
if key == 'nickname':
self.nick_wanted = value
+ @property
+ def _nick_user_host(self) -> _NickUserHost:
+ return _NickUserHost(self.nickname, self.username, self.client_host)
+
+ @_nick_user_host.setter
+ def _nick_user_host(self, nick_user_host: _NickUserHost) -> None:
+ self.nickname = nick_user_host.nick
+ self.username = nick_user_host.user
+ self.client_host = nick_user_host.host
+
def needs_arg(self, key: str) -> bool:
'Reply if attribute of key may reasonably be addressed without an arg.'
return not isinstance(getattr(self, key), (bool, int, str, tuple))
def _match_msg(self, msg: IrcMessage) -> dict[str, Any]:
'Test .source, .verb, .params.'
- tok_type = str | tuple[str, ...] | dict[str, str | _ChannelDb]
+ tok_type = (str | _NickUserHost | tuple[str, ...]
+ | dict[str, str | _ChannelDb])
def param_match(ex_tok: str | _MsgTok, msg_tok: str | list[str]
) -> Optional[tok_type | tuple[tok_type, ...]]:
if ex_tok is _MsgTok.NICKNAME:
return msg_tok if msg_tok[0] not in '~&@%+# ' else None
if ex_tok is _MsgTok.NICK_USER_HOST:
- toks = msg_tok.split('!')
- if len(toks) != 2:
+ try:
+ return _NickUserHost.from_str(msg_tok)
+ except AssertionError:
return None
- toks = toks[0:1] + toks[1].split('@')
- return tuple(toks) if len(toks) == 3 else None
if ex_tok is _MsgTok.LIST:
return tuple(msg_tok.split())
return msg_tok
elif ret['verb'] == '433': # ERR_NICKNAMEINUSE
self._log('nickname already in use, trying increment', alert=True)
self.send(IrcMessage('NICK', (_nick_incremented(ret['used']),)))
- elif ret['verb'] == '900': # RPL_LOGGEDIN
- self._db.nickname = ret['full_address'][0]
- self._db.username, self._db.client_host = ret['full_address'][1:]
elif ret['verb'] in {'903', '904'}: # RPL_SASLSUCCESS, ERR_SASLFAIL
self._caps.end_negotiation()
elif ret['verb'] == 'AUTHENTICATE':
elif ret['verb'] == 'ERROR':
self.close()
elif ret['verb'] == 'JOIN':
- self._log(f'{ret["joiner"][0]} {msg.verb.lower()}s '
+ self._log(f'{ret["joiner"]} {msg.verb.lower()}s '
+ f'{ret["channel"]["id"]}',
scope=LogScope.CHAT, target=ret['channel']['id'])
- if ret['joiner'][0] != self._db.nickname:
- ret['channel']['db'].append_completable('users',
- ret['joiner'][0], True)
+ if ret['joiner'].nick != self._db.nickname:
+ ret['channel']['db'].append_completable(
+ 'users', ret['joiner'].nick, True)
elif ret['verb'] == 'NICK':
- if ret['named'][0] == self._db.nickname:
+ if ret['named'].nick == self._db.nickname:
self._db.nickname = ret['nickname']
else:
- for id_, ch in self._db.chans_of_user(ret['named'][0]).items():
- ch.remove_completable('users', ret['named'][0], True)
+ for id_, ch in self._db.chans_of_user(ret['named'].nick
+ ).items():
+ ch.remove_completable('users', ret['named'].nick, True)
ch.append_completable('users', ret['nickname'], True)
- self._log(f'{ret["named"][0]} becomes {ret["nickname"]}',
+ self._log(f'{ret["named"]} becomes {ret["nickname"]}',
scope=LogScope.CHAT, target=id_)
elif ret['verb'] in {'NOTICE', 'PRIVMSG'}:
kw: dict[str, bool | str | LogScope] = {
'as_notice': msg.verb == 'NOTICE'}
if 'sender' in ret: # not just server message
- kw |= {'sender': ret['sender'][0], 'scope': LogScope.CHAT,
- 'target': (ret['sender'][0] if 'nickname' in ret
+ kw |= {'sender': ret['sender'].nick, 'scope': LogScope.CHAT,
+ 'target': (ret['sender'].nick if 'nickname' in ret
else ret['channel']['id'])}
self._log(ret['message'], out=False, **kw)
elif ret['verb'] == 'PART':
reason = f': {ret["reason"]}' if 'reason' in ret else ''
- self._log(f'{ret["parter"][0]} {msg.verb.lower()}s '
+ self._log(f'{ret["parter"]} {msg.verb.lower()}s '
+ f'{ret["channel"]["id"]}{reason}',
scope=LogScope.CHAT, target=ret['channel']['id'])
- if ret['parter'][0] == self._db.nickname:
+ if ret['parter'].nick == self._db.nickname:
self._db.del_chan(ret['channel']['id'])
else:
- ret['channel']['db'].remove_completable('users',
- ret['parter'][0], True)
+ ret['channel']['db'].remove_completable(
+ 'users', ret['parter'].nick, True)
elif ret['verb'] == 'PING':
self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
elif ret['verb'] == 'QUIT':
- for id_, chan in self._db.chans_of_user(ret['quitter'][0]).items():
- chan.remove_completable('users', ret['quitter'][0], True)
- self._log(f'{ret["quitter"][0]} quits: {ret["message"]}',
+ for id_, ch in self._db.chans_of_user(ret['quitter'].nick).items():
+ ch.remove_completable('users', ret['quitter'].nick, True)
+ self._log(f'{ret["quitter"]} quits: {ret["message"]}',
LogScope.CHAT, target=id_)