from uuid import UUID, uuid4
# ourselves
from ircplom.db_primitives import (
- Clearable, Completable, CompletableStringsSet, Dict, UpdatingAttrsMixin,
- UpdatingCompletable, UpdatingCompletableStringsOrdered,
- UpdatingCompletableStringsSet, UpdatingDict, UpdatingMixin)
+ Clearable, Completable, CompletableStringsSet, DbLinked, DbLinking,
+ Dict, DictItem, UpdatingAttrsMixin, UpdatingCompletable,
+ UpdatingCompletableStringsOrdered, UpdatingCompletableStringsSet,
+ UpdatingDict, UpdatingMixin)
from ircplom.events import (
- AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
+ AffectiveEvent, CrashingException, ExceptionEvent, QueueMixin)
from ircplom.irc_conn import (
- BaseIrcConnection, IrcConnException, IrcMessage, NickUserHost,
- ERR_STR_TIMEOUT, ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS,
- ISUPPORT_DEFAULTS, PORT_SSL)
+ BaseIrcConnection, IrcConnException, IrcMessage, NickUserHost,
+ ERR_STR_TIMEOUT, ILLEGAL_NICK_CHARS, ILLEGAL_NICK_FIRSTCHARS,
+ ISUPPORT_DEFAULTS, PORT_SSL)
from ircplom.msg_parse_expectations import MSG_EXPECTATIONS
r'^Closing Link: \(Connection timed out\)$',
r'^Closing Link: \(Ping timeout: [0-9]+ seconds\)$'
)
+_DbLinked = DbLinked['_ClientDb']
+
+
+class _UpdatingDict(UpdatingDict[DictItem], _DbLinked):
+
+ def __getitem__(self, key: str) -> DictItem:
+ return self._extend_with_db(super().__getitem__(key))
def _tuple_key_val_from_eq_str(eq_str: str) -> tuple[str, str]:
NickUserHost(copy.nick, copy.user, copy.host))
-class _Channel(Channel):
- user_ids: CompletableStringsSet
- topic: _CompletableTopic
+class _UpdatingCompletableTopic(UpdatingCompletable, _CompletableTopic):
+ pass
- def __init__(self,
- userid_for_nickuserhost: Callable,
- get_membership_modes: Callable[[], dict[str, str]],
- get_chanmodes: Callable[[], dict[str, str]],
- purge_users: Callable,
- **kwargs
- ) -> None:
- self._userid_for_nickuserhost = userid_for_nickuserhost
- self._get_membership_modes = get_membership_modes
- self._get_chanmodes = get_chanmodes
- self.purge_users = purge_users
- super().__init__(**kwargs)
+
+class _Channel(UpdatingAttrsMixin, _DbLinked, Channel):
+ user_ids: UpdatingCompletableStringsSet
+ topic: _UpdatingCompletableTopic
+ exits: UpdatingDict[str]
+ modes_listy: UpdatingDict[tuple[str, ...]]
+ modes_valued: UpdatingDict[str]
def _id_from_nick(self, nick: str, create_if_none: bool) -> str:
- return self._userid_for_nickuserhost(NickUserHost(nick),
- create_if_none=create_if_none)
+ user_id = self._db.users.id_for_nickuserhost(NickUserHost(nick),
+ create_if_none)
+ assert user_id is not None
+ return user_id
def add_from_namreply(self, items: tuple[str, ...]) -> None:
'Add to .user_ids items assumed as nicknames with membership prefixes.'
for item in items:
mode = ''
- for mode, prefix in [
- (c, pfx) for c, pfx in self._get_membership_modes().items()
- if item.startswith(pfx)]:
+ for mode, prefix in [(c, pfx) for c, pfx
+ in self._db.get_membership_modes().items()
+ if item.startswith(pfx)]:
item = item.lstrip(prefix)
user_id = self._id_from_nick(item, True)
self.user_ids.completable_add(user_id, on_complete=False)
def join_user(self, user: '_User') -> None:
'Register non-"me" user joining channel.'
- user_id = self._userid_for_nickuserhost(nickuserhost=user,
- create_if_none=True,
- updating=True)
+ user_id = self._db.users.id_for_nickuserhost(nickuserhost=user,
+ create_if_none=True,
+ updating=True)
+ assert user_id is not None
if user_id != 'me': # own JOIN would have added name via RPL_NAMREPLY
self.user_ids.completable_add(user_id, on_complete=True)
self.exits[user.id_] = msg
self.user_ids.completable_remove(user.id_, on_complete=True)
del self.exits[user.id_]
- for c in [c for c in self._get_membership_modes().keys()
+ for c in [c for c in self._db.get_membership_modes().keys()
if user.id_ in self.modes_listy.get(c, tuple())]:
self.modes_listy[c] = tuple(uid for uid in self.modes_listy[c]
if uid != user.id_)
- self.purge_users()
+ self._db.users.purge()
def set_modes(self, modeset: str, args_str='') -> None:
'Apply MODE settings on channels.'
- prefix_modes = ''.join(self._get_membership_modes().keys())
- modes = {c: '' for c in 'ABCD'} | self._get_chanmodes()
+ prefix_modes = ''.join(self._db.get_membership_modes().keys())
+ modes = {c: '' for c in 'ABCD'} | self._db.get_chanmodes()
todos: list[tuple[bool, str, str]] = []
args = args_str.split()
idx_args = 0
return name + str(0 if not digits else (int(digits) + 1))
-class _User(_SetNickuserhostMixin, User):
+class _User(UpdatingAttrsMixin, _DbLinked, _SetNickuserhostMixin, User):
+ _modes: set[str]
- def __init__(self,
- names_channels_of_user: Callable,
- remove_from_channels: Callable,
- **kwargs) -> None:
- self._modes: set[str] = set()
- self.names_channels = lambda: names_channels_of_user(self)
- self._remove_from_channels = lambda target, msg: remove_from_channels(
- self, target, msg)
- super().__init__(**kwargs)
+ def names_channels(self) -> tuple[str, ...]:
+ 'Names of channels user currently inhabits.'
+ return self._db.channels.of_user(self)
def part(self, channel_name: str, exit_msg: str) -> None:
'First set .exit_msg, then remove from channel of channel_name.'
- self._remove_from_channels(channel_name, f'P{exit_msg}')
+ self._db.channels[channel_name].remove_user(self, f'P{exit_msg}')
def quit(self, exit_msg: str) -> None:
'First set .exit_msg, then remove from any channels.'
self.exit_msg = f'Q{exit_msg}'
- self._remove_from_channels('', self.exit_msg)
+ self._db.channels.remove_user(self, self.exit_msg)
@property
def id_(self) -> str:
@modes.setter
def modes(self, modeset: str) -> None:
+ if not hasattr(self, '_modes'):
+ self._modes = set()
operation, chars = modeset[:1], modeset[1:]
assert chars and operation in '+-'
for char in chars:
pass
-class _UpdatingCompletableTopic(UpdatingCompletable, _CompletableTopic):
- pass
-
-
-class _UpdatingChannel(UpdatingAttrsMixin, _Channel):
- user_ids: UpdatingCompletableStringsSet
- topic: _UpdatingCompletableTopic
- exits: UpdatingDict[str]
- modes_listy: UpdatingDict[tuple[str, ...]]
- modes_valued: UpdatingDict[str]
-
-
-class _UpdatingUser(UpdatingAttrsMixin, _User):
- pass
-
-
-class _UpdatingUsersDict(UpdatingDict[_UpdatingUser]):
+class _UsersDict(_UpdatingDict[_User]):
_top_id: int
- userlen: int
- def __getitem__(self, key: str) -> _UpdatingUser:
+ def __getitem__(self, key: str) -> _User:
user = super().__getitem__(key)
user.id_ = key
return user
# .nick by definition same, check other fields for updatability;
# allow where '?', or for set .user only to add "~" prefix, assert
# nothing else could have changed
- if stored.user == '?'\
- or nickuserhost.user == f'~{stored.user}'[:self.userlen]:
+ if stored.user == '?' or\
+ (nickuserhost.user
+ == f'~{stored.user}'[:int(self._db.isupport['USERLEN'])]):
assert updating
stored.user = nickuserhost.user
else:
del self[id_]
-class _UpdatingChannelsDict(UpdatingDict[_UpdatingChannel]):
+class _ChannelsDict(_UpdatingDict[_Channel]):
- def _of_user(self, user: _User) -> dict[str, _UpdatingChannel]:
+ def _of_user(self, user: _User) -> dict[str, _Channel]:
return {k: v for k, v in self._dict.items() if user.id_ in v.user_ids}
def of_user(self, user: _User) -> tuple[str, ...]:
'Return names of channels listing user as member.'
return tuple(self._of_user(user).keys())
- def remove_user(self, user: _User, target: str, msg: str) -> None:
- 'Remove user from channel named "target", or all with user if empty.'
- if target:
- self[target].remove_user(user, msg)
- else:
- for channel in self._of_user(user).values():
- channel.remove_user(user, msg)
+ def remove_user(self, user: _User, msg: str) -> None:
+ 'Remove user from all channels they are registered with.'
+ for channel in self._of_user(user).values():
+ channel.remove_user(user, msg)
-class _UpdatingIsupportDict(UpdatingDict[str]):
+class _IsupportDict(UpdatingDict[str]):
def __delitem__(self, key: str) -> None:
if key in ISUPPORT_DEFAULTS:
self[key] = value
-class _ClientDb(Clearable, UpdatingAttrsMixin, SharedClientDbFields):
+class _ClientDb(Clearable, UpdatingAttrsMixin, SharedClientDbFields, DbLinking
+ ):
_updates_cache: dict[tuple[str, ...], Any]
_keep_on_clear = frozenset(IrcConnSetup.__annotations__.keys())
caps: UpdatingDict[_UpdatingServerCapability]
- channels: _UpdatingChannelsDict
- isupport: _UpdatingIsupportDict
+ channels: _ChannelsDict
+ isupport: _IsupportDict
motd: UpdatingCompletableStringsOrdered
- users: _UpdatingUsersDict
+ users: _UsersDict
def __init__(self, **kwargs) -> None:
self._updates_cache = {}
super().__init__(**kwargs)
- def __getattribute__(self, key: str):
- attr = super().__getattribute__(key)
- if key == 'channels' and attr._preset_init_kwargs is None\
- and super().__getattribute__('users'
- )._preset_init_kwargs is not None:
- attr._preset_init_kwargs = {
- 'userid_for_nickuserhost': self.users.id_for_nickuserhost,
- 'get_membership_modes': self.get_membership_modes,
- 'get_chanmodes': self._get_chanmodes,
- 'purge_users': self.users.purge}
- elif key == 'users':
- attr.userlen = int(self.isupport['USERLEN'])
- if attr._preset_init_kwargs is None:
- attr._preset_init_kwargs = {
- 'names_channels_of_user': self.channels.of_user,
- 'remove_from_channels': self.channels.remove_user}
- elif key == 'caps' and attr._preset_init_kwargs is None:
- attr._preset_init_kwargs = {}
- return attr
-
def set_isupport_from_rpl(self, rpl: tuple[str, ...]) -> None:
'Parse rpl for additions/deletions to .isupport dict.'
for item in rpl:
return False
return True
- def _get_chanmodes(self) -> dict[str, str]:
+ def get_chanmodes(self) -> dict[str, str]:
'Parse CHANMODES into dict of mode-type char sequences.'
d = {}
idx_mode_types = 0
Channel, ChatMessage, Client, ClientQueueMixin, ImplementationFail,
IrcConnSetup, NewClientEvent, SendFail, ServerCapability,
SharedClientDbFields, TargetUserOffline, User)
-from ircplom.db_primitives import AutoAttrMixin, Dict, DictItem
+from ircplom.db_primitives import (
+ AutoAttrMixin, DbLinked, DbLinking, Dict, DictItem)
from ircplom.irc_conn import IrcMessage, NickUserHost
from ircplom.tui_base import (
BaseTui, PromptWidget, StylingString, TuiEvent, Window,
_PATH_LOGS = Path.home().joinpath('.local', 'share', 'ircplom', 'logs')
_PATH_CONFIG = Path.home().joinpath('.config', 'ircplom', 'ircplom.toml')
+_DbLinked = DbLinked['_TuiClientDb']
+
class _LogScope(Enum):
'Where log messages should go.'
return
result = (tuple(sorted(update.value)) if isinstance(update.value, set)
else update.value)
- announcement = ':' + ':'.join(update.full_path) + ' '
+ announcement = ':'.join(update.full_path) + ' '
if result == tuple():
announcement += 'emptied'
elif result is None:
announcement += ', '.join(
f'[{item}]' for item in (result if isinstance(result, tuple)
else (result, )))
- update.results += [(_LogScope.DEBUG, [announcement])]
+ update.results += [(_LogScope.DEBUG, announcement)]
def _get(self, key: str) -> Any:
return getattr(self, key)
def _get(self, key: str):
if key not in self._dict:
self._dict[key] = self._item_cls()
- return self._dict[key]
+ return self[key]
def _set(self, key: str, value) -> None:
self[key] = value
return key in self._dict
-class _UpdatingChannel(_UpdatingNode, Channel):
+class _UpdatingDbLinkedDict(_UpdatingDict[DictItem], _DbLinked):
+
+ def __getitem__(self, key: str) -> DictItem:
+ return self._extend_with_db(super().__getitem__(key))
+
+
+class _UpdatingChannel(_UpdatingNode, _DbLinked, Channel):
user_ids: set[str]
exits: _UpdatingDict[str]
modes_listy: _UpdatingDict[tuple[str, ...]]
modes_valued: _UpdatingDict[str]
- def prefixtok_for(self, user_id: str) -> str:
- 'Make ":{prefixes}" log token for user of user_id.'
- d_prefixes = self._get_membership_modes()
- return ':' + ''.join(d_prefixes[c]
- for c, ids in self.modes_listy.items()
- if c in d_prefixes and user_id in ids)
+ def prefix_for(self, user_id: str) -> str:
+ 'Construct prefixes string for user of user_id.'
+ d_prefixes = self._db.get_membership_modes()
+ return ''.join(d_prefixes[c] for c, ids in self.modes_listy.items()
+ if c in d_prefixes and user_id in ids)
def recursive_set_and_report_change(self, update: _Update) -> None:
def diff_in(base: tuple[str, ...], excluder: tuple[str, ...]
) -> tuple[str, ...]:
return tuple(id_ for id_ in base if id_ not in excluder)
- self._get_membership_modes = update.steps[0].get_membership_modes
super().recursive_set_and_report_change(update)
if update.full_path[2] == 'modes_listy'\
- and update.key in self._get_membership_modes().keys()\
+ and update.key in self._db.get_membership_modes().keys()\
and self.user_ids:
update.results += [
- (_LogScope.CHAT, [f'NICK:{id_}', f': gains {update.key}'])
+ (_LogScope.CHAT,
+ f'{self._db.users[id_].nick} gains {update.key}')
for id_ in diff_in(update.value, update.old_value)]
update.results += [
- (_LogScope.CHAT, [f'NICK:{id_}', f': loses {update.key}'])
+ (_LogScope.CHAT,
+ f'{self._db.users[id_].nick} loses {update.key}')
for id_ in diff_in(update.old_value, update.value)
if id_ in self.user_ids]
elif update.key == 'topic':
update.results += [
(_LogScope.CHAT,
- [f':{self.topic.who} set topic: {self.topic.what}'])]
+ f'{self.topic.who} set topic: {self.topic.what}')]
elif update.key == 'user_ids':
if not update.old_value:
- toks = []
- for id_ in sorted(update.value):
- toks += [self.prefixtok_for(id_), f'NICK:{id_}', ':, ']
update.results += [
- (_LogScope.CHAT, [':residents: '] + toks[:-1])]
+ (_LogScope.CHAT,
+ 'residents: '
+ + ', '.join(self.prefix_for(id_)
+ + self._db.users[id_].nick
+ for id_ in sorted(update.value)))]
else:
update.results += [
- (_LogScope.CHAT, [f'NUH:{id_}', ': joins'])
+ (_LogScope.CHAT, f'{self._db.users[id_]} joins')
for id_ in diff_in(update.value, update.old_value)]
update.results += [
- (_LogScope.CHAT, _UpdatingUser.exit_msg_toks(
- f'NUH:{id_}', self.exits[id_]))
+ (_LogScope.CHAT, _UpdatingUser.exit_msg_tok(
+ str(self._db.users[id_]), self.exits[id_]))
for id_ in diff_in(update.old_value, update.value)]
prev_nick = '?'
@staticmethod
- def exit_msg_toks(tok_who: str, exit_code: str) -> list[str]:
+ def exit_msg_tok(who: str, exit_code: str) -> str:
'Construct part/quit message from user identifier, exit_code.'
verb = 'quits' if exit_code[0] == 'Q' else 'parts'
exit_msg = exit_code[1:]
- msg_toks = [tok_who, f': {verb}']
+ msg = f'{who} {verb}'
if exit_msg:
- msg_toks += [f':: {exit_msg}']
- return msg_toks
+ msg += f': {exit_msg}'
+ return msg
def recursive_set_and_report_change(self, update: _Update) -> None:
super().recursive_set_and_report_change(update)
self.prev_nick = update.old_value
if update.old_value != '?':
update.results += [(_LogScope.USER,
- [f':{self.prev} renames {update.value}'])]
+ f'{self.prev} renames {update.value}')]
elif update.key == 'exit_msg' and update.value:
update.results += [(_LogScope.USER_NO_CHANNELS,
- self.exit_msg_toks(f':{self}', update.value))]
+ self.exit_msg_tok(str(self), update.value))]
@property
def prev(self) -> str:
pass
-class _TuiClientDb(_UpdatingNode, SharedClientDbFields):
- channels: _UpdatingDict[_UpdatingChannel]
+class _TuiClientDb(_UpdatingNode, SharedClientDbFields, DbLinking):
+ channels: _UpdatingDbLinkedDict[_UpdatingChannel]
caps: _UpdatingDict[_UpdatingServerCapability]
isupport: _UpdatingDict[str]
motd: tuple[str, ...] = tuple()
'- '
+ (('password: ' + self.password) if self.password
else 'no password')):
- update.results += [(_LogScope.SERVER, [f':{line}'])]
+ update.results += [(_LogScope.SERVER, line)]
elif update.value == 'connected':
- update.results += [(_LogScope.ALL, [':CONNECTED'])]
+ update.results += [(_LogScope.ALL, 'CONNECTED')]
elif update.value == '':
- update.results += [(_LogScope.ALL, [':NOT CONNECTED'])]
+ update.results += [(_LogScope.ALL, 'NOT CONNECTED')]
elif update.key == 'message' and update.value:
assert isinstance(update.value, ChatMessage)
is_server = not (update.value.sender or update.value.target)
is_me = update.value.target and not update.value.sender
brackets = '()' if update.value.is_notice else '[]'
- toks = [f':{brackets[0]}']
+ msg = StylingString(brackets[0])
if update.value.target in self.channels.keys() and (
id_ := 'me' if is_me
else ''.join([id_ for id_, user in self.users.items()
if user.nick == update.value.sender][:1])):
- toks += [self.channels[update.value.target].prefixtok_for(id_)]
- toks += ['RAW:{bold|']
- toks += ['NICK:me' if is_me else (':server' if is_server
- else f':{update.value.sender}')]
- toks += ['RAW:}']
- toks += [f':{brackets[1]} {update.value.content}']
- update.results += [(_LogScope.SERVER if is_server
- else _LogScope.CHAT, toks)]
+ msg += StylingString(
+ self.channels[update.value.target].prefix_for(id_))
+ msg += StylingString(self.users['me'].nick if is_me
+ else ('server' if is_server
+ else update.value.sender)
+ ).attrd('bold')
+ msg += StylingString(f'{brackets[1]} {update.value.content}')
+ update.results += [
+ (_LogScope.SERVER if is_server else _LogScope.CHAT, msg)]
elif update.key == 'motd' and update.value:
- update.results += [(_LogScope.SERVER, [f':{line}'])
+ update.results += [(_LogScope.SERVER, line)
for line in update.value]
return ret
def log(self,
- msg: StylingString,
+ msg: str | StylingString,
scope: _LogScope,
alert=False,
target='',
if not update.results:
return False
for scope, result in update.results:
- msg = StylingString('')
- for item in result:
- transform, content = item.split(':', maxsplit=1)
- if transform in {'NICK', 'NUH'}:
- nuh = self.db.users[content]
- content = str(nuh) if transform == 'NUH' else nuh.nick
- msg += StylingString(content, store_raw=transform == 'RAW')
out: Optional[bool] = None
target = ''
if update.full_path == ('message',):
elif scope in {_LogScope.CHAT, _LogScope.USER,
_LogScope.USER_NO_CHANNELS}:
target = update.full_path[1]
- self.log(msg, scope=scope, target=target, out=out)
+ self.log(msg=result, scope=scope, target=target, out=out)
for win in [w for w in self.windows if isinstance(w, _ChatWindow)]:
win.set_prompt_prefix()
return bool([w for w in self.windows if w.tainted])