from queue import SimpleQueue, Empty as QueueEmpty
from pathlib import Path
from time import sleep
-from textwrap import wrap
from typing import Callable, Generator, Iterator, Optional
from ircplom.events import Event, Loop, QueueMixin
from ircplom.client import IrcConnection, IrcConnSetup
class TestTerminal(QueueMixin, TerminalInterface):
'Collects keypresses from string queue, otherwise mostly dummy.'
+ _cursor_x: int
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._q_keypresses: SimpleQueue = SimpleQueue()
- self._screen: list[list[tuple[tuple[str, ...], str]]] = []
+ self._screen: list[list[Optional[tuple[tuple[str, ...], str]]]] = []
@contextmanager
def setup(self) -> Generator:
self.size = TerminalInterface.__annotations__['size'](24, 80)
self._screen.clear()
for _ in range(self.size.y):
- line: list[tuple[tuple[str, ...], str]] = []
+ line: list[Optional[tuple[tuple[str, ...], str]]] = []
self._screen += [line]
for _ in range(self.size.x):
line += [(tuple(), ' ')]
- def wrap(self, line: str) -> list[str]:
- return wrap(line, width=self.size.x, subsequent_indent=' '*4)
-
- def _length_to_terminal(self, text: str) -> int:
- return len(text)
+ def length_to_term(self, text: str) -> int:
+ return len(text) + len(tuple(c for c in text if c == '💓'))
def _write_w_attrs(self, text: str, attrs: tuple[str, ...]) -> None:
- for i, c in enumerate(text):
- self._screen[self._cursor_yx.y][self._cursor_yx.x + i] = (attrs, c)
+ for c in text:
+ len_to_term = self.length_to_term(c)
+ for i in range(len_to_term):
+ self._screen[self._cursor_y][self._cursor_x+i]\
+ = None if i else (attrs, c)
+ self._cursor_x += len_to_term
+
+ def write(self, y: int, text: str) -> None:
+ self._cursor_x = 0
+ super().write(y, text)
def _get_keypresses(self) -> Iterator[Optional[TuiEvent]]:
while True:
assert 0 <= y < self.size.y
if text.endswith(_SCREENLINE_PADDING_SUFFIX):
text = text[:-len(_SCREENLINE_PADDING_SUFFIX)]
- text += ' ' * (self.size.x - len(text))
+ text += ' ' * (self.size.x - self.length_to_term(text))
+ jumped_nones = 0
for idx, cell_expected in enumerate(
(tuple(attrs_str.split(_SEP_1)), c) for c in text):
+ cell_found = self._screen[y][idx+jumped_nones]
+ if cell_found is None:
+ jumped_nones += 1
+ continue
if _SCREENLINE_IGNORE_CHAR == cell_expected[1]:
continue
- cell_found = self._screen[y][idx]
info = ('AT_X', idx,
'CHAR EXPECTED', cell_expected, 'FOUND', cell_found,
'FULL LINE EXPECTED', text,
- 'FOUND', ''.join(t[1] for t in self._screen[y]))
- assert cell_expected == cell_found, info
+ 'FOUND', ''.join(t[1] for t in self._screen[y] if t))
+ assert cell_expected[1] == cell_found[1], info
+ if cell_expected[1] != ' ':
+ assert cell_expected[0] == cell_found[0], info
class _FakeIrcConnection(IrcConnection):
self._clients += [client]
return client
- def log(self, msg: str, **kwargs) -> None:
+ def log(self, msg: str, escape=True, **kwargs) -> None:
def test_after(cmd_name: str, args: tuple[str, ...], ret) -> None:
assert cmd_name == _MARK_LOG, f'WANTED {_MARK_LOG}, GOT {cmd_name}'
win_ids, logged_msg = ret
assert args[1] == msg_sans_time, info
assert expected_win_ids == win_ids, info
- self._playbook.test_wrap(None, test_after, super().log, msg, **kwargs)
+ self._playbook.test_wrap(None, test_after, super().log,
+ msg, escape=escape, **kwargs)
def cmd__prompt_enter(self) -> None:
def test_before(cmd_name: str, args: tuple[str, ...]) -> None:
LOG_FMT_ALERT: ('bold', 'bright_red'),
_LOG_PREFIX_DEFAULT: ('bright_cyan',),
}
+_WRAP_INDENT = 3
_MIN_HEIGHT = 4
_MIN_WIDTH = 32
CMD_SHORTCUTS: dict[str, str] = {}
+class FormattingString:
+ 'For inserting terminal formatting directives, and escaping their syntax.'
+ _BRACKET_IN = '{'
+ _BRACKET_OUT = '}'
+ _SEP = '|'
+
+ def __init__(self, text: str) -> None:
+ self._text = text
+
+ def __str__(self) -> str:
+ return self._text
+
+ def attrd(self, *attrs) -> str:
+ 'Wrap in formatting applying attrs.'
+ return (self._BRACKET_IN
+ + ','.join(list(attrs))
+ + self._SEP
+ + self._text
+ + self._BRACKET_OUT)
+
+ @classmethod
+ def _is_bracket(cls, c: str) -> bool:
+ return c in {cls._BRACKET_IN, cls._BRACKET_OUT}
+
+ def escape(self) -> str:
+ 'Preserve formatting characters by prefixing them with ._BRACKET_IN.'
+ return ''.join([(self._BRACKET_IN + c if self._is_bracket(c)
+ else c) for c in self._text])
+
+ def stripped(self) -> str:
+ 'Return without formatting directives.'
+ return ''.join([text for _, text in self.parts_w_attrs()])
+
+ @classmethod
+ def _parse_for_formattings(cls,
+ c: str,
+ in_format_def: bool,
+ formattings: list[str],
+ ) -> tuple[bool, bool]:
+ do_not_print = True
+ if in_format_def:
+ if cls._is_bracket(c):
+ do_not_print = False
+ in_format_def = False
+ assert not formattings[-1]
+ formattings.pop(-1)
+ elif c == cls._SEP:
+ in_format_def = False
+ else:
+ formattings[-1] += c
+ elif cls._is_bracket(c):
+ if c == cls._BRACKET_IN:
+ in_format_def = True
+ formattings += ['']
+ elif c == cls._BRACKET_OUT:
+ formattings.pop(-1)
+ else:
+ do_not_print = False
+ return do_not_print, in_format_def
+
+ def parts_w_attrs(self) -> tuple[tuple[tuple[str, ...], str], ...]:
+ 'Break into individually formatted parts, with respective attributes.'
+ next_part = ''
+ formattings: list[str] = []
+ in_format_def = False
+ to_ret: list[tuple[tuple[str, ...], str]] = []
+ for c in self.attrd():
+ if (not in_format_def) and self._is_bracket(c):
+ attrs_to_pass = ['on_black', 'bright_white']
+ for formatting in formattings:
+ for attr in [a for a in formatting.split(',') if a]:
+ if attr.startswith('bright_'):
+ attrs_to_pass[1] = attr
+ else:
+ attrs_to_pass += [attr]
+ to_ret += [(tuple(attrs_to_pass), next_part)]
+ next_part = ''
+ do_not_print, in_format_def\
+ = self._parse_for_formattings(c, in_format_def, formattings)
+ if do_not_print:
+ continue
+ next_part += c
+ return tuple(to_ret)
+
+ def wrap(self, width: int, length_to_term: Callable) -> tuple[str, ...]:
+ 'Break into sequence respecting width, preserving attributes per part.'
+ wrapped_lines: list[str] = []
+ next_part_w_code = ''
+ len_flattened = 0
+ idx = -1
+ formattings: list[str] = []
+ in_format_def = False
+ len_wrapped_prefix = 0
+ while idx+1 < len(self._text):
+ idx += 1
+ c = self._text[idx]
+ next_part_w_code += c
+ do_not_print, in_format_def\
+ = self._parse_for_formattings(c, in_format_def, formattings)
+ if do_not_print:
+ continue
+ len_flattened += length_to_term(c)
+ if len_flattened <= width:
+ continue
+ walk_back = ([-(i+1) for i in range(len(next_part_w_code)
+ - 1 - len_wrapped_prefix)
+ if next_part_w_code[-(i+1)].isspace()] + [-1])[0]
+ wrapped_lines += [next_part_w_code[:walk_back]]
+ idx = idx + walk_back
+ next_part_w_code = ''
+ for fmt in reversed(formattings):
+ next_part_w_code += self._BRACKET_IN + fmt + self._SEP
+ next_part_w_code += ' ' * _WRAP_INDENT
+ len_wrapped_prefix = len(next_part_w_code)
+ len_flattened = _WRAP_INDENT
+ wrapped_lines[-1] += self._BRACKET_OUT * len(formattings)
+ if next_part_w_code.rstrip():
+ wrapped_lines += [next_part_w_code]
+ assert not formattings
+ return tuple(wrapped_lines)
+
+
class _YX(NamedTuple):
y: int
x: int
def __init__(self,
maxlen_log: int,
- wrap: Callable[[str], list[str]],
+ length_to_term: Callable[[str], int],
**kwargs
) -> None:
super().__init__(**kwargs)
self._maxlen_log = maxlen_log
- self._wrap = wrap
- self._wrapped: list[tuple[int, str]] = []
+ self._length_to_term = length_to_term
+ self._formatted: list[tuple[int, str]] = []
self._newest_read_history_idx_pos = self._UNSET_IDX_POS
self._history_offset = 0
self._history_idx_neg = self._UNSET_IDX_NEG
- def _add_wrapped(self, history_idx_pos: int, line: str) -> int:
- wrapped_lines = self._wrap(line)
- self._wrapped += [(self._history_offset + history_idx_pos, line)
- for line in wrapped_lines]
- return len(wrapped_lines)
+ def _add_formatted(self, history_idx_pos: int, line: str) -> int:
+ attrs = []
+ for c in line.split(LOG_FMT_SEP, maxsplit=1)[0]:
+ attrs += list(LOG_FMT_ATTRS.get(c, tuple()))
+ formatted_line = FormattingString(line).attrd(*attrs)
+ formatted_lines = FormattingString(formatted_line
+ ).wrap(self._sizes.x,
+ self._length_to_term)
+ self._formatted += [(self._history_offset + history_idx_pos, line)
+ for line in formatted_lines]
+ return len(formatted_lines)
@property
def _len_full_history(self) -> int:
super().set_geometry(sizes)
if self._drawable:
self._y_pgscroll = self._sizes.y // 2
- self._wrapped.clear()
+ self._formatted.clear()
for history_idx_pos, line in enumerate(self._history):
- self._add_wrapped(history_idx_pos, line)
+ self._add_formatted(history_idx_pos, line)
# ensure that of the full line identified by ._history_idx_neg,
# ._wrapped_idx_neg point to the lowest of its wrap parts
self._wrapped_idx_neg = (
- self._UNSET_IDX_NEG if (not self._wrapped)
- else (-len(self._wrapped)
+ self._UNSET_IDX_NEG if (not self._formatted)
+ else (-len(self._formatted)
+ self._last_wrapped_idx_pos_for_hist_idx_pos(
self._len_full_history + max(self._history_idx_neg,
- self._maxlen_log))))
if not self._UNSET_IDX_NEG != self._history_idx_neg >= -1:
self._history_idx_neg -= 1
if self._drawable:
- n_wrapped = self._add_wrapped(len(self._history) - 1, to_append)
+ n_wrapped = self._add_formatted(len(self._history) - 1, to_append)
if not self._UNSET_IDX_NEG != self._wrapped_idx_neg >= -1:
self._wrapped_idx_neg -= n_wrapped
if len(self._history) > self._maxlen_log:
self._history_idx_neg = max(self._history_idx_neg,
-self._maxlen_log)
wrap_offset = 0
- for wrap_idx_pos, t in enumerate(self._wrapped):
+ for wrap_idx_pos, t in enumerate(self._formatted):
if t[0] == self._history_offset:
wrap_offset = wrap_idx_pos
break
- self._wrapped = self._wrapped[wrap_offset:]
+ self._formatted = self._formatted[wrap_offset:]
self._wrapped_idx_neg = max(self._wrapped_idx_neg,
- -len(self._wrapped))
+ -len(self._formatted))
def _draw(self) -> None:
add_scroll_info = self._wrapped_idx_neg < -1
start_idx_neg = (self._wrapped_idx_neg
- self._sizes.y + 1 + bool(add_scroll_info))
end_idx_neg = (self._wrapped_idx_neg + 1) if add_scroll_info else None
-
- wrapped = self._wrapped[start_idx_neg:end_idx_neg]
+ wrapped = self._formatted[start_idx_neg:end_idx_neg]
while len(wrapped) < self._sizes.y - bool(add_scroll_info):
wrapped.insert(0, (self._PADDING_HISTORY_IDX_POS, ''))
-
- to_write_w_attrs: list[tuple[list[str], str]] = []
- prev_history_idx_pos = self._COMPARAND_HISTORY_IDX_POS
- attrs: list[str]
- for history_idx_pos, line in wrapped:
- if history_idx_pos != prev_history_idx_pos:
- attrs = []
- for c in line.split(LOG_FMT_SEP, maxsplit=1)[0]:
- attrs += list(LOG_FMT_ATTRS.get(c, tuple()))
- prev_history_idx_pos = history_idx_pos
- to_write_w_attrs += [(attrs, line)]
-
+ for idx, line in enumerate([lt[1] for lt in wrapped]):
+ self._write(idx, line)
if add_scroll_info:
scroll_info = f'vvv [{(-1) * self._history_idx_neg - 1}] '
scroll_info += 'v' * (self._sizes.x - len(scroll_info))
- to_write_w_attrs += [(['reverse'], scroll_info)]
-
- for idx, line_t in enumerate(to_write_w_attrs):
- self._write(start_y=idx, attrs=line_t[0], msg=line_t[1])
-
+ self._write(len(wrapped),
+ FormattingString(scroll_info).attrd('reverse'))
self._newest_read_history_idx_pos\
= max(self._newest_read_history_idx_pos, wrapped[-1][0])
def bookmark(self) -> None:
'Store next idx to what most recent line we have (been) scrolled.'
bookmark = (self._BOOKMARK_HISTORY_IDX_POS, '-' * self._sizes.x)
- if bookmark in self._wrapped:
+ if bookmark in self._formatted:
bookmark_idx_neg\
- = self._wrapped.index(bookmark) - len(self._wrapped)
- del self._wrapped[bookmark_idx_neg]
+ = self._formatted.index(bookmark) - len(self._formatted)
+ del self._formatted[bookmark_idx_neg]
if bookmark_idx_neg > self._wrapped_idx_neg:
self._wrapped_idx_neg += 1
if self._newest_read_history_idx_pos < self._history_offset:
return
- if not self._wrapped:
+ if not self._formatted:
return
- self._wrapped.insert(self._bookmark_wrapped_idx_pos, bookmark)
+ self._formatted.insert(self._bookmark_wrapped_idx_pos, bookmark)
self._wrapped_idx_neg -= int(self._bookmark_wrapped_idx_neg
> self._wrapped_idx_neg)
@property
def _bookmark_wrapped_idx_neg(self) -> int:
- return self._bookmark_wrapped_idx_pos - len(self._wrapped)
+ return self._bookmark_wrapped_idx_pos - len(self._formatted)
def _last_wrapped_idx_pos_for_hist_idx_pos(self, hist_idx_pos: int) -> int:
- return [idx for idx, t in enumerate(self._wrapped)
+ return [idx for idx, t in enumerate(self._formatted)
if t[0] == hist_idx_pos][-1]
@property
def _scroll(self, up: bool = True) -> None:
super()._scroll(up)
- if self._drawable and self._wrapped:
- if up and len(self._wrapped) > 2:
+ if self._drawable and self._formatted:
+ if up and len(self._formatted) > 2:
self._wrapped_idx_neg = max(
- -len(self._wrapped),
+ -len(self._formatted),
self._wrapped_idx_neg - self._y_pgscroll)
else:
self._wrapped_idx_neg = min(
idx = self._wrapped_idx_neg - int(
self._wrapped_idx_neg == self._bookmark_wrapped_idx_neg)
self._history_idx_neg = (-self._len_full_history
- + max(0, self._wrapped[idx][0]))
+ + max(0, self._formatted[idx][0]))
class PromptWidget(_ScrollableWidget):
if len(to_write) > self._sizes.x:
to_write = (to_write[:self._sizes.x-len(_PROMPT_ELL_OUT)]
+ _PROMPT_ELL_OUT)
- self._write(to_write[:cursor_x_to_write], self._sizes.y,
- padding=False)
- self._write(to_write[cursor_x_to_write], attrs=['reverse'],
- padding=False)
- self._write(to_write[cursor_x_to_write + 1:])
+ if len(to_write) < self._sizes.x:
+ to_write += ' '
+ self._write(self._sizes.y,
+ to_write[:cursor_x_to_write]
+ + FormattingString(to_write[cursor_x_to_write]
+ ).attrd('reverse')
+ + to_write[cursor_x_to_write + 1:])
def _archive_prompt(self) -> None:
self.append(self.input_buffer)
left = f'{focused.title})'
right = f'({" ".join(listed)}'
width_gap = max(1, (self._sizes.x - len(left) - len(right)))
- self._write(left + '=' * width_gap + right, self._sizes.y)
+ self._write(self._sizes.y, left + '=' * width_gap + right)
class Window:
self.idx = idx
self._term = term
self.history = _HistoryWidget(maxlen_log=maxlen_log,
- wrap=self._term.wrap,
+ length_to_term=self._term.length_to_term,
write=self._term.write)
self.prompt = self.__annotations__['prompt'](write=self._term.write)
if hasattr(self._term, 'size'):
lines += ['']
lines[-1] += c
for y, line in enumerate(lines):
- self._term.write(line, y)
+ self._term.write(y, line)
def cmd__paste(self) -> None:
'Write OSC 52 ? sequence to get encoded clipboard paste into stdin.'
- self._term.write(f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}', 0)
+ self._term.write(0, f'\033{_OSC52_PREFIX.decode()}?{_PASTE_DELIMITER}')
self.taint()
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
- self._cursor_yx = _YX(0, 0)
+ self._cursor_y = 0
@abstractmethod
@contextmanager
'Flush terminal.'
@abstractmethod
- def wrap(self, line: str) -> list[str]:
- 'Wrap line to list of lines fitting into terminal width.'
-
- @abstractmethod
- def _length_to_terminal(self, text: str) -> int:
- pass
+ def length_to_term(self, text: str) -> int:
+ 'How many cells of terminal screen text width will demand.'
@abstractmethod
def _write_w_attrs(self, text: str, attrs: tuple[str, ...]) -> None:
pass
- def write(self,
- msg: str = '',
- start_y: Optional[int] = None,
- attrs: Optional[list[str]] = None,
- padding: bool = True
- ) -> None:
- 'Print to terminal, with position, padding to line end, attributes.'
- if start_y is not None:
- self._cursor_yx = _YX(start_y, 0)
- # ._blessed.length can slow down things notably: only use where needed!
- end_x = self._cursor_yx.x + self._length_to_terminal(msg)
- len_padding = self.size.x - end_x
- if (len_padding := self.size.x - end_x) and padding:
- msg += ' ' * len_padding
- end_x = self.size.x
- attrs_to_pass = ['on_black', 'bright_white']
- for attr in attrs or []:
- if attr.startswith('bright_'):
- attrs_to_pass[1] = attr
- else:
- attrs_to_pass += [attr]
- self._write_w_attrs(msg, tuple(attrs_to_pass))
- self._cursor_yx = _YX(self._cursor_yx.y, end_x)
+ def write(self, y: int, text: str) -> None:
+ 'Write line of text at y, enacting FormattingString directives.'
+ self._cursor_y = y
+ attrs: tuple[str, ...] = tuple()
+ len_written = 0
+ for attrs, part in FormattingString(text).parts_w_attrs():
+ len_written += self.length_to_term(part)
+ self._write_w_attrs(part, attrs)
+ self._write_w_attrs(' ' * (self.size.x - len_written), tuple(attrs))
@abstractmethod
def _get_keypresses(self) -> Iterator[Optional[TuiEvent]]:
# separated to serve as hook for subclass window selection
return [self.window]
- def log(self, msg: str, **kwargs) -> Optional[tuple[tuple[int, ...], str]]:
+ def log(self, msg: str, escape=True, **kwargs
+ ) -> Optional[tuple[tuple[int, ...], str]]:
'Write with timestamp, prefix to what window ._log_target_wins offers.'
+ if escape:
+ msg = FormattingString(msg).escape()
prefix = kwargs.get('prefix', _LOG_PREFIX_DEFAULT)
now = str(datetime.now())
today, time = now[:10], now[11:19]
class Terminal(QueueMixin, TerminalInterface):
'Abstraction of terminal interface.'
- _cursor_yx_: _YX
+ _cursor_y_: int
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
print(self._blessed.normal_cursor, end='', flush=True)
@property
- def _cursor_yx(self) -> _YX:
- return self._cursor_yx_
+ def _cursor_y(self) -> int:
+ return self._cursor_y_
- @_cursor_yx.setter
- def _cursor_yx(self, yx: _YX) -> None:
- print(self._blessed.move_yx(yx.y, yx.x), end='')
- self._cursor_yx_ = yx
+ @_cursor_y.setter
+ def _cursor_y(self, y: int) -> None:
+ print(self._blessed.move_yx(y, 0), end='')
+ self._cursor_y_ = y
def set_size_hide_cursor(self) -> None:
# NB: see note on .setup why cursor hiding here rather than there
def flush(self) -> None:
print('', end='', flush=True)
- def wrap(self, line: str) -> list[str]:
- return self._blessed.wrap(line, width=self.size.x,
- subsequent_indent=' '*4)
-
- def _length_to_terminal(self, text: str) -> int:
+ def length_to_term(self, text: str) -> int:
+ # ._blessed.length can slow down things notably: only use where needed!
return len(text) if text.isascii() else self._blessed.length(text)
def _write_w_attrs(self, text: str, attrs: tuple[str, ...]) -> None: