home · contact · privacy
Some code-regrouping for better readability.
authorChristian Heller <c.heller@plomlompom.de>
Mon, 25 Aug 2025 23:22:19 +0000 (01:22 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Mon, 25 Aug 2025 23:22:19 +0000 (01:22 +0200)
ircplom/client.py

index b89997c973bb05556606f3162a1fa7a85c2b7c91..9a99aa23cbad30f017f792107553b6bf13d2748b 100644 (file)
@@ -15,6 +15,120 @@ from ircplom.irc_conn import (BaseIrcConnection, IrcConnAbortException,
                               IrcMessage, PORT_SSL)
 
 ClientsDb = dict[str, 'Client']
+
+
+class Db:
+    'Helper with some conveniences around annotated attributes.'
+
+    def __init__(self, **kwargs) -> None:
+        self._types: dict[str, type] = {}
+        for c in self.__class__.__mro__:
+            if hasattr(c, '__annotations__'):
+                self._types = c.__annotations__ | self._types
+        for name, type_ in self._types.items():
+            setattr(self, name, type_())
+        super().__init__(**kwargs)
+
+    def _typecheck(self, key: str, value) -> None:
+        type_ = self._types[key]
+        if hasattr(type_, '__origin__'):
+            assert isinstance(value, type_.__origin__)
+            if len(value):
+                assert hasattr(type_, '__args__')
+                item_type = type_.__args__[0]
+                for item in value:
+                    assert isinstance(item, item_type)
+        else:
+            assert isinstance(value, type_)
+
+
+@dataclass
+class IrcConnSetup:
+    'All we need to know to set up a new Client connection.'
+    hostname: str = ''
+    port: int = 0
+    nick_wanted: str = ''
+    realname: str = ''
+    password: str = ''
+
+
+class SharedChannelDbFields:
+    'API for fields shared directly in name and type with TUI.'
+    user_ids: tuple[str, ...]
+    # topic: str
+    # channel_modes: str
+
+
+_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
+
+
+class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
+    'API for fields shared directly in name and type with TUI.'
+    connection_state: str
+    sasl_account: str
+    sasl_auth_state: str
+    user_modes: str
+    users: Any
+    _channels: dict[str, _ChannelDbFields]
+
+
+@dataclass
+class NickUserHost:
+    'Combination of nickname, username on host, and host.'
+    nick: str = '?'
+    user: str = '?'
+    host: str = '?'
+
+    def copy(self) -> Self:
+        'Produce copy not subject to later attribute changes on original.'
+        return self.__class__(**dc_asdict(self))
+
+
+@dataclass
+class ServerCapability:
+    'Public API for CAP data.'
+    data: str
+    enabled: bool = False
+
+
+class LogScope(Enum):
+    'Where log messages should go.'
+    ALL = auto()
+    SERVER = auto()
+    RAW = auto()
+    CHAT = auto()
+    SAME = auto()
+
+
+@dataclass
+class _ClientIdMixin:
+    'Collects a Client\'s ID at .client_id.'
+    client_id: str
+
+
+@dataclass
+class ClientEvent(AffectiveEvent, _ClientIdMixin):
+    'To affect Client identified by ClientIdMixin.'
+
+
+@dataclass
+class NewClientEvent(AffectiveEvent):
+    'Put Client .payload into ClientsDb target.'
+    payload: 'Client'
+
+    def affect(self, target: ClientsDb) -> None:
+        target[self.payload.client_id] = self.payload
+
+
+@dataclass
+class ClientQueueMixin(QueueMixin, _ClientIdMixin):
+    'To QueueMixin adds _cput to send ClientEvent for self.'
+
+    def _client_trigger(self, t_method: str, **kwargs) -> None:
+        self._put(ClientEvent.affector(t_method, client_id=self.client_id
+                                       ).kw(**kwargs))
+
+
 _NAMES_DESIRED_SERVER_CAPS = ('sasl',)
 _ILLEGAL_NICK_FIRSTCHARS = '~&@+# '
 
@@ -347,28 +461,8 @@ def _nick_incremented(nickname: str) -> str:
     return name + str(0 if not digits else (int(digits) + 1))
 
 
-class LogScope(Enum):
-    'Where log messages should go.'
-    ALL = auto()
-    SERVER = auto()
-    RAW = auto()
-    CHAT = auto()
-    SAME = auto()
-
-
-@dataclass
-class ClientIdMixin:
-    'Collects a Client\'s ID at .client_id.'
-    client_id: str
-
-
-@dataclass
-class ClientEvent(AffectiveEvent, ClientIdMixin):
-    'To affect Client identified by ClientIdMixin.'
-
-
 @dataclass
-class _IrcConnection(BaseIrcConnection, ClientIdMixin):
+class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
     hostname: InitVar[str]  # needed by BaseIrcConnection, but not desired as
     port: InitVar[int]      # dataclass fields, only for __post_init__ call
 
@@ -386,65 +480,68 @@ class _IrcConnection(BaseIrcConnection, ClientIdMixin):
                                     client_id=self.client_id).kw(e=e)
 
 
