verb: str,
parameters: Optional[list[str]] = None,
source: str = '',
- tags: Optional[dict[str, str]] = None
+ tags: Optional[dict[str, str]] = None,
+ raw: Optional[str] = None
) -> None:
+ self._raw = raw
self.verb: str = verb
self.parameters: list[str] = parameters or []
self.source: str = source
if stage.prefix_char:
continue
harvest[stage.name] += char
- return cls(**{s.name: s.processor(harvest[s.name]) for s in stages})
+ return cls(**{s.name: s.processor(harvest[s.name]) for s in stages},
+ raw=raw_msg)
- def send(self, conn: Connection) -> None:
- 'Send self to conn encoded into line.'
- to_combine = []
- if self.tags:
- tag_strs = []
- for key, val in self.tags.items():
- tag_strs += [key]
- if val:
+ @property
+ def raw(self) -> str:
+ 'Return raw message code – create from known fields if necessary.'
+ if not self._raw:
+ to_combine = []
+ if self.tags:
+ tag_strs = []
+ for key, val in self.tags.items():
+ tag_strs += [key]
+ if not val:
+ continue
for repl_with, to_repl in reversed(IRCSPEC_TAG_ESCAPES):
val = val.replace(to_repl, repl_with)
tag_strs[-1] += f'={val}'
- to_combine += ['@' + ';'.join(tag_strs)]
- to_combine += [self.verb]
- if self.parameters:
- to_combine += self.parameters[:-1]
- to_combine += [f':{self.parameters[-1]}']
- conn.write_line(' '.join(to_combine))
+ to_combine += ['@' + ';'.join(tag_strs)]
+ to_combine += [self.verb]
+ if self.parameters:
+ to_combine += self.parameters[:-1]
+ to_combine += [f':{self.parameters[-1]}']
+ self._raw = ' '.join(to_combine)
+ return self._raw
+
+ def send(self, conn: Connection) -> None:
+ 'Send self to conn encoded into line.'
+ conn.write_line(self.raw)
class Loop:
def __init__(self, term: Terminal, *args, **kwargs) -> None:
self._term = term
self._prompt = ''
- self._log_buffer = LogBuffer(self._term.wrap)
+ self._logs = [LogBuffer(self._term.wrap) for i in range(2)]
+ self._log_selected = 0
self._upscroll = 0
self._calc_and_draw_all()
self._term.flush()
if not super().process_main(event):
return False
if event.type_ in {'ALERT', 'RECV', 'SEND'}:
- self._log_buffer.append(f'{event.type_} {event.payload}')
+ self._logs[0].append(f'{event.type_} {event.payload}')
+ if event.type_ == 'RECV':
+ self._logs[1].append(f'<- {event.payload.raw}')
+ elif event.type_ == 'SEND':
+ self._logs[1].append(f'-> {event.payload.raw}')
self._draw_log()
elif event.type_ == 'KEYBINDING':
getattr(self, f'_kb__{event.payload}')()
elif event.type_ == 'INPUT_CHAR':
self._prompt += event.payload
self._draw_prompt()
+ elif event.type_ == 'BUFFER':
+ if 0 <= event.payload < len(self._logs):
+ self._log_selected = event.payload
+ self._draw_log()
+ else:
+ self.broadcast('ALERT', 'invalid buffer number')
elif event.type_ == 'SIGWINCH':
self._calc_and_draw_all()
# elif event.type_ == 'DEBUG':
# from traceback import format_exception
- # self._log_buffer += [
+ # self._log+= [
# f'DEBUG {line}' for line
# in '\n'.join(format_exception(event.payload)).split('\n')]
# self._draw_log()
self._term.flush()
return True
+ @property
+ def _log(self) -> LogBuffer:
+ return self._logs[self._log_selected]
+
def _kb__prompt_backspace(self) -> None:
self._prompt = self._prompt[:-1]
self._draw_prompt()
self._draw_prompt()
def _kb__scroll_up(self) -> None:
- self._log_buffer.scroll_up()
+ self._log.scroll_up()
self._draw_log()
def _kb__scroll_down(self) -> None:
- self._log_buffer.scroll_down()
+ self._log.scroll_down()
self._draw_log()
def _calc_and_draw_all(self) -> None:
self._term.calc_geometry()
self._y_prompt = self._term.size.y - 1
self._y_separator = self._term.size.y - 2
- self._log_buffer.apply_geometry(YX(self._y_separator,
- self._term.size.x))
+ for log in self._logs:
+ log.apply_geometry(YX(self._y_separator, self._term.size.x))
self._draw_frame()
self._draw_log()
self._draw_prompt()
self._term.write_yx(YX(self._y_prompt, 0), INPUT_PROMPT)
def _draw_log(self) -> None:
- for i, line in enumerate(self._log_buffer.wrapped):
+ for i, line in enumerate(self._log.wrapped):
self._term.write_yx(YX(i, 0), line)
def _draw_prompt(self) -> None:
if command == 'quit' and len(toks_init) == 1:
q_to_main.eput('QUIT')
continue
+ if command == 'buffer' and len(toks_init) == 2:
+ if toks_init[1].isdigit():
+ q_to_main.eput('BUFFER', int(toks_init[1]))
+ continue
q_to_main.eput('ALERT',
f'invalid prompt command: {event.payload}')