home · contact · privacy
Overhaul database hierarchies.
authorChristian Heller <c.heller@plomlompom.de>
Tue, 2 Sep 2025 03:19:16 +0000 (05:19 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Tue, 2 Sep 2025 03:19:16 +0000 (05:19 +0200)
ircplom/client.py
ircplom/client_tui.py

index 46b21cc209f9807e6ae659f1eaf1576c830d452e..bc10127f382b1ae8f9bff30cf7aef9495a88c11e 100644 (file)
@@ -2,7 +2,7 @@
 # built-ins
 from abc import ABC, abstractmethod
 from base64 import b64encode
-from dataclasses import dataclass, asdict as dc_asdict, InitVar
+from dataclasses import dataclass, InitVar
 from enum import Enum, auto
 from getpass import getuser
 from threading import Thread
@@ -10,37 +10,58 @@ from typing import Any, Callable, Generic, NamedTuple, Optional, Self, TypeVar
 from uuid import uuid4
 # ourselves
 from ircplom.events import (
-        AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
+    AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
 from ircplom.irc_conn import (
-    BaseIrcConnection, IrcConnAbortException, IrcMessage, ILLEGAL_NICK_CHARS,
-    ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL)
+    BaseIrcConnection, IrcConnAbortException, IrcMessage,
+    ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL)
 
 ClientsDb = dict[str, 'Client']
 
 
-class Db:
-    'Helper with some conveniences around annotated attributes.'
+class _DeepAnnotationsMixin:
+    'Provide ._deep_annotations() of non-underscored annotations of whole MRO.'
 
-    def __init__(self, **kwargs) -> None:
-        self._types: dict[str, type] = {}
-        for c in self.__class__.__mro__:
+    @classmethod
+    def _deep_annotations(cls) -> dict[str, type]:
+        types: dict[str, type] = {}
+        for c in cls.__mro__:
             if hasattr(c, '__annotations__'):
-                self._types = c.__annotations__ | self._types
-        for name, type_ in self._types.items():
-            setattr(self, name, type_())
+                types = c.__annotations__ | types
+        return {k: v for k, v in types.items() if k[0] != '_'}
+
+
+class AutoAttrMixin(_DeepAnnotationsMixin):
+    'Ensures attribute as defined by annotations, and ._make_attr method.'
+
+    def __getattribute__(self, key: str):
+        if key[0] != '_' and (cls := self._deep_annotations().get(key, None)):
+            try:
+                return super().__getattribute__(key)
+            except AttributeError:
+                setattr(self, key, self._make_attr(cls, key))
+        return super().__getattribute__(key)
+
+
+DictItem = TypeVar('DictItem')
+
+
+class Dict(Generic[DictItem]):
+    'Customized dict replacement.'
+
+    def __init__(self, **kwargs) -> None:
+        self._dict: dict[str, DictItem] = {}
         super().__init__(**kwargs)
 
-    def _typecheck(self, key: str, value) -> None:
-        type_ = self._types[key]
-        if hasattr(type_, '__origin__'):
-            assert isinstance(value, type_.__origin__)
-            if len(value):
-                assert hasattr(type_, '__args__')
-                item_type = type_.__args__[0]
-                for item in value:
-                    assert isinstance(item, item_type)
-        else:
-            assert isinstance(value, type_)
+    def __getitem__(self, key: str) -> DictItem:
+        return self._dict[key]
+
+    def clear(self) -> None:
+        'Zero content.'
+        self._dict.clear()
+
+    @property
+    def _item_cls(self):
+        return self.__orig_class__.__args__[0]
 
 
 @dataclass
@@ -53,24 +74,17 @@ class IrcConnSetup:
     password: str = ''
 
 
-class SharedChannelDbFields:
+class SharedClientDbFields(IrcConnSetup):
     'API for fields shared directly in name and type with TUI.'
-    user_ids: tuple[str, ...]
-    # topic: str
-    # channel_modes: str
-
+    connection_state: str = ''
+    isupport: Dict[str]
+    sasl_account: str = ''
+    sasl_auth_state: str = ''
+    user_modes: str = ''
 
-_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
-
-
-class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
-    'API for fields shared directly in name and type with TUI.'
-    connection_state: str
-    sasl_account: str
-    sasl_auth_state: str
-    user_modes: str
-    users: Any
-    _channels: dict[str, _ChannelDbFields]
+    def is_chan_name(self, name: str) -> bool:
+        'Tests name to match CHANTYPES prefixes.'
+        return name[0] in self.isupport['CHANTYPES']
 
 
 @dataclass
@@ -80,15 +94,11 @@ class NickUserHost:
     user: str = '?'
     host: str = '?'
 
-    def copy(self) -> Self:
-        'Produce copy not subject to later attribute changes on original.'
-        return self.__class__(**dc_asdict(self))
-
 
 @dataclass
 class ServerCapability:
     'Public API for CAP data.'
-    data: str
+    data: str = ''
     enabled: bool = False
 
 
@@ -224,11 +234,6 @@ _EXPECTATIONS += [
                           _MsgTok.ANY,
                           _MsgTok.ANY,
                           _MsgTok.ANY)),
