home · contact · privacy
Some code re-organization.
authorChristian Heller <c.heller@plomlompom.de>
Tue, 2 Sep 2025 05:37:14 +0000 (07:37 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Tue, 2 Sep 2025 05:37:14 +0000 (07:37 +0200)
ircplom/client.py
ircplom/client_tui.py

index c0c352ea6a35adcac319f2c64cf33d1d6e47dd44..4c8130b505d7651c543b2d54ce73c49951bc5fdd 100644 (file)
@@ -16,11 +16,12 @@ from ircplom.irc_conn import (
     ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL)
 from ircplom.msg_parse_expectations import MsgTok, MSG_EXPECTATIONS
 
-ClientsDb = dict[str, 'Client']
+
+_NAMES_DESIRED_SERVER_CAPS = ('sasl',)
 
 
-class _DeepAnnotationsMixin:
-    'Provide ._deep_annotations() of non-underscored annotations of whole MRO.'
+class AutoAttrMixin:
+    'Ensures attribute as defined by annotations along MRO'
 
     @classmethod
     def _deep_annotations(cls) -> dict[str, type]:
@@ -30,10 +31,6 @@ class _DeepAnnotationsMixin:
                 types = c.__annotations__ | types
         return {k: v for k, v in types.items() if k[0] != '_'}
 
-
-class AutoAttrMixin(_DeepAnnotationsMixin):
-    'Ensures attribute as defined by annotations, and ._make_attr method.'
-
     def __getattribute__(self, key: str):
         if key[0] != '_' and (cls := self._deep_annotations().get(key, None)):
             try:
@@ -65,114 +62,6 @@ class Dict(Generic[DictItem]):
         return self.__orig_class__.__args__[0]
 
 
-@dataclass
-class IrcConnSetup:
-    'All we need to know to set up a new Client connection.'
-    hostname: str = ''
-    port: int = 0
-    nick_wanted: str = ''
-    realname: str = ''
-    password: str = ''
-
-
-class SharedClientDbFields(IrcConnSetup):
-    'API for fields shared directly in name and type with TUI.'
-    connection_state: str = ''
-    isupport: Dict[str]
-    sasl_account: str = ''
-    sasl_auth_state: str = ''
-    user_modes: str = ''
-
-    def is_chan_name(self, name: str) -> bool:
-        'Tests name to match CHANTYPES prefixes.'
-        return name[0] in self.isupport['CHANTYPES']
-
-
-@dataclass
-class NickUserHost:
-    'Combination of nickname, username on host, and host.'
-    nick: str = '?'
-    user: str = '?'
-    host: str = '?'
-
-
-@dataclass
-class ServerCapability:
-    'Public API for CAP data.'
-    data: str = ''
-    enabled: bool = False
-
-
-class LogScope(Enum):
-    'Where log messages should go.'
-    ALL = auto()
-    SERVER = auto()
-    RAW = auto()
-    CHAT = auto()
-    SAME = auto()
-
-
-@dataclass
-class _ClientIdMixin:
-    'Collects a Client\'s ID at .client_id.'
-    client_id: str
-
-
-@dataclass
-class ClientEvent(AffectiveEvent, _ClientIdMixin):
-    'To affect Client identified by ClientIdMixin.'
-
-
-@dataclass
-class NewClientEvent(AffectiveEvent):
-    'Put Client .payload into ClientsDb target.'
-    payload: 'Client'
-
-    def affect(self, target: ClientsDb) -> None:
-        target[self.payload.client_id] = self.payload
-
-
-@dataclass
-class ClientQueueMixin(QueueMixin, _ClientIdMixin):
-    'To QueueMixin adds _cput to send ClientEvent for self.'
-
-    def _client_trigger(self, t_method: str, **kwargs) -> None:
-        self._put(ClientEvent.affector(t_method, client_id=self.client_id
-                                       ).kw(**kwargs))
-
-
-_NAMES_DESIRED_SERVER_CAPS = ('sasl',)
-
-
-def _nick_incremented(nickname: str) -> str:
-    'Return nickname with number suffix incremented, or "0" if none.'
-    name, digits = ([(nickname, '')]
-                    + [(nickname[:i], nickname[i:])
-                       for i in range(len(nickname), 0, -1)
-                       if nickname[i:].isdigit()]
-                    )[-1]
-    return name + str(0 if not digits else (int(digits) + 1))
-
-
-@dataclass
-class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
-    hostname: InitVar[str]  # needed by BaseIrcConnection, but not desired as
-    port: InitVar[int]      # dataclass fields, only for __post_init__ call
-
-    def __post_init__(self, hostname, port, **kwargs) -> None:
-        super().__init__(hostname=hostname, port=port, _q_out=self._q_out,
-                         **kwargs)
-
-    def _make_recv_event(self, msg: IrcMessage) -> ClientEvent:
-        return ClientEvent.affector('handle_msg', client_id=self.client_id
-                                    ).kw(msg=msg)
-
-    def _on_handled_loop_exception(self, e: IrcConnAbortException
-                                   ) -> ClientEvent:
-        return ClientEvent.affector('on_handled_loop_exception',
-                                    client_id=self.client_id).kw(e=e)
-
-
 class _Dict(Dict[DictItem]):
     _defaults: dict[str, DictItem]
 
@@ -229,7 +118,6 @@ class _CompletableStringsList:
     def complete(self) -> None:
         'Declare list done.'
         self.completed = tuple(self._incomplete)
-        # self._incomplete.clear()
 
     def clear(self) -> None:
         'Wipe content and declare new emptiness as complete.'
@@ -237,130 +125,6 @@ class _CompletableStringsList:
         self.complete()
 
 
-class _CapsManager:
-
-    def __init__(self,
-                 sender: Callable[[IrcMessage], None],
-                 caps_dict: '_UpdatingDict[_UpdatingServerCapability]'
-                 ) -> None:
-        self._dict = caps_dict
-        self._send = lambda *params: sender(IrcMessage('CAP', params=params))
-        self.clear()
-
-    def clear(self) -> None:
-        'Zero internal knowledge.'
-        self._dict.clear()
-        self._ls = _CompletableStringsList()
-        self._list = _CompletableStringsList()
-        self._list_expectations: dict[str, set[str]] = {
-                'ACK': set(), 'NAK': set()}
-
-    def start_negotation(self) -> None:
-        'Call .clear, send CAPS LS 302.'
-        self.clear()
-        self._send('LS', '302')
-
-    def end_negotiation(self) -> None:
-        'Stop negotation, without emptying caps DB.'
-        self._send('END')
-
-    def process_msg(self, verb: str, items: tuple[str, ...], complete: bool
-                    ) -> bool:
-        'Parse CAP message to negot. steps, DB inputs; return if successful.'
-        for item in items:
-            if verb == 'NEW':
-                key, data = _Dict.key_val_from_eq_str(item)
-                self._dict.set_updating(key, ServerCapability(data=data))
-            elif verb == 'DEL':
-                del self._dict[item]
-            elif verb in {'ACK', 'NACK'}:
-                self._list_expectations[verb].add(item)
-        if verb in {'LS', 'LIST'}:
-            target = getattr(self, f'_{verb.lower()}')
-            for item in items:
-                target.append(item)
-            if complete:
-                target.complete()
-                if target == self._ls:
-                    for cap_name in _NAMES_DESIRED_SERVER_CAPS:
-                        self._send('REQ', cap_name)
-                    self._send('LIST')
-                elif target == self._list:
-                    acks = self._list_expectations['ACK']
-                    naks = self._list_expectations['NAK']
-                    list_set = set(target.completed)
-                    assert acks == list_set & acks
-                    assert set() == list_set & naks
-                    for key, data in [_Dict.key_val_from_eq_str(entry)
-                                      for entry in self._ls.completed]:
-                        self._dict.set_updating(key, ServerCapability(
-                            data=data, enabled=key in self._list.completed))
-                    return True
-        return False
-
-
-class _Channel:
-    user_ids: _CompletableStringsList
-
-    def __init__(self,
-                 get_id_for_nick: Callable,
-                 get_membership_prefixes: Callable,
-                 purge_users: Callable,
-                 **kwargs
-                 ) -> None:
-        self._get_id_for_nick = get_id_for_nick
-        self._get_membership_prefixes = get_membership_prefixes
-        self._purge_users = purge_users
-        super().__init__(**kwargs)
-
-    def add_from_namreply(self, items: tuple[str, ...]):
-        'Add to .user_ids items assumed as nicknames with membership prefixes.'
-        for item in items:
-            nickname = item.lstrip(self._get_membership_prefixes())
-            self.user_ids.append(self._get_id_for_nick(nickname))
-
-    def append_nick(self, nickname: str) -> None:
-        'To .user_ids append .nickname and declare .user_ids complete.'
-        user_id = self._get_id_for_nick(nickname)
-        self.user_ids.append(user_id, complete=True)
-
-    def remove_nick(self, nickname: str) -> None:
-        'From .user_ids remove .nickname and declare .user_ids complete.'
-        user_id = self._get_id_for_nick(nickname)
-        self.user_ids.remove(user_id, complete=True)
-        self._purge_users()
-
-
-class _NickUserHost(NickUserHost, _DeepAnnotationsMixin):
-
-    def __str__(self) -> str:
-        return f'{self.nick}!{self.user}@{self.host}'
-
-    def __eq__(self, other) -> bool:
-        if not isinstance(other, NickUserHost):
-            return False
-        for key in self._deep_annotations().keys():
-            if getattr(self, key) != getattr(other, key):
-                return False
-        return True
-
-    def __setattr__(self, key: str, value: NickUserHost | str) -> None:
-        if key == 'nickuserhost' and isinstance(value, _NickUserHost):
-            for annotated_key in self._deep_annotations().keys():
-                setattr(self, annotated_key, getattr(value, annotated_key))
-        else:
-            super().__setattr__(key, value)
-
-    @classmethod
-    def from_str(cls, value: str) -> Self:
-        'Produce from string assumed to fit _!_@_ pattern.'
-        toks = value.split('!')
-        assert len(toks) == 2
-        toks = toks[0:1] + toks[1].split('@')
-        assert len(toks) == 3
-        return cls(*toks)
-
-
 class _UpdatingMixin(AutoAttrMixin):
     _on_update: Callable
 
@@ -375,11 +139,10 @@ class _UpdatingMixin(AutoAttrMixin):
             return None
         if isinstance(self, _CompletableStringsList):
             return self.completed
-        for cls in self.__class__.__mro__:
-            if cls != _DeepAnnotationsMixin\
-                    and AutoAttrMixin not in cls.__mro__:
-                obj = cls()
-                break
+        for cls in [cls for cls in self.__class__.__mro__
+                    if AutoAttrMixin not in cls.__mro__]:
+            obj = cls()
+            break
         for key in self._deep_annotations():
             attr_val = getattr(self, key)
             setattr(obj, key,
@@ -457,6 +220,159 @@ class _UpdatingCompletableStringsList(_UpdatingMixin, _CompletableStringsList):
         self._on_update()
 
 
+@dataclass
+class IrcConnSetup:
+    'All we need to know to set up a new Client connection.'
+    hostname: str = ''
+    port: int = 0
+    nick_wanted: str = ''
+    realname: str = ''
+    password: str = ''
+
+
+class SharedClientDbFields(IrcConnSetup):
+    'API for fields shared directly in name and type with TUI.'
+    connection_state: str = ''
+    isupport: Dict[str]
+    sasl_account: str = ''
+    sasl_auth_state: str = ''
+    user_modes: str = ''
+
+    def is_chan_name(self, name: str) -> bool:
+        'Tests name to match CHANTYPES prefixes.'
+        return name[0] in self.isupport['CHANTYPES']
+
+
+@dataclass
+class NickUserHost:
+    'Combination of nickname, username on host, and host.'
+    nick: str = '?'
+    user: str = '?'
+    host: str = '?'
+
+
+@dataclass
+class ServerCapability:
+    'Public API for CAP data.'
+    data: str = ''
+    enabled: bool = False
+
+
+class LogScope(Enum):
+    'Where log messages should go.'
+    ALL = auto()
+    SERVER = auto()
+    RAW = auto()
+    CHAT = auto()
+    SAME = auto()
+
+
+@dataclass
+class _ClientIdMixin:
+    'Collects a Client\'s ID at .client_id.'
+    client_id: str
+
+
+@dataclass
+class ClientQueueMixin(QueueMixin, _ClientIdMixin):
+    'To QueueMixin adds _cput to send ClientEvent for self.'
+
+    def _client_trigger(self, t_method: str, **kwargs) -> None:
+        self._put(ClientEvent.affector(t_method, client_id=self.client_id
+                                       ).kw(**kwargs))
+
+
+@dataclass
+class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
+    hostname: InitVar[str]  # needed by BaseIrcConnection, but not desired as
+    port: InitVar[int]      # dataclass fields, only for __post_init__ call
+
+    def __post_init__(self, hostname, port, **kwargs) -> None:
+        super().__init__(hostname=hostname, port=port, _q_out=self._q_out,
+                         **kwargs)
+
+    def _make_recv_event(self, msg: IrcMessage) -> 'ClientEvent':
+        return ClientEvent.affector('handle_msg', client_id=self.client_id
+                                    ).kw(msg=msg)
+
+    def _on_handled_loop_exception(self, e: IrcConnAbortException
+                                   ) -> 'ClientEvent':
+        return ClientEvent.affector('on_handled_loop_exception',
+                                    client_id=self.client_id).kw(e=e)
+
+
+class _Channel:
+    user_ids: _CompletableStringsList
+
+    def __init__(self,
+                 get_id_for_nick: Callable,
+                 get_membership_prefixes: Callable,
+                 purge_users: Callable,
+                 **kwargs
+                 ) -> None:
+        self._get_id_for_nick = get_id_for_nick
+        self._get_membership_prefixes = get_membership_prefixes
+        self._purge_users = purge_users
+        super().__init__(**kwargs)
+
+    def add_from_namreply(self, items: tuple[str, ...]):
+        'Add to .user_ids items assumed as nicknames with membership prefixes.'
+        for item in items:
+            nickname = item.lstrip(self._get_membership_prefixes())
+            self.user_ids.append(self._get_id_for_nick(nickname))
+
+    def append_nick(self, nickname: str) -> None:
+        'To .user_ids append .nickname and declare .user_ids complete.'
+        user_id = self._get_id_for_nick(nickname)
+        self.user_ids.append(user_id, complete=True)
+
+    def remove_nick(self, nickname: str) -> None:
+        'From .user_ids remove .nickname and declare .user_ids complete.'
+        user_id = self._get_id_for_nick(nickname)
+        self.user_ids.remove(user_id, complete=True)
+        self._purge_users()
+
+
+class _NickUserHost(NickUserHost):
+
+    def __str__(self) -> str:
+        return f'{self.nick}!{self.user}@{self.host}'
+
+    def __eq__(self, other) -> bool:
+        if not isinstance(other, NickUserHost):
+            return False
+        for key in NickUserHost.__annotations__:
+            if getattr(self, key) != getattr(other, key):
+                return False
+        return True
+
+    def __setattr__(self, key: str, value: NickUserHost | str) -> None:
+        if key == 'nickuserhost' and isinstance(value, _NickUserHost):
+            for annotated_key in NickUserHost.__annotations__:
+                setattr(self, annotated_key, getattr(value, annotated_key))
+        else:
+            super().__setattr__(key, value)
+
+    @classmethod
+    def from_str(cls, value: str) -> Self:
+        'Produce from string assumed to fit _!_@_ pattern.'
+        toks = value.split('!')
+        assert len(toks) == 2
+        toks = toks[0:1] + toks[1].split('@')
+        assert len(toks) == 3
+        return cls(*toks)
+
+    @property
+    def incremented(self) -> str:
+        'Return .nick with number suffix incremented, or "0" if none.'
+        name, digits = ([(self.nick, '')]
+                        + [(self.nick[:i], self.nick[i:])
+                           for i in range(len(self.nick), 0, -1)
+                           if self.nick[i:].isdigit()]
+                        )[-1]
+        return name + str(0 if not digits else (int(digits) + 1))
+
+
 class _UpdatingServerCapability(_UpdatingMixin, ServerCapability):
     pass
 
@@ -538,6 +454,68 @@ class _ClientDb(_UpdatingMixin, SharedClientDbFields):
         return id_
 
 
+class _CapsManager:
+
+    def __init__(self,
+                 sender: Callable[[IrcMessage], None],
+                 caps_dict: _UpdatingDict[_UpdatingServerCapability]
+                 ) -> None:
+        self._dict = caps_dict
+        self._send = lambda *params: sender(IrcMessage('CAP', params=params))
+        self.clear()
+
+    def clear(self) -> None:
+        'Zero internal knowledge.'
+        self._dict.clear()
+        self._ls = _CompletableStringsList()
+        self._list = _CompletableStringsList()
+        self._list_expectations: dict[str, set[str]] = {
+                'ACK': set(), 'NAK': set()}
+
+    def start_negotation(self) -> None:
+        'Call .clear, send CAPS LS 302.'
+        self.clear()
+        self._send('LS', '302')
+
+    def end_negotiation(self) -> None:
+        'Stop negotation, without emptying caps DB.'
+        self._send('END')
+
+    def process_msg(self, verb: str, items: tuple[str, ...], complete: bool
+                    ) -> bool:
+        'Parse CAP message to negot. steps, DB inputs; return if successful.'
+        for item in items:
+            if verb == 'NEW':
+                key, data = _Dict.key_val_from_eq_str(item)
+                self._dict.set_updating(key, ServerCapability(data=data))
+            elif verb == 'DEL':
+                del self._dict[item]
+            elif verb in {'ACK', 'NACK'}:
+                self._list_expectations[verb].add(item)
+        if verb in {'LS', 'LIST'}:
+            target = getattr(self, f'_{verb.lower()}')
+            for item in items:
+                target.append(item)
+            if complete:
+                target.complete()
+                if target == self._ls:
+                    for cap_name in _NAMES_DESIRED_SERVER_CAPS:
+                        self._send('REQ', cap_name)
+                    self._send('LIST')
+                elif target == self._list:
+                    acks = self._list_expectations['ACK']
+                    naks = self._list_expectations['NAK']
+                    list_set = set(target.completed)
+                    assert acks == list_set & acks
+                    assert set() == list_set & naks
+                    for key, data in [_Dict.key_val_from_eq_str(entry)
+                                      for entry in self._ls.completed]:
+                        self._dict.set_updating(key, ServerCapability(
+                            data=data, enabled=key in self._list.completed))
+                    return True
+        return False
+
+
 class Client(ABC, ClientQueueMixin):
     'Abstracts socket connection, loop over it, and handling messages from it.'
     _caps: _CapsManager
@@ -732,7 +710,8 @@ class Client(ABC, ClientQueueMixin):
             self._log(alert, alert=True)
         elif ret['verb'] == '433':  # ERR_NICKNAMEINUSE
             self._log('nickname already in use, trying increment', alert=True)
-            self.send(IrcMessage('NICK', (_nick_incremented(ret['used']),)))
+            self.send(IrcMessage(
+                'NICK', (_NickUserHost(nick=ret['used']).incremented,)))
         elif ret['verb'] in {'903', '904'}:  # RPL_SASLSUCCESS, ERR_SASLFAIL
             self._caps.end_negotiation()
         elif ret['verb'] == 'AUTHENTICATE':
@@ -780,3 +759,20 @@ class Client(ABC, ClientQueueMixin):
                 ch.remove_nick(ret['quitter'])
                 self._log(f'{ret["quitter"]} quits: {ret["message"]}',
                           LogScope.CHAT, target=ch_name)
+
+
+ClientsDb = dict[str, Client]
+
+
+@dataclass
+class NewClientEvent(AffectiveEvent):
+    'Put Client .payload into ClientsDb target.'
+    payload: 'Client'
+
+    def affect(self, target: ClientsDb) -> None:
+        target[self.payload.client_id] = self.payload
+
+
+@dataclass
+class ClientEvent(AffectiveEvent, _ClientIdMixin):
+    'To affect Client identified by ClientIdMixin.'
index 15125351dba10d123f879c13a532f00a387fc576..b7cc445ff83ae8a68fd175ca8582901cd1cb58e7 100644 (file)
@@ -26,6 +26,75 @@ _LOG_PREFIX_OUT = '>'
 _LOG_PREFIX_IN = '<'
 
 
+@dataclass
+class _Update:
+    path: tuple[str, ...]
+    value: Optional[Any] = None
+
+
+class _UpdatingNode(AutoAttrMixin):
+    log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER}
+
+    def _make_attr(self, cls: Callable, key: str):
+        return cls()
+
+    @classmethod
+    def _scope(cls, path: tuple[str, ...]) -> LogScope:
+        scopes: dict[tuple[str, ...], LogScope] = {}
+        for c in cls.__mro__:
+            if hasattr(c, 'log_scopes'):
+                scopes = c.log_scopes | scopes
+        return scopes.get(path, scopes[tuple()])
+
+    def set_and_check_for_change(self, update: _Update
+                                 ) -> Optional[tuple[LogScope, Any]]:
+        'Apply update, return if that actually made a difference.'
+        key = update.path[0]
+        node = self._get(key)
+        scope = self._scope(update.path)
+        if len(update.path) == 1:
+            if update.value is None:
+                if not self._is_set(key):
+                    return None
+                self._unset(key)
+                return (scope, None)
+            if node == update.value:
+                return None
+            self._set(key, update.value)
+            return (scope, update.value)
+        return node.set_and_check_for_change(_Update(update.path[1:],
+                                                     update.value))
+
+    def _get(self, key: str):
+        return getattr(self, key)
+
+    def _set(self, key: str, value) -> None:
+        setattr(self, key, value)
+
+    def _unset(self, key: str) -> None:
+        getattr(self, key).clear()
+
+    def _is_set(self, key: str) -> bool:
+        return hasattr(self, key)
+
+
+class _UpdatingDict(Dict[DictItem], _UpdatingNode):
+
+    def _get(self, key: str):
+        if key not in self._dict:
+            self._dict[key] = self._item_cls()
+        return self._dict[key]
+
+    def _set(self, key: str, value) -> None:
+        self._dict[key] = value
+
+    def _unset(self, key: str) -> None:
+        del self._dict[key]
+
+    def _is_set(self, key: str) -> bool:
+        return key in self._dict
+
+
 class _ClientWindow(Window, ClientQueueMixin):
 
     def __init__(self, scope: LogScope, log: Callable, **kwargs) -> None:
@@ -117,75 +186,6 @@ class _ChannelWindow(_ChatWindow):
         self._send_msg('PART', (self.chatname,))
 
 
-@dataclass
-class _Update:
-    path: tuple[str, ...]
-    value: Optional[Any] = None
-
-
-class _UpdatingNode(AutoAttrMixin):
-    log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER}
-
-    def _make_attr(self, cls: Callable, key: str):
-        return cls()
-
-    @classmethod
-    def _scope(cls, path: tuple[str, ...]) -> LogScope:
-        scopes: dict[tuple[str, ...], LogScope] = {}
-        for c in cls.__mro__:
-            if hasattr(c, 'log_scopes'):
-                scopes = c.log_scopes | scopes
-        return scopes.get(path, scopes[tuple()])
-
-    def set_and_check_for_change(self, update: _Update
-                                 ) -> Optional[tuple[LogScope, Any]]:
-        'Apply update, return if that actually made a difference.'
-        key = update.path[0]
-        node = self._get(key)
-        scope = self._scope(update.path)
-        if len(update.path) == 1:
-            if update.value is None:
-                if not self._is_set(key):
-                    return None
-                self._unset(key)
-                return (scope, None)
-            if node == update.value:
-                return None
-            self._set(key, update.value)
-            return (scope, update.value)
-        return node.set_and_check_for_change(_Update(update.path[1:],
-                                                     update.value))
-
-    def _get(self, key: str):
-        return getattr(self, key)
-
-    def _set(self, key: str, value) -> None:
-        setattr(self, key, value)
-
-    def _unset(self, key: str) -> None:
-        getattr(self, key).clear()
-
-    def _is_set(self, key: str) -> bool:
-        return hasattr(self, key)
-
-
-class _UpdatingDict(Dict[DictItem], _UpdatingNode):
-
-    def _get(self, key: str):
-        if key not in self._dict:
-            self._dict[key] = self._item_cls()
-        return self._dict[key]
-
-    def _set(self, key: str, value) -> None:
-        self._dict[key] = value
-
-    def _unset(self, key: str) -> None:
-        del self._dict[key]
-
-    def _is_set(self, key: str) -> bool:
-        return key in self._dict
-
-
 class _UpdatingChannel(_UpdatingNode):
     user_ids: tuple[str, ...] = tuple()
     log_scopes = {tuple(): LogScope.CHAT}