# built-ins
from abc import ABC, abstractmethod
from base64 import b64encode
-from dataclasses import dataclass, asdict as dc_asdict, InitVar
+from dataclasses import dataclass, InitVar
from enum import Enum, auto
from getpass import getuser
from threading import Thread
from uuid import uuid4
# ourselves
from ircplom.events import (
- AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
+ AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
from ircplom.irc_conn import (
- BaseIrcConnection, IrcConnAbortException, IrcMessage, ILLEGAL_NICK_CHARS,
- ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL)
+ BaseIrcConnection, IrcConnAbortException, IrcMessage,
+ ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL)
ClientsDb = dict[str, 'Client']
-class Db:
- 'Helper with some conveniences around annotated attributes.'
+class _DeepAnnotationsMixin:
+ 'Provide ._deep_annotations() of non-underscored annotations of whole MRO.'
- def __init__(self, **kwargs) -> None:
- self._types: dict[str, type] = {}
- for c in self.__class__.__mro__:
+ @classmethod
+ def _deep_annotations(cls) -> dict[str, type]:
+ types: dict[str, type] = {}
+ for c in cls.__mro__:
if hasattr(c, '__annotations__'):
- self._types = c.__annotations__ | self._types
- for name, type_ in self._types.items():
- setattr(self, name, type_())
+ types = c.__annotations__ | types
+ return {k: v for k, v in types.items() if k[0] != '_'}
+
+
+class AutoAttrMixin(_DeepAnnotationsMixin):
+ 'Ensures attribute as defined by annotations, and ._make_attr method.'
+
+ def __getattribute__(self, key: str):
+ if key[0] != '_' and (cls := self._deep_annotations().get(key, None)):
+ try:
+ return super().__getattribute__(key)
+ except AttributeError:
+ setattr(self, key, self._make_attr(cls, key))
+ return super().__getattribute__(key)
+
+
+DictItem = TypeVar('DictItem')
+
+
+class Dict(Generic[DictItem]):
+ 'Customized dict replacement.'
+
+ def __init__(self, **kwargs) -> None:
+ self._dict: dict[str, DictItem] = {}
super().__init__(**kwargs)
- def _typecheck(self, key: str, value) -> None:
- type_ = self._types[key]
- if hasattr(type_, '__origin__'):
- assert isinstance(value, type_.__origin__)
- if len(value):
- assert hasattr(type_, '__args__')
- item_type = type_.__args__[0]
- for item in value:
- assert isinstance(item, item_type)
- else:
- assert isinstance(value, type_)
+ def __getitem__(self, key: str) -> DictItem:
+ return self._dict[key]
+
+ def clear(self) -> None:
+ 'Zero content.'
+ self._dict.clear()
+
+ @property
+ def _item_cls(self):
+ return self.__orig_class__.__args__[0]
@dataclass
password: str = ''
-class SharedChannelDbFields:
+class SharedClientDbFields(IrcConnSetup):
'API for fields shared directly in name and type with TUI.'
- user_ids: tuple[str, ...]
- # topic: str
- # channel_modes: str
-
+ connection_state: str = ''
+ isupport: Dict[str]
+ sasl_account: str = ''
+ sasl_auth_state: str = ''
+ user_modes: str = ''
-_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
-
-
-class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
- 'API for fields shared directly in name and type with TUI.'
- connection_state: str
- sasl_account: str
- sasl_auth_state: str
- user_modes: str
- users: Any
- _channels: dict[str, _ChannelDbFields]
+ def is_chan_name(self, name: str) -> bool:
+ 'Tests name to match CHANTYPES prefixes.'
+ return name[0] in self.isupport['CHANTYPES']
@dataclass
user: str = '?'
host: str = '?'
- def copy(self) -> Self:
- 'Produce copy not subject to later attribute changes on original.'
- return self.__class__(**dc_asdict(self))
-
@dataclass
class ServerCapability:
'Public API for CAP data.'
- data: str
+ data: str = ''
enabled: bool = False
_MsgTok.ANY,
_MsgTok.ANY,
_MsgTok.ANY)),
- _MsgParseExpectation(_MsgTok.SERVER,
- '366', # RPL_ENDOFNAMES
- ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
- _MsgTok.CHANNEL,
- _MsgTok.ANY)), # comment
_MsgParseExpectation(_MsgTok.SERVER,
'375', # RPL_MOTDSTART already implied by 1st 372
((_MsgTok.NICKNAME, 'set_me_attr:nick'),
_MsgParseExpectation(_MsgTok.SERVER,
'005', # RPL_ISUPPORT
((_MsgTok.NICKNAME, 'set_me_attr:nick'),
- (_MsgTok.ANY, ':isupports'),
+ (_MsgTok.ANY, ':isupport'),
_MsgTok.ANY), # comment
idx_into_list=1),
_MsgParseExpectation(_MsgTok.SERVER,
'=',
(_MsgTok.CHANNEL, ':channel'),
(_MsgTok.LIST, ':names'))),
+ _MsgParseExpectation(_MsgTok.SERVER,
+ '366', # RPL_ENDOFNAMES
+ ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
+ (_MsgTok.CHANNEL, ':channel'),
+ _MsgTok.ANY)), # comment
_MsgParseExpectation((_MsgTok.NICK_USER_HOST, ':joiner'),
'JOIN',
((_MsgTok.CHANNEL, ':channel'),)),
client_id=self.client_id).kw(e=e)
-class _UpdatingDict:
- _on_update: Callable
+class _Dict(Dict[DictItem]):
+ _defaults: dict[str, DictItem]
- def __init__(self) -> None:
- self._dict: dict[str, Any] = {}
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self._defaults = {}
- def __getitem__(self, key: str):
- return self._dict[key] if key in self._dict else None
+ def __getitem__(self, key: str) -> DictItem:
+ if key not in self._dict and key in self._defaults:
+ return self._defaults[key]
+ return super().__getitem__(key)
- def __setitem__(self, key: str, val: Any) -> None:
- if isinstance(val, _NickUserHost):
- val.set_on_update(lambda: self._on_update(key))
+ def __setitem__(self, key: str, val: DictItem) -> None:
+ assert isinstance(val, self._item_cls)
self._dict[key] = val
- self._on_update(key)
def __delitem__(self, key: str) -> None:
del self._dict[key]
- self._on_update(key)
- @property
def keys(self) -> tuple[str, ...]:
'Keys of item registrations.'
return tuple(self._dict.keys())
- def clear(self) -> None:
- 'Zero dict and send clearance update.'
- self._dict.clear()
- self._on_update('')
-
- def set_on_update(self, name: str, on_update: Callable) -> None:
- 'Caller of on_update with path= set to name.'
- self._on_update = lambda k: on_update(name, k)
-
- def set_from_eq_str(self, eq_str: str, cls=str) -> None:
- 'Set from .key_val_from_eq_str result, to cls(val).'
- key, value = self.key_val_from_eq_str(eq_str)
- self[key] = cls(value)
+ def values(self) -> tuple[DictItem, ...]:
+ 'Items registered.'
+ return tuple(self._dict.values())
@staticmethod
def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
class _CompletableStringsList:
- _on_update: Callable
def __init__(self) -> None:
self._incomplete: list[str] = []
self.completed: tuple[str, ...] = tuple()
- def append(self, value: str) -> None:
+ def _on_list(self, m_name: str, value: str, complete: bool) -> None:
+ getattr(self._incomplete, m_name)(value)
+ if complete:
+ self.complete()
+
+ def append(self, value: str, complete=False) -> None:
'Append value to list.'
- self._incomplete.append(value)
+ self._on_list('append', value, complete)
+
+ def remove(self, value: str, complete=False) -> None:
+ 'Remove value from list.'
+ self._on_list('remove', value, complete)
def complete(self) -> None:
- 'Declare list done, call updater if set.'
+ 'Declare list done.'
self.completed = tuple(self._incomplete)
+ # self._incomplete.clear()
+
+ def clear(self) -> None:
+ 'Wipe content and declare new emptiness as complete.'
self._incomplete.clear()
- if hasattr(self, '_on_update'):
- self._on_update()
+ self.complete()
class _CapsManager:
def __init__(self,
sender: Callable[[IrcMessage], None],
- caps_dict: _UpdatingDict
+ caps_dict: '_UpdatingDict[_UpdatingServerCapability]'
) -> None:
self._dict = caps_dict
self._send = lambda *params: sender(IrcMessage('CAP', params=params))
'Parse CAP message to negot. steps, DB inputs; return if successful.'
for item in items:
if verb == 'NEW':
- self._dict.set_from_eq_str(item, ServerCapability)
+ key, data = _Dict.key_val_from_eq_str(item)
+ self._dict.set_updating(key, ServerCapability(data=data))
elif verb == 'DEL':
del self._dict[item]
elif verb in {'ACK', 'NACK'}:
list_set = set(target.completed)
assert acks == list_set & acks
assert set() == list_set & naks
- for key, data in [_UpdatingDict.key_val_from_eq_str(entry)
+ for key, data in [_Dict.key_val_from_eq_str(entry)
for entry in self._ls.completed]:
- self._dict[key] = ServerCapability(
- data=data, enabled=key in self._list.completed)
+ self._dict.set_updating(key, ServerCapability(
+ data=data, enabled=key in self._list.completed))
return True
return False
-class _Db(Db):
-
- def __init__(self, on_update: Callable, **kwargs) -> None:
- self._still_on_init = True
- self._on_update = on_update
- super().__init__(**kwargs)
- for key in [k for k in self._types if self._types[k] is _UpdatingDict]:
- getattr(self, key).set_on_update(key, self._on_update)
- self._still_on_init = False
-
- def __setattr__(self, key: str, value) -> None:
- if (not hasattr(self, '_still_on_init')) or self._still_on_init:
- super().__setattr__(key, value)
- return
- self._typecheck(key, value)
- super().__setattr__(key, value)
- if key[0] != '_':
- self._on_update(key)
-
+class _Channel:
+ user_ids: _CompletableStringsList
-class _ChannelDb(_Db, SharedChannelDbFields):
-
- def __init__(self, purge_users: Callable, **kwargs) -> None:
+ def __init__(self,
+ get_id_for_nick: Callable,
+ get_membership_prefixes: Callable,
+ purge_users: Callable,
+ **kwargs
+ ) -> None:
+ self._get_id_for_nick = get_id_for_nick
+ self._get_membership_prefixes = get_membership_prefixes
self._purge_users = purge_users
super().__init__(**kwargs)
- def add_user(self, user_id: str) -> None:
- 'Add user_id to .user_ids.'
- self.user_ids = tuple(list(self.user_ids) + [user_id])
- self._on_update('user_ids')
-
- def remove_user(self, user_id: str) -> None:
- 'Remove user_id from .user_ids.'
- self.user_ids = tuple(id_ for id_ in self.user_ids if id_ != user_id)
- self._on_update('user_ids')
+ def add_from_namreply(self, items: tuple[str, ...]):
+ 'Add to .user_ids items assumed as nicknames with membership prefixes.'
+ for item in items:
+ nickname = item.lstrip(self._get_membership_prefixes())
+ self.user_ids.append(self._get_id_for_nick(nickname))
+
+ def append_nick(self, nickname: str) -> None:
+ 'To .user_ids append .nickname and declare .user_ids complete.'
+ user_id = self._get_id_for_nick(nickname)
+ self.user_ids.append(user_id, complete=True)
+
+ def remove_nick(self, nickname: str) -> None:
+ 'From .user_ids remove .nickname and declare .user_ids complete.'
+ user_id = self._get_id_for_nick(nickname)
+ self.user_ids.remove(user_id, complete=True)
self._purge_users()
-class _NickUserHost(NickUserHost):
- _on_update: Callable
+class _NickUserHost(NickUserHost, _DeepAnnotationsMixin):
def __str__(self) -> str:
return f'{self.nick}!{self.user}@{self.host}'
- def __setattr__(self, key: str, value: Any) -> None:
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, NickUserHost):
+ return False
+ for key in self._deep_annotations().keys():
+ if getattr(self, key) != getattr(other, key):
+ return False
+ return True
+
+ def __setattr__(self, key: str, value: NickUserHost | str) -> None:
if key == 'nickuserhost' and isinstance(value, _NickUserHost):
- self.nick = value.nick
- self.user = value.user
- self.host = value.host
+ for annotated_key in self._deep_annotations().keys():
+ setattr(self, annotated_key, getattr(value, annotated_key))
else:
super().__setattr__(key, value)
- if key != '_on_update' and hasattr(self, '_on_update'):
- self._on_update()
@classmethod
def from_str(cls, value: str) -> Self:
assert len(toks) == 3
return cls(*toks)
- def set_on_update(self, on_update: Callable) -> None:
- 'Caller of on_update with path= set to name.'
+
+class _UpdatingMixin(AutoAttrMixin):
+ _on_update: Callable
+
+ def __init__(self, on_update: Callable, **kwargs) -> None:
+ super().__init__(**kwargs)
self._on_update = on_update
+ @property
+ def static_copy(self):
+ 'Return non-updating copy of self.'
+ if isinstance(self, _Dict):
+ return None
+ if isinstance(self, _CompletableStringsList):
+ return self.completed
+ for cls in self.__class__.__mro__:
+ if cls != _DeepAnnotationsMixin\
+ and AutoAttrMixin not in cls.__mro__:
+ obj = cls()
+ break
+ for key in self._deep_annotations():
+ attr_val = getattr(self, key)
+ setattr(obj, key,
+ attr_val if not isinstance(attr_val, _UpdatingMixin)
+ else attr_val.static_copy)
+ return obj
+
+ def _make_attr(self, cls: Callable, key: str):
+ return cls(on_update=lambda *steps: self._on_update(key, *steps))
-class _ClientDb(_Db, SharedClientDbFields):
- caps: _UpdatingDict
- isupports: _UpdatingDict
- users: _UpdatingDict
- motd: _CompletableStringsList
- _channels: dict[str, _ChannelDb]
+ @staticmethod
+ def _silenced_key(key: str, encode=True) -> str:
+ prefix = '__SILENCE__'
+ if encode:
+ return prefix + key
+ return key[len(prefix):] if key.startswith(prefix) else ''
- def __init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- self.motd._on_update = lambda: self._on_update('motd')
+ def __setattr__(self, key: str, value) -> None:
+ if (s_key := self._silenced_key(key, encode=False)):
+ super().__setattr__(s_key, value)
+ return
+ super().__setattr__(key, value)
+ if hasattr(self, '_on_update') and key in self._deep_annotations():
+ self._on_update(key)
+
+ @classmethod
+ def from_silent(cls, original, on_update: Callable) -> Self:
+ 'From non-updating original return updating variant.'
+ obj = cls(on_update=on_update)
+ for key in cls._deep_annotations().keys():
+ setattr(obj, cls._silenced_key(key), getattr(original, key))
+ return obj
+
+
+class _UpdatingDict(_UpdatingMixin, _Dict[DictItem]):
+ _create_if_none: Optional[dict[str, Any]] = None
+
+ @property
+ def _values_are_updating(self) -> bool:
+ return _UpdatingMixin in self._item_cls.__mro__
+
+ def __getitem__(self, key: str) -> DictItem:
+ if key not in self._dict:
+ if self._create_if_none is not None:
+ kw = {} | self._create_if_none
+ if self._values_are_updating:
+ kw |= {'on_update':
+ lambda *steps: self._on_update(key, *steps)}
+ self._dict[key] = self._item_cls(**kw)
+ return super().__getitem__(key)
+
+ def set_updating(self, key: str, val) -> None:
+ 'Set item at key from non-updating val into updating one.'
+ self[key] = self._item_cls.from_silent(
+ original=val,
+ on_update=lambda *steps: self._on_update(key, *steps))
+
+ def __setitem__(self, key: str, val: DictItem) -> None:
+ super().__setitem__(key, val)
+ self._on_update(key)
+
+ def __delitem__(self, key: str) -> None:
+ super().__delitem__(key)
+ self._on_update(key)
+
+ def clear(self) -> None:
+ super().clear()
+ self._on_update()
+
+
+class _UpdatingCompletableStringsList(_UpdatingMixin, _CompletableStringsList):
+
+ def complete(self) -> None:
+ super().complete()
+ self._on_update()
+
+
+class _UpdatingServerCapability(_UpdatingMixin, ServerCapability):
+ pass
+
+
+class _UpdatingChannel(_UpdatingMixin, _Channel):
+ user_ids: _UpdatingCompletableStringsList
+
+
+class _UpdatingNickUserHost(_UpdatingMixin, _NickUserHost):
+ pass
+
+
+class _ClientDb(_UpdatingMixin, SharedClientDbFields):
+ _keep_on_clear = set(IrcConnSetup.__annotations__.keys())
+ caps: _UpdatingDict[_UpdatingServerCapability]
+ channels: _UpdatingDict[_UpdatingChannel]
+ isupport: _UpdatingDict[str]
+ motd: _UpdatingCompletableStringsList
+ users: _UpdatingDict[_UpdatingNickUserHost]
+
+ def __getattribute__(self, key: str):
+ attr = super().__getattribute__(key)
+ if key == 'isupport' and not attr._defaults:
+ attr._defaults = ISUPPORT_DEFAULTS
+ elif key == 'channels' and not attr._create_if_none:
+ attr._create_if_none = {
+ 'get_id_for_nick': self.user_id,
+ 'get_membership_prefixes': self._get_membership_prefixes,
+ 'purge_users': self._purge_users}
+ return attr
+
+ def clear(self) -> None:
+ 'Wipe updating attributes.'
+ for key, value in [(k, v) for k, v in self._deep_annotations().items()
+ if k not in self._keep_on_clear]:
+ if isinstance(value, (_Dict, _CompletableStringsList)):
+ value.clear()
+ elif isinstance(value, str):
+ setattr(self, key, '')
def _purge_users(self) -> None:
to_keep = {'me'}
- for chan in self._channels.values():
- to_keep |= set(chan.user_ids)
- for user_id in [id_ for id_ in self.users.keys
+ for chan in self.channels.values():
+ to_keep |= set(chan.user_ids.completed)
+ for user_id in [id_ for id_ in self.users.keys()
if id_ not in to_keep]:
del self.users[user_id]
- def needs_arg(self, key: str) -> bool:
- 'Reply if attribute of key may reasonably be addressed without an arg.'
- return not isinstance(getattr(self, key), (bool, int, str, tuple,
- _CompletableStringsList))
-
@property
def illegal_nick_firstchars(self) -> str:
- 'Calculated from hardcoded constants and .isupports.'
+ 'Calculated from hardcoded constants and .isupport.'
return (ILLEGAL_NICK_CHARS + ILLEGAL_NICK_FIRSTCHARS
- + self.chan_prefixes + self.membership_prefixes)
+ + self.isupport['CHANTYPES'] + self._get_membership_prefixes())
- @property
- def chan_prefixes(self) -> str:
- 'Registered possible channel name prefixes.'
- return self.isupports['CHANTYPES'] or ISUPPORT_DEFAULTS['CHANTYPES']
-
- @property
- def membership_prefixes(self) -> str:
+ def _get_membership_prefixes(self) -> str:
'Registered possible membership nickname prefixes.'
- prefix = self.isupports['PREFIX'] or ISUPPORT_DEFAULTS['PREFIX']
- toks = prefix.split(')', maxsplit=1)
+ toks = self.isupport['PREFIX'].split(')', maxsplit=1)
assert len(toks) == 2
assert toks[0][0] == '('
return toks[1]
+ def chans_of_user(self, nickname: str) -> dict[str, _UpdatingChannel]:
+ 'Return dictionary of channels user is in.'
+ id_ = self.user_id(nickname)
+ return {k: self.channels[k] for k in self.channels.keys()
+ if id_ in self.channels[k].user_ids.completed}
+
def user_id(self, query: str | _NickUserHost) -> str:
'Return user_id for nickname of entire NickUserHost, create if none.'
nick = query if isinstance(query, str) else query.nick
- matches = [id_ for id_ in self.users.keys
+ matches = [id_ for id_ in self.users.keys()
if self.users[id_].nick == nick]
assert len(matches) < 2
id_ = matches[0] if matches else str(uuid4())
if isinstance(query, _NickUserHost):
- self.users[id_] = query
+ self.users.set_updating(id_, query)
elif not matches:
- self.users[id_] = _NickUserHost(query)
+ self.users.set_updating(id_, _NickUserHost(query))
return id_
- def remove_user(self, user_id: str) -> tuple[str, ...]:
- 'Run remove_user_from_channel on all channels user is in.'
- affected_chans = []
- for id_, chan in [(k, v) for k, v in self._channels.items()
- if user_id in v.user_ids]:
- chan.remove_user(user_id)
- affected_chans += [id_]
- return tuple(affected_chans)
-
- @property
- def chan_names(self) -> tuple[str, ...]:
- 'Return names of joined channels.'
- return tuple(self._channels.keys())
-
- def del_chan(self, name: str) -> None:
- 'Remove DB for channel of name.'
- del self._channels[name]
- self._purge_users()
- self._on_update(name)
-
- def chan(self, name: str) -> _ChannelDb:
- 'Produce DB for channel of name – pre-existing, or newly created.'
- if name not in self._channels:
- self._channels[name] = _ChannelDb(
- on_update=lambda k: self._on_update(name, k),
- purge_users=self._purge_users)
- return self._channels[name]
-
class Client(ABC, ClientQueueMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
self.client_id = conn_setup.hostname
super().__init__(client_id=self.client_id, **kwargs)
self._db = _ClientDb(on_update=self._on_update)
- self._db.users['me'] = _NickUserHost('?', getuser(), '?')
self._caps = _CapsManager(self.send, self._db.caps)
for k in conn_setup.__annotations__:
setattr(self._db, k, getattr(conn_setup, k))
def _on_connect(self) -> None:
assert self.conn is not None
+ self._db.users.set_updating('me', _NickUserHost('?', getuser(), '?'))
self._db.connection_state = 'connected'
self._caps.start_negotation()
self.send(IrcMessage(verb='USER', params=(
self.send(IrcMessage(verb='NICK', params=(self._db.nick_wanted,)))
def close(self) -> None:
- 'Close both recv Loop and socket.'
- self._db.connection_state = 'disconnected'
+ 'Close connection and wipe memory of its states.'
+ self._db.clear()
if self.conn:
self.conn.close()
self.conn = None
- for name in self._db.chan_names:
- self._db.del_chan(name)
- self._db.isupports.clear()
- self._db.users['me'].nick = '?'
- self._db.sasl_auth_state = ''
def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
'Gracefully handle broken connection.'
self.close()
@abstractmethod
- def _on_update(self, path: str, arg: str = '') -> None:
+ def _on_update(self, *path) -> None:
pass
@abstractmethod
def _match_msg(self, msg: IrcMessage) -> dict[str, Any]:
'Test .source, .verb, .params.'
tok_type = (str | _NickUserHost | tuple[str, ...]
- | dict[str, str | _ChannelDb])
+ | dict[str, str | _Channel])
def param_match(ex_tok: str | _MsgTok, msg_tok: str | list[str]
) -> Optional[tok_type | tuple[tok_type, ...]]:
return msg_tok if ('.' in msg_tok
and not set('@!') & set(msg_tok)) else None
if ex_tok is _MsgTok.CHANNEL:
- return {'id': msg_tok, 'db': self._db.chan(msg_tok)
- } if msg_tok[0] in self._db.chan_prefixes else None
+ return {'id': msg_tok, 'db': self._db.channels[msg_tok]
+ } if self._db.is_chan_name(msg_tok) else None
if ex_tok is _MsgTok.NICKNAME:
return (msg_tok
if msg_tok[0] not in self._db.illegal_nick_firstchars
setattr(self._db, arg, ret[arg])
if task == 'set_me_attr':
setattr(self._db.users['me'], arg, ret[arg])
- if task == 'set_user' and ret[arg] != self._db.users['me']:
+ if task == 'set_user':
self._db.user_id(ret[arg])
if ret['verb'] == '005': # RPL_ISUPPORT
- for item in ret['isupports']:
+ for item in ret['isupport']:
if item[0] == '-':
- del self._db.isupports[item[1:]]
+ del self._db.isupport[item[1:]]
else:
- self._db.isupports.set_from_eq_str(str(item))
+ key, data = _Dict.key_val_from_eq_str(item)
+ self._db.isupport[key] = data
elif ret['verb'] == '353': # RPL_NAMREPLY
- for user_id in [
- self._db.user_id(name.lstrip(self._db.membership_prefixes))
- for name in ret['names']]:
- ret['channel']['db'].add_user(user_id)
+ ret['channel']['db'].add_from_namreply(ret['names'])
+ elif ret['verb'] == '366': # RPL_ENDOFNAMES
+ ret['channel']['db'].user_ids.complete()
elif ret['verb'] == '372': # RPL_MOTD
self._db.motd.append(ret['line'])
elif ret['verb'] == '376': # RPL_ENDOFMOTD
elif ret['verb'] == 'CAP':
if (self._caps.process_msg(verb=ret['subverb'], items=ret['items'],
complete='tbc' not in ret)
- and 'sasl' in self._db.caps.keys
+ and 'sasl' in self._db.caps.keys()
and 'PLAIN' in self._db.caps['sasl'].data.split(',')):
if self._db.password:
self._db.sasl_auth_state = 'attempting'
elif ret['verb'] == 'ERROR':
self.close()
elif ret['verb'] == 'JOIN' and ret['joiner'] != self._db.users['me']:
- ret['channel']['db'].add_user(self._db.user_id(ret['joiner']))
+ ret['channel']['db'].append_nick(ret['joiner'])
elif ret['verb'] == 'NICK':
user_id = self._db.user_id(ret['named'])
self._db.users[user_id].nick = ret['nick']
self._log(ret['message'], out=False, **kw)
elif ret['verb'] == 'PART':
if ret['parter'] == self._db.users['me']:
- self._db.del_chan(ret['channel']['id'])
+ del self._db.channels[ret['channel']['id']]
else:
- ret['channel']['db'].remove_user(
- self._db.user_id(ret['parter']))
+ ret['channel']['db'].remove_nick(ret['parter'])
elif ret['verb'] == 'PING':
self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
elif ret['verb'] == 'QUIT':
- for chan in self._db.remove_user(self._db.user_id(ret['quitter'])):
+ for ch_name, ch in self._db.chans_of_user(ret['quitter']).items():
+ ch.remove_nick(ret['quitter'])
self._log(f'{ret["quitter"]} quits: {ret["message"]}',
- LogScope.CHAT, target=chan)
+ LogScope.CHAT, target=ch_name)
'TUI adaptions to Client.'
# built-ins
from getpass import getuser
-from dataclasses import dataclass
-from typing import Callable, Optional, Sequence
+from dataclasses import dataclass, asdict as dc_asdict
+from types import get_original_bases
+from typing import Any, Callable, Optional, Sequence
# ourselves
from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window,
CMD_SHORTCUTS)
-from ircplom.irc_conn import IrcMessage, ISUPPORT_DEFAULTS
+from ircplom.irc_conn import IrcMessage
from ircplom.client import (
- Client, ClientQueueMixin, Db, IrcConnSetup, LogScope, NewClientEvent,
- NickUserHost, ServerCapability, SharedChannelDbFields,
+ AutoAttrMixin, Client, ClientQueueMixin, Dict, DictItem, IrcConnSetup,
+ LogScope, NewClientEvent, NickUserHost, ServerCapability,
SharedClientDbFields)
CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
_LOG_PREFIX_OUT = '>'
_LOG_PREFIX_IN = '<'
-_DbType = bool | int | str | tuple[str, ...] | NickUserHost
-
class _ClientWindow(Window, ClientQueueMixin):
@dataclass
class _Update:
- path: str
- arg: str = ''
- value: Optional[_DbType] = None
- display: str = ''
+ path: tuple[str, ...]
+ value: Optional[Any] = None
- def __post_init__(self, **kwargs) -> None:
- super().__init__(**kwargs)
- if not self.display:
- self.display = str(self.value)
+class _UpdatingNode(AutoAttrMixin):
+ log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER}
-class _Db(Db):
+ def _make_attr(self, cls: Callable, key: str):
+ return cls()
- def _set_and_check_for_dict(self, update: _Update) -> bool:
- d = getattr(self, update.path)
- if update.value is None:
- if update.arg == '':
- d.clear()
- elif update.arg in d:
- del d[update.arg]
- return True
- old_value = d.get(update.arg, None)
- d[update.arg] = update.value
- return update.value != old_value
+ @classmethod
+ def _scope(cls, path: tuple[str, ...]) -> LogScope:
+ scopes: dict[tuple[str, ...], LogScope] = {}
+ for c in cls.__mro__:
+ if hasattr(c, 'log_scopes'):
+ scopes = c.log_scopes | scopes
+ return scopes.get(path, scopes[tuple()])
def set_and_check_for_change(self, update: _Update
- ) -> bool | dict[str, tuple[str, ...]]:
+ ) -> Optional[tuple[LogScope, Any]]:
'Apply update, return if that actually made a difference.'
- self._typecheck(update.path, update.value)
- old_value = getattr(self, update.path)
- setattr(self, update.path, update.value)
- return update.value != old_value
+ key = update.path[0]
+ node = self._get(key)
+ scope = self._scope(update.path)
+ if len(update.path) == 1:
+ if update.value is None:
+ if not self._is_set(key):
+ return None
+ self._unset(key)
+ return (scope, None)
+ if node == update.value:
+ return None
+ self._set(key, update.value)
+ return (scope, update.value)
+ return node.set_and_check_for_change(_Update(update.path[1:],
+ update.value))
+
+ def _get(self, key: str):
+ return getattr(self, key)
+
+ def _set(self, key: str, value) -> None:
+ setattr(self, key, value)
+
+ def _unset(self, key: str) -> None:
+ getattr(self, key).clear()
+
+ def _is_set(self, key: str) -> bool:
+ return hasattr(self, key)
+
+
+class _UpdatingDict(Dict[DictItem], _UpdatingNode):
+
+ def _get(self, key: str):
+ if key not in self._dict:
+ self._dict[key] = self._item_cls()
+ return self._dict[key]
+
+ def _set(self, key: str, value) -> None:
+ self._dict[key] = value
+
+ def _unset(self, key: str) -> None:
+ del self._dict[key]
+ def _is_set(self, key: str) -> bool:
+ return key in self._dict
-class _ChannelDb(_Db, SharedChannelDbFields):
+
+class _UpdatingChannel(_UpdatingNode):
+ user_ids: tuple[str, ...] = tuple()
+ log_scopes = {tuple(): LogScope.CHAT}
def set_and_check_for_change(self, update: _Update
- ) -> bool | dict[str, tuple[str, ...]]:
- if isinstance(getattr(self, update.path), dict):
- return self._set_and_check_for_dict(update)
- if update.path == 'user_ids':
+ ) -> Optional[tuple[LogScope, Any]]:
+ def fmt_ids(user_ids: tuple[str, ...]) -> str:
+ return ', '.join(user_ids)
+ if update.path == ('user_ids',):
assert isinstance(update.value, tuple)
- d = {'joins': tuple(user_id for user_id in update.value
- if user_id not in self.user_ids),
- 'parts': tuple(user_id for user_id in self.user_ids
- if user_id not in update.value)}
- return d if super().set_and_check_for_change(update) else False
+ d: dict[str, str] = {}
+ if not self.user_ids:
+ d['residents'] = fmt_ids(update.value)
+ else:
+ d['joining'] = fmt_ids(tuple(id_ for id_ in update.value
+ if id_ not in self.user_ids))
+ d['parting'] = fmt_ids(tuple(id_ for id_ in self.user_ids
+ if id_ not in update.value))
+ if super().set_and_check_for_change(update):
+ return (self._scope(update.path), d)
+ return None
return super().set_and_check_for_change(update)
-class _TuiClientDb(_Db, SharedClientDbFields):
- caps: dict[str, str]
- isupports: dict[str, str]
- motd: tuple[str, ...]
- users: dict[str, NickUserHost]
- _channels: dict[str, _ChannelDb]
+class _UpdatingNickUserHost(_UpdatingNode, NickUserHost):
+ pass
- def set_and_check_for_change(self, update: _Update
- ) -> bool | dict[str, tuple[str, ...]]:
- result: bool | dict[str, tuple[str, ...]] = False
- if self.is_chan_name(update.path):
- chan_name = update.path
- if update.value is None and not update.arg:
- del self._channels[chan_name]
- result = True
- else:
- update.path = update.arg
- update.arg = ''
- result = self.chan(chan_name).set_and_check_for_change(update)
- if isinstance(result, dict):
- for key, user_ids in result.items():
- result[key] = tuple(self.users[user_id].nick
- for user_id in user_ids)
- elif isinstance(getattr(self, update.path), dict):
- result = self._set_and_check_for_dict(update)
- else:
- result = super().set_and_check_for_change(update)
- return result
- def is_chan_name(self, name: str) -> bool:
- 'Tests name to match CHANTYPES prefixes.'
- return name[0] in self.isupports.get('CHANTYPES',
- ISUPPORT_DEFAULTS['CHANTYPES'])
+class _UpdatingServerCapability(_UpdatingNode, ServerCapability):
+ pass
- def chan(self, name: str) -> _ChannelDb:
- 'Produce DB for channel of name – pre-existing, or newly created.'
- if name not in self._channels:
- self._channels[name] = _ChannelDb()
- return self._channels[name]
+
+_UPDATING_DATACLASSES = (_UpdatingNickUserHost, _UpdatingServerCapability)
+
+
+class _TuiClientDb(_UpdatingNode, SharedClientDbFields):
+ caps: _UpdatingDict[_UpdatingServerCapability]
+ isupport: _UpdatingDict[str]
+ motd: tuple[str, ...] = tuple()
+ users: _UpdatingDict[_UpdatingNickUserHost]
+ channels: _UpdatingDict[_UpdatingChannel]
+ log_scopes = {tuple('connection_state'): LogScope.ALL}
-@dataclass
class _ClientWindowsManager:
- _tui_log: Callable
- _tui_new_window: Callable
- def __post_init__(self, *_, **__) -> None:
+ def __init__(self, tui_log: Callable, tui_new_window: Callable) -> None:
+ self._tui_log = tui_log
+ self._tui_new_window = tui_new_window
self.db = _TuiClientDb()
self.windows: list[_ClientWindow] = []
for scope in (LogScope.SERVER, LogScope.RAW):
def _new_win(self, scope: LogScope, chatname: str = '') -> _ClientWindow:
kwargs = {'scope': scope, 'log': self.log, 'win_cls': _ClientWindow}
if scope == LogScope.CHAT:
- kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#'
- else _ChatWindow)
+ kwargs['win_cls'] = (
+ _ChannelWindow if self.db.is_chan_name(chatname)
+ else _ChatWindow)
kwargs['chatname'] = chatname
kwargs['get_nick_data'] = lambda: self.db.users['me'].nick
win = self._tui_new_window(**kwargs)
def update_db(self, update: _Update) -> bool:
'Apply update to .db, and if changing anything, log and trigger.'
- is_chan_update = self.db.is_chan_name(update.path)
- scope = (LogScope.CHAT if is_chan_update
- else (LogScope.ALL if update.path == 'connection_state'
- else LogScope.SERVER))
- verb = 'cleared' if update.value is None else 'changed to:'
- what = f'{update.path}:{update.arg}' if update.arg else update.path
- log_kwargs = {'target': update.path} if is_chan_update else {}
+ for cls in [cls for cls in _UPDATING_DATACLASSES
+ if isinstance(update.value, get_original_bases(cls)[1])]:
+ update.value = cls(**dc_asdict(update.value)) # type: ignore
+ break
result = self.db.set_and_check_for_change(update)
- if result is False:
+ if result is None:
return False
- if isinstance(result, dict):
- for verb, names in result.items():
- for name in names:
- self.log(f'{name} {verb}', scope=scope, **log_kwargs)
+ scope, value = result
+ log_path = ':'.join(update.path)
+ log_kwargs: dict[str, Any] = {'scope': scope}
+ if scope is LogScope.CHAT:
+ log_kwargs |= {'target': update.path[1]}
+ if value is None:
+ self.log(f'{log_path} cleared', **log_kwargs)
+ elif isinstance(value, dict):
+ for verb, item in [(k, v) for k, v in value.items() if v]:
+ self.log(f'{verb}: {item}', **log_kwargs)
else:
- announcement = f'{what} {verb}'
- if isinstance(update.value, tuple) or announcement[-1] != ':':
- self.log(announcement, scope=scope, **log_kwargs)
- if isinstance(update.value, tuple):
- for item in update.value:
- self.log(f' {item}', scope=scope, **log_kwargs)
- elif announcement[-1] == ':':
- self.log(f'{announcement} [{update.display}]',
- scope=scope, **log_kwargs)
+ announcement = f'{log_path} set to:'
+ if isinstance(value, tuple):
+ self.log(announcement, **log_kwargs)
+ for item in value:
+ self.log(f' {item}', **log_kwargs)
+ else:
+ self.log(f'{announcement} [{value}]', **log_kwargs)
for win in [w for w in self.windows if isinstance(w, _ChatWindow)]:
win.set_prompt_prefix()
return bool([w for w in self.windows if w.tainted])
'Forward todo to appropriate _ClientWindowsManager.'
if client_id not in self._client_mngrs:
self._client_mngrs[client_id] = _ClientWindowsManager(
- _tui_log=lambda msg, **kw: self._log(
+ tui_log=lambda msg, **kw: self._log(
msg, client_id=client_id, **kw),
- _tui_new_window=lambda win_cls, **kw: self._new_window(
+ tui_new_window=lambda win_cls, **kw: self._new_window(
win_cls, _q_out=self._q_out, client_id=client_id, **kw))
if getattr(self._client_mngrs[client_id], todo)(**kwargs) is not False:
self.redraw_affected()
def privmsg(self, target: str, msg: str) -> None:
'Catch /privmsg, only allow for channel if in channel, else complain.'
- if target[0] == '#' and target not in self._db.chan_names:
+ if self._db.is_chan_name(target)\
+ and target not in self._db.channels.keys():
self._log('not sending, since not in channel',
scope=LogScope.SAME, alert=True)
return
with open(f'{self.client_id}.log', 'a', encoding='utf8') as f:
f.write(('>' if kwargs['out'] else '<') + f' {msg}\n')
- def _on_update(self, path: str, arg: str = '') -> None:
- value: Optional[_DbType] = None
- is_chan = path[0] in self._db.chan_prefixes
- display = ''
- if arg:
- if is_chan and path in self._db.chan_names:
- if (chan := self._db.chan(path)) and hasattr(chan, arg):
- value = getattr(chan, arg)
- else:
- d = getattr(self._db, path)
- if arg in d.keys:
- value = d[arg]
- if isinstance(value, ServerCapability):
- display = 'ENABLED' if value.enabled else 'available'
- if value.data:
- display += f' ({value.data})'
- elif (not is_chan) and not self._db.needs_arg(path):
- value = getattr(self._db, path)
- if isinstance(value, NickUserHost):
- value = value.copy()
- elif value and hasattr(value, 'completed'):
- value = value.completed
- self._client_tui_trigger('update_db', update=_Update(
- path, arg, value, display))
+ def _on_update(self, *path) -> None:
+ parent = self._db
+ for step in path[:-1]:
+ parent = (parent[step] if isinstance(parent, Dict)
+ else getattr(parent, step))
+ last_step = path[-1]
+ value = ((parent[last_step] if last_step in parent.keys()
+ else None) if isinstance(parent, Dict)
+ else getattr(parent, last_step))
+ if value and hasattr(value, 'static_copy'):
+ value = value.static_copy
+ self._client_tui_trigger('update_db', update=_Update(path, value))