from enum import Enum, auto
from getpass import getuser
from threading import Thread
-from typing import Callable, Optional
+from typing import Any, Callable, NamedTuple, Optional
# ourselves
from ircplom.events import AffectiveEvent, ExceptionEvent, QueueMixin
from ircplom.irc_conn import (BaseIrcConnection, IrcConnAbortException,
class _CapsManager:
- def __init__(self, sender: Callable[[IrcMessage], None]) -> None:
+ def __init__(self, sender: Callable[[IrcMessage], None], db: 'ClientDb',
+ ) -> None:
+ self._db = db
+ self._db.set('caps_LS', [])
+ self._db.set('caps_LIST', [])
self._send = sender
- self._challenges: dict[str, bool] = {}
- self.dict: dict[str, _ServerCapability] = {}
+ self._sent_challenges: list[str] = []
self.sasl_wait = False
def clear(self) -> None:
'Reset all negotiation knowledge to zero.'
self.sasl_wait = False
- self._challenges.clear()
- self.dict.clear()
-
- def process_msg(self, params: tuple[str, ...]) -> bool | str:
- 'Parse CAP params to negot. steps, DB inputs; return error or success.'
- if params[0] in {'NEW', 'DEL'}: # don't bother fiddling, just re-do
- self.clear()
- self.challenge('LS', '302')
- return False
- if self._challenged('END') or self.sasl_wait:
- return f'ignoring post-END CAP message not NEW, DEL: {params}'
+ self._sent_challenges.clear()
+ self._db.set('caps_LS', [], confirm=False)
+ self._db.set('caps_LIST', [], confirm=False)
+
+ @property
+ def asdict(self) -> Optional[dict[str, _ServerCapability]]:
+ 'Return acquired knowledge in optimized format.'
+ if (self._db.caps_LS is None) or (self._db.caps_LIST is None):
+ return None
+ d: dict[str, _ServerCapability] = {}
+ for cap_name, data in self._availables.items():
+ d[cap_name] = _ServerCapability(
+ enabled=cap_name in self._db.caps_LIST,
+ data=data)
+ return d
+
+ def _unconfirmed_caps_list(self, key) -> list[str]:
+ ret = self._db.get_force(key)[0]
+ assert isinstance(ret, list)
+ return ret
+
+ @property
+ def _availables(self) -> dict[str, str]:
+ avs: dict[str, str] = {}
+ for item in self._unconfirmed_caps_list('caps_LS'):
+ toks = item.split('=', maxsplit=1)
+ avs[toks[0]] = '' if len(toks) == 1 else toks[1]
+ return avs
+
+ def process_msg(self, params: tuple[str, ...]) -> bool:
+ 'Parse CAP params to negot. steps, DB inputs; return if successful.'
match params[0]:
- case 'LS' | 'LIST':
- self._collect_caps(params)
+ case 'NEW':
+ for param in params[-1].split():
+ self._db.caps_LS.append(param)
+ case 'DEL':
+ for param in params[-1].split():
+ del self._db.caps_LS[param]
case 'ACK' | 'NAK':
- for cap_name in params[-1].split():
- self._challenge_set(f'REQ:{cap_name}', done=True)
- self.dict[cap_name].enabled = params[0] == 'ACK'
- if self._challenge_met('LIST'):
+ for name in params[-1].split():
+ if params[0] == 'ACK':
+ self._unconfirmed_caps_list('caps_LIST').append(name)
+ case 'LS' | 'LIST':
+ key = f'caps_{params[0]}'
+ caps_list, has_finished = self._db.get_force(key)
+ assert isinstance(caps_list, list)
+ if has_finished:
+ caps_list.clear()
+ self._db.set(key, caps_list, confirm=False)
+ for item in params[-1].strip().split():
+ caps_list.append(item)
+ if params[1] != '*':
+ self._db.set(key, caps_list, confirm=True)
+ if self.asdict is not None:
self.sasl_wait = (
- 'sasl' in self.dict
- and 'PLAIN' in self.dict['sasl'].data.split(','))
+ 'sasl' in self.asdict
+ and 'PLAIN' in self.asdict['sasl'].data.split(','))
if not self.sasl_wait:
self.challenge('END')
return True
- if self._challenge_met('LS'):
- for cap_name in _NAMES_DESIRED_SERVER_CAPS:
- if (cap_name in self.dict
- and (not self.dict[cap_name].enabled)):
- self.challenge('REQ', cap_name, key_fused=True)
+ if self._db.caps_LS is not None:
+ for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
+ if n in self._availables]:
+ self.challenge('REQ', cap_name)
self.challenge('LIST')
return False
- def challenge(self, *params, key_fused: bool = False) -> None:
- 'Run CAP command with params, handle cap neg. state.'
- challenge_key = ':'.join(params) if key_fused else params[0]
- if self._challenged(challenge_key):
- return
- self._send(IrcMessage(verb='CAP', params=params))
- self._challenge_set(challenge_key)
-
- def _challenge_met(self, step: str) -> bool:
- return self._challenges.get(step, False)
-
- def _challenged(self, step: str) -> bool:
- return step in self._challenges
-
- def _challenge_set(self, step: str, done: bool = False) -> None:
- self._challenges[step] = done
-
- def _collect_caps(self, params: tuple[str, ...]) -> None:
- verb = params[0]
- items = params[-1].strip().split()
- is_final_line = params[1] != '*'
- if self._challenge_met(verb):
- if verb == 'LS':
- self.dict.clear()
- else:
- for cap in self.dict.values():
- cap.enabled = False
- self._challenge_set(verb)
- for item in items:
- if verb == 'LS':
- splitted = item.split('=', maxsplit=1)
- self.dict[splitted[0]] = _ServerCapability(
- enabled=False, data=''.join(splitted[1:]))
- else:
- self.dict[item].enabled = True
- if is_final_line:
- self._challenge_set(verb, done=True)
+ def challenge(self, *params) -> None:
+ 'Ensure CAP message of params has been sent, store sended-ness.'
+ fused = ' '.join(params)
+ if fused not in self._sent_challenges:
+ self._send(IrcMessage('CAP', params=params))
+ self._sent_challenges.append(' '.join(params))
-@dataclass
-class IrcConnSetup:
+class IrcConnSetup(NamedTuple):
'All we need to know to set up a new Client connection.'
hostname: str
port: int
password: str
+class ClientDb:
+ 'Optimized for dealing with variable confirmation of values.'
+
+ def __init__(self) -> None:
+ self._dict: dict[str, int | str | list[str]] = {}
+
+ def __getattr__(self, key: str) -> Any:
+ if key[:1] != '_' and key in self._dict:
+ return self._dict[key]
+ return None
+
+ def _unconf_key(self, key) -> str:
+ return f'_{key}'
+
+ def set(self, key: str, value: int | str | list[str], confirm=False
+ ) -> tuple[bool, bool]:
+ 'Ensures setting, returns if changed value or confirmation.'
+ retrieval = self.get_force(key)
+ confirm_changed = confirm != retrieval[1]
+ value_changed = retrieval[0] != value
+ if confirm_changed and retrieval[0] is not None:
+ del self._dict[self._unconf_key(key) if confirm else key]
+ if value_changed or confirm_changed:
+ self._dict[key if confirm else self._unconf_key(key)] = value
+ return (value_changed, confirm_changed)
+
+ def get_force(self, key: str) -> tuple[Optional[int | str | list[str]],
+ bool]:
+ 'Get even if only stored unconfirmed, tell if confirmed was found..'
+ conf = key in self._dict
+ find = self._dict.get(key if conf else self._unconf_key(key), None)
+ return (find, conf)
+
+ @property
+ def conn_setup(self) -> IrcConnSetup:
+ 'Constructed out of stored entries *including* unconfirmed ones.'
+ kwargs: dict[str, Any] = {}
+ for field_name in IrcConnSetup._fields:
+ kwargs[field_name] = self.get_force(field_name)[0]
+ return IrcConnSetup(**kwargs)
+
+
class Client(ABC, ClientQueueMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
nick_confirmed: bool = False
conn: Optional[_IrcConnection] = None
def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
- self._caps = _CapsManager(self.send)
- self.conn_setup = conn_setup
- if self.conn_setup.port <= 0:
- self.conn_setup.port = PORT_SSL
- self.client_id = self.conn_setup.hostname
+ self._db = ClientDb()
+ for k in conn_setup._fields:
+ self._db.set(k, getattr(conn_setup, k), confirm=k != 'nickname')
+ if conn_setup.port <= 0:
+ self._db.set('port', PORT_SSL, confirm=True)
+ self.client_id = conn_setup.hostname
+ self._caps = _CapsManager(self.send, self._db)
super().__init__(client_id=self.client_id, **kwargs)
- self.update_login(nick_confirmed=False,
- nickname=self.conn_setup.nickname)
self._start_connecting()
def _start_connecting(self) -> None:
try:
if self.conn:
raise IrcConnAbortException('already connected')
- self.conn = _IrcConnection(hostname=self.conn_setup.hostname,
- port=self.conn_setup.port,
+ self.conn = _IrcConnection(hostname=self._db.hostname,
+ port=self._db.port,
_q_out=self._q_out,
client_id=self.client_id)
self._client_trigger('_on_connect')
except Exception as e: # pylint: disable=broad-exception-caught
self._put(ExceptionEvent(e))
- self._log('connecting …', conn_setup=self.conn_setup)
+ self._log('connecting …', conn_setup=self._db.conn_setup)
Thread(target=connect, daemon=True, args=(self,)).start()
def _on_connect(self) -> None:
f'{"yes" if self.conn.ssl else "no"})',
scope=LogScope.ALL)
self._caps.challenge('LS', '302')
- self.send(IrcMessage(verb='USER',
- params=(getuser(), '0', '*',
- self.conn_setup.realname)))
- self.send(IrcMessage(verb='NICK', params=(self.conn_setup.nickname,)))
+ conn_setup = self._db.conn_setup # for type-checks and to include
+ self.send(IrcMessage( # … unconfirmed .nickname
+ verb='USER', params=(getuser(), '0', '*', conn_setup.realname)))
+ self.send(IrcMessage(verb='NICK', params=(conn_setup.nickname,)))
@abstractmethod
def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
self._log(to_log, scope=log_target)
self._log(msg.raw, scope=LogScope.RAW, out=True)
- def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
- '''Manage conn_setup.nickname, .nick_confirmed.
-
- (Useful for subclass extension.)
- '''
- if nickname:
- self.conn_setup.nickname = nickname
- self.nick_confirmed = nick_confirmed
+ def _update_db(self, key: str, value: int | str, confirm: bool) -> None:
+ 'Wrap ._db.set into something accessible to subclass extension.'
+ self._db.set(key, value, confirm)
def close(self) -> None:
'Close both recv Loop and socket.'
if self.conn:
self.conn.close()
self.conn = None
- self.update_login(nick_confirmed=False)
+ nick_key = 'nickname'
+ nickname = self._db.get_force('nickname')[0]
+ assert isinstance(nickname, str)
+ self._update_db(nick_key, value=nickname, confirm=False)
def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
'Gracefully handle broken connection.'
case 'ERROR':
self.close()
case '001' | 'NICK':
- self.update_login(nickname=msg.params[0], nick_confirmed=True)
+ self._update_db('nickname', value=msg.params[0], confirm=True)
case 'PRIVMSG':
self._log(msg.params[-1], scope=LogScope.CHAT, out=False,
sender=msg.nick_from_source, channel=msg.params[0])
case 'CAP':
- if (result := self._caps.process_msg(msg.params[1:])):
- if isinstance(result, str):
- self._log(result)
- else:
- self._log('', caps=self._caps.dict)
+ if self._caps.process_msg(msg.params[1:]):
+ self._log('', caps=self._caps.asdict)
if self._caps.sasl_wait:
- if self.conn_setup.password:
+ if self._db.password:
self._log('trying to authenticate via SASL/plain')
self.send(IrcMessage('AUTHENTICATE', ('PLAIN',)))
else:
self._caps.challenge('END')
case 'AUTHENTICATE':
if msg.params == ('+',):
- auth = b64encode((self.conn_setup.nickname + '\0' +
- self.conn_setup.nickname + '\0' +
- self.conn_setup.password
+ auth = b64encode((self._db.conn_setup.nickname + '\0' +
+ self._db.conn_setup.nickname + '\0' +
+ self._db.conn_setup.password
).encode('utf-8')).decode('utf-8')
self.send(IrcMessage('AUTHENTICATE', (auth,)))
case '903' | '904':
'TUI adaptions to Client.'
# built-ins
from getpass import getuser
-from dataclasses import dataclass, asdict as dc_asdict
-from inspect import signature
+from dataclasses import dataclass
from typing import Callable, Optional, Sequence
# ourselves
from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window,
CMD_SHORTCUTS)
from ircplom.irc_conn import IrcMessage
-from ircplom.client import (IrcConnSetup, Client, ClientQueueMixin, LogScope,
- NewClientEvent)
+from ircplom.client import (IrcConnSetup, Client, ClientDb, ClientQueueMixin,
+ LogScope, NewClientEvent)
CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
CMD_SHORTCUTS['join'] = 'window.join'
-CMD_SHORTCUTS['nick'] = 'window.nick'
CMD_SHORTCUTS['part'] = 'window.part'
+CMD_SHORTCUTS['nick'] = 'window.nick'
CMD_SHORTCUTS['privmsg'] = 'window.privmsg'
CMD_SHORTCUTS['reconnect'] = 'window.reconnect'
def prefix(self) -> str:
return (' ' if self._nick_confirmed else '?') + f'[{self._nickname}] '
- def update_prefix(self, nick_confirmed: bool, nickname: str) -> None:
+ def set_prefix_data(self, nick: str, confirmed: bool) -> None:
'Update prompt prefix with nickname data.'
- self._nickname = nickname
- self._nick_confirmed = nick_confirmed
- self._tainted = True
-
- @classmethod
- def prefix_update_keys(cls) -> set:
- 'Set of .update_prefix args, useful for _ClientWindowsManager.'
- return set(list(signature(cls.update_prefix).parameters.keys())[1:])
+ if confirmed != self._nick_confirmed:
+ self._tainted = True
+ self._nick_confirmed = confirmed
+ if nick != self._nickname:
+ self._tainted = True
+ self._nickname = nick
def enter(self) -> str:
to_return = super().enter()
class _PrivmsgWindow(_ClientWindow):
prompt: _PrivmsgPromptWidget
- def __init__(self, chatname: str, **kwargs) -> None:
+ def __init__(self, chatname: str, get_nick_data: Callable, **kwargs
+ ) -> None:
self.chatname = chatname
+ self._get_nick_data = get_nick_data
super().__init__(**kwargs)
self._title = f'{self.client_id} {self.chatname}'
+ self.set_prompt_prefix()
+
+ def set_prompt_prefix(self) -> None:
+ 'Look up relevant DB data to update prompt prefix.'
+ retrieval = self._get_nick_data()
+ assert isinstance(retrieval[0], str)
+ self.prompt.set_prefix_data(*retrieval)
def cmd__chat(self, msg: str) -> None:
'PRIVMSG to target identified by .chatname.'
_tui_new_window: Callable
def __post_init__(self, *_, **__) -> None:
- self.nick_confirmed = False
- self.nickname = '?'
+ self._db = ClientDb()
+ self._db.set('nickname', '?', confirm=False)
self.windows: list[_ClientWindow] = []
for scope in (LogScope.SERVER, LogScope.RAW):
self._new_win(scope)
- def _prompt_update(self, win: _PrivmsgWindow) -> None:
- to_set = win.prompt.prefix_update_keys()
- win.prompt.update_prefix(**{k: getattr(self, k) for k in to_set})
-
def _new_win(self, scope: LogScope, chatname: str = '') -> _ClientWindow:
kwargs = {'scope': scope, 'log': self.log, 'win_cls': _ClientWindow}
if scope == LogScope.CHAT:
kwargs['win_cls'] = _PrivmsgWindow
kwargs['chatname'] = chatname
+ kwargs['get_nick_data'] = lambda: self._db.get_force('nickname')
win = self._tui_new_window(**kwargs)
- if isinstance(win, _PrivmsgWindow):
- self._prompt_update(win)
self.windows += [win]
return win
if 'out' in kwargs:
prefix += _LOG_PREFIX_OUT if kwargs['out'] else _LOG_PREFIX_IN
if scope == LogScope.CHAT:
- nickname = (self.nickname if kwargs['out']
+ nickname = (self._db.nickname if kwargs['out']
else kwargs['sender'])
prefix += f' [{nickname}]'
else:
prefix += _LOG_PREFIX_SERVER
self._tui_log(msg, scope=scope, prefix=prefix, **kwargs)
- def update(self, **kwargs) -> bool:
+ def update(self, key: str, value: str, confirmed: bool, scope: LogScope
+ ) -> bool:
'Apply settings in kwargs, follow representation update triggers.'
- to_change = {}
- for key, new_value in kwargs.items():
- old_value = getattr(self, key)
- if new_value != old_value:
- to_change[key] = (old_value, new_value)
- for key, vals in to_change.items():
- self.log(
- f'changing {key}: [{vals[0]}] -> [{vals[1]}]',
- scope=LogScope.ALL if key == 'nickname' else LogScope.SERVER)
- setattr(self, key, vals[1])
- if _PrivmsgPromptWidget.prefix_update_keys() | set(to_change):
- for win in [w for w in self.windows
- if isinstance(w, _PrivmsgWindow)]:
- self._prompt_update(win)
+ changes = self._db.set(key, value, confirmed)
+ for i, t in enumerate((('', value), ('confirmation of', confirmed))):
+ if changes[i]:
+ self.log(f'changing {t[0]}{key} to: [{t[1]}]', scope=scope)
+ for win in [w for w in self.windows
+ if isinstance(w, _PrivmsgWindow)]:
+ win.set_prompt_prefix()
return bool([w for w in self.windows if w.tainted])
listing += f' ({cap.data})'
to_log += [listing]
if 'conn_setup' in kwargs:
+ conn_setup = kwargs['conn_setup']
to_log += ['connection setup:']
- for k, v in dc_asdict(kwargs['conn_setup']).items():
- to_log += [f' {k}: [{v}]']
+ for k in conn_setup._fields:
+ to_log += [f' {k}: [{getattr(conn_setup, k)}]']
for item in to_log:
self._client_tui_trigger('log', scope=scope, msg=item, **kwargs)
- def update_login(self, nick_confirmed: bool, nickname: str = '') -> None:
- super().update_login(nick_confirmed, nickname)
- self._client_tui_trigger('update', nick_confirmed=self.nick_confirmed,
- nickname=self.conn_setup.nickname)
+ def _update_db(self, key: str, value: int | str, confirm: bool) -> None:
+ super()._update_db(key, value, confirm)
+ self._client_tui_trigger('update', scope=LogScope.SERVER,
+ key=key, value=value, confirmed=confirm)