IrcMessage, PORT_SSL)
ClientsDb = dict[str, 'Client']
+
+
+class Db:
+ 'Helper with some conveniences around annotated attributes.'
+
+ def __init__(self, **kwargs) -> None:
+ self._types: dict[str, type] = {}
+ for c in self.__class__.__mro__:
+ if hasattr(c, '__annotations__'):
+ self._types = c.__annotations__ | self._types
+ for name, type_ in self._types.items():
+ setattr(self, name, type_())
+ super().__init__(**kwargs)
+
+ def _typecheck(self, key: str, value) -> None:
+ type_ = self._types[key]
+ if hasattr(type_, '__origin__'):
+ assert isinstance(value, type_.__origin__)
+ if len(value):
+ assert hasattr(type_, '__args__')
+ item_type = type_.__args__[0]
+ for item in value:
+ assert isinstance(item, item_type)
+ else:
+ assert isinstance(value, type_)
+
+
+@dataclass
+class IrcConnSetup:
+ 'All we need to know to set up a new Client connection.'
+ hostname: str = ''
+ port: int = 0
+ nick_wanted: str = ''
+ realname: str = ''
+ password: str = ''
+
+
+class SharedChannelDbFields:
+ 'API for fields shared directly in name and type with TUI.'
+ user_ids: tuple[str, ...]
+ # topic: str
+ # channel_modes: str
+
+
+_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
+
+
+class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
+ 'API for fields shared directly in name and type with TUI.'
+ connection_state: str
+ sasl_account: str
+ sasl_auth_state: str
+ user_modes: str
+ users: Any
+ _channels: dict[str, _ChannelDbFields]
+
+
+@dataclass
+class NickUserHost:
+ 'Combination of nickname, username on host, and host.'
+ nick: str = '?'
+ user: str = '?'
+ host: str = '?'
+
+ def copy(self) -> Self:
+ 'Produce copy not subject to later attribute changes on original.'
+ return self.__class__(**dc_asdict(self))
+
+
+@dataclass
+class ServerCapability:
+ 'Public API for CAP data.'
+ data: str
+ enabled: bool = False
+
+
+class LogScope(Enum):
+ 'Where log messages should go.'
+ ALL = auto()
+ SERVER = auto()
+ RAW = auto()
+ CHAT = auto()
+ SAME = auto()
+
+
+@dataclass
+class _ClientIdMixin:
+ 'Collects a Client\'s ID at .client_id.'
+ client_id: str
+
+
+@dataclass
+class ClientEvent(AffectiveEvent, _ClientIdMixin):
+ 'To affect Client identified by ClientIdMixin.'
+
+
+@dataclass
+class NewClientEvent(AffectiveEvent):
+ 'Put Client .payload into ClientsDb target.'
+ payload: 'Client'
+
+ def affect(self, target: ClientsDb) -> None:
+ target[self.payload.client_id] = self.payload
+
+
+@dataclass
+class ClientQueueMixin(QueueMixin, _ClientIdMixin):
+ 'To QueueMixin adds _cput to send ClientEvent for self.'
+
+ def _client_trigger(self, t_method: str, **kwargs) -> None:
+ self._put(ClientEvent.affector(t_method, client_id=self.client_id
+ ).kw(**kwargs))
+
+
_NAMES_DESIRED_SERVER_CAPS = ('sasl',)
_ILLEGAL_NICK_FIRSTCHARS = '~&@+# '
return name + str(0 if not digits else (int(digits) + 1))
-class LogScope(Enum):
- 'Where log messages should go.'
- ALL = auto()
- SERVER = auto()
- RAW = auto()
- CHAT = auto()
- SAME = auto()
-
-
-@dataclass
-class ClientIdMixin:
- 'Collects a Client\'s ID at .client_id.'
- client_id: str
-
-
-@dataclass
-class ClientEvent(AffectiveEvent, ClientIdMixin):
- 'To affect Client identified by ClientIdMixin.'
-
-
@dataclass
-class _IrcConnection(BaseIrcConnection, ClientIdMixin):
+class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
hostname: InitVar[str] # needed by BaseIrcConnection, but not desired as
port: InitVar[int] # dataclass fields, only for __post_init__ call
client_id=self.client_id).kw(e=e)
-@dataclass
-class ClientQueueMixin(QueueMixin, ClientIdMixin):
- 'To QueueMixin adds _cput to send ClientEvent for self.'
-
- def _client_trigger(self, t_method: str, **kwargs) -> None:
- self._put(ClientEvent.affector(t_method, client_id=self.client_id
- ).kw(**kwargs))
-
-
class _UpdatingDict:
_on_update: Callable
def __init__(self) -> None:
self._dict: dict[str, Any] = {}
+ def __getitem__(self, key: str):
+ return self._dict[key]
+
+ def __setitem__(self, key: str, val: Any) -> None:
+ if isinstance(val, _NickUserHost):
+ val.set_on_update(lambda: self._on_update(key))
+ self._dict[key] = val
+ self._on_update(key)
+
+ def __delitem__(self, key: str) -> None:
+ del self._dict[key]
+ self._on_update(key)
+
@property
def keys(self) -> tuple[str, ...]:
'Keys of item registrations.'
return tuple(self._dict.keys())
- def set_on_update(self, name: str, on_update: Callable) -> None:
- 'Caller of on_update with path= set to name.'
- self._on_update = lambda k: on_update(name, k)
-
def clear(self) -> None:
'Zero dict and send clearance update.'
self._dict.clear()
self._on_update('')
- @staticmethod
- def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
- 'Split eq_str by "=", and if none, into eq_str and "".'
- toks = eq_str.split('=', maxsplit=1)
- return toks[0], '' if len(toks) == 1 else toks[1]
+ def set_on_update(self, name: str, on_update: Callable) -> None:
+ 'Caller of on_update with path= set to name.'
+ self._on_update = lambda k: on_update(name, k)
def set_from_eq_str(self, eq_str: str, cls=str) -> None:
'Set from .key_val_from_eq_str result, to cls(val).'
key, value = self.key_val_from_eq_str(eq_str)
self[key] = cls(value)
- def __getitem__(self, key: str):
- return self._dict[key]
+ @staticmethod
+ def key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
+ 'Split eq_str by "=", and if none, into eq_str and "".'
+ toks = eq_str.split('=', maxsplit=1)
+ return toks[0], '' if len(toks) == 1 else toks[1]
- def __setitem__(self, key: str, val: Any) -> None:
- if isinstance(val, _NickUserHost):
- val.set_on_update(lambda: self._on_update(key))
- self._dict[key] = val
- self._on_update(key)
- def __delitem__(self, key: str) -> None:
- del self._dict[key]
- self._on_update(key)
+class _CompletableStringsList:
+ _on_update: Callable
+
+ def __init__(self) -> None:
+ self._incomplete: list[str] = []
+ self.completed: tuple[str, ...] = tuple()
+ def append(self, value: str) -> None:
+ 'Append value to list.'
+ self._incomplete.append(value)
-@dataclass
-class ServerCapability:
- 'Public API for CAP data.'
- data: str
- enabled: bool = False
+ def complete(self) -> None:
+ 'Declare list done, call updater if set.'
+ self.completed = tuple(self._incomplete)
+ self._incomplete.clear()
+ if hasattr(self, '_on_update'):
+ self._on_update()
class _CapsManager:
self.clear()
self._send('LS', '302')
+ def end_negotiation(self) -> None:
+ 'Stop negotation, without emptying caps DB.'
+ self._send('END')
+
def process_msg(self, verb: str, items: tuple[str, ...], complete: bool
) -> bool:
'Parse CAP message to negot. steps, DB inputs; return if successful.'
return True
return False
- def end_negotiation(self) -> None:
- 'Stop negotation, without emptying caps DB.'
- self._send('END')
-
-
-@dataclass
-class IrcConnSetup:
- 'All we need to know to set up a new Client connection.'
- hostname: str = ''
- port: int = 0
- nick_wanted: str = ''
- realname: str = ''
- password: str = ''
-
-
-class Db:
- 'Helper with some conveniences around annotated attributes.'
-
- def __init__(self, **kwargs) -> None:
- self._types: dict[str, type] = {}
- for c in self.__class__.__mro__:
- if hasattr(c, '__annotations__'):
- self._types = c.__annotations__ | self._types
- for name, type_ in self._types.items():
- setattr(self, name, type_())
- super().__init__(**kwargs)
-
- def _typecheck(self, key: str, value) -> None:
- type_ = self._types[key]
- if hasattr(type_, '__origin__'):
- assert isinstance(value, type_.__origin__)
- if len(value):
- assert hasattr(type_, '__args__')
- item_type = type_.__args__[0]
- for item in value:
- assert isinstance(item, item_type)
- else:
- assert isinstance(value, type_)
-
-
-class _CompletableStringsList:
- _on_update: Callable
-
- def __init__(self) -> None:
- self._incomplete: list[str] = []
- self.completed: tuple[str, ...] = tuple()
-
- def append(self, value: str) -> None:
- 'Append value to list.'
- self._incomplete.append(value)
-
- def complete(self) -> None:
- 'Declare list done, call updater if set.'
- self.completed = tuple(self._incomplete)
- self._incomplete.clear()
- if hasattr(self, '_on_update'):
- self._on_update()
-
class _Db(Db):
self._on_update(key)
-class SharedChannelDbFields:
- 'API for fields shared directly in name and type with TUI.'
- user_ids: tuple[str, ...]
- # topic: str
- # channel_modes: str
-
-
-_ChannelDbFields = TypeVar('_ChannelDbFields', bound=SharedChannelDbFields)
-
-
class _ChannelDb(_Db, SharedChannelDbFields):
def __init__(self, purge_users: Callable, **kwargs) -> None:
self._purge_users()
-class SharedClientDbFields(IrcConnSetup, Generic[_ChannelDbFields]):
- 'API for fields shared directly in name and type with TUI.'
- connection_state: str
- sasl_account: str
- sasl_auth_state: str
- user_modes: str
- users: Any
- _channels: dict[str, _ChannelDbFields]
-
-
-@dataclass
-class NickUserHost:
- 'Combination of nickname, username on host, and host.'
- nick: str = '?'
- user: str = '?'
- host: str = '?'
-
- def copy(self) -> Self:
- 'Produce copy not subject to later attribute changes on original.'
- return self.__class__(**dc_asdict(self))
-
-
class _NickUserHost(NickUserHost):
_on_update: Callable
def __str__(self) -> str:
return f'{self.nick}!{self.user}@{self.host}'
+ def __setattr__(self, key: str, value: Any) -> None:
+ if key == 'nickuserhost' and isinstance(value, _NickUserHost):
+ self.nick = value.nick
+ self.user = value.user
+ self.host = value.host
+ else:
+ super().__setattr__(key, value)
+ if key != '_on_update' and hasattr(self, '_on_update'):
+ self._on_update()
+
@classmethod
def from_str(cls, value: str) -> Self:
'Produce from string assumed to fit _!_@_ pattern.'
'Caller of on_update with path= set to name.'
self._on_update = on_update
- def __setattr__(self, key: str, value: Any) -> None:
- if key == 'nickuserhost' and isinstance(value, _NickUserHost):
- self.nick = value.nick
- self.user = value.user
- self.host = value.host
- else:
- super().__setattr__(key, value)
- if key != '_on_update' and hasattr(self, '_on_update'):
- self._on_update()
-
class _ClientDb(_Db, SharedClientDbFields):
caps: _UpdatingDict
super().__init__(**kwargs)
self.motd._on_update = lambda: self._on_update('motd')
+ def _purge_users(self) -> None:
+ to_keep = {'me'}
+ for chan in self._channels.values():
+ to_keep |= set(chan.user_ids)
+ for user_id in [id_ for id_ in self.users.keys
+ if id_ not in to_keep]:
+ del self.users[user_id]
+
+ def needs_arg(self, key: str) -> bool:
+ 'Reply if attribute of key may reasonably be addressed without an arg.'
+ return not isinstance(getattr(self, key), (bool, int, str, tuple,
+ _CompletableStringsList))
+
def user_id(self, query: str | _NickUserHost) -> str:
'Return user_id for nickname of entire NickUserHost, create if none.'
nick = query if isinstance(query, str) else query.nick
self.users[id_] = _NickUserHost(query)
return id_
- def _purge_users(self) -> None:
- to_keep = {'me'}
- for chan in self._channels.values():
- to_keep |= set(chan.user_ids)
- for user_id in [id_ for id_ in self.users.keys
- if id_ not in to_keep]:
- del self.users[user_id]
-
def remove_user(self, user_id: str) -> tuple[str, ...]:
'Run remove_user_from_channel on all channels user is in.'
affected_chans = []
affected_chans += [id_]
return tuple(affected_chans)
- def needs_arg(self, key: str) -> bool:
- 'Reply if attribute of key may reasonably be addressed without an arg.'
- return not isinstance(getattr(self, key), (bool, int, str, tuple,
- _CompletableStringsList))
-
@property
def chan_names(self) -> tuple[str, ...]:
'Return names of joined channels.'
self._db.connection_state = 'connecting'
Thread(target=connect, daemon=True, args=(self,)).start()
- @abstractmethod
- def _on_update(self, path: str, arg: str = '') -> None:
- pass
-
def _on_connect(self) -> None:
assert self.conn is not None
self._db.connection_state = 'connected'
'0', '*', self._db.realname)))
self.send(IrcMessage(verb='NICK', params=(self._db.nick_wanted,)))
+ def close(self) -> None:
+ 'Close both recv Loop and socket.'
+ self._db.connection_state = 'disconnected'
+ if self.conn:
+ self.conn.close()
+ self.conn = None
+ for name in self._db.chan_names:
+ self._db.del_chan(name)
+ self._db.isupports.clear()
+ self._db.users['me'].nick = '?'
+ self._db.sasl_auth_state = ''
+
+ def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
+ 'Gracefully handle broken connection.'
+ self._log(f'connection broken: {e}', alert=True)
+ self.close()
+
+ @abstractmethod
+ def _on_update(self, path: str, arg: str = '') -> None:
+ pass
+
@abstractmethod
def _log(self, msg: str, scope=LogScope.SERVER, **kwargs) -> None:
pass
self._log(to_log, scope=log_target)
self._log(msg.raw, scope=LogScope.RAW, out=True)
- def close(self) -> None:
- 'Close both recv Loop and socket.'
- self._db.connection_state = 'disconnected'
- if self.conn:
- self.conn.close()
- self.conn = None
- for name in self._db.chan_names:
- self._db.del_chan(name)
- self._db.isupports.clear()
- self._db.users['me'].nick = '?'
- self._db.sasl_auth_state = ''
-
- def on_handled_loop_exception(self, e: IrcConnAbortException) -> None:
- 'Gracefully handle broken connection.'
- self._log(f'connection broken: {e}', alert=True)
- self.close()
-
def _match_msg(self, msg: IrcMessage) -> dict[str, Any]:
'Test .source, .verb, .params.'
tok_type = (str | _NickUserHost | tuple[str, ...]
for chan in self._db.remove_user(self._db.user_id(ret['quitter'])):
self._log(f'{ret["quitter"]} quits: {ret["message"]}',
LogScope.CHAT, target=chan)
-
-
-@dataclass
-class NewClientEvent(AffectiveEvent):
- 'Put Client .payload into ClientsDb target.'
- payload: Client
-
- def affect(self, target: ClientsDb) -> None:
- target[self.payload.client_id] = self.payload