#!/usr/bin/env python3
'Attempt at an IRC client.'
+from abc import ABC, abstractmethod
from contextlib import contextmanager
from inspect import _empty as inspect_empty, signature, stack
from queue import SimpleQueue, Empty as QueueEmpty
'KEY_DOWN': ('prompt_scroll', 'down'),
'KEY_PGUP': ('log_scroll', 'up'),
'KEY_PGDOWN': ('log_scroll', 'down'),
- '[91, 49, 59, 51, 68]': ('buffer', 'left'),
- '[91, 49, 59, 51, 67]': ('buffer', 'right'),
+ '[91, 49, 59, 51, 68]': ('window', 'left'),
+ '[91, 49, 59, 51, 67]': ('window', 'right'),
}
IRCSPEC_LINE_SEPARATOR = b'\r\n'
self._q_to_main.eput('EXCEPTION', e)
-class LogBuffer:
- 'Collects line-shaped messages, scrolls and wraps them for display.'
- _display_size: YX
- _y_pgscroll: int
+class ScrollableWidget(ABC):
+ 'Defines some API shared between PromptWidget and LogWidget.'
+ _history_idx: int
- def __init__(self, term: Terminal) -> None:
- self._term = term
+ def __init__(self, write_yx: Callable[[YX, str], None]) -> None:
+ self._write_yx = write_yx
self._history: list[str] = []
- self._wrapped: list[tuple[int, str]] = []
- self._upscroll_history: int = 0
- self._upscroll_wrapped: int = 0
-
- def apply_geometry(self, limit_y: int) -> None:
- 'Calcs display conditions based on new display_size, stored scroll.'
- self._display_size = YX(limit_y, self._term.size.x)
- self._y_pgscroll = self._display_size.y // 2
- self._wrapped.clear()
- self._wrapped += [(-1, '')] * self._display_size.y
- self._upscroll_wrapped = 0
- if not self._history:
- return
- for idx_history, line in enumerate(self._history):
- self._add_wrapped(idx_history, line)
- last_by_upscroll_history = [
- t for t in self._wrapped
- if t[0] == len(self._history) - (self._upscroll_history + 1)]
- idx_last = self._wrapped.index(last_by_upscroll_history[-1])
- self._upscroll_wrapped = len(self._wrapped) - (idx_last + 1)
- def _add_wrapped(self, idx_original, line) -> int:
- wrapped_lines = self._term.wrap(line)
- self._wrapped += [(idx_original, line) for line in wrapped_lines]
- return len(wrapped_lines)
+ @abstractmethod
+ def set_geometry(self, measurements: Any) -> None:
+ 'Update widget\'s measurements, re-generate content where necessary.'
- def append(self, line) -> None:
- 'Adds line to history, and wrapped to .wrap; preserves scroll.'
- self._history += [line]
- n_wrapped = self._add_wrapped(len(self._history), line)
- if self._upscroll_wrapped > 0:
- self._upscroll_history += 1
- self._upscroll_wrapped += n_wrapped
+ @abstractmethod
+ def append(self, to_append: str) -> None:
+ 'Append to widget content.'
+ @abstractmethod
def draw(self) -> None:
- 'Print display_size/scroll-appropriate wrapped selection of lines.'
- start_idx = len(self._wrapped) - (self._display_size.y
- + self._upscroll_wrapped)
- to_write = [t[1] for t in
- self._wrapped[start_idx:-(self._upscroll_wrapped + 1)]]
- if self._upscroll_wrapped:
- scroll_info = f'vvv [{self._upscroll_wrapped}] '
- scroll_info += 'v' * (self._display_size.x - len(scroll_info))
- to_write += [scroll_info]
- else:
- to_write += [self._wrapped[-1][1]]
- for i, line in enumerate(to_write):
- self._term.write_yx(YX(i, 0), line)
+ 'Print widget\'s content in shape appropriate to applied geometry.'
+
+ @abstractmethod
+ def _scroll(self, up=True) -> None:
+ pass
def scroll(self, up=True) -> None:
- 'Scrolls view down by half of display size.'
- self._upscroll_wrapped = (
- min(len(self._wrapped[self._display_size.y:]) - 2,
- self._upscroll_wrapped + self._y_pgscroll)
- if up else max(0, self._upscroll_wrapped - self._y_pgscroll))
- self._upscroll_history = 0
- if self._upscroll_wrapped:
- idx_lowest = self._wrapped[-(self._upscroll_wrapped + 1)][0]
- self._upscroll_history = len(self._history) - (idx_lowest + 1)
+ 'Scroll through stored content/history.'
+ self._scroll(up)
self.draw()
-class TuiPrompt:
+class PromptWidget(ScrollableWidget):
'Keyboard-controlled command input field.'
- start_y: int
+ _start_y: int
- def __init__(self, term: Terminal) -> None:
- self._term = term
- self._buffer = ''
- self._history: list[str] = []
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self._input_buffer = ''
self._history_idx = 0
- def append(self, char: str) -> None:
- 'Append char to current content.'
- self._buffer += char
+ def set_geometry(self, measurements: int) -> None:
+ self._start_y = measurements
+
+ def append(self, to_append: str) -> None:
+ self._input_buffer += to_append
self._history_idx = 0
self.draw()
+ def draw(self) -> None:
+ self._write_yx(YX(self._start_y, len(INPUT_PROMPT)),
+ f'{self._input_buffer}_')
+
+ def _scroll(self, up: bool = True) -> None:
+ if up and -(self._history_idx) < len(self._history):
+ if self._history_idx == 0 and self._input_buffer:
+ self._history += [self._input_buffer[:]]
+ self.clear()
+ self._history_idx -= 1
+ self._history_idx -= 1
+ elif (not up) and self._history_idx < 0:
+ self._history_idx += 1
+ if self._history_idx == 0:
+ self.clear()
+ return
+ else:
+ return
+ self._input_buffer = self._history[self._history_idx][:]
+ self.draw()
+
def backspace(self) -> None:
'Truncate current content by one character, if possible.'
- self._buffer = self._buffer[:-1]
+ self._input_buffer = self._input_buffer[:-1]
self._history_idx = 0
self.draw()
def clear(self) -> None:
'Empty current content.'
- self._buffer = ''
+ self._input_buffer = ''
self.draw()
- def draw(self) -> None:
- 'Print into screen..'
- self._term.write_yx(YX(self.start_y, len(INPUT_PROMPT)),
- f'{self._buffer}_')
-
def enter(self) -> str:
'Return current content while also clearing and then redrawing.'
- to_return = self._buffer[:]
+ to_return = self._input_buffer[:]
if to_return:
self._history += [to_return]
self.clear()
self.draw()
return to_return
- def scroll(self, up=True) -> None:
- 'Scroll through past prompt inputs.'
- if up and -(self._history_idx) < len(self._history):
- if self._history_idx == 0 and self._buffer:
- self._history += [self._buffer[:]]
- self.clear()
- self._history_idx -= 1
+
+class LogWidget(ScrollableWidget):
+ 'Collects line-shaped messages, scrolls and wraps them for display.'
+ _view_size: YX
+ _y_pgscroll: int
+
+ def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self._wrap = wrap
+ self._wrapped_idx = self._history_idx = -1
+ self._wrapped: list[tuple[Optional[int], str]] = []
+
+ def _add_wrapped(self, idx_original, line) -> int:
+ wrapped_lines = self._wrap(line)
+ self._wrapped += [(idx_original, line) for line in wrapped_lines]
+ return len(wrapped_lines)
+
+ def set_geometry(self, measurements: YX) -> None:
+ self._view_size = measurements
+ self._y_pgscroll = self._view_size.y // 2
+ self._wrapped.clear()
+ self._wrapped += [(None, '')] * self._view_size.y
+ if not self._history:
+ return
+ for idx_history, line in enumerate(self._history):
+ self._add_wrapped(idx_history, line)
+ wrapped_lines_for_history_idx = [
+ t for t in self._wrapped
+ if t[0] == len(self._history) + self._history_idx]
+ idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
+ self._wrapped_idx = idx_their_last - len(self._wrapped)
+
+ def append(self, to_append: str) -> None:
+ self._history += [to_append]
+ n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
+ if self._wrapped_idx < -1:
self._history_idx -= 1
- elif (not up) and self._history_idx < 0:
- self._history_idx += 1
- if self._history_idx == 0:
- self.clear()
- return
+ self._wrapped_idx -= n_wrapped_lines
+
+ def draw(self) -> None:
+ start_idx = self._wrapped_idx - self._view_size.y
+ end_idx = self._wrapped_idx - 1
+ to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
+ if self._wrapped_idx < -1:
+ scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
+ scroll_info += 'v' * (self._view_size.x - len(scroll_info))
+ to_write += [scroll_info]
else:
- return
- self._buffer = self._history[self._history_idx][:]
- self.draw()
+ to_write += [self._wrapped[-1][1]]
+ for i, line in enumerate(to_write):
+ self._write_yx(YX(i, 0), line)
+
+ def _scroll(self, up: bool = True) -> None:
+ if up:
+ self._wrapped_idx = max(self._view_size.y + 2 - len(self._wrapped),
+ self._wrapped_idx - self._y_pgscroll)
+ else:
+ self._wrapped_idx = min(-1,
+ self._wrapped_idx + self._y_pgscroll)
+ history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
+ assert history_idx_to_wrapped_idx is not None
+ self._history_idx = history_idx_to_wrapped_idx - len(self._history)
+
+
+class Window:
+ 'Collects a log and a prompt meant for the same content stream.'
+
+ def __init__(self, term: Terminal) -> None:
+ self._term = term
+ self.log = LogWidget(self._term.wrap, self._term.write_yx)
+ self.prompt = PromptWidget(self._term.write_yx)
+
+ def set_geometry(self, log_height: int, start_y_prompt: int) -> None:
+ 'Reconfigure included widgets\' geometry.'
+ self.log.set_geometry(YX(log_height, self._term.size.x))
+ self.prompt.set_geometry(start_y_prompt)
+
+ def draw(self) -> None:
+ 'Draw both log and prompt.'
+ self.log.draw()
+ self.prompt.draw()
class TuiLoop(Loop):
def __init__(self, term: Terminal, *args, **kwargs) -> None:
self._term = term
- self._logs = [LogBuffer(self._term) for i in range(2)]
- self._prompts = [TuiPrompt(self._term) for i in range(2)]
- self._buffer_idx = 0
+ self._windows = [Window(self._term) for i in range(2)]
+ self._window_idx = 0
self._calc_and_draw_all()
self._term.flush()
super().__init__(*args, **kwargs)
if not super().process_main(event):
return False
if event.type_ in {'ALERT', 'RECV', 'SEND'}:
- self._logs[0].append(f'{event.type_} {event.payload}')
+ self._windows[0].log.append(f'{event.type_} {event.payload}')
if event.type_ == 'RECV':
- self._logs[1].append(f'<- {event.payload.raw}')
+ self._windows[1].log.append(f'<- {event.payload.raw}')
elif event.type_ == 'SEND':
- self._logs[1].append(f'-> {event.payload.raw}')
- self._log.draw()
+ self._windows[1].log.append(f'-> {event.payload.raw}')
+ self._window.log.draw()
elif event.type_ == 'KEYBINDING':
getattr(self, f'_cmd__{event.payload[0]}')(*event.payload[1:])
elif event.type_ == 'INPUT_CHAR':
- self._prompt.append(event.payload)
+ self._window.prompt.append(event.payload)
elif event.type_ == 'SIGWINCH':
self._calc_and_draw_all()
# elif event.type_ == 'DEBUG':
return True
@property
- def _log(self) -> LogBuffer:
- return self._logs[self._buffer_idx]
-
- @property
- def _prompt(self) -> TuiPrompt:
- return self._prompts[self._buffer_idx]
+ def _window(self) -> Window:
+ return self._windows[self._window_idx]
def _calc_and_draw_all(self) -> None:
self._term.clear()
self._term.calc_geometry()
- for prompt in self._prompts:
- prompt.start_y = self._term.size.y - 1
- self._y_separator = self._term.size.y - 2
- for log in self._logs:
- log.apply_geometry(limit_y=self._y_separator)
- self._term.write_yx(YX(self._y_separator, 0), '=' * self._term.size.x)
- self._term.write_yx(YX(self._prompt.start_y, 0), INPUT_PROMPT)
- self._log.draw()
- self._prompt.draw()
+ y_prompt = self._term.size.y - 1
+ y_separator = self._term.size.y - 2
+ self._term.write_yx(YX(y_separator, 0), '=' * self._term.size.x)
+ self._term.write_yx(YX(y_prompt, 0), INPUT_PROMPT)
+ for window in self._windows:
+ window.set_geometry(y_separator, y_prompt)
+ self._window.draw()
def _cmd__prompt_backspace(self) -> None:
- self._prompt.backspace()
+ self._window.prompt.backspace()
def _cmd__prompt_enter(self) -> None:
- to_parse = self._prompt.enter()
+ to_parse = self._window.prompt.enter()
if not to_parse:
return
alert: Optional[str] = None
self.broadcast('ALERT', f'invalid prompt command: {alert}')
def _cmd__prompt_scroll(self, direction: str) -> None:
- self._prompt.scroll(up=direction == 'up')
+ self._window.prompt.scroll(up=direction == 'up')
def _cmd__log_scroll(self, direction: str) -> None:
- self._log.scroll(up=direction == 'up')
+ self._window.log.scroll(up=direction == 'up')
def _cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
self.broadcast('SEND', IrcMessage('QUIT', [quit_msg]))
def _cmd__quit(self) -> None:
self.broadcast('QUIT')
- def _cmd__buffer(self, towards: str) -> Optional[str]:
- n_buffers = len(self._logs)
- if n_buffers < 2:
- return 'no alternate buffer to move into'
+ def _cmd__window(self, towards: str) -> Optional[str]:
+ n_windows = len(self._windows)
+ if n_windows < 2:
+ return 'no alternate window to move into'
if towards in {'left', 'right'}:
multiplier = (+1) if towards == 'right' else (-1)
- buffer_idx = self._buffer_idx + multiplier
- if not 0 <= buffer_idx < n_buffers:
- buffer_idx -= multiplier * n_buffers
+ window_idx = self._window_idx + multiplier
+ if not 0 <= window_idx < n_windows:
+ window_idx -= multiplier * n_windows
elif not towards.isdigit():
return f'neither "left"/"right" nor integer: {towards}'
else:
- buffer_idx = int(towards)
- if not 0 <= buffer_idx < n_buffers:
- return f'unavailable buffer idx: {buffer_idx}'
- self._buffer_idx = buffer_idx
- self._log.draw()
+ window_idx = int(towards)
+ if not 0 <= window_idx < n_windows:
+ return f'unavailable window idx: {window_idx}'
+ self._window_idx = window_idx
+ self._window.draw()
return None