home · contact · privacy
Restructure TuiPrompt and LogBuffer into Window and its ScrollableWidgets.
authorChristian Heller <c.heller@plomlompom.de>
Tue, 3 Jun 2025 15:11:43 +0000 (17:11 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Tue, 3 Jun 2025 15:11:43 +0000 (17:11 +0200)
ircplom.py

index 2fdd8029fecc5615d0c72427d6aaca8a77107eb4..09ebdc95e3b888e9e2e91ee6818c4b933974e608 100755 (executable)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 'Attempt at an IRC client.'
 
+from abc import ABC, abstractmethod
 from contextlib import contextmanager
 from inspect import _empty as inspect_empty, signature, stack
 from queue import SimpleQueue, Empty as QueueEmpty
@@ -29,8 +30,8 @@ KEYBINDINGS = {
     'KEY_DOWN': ('prompt_scroll', 'down'),
     'KEY_PGUP': ('log_scroll', 'up'),
     'KEY_PGDOWN': ('log_scroll', 'down'),
-    '[91, 49, 59, 51, 68]': ('buffer', 'left'),
-    '[91, 49, 59, 51, 67]': ('buffer', 'right'),
+    '[91, 49, 59, 51, 68]': ('window', 'left'),
+    '[91, 49, 59, 51, 67]': ('window', 'right'),
 }
 
 IRCSPEC_LINE_SEPARATOR = b'\r\n'
@@ -370,134 +371,176 @@ class Loop:
             self._q_to_main.eput('EXCEPTION', e)
 
 
-class LogBuffer:
-    'Collects line-shaped messages, scrolls and wraps them for display.'
-    _display_size: YX
-    _y_pgscroll: int
+class ScrollableWidget(ABC):
+    'Defines some API shared between PromptWidget and LogWidget.'
+    _history_idx: int
 
-    def __init__(self, term: Terminal) -> None:
-        self._term = term
+    def __init__(self, write_yx: Callable[[YX, str], None]) -> None:
+        self._write_yx = write_yx
         self._history: list[str] = []
-        self._wrapped: list[tuple[int, str]] = []
-        self._upscroll_history: int = 0
-        self._upscroll_wrapped: int = 0
-
-    def apply_geometry(self, limit_y: int) -> None:
-        'Calcs display conditions based on new display_size, stored scroll.'
-        self._display_size = YX(limit_y, self._term.size.x)
-        self._y_pgscroll = self._display_size.y // 2
-        self._wrapped.clear()
-        self._wrapped += [(-1, '')] * self._display_size.y
-        self._upscroll_wrapped = 0
-        if not self._history:
-            return
-        for idx_history, line in enumerate(self._history):
-            self._add_wrapped(idx_history, line)
-        last_by_upscroll_history = [
-                t for t in self._wrapped
-                if t[0] == len(self._history) - (self._upscroll_history + 1)]
-        idx_last = self._wrapped.index(last_by_upscroll_history[-1])
-        self._upscroll_wrapped = len(self._wrapped) - (idx_last + 1)
 
-    def _add_wrapped(self, idx_original, line) -> int:
-        wrapped_lines = self._term.wrap(line)
-        self._wrapped += [(idx_original, line) for line in wrapped_lines]
-        return len(wrapped_lines)
+    @abstractmethod
+    def set_geometry(self, measurements: Any) -> None:
+        'Update widget\'s measurements, re-generate content where necessary.'
 
-    def append(self, line) -> None:
-        'Adds line to history, and wrapped to .wrap; preserves scroll.'
-        self._history += [line]
-        n_wrapped = self._add_wrapped(len(self._history), line)
-        if self._upscroll_wrapped > 0:
-            self._upscroll_history += 1
-            self._upscroll_wrapped += n_wrapped
+    @abstractmethod
+    def append(self, to_append: str) -> None:
+        'Append to widget content.'
 
+    @abstractmethod
     def draw(self) -> None:
-        'Print display_size/scroll-appropriate wrapped selection of lines.'
-        start_idx = len(self._wrapped) - (self._display_size.y
-                                          + self._upscroll_wrapped)
-        to_write = [t[1] for t in
-                    self._wrapped[start_idx:-(self._upscroll_wrapped + 1)]]
-        if self._upscroll_wrapped:
-            scroll_info = f'vvv [{self._upscroll_wrapped}] '
-            scroll_info += 'v' * (self._display_size.x - len(scroll_info))
-            to_write += [scroll_info]
-        else:
-            to_write += [self._wrapped[-1][1]]
-        for i, line in enumerate(to_write):
-            self._term.write_yx(YX(i, 0), line)
+        'Print widget\'s content in shape appropriate to applied geometry.'
+
+    @abstractmethod
+    def _scroll(self, up=True) -> None:
+        pass
 
     def scroll(self, up=True) -> None:
-        'Scrolls view down by half of display size.'
-        self._upscroll_wrapped = (
-                min(len(self._wrapped[self._display_size.y:]) - 2,
-                    self._upscroll_wrapped + self._y_pgscroll)
-                if up else max(0, self._upscroll_wrapped - self._y_pgscroll))
-        self._upscroll_history = 0
-        if self._upscroll_wrapped:
-            idx_lowest = self._wrapped[-(self._upscroll_wrapped + 1)][0]
-            self._upscroll_history = len(self._history) - (idx_lowest + 1)
+        'Scroll through stored content/history.'
+        self._scroll(up)
         self.draw()
 
 
-class TuiPrompt:
+class PromptWidget(ScrollableWidget):
     'Keyboard-controlled command input field.'
-    start_y: int
+    _start_y: int
 
-    def __init__(self, term: Terminal) -> None:
-        self._term = term
-        self._buffer = ''
-        self._history: list[str] = []
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self._input_buffer = ''
         self._history_idx = 0
 
-    def append(self, char: str) -> None:
-        'Append char to current content.'
-        self._buffer += char
+    def set_geometry(self, measurements: int) -> None:
+        self._start_y = measurements
+
+    def append(self, to_append: str) -> None:
+        self._input_buffer += to_append
         self._history_idx = 0
         self.draw()
 
+    def draw(self) -> None:
+        self._write_yx(YX(self._start_y, len(INPUT_PROMPT)),
+                       f'{self._input_buffer}_')
+
+    def _scroll(self, up: bool = True) -> None:
+        if up and -(self._history_idx) < len(self._history):
+            if self._history_idx == 0 and self._input_buffer:
+                self._history += [self._input_buffer[:]]
+                self.clear()
+                self._history_idx -= 1
+            self._history_idx -= 1
+        elif (not up) and self._history_idx < 0:
+            self._history_idx += 1
+            if self._history_idx == 0:
+                self.clear()
+                return
+        else:
+            return
+        self._input_buffer = self._history[self._history_idx][:]
+        self.draw()
+
     def backspace(self) -> None:
         'Truncate current content by one character, if possible.'
-        self._buffer = self._buffer[:-1]
+        self._input_buffer = self._input_buffer[:-1]
         self._history_idx = 0
         self.draw()
 
     def clear(self) -> None:
         'Empty current content.'
-        self._buffer = ''
+        self._input_buffer = ''
         self.draw()
 
-    def draw(self) -> None:
-        'Print into screen..'
-        self._term.write_yx(YX(self.start_y, len(INPUT_PROMPT)),
-                            f'{self._buffer}_')
-
     def enter(self) -> str:
         'Return current content while also clearing and then redrawing.'
-        to_return = self._buffer[:]
+        to_return = self._input_buffer[:]
         if to_return:
             self._history += [to_return]
             self.clear()
             self.draw()
         return to_return
 
-    def scroll(self, up=True) -> None:
-        'Scroll through past prompt inputs.'
-        if up and -(self._history_idx) < len(self._history):
-            if self._history_idx == 0 and self._buffer:
-                self._history += [self._buffer[:]]
-                self.clear()
-                self._history_idx -= 1
+
+class LogWidget(ScrollableWidget):
+    'Collects line-shaped messages, scrolls and wraps them for display.'
+    _view_size: YX
+    _y_pgscroll: int
+
+    def __init__(self, wrap: Callable[[str], list[str]], *args, **kwargs
+                 ) -> None:
+        super().__init__(*args, **kwargs)
+        self._wrap = wrap
+        self._wrapped_idx = self._history_idx = -1
+        self._wrapped: list[tuple[Optional[int], str]] = []
+
+    def _add_wrapped(self, idx_original, line) -> int:
+        wrapped_lines = self._wrap(line)
+        self._wrapped += [(idx_original, line) for line in wrapped_lines]
+        return len(wrapped_lines)
+
+    def set_geometry(self, measurements: YX) -> None:
+        self._view_size = measurements
+        self._y_pgscroll = self._view_size.y // 2
+        self._wrapped.clear()
+        self._wrapped += [(None, '')] * self._view_size.y
+        if not self._history:
+            return
+        for idx_history, line in enumerate(self._history):
+            self._add_wrapped(idx_history, line)
+        wrapped_lines_for_history_idx = [
+                t for t in self._wrapped
+                if t[0] == len(self._history) + self._history_idx]
+        idx_their_last = self._wrapped.index(wrapped_lines_for_history_idx[-1])
+        self._wrapped_idx = idx_their_last - len(self._wrapped)
+
+    def append(self, to_append: str) -> None:
+        self._history += [to_append]
+        n_wrapped_lines = self._add_wrapped(len(self._history) - 1, to_append)
+        if self._wrapped_idx < -1:
             self._history_idx -= 1
-        elif (not up) and self._history_idx < 0:
-            self._history_idx += 1
-            if self._history_idx == 0:
-                self.clear()
-                return
+            self._wrapped_idx -= n_wrapped_lines
+
+    def draw(self) -> None:
+        start_idx = self._wrapped_idx - self._view_size.y
+        end_idx = self._wrapped_idx - 1
+        to_write = [t[1] for t in self._wrapped[start_idx:end_idx]]
+        if self._wrapped_idx < -1:
+            scroll_info = f'vvv [{(-1) * self._wrapped_idx}] '
+            scroll_info += 'v' * (self._view_size.x - len(scroll_info))
+            to_write += [scroll_info]
         else:
-            return
-        self._buffer = self._history[self._history_idx][:]
-        self.draw()
+            to_write += [self._wrapped[-1][1]]
+        for i, line in enumerate(to_write):
+            self._write_yx(YX(i, 0), line)
+
+    def _scroll(self, up: bool = True) -> None:
+        if up:
+            self._wrapped_idx = max(self._view_size.y + 2 - len(self._wrapped),
+                                    self._wrapped_idx - self._y_pgscroll)
+        else:
+            self._wrapped_idx = min(-1,
+                                    self._wrapped_idx + self._y_pgscroll)
+        history_idx_to_wrapped_idx = self._wrapped[self._wrapped_idx][0]
+        assert history_idx_to_wrapped_idx is not None
+        self._history_idx = history_idx_to_wrapped_idx - len(self._history)
+
+
+class Window:
+    'Collects a log and a prompt meant for the same content stream.'
+
+    def __init__(self, term: Terminal) -> None:
+        self._term = term
+        self.log = LogWidget(self._term.wrap, self._term.write_yx)
+        self.prompt = PromptWidget(self._term.write_yx)
+
+    def set_geometry(self, log_height: int, start_y_prompt: int) -> None:
+        'Reconfigure included widgets\' geometry.'
+        self.log.set_geometry(YX(log_height, self._term.size.x))
+        self.prompt.set_geometry(start_y_prompt)
+
+    def draw(self) -> None:
+        'Draw both log and prompt.'
+        self.log.draw()
+        self.prompt.draw()
 
 
 class TuiLoop(Loop):
@@ -505,9 +548,8 @@ class TuiLoop(Loop):
 
     def __init__(self, term: Terminal, *args, **kwargs) -> None:
         self._term = term
-        self._logs = [LogBuffer(self._term) for i in range(2)]
-        self._prompts = [TuiPrompt(self._term) for i in range(2)]
-        self._buffer_idx = 0
+        self._windows = [Window(self._term) for i in range(2)]
+        self._window_idx = 0
         self._calc_and_draw_all()
         self._term.flush()
         super().__init__(*args, **kwargs)
@@ -516,16 +558,16 @@ class TuiLoop(Loop):
         if not super().process_main(event):
             return False
         if event.type_ in {'ALERT', 'RECV', 'SEND'}:
-            self._logs[0].append(f'{event.type_} {event.payload}')
+            self._windows[0].log.append(f'{event.type_} {event.payload}')
             if event.type_ == 'RECV':
-                self._logs[1].append(f'<- {event.payload.raw}')
+                self._windows[1].log.append(f'<- {event.payload.raw}')
             elif event.type_ == 'SEND':
-                self._logs[1].append(f'-> {event.payload.raw}')
-            self._log.draw()
+                self._windows[1].log.append(f'-> {event.payload.raw}')
+            self._window.log.draw()
         elif event.type_ == 'KEYBINDING':
             getattr(self, f'_cmd__{event.payload[0]}')(*event.payload[1:])
         elif event.type_ == 'INPUT_CHAR':
-            self._prompt.append(event.payload)
+            self._window.prompt.append(event.payload)
         elif event.type_ == 'SIGWINCH':
             self._calc_and_draw_all()
         # elif event.type_ == 'DEBUG':
@@ -540,31 +582,25 @@ class TuiLoop(Loop):
         return True
 
     @property
-    def _log(self) -> LogBuffer:
-        return self._logs[self._buffer_idx]
-
-    @property
-    def _prompt(self) -> TuiPrompt:
-        return self._prompts[self._buffer_idx]
+    def _window(self) -> Window:
+        return self._windows[self._window_idx]
 
     def _calc_and_draw_all(self) -> None:
         self._term.clear()
         self._term.calc_geometry()
-        for prompt in self._prompts:
-            prompt.start_y = self._term.size.y - 1
-        self._y_separator = self._term.size.y - 2
-        for log in self._logs:
-            log.apply_geometry(limit_y=self._y_separator)
-        self._term.write_yx(YX(self._y_separator, 0), '=' * self._term.size.x)
-        self._term.write_yx(YX(self._prompt.start_y, 0), INPUT_PROMPT)
-        self._log.draw()
-        self._prompt.draw()
+        y_prompt = self._term.size.y - 1
+        y_separator = self._term.size.y - 2
+        self._term.write_yx(YX(y_separator, 0), '=' * self._term.size.x)
+        self._term.write_yx(YX(y_prompt, 0), INPUT_PROMPT)
+        for window in self._windows:
+            window.set_geometry(y_separator, y_prompt)
+        self._window.draw()
 
     def _cmd__prompt_backspace(self) -> None:
-        self._prompt.backspace()
+        self._window.prompt.backspace()
 
     def _cmd__prompt_enter(self) -> None:
-        to_parse = self._prompt.enter()
+        to_parse = self._window.prompt.enter()
         if not to_parse:
             return
         alert: Optional[str] = None
@@ -596,10 +632,10 @@ class TuiLoop(Loop):
             self.broadcast('ALERT', f'invalid prompt command: {alert}')
 
     def _cmd__prompt_scroll(self, direction: str) -> None:
-        self._prompt.scroll(up=direction == 'up')
+        self._window.prompt.scroll(up=direction == 'up')
 
     def _cmd__log_scroll(self, direction: str) -> None:
-        self._log.scroll(up=direction == 'up')
+        self._window.log.scroll(up=direction == 'up')
 
     def _cmd__disconnect(self, quit_msg: str = 'ircplom says bye') -> None:
         self.broadcast('SEND', IrcMessage('QUIT', [quit_msg]))
@@ -607,23 +643,23 @@ class TuiLoop(Loop):
     def _cmd__quit(self) -> None:
         self.broadcast('QUIT')
 
-    def _cmd__buffer(self, towards: str) -> Optional[str]:
-        n_buffers = len(self._logs)
-        if n_buffers < 2:
-            return 'no alternate buffer to move into'
+    def _cmd__window(self, towards: str) -> Optional[str]:
+        n_windows = len(self._windows)
+        if n_windows < 2:
+            return 'no alternate window to move into'
         if towards in {'left', 'right'}:
             multiplier = (+1) if towards == 'right' else (-1)
-            buffer_idx = self._buffer_idx + multiplier
-            if not 0 <= buffer_idx < n_buffers:
-                buffer_idx -= multiplier * n_buffers
+            window_idx = self._window_idx + multiplier
+            if not 0 <= window_idx < n_windows:
+                window_idx -= multiplier * n_windows
         elif not towards.isdigit():
             return f'neither "left"/"right" nor integer: {towards}'
         else:
-            buffer_idx = int(towards)
-            if not 0 <= buffer_idx < n_buffers:
-                return f'unavailable buffer idx: {buffer_idx}'
-        self._buffer_idx = buffer_idx
-        self._log.draw()
+            window_idx = int(towards)
+            if not 0 <= window_idx < n_windows:
+                return f'unavailable window idx: {window_idx}'
+        self._window_idx = window_idx
+        self._window.draw()
         return None