home · contact · privacy
Greatly simplify database code. master
authorChristian Heller <c.heller@plomlompom.de>
Fri, 15 Aug 2025 06:23:31 +0000 (08:23 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 15 Aug 2025 06:23:31 +0000 (08:23 +0200)
ircplom/client.py
ircplom/client_tui.py

index c985c33c8ab71b622a8e9afe5c0af78badfb9ff6..38745743f5894fe370673958f42d226f320d8bd4 100644 (file)
@@ -143,10 +143,10 @@ class _CapsManager:
                 for item in params[-1].strip().split():
                     self._db.append(key, item)
                 if params[1] != '*':
                 for item in params[-1].strip().split():
                     self._db.append(key, item)
                 if params[1] != '*':
-                    self._db.set(key, None, confirm=True)
-                if self._db.caps_LIST is not None:
+                    self._db.confirm(key)
+                if self._db.confirmed('caps_LIST'):
                     return True
                     return True
-                if self._db.caps_LS is not None:
+                if self._db.confirmed('caps_LS'):
                     availables = [_ServerCapability.split_name_data(item)[0]
                                   for item in self._db.caps_LS]
                     for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
                     availables = [_ServerCapability.split_name_data(item)[0]
                                   for item in self._db.caps_LS]
                     for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
@@ -180,104 +180,51 @@ class _Db:
     'For values of variable confirmation, and reading in multi-line lists.'
 
     def __init__(self) -> None:
     'For values of variable confirmation, and reading in multi-line lists.'
 
     def __init__(self) -> None:
-        self._dict: dict[str, ClientDbType] = {}
-        self._confirmeds: list[str] = []
-
-    def __getattr__(self, key: str) -> Optional[ClientDbType]:
-        if key in self._dict and key in self._confirmeds:
-            value = self._dict[key]
-            self._typecheck(key, value)
-            return value
-        return None
-
-    def __setattr__(self, key: str, *args, **kwargs) -> None:
-        if key[:1] == '_':
-            super().__setattr__(key, *args, **kwargs)
-        else:
-            raise CrashingException(
-                    'no direct attribute setting, use .set() etc.')
-
-    @classmethod
-    def _type_for(cls, key):
-        candidates = [c.__annotations__[key] for c in cls.__mro__
-                      if c is not object and key in c.__annotations__]
-        if not candidates:
-            raise CrashingException(f'{cls} lacks annotation for {key}')
-        return candidates[0]
-
-    @classmethod
-    def _typecheck(cls, key: str, value: ClientDbType) -> None:
-        type_ = cls._type_for(key)
-        fail = True
-        type_found = str(type(value))
-        if not isinstance(type_, type):              # gotta be GenericAlias, …
-            assert hasattr(type_, '__origin__')      # … which, if for list …
-            assert type_.__origin__ is list          # … (only probable …
-            if isinstance(value, type_.__origin__):  # … candidate so far), …
-                fail = False  # be ok if list emtpy  # … stores members' …
-                assert hasattr(type_, '__args__')    # … types at .__args__
-                subtypes_found = set()
-                for subtype in type_.__args__:
-                    for x in [x for x in value if not isinstance(x, subtype)]:
-                        fail = True
-                        subtypes_found.add(str(type(x)))
-                type_found = f'{type_.__origin__}: ' + '|'.join(subtypes_found)
-        elif isinstance(value, type_):
-            return
-        if fail:
-            raise CrashingException(
-                    f'wrong type for {key}: {type_found} (should be: {type_}, '
-                    f'provided value: {value})')
+        annos = {}
+        for c in self.__class__.__mro__:
+            if hasattr(c, '__annotations__'):
+                for k in [k for k in c.__annotations__ if k not in annos]:
+                    annos[k] = c.__annotations__[k]
+        for name, type_ in annos.items():
+            if type_ is int:
+                setattr(self, name, 0)
+            elif type_ is str:
+                setattr(self, name, '')
+            elif hasattr(type_, '__origin__') and type_.__origin__ is list:
+                setattr(self, name, [])
+            else:
+                setattr(self, name, {})
+        self._confirmeds: set[str] = set()
 
     def set(self, key: str, value: Optional[ClientDbType], confirm=False
             ) -> tuple[bool, bool]:
 
     def set(self, key: str, value: Optional[ClientDbType], confirm=False
             ) -> tuple[bool, bool]:
-        'Ensures setting, returns if changed value or confirmation.'
-        if value is not None:
-            self._typecheck(key, value)
-        old_value, was_confirmed = self.get_force(key)
-        value_changed = (value is not None) and value != old_value
-        if value is None:
-            if old_value is None:
-                if confirm:
-                    raise CrashingException('called to unset non-set entry')
-                del self._dict[key]
-        elif value_changed:
-            self._dict[key] = value
-        confirm_changed = confirm != was_confirmed
-        if confirm_changed:
-            if confirm:
-                self._confirmeds += [key]
-            else:
-                self._confirmeds.remove(key)
+        'Set value at key and its confirmation, return what of either changed.'
+        value_changed = False
+        if value is not None and value != getattr(self, key, None):
+            value_changed = True
+            setattr(self, key, value)
+        confirm_changed = self.confirm(key, confirm)
         return (value_changed, confirm_changed)
 
         return (value_changed, confirm_changed)
 
-    def get_force(self, key: str) -> tuple[Optional[ClientDbType], bool]:
-        'Get even if only stored unconfirmed, tell if confirmed..'
-        value = self._dict.get(key, None)
-        if value is not None:
-            self._typecheck(key, value)
-        return (value, key in self._confirmeds)
+    def confirm(self, key, confirm=True) -> bool:
+        'Declare value at key confirmed (or the opposite.'
+        if confirm and key not in self._confirmeds:
+            self._confirmeds.add(key)
+            return True
+        if (not confirm) and key in self._confirmeds:
+            self._confirmeds.remove(key)
+            return True
+        return False
 
     def append(self, key: str, value: str, keep_confirmed=False) -> None:
 
     def append(self, key: str, value: str, keep_confirmed=False) -> None:
-        'To list[str] keyed by key, append value; if non-existant, create it.'
-        if not keep_confirmed and key in self._confirmeds:
+        'To list at key add value; if not keep_confirmed, unconfirm value.'
+        if (not keep_confirmed) and key in self._confirmeds:
             self._confirmeds.remove(key)
             self._confirmeds.remove(key)
-            del self._dict[key]
-        targeted = self._dict.get(key, None)
-        if isinstance(targeted, list):
-            targeted.append(value)
-        elif targeted is None:
-            self._dict[key] = [value]
-        else:
-            raise CrashingException('called on non-list entry')
-
-    def remove(self, key, value: str) -> None:
-        'From list[str] keyed by key, remove value.'
-        targeted = self._dict.get(key, None)
-        if isinstance(targeted, list):
-            targeted.remove(value)
-        else:
-            raise CrashingException('called on non-list entry')
+        getattr(self, key).append(value)
+
+    def remove(self, key: str, value: str) -> None:
+        'From list at key remove value.'
+        getattr(self, key).remove(value)
 
 
 class ClientDbBase(_Db):
 
 
 class ClientDbBase(_Db):
@@ -288,6 +235,10 @@ class ClientDbBase(_Db):
     nickname: str
     user_modes: str
 
     nickname: str
     user_modes: str
 
+    def confirmed(self, key: str) -> bool:
+        'If value at key be confirmed or not.'
+        return key in self._confirmeds
+
 
 class _ChannelDb(_Db):
     users: list[str]
 
 class _ChannelDb(_Db):
     users: list[str]
@@ -304,19 +255,6 @@ class _ClientDb(ClientDbBase):
     realname: str
     _channels: dict[str, _ChannelDb]
 
     realname: str
     _channels: dict[str, _ChannelDb]
 
-    @property
-    def conn_setup(self) -> IrcConnSetup:
-        'Constructed out of stored entries *including* unconfirmed ones.'
-        kwargs: dict[str, ClientDbType] = {}
-        for name in IrcConnSetup._fields:
-            val = self._dict.get(name, None)
-            if val is None:
-                raise CrashingException(f'field not set: {name}')
-            assert self._type_for(name) == IrcConnSetup.__annotations__[name]
-            self._typecheck(name, val)
-            kwargs[name] = val
-        return IrcConnSetup(**kwargs)  # type: ignore  # enough tests above
-
     @property
     def caps(self) -> dict[str, _ServerCapability]:
         'Interpret .caps_LS, .caps_LIST into proper _ServerCapability listing.'
     @property
     def caps(self) -> dict[str, _ServerCapability]:
         'Interpret .caps_LS, .caps_LIST into proper _ServerCapability listing.'
@@ -348,12 +286,14 @@ class Client(ABC, ClientQueueMixin):
     def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
         self._db = _ClientDb()
         for k in conn_setup._fields:
     def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
         self._db = _ClientDb()
         for k in conn_setup._fields:
-            self._db.set(k, getattr(conn_setup, k), confirm=k != 'nickname')
+            setattr(self._db, k, getattr(conn_setup, k))
         if self._db.port <= 0:
         if self._db.port <= 0:
-            self._db.set('port', PORT_SSL, confirm=True)
+            self._db.port = PORT_SSL
         self.client_id = self._db.hostname
         self._prev_verb = ''
         super().__init__(client_id=self.client_id, **kwargs)
         self.client_id = self._db.hostname
         self._prev_verb = ''
         super().__init__(client_id=self.client_id, **kwargs)
+        for k in IrcConnSetup._fields:
+            self._update_db(k, None, confirm=k != 'nickname')
         self._start_connecting()
 
     def _start_connecting(self) -> None:
         self._start_connecting()
 
     def _start_connecting(self) -> None:
@@ -373,7 +313,7 @@ class Client(ABC, ClientQueueMixin):
             except Exception as e:  # pylint: disable=broad-exception-caught
                 self._put(ExceptionEvent(CrashingException(e)))
 
             except Exception as e:  # pylint: disable=broad-exception-caught
                 self._put(ExceptionEvent(CrashingException(e)))
 
-        self._log('connecting …', conn_setup=self._db.conn_setup)
+        self._log('connecting …')
         Thread(target=connect, daemon=True, args=(self,)).start()
 
     def _on_connect(self) -> None:
         Thread(target=connect, daemon=True, args=(self,)).start()
 
     def _on_connect(self) -> None:
@@ -383,10 +323,9 @@ class Client(ABC, ClientQueueMixin):
                   f'{"yes" if self.conn.ssl else "no"})',
                   scope=LogScope.ALL)
         self._caps = _CapsManager(self.send, self._db)
                   f'{"yes" if self.conn.ssl else "no"})',
                   scope=LogScope.ALL)
         self._caps = _CapsManager(self.send, self._db)
