home · contact · privacy
(Selectively) turn prompt input into IrcMessages.
authorChristian Heller <c.heller@plomlompom.de>
Fri, 30 May 2025 12:22:13 +0000 (14:22 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 30 May 2025 12:22:13 +0000 (14:22 +0200)
ircplom.py

index fabf0a0f1847f0ffca3b9ca744e79e49a1f01221..53b0788168a9f8ae06a5ebde6c919f16bcba927e 100755 (executable)
@@ -174,12 +174,22 @@ class Connection:
         self._socket.sendall(line.encode('utf-8') + IRCSPEC_LINE_SEPARATOR)
 
 
-class IrcMessage(NamedTuple):
+class IrcMessage:
     'Properly structured representation of IRC message as per IRCv3 spec.'
-    tags: dict[str, str]
-    source: str
-    verb: str
-    parameters: list[str]
+
+    def __init__(self,
+                 verb: str,
+                 parameters: Optional[list[str]] = None,
+                 source: str = '',
+                 tags: Optional[dict[str, str]] = None
+                 ) -> None:
+        self.verb: str = verb
+        self.parameters: list[str] = parameters or []
+        self.source: str = source
+        self.tags: dict[str, str] = tags or {}
+
+    def __str__(self) -> str:
+        return f'[{self.tags}[{self.source}[{self.verb}]]][{self.parameters}]'
 
     @classmethod
     def from_raw(cls, raw_msg: str) -> Self:
@@ -195,8 +205,8 @@ class IrcMessage(NamedTuple):
             for str_tag in [s for s in str_tags.split(';') if s]:
                 if '=' in str_tag:
                     key, val = str_tag.split('=', maxsplit=1)
-                    for to_replace, replace_with in IRCSPEC_TAG_ESCAPES:
-                        val = val.replace(to_replace, replace_with)
+                    for to_repl, repl_with in IRCSPEC_TAG_ESCAPES:
+                        val = val.replace(to_repl, repl_with)
                 else:
                     key, val = str_tag, ''
                 tags[key] = val
@@ -239,7 +249,25 @@ class IrcMessage(NamedTuple):
                 if stage.prefix_char:
                     continue
             harvest[stage.name] += char
-        return cls(*[s.processor(harvest[s.name]) for s in stages])
+        return cls(**{s.name: s.processor(harvest[s.name]) for s in stages})
+
+    def send(self, conn: Connection) -> None:
+        'Send self to conn encoded into line.'
+        to_combine = []
+        if self.tags:
+            tag_strs = []
+            for key, val in self.tags.items():
+                tag_strs += [key]
+                if val:
+                    for repl_with, to_repl in reversed(IRCSPEC_TAG_ESCAPES):
+                        val = val.replace(to_repl, repl_with)
+                    tag_strs[-1] += f'={val}'
+            to_combine += ['@' + ';'.join(tag_strs)]
+        to_combine += [self.verb]
+        if self.parameters:
+            to_combine += self.parameters[:-1]
+            to_combine += [f':{self.parameters[-1]}']
+        conn.write_line(' '.join(to_combine))
 
 
 class Loop:
@@ -316,7 +344,10 @@ class TuiLoop(Loop):
     def process_main(self, event: Event) -> bool:
         if not super().process_main(event):
             return False
-        if event.type_ == 'RECV':
+        if event.type_ == 'ALERT':
+            self._log_buffer += [f'???? {event.args[0]}']
+            self._draw_log()
+        elif event.type_ == 'RECV':
             self._log_buffer += [f'<--- {event.args[0]}']
             self._draw_log()
         elif event.type_ == 'SEND':
@@ -324,7 +355,11 @@ class TuiLoop(Loop):
             self._draw_log()
         elif event.type_ == 'INPUT_PROMPT':
             if event.args[0] == 'ENTER':
-                self.broadcast('SEND', self._prompt)
+                toks = self._prompt.split(maxsplit=1)
+                if toks and toks[0] in {'QUIT'}:
+                    self.broadcast('SEND', IrcMessage(toks[0], toks[1:]))
+                else:
+                    self.broadcast('ALERT', f'invalid message: {self._prompt}')
                 self._prompt = ''
             elif event.args[0] == 'BACKSPACE':
                 self._prompt = self._prompt[:-1]
@@ -396,8 +431,12 @@ def run() -> None:
     q_to_main: SimpleQueue[Event] = SimpleQueue()
     with Terminal().context(q_to_main) as term:
         with Connection().context((HOST, PORT), q_to_main) as conn:
-            conn.write_line(f'USER {USERNAME} 0 * :{REALNAME}')
-            conn.write_line(f'NICK {NICKNAME}')
+            q_to_main.put(
+                Event('SEND',
+                      (IrcMessage('USER', [USERNAME, '0', '*', REALNAME]),)))
+            q_to_main.put(
+                Event('SEND',
+                      (IrcMessage('NICK', [NICKNAME]),)))
             while True:
                 event = q_to_main.get()
                 if event.type_ == 'QUIT':
@@ -406,13 +445,17 @@ def run() -> None:
                     term.tui.put(event)
                 elif event.type_ == 'SEND':
                     term.tui.put(event)
-                    conn.write_line(event.args[0])
+                    event.args[0].send(conn)
                 elif event.type_ == 'PING':
-                    q_to_main.put(Event('SEND', (f'PONG {event.args[0]}',)))
+                    q_to_main.put(
+                            Event('SEND',
+                                  (IrcMessage('PONG', [event.args[0]]),)))
                 elif event.type_ == 'RECV':
                     term.tui.put(event)
                 elif event.type_ == 'EXCEPTION':
                     raise event.args[0]
+                elif event.type_ == 'ALERT':
+                    term.tui.put(event)
                 # elif event.type_ == 'DEBUG':
                 #     term.tui.put(event)