class _CapsManager:
def __init__(self,
- sender: Callable[[IrcMessage], None],
+ sender: Callable,
caps_dict: _UpdatingDict[_UpdatingServerCapability]
) -> None:
self._dict = caps_dict
- self._send = lambda *params: sender(IrcMessage('CAP', params=params))
+ self._send = sender
self.clear()
def clear(self) -> None:
self.db.users['me'].nickuserhost = NickUserHost(user=getuser())
self.db.connection_state = 'connected'
self.caps.start_negotation()
- self.send(IrcMessage(verb='USER', params=(
- self.db.users['me'].user.lstrip('~'),
- '0', '*', self.db.realname)))
- self.send(IrcMessage(verb='NICK', params=(self.db.nick_wanted,)))
+ self.send('USER', self.db.users['me'].user.lstrip('~'),
+ '0', '*', self.db.realname)
+ self.send('NICK', self.db.nick_wanted,)
def close(self) -> None:
'Close connection and wipe memory of its states.'
def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
pass
- def send(self,
- msg: IrcMessage,
- to_log: str = '',
- log_target: LogScope | str = LogScope.SERVER
- ) -> None:
- 'Send msg over socket, on success log .raw, and optionally set to_log.'
- if not self.conn:
+ def send(self, verb: str, *args) -> None:
+ 'Send msg over socket, on success log .raw.'
+ if self.conn:
+ msg = IrcMessage(verb, args)
+ self.conn.send(msg)
+ self._log(msg.raw, scope=LogScope.RAW, out=True)
+ else:
self._log('cannot send, connection seems closed', alert=True,
scope=LogScope.SAME)
- return
- self.conn.send(msg)
- if to_log:
- if isinstance(log_target, str):
- self._log(to_log, scope=LogScope.CHAT, target=log_target,
- out=True)
- else:
- self._log(to_log, scope=log_target)
- self._log(msg.raw, scope=LogScope.RAW, out=True)
def handle_msg(self, msg: IrcMessage) -> None:
'Log msg.raw, then process incoming msg into appropriate client steps.'
self._log(alert, alert=True)
elif ret['_verb'] == '433': # ERR_NICKNAMEINUSE
self._log('nickname already in use, trying increment', alert=True)
- self.send(IrcMessage(
- 'NICK', (_NickUserHost(nick=ret['used']).incremented,)))
+ self.send('NICK', _NickUserHost(nick=ret['used']).incremented)
elif ret['_verb'] == 'AUTHENTICATE':
auth = b64encode((self.db.nick_wanted + '\0'
+ self.db.nick_wanted + '\0'
+ self.db.password
).encode('utf-8')).decode('utf-8')
- self.send(IrcMessage('AUTHENTICATE', (auth,)))
+ self.send('AUTHENTICATE', auth)
elif ret['_verb'] == 'CAP':
if (self.caps.process_msg(verb=ret['subverb'], items=ret['items'],
complete='tbc' not in ret)
and 'PLAIN' in self.db.caps['sasl'].data.split(',')):
if self.db.password:
self.db.sasl_auth_state = 'attempting'
- self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
+ self.send('AUTHENTICATE', 'PLAIN')
else:
self.caps.end_negotiation()
elif ret['_verb'] == 'JOIN' and ret['joiner'] != self.db.users['me']:
del self.db.channels[ret['channel']]
self.db.users.purge()
elif ret['_verb'] == 'PING':
- self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
+ self.send('PONG', ret['reply'])
elif ret['_verb'] == 'QUIT':
ret['quitter'].quit(ret['message'])
# ourselves
from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window,
CMD_SHORTCUTS)
-from ircplom.irc_conn import IrcMessage
from ircplom.client import (
AutoAttrMixin, Client, ClientQueueMixin, Dict, DictItem, IrcConnSetup,
LogScope, NewClientEvent, NickUserHost, ServerCapability,
self._title = f'{self.client_id} '\
+ f':{"SERVER" if self.scope == LogScope.SERVER else "RAW"}'
- def _send_msg(self, verb: str, params: tuple[str, ...], **kwargs) -> None:
- self._client_trigger('send', msg=IrcMessage(verb=verb, params=params),
- **kwargs)
+ def _send_msg(self, verb: str, params: tuple[str, ...]) -> None:
+ self._client_trigger('send_w_params_tuple', verb=verb, params=params)
def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
'Send QUIT command to server.'
return None
assert update.path[-1] == 'user_ids'
assert isinstance(update.value, tuple)
- d = {'nuhs:joining': tuple(id_ for id_ in update.value
+ d = {'NUHS:joining': tuple(id_ for id_ in update.value
if id_ not in self.user_ids)
- } if self.user_ids else {'nicks:residents': tuple(update.value)}
+ } if self.user_ids else {'NICKS:residents': tuple(update.value)}
if super().set_and_check_for_change(update):
return scope, d
return None
toks = verb.split(':', maxsplit=1)
verb = toks[-1]
transform = toks[0] if len(toks) > 1 else ''
- if transform in {'nicks', 'nuhs'}:
+ if transform in {'NICKS', 'NUHS'}:
nuhs = (self.db.users[id_] for id_ in item)
item = ', '.join([
(str(nuh) if transform == 'nuhs' else nuh.nick)
self._put(TuiEvent.affector('for_client_do').kw(
client_id=self.client_id, todo=todo, **kwargs))
+ def send_w_params_tuple(self, verb: str, params: tuple[str, ...]) -> None:
+ 'Helper for ClientWindow to trigger .send, for it can only do kwargs.'
+ self.send(verb, *params)
+
def privmsg(self, target: str, msg: str) -> None:
'Catch /privmsg, only allow for channel if in channel, else complain.'
if self.db.is_chan_name(target)\
self._log('not sending, since not in channel',
scope=LogScope.SAME, alert=True)
return
- self.send(IrcMessage('PRIVMSG', (target, msg)),
- log_target=target, to_log=msg)
+ self.send('PRIVMSG', target, msg)
+ self._log(msg, scope=LogScope.CHAT, target=target, out=True)
def reconnect(self) -> None:
'Catch /reconnect, only initiate if not connected, else complain back.'