-    _MsgParseExpectation(_MsgTok.SERVER,
-                         '366',  # RPL_ENDOFNAMES
-                         ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
-                          _MsgTok.CHANNEL,
-                          _MsgTok.ANY)),  # comment
     _MsgParseExpectation(_MsgTok.SERVER,
                          '375',  # RPL_MOTDSTART already implied by 1st 372
                          ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
@@ -240,7 +245,7 @@ _EXPECTATIONS += [
     _MsgParseExpectation(_MsgTok.SERVER,
                          '005',  # RPL_ISUPPORT
                          ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
-                          (_MsgTok.ANY, ':isupports'),
+                          (_MsgTok.ANY, ':isupport'),
                           _MsgTok.ANY),  # comment
                          idx_into_list=1),
     _MsgParseExpectation(_MsgTok.SERVER,
@@ -388,6 +393,11 @@ _EXPECTATIONS += [
                           '=',
                           (_MsgTok.CHANNEL, ':channel'),
                           (_MsgTok.LIST, ':names'))),
+    _MsgParseExpectation(_MsgTok.SERVER,
+                         '366',  # RPL_ENDOFNAMES
+                         ((_MsgTok.NICKNAME, 'set_me_attr:nick'),
+                          (_MsgTok.CHANNEL, ':channel'),
+                          _MsgTok.ANY)),  # comment
     _MsgParseExpectation((_MsgTok.NICK_USER_HOST, ':joiner'),
                          'JOIN',
                          ((_MsgTok.CHANNEL, ':channel'),)),
@@ -480,43 +490,32 @@ class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
                                     client_id=self.client_id).kw(e=e)
 
 
-class _UpdatingDict:
-    _on_update: Callable
+class _Dict(Dict[DictItem]):
+    _defaults: dict[str, DictItem]
 
