from abc import ABC, abstractmethod
from base64 import b64encode
from dataclasses import dataclass, InitVar
+from enum import Enum, auto
from getpass import getuser
from threading import Thread
from typing import Callable, Optional
IrcMessage, PORT_SSL)
ClientsDb = dict[str, 'Client']
-STREAM_ALL = '*'
-STREAM_SAME = '='
-STREAM_PREFIX_META = ':'
-STREAM_PREFIXES = ''.join([STREAM_ALL, STREAM_SAME, STREAM_PREFIX_META])
-STREAM_SERVER = f'{STREAM_PREFIX_META}server'
-STREAM_RAW = f'{STREAM_PREFIX_META}raw'
-
_NAMES_DESIRED_SERVER_CAPS = ('server-time', 'account-tag', 'sasl')
+class LogScope(Enum):
+ 'Where log messages should go.'
+ ALL = auto()
+ SERVER = auto()
+ RAW = auto()
+ CHAT = auto()
+ SAME = auto()
+
+
@dataclass
class ClientIdMixin:
'Collects a Client\'s ID at .client_id.'
assert self.conn is not None
self._log('connected to server (SSL: '
f'{"yes" if self.conn.ssl else "no"})',
- stream=STREAM_ALL)
+ scope=LogScope.ALL)
self._caps.challenge('LS', '302')
self.send(IrcMessage(verb='USER',
params=(getuser(), '0', '*',
self.send(IrcMessage(verb='NICK', params=(self.conn_setup.nickname,)))
@abstractmethod
- def _log(self, msg: str, stream: str = STREAM_SERVER, **kwargs) -> None:
+ def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
pass
def send(self,
msg: IrcMessage,
to_log: str = '',
- stream_for_log: str = STREAM_SERVER
+ log_target: LogScope | str = LogScope.SERVER
) -> None:
'Send msg over socket, on success log .raw, and optionally set to_log.'
if not self.conn:
self._log('cannot send, connection seems closed', alert=True,
- stream=STREAM_SAME)
+ scope=LogScope.SAME)
return
self.conn.send(msg)
if to_log:
- self._log(to_log, stream=stream_for_log)
- self._log(msg.raw, stream=STREAM_RAW, out=True)
+ if isinstance(log_target, str):
+ self._log(to_log, scope=LogScope.CHAT, nickname=log_target,
+ out=True)
+ else:
+ self._log(to_log, scope=log_target)
+ self._log(msg.raw, scope=LogScope.RAW, out=True)
def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
'''Manage conn_setup.nickname, .nick_confirmed.
def close(self) -> None:
'Close both recv Loop and socket.'
- self._log(msg='disconnecting from server …', stream=STREAM_ALL)
+ self._log(msg='disconnecting from server …', scope=LogScope.ALL)
self._caps.clear()
if self.conn:
self.conn.close()
def handle_msg(self, msg: IrcMessage) -> None:
'Log msg.raw, then process incoming msg into appropriate client steps.'
- self._log(msg.raw, stream=STREAM_RAW, out=False)
+ self._log(msg.raw, scope=LogScope.RAW, out=False)
match msg.verb:
case 'PING':
self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
case '001' | 'NICK':
self.update_login(nickname=msg.params[0], nick_confirmed=True)
case 'PRIVMSG':
- self._log(msg.params[-1], out=False,
- stream=msg.source.split('!')[0])
+ self._log(msg.params[-1], scope=LogScope.CHAT, out=False,
+ nickname=msg.source.split('!')[0])
case 'CAP':
if (result := self._caps.process_msg(msg.params[1:])):
if isinstance(result, str):
from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window,
CMD_SHORTCUTS)
from ircplom.irc_conn import IrcMessage
-from ircplom.client import (
- STREAM_ALL, STREAM_PREFIX_META, STREAM_PREFIXES, STREAM_RAW, STREAM_SAME,
- STREAM_SERVER, IrcConnSetup, Client, ClientQueueMixin, NewClientEvent)
+from ircplom.client import (IrcConnSetup, Client, ClientQueueMixin, LogScope,
+ NewClientEvent)
CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
CMD_SHORTCUTS['nick'] = 'window.nick'
class _ClientWindow(Window, ClientQueueMixin):
- def __init__(self, stream: str, log: Callable, **kwargs) -> None:
- self.stream = stream
+ def __init__(self, scope: LogScope, log: Callable, **kwargs) -> None:
+ self.scope = scope
self._log = log
super().__init__(**kwargs)
@property
def _title(self) -> str:
- return f'{self.client_id} {self.stream}'
+ return f'{self.client_id} '\
+ + f':{"SERVER" if self.scope == LogScope.SERVER else "RAW"}'
def _send_msg(self, verb: str, params: tuple[str, ...], **kwargs) -> None:
self._client_trigger('send', msg=IrcMessage(verb=verb, params=params),
def cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
'Send QUIT command to server.'
- self._log('requesting disconnect …', stream=STREAM_SERVER)
+ self._log('requesting disconnect …', scope=LogScope.SERVER)
self._send_msg('QUIT', (quit_msg,))
def cmd__reconnect(self) -> None:
def cmd__privmsg(self, target: str, msg: str) -> None:
'Send chat message msg to target.'
- self._send_msg('PRIVMSG', (target, msg), stream_for_log=target,
- to_log=msg)
+ self._send_msg('PRIVMSG', (target, msg), log_target=target, to_log=msg)
class _PrivmsgPromptWidget(PromptWidget):
class _PrivmsgWindow(_ClientWindow):
prompt: _PrivmsgPromptWidget
+ @property
+ def _title(self) -> str:
+ return f'{self.client_id} {self.nickname}'
+
+ def __init__(self, nickname: str, **kwargs) -> None:
+ self.nickname = nickname
+ super().__init__(**kwargs)
+
def cmd__chat(self, msg: str) -> None:
- 'PRIVMSG to target identified by .stream.'
- self.cmd__privmsg(self.stream, msg)
+ 'PRIVMSG to target identified by .nickname.'
+ self.cmd__privmsg(target=self.nickname, msg=msg)
class _ClientWindowsManager:
client_id=client_id, **kw)
self.windows: list[_ClientWindow] = []
self._tui_new_window = new_window
- for stream in (STREAM_SERVER, STREAM_RAW):
- self._new_window(stream)
+ for scope in (LogScope.SERVER, LogScope.RAW):
+ self._new_win(scope)
def _prompt_update(self, win: _PrivmsgWindow) -> None:
to_set = win.prompt.prefix_update_keys()
win.prompt.update_prefix(**{k: getattr(self, k) for k in to_set})
- def _new_window(self, stream: str) -> _ClientWindow:
- win_class = (_PrivmsgWindow if stream[0:1] != STREAM_PREFIX_META
- else _ClientWindow)
- win = self._tui_new_window(win_class, stream=stream, log=self.log)
+ def _new_win(self, scope: LogScope, nickname: str = '') -> _ClientWindow:
+ kwargs = {'scope': scope, 'log': self.log, 'win_cls': _ClientWindow}
+ if scope == LogScope.CHAT:
+ kwargs['win_cls'] = _PrivmsgWindow
+ kwargs['nickname'] = nickname
+ win = self._tui_new_window(**kwargs)
if isinstance(win, _PrivmsgWindow):
self._prompt_update(win)
self.windows += [win]
return win
- def window(self, stream: str) -> _ClientWindow:
- 'Return client window of stream.'
- for win in [w for w in self.windows if w.stream == stream]:
+ def window(self, scope: LogScope, nickname: str = '') -> _ClientWindow:
+ 'Return client window of scope.'
+ for win in [w for w in self.windows if w.scope == scope]:
+ if scope == LogScope.CHAT:
+ if isinstance(win, _PrivmsgWindow)\
+ and win.nickname == nickname:
+ return win
+ continue
return win
- return self._new_window(stream)
+ return self._new_win(scope=scope, nickname=nickname)
- def log(self, msg: str, stream: str, **kwargs) -> None:
- 'From parsing stream, kwargs build prefix before sending to logger.'
+ def log(self, msg: str, scope: LogScope, **kwargs) -> None:
+ 'From parsing scope, kwargs, build prefix before sending to logger.'
prefix = ''
- is_chat = stream[0] not in STREAM_PREFIXES
if 'out' in kwargs:
prefix += _LOG_PREFIX_OUT if kwargs['out'] else _LOG_PREFIX_IN
- elif is_chat:
- prefix += _LOG_PREFIX_OUT
+ if scope == LogScope.CHAT:
+ nickname = self.nickname if kwargs['out'] else kwargs['nickname']
+ prefix += f' [{nickname}]'
else:
prefix += _LOG_PREFIX_SERVER
- if is_chat:
- prefix += f' [{stream if "out" in kwargs else self.nickname}]'
- self._tui_log(msg, stream=stream, prefix=prefix, **kwargs)
+ self._tui_log(msg, scope=scope, prefix=prefix, **kwargs)
def update(self, **kwargs) -> bool:
'Apply settings in kwargs, follow representation update triggers.'
if new_value != old_value:
to_change[key] = (old_value, new_value)
for key, vals in to_change.items():
- self.log(f'changing {key}: [{vals[0]}] -> [{vals[1]}]',
- stream=STREAM_ALL if key == 'nickname' else STREAM_SERVER)
+ self.log(
+ f'changing {key}: [{vals[0]}] -> [{vals[1]}]',
+ scope=LogScope.ALL if key == 'nickname' else LogScope.SERVER)
setattr(self, key, vals[1])
tainteds = False
if _PrivmsgPromptWidget.prefix_update_keys() | set(to_change):
self._client_mngrs: dict[str, _ClientWindowsManager] = {}
def _log_target_wins(self, **kwargs) -> Sequence[Window]:
- stream = kwargs.get('stream', STREAM_SAME)
- if stream != STREAM_SAME and 'client_id' in kwargs:
+ target = kwargs.get('scope', LogScope.SAME)
+ if target != LogScope.SAME:
m = self._client_mngrs[kwargs['client_id']]
- if stream == STREAM_SERVER:
- return [m.window(STREAM_SERVER), m.window(STREAM_RAW)]
- return m.windows if stream == STREAM_ALL else [m.window(stream)]
+ if target == LogScope.ALL:
+ return m.windows
+ if target == LogScope.SERVER:
+ return [m.window(LogScope.SERVER), m.window(LogScope.RAW)]
+ if target == LogScope.CHAT:
+ return [m.window(LogScope.CHAT, nickname=kwargs['nickname'])]
+ return [m.window(target)]
return super()._log_target_wins(**kwargs)
def for_client_do(self, client_id: str, todo: str, **kwargs) -> None:
if client_id not in self._client_mngrs:
self._client_mngrs[client_id] = _ClientWindowsManager(
client_id=client_id, tui_log=self._log,
- new_window=lambda cls, **kw: self._new_window(
- cls, _q_out=self._q_out, client_id=client_id, **kw))
+ new_window=lambda win_cls, **kw: self._new_window(
+ win_cls, _q_out=self._q_out, client_id=client_id, **kw))
if getattr(self._client_mngrs[client_id], todo)(**kwargs) is not False:
self.redraw_affected()
'Catch /reconnect, only initiate if not connected, else complain back.'
if self.conn:
self._log('not re-connecting since already connected',
- stream=STREAM_SAME, alert=True)
+ scope=LogScope.SAME, alert=True)
return
self._start_connecting()
- def _log(self, msg: str, stream: str = STREAM_SERVER, **kwargs) -> None:
+ def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
to_log = []
if msg:
to_log += [msg]
for k, v in dc_asdict(kwargs['conn_setup']).items():
to_log += [f' {k}: [{v}]']
for item in to_log:
- self._client_tui_trigger('log', stream=stream, msg=item, **kwargs)
+ self._client_tui_trigger('log', scope=scope, msg=item, **kwargs)
def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
super().update_login(nick_confirmed, nickname)