def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
self.client_id = conn_setup.hostname
super().__init__(client_id=self.client_id, **kwargs)
- self._db = _ClientDb(on_update=self._on_update)
- self._caps = _CapsManager(self.send, self._db.caps)
+ self.db = _ClientDb(on_update=self._on_update)
+ self.caps = _CapsManager(self.send, self.db.caps)
for k in conn_setup.__annotations__:
- setattr(self._db, k, getattr(conn_setup, k))
- if self._db.port <= 0:
- self._db.port = PORT_SSL
+ setattr(self.db, k, getattr(conn_setup, k))
+ if self.db.port <= 0:
+ self.db.port = PORT_SSL
self._start_connecting()
def _start_connecting(self) -> None:
if self.conn:
raise IrcConnAbortException('already connected')
self.conn = _IrcConnection(
- hostname=self._db.hostname, port=self._db.port,
+ hostname=self.db.hostname, port=self.db.port,
_q_out=self._q_out, client_id=self.client_id)
self._client_trigger('_on_connect')
except IrcConnAbortException as e:
except Exception as e: # pylint: disable=broad-exception-caught
self._put(ExceptionEvent(CrashingException(e)))
- self._db.connection_state = 'connecting'
+ self.db.connection_state = 'connecting'
Thread(target=connect, daemon=True, args=(self,)).start()
def _on_connect(self) -> None:
assert self.conn is not None
- self._db.users.set_updating('me', _NickUserHost('?', getuser(), '?'))
- self._db.connection_state = 'connected'
- self._caps.start_negotation()
+ self.db.users.set_updating('me', _NickUserHost('?', getuser(), '?'))
+ self.db.connection_state = 'connected'
+ self.caps.start_negotation()
self.send(IrcMessage(verb='USER', params=(
- self._db.users['me'].user.lstrip('~'),
- '0', '*', self._db.realname)))
- self.send(IrcMessage(verb='NICK', params=(self._db.nick_wanted,)))
+ self.db.users['me'].user.lstrip('~'),
+ '0', '*', self.db.realname)))
+ self.send(IrcMessage(verb='NICK', params=(self.db.nick_wanted,)))
def close(self) -> None:
'Close connection and wipe memory of its states.'
- self._db.clear()
+ self.db.clear()
if self.conn:
self.conn.close()
self.conn = None
return msg_tok if ('.' in msg_tok
and not set('@!') & set(msg_tok)) else None
if ex_tok is MsgTok.CHANNEL:
- return msg_tok if self._db.is_chan_name(msg_tok) else None
+ return msg_tok if self.db.is_chan_name(msg_tok) else None
if ex_tok is MsgTok.NICKNAME:
return (msg_tok
- if msg_tok[0] not in self._db.illegal_nick_firstchars
+ if msg_tok[0] not in self.db.illegal_nick_firstchars
else None)
if ex_tok is MsgTok.NICK_USER_HOST:
try:
return tuple(msg_tok.split())
return msg_tok
+ def into_tasks_and_key(code: str, to_ret: dict[str, Any]) -> str:
+ tasks = to_ret['_tasks']
+ cmds_str, key = code.split(':', maxsplit=1) if code else ('', '')
+ for command in [t for t in cmds_str.split(',') if t]:
+ tasks[command] = tasks.get(command, []) + [key]
+ return key
+
for ex in [ex for ex in MSG_EXPECTATIONS if ex.verb == msg.verb]:
- tasks: dict[str, list[str]] = {}
- to_return: dict[str, Any] = {'verb': ex.verb, '_tasks': tasks}
+ to_return: dict[str, Any] = {'verb': ex.verb, '_tasks': {}}
ex_tok_fields = tuple([ex.source] + list(ex.params))
msg_params: list[str | list[str]]
if ex.idx_into_list < 0:
continue
passing = True
for idx, ex_tok in enumerate(ex_tok_fields):
- ex_tok, key = ((ex_tok[0], ex_tok[1])
- if isinstance(ex_tok, tuple) else (ex_tok, ''))
- tasks_, key = key.split(':', maxsplit=1) if key else ('', '')
- for task in [t for t in tasks_.split(',') if t]:
- tasks[task] = tasks.get(task, []) + [key]
+ ex_tok, code = ((ex_tok[0], ex_tok[1])
+ if isinstance(ex_tok, tuple) else (ex_tok, ''))
+ key = into_tasks_and_key(code, to_return)
to_return[key] = param_match(ex_tok, msg_tok_fields[idx])
if to_return[key] is None:
passing = False
break
if passing:
+ for code in ex.bonus_tasks:
+ into_tasks_and_key(code, to_return)
return to_return
return {}
self._log(f'PLEASE IMPLEMENT HANDLER FOR: {msg.raw}')
return
for task, tok_names in ret['_tasks'].items():
- task_verb, target_name = task.split('_')
+ task_verb, target_name = task.split('_', maxsplit=1)
if task_verb == 'set' and target_name == 'user':
for tok_name in tok_names:
- self._db.user_id(ret[tok_name])
+ self.db.user_id(ret[tok_name])
continue
path_toks = target_name.split('.')
- assert path_toks[0] == 'db'
- parent = self._db
- for step in path_toks[1:]:
- parent = (parent[ret[step] if step.isupper() else step]
- if isinstance(parent, Dict)
- else getattr(parent, step))
+ node = self
+ for step in [t for t in path_toks if t]:
+ node = (node[ret[step] if step.isupper() else step]
+ if isinstance(node, Dict)
+ else getattr(node, step))
for tok_name in sorted(tok_names):
+ # FIXME: alphabetical sorting of tok_names merely hack to parse
+ # TOPIC messages, to ensure any setattr_topic:what be processed
+ # before any setattr_topic:who, i.e. properly completing .topic
if task_verb == 'setattr':
- # NB: alphabetical sorting of tok_names purely as a hack
- # to ensure any setattr_topic:what be processed before any
- # setattr_topic:who, i.e. for parsing TOPIC message, FIXME
- setattr(parent, tok_name, ret[tok_name])
+ setattr(node, tok_name, ret[tok_name])
+ elif task_verb == 'do':
+ getattr(node, tok_name)()
if ret['verb'] == '005': # RPL_ISUPPORT
for item in ret['isupport']:
if item[0] == '-':
- del self._db.isupport[item[1:]]
+ del self.db.isupport[item[1:]]
else:
key, data = _Dict.key_val_from_eq_str(item)
- self._db.isupport[key] = data
+ self.db.isupport[key] = data
elif ret['verb'] == '353': # RPL_NAMREPLY
- self._db.channels[ret['channel']].add_from_namreply(ret['names'])
- elif ret['verb'] == '366': # RPL_ENDOFNAMES
- self._db.channels[ret['channel']].user_ids.complete()
+ self.db.channels[ret['channel']].add_from_namreply(ret['names'])
elif ret['verb'] == '372': # RPL_MOTD
- self._db.motd.append(ret['line'])
- elif ret['verb'] == '376': # RPL_ENDOFMOTD
- self._db.motd.complete()
+ self.db.motd.append(ret['line'])
elif ret['verb'] == '401': # ERR_NOSUCHNICK
self._log(f'{ret["target"]} not online', scope=LogScope.CHAT,
target=ret['target'], alert=True)
self._log('nickname already in use, trying increment', alert=True)
self.send(IrcMessage(
'NICK', (_NickUserHost(nick=ret['used']).incremented,)))
- elif ret['verb'] in {'903', '904'}: # RPL_SASLSUCCESS, ERR_SASLFAIL
- self._caps.end_negotiation()
elif ret['verb'] == 'AUTHENTICATE':
- auth = b64encode((self._db.nick_wanted + '\0'
- + self._db.nick_wanted + '\0'
- + self._db.password
+ auth = b64encode((self.db.nick_wanted + '\0'
+ + self.db.nick_wanted + '\0'
+ + self.db.password
).encode('utf-8')).decode('utf-8')
self.send(IrcMessage('AUTHENTICATE', (auth,)))
elif ret['verb'] == 'CAP':
- if (self._caps.process_msg(verb=ret['subverb'], items=ret['items'],
- complete='tbc' not in ret)
- and 'sasl' in self._db.caps.keys()
- and 'PLAIN' in self._db.caps['sasl'].data.split(',')):
- if self._db.password:
- self._db.sasl_auth_state = 'attempting'
+ if (self.caps.process_msg(verb=ret['subverb'], items=ret['items'],
+ complete='tbc' not in ret)
+ and 'sasl' in self.db.caps.keys()
+ and 'PLAIN' in self.db.caps['sasl'].data.split(',')):
+ if self.db.password:
+ self.db.sasl_auth_state = 'attempting'
self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
else:
- self._caps.end_negotiation()
- elif ret['verb'] == 'ERROR':
- self.close()
+ self.caps.end_negotiation()
elif ret['verb'] == 'JOIN'\
- and ret['joiner'].nick != self._db.users['me'].nick:
- self._db.channels[ret['channel']].append_nick(ret['joiner'])
+ and ret['joiner'].nick != self.db.users['me'].nick:
+ self.db.channels[ret['channel']].append_nick(ret['joiner'])
elif ret['verb'] == 'NICK':
- user_id = self._db.user_id(ret['named'])
- self._db.users[user_id].nick = ret['nick']
+ user_id = self.db.user_id(ret['named'])
+ self.db.users[user_id].nick = ret['nick']
if user_id == 'me':
- self._db.nick_wanted = ret['nick']
+ self.db.nick_wanted = ret['nick']
elif ret['verb'] in {'NOTICE', 'PRIVMSG'}:
kw: dict[str, bool | str | LogScope] = {
'as_notice': msg.verb == 'NOTICE'}
else ret['channel'])}
self._log(ret['message'], out=False, **kw)
elif ret['verb'] == 'PART':
- self._db.channels[ret['channel']].remove_nick(ret['parter'])
+ self.db.channels[ret['channel']].remove_nick(ret['parter'])
self._log(f'{ret["parter"]} parts: {ret["message"]}',
LogScope.CHAT, target=ret['channel'])
- if ret['parter'] == self._db.users['me']:
- del self._db.channels[ret['channel']]
+ if ret['parter'] == self.db.users['me']:
+ del self.db.channels[ret['channel']]
elif ret['verb'] == 'PING':
self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
elif ret['verb'] == 'QUIT':
- for ch_name, ch in self._db.chans_of_user(ret['quitter']).items():
+ for ch_name, ch in self.db.chans_of_user(ret['quitter']).items():
ch.remove_nick(ret['quitter'])
self._log(f'{ret["quitter"]} quits: {ret["message"]}',
LogScope.CHAT, target=ch_name)
verb: str
params: tuple[_MsgTokGuide, ...] = tuple()
idx_into_list: int = -1
+ bonus_tasks: tuple[str, ...] = tuple()
MSG_EXPECTATIONS: list[_MsgParseExpectation] = []
_MsgParseExpectation(MsgTok.SERVER,
'376', # RPL_ENDOFMOTD
((MsgTok.NICKNAME, 'setattr_db.users.me:nick'),
- MsgTok.ANY)), # comment
+ MsgTok.ANY), # comment
+ bonus_tasks=('do_db.motd:complete',)),
_MsgParseExpectation(MsgTok.SERVER,
'396', # RPL_VISIBLEHOST
((MsgTok.NICKNAME, 'setattr_db.users.me:nick'),
_MsgParseExpectation(MsgTok.SERVER,
'903', # RPL_SASLSUCCESS
((MsgTok.NICKNAME, 'setattr_db.users.me:nick'),
- (MsgTok.ANY, 'setattr_db:sasl_auth_state'))),
+ (MsgTok.ANY, 'setattr_db:sasl_auth_state')),
+ bonus_tasks=('do_caps:end_negotiation',)),
_MsgParseExpectation(MsgTok.SERVER,
'904', # ERR_SASLFAIL
((MsgTok.NICKNAME, 'setattr_db.users.me:nick'),
- (MsgTok.ANY, 'setattr_db:sasl_auth_state'))),
+ (MsgTok.ANY, 'setattr_db:sasl_auth_state')),
+ bonus_tasks=('do_caps:end_negotiation',)),
_MsgParseExpectation(MsgTok.NONE,
'AUTHENTICATE',
('+',)),
_MsgParseExpectation(MsgTok.SERVER,
'366', # RPL_ENDOFNAMES
((MsgTok.NICKNAME, 'setattr_db.users.me:nick'),
- (MsgTok.CHANNEL, ':channel'),
- MsgTok.ANY)), # comment
+ (MsgTok.CHANNEL, ':CHAN'),
+ MsgTok.ANY), # comment
+ bonus_tasks=('do_db.channels.CHAN.user_ids:complete',)),
_MsgParseExpectation((MsgTok.NICK_USER_HOST, ':joiner'),
'JOIN',
((MsgTok.CHANNEL, ':channel'),)),
MSG_EXPECTATIONS += [
_MsgParseExpectation(MsgTok.NONE,
'ERROR',
- ((MsgTok.ANY, 'setattr_db:connection_state'),)),
+ ((MsgTok.ANY, 'setattr_db:connection_state'),),
+ bonus_tasks=('do_:close',)),
_MsgParseExpectation((MsgTok.NICK_USER_HOST, 'setattr_db.users.me:nickuserhost'),
'MODE',
((MsgTok.NICKNAME, 'setattr_db.users.me:nick'),