for item in params[-1].strip().split():
self._db.append(key, item)
if params[1] != '*':
- self._db.set(key, None, confirm=True)
- if self._db.caps_LIST is not None:
+ self._db.confirm(key)
+ if self._db.confirmed('caps_LIST'):
return True
- if self._db.caps_LS is not None:
+ if self._db.confirmed('caps_LS'):
availables = [_ServerCapability.split_name_data(item)[0]
for item in self._db.caps_LS]
for cap_name in [n for n in _NAMES_DESIRED_SERVER_CAPS
'For values of variable confirmation, and reading in multi-line lists.'
def __init__(self) -> None:
- self._dict: dict[str, ClientDbType] = {}
- self._confirmeds: list[str] = []
-
- def __getattr__(self, key: str) -> Optional[ClientDbType]:
- if key in self._dict and key in self._confirmeds:
- value = self._dict[key]
- self._typecheck(key, value)
- return value
- return None
-
- def __setattr__(self, key: str, *args, **kwargs) -> None:
- if key[:1] == '_':
- super().__setattr__(key, *args, **kwargs)
- else:
- raise CrashingException(
- 'no direct attribute setting, use .set() etc.')
-
- @classmethod
- def _type_for(cls, key):
- candidates = [c.__annotations__[key] for c in cls.__mro__
- if c is not object and key in c.__annotations__]
- if not candidates:
- raise CrashingException(f'{cls} lacks annotation for {key}')
- return candidates[0]
-
- @classmethod
- def _typecheck(cls, key: str, value: ClientDbType) -> None:
- type_ = cls._type_for(key)
- fail = True
- type_found = str(type(value))
- if not isinstance(type_, type): # gotta be GenericAlias, …
- assert hasattr(type_, '__origin__') # … which, if for list …
- assert type_.__origin__ is list # … (only probable …
- if isinstance(value, type_.__origin__): # … candidate so far), …
- fail = False # be ok if list emtpy # … stores members' …
- assert hasattr(type_, '__args__') # … types at .__args__
- subtypes_found = set()
- for subtype in type_.__args__:
- for x in [x for x in value if not isinstance(x, subtype)]:
- fail = True
- subtypes_found.add(str(type(x)))
- type_found = f'{type_.__origin__}: ' + '|'.join(subtypes_found)
- elif isinstance(value, type_):
- return
- if fail:
- raise CrashingException(
- f'wrong type for {key}: {type_found} (should be: {type_}, '
- f'provided value: {value})')
+ annos = {}
+ for c in self.__class__.__mro__:
+ if hasattr(c, '__annotations__'):
+ for k in [k for k in c.__annotations__ if k not in annos]:
+ annos[k] = c.__annotations__[k]
+ for name, type_ in annos.items():
+ if type_ is int:
+ setattr(self, name, 0)
+ elif type_ is str:
+ setattr(self, name, '')
+ elif hasattr(type_, '__origin__') and type_.__origin__ is list:
+ setattr(self, name, [])
+ else:
+ setattr(self, name, {})
+ self._confirmeds: set[str] = set()
def set(self, key: str, value: Optional[ClientDbType], confirm=False
) -> tuple[bool, bool]:
- 'Ensures setting, returns if changed value or confirmation.'
- if value is not None:
- self._typecheck(key, value)
- old_value, was_confirmed = self.get_force(key)
- value_changed = (value is not None) and value != old_value
- if value is None:
- if old_value is None:
- if confirm:
- raise CrashingException('called to unset non-set entry')
- del self._dict[key]
- elif value_changed:
- self._dict[key] = value
- confirm_changed = confirm != was_confirmed
- if confirm_changed:
- if confirm:
- self._confirmeds += [key]
- else:
- self._confirmeds.remove(key)
+ 'Set value at key and its confirmation, return what of either changed.'
+ value_changed = False
+ if value is not None and value != getattr(self, key, None):
+ value_changed = True
+ setattr(self, key, value)
+ confirm_changed = self.confirm(key, confirm)
return (value_changed, confirm_changed)
- def get_force(self, key: str) -> tuple[Optional[ClientDbType], bool]:
- 'Get even if only stored unconfirmed, tell if confirmed..'
- value = self._dict.get(key, None)
- if value is not None:
- self._typecheck(key, value)
- return (value, key in self._confirmeds)
+ def confirm(self, key, confirm=True) -> bool:
+ 'Declare value at key confirmed (or the opposite.'
+ if confirm and key not in self._confirmeds:
+ self._confirmeds.add(key)
+ return True
+ if (not confirm) and key in self._confirmeds:
+ self._confirmeds.remove(key)
+ return True
+ return False
def append(self, key: str, value: str, keep_confirmed=False) -> None:
- 'To list[str] keyed by key, append value; if non-existant, create it.'
- if not keep_confirmed and key in self._confirmeds:
+ 'To list at key add value; if not keep_confirmed, unconfirm value.'
+ if (not keep_confirmed) and key in self._confirmeds:
self._confirmeds.remove(key)
- del self._dict[key]
- targeted = self._dict.get(key, None)
- if isinstance(targeted, list):
- targeted.append(value)
- elif targeted is None:
- self._dict[key] = [value]
- else:
- raise CrashingException('called on non-list entry')
-
- def remove(self, key, value: str) -> None:
- 'From list[str] keyed by key, remove value.'
- targeted = self._dict.get(key, None)
- if isinstance(targeted, list):
- targeted.remove(value)
- else:
- raise CrashingException('called on non-list entry')
+ getattr(self, key).append(value)
+
+ def remove(self, key: str, value: str) -> None:
+ 'From list at key remove value.'
+ getattr(self, key).remove(value)
class ClientDbBase(_Db):
nickname: str
user_modes: str
+ def confirmed(self, key: str) -> bool:
+ 'If value at key be confirmed or not.'
+ return key in self._confirmeds
+
class _ChannelDb(_Db):
users: list[str]
realname: str
_channels: dict[str, _ChannelDb]
- @property
- def conn_setup(self) -> IrcConnSetup:
- 'Constructed out of stored entries *including* unconfirmed ones.'
- kwargs: dict[str, ClientDbType] = {}
- for name in IrcConnSetup._fields:
- val = self._dict.get(name, None)
- if val is None:
- raise CrashingException(f'field not set: {name}')
- assert self._type_for(name) == IrcConnSetup.__annotations__[name]
- self._typecheck(name, val)
- kwargs[name] = val
- return IrcConnSetup(**kwargs) # type: ignore # enough tests above
-
@property
def caps(self) -> dict[str, _ServerCapability]:
'Interpret .caps_LS, .caps_LIST into proper _ServerCapability listing.'
def __init__(self, conn_setup: IrcConnSetup, **kwargs) -> None:
self._db = _ClientDb()
for k in conn_setup._fields:
- self._db.set(k, getattr(conn_setup, k), confirm=k != 'nickname')
+ setattr(self._db, k, getattr(conn_setup, k))
if self._db.port <= 0:
- self._db.set('port', PORT_SSL, confirm=True)
+ self._db.port = PORT_SSL
self.client_id = self._db.hostname
self._prev_verb = ''
super().__init__(client_id=self.client_id, **kwargs)
+ for k in IrcConnSetup._fields:
+ self._update_db(k, None, confirm=k != 'nickname')
self._start_connecting()
def _start_connecting(self) -> None:
except Exception as e: # pylint: disable=broad-exception-caught
self._put(ExceptionEvent(CrashingException(e)))
- self._log('connecting …', conn_setup=self._db.conn_setup)
+ self._log('connecting …')
Thread(target=connect, daemon=True, args=(self,)).start()
def _on_connect(self) -> None:
f'{"yes" if self.conn.ssl else "no"})',
scope=LogScope.ALL)
self._caps = _CapsManager(self.send, self._db)
- conn_setup = self._db.conn_setup # for type-checks and to include
- self.send(IrcMessage( # … unconfirmed .nickname
- verb='USER', params=(getuser(), '0', '*', conn_setup.realname)))
- self.send(IrcMessage(verb='NICK', params=(conn_setup.nickname,)))
+ self.send(IrcMessage(verb='USER',
+ params=(getuser(), '0', '*', self._db.realname)))
+ self.send(IrcMessage(verb='NICK', params=(self._db.nickname,)))
@abstractmethod
def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
self._update_db('client_host', confirm=True,
value=msg.params[1].split('@')[-1])
case 'AUTHENTICATE' if msg.params == ('+',):
- auth = b64encode((self._db.conn_setup.nickname + '\0' +
- self._db.conn_setup.nickname + '\0' +
- self._db.conn_setup.password
+ auth = b64encode((self._db.nickname + '\0' +
+ self._db.nickname + '\0' +
+ self._db.password
).encode('utf-8')).decode('utf-8')
self.send(IrcMessage('AUTHENTICATE', (auth,)))
case 'CAP' if len(msg.params) > 1:
msg.nick_from_source)
case 'PING' if len(msg.params) == 1:
self.send(IrcMessage(verb='PONG', params=(msg.params[0],)))
- case '903' | '904' if len(msg.params) == 1:
+ case '903' | '904' if len(msg.params) == 2:
alert = msg.verb == '904'
self._log(f'SASL auth {"failed" if alert else "succeeded"}',
alert=alert)