-        conn_setup = self._db.conn_setup  # for type-checks and to include
-        self.send(IrcMessage(             # … unconfirmed .nickname
-            verb='USER', params=(getuser(), '0', '*', conn_setup.realname)))
-        self.send(IrcMessage(verb='NICK', params=(conn_setup.nickname,)))
+        self.send(IrcMessage(verb='USER',
+                             params=(getuser(), '0', '*', self._db.realname)))
+        self.send(IrcMessage(verb='NICK', params=(self._db.nickname,)))
 
     @abstractmethod
     def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
 
     @abstractmethod
     def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
@@ -462,9 +401,9 @@ class Client(ABC, ClientQueueMixin):
                 self._update_db('client_host', confirm=True,
                                 value=msg.params[1].split('@')[-1])
             case 'AUTHENTICATE' if msg.params == ('+',):
                 self._update_db('client_host', confirm=True,
                                 value=msg.params[1].split('@')[-1])
             case 'AUTHENTICATE' if msg.params == ('+',):
-                auth = b64encode((self._db.conn_setup.nickname + '\0' +
-                                  self._db.conn_setup.nickname + '\0' +
-                                  self._db.conn_setup.password
+                auth = b64encode((self._db.nickname + '\0' +
+                                  self._db.nickname + '\0' +
+                                  self._db.password
                                   ).encode('utf-8')).decode('utf-8')
                 self.send(IrcMessage('AUTHENTICATE', (auth,)))
             case 'CAP' if len(msg.params) > 1:
                                   ).encode('utf-8')).decode('utf-8')
                 self.send(IrcMessage('AUTHENTICATE', (auth,)))
             case 'CAP' if len(msg.params) > 1:
