From 7ea66be9de28472ea2721b9170d6fe75189a4495 Mon Sep 17 00:00:00 2001
From: Christian Heller <c.heller@plomlompom.de>
Date: Fri, 23 Oct 2020 02:37:28 +0200
Subject: [PATCH] Re-do IO with websocket capabilities.

---
 new2/plomrogue/io.py           |  87 +++++++++++++++++
 new2/plomrogue/io_tcp.py       | 165 +++++++++++++++++++++++++++++++++
 new2/plomrogue/io_websocket.py |  45 +++++++++
 3 files changed, 297 insertions(+)
 create mode 100644 new2/plomrogue/io.py
 create mode 100644 new2/plomrogue/io_tcp.py
 create mode 100644 new2/plomrogue/io_websocket.py

diff --git a/new2/plomrogue/io.py b/new2/plomrogue/io.py
new file mode 100644
index 0000000..12e7450
--- /dev/null
+++ b/new2/plomrogue/io.py
@@ -0,0 +1,87 @@
+import queue
+
+
+
+class GameIO():
+
+    def __init__(self, game):
+        from plomrogue.parser import Parser
+        self.clients = {}
+        self.parser = Parser(game)
+        self.game = game
+
+    def loop(self, q):
+        """Handle commands coming through queue q, run game, send results back."""
+        while True:
+            try:
+                command, connection_id = q.get(timeout=1)
+                self.handle_input(connection_id, command)
+            except queue.Empty:
+                self.game.run_tick()
+
+    def run_loop_with_server(self, port, server_class):
+        """Run connection of server talking to clients and game IO loop.
+
+        We have a server of self.server_class and we have the game IO loop,
+        a thread running self.loop. Both communicate with each other via a
+        queue.Queue. While the server may spawn parallel threads to many
+        clients, the IO loop works sequentially through game commands
+        received from the server's client connections.  A processed command
+        may trigger messages to the commanding client or to all clients,
+        delivered from the IO loop to the server via the queue.
+
+        """
+        import threading
+        q = queue.Queue()
+        c = threading.Thread(target=self.loop, daemon=True, args=(q,))
+        c.start()
+        self.server = server_class(q, port)
+        try:
+            self.server.serve_forever()
+        except KeyboardInterrupt:
+            pass
+        finally:
+            print('Killing server')
+            self.server.server_close()
+
+    def handle_input(self, input_, connection_id=None):
+        """Process input_ to command grammar, call command handler if found."""
+        from inspect import signature
+        from plomrogue.errors import GameError, ArgError
+        from plomrogue.misc import quote
+
+        def answer(connection_id, msg):
+            if connection_id:
+                self.send(msg, connection_id)
+            else:
+                print(msg)
+
+        try:
+            command, args = self.parser.parse(input_)
+            if command is None:
+                answer(connection_id, 'UNHANDLED_INPUT')
+            else:
+                if 'connection_id' in list(signature(command).parameters):
+                    command(*args, connection_id=connection_id)
+                else:
+                    command(*args)
+                    #if store and not hasattr(command, 'dont_save'):
+                    #    with open(self.game_file_name, 'a') as f:
+                    #        f.write(input_ + '\n')
+        except ArgError as e:
+            answer(connection_id, 'ARGUMENT_ERROR ' + quote(str(e)))
+        except GameError as e:
+            answer(connection_id, 'GAME_ERROR ' + quote(str(e)))
+
+    def send(self, msg, connection_id=None):
+        """Send message msg to server's client(s).
+
+        If a specific client is identified by connection_id, only
+        sends msg to that one. Else, sends it to all clients.
+
+        """
+        if connection_id:
+            self.server.clients[connection_id].put(msg)
+        else:
+            for c in self.server.clients.values():
+                c.put(msg)
diff --git a/new2/plomrogue/io_tcp.py b/new2/plomrogue/io_tcp.py
new file mode 100644
index 0000000..5cf66d9
--- /dev/null
+++ b/new2/plomrogue/io_tcp.py
@@ -0,0 +1,165 @@
+import socketserver
+
+
+# Avoid "Address already in use" errors.
+socketserver.TCPServer.allow_reuse_address = True
+
+
+
+class PlomSocket:
+
+    def __init__(self, socket):
+        self.socket = socket
+
+    def send(self, message, silent_connection_break=False):
+        """Send via self.socket, encoded/delimited as way recv() expects.
+
+        In detail, all \ and $ in message are escaped with prefixed \,
+        and an unescaped $ is appended as a message delimiter. Then,
+        socket.send() is called as often as necessary to ensure
+        message is sent fully, as socket.send() due to buffering may
+        not send all of it right away.
+
+        Assuming socket is blocking, it's rather improbable that
+        socket.send() will be partial / return a positive value less
+        than the (byte) length of msg – but not entirely out of the
+        question. See: - <http://stackoverflow.com/q/19697218> -
+        <http://stackoverflow.com/q/2618736> -
+        <http://stackoverflow.com/q/8900474>
+
+        This also handles a socket.send() return value of 0, which
+        might be possible or not (?) for blocking sockets: -
+        <http://stackoverflow.com/q/34919846>
+
+        """
+        from plomrogue.errors import BrokenSocketConnection
+        escaped_message = ''
+        for char in message:
+            if char in ('\\', '$'):
+                escaped_message += '\\'
+            escaped_message += char
+        escaped_message += '$'
+        data = escaped_message.encode()
+        totalsent = 0
+        while totalsent < len(data):
+            socket_broken = False
+            try:
+                sent = self.socket.send(data[totalsent:])
+                socket_broken = sent == 0
+            except OSError as err:
+                if err.errno == 9:  # "Bad file descriptor", when connection broken
+                    socket_broken = True
+                else:
+                    raise err
+            if socket_broken and not silent_connection_break:
+                raise BrokenSocketConnection
+            totalsent = totalsent + sent
+
+    def recv(self):
+        """Get full send()-prepared message from self.socket.
+
+        In detail, socket.recv() is looped over for sequences of bytes
+        that can be decoded as a Unicode string delimited by an
+        unescaped $, with \ and $ escapable by \. If a sequence of
+        characters that ends in an unescaped $ cannot be decoded as
+        Unicode, None is returned as its representation. Stop once
+        socket.recv() returns nothing.
+
+        Under the hood, the TCP stack receives packets that construct
+        the input payload in an internal buffer; socket.recv(BUFSIZE)
+        pops up to BUFSIZE bytes from that buffer, without knowledge
+        either about the input's segmentation into packets, or whether
+        the input is segmented in any other meaningful way; that's why
+        we do our own message segmentation with $ as a delimiter.
+
+        """
+        esc = False
+        data = b''
+        msg = b''
+        while True:
+            data += self.socket.recv(1024)
+            if 0 == len(data):
+                return
+            cut_off = 0
+            for c in data:
+                cut_off += 1
+                if esc:
+                    msg += bytes([c])
+                    esc = False
+                elif chr(c) == '\\':
+                    esc = True
+                elif chr(c) == '$':
+                    try:
+                        yield msg.decode()
+                    except UnicodeDecodeError:
+                        yield None
+                    data = data[cut_off:]
+                    msg = b''
+                else:
+                    msg += bytes([c])
+
+
+
+class IO_Handler(socketserver.BaseRequestHandler):
+
+    def handle(self):
+        """Move messages between network socket and game IO loop via queues.
+
+        On start (a new connection from client to server), sets up a
+        new queue, sends it via self.server.queue_out to the game IO
+        loop thread, and from then on receives messages to send back
+        from the game IO loop via that new queue.
+
+        At the same time, loops over socket's recv to get messages
+        from the outside into the game IO loop by way of
+        self.server.queue_out into the game IO. Ends connection once a
+        'QUIT' message is received from socket, and then also calls
+        for a kill of its own queue.
+
+        """
+
+        def send_queue_messages(plom_socket, queue_in, thread_alive):
+            """Send messages via socket from queue_in while thread_alive[0]."""
+            while thread_alive[0]:
+                try:
+                    msg = queue_in.get(timeout=1)
+                except queue.Empty:
+                    continue
+                plom_socket.send(msg, True)
+
+        import uuid
+        import queue
+        import threading
+        plom_socket = PlomSocket(self.request)
+        print('CONNECTION FROM:', str(self.client_address))
+        connection_id = uuid.uuid4()
+        queue_in = queue.Queue()
+        self.server.clients[connection_id] = queue_in
+        thread_alive = [True]
+        t = threading.Thread(target=send_queue_messages,
+                             args=(plom_socket, queue_in, thread_alive))
+        t.start()
+        for message in plom_socket.recv():
+            if message is None:
+                plom_socket.send('BAD MESSAGE', True)
+            elif 'QUIT' == message:
+                plom_socket.send('BYE', True)
+                break
+            else:
+                self.server.queue_out.put((connection_id, message))
+        del self.server.clients[connection_id]
+        thread_alive[0] = False
+        print('CONNECTION CLOSED FROM:', str(self.client_address))
+        plom_socket.socket.close()
+
+
+
+class PlomTCPServer(socketserver.ThreadingTCPServer):
+    """Bind together threaded IO handling server and message queue."""
+
+    def __init__(self, queue, port, *args, **kwargs):
+        super().__init__(('localhost', port), IO_Handler, *args, **kwargs)
+        self.queue_out = queue
+        self.daemon_threads = True  # Else, server's threads have daemon=False.
+        self.clients = {}
+
diff --git a/new2/plomrogue/io_websocket.py b/new2/plomrogue/io_websocket.py
new file mode 100644
index 0000000..49bdb77
--- /dev/null
+++ b/new2/plomrogue/io_websocket.py
@@ -0,0 +1,45 @@
+from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
+
+
+
+class PlomWebSocket(WebSocket):
+
+    def handleMessage(self):
+        if self.data == 'QUIT':
+            self.sendMessage('BYE')
+            self.close()
+        else:
+            for connection_id in self.server.clients:
+                if self.server.clients[connection_id] == self:
+                    self.server.queue.put((connection_id, self.data))
+    
+    def handleConnected(self):
+        import uuid
+        print('CONNECTION FROM:', self.address)
+        connection_id = uuid.uuid4()
+        self.server.clients[connection_id] = self
+
+    def handleClose(self):
+        print('CONNECTION CLOSED FROM:', self.address)
+        for connection_ids in self.server.clients:
+            if self.server.clients[connection_id] == self:
+                del self.server.clients[connection_id]
+
+    def put(self, msg):
+        self.sendMessage(msg)
+
+
+
+class PlomWebSocketServer(SimpleWebSocketServer):
+
+    def __init__(self, queue, port, *args, **kwargs):
+        super().__init__('', port, PlomWebSocket)
+        self.queue = queue
+        self.clients = {}
+
+    def serve_forever(self):
+        self.serveforever()
+
+    def server_close(self):
+        self.close()
+
-- 
2.30.2