-    def __init__(self) -> None:
-        self._dict: dict[str, Any] = {}
+    def __init__(self, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self._defaults = {}
 
-    def __getitem__(self, key: str):
-        return self._dict[key] if key in self._dict else None
+    def __getitem__(self, key: str) -> DictItem:
+        if key not in self._dict and key in self._defaults:
+            return self._defaults[key]
+        return super().__getitem__(key)
 
-    def __setitem__(self, key: str, val: Any) -> None:
-        if isinstance(val, _NickUserHost):
-            val.set_on_update(lambda: self._on_update(key))
+    def __setitem__(self, key: str, val: DictItem) -> None:
+        assert isinstance(val, self._item_cls)
         self._dict[key] = val
-        self._on_update(key)
 
     def __delitem__(self, key: str) -> None:
         del self._dict[key]
-        self._on_update(key)
 
-    @property
     def keys(self) -> tuple[str, ...]:
         'Keys of item registrations.'
         return tuple(self._dict.keys())
 
-    def clear(self) -> None:
-        'Zero dict and send clearance update.'
-        self._dict.clear()
-        self._on_update('')
-
-    def set_on_update(self, name: str, on_update: Callable) -> None:
-        'Caller of on_update with path= set to name.'
-        self._on_update = lambda k: on_update(name, k)
-
-    def set_from_eq_str(self, eq_str: str, cls=str) -> None:
-        'Set from .key_val_from_eq_str result, to cls(val).'
-        key, value = self.key_val_from_eq_str(eq_str)
-        self[key] = cls(value)
+    def values(self) -> tuple[DictItem, ...]:
+        'Items registered.'
+        return tuple(self._dict.values())
 
     @staticmethod
     def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
@@ -526,29 +525,40 @@ class _UpdatingDict:
 
 
 class _CompletableStringsList:
-    _on_update: Callable
 
     def __init__(self) -> None:
         self._incomplete: list[str] = []
         self.completed: tuple[str, ...] = tuple()
 
-    def append(self, value: str) -> None:
+    def _on_list(self, m_name: str, value: str, complete: bool) -> None:
+        getattr(self._incomplete, m_name)(value)
+        if complete:
+            self.complete()
+
+    def append(self, value: str, complete=False) -> None:
         'Append value to list.'
-        self._incomplete.append(value)
+        self._on_list('append', value, complete)
+
+    def remove(self, value: str, complete=False) -> None:
+        'Remove value from list.'
+        self._on_list('remove', value, complete)
 
     def complete(self) -> None:
-        'Declare list done, call updater if set.'
+        'Declare list done.'
         self.completed = tuple(self._incomplete)
+        # self._incomplete.clear()
+
+    def clear(self) -> None:
+        'Wipe content and declare new emptiness as complete.'
         self._incomplete.clear()
-        if hasattr(self, '_on_update'):
-            self._on_update()
+        self.complete()
 
 
 class _CapsManager:
 
     def __init__(self,
                  sender: Callable[[IrcMessage], None],
-                 caps_dict: _UpdatingDict
+                 caps_dict: '_UpdatingDict[_UpdatingServerCapability]'
                  ) -> None:
         self._dict = caps_dict
         self._send = lambda *params: sender(IrcMessage('CAP', params=params))
@@ -576,7 +586,8 @@ class _CapsManager:
         'Parse CAP message to negot. steps, DB inputs; return if successful.'
         for item in items:
             if verb == 'NEW':
-                self._dict.set_from_eq_str(item, ServerCapability)
+                key, data = _Dict.key_val_from_eq_str(item)
+                self._dict.set_updating(key, ServerCapability(data=data))
             elif verb == 'DEL':
                 del self._dict[item]
             elif verb in {'ACK', 'NACK'}:
@@ -597,67 +608,65 @@ class _CapsManager:
                     list_set = set(target.completed)
                     assert acks == list_set & acks
                     assert set() == list_set & naks
-                    for key, data in [_UpdatingDict.key_val_from_eq_str(entry)
+                    for key, data in [_Dict.key_val_from_eq_str(entry)
                                       for entry in self._ls.completed]:
-                        self._dict[key] = ServerCapability(
-                                data=data, enabled=key in self._list.completed)
+                        self._dict.set_updating(key, ServerCapability(
+                            data=data, enabled=key in self._list.completed))
                     return True
         return False
 
 
-class _Db(Db):
-
-    def __init__(self, on_update: Callable, **kwargs) -> None:
-        self._still_on_init = True
-        self._on_update = on_update
-        super().__init__(**kwargs)
-        for key in [k for k in self._types if self._types[k] is _UpdatingDict]:
-            getattr(self, key).set_on_update(key, self._on_update)
-        self._still_on_init = False
-
-    def __setattr__(self, key: str, value) -> None:
-        if (not hasattr(self, '_still_on_init')) or self._still_on_init:
-            super().__setattr__(key, value)
-            return
-        self._typecheck(key, value)
-        super().__setattr__(key, value)
-        if key[0] != '_':
-            self._on_update(key)
-
+class _Channel:
+    user_ids: _CompletableStringsList
 
-class _ChannelDb(_Db, SharedChannelDbFields):
-
-    def __init__(self, purge_users: Callable, **kwargs) -> None:
+    def __init__(self,
+                 get_id_for_nick: Callable,
+                 get_membership_prefixes: Callable,
+                 purge_users: Callable,
+                 **kwargs
+                 ) -> None:
+        self._get_id_for_nick = get_id_for_nick
+        self._get_membership_prefixes = get_membership_prefixes
         self._purge_users = purge_users
         super().__init__(**kwargs)
 
-    def add_user(self, user_id: str) -> None:
-        'Add user_id to .user_ids.'
-        self.user_ids = tuple(list(self.user_ids) + [user_id])
-        self._on_update('user_ids')
-
-    def remove_user(self, user_id: str) -> None:
-        'Remove user_id from .user_ids.'
-        self.user_ids = tuple(id_ for id_ in self.user_ids if id_ != user_id)
-        self._on_update('user_ids')
+    def add_from_namreply(self, items: tuple[str, ...]):
+        'Add to .user_ids items assumed as nicknames with membership prefixes.'
+        for item in items:
+            nickname = item.lstrip(self._get_membership_prefixes())
+            self.user_ids.append(self._get_id_for_nick(nickname))
+
+    def append_nick(self, nickname: str) -> None:
+        'To .user_ids append .nickname and declare .user_ids complete.'
+        user_id = self._get_id_for_nick(nickname)
+        self.user_ids.append(user_id, complete=True)
+
+    def remove_nick(self, nickname: str) -> None:
+        'From .user_ids remove .nickname and declare .user_ids complete.'
+        user_id = self._get_id_for_nick(nickname)
+        self.user_ids.remove(user_id, complete=True)
         self._purge_users()
 
 
-class _NickUserHost(NickUserHost):
-    _on_update: Callable
+class _NickUserHost(NickUserHost, _DeepAnnotationsMixin):
 
     def __str__(self) -> str:
         return f'{self.nick}!{self.user}@{self.host}'
 
-    def __setattr__(self, key: str, value: Any) -> None:
+    def __eq__(self, other) -> bool:
+        if not isinstance(other, NickUserHost):
+            return False
+        for key in self._deep_annotations().keys():
+            if getattr(self, key) != getattr(other, key):
+                return False
+        return True
+
+    def __setattr__(self, key: str, value: NickUserHost | str) -> None:
         if key == 'nickuserhost' and isinstance(value, _NickUserHost):
-            self.nick = value.nick
-            self.user = value.user
-            self.host = value.host
+            for annotated_key in self._deep_annotations().keys():
+                setattr(self, annotated_key, getattr(value, annotated_key))
         else:
             super().__setattr__(key, value)
-            if key != '_on_update' and hasattr(self, '_on_update'):
-                self._on_update()
 
     @classmethod
     def from_str(cls, value: str) -> Self:
@@ -668,96 +677,183 @@ class _NickUserHost(NickUserHost):
         assert len(toks) == 3
         return cls(*toks)
 
-    def set_on_update(self, on_update: Callable) -> None:
-        'Caller of on_update with path= set to name.'
+
+class _UpdatingMixin(AutoAttrMixin):
+    _on_update: Callable
+
+    def __init__(self, on_update: Callable, **kwargs) -> None:
+        super().__init__(**kwargs)
         self._on_update = on_update
 
+    @property
+    def static_copy(self):
+        'Return non-updating copy of self.'
+        if isinstance(self, _Dict):
+            return None
+        if isinstance(self, _CompletableStringsList):
+            return self.completed
+        for cls in self.__class__.__mro__:
+            if cls != _DeepAnnotationsMixin\
+                    and AutoAttrMixin not in cls.__mro__:
+                obj = cls()
+                break
+        for key in self._deep_annotations():
+            attr_val = getattr(self, key)
+            setattr(obj, key,
+                    attr_val if not isinstance(attr_val, _UpdatingMixin)
+                    else attr_val.static_copy)
+        return obj
+
+    def _make_attr(self, cls: Callable, key: str):
+        return cls(on_update=lambda *steps: self._on_update(key, *steps))
 
-class _ClientDb(_Db, SharedClientDbFields):
-    caps: _UpdatingDict
-    isupports: _UpdatingDict
-    users: _UpdatingDict
-    motd: _CompletableStringsList
-    _channels: dict[str, _ChannelDb]
+    @staticmethod
+    def _silenced_key(key: str, encode=True) -> str:
+        prefix = '__SILENCE__'
+        if encode:
+            return prefix + key
+        return key[len(prefix):] if key.startswith(prefix) else ''
 
-    def __init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
-        self.motd._on_update = lambda: self._on_update('motd')
+    def __setattr__(self, key: str, value) -> None:
+        if (s_key := self._silenced_key(key, encode=False)):
+            super().__setattr__(s_key, value)
+            return
+        super().__setattr__(key, value)
+        if hasattr(self, '_on_update') and key in self._deep_annotations():
+            self._on_update(key)
+
+    @classmethod
+    def from_silent(cls, original, on_update: Callable) -> Self:
+        'From non-updating original return updating variant.'
+        obj = cls(on_update=on_update)
+        for key in cls._deep_annotations().keys():
+            setattr(obj, cls._silenced_key(key), getattr(original, key))
+        return obj
+
+
+class _UpdatingDict(_UpdatingMixin, _Dict[DictItem]):
+    _create_if_none: Optional[dict[str, Any]] = None
+
+    @property
+    def _values_are_updating(self) -> bool:
+        return _UpdatingMixin in self._item_cls.__mro__
+
+    def __getitem__(self, key: str) -> DictItem:
+        if key not in self._dict:
+            if self._create_if_none is not None:
+                kw = {} | self._create_if_none
+                if self._values_are_updating:
+                    kw |= {'on_update':
+                           lambda *steps: self._on_update(key, *steps)}
+                self._dict[key] = self._item_cls(**kw)
+        return super().__getitem__(key)
+
+    def set_updating(self, key: str, val) -> None:
+        'Set item at key from non-updating val into updating one.'
+        self[key] = self._item_cls.from_silent(
+            original=val,
+            on_update=lambda *steps: self._on_update(key, *steps))
+
+    def __setitem__(self, key: str, val: DictItem) -> None:
+        super().__setitem__(key, val)
+        self._on_update(key)
+
+    def __delitem__(self, key: str) -> None:
+        super().__delitem__(key)
+        self._on_update(key)
+
+    def clear(self) -> None:
+        super().clear()
+        self._on_update()
+
+
+class _UpdatingCompletableStringsList(_UpdatingMixin, _CompletableStringsList):
+
+    def complete(self) -> None:
+        super().complete()
+        self._on_update()
+
+
+class _UpdatingServerCapability(_UpdatingMixin, ServerCapability):
+    pass
+
+
+class _UpdatingChannel(_UpdatingMixin, _Channel):
+    user_ids: _UpdatingCompletableStringsList
+
+
+class _UpdatingNickUserHost(_UpdatingMixin, _NickUserHost):
+    pass
+
+
+class _ClientDb(_UpdatingMixin, SharedClientDbFields):
+    _keep_on_clear = set(IrcConnSetup.__annotations__.keys())
+    caps: _UpdatingDict[_UpdatingServerCapability]
+    channels: _UpdatingDict[_UpdatingChannel]
+    isupport: _UpdatingDict[str]
+    motd: _UpdatingCompletableStringsList
+    users: _UpdatingDict[_UpdatingNickUserHost]
+
+    def __getattribute__(self, key: str):
+        attr = super().__getattribute__(key)
+        if key == 'isupport' and not attr._defaults:
+            attr._defaults = ISUPPORT_DEFAULTS
+        elif key == 'channels' and not attr._create_if_none:
+            attr._create_if_none = {
+                    'get_id_for_nick': self.user_id,
+                    'get_membership_prefixes': self._get_membership_prefixes,
+                    'purge_users': self._purge_users}
+        return attr
+
+    def clear(self) -> None:
+        'Wipe updating attributes.'
+        for key, value in [(k, v) for k, v in self._deep_annotations().items()
+                           if k not in self._keep_on_clear]:
+            if isinstance(value, (_Dict, _CompletableStringsList)):
+                value.clear()
+            elif isinstance(value, str):
+                setattr(self, key, '')
 
     def _purge_users(self) -> None:
         to_keep = {'me'}
-        for chan in self._channels.values():
-            to_keep |= set(chan.user_ids)
-        for user_id in [id_ for id_ in self.users.keys
+        for chan in self.channels.values():
+            to_keep |= set(chan.user_ids.completed)
+        for user_id in [id_ for id_ in self.users.keys()
                         if id_ not in to_keep]:
             del self.users[user_id]
 
-    def needs_arg(self, key: str) -> bool:
-        'Reply if attribute of key may reasonably be addressed without an arg.'
-        return not isinstance(getattr(self, key), (bool, int, str, tuple,
-                                                   _CompletableStringsList))
-
     @property
     def illegal_nick_firstchars(self) -> str:
-        'Calculated from hardcoded constants and .isupports.'
+        'Calculated from hardcoded constants and .isupport.'
         return (ILLEGAL_NICK_CHARS + ILLEGAL_NICK_FIRSTCHARS
-                + self.chan_prefixes + self.membership_prefixes)
+                + self.isupport['CHANTYPES'] + self._get_membership_prefixes())
 
-    @property
-    def chan_prefixes(self) -> str:
-        'Registered possible channel name prefixes.'
-        return self.isupports['CHANTYPES'] or ISUPPORT_DEFAULTS['CHANTYPES']
-
-    @property
-    def membership_prefixes(self) -> str:
+    def _get_membership_prefixes(self) -> str:
         'Registered possible membership nickname prefixes.'
-        prefix = self.isupports['PREFIX'] or ISUPPORT_DEFAULTS['PREFIX']
-        toks = prefix.split(')', maxsplit=1)
+        toks = self.isupport['PREFIX'].split(')', maxsplit=1)
         assert len(toks) == 2
         assert toks[0][0] == '('
         return toks[1]
 
+    def chans_of_user(self, nickname: str) -> dict[str, _UpdatingChannel]:
+        'Return dictionary of channels user is in.'
+        id_ = self.user_id(nickname)
+        return {k: self.channels[k] for k in self.channels.keys()
+                if id_ in self.channels[k].user_ids.completed}
+
     def user_id(self, query: str | _NickUserHost) -> str:
         'Return user_id for nickname of entire NickUserHost, create if none.'
         nick = query if isinstance(query, str) else query.nick
-        matches = [id_ for id_ in self.users.keys
+        matches = [id_ for id_ in self.users.keys()
                    if self.users[id_].nick == nick]
         assert len(matches) < 2
         id_ = matches[0] if matches else str(uuid4())
         if isinstance(query, _NickUserHost):
-            self.users[id_] = query
+            self.users.set_updating(id_, query)
         elif not matches:
-            self.users[id_] = _NickUserHost(query)
+            self.users.set_updating(id_, _NickUserHost(query))
         return id_
 
-    def remove_user(self, user_id: str) -> tuple[str, ...]:
-        'Run remove_user_from_channel on all channels user is in.'
-        affected_chans = []
-        for id_, chan in [(k, v) for k, v in self._channels.items()
-                          if user_id in v.user_ids]:
-            chan.remove_user(user_id)
-            affected_chans += [id_]
-        return tuple(affected_chans)
-
-    @property
-    def chan_names(self) -> tuple[str, ...]:
-        'Return names of joined channels.'
-        return tuple(self._channels.keys())
-
-    def del_chan(self, name: str) -> None:
-        'Remove DB for channel of name.'
-        del self._channels[name]
-        self._purge_users()
-        self._on_update(name)
-
-    def chan(self, name: str) -> _ChannelDb:
-        'Produce DB for channel of name – pre-existing, or newly created.'
-        if name not in self._channels:
-            self._channels[name] = _ChannelDb(
-                on_update=lambda k: self._on_update(name, k),
-                purge_users=self._purge_users)
-        return self._channels[name]
-
 
 class Client(ABC, ClientQueueMixin):
     'Abstracts socket connection, loop over it, and handling messages from it.'
@@ -768,7 +864,6 @@ class Client(ABC, ClientQueueMixin):
         self.client_id = conn_setup.hostname
         super().__init__(client_id=self.client_id, **kwargs)
         self._db = _ClientDb(on_update=self._on_update)
-        self._db.users['me'] = _NickUserHost('?', getuser(), '?')
         self._caps = _CapsManager(self.send, self._db.caps)
         for k in conn_setup.__annotations__:
             setattr(self._db, k, getattr(conn_setup, k))
@@ -797,6 +892,7 @@ class Client(ABC, ClientQueueMixin):
 
     def _on_connect(self) -> None:
         assert self.conn is not None
+        self._db.users.set_updating('me', _NickUserHost('?', getuser(), '?'))
         self._db.connection_state = 'connected'
         self._caps.start_negotation()
         self.send(IrcMessage(verb='USER', params=(
@@ -805,16 +901,11 @@ class Client(ABC, ClientQueueMixin):
         self.send(IrcMessage(verb='NICK', params=(self._db.nick_wanted,)))
 
     def close(self) -> None:
-        'Close both recv Loop and socket.'
-        self._db.connection_state = 'disconnected'
+        'Close connection and wipe memory of its states.'
+        self._db.clear()
         if self.conn:
             self.conn.close()
         self.conn = None
-        for name in self._db.chan_names:
-            self._db.del_chan(name)
-        self._db.isupports.clear()
-        self._db.users['me'].nick = '?'
-        self._db.sasl_auth_state = ''
 
     def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
         'Gracefully handle broken connection.'
@@ -822,7 +913,7 @@ class Client(ABC, ClientQueueMixin):
         self.close()
 
     @abstractmethod
-    def _on_update(self, path: str, arg: str = '') -> None:
+    def _on_update(self, *path) -> None:
         pass
 
     @abstractmethod
@@ -851,7 +942,7 @@ class Client(ABC, ClientQueueMixin):
     def _match_msg(self, msg: IrcMessage) -> dict[str, Any]:
         'Test .source, .verb, .params.'
         tok_type = (str | _NickUserHost | tuple[str, ...]
-                    | dict[str, str | _ChannelDb])
+                    | dict[str, str | _Channel])
 
         def param_match(ex_tok: str | _MsgTok, msg_tok: str | list[str]
                         ) -> Optional[tok_type | tuple[tok_type, ...]]:
@@ -871,8 +962,8 @@ class Client(ABC, ClientQueueMixin):
                 return msg_tok if ('.' in msg_tok
                                    and not set('@!') & set(msg_tok)) else None
             if ex_tok is _MsgTok.CHANNEL:
-                return {'id': msg_tok, 'db': self._db.chan(msg_tok)
-                        } if msg_tok[0] in self._db.chan_prefixes else None
+                return {'id': msg_tok, 'db': self._db.channels[msg_tok]
+                        } if self._db.is_chan_name(msg_tok) else None
             if ex_tok is _MsgTok.NICKNAME:
                 return (msg_tok
                         if msg_tok[0] not in self._db.illegal_nick_firstchars
@@ -930,19 +1021,19 @@ class Client(ABC, ClientQueueMixin):
                     setattr(self._db, arg, ret[arg])
                 if task == 'set_me_attr':
                     setattr(self._db.users['me'], arg, ret[arg])
-                if task == 'set_user' and ret[arg] != self._db.users['me']:
+                if task == 'set_user':
                     self._db.user_id(ret[arg])
         if ret['verb'] == '005':   # RPL_ISUPPORT
-            for item in ret['isupports']:
+            for item in ret['isupport']:
                 if item[0] == '-':
-                    del self._db.isupports[item[1:]]
+                    del self._db.isupport[item[1:]]
                 else:
-                    self._db.isupports.set_from_eq_str(str(item))
+                    key, data = _Dict.key_val_from_eq_str(item)
+                    self._db.isupport[key] = data
         elif ret['verb'] == '353':  # RPL_NAMREPLY
-            for user_id in [
-                    self._db.user_id(name.lstrip(self._db.membership_prefixes))
-                    for name in ret['names']]:
-                ret['channel']['db'].add_user(user_id)
+            ret['channel']['db'].add_from_namreply(ret['names'])
+        elif ret['verb'] == '366':  # RPL_ENDOFNAMES
+            ret['channel']['db'].user_ids.complete()
         elif ret['verb'] == '372':  # RPL_MOTD
             self._db.motd.append(ret['line'])
         elif ret['verb'] == '376':  # RPL_ENDOFMOTD
@@ -970,7 +1061,7 @@ class Client(ABC, ClientQueueMixin):
         elif ret['verb'] == 'CAP':
             if (self._caps.process_msg(verb=ret['subverb'], items=ret['items'],
                                        complete='tbc' not in ret)
-                    and 'sasl' in self._db.caps.keys
+                    and 'sasl' in self._db.caps.keys()
                     and 'PLAIN' in self._db.caps['sasl'].data.split(',')):
                 if self._db.password:
                     self._db.sasl_auth_state = 'attempting'
@@ -980,7 +1071,7 @@ class Client(ABC, ClientQueueMixin):
         elif ret['verb'] == 'ERROR':
             self.close()
         elif ret['verb'] == 'JOIN' and ret['joiner'] != self._db.users['me']:
-            ret['channel']['db'].add_user(self._db.user_id(ret['joiner']))
+            ret['channel']['db'].append_nick(ret['joiner'])
         elif ret['verb'] == 'NICK':
             user_id = self._db.user_id(ret['named'])
             self._db.users[user_id].nick = ret['nick']
@@ -996,13 +1087,13 @@ class Client(ABC, ClientQueueMixin):
             self._log(ret['message'], out=False, **kw)
         elif ret['verb'] == 'PART':
             if ret['parter'] == self._db.users['me']:
-                self._db.del_chan(ret['channel']['id'])
+                del self._db.channels[ret['channel']['id']]
             else:
-                ret['channel']['db'].remove_user(
-                        self._db.user_id(ret['parter']))
+                ret['channel']['db'].remove_nick(ret['parter'])
         elif ret['verb'] == 'PING':
             self.send(IrcMessage(verb='PONG', params=(ret['reply'],)))
         elif ret['verb'] == 'QUIT':
-            for chan in self._db.remove_user(self._db.user_id(ret['quitter'])):
+            for ch_name, ch in self._db.chans_of_user(ret['quitter']).items():
+                ch.remove_nick(ret['quitter'])
                 self._log(f'{ret["quitter"]} quits: {ret["message"]}',
-                          LogScope.CHAT, target=chan)
+                          LogScope.CHAT, target=ch_name)
index 5664df2af7d7cf373754f20b069169e6a2b1be3e..15125351dba10d123f879c13a532f00a387fc576 100644 (file)
@@ -1,15 +1,16 @@
 'TUI adaptions to Client.'
 # built-ins
 from getpass import getuser
-from dataclasses import dataclass
-from typing import Callable, Optional, Sequence
+from dataclasses import dataclass, asdict as dc_asdict
+from types import get_original_bases
+from typing import Any, Callable, Optional, Sequence
 # ourselves
 from ircplom.tui_base import (BaseTui, PromptWidget, TuiEvent, Window,
                               CMD_SHORTCUTS)
-from ircplom.irc_conn import IrcMessage, ISUPPORT_DEFAULTS
+from ircplom.irc_conn import IrcMessage
 from ircplom.client import (
-        Client, ClientQueueMixin, Db, IrcConnSetup, LogScope, NewClientEvent,
-        NickUserHost, ServerCapability, SharedChannelDbFields,
+        AutoAttrMixin, Client, ClientQueueMixin, Dict, DictItem, IrcConnSetup,
+        LogScope, NewClientEvent, NickUserHost, ServerCapability,
         SharedClientDbFields)
 
 CMD_SHORTCUTS['disconnect'] = 'window.disconnect'
@@ -24,8 +25,6 @@ _LOG_PREFIX_SERVER = '$'
 _LOG_PREFIX_OUT = '>'
 _LOG_PREFIX_IN = '<'
 
-_DbType = bool | int | str | tuple[str, ...] | NickUserHost
-
 
 class _ClientWindow(Window, ClientQueueMixin):
 
@@ -120,103 +119,122 @@ class _ChannelWindow(_ChatWindow):
 
 @dataclass
 class _Update:
-    path: str
-    arg: str = ''
-    value: Optional[_DbType] = None
-    display: str = ''
+    path: tuple[str, ...]
+    value: Optional[Any] = None
 
-    def __post_init__(self, **kwargs) -> None:
-        super().__init__(**kwargs)
-        if not self.display:
-            self.display = str(self.value)
 
+class _UpdatingNode(AutoAttrMixin):
+    log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER}
 
-class _Db(Db):
+    def _make_attr(self, cls: Callable, key: str):
+        return cls()
 
-    def _set_and_check_for_dict(self, update: _Update) -> bool:
-        d = getattr(self, update.path)
-        if update.value is None:
-            if update.arg == '':
-                d.clear()
-            elif update.arg in d:
-                del d[update.arg]
-            return True
-        old_value = d.get(update.arg, None)
-        d[update.arg] = update.value
-        return update.value != old_value
+    @classmethod
+    def _scope(cls, path: tuple[str, ...]) -> LogScope:
+        scopes: dict[tuple[str, ...], LogScope] = {}
+        for c in cls.__mro__:
+            if hasattr(c, 'log_scopes'):
+                scopes = c.log_scopes | scopes
+        return scopes.get(path, scopes[tuple()])
 
     def set_and_check_for_change(self, update: _Update
-                                 ) -> bool | dict[str, tuple[str, ...]]:
+                                 ) -> Optional[tuple[LogScope, Any]]:
         'Apply update, return if that actually made a difference.'
-        self._typecheck(update.path, update.value)
-        old_value = getattr(self, update.path)
-        setattr(self, update.path, update.value)
-        return update.value != old_value
+        key = update.path[0]
+        node = self._get(key)
+        scope = self._scope(update.path)
+        if len(update.path) == 1:
+            if update.value is None:
+                if not self._is_set(key):
+                    return None
+                self._unset(key)
+                return (scope, None)
+            if node == update.value:
+                return None
+            self._set(key, update.value)
+            return (scope, update.value)
+        return node.set_and_check_for_change(_Update(update.path[1:],
+                                                     update.value))
+
+    def _get(self, key: str):
+        return getattr(self, key)
+
+    def _set(self, key: str, value) -> None:
+        setattr(self, key, value)
+
+    def _unset(self, key: str) -> None:
+        getattr(self, key).clear()
+
+    def _is_set(self, key: str) -> bool:
+        return hasattr(self, key)
+
+
+class _UpdatingDict(Dict[DictItem], _UpdatingNode):
+
+    def _get(self, key: str):
+        if key not in self._dict:
+            self._dict[key] = self._item_cls()
+        return self._dict[key]
+
+    def _set(self, key: str, value) -> None:
+        self._dict[key] = value
+
+    def _unset(self, key: str) -> None:
+        del self._dict[key]
 
+    def _is_set(self, key: str) -> bool:
+        return key in self._dict
 
-class _ChannelDb(_Db, SharedChannelDbFields):
+
+class _UpdatingChannel(_UpdatingNode):
+    user_ids: tuple[str, ...] = tuple()
+    log_scopes = {tuple(): LogScope.CHAT}
 
     def set_and_check_for_change(self, update: _Update
-                                 ) -> bool | dict[str, tuple[str, ...]]:
-        if isinstance(getattr(self, update.path), dict):
-            return self._set_and_check_for_dict(update)
-        if update.path == 'user_ids':
+                                 ) -> Optional[tuple[LogScope, Any]]:
+        def fmt_ids(user_ids: tuple[str, ...]) -> str:
+            return ', '.join(user_ids)
+        if update.path == ('user_ids',):
             assert isinstance(update.value, tuple)
-            d = {'joins': tuple(user_id for user_id in update.value
-                                if user_id not in self.user_ids),
-                 'parts': tuple(user_id for user_id in self.user_ids
-                                if user_id not in update.value)}
-            return d if super().set_and_check_for_change(update) else False
+            d: dict[str, str] = {}
+            if not self.user_ids:
+                d['residents'] = fmt_ids(update.value)
+            else:
+                d['joining'] = fmt_ids(tuple(id_ for id_ in update.value
+                                             if id_ not in self.user_ids))
+                d['parting'] = fmt_ids(tuple(id_ for id_ in self.user_ids
+                                             if id_ not in update.value))
+            if super().set_and_check_for_change(update):
+                return (self._scope(update.path), d)
+            return None
         return super().set_and_check_for_change(update)
 
 
-class _TuiClientDb(_Db, SharedClientDbFields):
-    caps: dict[str, str]
-    isupports: dict[str, str]
-    motd: tuple[str, ...]
-    users: dict[str, NickUserHost]
-    _channels: dict[str, _ChannelDb]
+class _UpdatingNickUserHost(_UpdatingNode, NickUserHost):
+    pass
 
-    def set_and_check_for_change(self, update: _Update
-                                 ) -> bool | dict[str, tuple[str, ...]]:
-        result: bool | dict[str, tuple[str, ...]] = False
-        if self.is_chan_name(update.path):
-            chan_name = update.path
-            if update.value is None and not update.arg:
-                del self._channels[chan_name]
-                result = True
-            else:
-                update.path = update.arg
-                update.arg = ''
-                result = self.chan(chan_name).set_and_check_for_change(update)
-                if isinstance(result, dict):
-                    for key, user_ids in result.items():
-                        result[key] = tuple(self.users[user_id].nick
-                                            for user_id in user_ids)
-        elif isinstance(getattr(self, update.path), dict):
-            result = self._set_and_check_for_dict(update)
-        else:
-            result = super().set_and_check_for_change(update)
-        return result
 
-    def is_chan_name(self, name: str) -> bool:
-        'Tests name to match CHANTYPES prefixes.'
-        return name[0] in self.isupports.get('CHANTYPES',
-                                             ISUPPORT_DEFAULTS['CHANTYPES'])
+class _UpdatingServerCapability(_UpdatingNode, ServerCapability):
+    pass
 
-    def chan(self, name: str) -> _ChannelDb:
-        'Produce DB for channel of name – pre-existing, or newly created.'
-        if name not in self._channels:
-            self._channels[name] = _ChannelDb()
-        return self._channels[name]
+
+_UPDATING_DATACLASSES = (_UpdatingNickUserHost, _UpdatingServerCapability)
+
+
+class _TuiClientDb(_UpdatingNode, SharedClientDbFields):
+    caps: _UpdatingDict[_UpdatingServerCapability]
+    isupport: _UpdatingDict[str]
+    motd: tuple[str, ...] = tuple()
+    users: _UpdatingDict[_UpdatingNickUserHost]
+    channels: _UpdatingDict[_UpdatingChannel]
+    log_scopes = {tuple('connection_state'): LogScope.ALL}
 
 
-@dataclass
 class _ClientWindowsManager:
-    _tui_log: Callable
-    _tui_new_window: Callable
 
-    def __post_init__(self, *_, **__) -> None:
+    def __init__(self, tui_log: Callable, tui_new_window: Callable) -> None:
+        self._tui_log = tui_log
+        self._tui_new_window = tui_new_window
         self.db = _TuiClientDb()
         self.windows: list[_ClientWindow] = []
         for scope in (LogScope.SERVER, LogScope.RAW):
@@ -225,8 +243,9 @@ class _ClientWindowsManager:
     def _new_win(self, scope: LogScope, chatname: str = '') -> _ClientWindow:
         kwargs = {'scope': scope, 'log': self.log, 'win_cls': _ClientWindow}
         if scope == LogScope.CHAT:
-            kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#'
-                                 else _ChatWindow)
+            kwargs['win_cls'] = (
+                    _ChannelWindow if self.db.is_chan_name(chatname)
+                    else _ChatWindow)
             kwargs['chatname'] = chatname
             kwargs['get_nick_data'] = lambda: self.db.users['me'].nick
         win = self._tui_new_window(**kwargs)
@@ -261,30 +280,31 @@ class _ClientWindowsManager:
 
     def update_db(self, update: _Update) -> bool:
         'Apply update to .db, and if changing anything, log and trigger.'
-        is_chan_update = self.db.is_chan_name(update.path)
-        scope = (LogScope.CHAT if is_chan_update
-                 else (LogScope.ALL if update.path == 'connection_state'
-                       else LogScope.SERVER))
-        verb = 'cleared' if update.value is None else 'changed to:'
-        what = f'{update.path}:{update.arg}' if update.arg else update.path
-        log_kwargs = {'target': update.path} if is_chan_update else {}
+        for cls in [cls for cls in _UPDATING_DATACLASSES
+                    if isinstance(update.value, get_original_bases(cls)[1])]:
+            update.value = cls(**dc_asdict(update.value))  # type: ignore
+            break
         result = self.db.set_and_check_for_change(update)
-        if result is False:
+        if result is None:
             return False
-        if isinstance(result, dict):
-            for verb, names in result.items():
-                for name in names:
-                    self.log(f'{name} {verb}', scope=scope, **log_kwargs)
+        scope, value = result
+        log_path = ':'.join(update.path)
+        log_kwargs: dict[str, Any] = {'scope': scope}
+        if scope is LogScope.CHAT:
+            log_kwargs |= {'target': update.path[1]}
+        if value is None:
+            self.log(f'{log_path} cleared', **log_kwargs)
+        elif isinstance(value, dict):
+            for verb, item in [(k, v) for k, v in value.items() if v]:
+                self.log(f'{verb}: {item}', **log_kwargs)
         else:
-            announcement = f'{what} {verb}'
-            if isinstance(update.value, tuple) or announcement[-1] != ':':
-                self.log(announcement, scope=scope, **log_kwargs)
-            if isinstance(update.value, tuple):
-                for item in update.value:
-                    self.log(f'  {item}', scope=scope, **log_kwargs)
-            elif announcement[-1] == ':':
-                self.log(f'{announcement} [{update.display}]',
-                         scope=scope, **log_kwargs)
+            announcement = f'{log_path} set to:'
+            if isinstance(value, tuple):
+                self.log(announcement, **log_kwargs)
+                for item in value:
+                    self.log(f'  {item}', **log_kwargs)
+            else:
+                self.log(f'{announcement} [{value}]', **log_kwargs)
         for win in [w for w in self.windows if isinstance(w, _ChatWindow)]:
             win.set_prompt_prefix()
         return bool([w for w in self.windows if w.tainted])
@@ -317,9 +337,9 @@ class ClientTui(BaseTui):
         'Forward todo to appropriate _ClientWindowsManager.'
         if client_id not in self._client_mngrs:
             self._client_mngrs[client_id] = _ClientWindowsManager(
-                _tui_log=lambda msg, **kw: self._log(
+                tui_log=lambda msg, **kw: self._log(
                     msg, client_id=client_id, **kw),
-                _tui_new_window=lambda win_cls, **kw: self._new_window(
+                tui_new_window=lambda win_cls, **kw: self._new_window(
                     win_cls, _q_out=self._q_out, client_id=client_id, **kw))
         if getattr(self._client_mngrs[client_id], todo)(**kwargs) is not False:
             self.redraw_affected()
@@ -362,7 +382,8 @@ class _ClientKnowingTui(Client):
 
     def privmsg(self, target: str, msg: str) -> None:
         'Catch /privmsg, only allow for channel if in channel, else complain.'
-        if target[0] == '#' and target not in self._db.chan_names:
+        if self._db.is_chan_name(target)\
+                and target not in self._db.channels.keys():
             self._log('not sending, since not in channel',
                       scope=LogScope.SAME, alert=True)
             return
@@ -387,27 +408,15 @@ class _ClientKnowingTui(Client):
                 with open(f'{self.client_id}.log', 'a', encoding='utf8') as f:
                     f.write(('>' if kwargs['out'] else '<') + f' {msg}\n')
 
-    def _on_update(self, path: str, arg: str = '') -> None:
-        value: Optional[_DbType] = None
-        is_chan = path[0] in self._db.chan_prefixes
-        display = ''
-        if arg:
-            if is_chan and path in self._db.chan_names:
-                if (chan := self._db.chan(path)) and hasattr(chan, arg):
-                    value = getattr(chan, arg)
-            else:
-                d = getattr(self._db, path)
-                if arg in d.keys:
-                    value = d[arg]
-                    if isinstance(value, ServerCapability):
-                        display = 'ENABLED' if value.enabled else 'available'
-                        if value.data:
-                            display += f' ({value.data})'
-        elif (not is_chan) and not self._db.needs_arg(path):
-            value = getattr(self._db, path)
-        if isinstance(value, NickUserHost):
-            value = value.copy()
-        elif value and hasattr(value, 'completed'):
-            value = value.completed
-        self._client_tui_trigger('update_db', update=_Update(
-            path, arg, value, display))
+    def _on_update(self, *path) -> None:
+        parent = self._db
+        for step in path[:-1]:
+            parent = (parent[step] if isinstance(parent, Dict)
+                      else getattr(parent, step))
+        last_step = path[-1]
+        value = ((parent[last_step] if last_step in parent.keys()
+                  else None) if isinstance(parent, Dict)
+                 else getattr(parent, last_step))
+        if value and hasattr(value, 'static_copy'):
+            value = value.static_copy
+        self._client_tui_trigger('update_db', update=_Update(path, value))