-@dataclass
-class ClientQueueMixin(QueueMixin, ClientIdMixin):
-    'To QueueMixin adds _cput to send ClientEvent for self.'
-
-    def _client_trigger(self, t_method: str, **kwargs) -> None:
-        self._put(ClientEvent.affector(t_method, client_id=self.client_id
-                                       ).kw(**kwargs))
-
-
 class _UpdatingDict:
     _on_update: Callable
 
     def __init__(self) -> None:
         self._dict: dict[str, Any] = {}
 
+    def __getitem__(self, key: str):
+        return self._dict[key]
+
+    def __setitem__(self, key: str, val: Any) -> None:
+        if isinstance(val, _NickUserHost):
+            val.set_on_update(lambda: self._on_update(key))
+        self._dict[key] = val
+        self._on_update(key)
+
+    def __delitem__(self, key: str) -> None:
+        del self._dict[key]
+        self._on_update(key)
+
     @property
     def keys(self) -> tuple[str, ...]:
         'Keys of item registrations.'
         return tuple(self._dict.keys())
 
-    def set_on_update(self, name: str, on_update: Callable) -> None:
-        'Caller of on_update with path= set to name.'
-        self._on_update = lambda k: on_update(name, k)
-
     def clear(self) -> None:
         'Zero dict and send clearance update.'
         self._dict.clear()
         self._on_update('')
 
-    @staticmethod
-    def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
-        'Split eq_str by "=", and if none, into eq_str and "".'
-        toks = eq_str.split('=', maxsplit=1)
-        return toks[0], '' if len(toks) == 1 else toks[1]
+    def set_on_update(self, name: str, on_update: Callable) -> None:
+        'Caller of on_update with path= set to name.'
+        self._on_update = lambda k: on_update(name, k)
 
     def set_from_eq_str(self, eq_str: str, cls=str) -> None:
         'Set from .key_val_from_eq_str result, to cls(val).'
         key, value = self.key_val_from_eq_str(eq_str)
         self[key] = cls(value)
 
-    def __getitem__(self, key: str):
-        return self._dict[key]
+    @staticmethod
+    def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
+        'Split eq_str by "=", and if none, into eq_str and "".'
+        toks = eq_str.split('=', maxsplit=1)
+        return toks[0], '' if len(toks) == 1 else toks[1]
 
-    def __setitem__(self, key: str, val: Any) -> None:
-        if isinstance(val, _NickUserHost):
-            val.set_on_update(lambda: self._on_update(key))
-        self._dict[key] = val
-        self._on_update(key)
 
-    def __delitem__(self, key: str) -> None:
-        del self._dict[key]
-        self._on_update(key)
+class _CompletableStringsList:
+    _on_update: Callable
+
+    def __init__(self) -> None:
+        self._incomplete: list[str] = []
+        self.completed: tuple[str, ...] = tuple()
 
+    def append(self, value: str) -> None:
+        'Append value to list.'
+        self._incomplete.append(value)
 
-@dataclass
-class ServerCapability:
-    'Public API for CAP data.'
-    data: str
-    enabled: bool = False
+    def complete(self) -> None:
+        'Declare list done, call updater if set.'
+        self.completed = tuple(self._incomplete)
+        self._incomplete.clear()
+        if hasattr(self, '_on_update'):
+            self._on_update()
 
 
 class _CapsManager:
@@ -470,6 +567,10 @@ class _CapsManager:
         self.clear()
         self._send('LS', '302')
 
+    def end_negotiation(self) -> None:
+        'Stop negotation, without emptying caps DB.'
+        self._send('END')
+
     def process_msg(self, verb: str, items: tuple[str, ...], complete: bool
                     ) -> bool:
         'Parse CAP message to negot. steps, DB inputs; return if successful.'
@@ -503,64 +604,6 @@ class _CapsManager:
                     return True
         return False
 