@@ -506,7 +445,7 @@ class Client(ABC, ClientQueueMixin):
                                                      msg.nick_from_source)
             case 'PING' if len(msg.params) == 1:
                 self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
                                                      msg.nick_from_source)
             case 'PING' if len(msg.params) == 1:
                 self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
-            case '903' | '904' if len(msg.params) == 1:
+            case '903' | '904' if len(msg.params) == 2:
                 alert = msg.verb == '904'
                 self._log(f'SASL auth {"failed" if alert else "succeeded"}',
                           alert=alert)
                 alert = msg.verb == '904'
                 self._log(f'SASL auth {"failed" if alert else "succeeded"}',
                           alert=alert)
index b7541fe1f4a97d5eca6718df0f79ade8acdfa69e..0e1a7d53c4d63808f8441dba3bb0604d0617fea2 100644 (file)
@@ -137,7 +137,8 @@ class _ClientWindowsManager:
             kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#'
                                  else _ChatWindow)
             kwargs['chatname'] = chatname
             kwargs['win_cls'] = (_ChannelWindow if chatname[0] == '#'
                                  else _ChatWindow)
             kwargs['chatname'] = chatname
-            kwargs['get_nick_data'] = lambda: self._db.get_force('nickname')
+            kwargs['get_nick_data'] = lambda: (
+                    self._db.nickname, self._db.confirmed('nickname'))
         win = self._tui_new_window(**kwargs)
         self.windows += [win]
         return win
         win = self._tui_new_window(**kwargs)
         self.windows += [win]
         return win
