From: Christian Heller Date: Tue, 2 Sep 2025 03:19:16 +0000 (+0200) Subject: Overhaul database hierarchies. X-Git-Url: https://plomlompom.com/repos/balance?a=commitdiff_plain;h=c9610b05e33fcb7ec5c40fba18a7ca3aebea1283;p=ircplom Overhaul database hierarchies. --- diff --git a/ircplom/client.py b/ircplom/client.py index 46b21cc..bc10127 100644 --- a/ircplom/client.py +++ b/ircplom/client.py @@ -2,7 +2,7 @@ # built-ins from abc import ABC, abstractmethod from base64 import b64encode -from dataclasses import dataclass, asdict as dc_asdict, InitVar +from dataclasses import dataclass, InitVar from enum import Enum, auto from getpass import getuser from threading import Thread @@ -10,37 +10,58 @@ from typing import Any, Callable, Generic, NamedTuple, Optional, Self, TypeVar from uuid import uuid4 # ourselves from ircplom.events import ( - AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin) + AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin) from ircplom.irc_conn import ( - BaseIrcConnection, IrcConnAbortException, IrcMessage, ILLEGAL_NICK_CHARS, - ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL) + BaseIrcConnection, IrcConnAbortException, IrcMessage, + ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL) ClientsDb = dict[str, 'Client'] -class Db: - 'Helper with some conveniences around annotated attributes.' +class _DeepAnnotationsMixin: + 'Provide ._deep_annotations() of non-underscored annotations of whole MRO.' - def __init__(self, **kwargs) -> None: - self._types: dict[str, type] = {} - for c in self.__class__.__mro__: + @classmethod + def _deep_annotations(cls) -> dict[str, type]: + types: dict[str, type] = {} + for c in cls.__mro__: if hasattr(c, '__annotations__'): - self._types = c.__annotations__ | self._types - for name, type_ in self._types.items(): - setattr(self, name, type_()) + types = c.__annotations__ | types + return {k: v for k, v in types.items() if k[0] != '_'} + + +class AutoAttrMixin(_DeepAnnotationsMixin): + 'Ensures attribute as defined by annotations, and ._make_attr method.' + + def __getattribute__(self, key: str): + if key[0] != '_' and (cls := self._deep_annotations().get(key, None)): + try: + return super().__getattribute__(key) + except AttributeError: + setattr(self, key, self._make_attr(cls, key)) + return super().__getattribute__(key) + + +DictItem = TypeVar('DictItem') + + +class Dict(Generic[DictItem]): + 'Customized dict replacement.' + + def __init__(self, **kwargs) -> None: + self._dict: dict[str, DictItem] = {} super().__init__(**kwargs) - def _typecheck(self, key: str, value) -> None: - type_ = self._types[key] - if hasattr(type_, '__origin__'): - assert isinstance(value, type_.__origin__) - if len(value): - assert hasattr(type_, '__args__') - item_type = type_.__args__[0] - for item in value: - assert isinstance(item, item_type) - else: - assert isinstance(value, type_) + def __getitem__(self, key: str) -> DictItem: + return self._dict[key] + + def clear(self) -> None: + 'Zero content.' + self._dict.clear() + + @property + def _item_cls(self): + return self.__orig_class__.__args__[0] @dataclass @@ -53,24 +74,17 @@ class IrcConnSetup: password: str = '' -class SharedChannelDbFields: +class SharedClientDbFields(IrcConnSetup): 'API for fields shared directly in name and type with TUI.' - user_ids: tuple[str, ...] - # topic: str - # channel_modes: str - + connection_state: str = '' + isupport: Dict[str] + sasl_account: str = '' + sasl_auth_state: str = '' + user_modes: str = '' -_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields) - - -class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]): - 'API for fields shared directly in name and type with TUI.' - connection_state: str - sasl_account: str - sasl_auth_state: str - user_modes: str - users: Any - _channels: dict[str, _ChannelDbFields] + def is_chan_name(self, name: str) -> bool: + 'Tests name to match CHANTYPES prefixes.' + return name[0] in self.isupport['CHANTYPES'] @dataclass @@ -80,15 +94,11 @@ class NickUserHost: user: str = '?' host: str = '?' - def copy(self) -> Self: - 'Produce copy not subject to later attribute changes on original.' - return self.__class__(**dc_asdict(self)) - @dataclass class ServerCapability: 'Public API for CAP data.' - data: str + data: str = '' enabled: bool = False @@ -224,11 +234,6 @@ _EXPECTATIONS += [ _MsgTok.ANY, _MsgTok.ANY, _MsgTok.ANY)), - _MsgParseExpectation(_MsgTok.SERVER, - '366', # RPL_ENDOFNAMES - ((_MsgTok.NICKNAME, 'set_me_attr:nick'), - _MsgTok.CHANNEL, - _MsgTok.ANY)), # comment _MsgParseExpectation(_MsgTok.SERVER, '375', # RPL_MOTDSTART already implied by 1st 372 ((_MsgTok.NICKNAME, 'set_me_attr:nick'), @@ -240,7 +245,7 @@ _EXPECTATIONS += [ _MsgParseExpectation(_MsgTok.SERVER, '005', # RPL_ISUPPORT ((_MsgTok.NICKNAME, 'set_me_attr:nick'), - (_MsgTok.ANY, ':isupports'), + (_MsgTok.ANY, ':isupport'), _MsgTok.ANY), # comment idx_into_list=1), _MsgParseExpectation(_MsgTok.SERVER, @@ -388,6 +393,11 @@ _EXPECTATIONS += [ '=', (_MsgTok.CHANNEL, ':channel'), (_MsgTok.LIST, ':names'))), + _MsgParseExpectation(_MsgTok.SERVER, + '366', # RPL_ENDOFNAMES + ((_MsgTok.NICKNAME, 'set_me_attr:nick'), + (_MsgTok.CHANNEL, ':channel'), + _MsgTok.ANY)), # comment _MsgParseExpectation((_MsgTok.NICK_USER_HOST, ':joiner'), 'JOIN', ((_MsgTok.CHANNEL, ':channel'),)), @@ -480,43 +490,32 @@ class _IrcConnection(BaseIrcConnection, _ClientIdMixin): client_id=self.client_id).kw(e=e) -class _UpdatingDict: - _on_update: Callable +class _Dict(Dict[DictItem]): + _defaults: dict[str, DictItem] - def __init__(self) -> None: - self._dict: dict[str, Any] = {} + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._defaults = {} - def __getitem__(self, key: str): - return self._dict[key] if key in self._dict else None + def __getitem__(self, key: str) -> DictItem: + if key not in self._dict and key in self._defaults: + return self._defaults[key] + return super().__getitem__(key) - def __setitem__(self, key: str, val: Any) -> None: - if isinstance(val, _NickUserHost): - val.set_on_update(lambda: self._on_update(key)) + def __setitem__(self, key: str, val: DictItem) -> None: + assert isinstance(val, self._item_cls) self._dict[key] = val - self._on_update(key) def __delitem__(self, key: str) -> None: del self._dict[key] - self._on_update(key) - @property def keys(self) -> tuple[str, ...]: 'Keys of item registrations.' return tuple(self._dict.keys()) - def clear(self) -> None: - 'Zero dict and send clearance update.' - self._dict.clear() - self._on_update('') - - def set_on_update(self, name: str, on_update: Callable) -> None: - 'Caller of on_update with path= set to name.' - self._on_update = lambda k: on_update(name, k) - - def set_from_eq_str(self, eq_str: str, cls=str) -> None: - 'Set from .key_val_from_eq_str result, to cls(val).' - key, value = self.key_val_from_eq_str(eq_str) - self[key] = cls(value) + def values(self) -> tuple[DictItem, ...]: + 'Items registered.' + return tuple(self._dict.values()) @staticmethod def key_val_from_eq_str(eq_str: str) -> tuple[str, str]: @@ -526,29 +525,40 @@ class _UpdatingDict: class _CompletableStringsList: - _on_update: Callable def __init__(self) -> None: self._incomplete: list[str] = [] self.completed: tuple[str, ...] = tuple() - def append(self, value: str) -> None: + def _on_list(self, m_name: str, value: str, complete: bool) -> None: + getattr(self._incomplete, m_name)(value) + if complete: + self.complete() + + def append(self, value: str, complete=False) -> None: 'Append value to list.' - self._incomplete.append(value) + self._on_list('append', value, complete) + + def remove(self, value: str, complete=False) -> None: + 'Remove value from list.' + self._on_list('remove', value, complete) def complete(self) -> None: - 'Declare list done, call updater if set.' + 'Declare list done.' self.completed = tuple(self._incomplete) + # self._incomplete.clear() + + def clear(self) -> None: + 'Wipe content and declare new emptiness as complete.' self._incomplete.clear() - if hasattr(self, '_on_update'): - self._on_update() + self.complete() class _CapsManager: def __init__(self, sender: Callable[[IrcMessage], None], - caps_dict: _UpdatingDict + caps_dict: '_UpdatingDict[_UpdatingServerCapability]' ) -> None: self._dict = caps_dict self._send = lambda *params: sender(IrcMessage('CAP', params=params)) @@ -576,7 +586,8 @@ class _CapsManager: 'Parse CAP message to negot. steps, DB inputs; return if successful.' for item in items: if verb == 'NEW': - self._dict.set_from_eq_str(item, ServerCapability) + key, data = _Dict.key_val_from_eq_str(item) + self._dict.set_updating(key, ServerCapability(data=data)) elif verb == 'DEL': del self._dict[item] elif verb in {'ACK', 'NACK'}: @@ -597,67 +608,65 @@ class _CapsManager: list_set = set(target.completed) assert acks == list_set & acks assert set() == list_set & naks - for key, data in [_UpdatingDict.key_val_from_eq_str(entry) + for key, data in [_Dict.key_val_from_eq_str(entry) for entry in self._ls.completed]: - self._dict[key] = ServerCapability( - data=data, enabled=key in self._list.completed) + self._dict.set_updating(key, ServerCapability( + data=data, enabled=key in self._list.completed)) return True return False -class _Db(Db): - - def __init__(self, on_update: Callable, **kwargs) -> None: - self._still_on_init = True - self._on_update = on_update - super().__init__(**kwargs) - for key in [k for k in self._types if self._types[k] is _UpdatingDict]: - getattr(self, key).set_on_update(key, self._on_update) - self._still_on_init = False - - def __setattr__(self, key: str, value) -> None: - if (not hasattr(self, '_still_on_init')) or self._still_on_init: - super().__setattr__(key, value) - return - self._typecheck(key, value) - super().__setattr__(key, value) - if key[0] != '_': - self._on_update(key) - +class _Channel: + user_ids: _CompletableStringsList -class _ChannelDb(_Db, SharedChannelDbFields): - - def __init__(self, purge_users: Callable, **kwargs) -> None: + def __init__(self, + get_id_for_nick: Callable, + get_membership_prefixes: Callable, + purge_users: Callable, + **kwargs + ) -> None: + self._get_id_for_nick = get_id_for_nick + self._get_membership_prefixes = get_membership_prefixes self._purge_users = purge_users super().__init__(**kwargs) - def add_user(self, user_id: str) -> None: - 'Add user_id to .user_ids.' - self.user_ids = tuple(list(self.user_ids) + [user_id]) - self._on_update('user_ids') - - def remove_user(self, user_id: str) -> None: - 'Remove user_id from .user_ids.' - self.user_ids = tuple(id_ for id_ in self.user_ids if id_ != user_id) - self._on_update('user_ids') + def add_from_namreply(self, items: tuple[str, ...]): + 'Add to .user_ids items assumed as nicknames with membership prefixes.' + for item in items: + nickname = item.lstrip(self._get_membership_prefixes()) + self.user_ids.append(self._get_id_for_nick(nickname)) + + def append_nick(self, nickname: str) -> None: + 'To .user_ids append .nickname and declare .user_ids complete.' + user_id = self._get_id_for_nick(nickname) + self.user_ids.append(user_id, complete=True) + + def remove_nick(self, nickname: str) -> None: + 'From .user_ids remove .nickname and declare .user_ids complete.' + user_id = self._get_id_for_nick(nickname) + self.user_ids.remove(user_id, complete=True) self._purge_users() -class _NickUserHost(NickUserHost): - _on_update: Callable +class _NickUserHost(NickUserHost, _DeepAnnotationsMixin): def __str__(self) -> str: return f'{self.nick}!{self.user}@{self.host}' - def __setattr__(self, key: str, value: Any) -> None: + def __eq__(self, other) -> bool: + if not isinstance(other, NickUserHost): + return False + for key in self._deep_annotations().keys(): + if getattr(self, key) != getattr(other, key): + return False + return True + + def __setattr__(self, key: str, value: NickUserHost | str) -> None: if key == 'nickuserhost' and isinstance(value, _NickUserHost): - self.nick = value.nick - self.user = value.user - self.host = value.host + for annotated_key in self._deep_annotations().keys(): + setattr(self, annotated_key, getattr(value, annotated_key)) else: super().__setattr__(key, value) - if key != '_on_update' and hasattr(self, '_on_update'): - self._on_update() @classmethod def from_str(cls, value: str) -> Self: @@ -668,96 +677,183 @@ class _NickUserHost(NickUserHost): assert len(toks) == 3 return cls(*toks) - def set_on_update(self, on_update: Callable) -> None: - 'Caller of on_update with path= set to name.' + +class _UpdatingMixin(AutoAttrMixin): + _on_update: Callable + + def __init__(self, on_update: Callable, **kwargs) -> None: + super().__init__(**kwargs) self._on_update = on_update + @property + def static_copy(self): + 'Return non-updating copy of self.' + if isinstance(self, _Dict): + return None + if isinstance(self, _CompletableStringsList): + return self.completed + for cls in self.__class__.__mro__: + if cls != _DeepAnnotationsMixin\ + and AutoAttrMixin not in cls.__mro__: + obj = cls() + break + for key in self._deep_annotations(): + attr_val = getattr(self, key) + setattr(obj, key, + attr_val if not isinstance(attr_val, _UpdatingMixin) + else attr_val.static_copy) + return obj + + def _make_attr(self, cls: Callable, key: str): + return cls(on_update=lambda *steps: self._on_update(key, *steps)) -class _ClientDb(_Db, SharedClientDbFields): - caps: _UpdatingDict - isupports: _UpdatingDict - users: _UpdatingDict - motd: _CompletableStringsList - _channels: dict[str, _ChannelDb] + @staticmethod + def _silenced_key(key: str, encode=True) -> str: + prefix = '__SILENCE__' + if encode: + return prefix + key + return key[len(prefix):] if key.startswith(prefix) else '' - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.motd._on_update = lambda: self._on_update('motd') + def __setattr__(self, key: str, value) -> None: + if (s_key := self._silenced_key(key, encode=False)): + super().__setattr__(s_key, value) + return + super().__setattr__(key, value) + if hasattr(self, '_on_update') and key in self._deep_annotations(): + self._on_update(key) + + @classmethod + def from_silent(cls, original, on_update: Callable) -> Self: + 'From non-updating original return updating variant.' + obj = cls(on_update=on_update) + for key in cls._deep_annotations().keys(): + setattr(obj, cls._silenced_key(key), getattr(original, key)) + return obj + + +class _UpdatingDict(_UpdatingMixin, _Dict[DictItem]): + _create_if_none: Optional[dict[str, Any]] = None + + @property + def _values_are_updating(self) -> bool: + return _UpdatingMixin in self._item_cls.__mro__ + + def __getitem__(self, key: str) -> DictItem: + if key not in self._dict: + if self._create_if_none is not None: + kw = {} | self._create_if_none + if self._values_are_updating: + kw |= {'on_update': + lambda *steps: self._on_update(key, *steps)} + self._dict[key] = self._item_cls(**kw) + return super().__getitem__(key) + + def set_updating(self, key: str, val) -> None: + 'Set item at key from non-updating val into updating one.' + self[key] = self._item_cls.from_silent( + original=val, + on_update=lambda *steps: self._on_update(key, *steps)) + + def __setitem__(self, key: str, val: DictItem) -> None: + super().__setitem__(key, val) + self._on_update(key) + + def __delitem__(self, key: str) -> None: + super().__delitem__(key) + self._on_update(key) + + def clear(self) -> None: + super().clear() + self._on_update() + + +class _UpdatingCompletableStringsList(_UpdatingMixin, _CompletableStringsList): + + def complete(self) -> None: + super().complete() + self._on_update() + + +class _UpdatingServerCapability(_UpdatingMixin, ServerCapability): + pass + + +class _UpdatingChannel(_UpdatingMixin, _Channel): + user_ids: _UpdatingCompletableStringsList + + +class _UpdatingNickUserHost(_UpdatingMixin, _NickUserHost): + pass + + +class _ClientDb(_UpdatingMixin, SharedClientDbFields): + _keep_on_clear = set(IrcConnSetup.__annotations__.keys()) + caps: _UpdatingDict[_UpdatingServerCapability] + channels: _UpdatingDict[_UpdatingChannel] + isupport: _UpdatingDict[str] + motd: _UpdatingCompletableStringsList + users: _UpdatingDict[_UpdatingNickUserHost] + + def __getattribute__(self, key: str): + attr = super().__getattribute__(key) + if key == 'isupport' and not attr._defaults: + attr._defaults = ISUPPORT_DEFAULTS + elif key == 'channels' and not attr._create_if_none: + attr._create_if_none = { + 'get_id_for_nick': self.user_id, + 'get_membership_prefixes': self._get_membership_prefixes, + 'purge_users': self._purge_users} + return attr + + def clear(self) -> None: + 'Wipe updating attributes.' + for key, value in [(k, v) for k, v in self._deep_annotations().items() + if k not in self._keep_on_clear]: + if isinstance(value, (_Dict, _CompletableStringsList)): + value.clear() + elif isinstance(value, str): + setattr(self, key, '') def _purge_users(self) -> None: to_keep = {'me'} - for chan in self._channels.values(): - to_keep |= set(chan.user_ids) - for user_id in [id_ for id_ in self.users.keys + for chan in self.channels.values(): + to_keep |= set(chan.user_ids.completed) + for user_id in [id_ for id_ in self.users.keys() if id_ not in to_keep]: del self.users[user_id] - def needs_arg(self, key: str) -> bool: - 'Reply if attribute of key may reasonably be addressed without an arg.' - return not isinstance(getattr(self, key), (bool, int, str, tuple, - _CompletableStringsList)) - @property def illegal_nick_firstchars(self) -> str: - 'Calculated from hardcoded constants and .isupports.' + 'Calculated from hardcoded constants and .isupport.' return (ILLEGAL_NICK_CHARS + ILLEGAL_NICK_FIRSTCHARS - + self.chan_prefixes + self.membership_prefixes) + + self.isupport['CHANTYPES'] + self._get_membership_prefixes()) - @property - def chan_prefixes(self) -> str: - 'Registered possible channel name prefixes.' - return self.isupports['CHANTYPES'] or ISUPPORT_DEFAULTS['CHANTYPES'] - - @property - def membership_prefixes(self) -> str: + def _get_membership_prefixes(self) -> str: 'Registered possible membership nickname prefixes.' - prefix = self.isupports['PREFIX'] or ISUPPORT_DEFAULTS['PREFIX'] - toks = prefix.split(')', maxsplit=1) + toks = self.isupport['PREFIX'].split(')', maxsplit=1) assert len(toks) == 2 assert toks[0][0] == '(' return toks[1] + def chans_of_user(self, nickname: str) -> dict[str, _UpdatingChannel]: + 'Return dictionary of channels user is in.' + id_ = self.user_id(nickname) + return {k: self.channels[k] for k in self.channels.keys() + if id_ in self.channels[k].user_ids.completed} + def user_id(self, query: str | _NickUserHost) -> str: 'Return user_id for nickname of entire NickUserHost, create if none.' nick = query if isinstance(query, str) else query.nick - matches = [id_ for id_ in self.users.keys + matches = [id_ for id_ in self.users.keys() if self.users[id_].nick == nick] assert len(matches) < 2 id_ = matches[0] if matches else str(uuid4()) if isinstance(query, _NickUserHost): - self.users[id_] = query + self.users.set_updating(id_, query) elif not matches: - self.users[id_] = _NickUserHost(query) + self.users.set_updating(id_, _NickUserHost(query)) return id_ - def remove_user(self, user_id: str) -> tuple[str, ...]: - 'Run remove_user_from_channel on all channels user is in.' - affected_chans = [] - for id_, chan in [(k, v) for k, v in self._channels.items() - if user_id in v.user_ids]: - chan.remove_user(user_id) - affected_chans += [id_] - return tuple(affected_chans) - - @property - def chan_names(self) -> tuple[str, ...]: - 'Return names of joined channels.' - return tuple(self._channels.keys()) - - def del_chan(self, name: str) -> None: - 'Remove DB for channel of name.' - del self._channels[name] - self._purge_users() - self._on_update(name) - - def chan(self, name: str) -> _ChannelDb: - 'Produce DB for channel of name – pre-existing, or newly created.' - if name not in self._channels: - self._channels[name] = _ChannelDb( - on_update=lambda k: self._on_update(name, k), - purge_users=self._purge_users) - return self._channels[name] - class Client(ABC, ClientQueueMixin): 'Abstracts socket connection, loop over it, and handling messages from it.' @@ -768,7 +864,6 @@ class Client(ABC, ClientQueueMixin): self.client_id = conn_setup.hostname super().__init__(client_id=self.client_id, **kwargs) self._db = _ClientDb(on_update=self._on_update) - self._db.users['me'] = _NickUserHost('?', getuser(), '?') self._caps = _CapsManager(self.send, self._db.caps) for k in conn_setup.__annotations__: setattr(self._db, k, getattr(conn_setup, k)) @@ -797,6 +892,7 @@ class Client(ABC, ClientQueueMixin): def _on_connect(self) -> None: assert self.conn is not None + self._db.users.set_updating('me', _NickUserHost('?', getuser(), '?')) self._db.connection_state = 'connected' self._caps.start_negotation() self.send(IrcMessage(verb='USER', params=( @@ -805,16 +901,11 @@ class Client(ABC, ClientQueueMixin): self.send(IrcMessage(verb='NICK', params=(self._db.nick_wanted,))) def close(self) -> None: - 'Close both recv Loop and socket.' - self._db.connection_state = 'disconnected' + 'Close connection and wipe memory of its states.' + self._db.clear() if self.conn: self.conn.close() self.conn = None - for name in self._db.chan_names: - self._db.del_chan(name) - self._db.isupports.clear() - self._db.users['me'].nick = '?' - self._db.sasl_auth_state = '' def on_handled_loop_exception(self, e: IrcConnAbortException) -> None: 'Gracefully handle broken connection.' @@ -822,7 +913,7 @@ class Client(ABC, ClientQueueMixin): self.close() @abstractmethod - def _on_update(self, path: str, arg: str = '') -> None: + def _on_update(self, *path) -> None: pass @abstractmethod @@ -851,7 +942,7 @@ class Client(ABC, ClientQueueMixin): def _match_msg(self, msg: IrcMessage) -> dict[str, Any]: 'Test .source, .verb, .params.' tok_type = (str | _NickUserHost | tuple[str, ...] - | dict[str, str | _ChannelDb]) + | dict[str, str | _Channel]) def param_match(ex_tok: str | _MsgTok, msg_tok: str | list[str] ) -> Optional[tok_type | tuple[tok_type, ...]]: @@ -871,8 +962,8 @@ class Client(ABC, ClientQueueMixin): return msg_tok if ('.' in msg_tok and not set('@!') & set(msg_tok)) else None if ex_tok is _MsgTok.CHANNEL: - return {'id': msg_tok, 'db': self._db.chan(msg_tok) - } if msg_tok[0] in self._db.chan_prefixes else None + return {'id': msg_tok, 'db': self._db.channels[msg_tok] + } if self._db.is_chan_name(msg_tok) else None if ex_tok is _MsgTok.NICKNAME: return (msg_tok if msg_tok[0] not in self._db.illegal_nick_firstchars @@ -930,19 +1021,19 @@ class Client(ABC, ClientQueueMixin): setattr(self._db, arg, ret[arg]) if task == 'set_me_attr': setattr(self._db.users['me'], arg, ret[arg]) - if task == 'set_user' and ret[arg] != self._db.users['me']: + if task == 'set_user': self._db.user_id(ret[arg]) if ret['verb'] == '005': # RPL_ISUPPORT - for item in ret['isupports']: + for item in ret['isupport']: if item[0] == '-': - del self._db.isupports[item[1:]] + del self._db.isupport[item[1:]] else: - self._db.isupports.set_from_eq_str(str(item)) + key, data = _Dict.key_val_from_eq_str(item) + self._db.isupport[key] = data elif ret['verb'] == '353': # RPL_NAMREPLY - for user_id in [ - self._db.user_id(name.lstrip(self._db.membership_prefixes)) - for name in ret['names']]: - ret['channel']['db'].add_user(user_id) + ret['channel']['db'].add_from_namreply(ret['names']) + elif ret['verb'] == '366': # RPL_ENDOFNAMES + ret['channel']['db'].user_ids.complete() elif ret['verb'] == '372': # RPL_MOTD self._db.motd.append(ret['line']) elif ret['verb'] == '376': # RPL_ENDOFMOTD @@ -970,7 +1061,7 @@ class Client(ABC, ClientQueueMixin): elif ret['verb'] == 'CAP': if (self._caps.process_msg(verb=ret['subverb'], items=ret['items'], complete='tbc' not in ret) - and 'sasl' in self._db.caps.keys + and 'sasl' in self._db.caps.keys() and 'PLAIN' in self._db.caps['sasl'].data.split(',')): if self._db.password: self._db.sasl_auth_state = 'attempting' @@ -980,7 +1071,7 @@ class Client(ABC, ClientQueueMixin): elif ret['verb'] == 'ERROR': self.close() elif ret['verb'] == 'JOIN' and ret['joiner'] != self._db.users['me']: - ret['channel']['db'].add_user(self._db.user_id(ret['joiner'])) + ret['channel']['db'].append_nick(ret['joiner']) elif ret['verb'] == 'NICK': user_id = self._db.user_id(ret['named']) self._db.users[user_id].nick = ret['nick'] @@ -996,13 +1087,13 @@ class Client(ABC, ClientQueueMixin): self._log(ret['message'], out=False, **kw) elif ret['verb'] == 'PART': if ret['parter'] == self._db.users['me']: - self._db.del_chan(ret['channel']['id']) + del self._db.channels[ret['channel']['id']] else: - ret['channel']['db'].remove_user( - self._db.user_id(ret['parter'])) + ret['channel']['db'].remove_nick(ret['parter']) elif ret['verb'] == 'PING': self.send(IrcMessage(verb='PONG', params=(ret['reply'],))) elif ret['verb'] == 'QUIT': - for chan in self._db.remove_user(self._db.user_id(ret['quitter'])): + for ch_name, ch in self._db.chans_of_user(ret['quitter']).items(): + ch.remove_nick(ret['quitter']) self._log(f'{ret["quitter"]} quits: {ret["message"]}', - LogScope.CHAT, target=chan) + LogScope.CHAT, target=ch_name) diff --git a/ircplom/client_tui.py b/ircplom/client_tui.py index 5664df2..1512535 100644 --- a/ircplom/client_tui.py +++ b/ircplom/client_tui.py @@ -1,15 +1,16 @@ 'TUI adaptions to Client.' # built-ins from getpass import getuser -from dataclasses import dataclass -from typing import Callable, Optional, Sequence +from dataclasses import dataclass, asdict as dc_asdict +from types import get_original_bases +from typing import Any, Callable, Optional, Sequence # ourselves from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window, CMD_SHORTCUTS) -from ircplom.irc_conn import IrcMessage, ISUPPORT_DEFAULTS +from ircplom.irc_conn import IrcMessage from ircplom.client import ( - Client, ClientQueueMixin, Db, IrcConnSetup, LogScope, NewClientEvent, - NickUserHost, ServerCapability, SharedChannelDbFields, + AutoAttrMixin, Client, ClientQueueMixin, Dict, DictItem, IrcConnSetup, + LogScope, NewClientEvent, NickUserHost, ServerCapability, SharedClientDbFields) CMD_SHORTCUTS['disconnect'] = 'window.disconnect' @@ -24,8 +25,6 @@ _LOG_PREFIX_SERVER = '$' _LOG_PREFIX_OUT = '>' _LOG_PREFIX_IN = '<' -_DbType = bool | int | str | tuple[str, ...] | NickUserHost - class _ClientWindow(Window, ClientQueueMixin): @@ -120,103 +119,122 @@ class _ChannelWindow(_ChatWindow): @dataclass class _Update: - path: str - arg: str = '' - value: Optional[_DbType] = None - display: str = '' + path: tuple[str, ...] + value: Optional[Any] = None - def __post_init__(self, **kwargs) -> None: - super().__init__(**kwargs) - if not self.display: - self.display = str(self.value) +class _UpdatingNode(AutoAttrMixin): + log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER} -class _Db(Db): + def _make_attr(self, cls: Callable, key: str): + return cls() - def _set_and_check_for_dict(self, update: _Update) -> bool: - d = getattr(self, update.path) - if update.value is None: - if update.arg == '': - d.clear() - elif update.arg in d: - del d[update.arg] - return True - old_value = d.get(update.arg, None) - d[update.arg] = update.value - return update.value != old_value + @classmethod + def _scope(cls, path: tuple[str, ...]) -> LogScope: + scopes: dict[tuple[str, ...], LogScope] = {} + for c in cls.__mro__: + if hasattr(c, 'log_scopes'): + scopes = c.log_scopes | scopes + return scopes.get(path, scopes[tuple()]) def set_and_check_for_change(self, update: _Update - ) -> bool | dict[str, tuple[str, ...]]: + ) -> Optional[tuple[LogScope, Any]]: 'Apply update, return if that actually made a difference.' - self._typecheck(update.path, update.value) - old_value = getattr(self, update.path) - setattr(self, update.path, update.value) - return update.value != old_value + key = update.path[0] + node = self._get(key) + scope = self._scope(update.path) + if len(update.path) == 1: + if update.value is None: + if not self._is_set(key): + return None + self._unset(key) + return (scope, None) + if node == update.value: + return None + self._set(key, update.value) + return (scope, update.value) + return node.set_and_check_for_change(_Update(update.path[1:], + update.value)) + + def _get(self, key: str): + return getattr(self, key) + + def _set(self, key: str, value) -> None: + setattr(self, key, value) + + def _unset(self, key: str) -> None: + getattr(self, key).clear() + + def _is_set(self, key: str) -> bool: + return hasattr(self, key) + + +class _UpdatingDict(Dict[DictItem], _UpdatingNode): + + def _get(self, key: str): + if key not in self._dict: + self._dict[key] = self._item_cls() + return self._dict[key] + + def _set(self, key: str, value) -> None: + self._dict[key] = value + + def _unset(self, key: str) -> None: + del self._dict[key] + def _is_set(self, key: str) -> bool: + return key in self._dict -class _ChannelDb(_Db, SharedChannelDbFields): + +class _UpdatingChannel(_UpdatingNode): + user_ids: tuple[str, ...] = tuple() + log_scopes = {tuple(): LogScope.CHAT} def set_and_check_for_change(self, update: _Update - ) -> bool | dict[str, tuple[str, ...]]: - if isinstance(getattr(self, update.path), dict): - return self._set_and_check_for_dict(update) - if update.path == 'user_ids': + ) -> Optional[tuple[LogScope, Any]]: + def fmt_ids(user_ids: tuple[str, ...]) -> str: + return ', '.join(user_ids) + if update.path == ('user_ids',): assert isinstance(update.value, tuple) - d = {'joins': tuple(user_id for user_id in update.value - if user_id not in self.user_ids), - 'parts': tuple(user_id for user_id in self.user_ids - if user_id not in update.value)} - return d if super().set_and_check_for_change(update) else False + d: dict[str, str] = {} + if not self.user_ids: + d['residents'] = fmt_ids(update.value) + else: + d['joining'] = fmt_ids(tuple(id_ for id_ in update.value + if id_ not in self.user_ids)) + d['parting'] = fmt_ids(tuple(id_ for id_ in self.user_ids + if id_ not in update.value)) + if super().set_and_check_for_change(update): + return (self._scope(update.path), d) + return None return super().set_and_check_for_change(update) -class _TuiClientDb(_Db, SharedClientDbFields): - caps: dict[str, str] - isupports: dict[str, str] - motd: tuple[str, ...] - users: dict[str, NickUserHost] - _channels: dict[str, _ChannelDb] +class _UpdatingNickUserHost(_UpdatingNode, NickUserHost): + pass - def set_and_check_for_change(self, update: _Update - ) -> bool | dict[str, tuple[str, ...]]: - result: bool | dict[str, tuple[str, ...]] = False - if self.is_chan_name(update.path): - chan_name = update.path - if update.value is None and not update.arg: - del self._channels[chan_name] - result = True - else: - update.path = update.arg - update.arg = '' - result = self.chan(chan_name).set_and_check_for_change(update) - if isinstance(result, dict): - for key, user_ids in result.items(): - result[key] = tuple(self.users[user_id].nick - for user_id in user_ids) - elif isinstance(getattr(self, update.path), dict): - result = self._set_and_check_for_dict(update) - else: - result = super().set_and_check_for_change(update) - return result - def is_chan_name(self, name: str) -> bool: - 'Tests name to match CHANTYPES prefixes.' - return name[0] in self.isupports.get('CHANTYPES', - ISUPPORT_DEFAULTS['CHANTYPES']) +class _UpdatingServerCapability(_UpdatingNode, ServerCapability): + pass - def chan(self, name: str) -> _ChannelDb: - 'Produce DB for channel of name – pre-existing, or newly created.' - if name not in self._channels: - self._channels[name] = _ChannelDb() - return self._channels[name] + +_UPDATING_DATACLASSES = (_UpdatingNickUserHost, _UpdatingServerCapability) + + +class _TuiClientDb(_UpdatingNode, SharedClientDbFields): + caps: _UpdatingDict[_UpdatingServerCapability] + isupport: _UpdatingDict[str] + motd: tuple[str, ...] = tuple() + users: _UpdatingDict[_UpdatingNickUserHost] + channels: _UpdatingDict[_UpdatingChannel] + log_scopes = {tuple('connection_state'): LogScope.ALL} -@dataclass class _ClientWindowsManager: - _tui_log: Callable - _tui_new_window: Callable - def __post_init__(self, *_, **__) -> None: + def __init__(self, tui_log: Callable, tui_new_window: Callable) -> None: + self._tui_log = tui_log + self._tui_new_window = tui_new_window self.db = _TuiClientDb() self.windows: list[_ClientWindow] = [] for scope in (LogScope.SERVER, LogScope.RAW): @@ -225,8 +243,9 @@ class _ClientWindowsManager: def _new_win(self, scope: LogScope, chatname: str = '') -> _ClientWindow: kwargs = {'scope': scope, 'log': self.log, 'win_cls': _ClientWindow} if scope == LogScope.CHAT: - kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#' - else _ChatWindow) + kwargs['win_cls'] = ( + _ChannelWindow if self.db.is_chan_name(chatname) + else _ChatWindow) kwargs['chatname'] = chatname kwargs['get_nick_data'] = lambda: self.db.users['me'].nick win = self._tui_new_window(**kwargs) @@ -261,30 +280,31 @@ class _ClientWindowsManager: def update_db(self, update: _Update) -> bool: 'Apply update to .db, and if changing anything, log and trigger.' - is_chan_update = self.db.is_chan_name(update.path) - scope = (LogScope.CHAT if is_chan_update - else (LogScope.ALL if update.path == 'connection_state' - else LogScope.SERVER)) - verb = 'cleared' if update.value is None else 'changed to:' - what = f'{update.path}:{update.arg}' if update.arg else update.path - log_kwargs = {'target': update.path} if is_chan_update else {} + for cls in [cls for cls in _UPDATING_DATACLASSES + if isinstance(update.value, get_original_bases(cls)[1])]: + update.value = cls(**dc_asdict(update.value)) # type: ignore + break result = self.db.set_and_check_for_change(update) - if result is False: + if result is None: return False - if isinstance(result, dict): - for verb, names in result.items(): - for name in names: - self.log(f'{name} {verb}', scope=scope, **log_kwargs) + scope, value = result + log_path = ':'.join(update.path) + log_kwargs: dict[str, Any] = {'scope': scope} + if scope is LogScope.CHAT: + log_kwargs |= {'target': update.path[1]} + if value is None: + self.log(f'{log_path} cleared', **log_kwargs) + elif isinstance(value, dict): + for verb, item in [(k, v) for k, v in value.items() if v]: + self.log(f'{verb}: {item}', **log_kwargs) else: - announcement = f'{what} {verb}' - if isinstance(update.value, tuple) or announcement[-1] != ':': - self.log(announcement, scope=scope, **log_kwargs) - if isinstance(update.value, tuple): - for item in update.value: - self.log(f' {item}', scope=scope, **log_kwargs) - elif announcement[-1] == ':': - self.log(f'{announcement} [{update.display}]', - scope=scope, **log_kwargs) + announcement = f'{log_path} set to:' + if isinstance(value, tuple): + self.log(announcement, **log_kwargs) + for item in value: + self.log(f' {item}', **log_kwargs) + else: + self.log(f'{announcement} [{value}]', **log_kwargs) for win in [w for w in self.windows if isinstance(w, _ChatWindow)]: win.set_prompt_prefix() return bool([w for w in self.windows if w.tainted]) @@ -317,9 +337,9 @@ class ClientTui(BaseTui): 'Forward todo to appropriate _ClientWindowsManager.' if client_id not in self._client_mngrs: self._client_mngrs[client_id] = _ClientWindowsManager( - _tui_log=lambda msg, **kw: self._log( + tui_log=lambda msg, **kw: self._log( msg, client_id=client_id, **kw), - _tui_new_window=lambda win_cls, **kw: self._new_window( + tui_new_window=lambda win_cls, **kw: self._new_window( win_cls, _q_out=self._q_out, client_id=client_id, **kw)) if getattr(self._client_mngrs[client_id], todo)(**kwargs) is not False: self.redraw_affected() @@ -362,7 +382,8 @@ class _ClientKnowingTui(Client): def privmsg(self, target: str, msg: str) -> None: 'Catch /privmsg, only allow for channel if in channel, else complain.' - if target[0] == '#' and target not in self._db.chan_names: + if self._db.is_chan_name(target)\ + and target not in self._db.channels.keys(): self._log('not sending, since not in channel', scope=LogScope.SAME, alert=True) return @@ -387,27 +408,15 @@ class _ClientKnowingTui(Client): with open(f'{self.client_id}.log', 'a', encoding='utf8') as f: f.write(('>' if kwargs['out'] else '<') + f' {msg}\n') - def _on_update(self, path: str, arg: str = '') -> None: - value: Optional[_DbType] = None - is_chan = path[0] in self._db.chan_prefixes - display = '' - if arg: - if is_chan and path in self._db.chan_names: - if (chan := self._db.chan(path)) and hasattr(chan, arg): - value = getattr(chan, arg) - else: - d = getattr(self._db, path) - if arg in d.keys: - value = d[arg] - if isinstance(value, ServerCapability): - display = 'ENABLED' if value.enabled else 'available' - if value.data: - display += f' ({value.data})' - elif (not is_chan) and not self._db.needs_arg(path): - value = getattr(self._db, path) - if isinstance(value, NickUserHost): - value = value.copy() - elif value and hasattr(value, 'completed'): - value = value.completed - self._client_tui_trigger('update_db', update=_Update( - path, arg, value, display)) + def _on_update(self, *path) -> None: + parent = self._db + for step in path[:-1]: + parent = (parent[step] if isinstance(parent, Dict) + else getattr(parent, step)) + last_step = path[-1] + value = ((parent[last_step] if last_step in parent.keys() + else None) if isinstance(parent, Dict) + else getattr(parent, last_step)) + if value and hasattr(value, 'static_copy'): + value = value.static_copy + self._client_tui_trigger('update_db', update=_Update(path, value))