-    def end_negotiation(self) -> None:
-        'Stop negotation, without emptying caps DB.'
-        self._send('END')
-
-
-@dataclass
-class IrcConnSetup:
-    'All we need to know to set up a new Client connection.'
-    hostname: str = ''
-    port: int = 0
-    nick_wanted: str = ''
-    realname: str = ''
-    password: str = ''
-
-
-class Db:
-    'Helper with some conveniences around annotated attributes.'
-
-    def __init__(self, **kwargs) -> None:
-        self._types: dict[str, type] = {}
-        for c in self.__class__.__mro__:
-            if hasattr(c, '__annotations__'):
-                self._types = c.__annotations__ | self._types
-        for name, type_ in self._types.items():
-            setattr(self, name, type_())
-        super().__init__(**kwargs)
-
-    def _typecheck(self, key: str, value) -> None:
-        type_ = self._types[key]
-        if hasattr(type_, '__origin__'):
-            assert isinstance(value, type_.__origin__)
-            if len(value):
-                assert hasattr(type_, '__args__')
-                item_type = type_.__args__[0]
-                for item in value:
-                    assert isinstance(item, item_type)
-        else:
-            assert isinstance(value, type_)
-
-
-class _CompletableStringsList:
-    _on_update: Callable
-
-    def __init__(self) -> None:
-        self._incomplete: list[str] = []
-        self.completed: tuple[str, ...] = tuple()
-
-    def append(self, value: str) -> None:
-        'Append value to list.'
-        self._incomplete.append(value)
-
-    def complete(self) -> None:
-        'Declare list done, call updater if set.'
-        self.completed = tuple(self._incomplete)
-        self._incomplete.clear()
-        if hasattr(self, '_on_update'):
-            self._on_update()
-
 
 class _Db(Db):
 
@@ -582,16 +625,6 @@ class _Db(Db):
             self._on_update(key)
 
 
-class SharedChannelDbFields:
-    'API for fields shared directly in name and type with TUI.'
-    user_ids: tuple[str, ...]
-    # topic: str
-    # channel_modes: str
-
-
-_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
-
-
 class _ChannelDb(_Db, SharedChannelDbFields):
 
     def __init__(self, purge_users: Callable, **kwargs) -> None:
@@ -610,34 +643,22 @@ class _ChannelDb(_Db, SharedChannelDbFields):
         self._purge_users()
 
 
-class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
-    'API for fields shared directly in name and type with TUI.'
-    connection_state: str
-    sasl_account: str
-    sasl_auth_state: str
-    user_modes: str
-    users: Any
-    _channels: dict[str, _ChannelDbFields]
-
-
-@dataclass
-class NickUserHost:
-    'Combination of nickname, username on host, and host.'
-    nick: str = '?'
-    user: str = '?'
-    host: str = '?'
-
-    def copy(self) -> Self:
-        'Produce copy not subject to later attribute changes on original.'
-        return self.__class__(**dc_asdict(self))
-
-
 class _NickUserHost(NickUserHost):
     _on_update: Callable
 
     def __str__(self) -> str:
         return f'{self.nick}!{self.user}@{self.host}'
 
+    def __setattr__(self, key: str, value: Any) -> None:
+        if key == 'nickuserhost' and isinstance(value, _NickUserHost):
+            self.nick = value.nick
+            self.user = value.user
+            self.host = value.host
+        else:
+            super().__setattr__(key, value)
+            if key != '_on_update' and hasattr(self, '_on_update'):
+                self._on_update()
+
     @classmethod
     def from_str(cls, value: str) -> Self:
         'Produce from string assumed to fit _!_@_ pattern.'
@@ -651,16 +672,6 @@ class _NickUserHost(NickUserHost):
         'Caller of on_update with path= set to name.'
         self._on_update = on_update
 
-    def __setattr__(self, key: str, value: Any) -> None:
-        if key == 'nickuserhost' and isinstance(value, _NickUserHost):
-            self.nick = value.nick
-            self.user = value.user
-            self.host = value.host
-        else:
-            super().__setattr__(key, value)
-            if key != '_on_update' and hasattr(self, '_on_update'):
-                self._on_update()
-
 
 class _ClientDb(_Db, SharedClientDbFields):
     caps: _UpdatingDict
@@ -673,6 +684,19 @@ class _ClientDb(_Db, SharedClientDbFields):
         super().__init__(**kwargs)
         self.motd._on_update = lambda: self._on_update('motd')
 
