ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS, ISUPPORT_DEFAULTS, PORT_SSL)
from ircplom.msg_parse_expectations import MsgTok, MSG_EXPECTATIONS
-ClientsDb = dict[str, 'Client']
+
+_NAMES_DESIRED_SERVER_CAPS = ('sasl',)
-class _DeepAnnotationsMixin:
- 'Provide ._deep_annotations() of non-underscored annotations of whole MRO.'
+class AutoAttrMixin:
+ 'Ensures attribute as defined by annotations along MRO'
@classmethod
def _deep_annotations(cls) -> dict[str, type]:
types = c.__annotations__ | types
return {k: v for k, v in types.items() if k[0] != '_'}
-
-class AutoAttrMixin(_DeepAnnotationsMixin):
- 'Ensures attribute as defined by annotations, and ._make_attr method.'
-
def __getattribute__(self, key: str):
if key[0] != '_' and (cls := self._deep_annotations().get(key, None)):
try:
return self.__orig_class__.__args__[0]
-@dataclass
-class IrcConnSetup:
- 'All we need to know to set up a new Client connection.'
- hostname: str = ''
- port: int = 0
- nick_wanted: str = ''
- realname: str = ''
- password: str = ''
-
-
-class SharedClientDbFields(IrcConnSetup):
- 'API for fields shared directly in name and type with TUI.'
- connection_state: str = ''
- isupport: Dict[str]
- sasl_account: str = ''
- sasl_auth_state: str = ''
- user_modes: str = ''
-
- def is_chan_name(self, name: str) -> bool:
- 'Tests name to match CHANTYPES prefixes.'
- return name[0] in self.isupport['CHANTYPES']
-
-
-@dataclass
-class NickUserHost:
- 'Combination of nickname, username on host, and host.'
- nick: str = '?'
- user: str = '?'
- host: str = '?'
-
-
-@dataclass
-class ServerCapability:
- 'Public API for CAP data.'
- data: str = ''
- enabled: bool = False
-
-
-class LogScope(Enum):
- 'Where log messages should go.'
- ALL = auto()
- SERVER = auto()
- RAW = auto()
- CHAT = auto()
- SAME = auto()
-
-
-@dataclass
-class _ClientIdMixin:
- 'Collects a Client\'s ID at .client_id.'
- client_id: str
-
-
-@dataclass
-class ClientEvent(AffectiveEvent, _ClientIdMixin):
- 'To affect Client identified by ClientIdMixin.'
-
-
-@dataclass
-class NewClientEvent(AffectiveEvent):
- 'Put Client .payload into ClientsDb target.'
- payload: 'Client'
-
- def affect(self, target: ClientsDb) -> None:
- target[self.payload.client_id] = self.payload
-
-
-@dataclass
-class ClientQueueMixin(QueueMixin, _ClientIdMixin):
- 'To QueueMixin adds _cput to send ClientEvent for self.'
-
- def _client_trigger(self, t_method: str, **kwargs) -> None:
- self._put(ClientEvent.affector(t_method, client_id=self.client_id
- ).kw(**kwargs))
-
-
-_NAMES_DESIRED_SERVER_CAPS = ('sasl',)
-
-
-def _nick_incremented(nickname: str) -> str:
- 'Return nickname with number suffix incremented, or "0" if none.'
- name, digits = ([(nickname, '')]
- + [(nickname[:i], nickname[i:])
- for i in range(len(nickname), 0, -1)
- if nickname[i:].isdigit()]
- )[-1]
- return name + str(0 if not digits else (int(digits) + 1))
-
-
-@dataclass
-class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
- hostname: InitVar[str] # needed by BaseIrcConnection, but not desired as
- port: InitVar[int] # dataclass fields, only for __post_init__ call
-
- def __post_init__(self, hostname, port, **kwargs) -> None:
- super().__init__(hostname=hostname, port=port, _q_out=self._q_out,
- **kwargs)
-
- def _make_recv_event(self, msg: IrcMessage) -> ClientEvent:
- return ClientEvent.affector('handle_msg', client_id=self.client_id
- ).kw(msg=msg)
-
- def _on_handled_loop_exception(self, e: IrcConnAbortException
- ) -> ClientEvent:
- return ClientEvent.affector('on_handled_loop_exception',
- client_id=self.client_id).kw(e=e)
-
-
class _Dict(Dict[DictItem]):
_defaults: dict[str, DictItem]
def complete(self) -> None:
'Declare list done.'
self.completed = tuple(self._incomplete)
- # self._incomplete.clear()
def clear(self) -> None:
'Wipe content and declare new emptiness as complete.'
self.complete()
-class _CapsManager:
-
- def __init__(self,
- sender: Callable[[IrcMessage], None],
- caps_dict: '_UpdatingDict[_UpdatingServerCapability]'
- ) -> None:
- self._dict = caps_dict
- self._send = lambda *params: sender(IrcMessage('CAP', params=params))
- self.clear()
-
- def clear(self) -> None:
- 'Zero internal knowledge.'
- self._dict.clear()
- self._ls = _CompletableStringsList()
- self._list = _CompletableStringsList()
- self._list_expectations: dict[str, set[str]] = {
- 'ACK': set(), 'NAK': set()}
-
- def start_negotation(self) -> None:
- 'Call .clear, send CAPS LS 302.'
- self.clear()
- self._send('LS', '302')
-
- def end_negotiation(self) -> None:
- 'Stop negotation, without emptying caps DB.'
- self._send('END')
-
- def process_msg(self, verb: str, items: tuple[str, ...], complete: bool
- ) -> bool:
- 'Parse CAP message to negot. steps, DB inputs; return if successful.'
- for item in items:
- if verb == 'NEW':
- key, data = _Dict.key_val_from_eq_str(item)
- self._dict.set_updating(key, ServerCapability(data=data))
- elif verb == 'DEL':
- del self._dict[item]
- elif verb in {'ACK', 'NACK'}:
- self._list_expectations[verb].add(item)
- if verb in {'LS', 'LIST'}:
- target = getattr(self, f'_{verb.lower()}')
- for item in items:
- target.append(item)
- if complete:
- target.complete()
- if target == self._ls:
- for cap_name in _NAMES_DESIRED_SERVER_CAPS:
- self._send('REQ', cap_name)
- self._send('LIST')
- elif target == self._list:
- acks = self._list_expectations['ACK']
- naks = self._list_expectations['NAK']
- list_set = set(target.completed)
- assert acks == list_set & acks
- assert set() == list_set & naks
- for key, data in [_Dict.key_val_from_eq_str(entry)
- for entry in self._ls.completed]:
- self._dict.set_updating(key, ServerCapability(
- data=data, enabled=key in self._list.completed))
- return True
- return False
-
-
-class _Channel:
- user_ids: _CompletableStringsList
-
- def __init__(self,
- get_id_for_nick: Callable,
- get_membership_prefixes: Callable,
- purge_users: Callable,
- **kwargs
- ) -> None:
- self._get_id_for_nick = get_id_for_nick
- self._get_membership_prefixes = get_membership_prefixes
- self._purge_users = purge_users
- super().__init__(**kwargs)
-
- def add_from_namreply(self, items: tuple[str, ...]):
- 'Add to .user_ids items assumed as nicknames with membership prefixes.'
- for item in items:
- nickname = item.lstrip(self._get_membership_prefixes())
- self.user_ids.append(self._get_id_for_nick(nickname))
-
- def append_nick(self, nickname: str) -> None:
- 'To .user_ids append .nickname and declare .user_ids complete.'
- user_id = self._get_id_for_nick(nickname)
- self.user_ids.append(user_id, complete=True)
-
- def remove_nick(self, nickname: str) -> None:
- 'From .user_ids remove .nickname and declare .user_ids complete.'
- user_id = self._get_id_for_nick(nickname)
- self.user_ids.remove(user_id, complete=True)
- self._purge_users()
-
-
-class _NickUserHost(NickUserHost, _DeepAnnotationsMixin):
-
- def __str__(self) -> str:
- return f'{self.nick}!{self.user}@{self.host}'
-
- def __eq__(self, other) -> bool:
- if not isinstance(other, NickUserHost):
- return False
- for key in self._deep_annotations().keys():
- if getattr(self, key) != getattr(other, key):
- return False
- return True
-
- def __setattr__(self, key: str, value: NickUserHost | str) -> None:
- if key == 'nickuserhost' and isinstance(value, _NickUserHost):
- for annotated_key in self._deep_annotations().keys():
- setattr(self, annotated_key, getattr(value, annotated_key))
- else:
- super().__setattr__(key, value)
-
- @classmethod
- def from_str(cls, value: str) -> Self:
- 'Produce from string assumed to fit _!_@_ pattern.'
- toks = value.split('!')
- assert len(toks) == 2
- toks = toks[0:1] + toks[1].split('@')
- assert len(toks) == 3
- return cls(*toks)
-
-
class _UpdatingMixin(AutoAttrMixin):
_on_update: Callable
return None
if isinstance(self, _CompletableStringsList):
return self.completed
- for cls in self.__class__.__mro__:
- if cls != _DeepAnnotationsMixin\
- and AutoAttrMixin not in cls.__mro__:
- obj = cls()
- break
+ for cls in [cls for cls in self.__class__.__mro__
+ if AutoAttrMixin not in cls.__mro__]:
+ obj = cls()
+ break
for key in self._deep_annotations():
attr_val = getattr(self, key)
setattr(obj, key,
self._on_update()
+@dataclass
+class IrcConnSetup:
+ 'All we need to know to set up a new Client connection.'
+ hostname: str = ''
+ port: int = 0
+ nick_wanted: str = ''
+ realname: str = ''
+ password: str = ''
+
+
+class SharedClientDbFields(IrcConnSetup):
+ 'API for fields shared directly in name and type with TUI.'
+ connection_state: str = ''
+ isupport: Dict[str]
+ sasl_account: str = ''
+ sasl_auth_state: str = ''
+ user_modes: str = ''
+
+ def is_chan_name(self, name: str) -> bool:
+ 'Tests name to match CHANTYPES prefixes.'
+ return name[0] in self.isupport['CHANTYPES']
+
+
+@dataclass
+class NickUserHost:
+ 'Combination of nickname, username on host, and host.'
+ nick: str = '?'
+ user: str = '?'
+ host: str = '?'
+
+
+@dataclass
+class ServerCapability:
+ 'Public API for CAP data.'
+ data: str = ''
+ enabled: bool = False
+
+
+class LogScope(Enum):
+ 'Where log messages should go.'
+ ALL = auto()
+ SERVER = auto()
+ RAW = auto()
+ CHAT = auto()
+ SAME = auto()
+
+
+@dataclass
+class _ClientIdMixin:
+ 'Collects a Client\'s ID at .client_id.'
+ client_id: str
+
+
+@dataclass
+class ClientQueueMixin(QueueMixin, _ClientIdMixin):
+ 'To QueueMixin adds _cput to send ClientEvent for self.'
+
+ def _client_trigger(self, t_method: str, **kwargs) -> None:
+ self._put(ClientEvent.affector(t_method, client_id=self.client_id
+ ).kw(**kwargs))
+
+
+@dataclass
+class _IrcConnection(BaseIrcConnection, _ClientIdMixin):
+ hostname: InitVar[str] # needed by BaseIrcConnection, but not desired as
+ port: InitVar[int] # dataclass fields, only for __post_init__ call
+
+ def __post_init__(self, hostname, port, **kwargs) -> None:
+ super().__init__(hostname=hostname, port=port, _q_out=self._q_out,
+ **kwargs)
+
+ def _make_recv_event(self, msg: IrcMessage) -> 'ClientEvent':
+ return ClientEvent.affector('handle_msg', client_id=self.client_id
+ ).kw(msg=msg)
+
+ def _on_handled_loop_exception(self, e: IrcConnAbortException
+ ) -> 'ClientEvent':
+ return ClientEvent.affector('on_handled_loop_exception',
+ client_id=self.client_id).kw(e=e)
+
+
+class _Channel:
+ user_ids: _CompletableStringsList
+
+ def __init__(self,
+ get_id_for_nick: Callable,
+ get_membership_prefixes: Callable,
+ purge_users: Callable,
+ **kwargs
+ ) -> None:
+ self._get_id_for_nick = get_id_for_nick
+ self._get_membership_prefixes = get_membership_prefixes
+ self._purge_users = purge_users
+ super().__init__(**kwargs)
+
+ def add_from_namreply(self, items: tuple[str, ...]):
+ 'Add to .user_ids items assumed as nicknames with membership prefixes.'
+ for item in items:
+ nickname = item.lstrip(self._get_membership_prefixes())
+ self.user_ids.append(self._get_id_for_nick(nickname))
+
+ def append_nick(self, nickname: str) -> None:
+ 'To .user_ids append .nickname and declare .user_ids complete.'
+ user_id = self._get_id_for_nick(nickname)
+ self.user_ids.append(user_id, complete=True)
+
+ def remove_nick(self, nickname: str) -> None:
+ 'From .user_ids remove .nickname and declare .user_ids complete.'
+ user_id = self._get_id_for_nick(nickname)
+ self.user_ids.remove(user_id, complete=True)
+ self._purge_users()
+
+
+class _NickUserHost(NickUserHost):
+
+ def __str__(self) -> str:
+ return f'{self.nick}!{self.user}@{self.host}'
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, NickUserHost):
+ return False
+ for key in NickUserHost.__annotations__:
+ if getattr(self, key) != getattr(other, key):
+ return False
+ return True
+
+ def __setattr__(self, key: str, value: NickUserHost | str) -> None:
+ if key == 'nickuserhost' and isinstance(value, _NickUserHost):
+ for annotated_key in NickUserHost.__annotations__:
+ setattr(self, annotated_key, getattr(value, annotated_key))
+ else:
+ super().__setattr__(key, value)
+
+ @classmethod
+ def from_str(cls, value: str) -> Self:
+ 'Produce from string assumed to fit _!_@_ pattern.'
+ toks = value.split('!')
+ assert len(toks) == 2
+ toks = toks[0:1] + toks[1].split('@')
+ assert len(toks) == 3
+ return cls(*toks)
+
+ @property
+ def incremented(self) -> str:
+ 'Return .nick with number suffix incremented, or "0" if none.'
+ name, digits = ([(self.nick, '')]
+ + [(self.nick[:i], self.nick[i:])
+ for i in range(len(self.nick), 0, -1)
+ if self.nick[i:].isdigit()]
+ )[-1]
+ return name + str(0 if not digits else (int(digits) + 1))
+
+
class _UpdatingServerCapability(_UpdatingMixin, ServerCapability):
pass
return id_
+class _CapsManager:
+
+ def __init__(self,
+ sender: Callable[[IrcMessage], None],
+ caps_dict: _UpdatingDict[_UpdatingServerCapability]
+ ) -> None:
+ self._dict = caps_dict
+ self._send = lambda *params: sender(IrcMessage('CAP', params=params))
+ self.clear()
+
+ def clear(self) -> None:
+ 'Zero internal knowledge.'
+ self._dict.clear()
+ self._ls = _CompletableStringsList()
+ self._list = _CompletableStringsList()
+ self._list_expectations: dict[str, set[str]] = {
+ 'ACK': set(), 'NAK': set()}
+
+ def start_negotation(self) -> None:
+ 'Call .clear, send CAPS LS 302.'
+ self.clear()
+ self._send('LS', '302')
+
+ def end_negotiation(self) -> None:
+ 'Stop negotation, without emptying caps DB.'
+ self._send('END')
+
+ def process_msg(self, verb: str, items: tuple[str, ...], complete: bool
+ ) -> bool:
+ 'Parse CAP message to negot. steps, DB inputs; return if successful.'
+ for item in items:
+ if verb == 'NEW':
+ key, data = _Dict.key_val_from_eq_str(item)
+ self._dict.set_updating(key, ServerCapability(data=data))
+ elif verb == 'DEL':
+ del self._dict[item]
+ elif verb in {'ACK', 'NACK'}:
+ self._list_expectations[verb].add(item)
+ if verb in {'LS', 'LIST'}:
+ target = getattr(self, f'_{verb.lower()}')
+ for item in items:
+ target.append(item)
+ if complete:
+ target.complete()
+ if target == self._ls:
+ for cap_name in _NAMES_DESIRED_SERVER_CAPS:
+ self._send('REQ', cap_name)
+ self._send('LIST')
+ elif target == self._list:
+ acks = self._list_expectations['ACK']
+ naks = self._list_expectations['NAK']
+ list_set = set(target.completed)
+ assert acks == list_set & acks
+ assert set() == list_set & naks
+ for key, data in [_Dict.key_val_from_eq_str(entry)
+ for entry in self._ls.completed]:
+ self._dict.set_updating(key, ServerCapability(
+ data=data, enabled=key in self._list.completed))
+ return True
+ return False
+
+
class Client(ABC, ClientQueueMixin):
'Abstracts socket connection, loop over it, and handling messages from it.'
_caps: _CapsManager
self._log(alert, alert=True)
elif ret['verb'] == '433': # ERR_NICKNAMEINUSE
self._log('nickname already in use, trying increment', alert=True)
- self.send(IrcMessage('NICK', (_nick_incremented(ret['used']),)))
+ self.send(IrcMessage(
+ 'NICK', (_NickUserHost(nick=ret['used']).incremented,)))
elif ret['verb'] in {'903', '904'}: # RPL_SASLSUCCESS, ERR_SASLFAIL
self._caps.end_negotiation()
elif ret['verb'] == 'AUTHENTICATE':
ch.remove_nick(ret['quitter'])
self._log(f'{ret["quitter"]} quits: {ret["message"]}',
LogScope.CHAT, target=ch_name)
+
+
+ClientsDb = dict[str, Client]
+
+
+@dataclass
+class NewClientEvent(AffectiveEvent):
+ 'Put Client .payload into ClientsDb target.'
+ payload: 'Client'
+
+ def affect(self, target: ClientsDb) -> None:
+ target[self.payload.client_id] = self.payload
+
+
+@dataclass
+class ClientEvent(AffectiveEvent, _ClientIdMixin):
+ 'To affect Client identified by ClientIdMixin.'
_LOG_PREFIX_IN = '<'
+@dataclass
+class _Update:
+ path: tuple[str, ...]
+ value: Optional[Any] = None
+
+
+class _UpdatingNode(AutoAttrMixin):
+ log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER}
+
+ def _make_attr(self, cls: Callable, key: str):
+ return cls()
+
+ @classmethod
+ def _scope(cls, path: tuple[str, ...]) -> LogScope:
+ scopes: dict[tuple[str, ...], LogScope] = {}
+ for c in cls.__mro__:
+ if hasattr(c, 'log_scopes'):
+ scopes = c.log_scopes | scopes
+ return scopes.get(path, scopes[tuple()])
+
+ def set_and_check_for_change(self, update: _Update
+ ) -> Optional[tuple[LogScope, Any]]:
+ 'Apply update, return if that actually made a difference.'
+ key = update.path[0]
+ node = self._get(key)
+ scope = self._scope(update.path)
+ if len(update.path) == 1:
+ if update.value is None:
+ if not self._is_set(key):
+ return None
+ self._unset(key)
+ return (scope, None)
+ if node == update.value:
+ return None
+ self._set(key, update.value)
+ return (scope, update.value)
+ return node.set_and_check_for_change(_Update(update.path[1:],
+ update.value))
+
+ def _get(self, key: str):
+ return getattr(self, key)
+
+ def _set(self, key: str, value) -> None:
+ setattr(self, key, value)
+
+ def _unset(self, key: str) -> None:
+ getattr(self, key).clear()
+
+ def _is_set(self, key: str) -> bool:
+ return hasattr(self, key)
+
+
+class _UpdatingDict(Dict[DictItem], _UpdatingNode):
+
+ def _get(self, key: str):
+ if key not in self._dict:
+ self._dict[key] = self._item_cls()
+ return self._dict[key]
+
+ def _set(self, key: str, value) -> None:
+ self._dict[key] = value
+
+ def _unset(self, key: str) -> None:
+ del self._dict[key]
+
+ def _is_set(self, key: str) -> bool:
+ return key in self._dict
+
+
class _ClientWindow(Window, ClientQueueMixin):
def __init__(self, scope: LogScope, log: Callable, **kwargs) -> None:
self._send_msg('PART', (self.chatname,))
-@dataclass
-class _Update:
- path: tuple[str, ...]
- value: Optional[Any] = None
-
-
-class _UpdatingNode(AutoAttrMixin):
- log_scopes: dict[tuple[str, ...], LogScope] = {tuple(): LogScope.SERVER}
-
- def _make_attr(self, cls: Callable, key: str):
- return cls()
-
- @classmethod
- def _scope(cls, path: tuple[str, ...]) -> LogScope:
- scopes: dict[tuple[str, ...], LogScope] = {}
- for c in cls.__mro__:
- if hasattr(c, 'log_scopes'):
- scopes = c.log_scopes | scopes
- return scopes.get(path, scopes[tuple()])
-
- def set_and_check_for_change(self, update: _Update
- ) -> Optional[tuple[LogScope, Any]]:
- 'Apply update, return if that actually made a difference.'
- key = update.path[0]
- node = self._get(key)
- scope = self._scope(update.path)
- if len(update.path) == 1:
- if update.value is None:
- if not self._is_set(key):
- return None
- self._unset(key)
- return (scope, None)
- if node == update.value:
- return None
- self._set(key, update.value)
- return (scope, update.value)
- return node.set_and_check_for_change(_Update(update.path[1:],
- update.value))
-
- def _get(self, key: str):
- return getattr(self, key)
-
- def _set(self, key: str, value) -> None:
- setattr(self, key, value)
-
- def _unset(self, key: str) -> None:
- getattr(self, key).clear()
-
- def _is_set(self, key: str) -> bool:
- return hasattr(self, key)
-
-
-class _UpdatingDict(Dict[DictItem], _UpdatingNode):
-
- def _get(self, key: str):
- if key not in self._dict:
- self._dict[key] = self._item_cls()
- return self._dict[key]
-
- def _set(self, key: str, value) -> None:
- self._dict[key] = value
-
- def _unset(self, key: str) -> None:
- del self._dict[key]
-
- def _is_set(self, key: str) -> bool:
- return key in self._dict
-
-
class _UpdatingChannel(_UpdatingNode):
user_ids: tuple[str, ...] = tuple()
log_scopes = {tuple(): LogScope.CHAT}