From: Christian Heller Date: Fri, 23 Oct 2020 00:37:28 +0000 (+0200) Subject: Re-do IO with websocket capabilities. X-Git-Url: https://plomlompom.com/repos/%7B%7Bprefix%7D%7D/%7B%7B%20web_path%20%7D%7D/static/template?a=commitdiff_plain;h=7ea66be9de28472ea2721b9170d6fe75189a4495;p=plomrogue2-experiments Re-do IO with websocket capabilities. --- diff --git a/new2/plomrogue/io.py b/new2/plomrogue/io.py new file mode 100644 index 0000000..12e7450 --- /dev/null +++ b/new2/plomrogue/io.py @@ -0,0 +1,87 @@ +import queue + + + +class GameIO(): + + def __init__(self, game): + from plomrogue.parser import Parser + self.clients = {} + self.parser = Parser(game) + self.game = game + + def loop(self, q): + """Handle commands coming through queue q, run game, send results back.""" + while True: + try: + command, connection_id = q.get(timeout=1) + self.handle_input(connection_id, command) + except queue.Empty: + self.game.run_tick() + + def run_loop_with_server(self, port, server_class): + """Run connection of server talking to clients and game IO loop. + + We have a server of self.server_class and we have the game IO loop, + a thread running self.loop. Both communicate with each other via a + queue.Queue. While the server may spawn parallel threads to many + clients, the IO loop works sequentially through game commands + received from the server's client connections. A processed command + may trigger messages to the commanding client or to all clients, + delivered from the IO loop to the server via the queue. + + """ + import threading + q = queue.Queue() + c = threading.Thread(target=self.loop, daemon=True, args=(q,)) + c.start() + self.server = server_class(q, port) + try: + self.server.serve_forever() + except KeyboardInterrupt: + pass + finally: + print('Killing server') + self.server.server_close() + + def handle_input(self, input_, connection_id=None): + """Process input_ to command grammar, call command handler if found.""" + from inspect import signature + from plomrogue.errors import GameError, ArgError + from plomrogue.misc import quote + + def answer(connection_id, msg): + if connection_id: + self.send(msg, connection_id) + else: + print(msg) + + try: + command, args = self.parser.parse(input_) + if command is None: + answer(connection_id, 'UNHANDLED_INPUT') + else: + if 'connection_id' in list(signature(command).parameters): + command(*args, connection_id=connection_id) + else: + command(*args) + #if store and not hasattr(command, 'dont_save'): + # with open(self.game_file_name, 'a') as f: + # f.write(input_ + '\n') + except ArgError as e: + answer(connection_id, 'ARGUMENT_ERROR ' + quote(str(e))) + except GameError as e: + answer(connection_id, 'GAME_ERROR ' + quote(str(e))) + + def send(self, msg, connection_id=None): + """Send message msg to server's client(s). + + If a specific client is identified by connection_id, only + sends msg to that one. Else, sends it to all clients. + + """ + if connection_id: + self.server.clients[connection_id].put(msg) + else: + for c in self.server.clients.values(): + c.put(msg) diff --git a/new2/plomrogue/io_tcp.py b/new2/plomrogue/io_tcp.py new file mode 100644 index 0000000..5cf66d9 --- /dev/null +++ b/new2/plomrogue/io_tcp.py @@ -0,0 +1,165 @@ +import socketserver + + +# Avoid "Address already in use" errors. +socketserver.TCPServer.allow_reuse_address = True + + + +class PlomSocket: + + def __init__(self, socket): + self.socket = socket + + def send(self, message, silent_connection_break=False): + """Send via self.socket, encoded/delimited as way recv() expects. + + In detail, all \ and $ in message are escaped with prefixed \, + and an unescaped $ is appended as a message delimiter. Then, + socket.send() is called as often as necessary to ensure + message is sent fully, as socket.send() due to buffering may + not send all of it right away. + + Assuming socket is blocking, it's rather improbable that + socket.send() will be partial / return a positive value less + than the (byte) length of msg – but not entirely out of the + question. See: - - + - + + + This also handles a socket.send() return value of 0, which + might be possible or not (?) for blocking sockets: - + + + """ + from plomrogue.errors import BrokenSocketConnection + escaped_message = '' + for char in message: + if char in ('\\', '$'): + escaped_message += '\\' + escaped_message += char + escaped_message += '$' + data = escaped_message.encode() + totalsent = 0 + while totalsent < len(data): + socket_broken = False + try: + sent = self.socket.send(data[totalsent:]) + socket_broken = sent == 0 + except OSError as err: + if err.errno == 9: # "Bad file descriptor", when connection broken + socket_broken = True + else: + raise err + if socket_broken and not silent_connection_break: + raise BrokenSocketConnection + totalsent = totalsent + sent + + def recv(self): + """Get full send()-prepared message from self.socket. + + In detail, socket.recv() is looped over for sequences of bytes + that can be decoded as a Unicode string delimited by an + unescaped $, with \ and $ escapable by \. If a sequence of + characters that ends in an unescaped $ cannot be decoded as + Unicode, None is returned as its representation. Stop once + socket.recv() returns nothing. + + Under the hood, the TCP stack receives packets that construct + the input payload in an internal buffer; socket.recv(BUFSIZE) + pops up to BUFSIZE bytes from that buffer, without knowledge + either about the input's segmentation into packets, or whether + the input is segmented in any other meaningful way; that's why + we do our own message segmentation with $ as a delimiter. + + """ + esc = False + data = b'' + msg = b'' + while True: + data += self.socket.recv(1024) + if 0 == len(data): + return + cut_off = 0 + for c in data: + cut_off += 1 + if esc: + msg += bytes([c]) + esc = False + elif chr(c) == '\\': + esc = True + elif chr(c) == '$': + try: + yield msg.decode() + except UnicodeDecodeError: + yield None + data = data[cut_off:] + msg = b'' + else: + msg += bytes([c]) + + + +class IO_Handler(socketserver.BaseRequestHandler): + + def handle(self): + """Move messages between network socket and game IO loop via queues. + + On start (a new connection from client to server), sets up a + new queue, sends it via self.server.queue_out to the game IO + loop thread, and from then on receives messages to send back + from the game IO loop via that new queue. + + At the same time, loops over socket's recv to get messages + from the outside into the game IO loop by way of + self.server.queue_out into the game IO. Ends connection once a + 'QUIT' message is received from socket, and then also calls + for a kill of its own queue. + + """ + + def send_queue_messages(plom_socket, queue_in, thread_alive): + """Send messages via socket from queue_in while thread_alive[0].""" + while thread_alive[0]: + try: + msg = queue_in.get(timeout=1) + except queue.Empty: + continue + plom_socket.send(msg, True) + + import uuid + import queue + import threading + plom_socket = PlomSocket(self.request) + print('CONNECTION FROM:', str(self.client_address)) + connection_id = uuid.uuid4() + queue_in = queue.Queue() + self.server.clients[connection_id] = queue_in + thread_alive = [True] + t = threading.Thread(target=send_queue_messages, + args=(plom_socket, queue_in, thread_alive)) + t.start() + for message in plom_socket.recv(): + if message is None: + plom_socket.send('BAD MESSAGE', True) + elif 'QUIT' == message: + plom_socket.send('BYE', True) + break + else: + self.server.queue_out.put((connection_id, message)) + del self.server.clients[connection_id] + thread_alive[0] = False + print('CONNECTION CLOSED FROM:', str(self.client_address)) + plom_socket.socket.close() + + + +class PlomTCPServer(socketserver.ThreadingTCPServer): + """Bind together threaded IO handling server and message queue.""" + + def __init__(self, queue, port, *args, **kwargs): + super().__init__(('localhost', port), IO_Handler, *args, **kwargs) + self.queue_out = queue + self.daemon_threads = True # Else, server's threads have daemon=False. + self.clients = {} + diff --git a/new2/plomrogue/io_websocket.py b/new2/plomrogue/io_websocket.py new file mode 100644 index 0000000..49bdb77 --- /dev/null +++ b/new2/plomrogue/io_websocket.py @@ -0,0 +1,45 @@ +from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket + + + +class PlomWebSocket(WebSocket): + + def handleMessage(self): + if self.data == 'QUIT': + self.sendMessage('BYE') + self.close() + else: + for connection_id in self.server.clients: + if self.server.clients[connection_id] == self: + self.server.queue.put((connection_id, self.data)) + + def handleConnected(self): + import uuid + print('CONNECTION FROM:', self.address) + connection_id = uuid.uuid4() + self.server.clients[connection_id] = self + + def handleClose(self): + print('CONNECTION CLOSED FROM:', self.address) + for connection_ids in self.server.clients: + if self.server.clients[connection_id] == self: + del self.server.clients[connection_id] + + def put(self, msg): + self.sendMessage(msg) + + + +class PlomWebSocketServer(SimpleWebSocketServer): + + def __init__(self, queue, port, *args, **kwargs): + super().__init__('', port, PlomWebSocket) + self.queue = queue + self.clients = {} + + def serve_forever(self): + self.serveforever() + + def server_close(self): + self.close() +