+    def _purge_users(self) -> None:
+        to_keep = {'me'}
+        for chan in self._channels.values():
+            to_keep |= set(chan.user_ids)
+        for user_id in [id_ for id_ in self.users.keys
+                        if id_ not in to_keep]:
+            del self.users[user_id]
+
+    def needs_arg(self, key: str) -> bool:
+        'Reply if attribute of key may reasonably be addressed without an arg.'
+        return not isinstance(getattr(self, key), (bool, int, str, tuple,
+                                                   _CompletableStringsList))
+
     def user_id(self, query: str | _NickUserHost) -> str:
         'Return user_id for nickname of entire NickUserHost, create if none.'
         nick = query if isinstance(query, str) else query.nick
@@ -686,14 +710,6 @@ class _ClientDb(_Db, SharedClientDbFields):
             self.users[id_] = _NickUserHost(query)
         return id_
 
-    def _purge_users(self) -> None:
-        to_keep = {'me'}
-        for chan in self._channels.values():
-            to_keep |= set(chan.user_ids)
-        for user_id in [id_ for id_ in self.users.keys
-                        if id_ not in to_keep]:
-            del self.users[user_id]
-
     def remove_user(self, user_id: str) -> tuple[str, ...]:
         'Run remove_user_from_channel on all channels user is in.'
         affected_chans = []
@@ -703,11 +719,6 @@ class _ClientDb(_Db, SharedClientDbFields):
             affected_chans += [id_]
         return tuple(affected_chans)
 
-    def needs_arg(self, key: str) -> bool:
-        'Reply if attribute of key may reasonably be addressed without an arg.'
-        return not isinstance(getattr(self, key), (bool, int, str, tuple,
-                                                   _CompletableStringsList))
-
     @property
     def chan_names(self) -> tuple[str, ...]:
         'Return names of joined channels.'
@@ -764,10 +775,6 @@ class Client(ABC, ClientQueueMixin):
         self._db.connection_state = 'connecting'
         Thread(target=connect, daemon=True, args=(self,)).start()
 
-    @abstractmethod
-    def _on_update(self, path: str, arg: str = '') -> None:
-        pass
-
     def _on_connect(self) -> None:
         assert self.conn is not None
         self._db.connection_state = 'connected'
@@ -777,6 +784,27 @@ class Client(ABC, ClientQueueMixin):
             '0', '*', self._db.realname)))
         self.send(IrcMessage(verb='NICK', params=(self._db.nick_wanted,)))
 
+    def close(self) -> None:
+        'Close both recv Loop and socket.'
+        self._db.connection_state = 'disconnected'
+        if self.conn:
+            self.conn.close()
+        self.conn = None
+        for name in self._db.chan_names:
+            self._db.del_chan(name)
+        self._db.isupports.clear()
+        self._db.users['me'].nick = '?'
+        self._db.sasl_auth_state = ''
+
+    def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
+        'Gracefully handle broken connection.'
+        self._log(f'connection broken: {e}', alert=True)
+        self.close()
+
+    @abstractmethod
+    def _on_update(self, path: str, arg: str = '') -> None:
+        pass
+
     @abstractmethod
     def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
         pass
@@ -800,23 +828,6 @@ class Client(ABC, ClientQueueMixin):
                 self._log(to_log, scope=log_target)
         self._log(msg.raw, scope=LogScope.RAW, out=True)
 
-    def close(self) -> None:
-        'Close both recv Loop and socket.'
-        self._db.connection_state = 'disconnected'
-        if self.conn:
-            self.conn.close()
-        self.conn = None
-        for name in self._db.chan_names:
-            self._db.del_chan(name)
-        self._db.isupports.clear()
-        self._db.users['me'].nick = '?'
-        self._db.sasl_auth_state = ''
-
-    def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
-        'Gracefully handle broken connection.'
-        self._log(f'connection broken: {e}', alert=True)
-        self.close()
-
     def _match_msg(self, msg: IrcMessage) -> dict[str, Any]:
         'Test .source, .verb, .params.'
         tok_type = (str | _NickUserHost | tuple[str, ...]
@@ -974,12 +985,3 @@ class Client(ABC, ClientQueueMixin):
             for chan in self._db.remove_user(self._db.user_id(ret['quitter'])):
                 self._log(f'{ret["quitter"]} quits: {ret["message"]}',
                           LogScope.CHAT, target=chan)
-
-
-@dataclass
-class NewClientEvent(AffectiveEvent):
-    'Put Client .payload into ClientsDb target.'
-    payload: Client
-
-    def affect(self, target: ClientsDb) -> None:
-        target[self.payload.client_id] = self.payload