@@ -279,11 +280,6 @@ class _ClientKnowingTui(Client):
                 if cap.data:
                     listing += f' ({cap.data})'
                 to_log += [listing]
                 if cap.data:
                     listing += f' ({cap.data})'
                 to_log += [listing]
-        if 'conn_setup' in kwargs:
-            conn_setup = kwargs['conn_setup']
-            to_log += ['connection setup:']
-            for k in conn_setup._fields:
-                to_log += [f'  {k}: [{getattr(conn_setup, k)}]']
         for item in to_log:
             self._client_tui_trigger('log', scope=scope, msg=item, **kwargs)
             if scope == LogScope.RAW:
         for item in to_log:
             self._client_tui_trigger('log', scope=scope, msg=item, **kwargs)
             if scope == LogScope.RAW:
@@ -294,7 +290,7 @@ class _ClientKnowingTui(Client):
                    ) -> tuple[bool, bool]:
         value_changed, conf_changed = super()._update_db(key, value, confirm)
         if value is None and not value_changed:  # local ._db may have fallback
                    ) -> tuple[bool, bool]:
         value_changed, conf_changed = super()._update_db(key, value, confirm)
         if value is None and not value_changed:  # local ._db may have fallback
-            value = self._db.get_force(key)[0]   # values Tui._db doesn't
+            value = getattr(self._db, key)       # values Tui._db doesn't
         self._client_tui_trigger('update', scope=LogScope.SERVER,
                                  key=key, value=value, confirmed=confirm)
         return (value_changed, conf_changed)
         self._client_tui_trigger('update', scope=LogScope.SERVER,
                                  key=key, value=value, confirmed=confirm)
         return (value_changed, conf_changed)