'Attempt at an IRC client.'
from contextlib import contextmanager
+from inspect import _empty as inspect_empty, signature
from queue import SimpleQueue, Empty as QueueEmpty
from signal import SIGWINCH, signal
from socket import socket
self._logs[1].append(f'-> {event.payload.raw}')
self._draw_log()
elif event.type_ == 'KEYBINDING':
- getattr(self, f'_kb__{event.payload}')()
+ getattr(self, f'_cmd__{event.payload}')()
elif event.type_ == 'INPUT_CHAR':
self._prompt += event.payload
self._draw_prompt()
- elif event.type_ == 'BUFFER':
- if 0 <= event.payload < len(self._logs):
- self._log_selected = event.payload
- self._draw_log()
- else:
- self.broadcast('ALERT', 'invalid buffer number')
elif event.type_ == 'SIGWINCH':
self._calc_and_draw_all()
# elif event.type_ == 'DEBUG':
def _log(self) -> LogBuffer:
return self._logs[self._log_selected]
- def _kb__prompt_backspace(self) -> None:
+ def _cmd__prompt_backspace(self) -> None:
self._prompt = self._prompt[:-1]
self._draw_prompt()
- def _kb__prompt_enter(self) -> None:
+ def _cmd__prompt_enter(self) -> None:
if self._prompt:
- self.broadcast('PROMPT_COMMAND', self._prompt)
+ success = False
+ if len(self._prompt) > 1 and self._prompt[0] == '/':
+ toks = self._prompt[1:].split(maxsplit=1)
+ method_name = f'_cmd__{toks[0]}'
+ if hasattr(self, method_name):
+ method = getattr(self, method_name)
+ params = signature(method).parameters
+ n_args_max = len(params)
+ n_args_min = len([p for p in params.values()
+ if p.default == inspect_empty])
+ if len(toks) == 1 and not n_args_min:
+ success = method()
+ elif len(toks) > 1 and params\
+ and n_args_min <= len(toks[1].split()):
+ args = []
+ while len(toks) > 1 and n_args_max:
+ toks = toks[1].split(maxsplit=1)
+ args += [toks[0]]
+ n_args_max -= 1
+ success = method(*args)
+ if not success:
+ self.broadcast('ALERT',
+ f'invalid prompt command: {self._prompt}')
self._prompt = ''
self._draw_prompt()
- def _kb__scroll_up(self) -> None:
+ def _cmd__scroll_up(self) -> None:
self._log.scroll_up()
self._draw_log()
- def _kb__scroll_down(self) -> None:
+ def _cmd__scroll_down(self) -> None:
self._log.scroll_down()
self._draw_log()
+ def _cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> bool:
+ self.broadcast('SEND', IrcMessage('QUIT', [quit_msg]))
+ return True
+
+ def _cmd__quit(self) -> bool:
+ self.broadcast('QUIT')
+ return True
+
+ def _cmd__buffer(self, buffer_idx_str: str) -> bool:
+ if not buffer_idx_str.isdigit():
+ return False
+ buffer_idx = int(buffer_idx_str)
+ if 0 <= buffer_idx < len(self._logs):
+ self._log_selected = buffer_idx
+ self._draw_log()
+ else:
+ self.broadcast('ALERT', 'invalid buffer idx')
+ return True
+
def _calc_and_draw_all(self) -> None:
self._term.clear()
self._term.calc_geometry()
if msg.verb == 'PING':
q_to_main.eput('SEND',
IrcMessage('PONG', [msg.parameters[0]]))
- elif event.type_ == 'PROMPT_COMMAND':
- if event.payload[0] == '/':
- toks_init = event.payload[1:].split(maxsplit=1)
- command = toks_init[0]
- if command == 'disconnect':
- q_to_main.eput('SEND',
- IrcMessage('QUIT', toks_init[1:]))
- continue
- if command == 'quit' and len(toks_init) == 1:
- q_to_main.eput('QUIT')
- continue
- if command == 'buffer' and len(toks_init) == 2:
- if toks_init[1].isdigit():
- q_to_main.eput('BUFFER', int(toks_init[1]))
- continue
- q_to_main.eput('ALERT',
- f'invalid prompt command: {event.payload}')
if __name__ == '__main__':