X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=blobdiff_plain;f=new2%2Fplomrogue%2Fio_tcp.py;fp=new2%2Fplomrogue%2Fio_tcp.py;h=5cf66d9e19fc2b278cd5ed94e5ea0c6721495a91;hb=7ea66be9de28472ea2721b9170d6fe75189a4495;hp=0000000000000000000000000000000000000000;hpb=79efff96255f669af6aa39fe5fdd7f24afede882;p=plomrogue2-experiments diff --git a/new2/plomrogue/io_tcp.py b/new2/plomrogue/io_tcp.py new file mode 100644 index 0000000..5cf66d9 --- /dev/null +++ b/new2/plomrogue/io_tcp.py @@ -0,0 +1,165 @@ +import socketserver + + +# Avoid "Address already in use" errors. +socketserver.TCPServer.allow_reuse_address = True + + + +class PlomSocket: + + def __init__(self, socket): + self.socket = socket + + def send(self, message, silent_connection_break=False): + """Send via self.socket, encoded/delimited as way recv() expects. + + In detail, all \ and $ in message are escaped with prefixed \, + and an unescaped $ is appended as a message delimiter. Then, + socket.send() is called as often as necessary to ensure + message is sent fully, as socket.send() due to buffering may + not send all of it right away. + + Assuming socket is blocking, it's rather improbable that + socket.send() will be partial / return a positive value less + than the (byte) length of msg – but not entirely out of the + question. See: - - + - + + + This also handles a socket.send() return value of 0, which + might be possible or not (?) for blocking sockets: - + + + """ + from plomrogue.errors import BrokenSocketConnection + escaped_message = '' + for char in message: + if char in ('\\', '$'): + escaped_message += '\\' + escaped_message += char + escaped_message += '$' + data = escaped_message.encode() + totalsent = 0 + while totalsent < len(data): + socket_broken = False + try: + sent = self.socket.send(data[totalsent:]) + socket_broken = sent == 0 + except OSError as err: + if err.errno == 9: # "Bad file descriptor", when connection broken + socket_broken = True + else: + raise err + if socket_broken and not silent_connection_break: + raise BrokenSocketConnection + totalsent = totalsent + sent + + def recv(self): + """Get full send()-prepared message from self.socket. + + In detail, socket.recv() is looped over for sequences of bytes + that can be decoded as a Unicode string delimited by an + unescaped $, with \ and $ escapable by \. If a sequence of + characters that ends in an unescaped $ cannot be decoded as + Unicode, None is returned as its representation. Stop once + socket.recv() returns nothing. + + Under the hood, the TCP stack receives packets that construct + the input payload in an internal buffer; socket.recv(BUFSIZE) + pops up to BUFSIZE bytes from that buffer, without knowledge + either about the input's segmentation into packets, or whether + the input is segmented in any other meaningful way; that's why + we do our own message segmentation with $ as a delimiter. + + """ + esc = False + data = b'' + msg = b'' + while True: + data += self.socket.recv(1024) + if 0 == len(data): + return + cut_off = 0 + for c in data: + cut_off += 1 + if esc: + msg += bytes([c]) + esc = False + elif chr(c) == '\\': + esc = True + elif chr(c) == '$': + try: + yield msg.decode() + except UnicodeDecodeError: + yield None + data = data[cut_off:] + msg = b'' + else: + msg += bytes([c]) + + + +class IO_Handler(socketserver.BaseRequestHandler): + + def handle(self): + """Move messages between network socket and game IO loop via queues. + + On start (a new connection from client to server), sets up a + new queue, sends it via self.server.queue_out to the game IO + loop thread, and from then on receives messages to send back + from the game IO loop via that new queue. + + At the same time, loops over socket's recv to get messages + from the outside into the game IO loop by way of + self.server.queue_out into the game IO. Ends connection once a + 'QUIT' message is received from socket, and then also calls + for a kill of its own queue. + + """ + + def send_queue_messages(plom_socket, queue_in, thread_alive): + """Send messages via socket from queue_in while thread_alive[0].""" + while thread_alive[0]: + try: + msg = queue_in.get(timeout=1) + except queue.Empty: + continue + plom_socket.send(msg, True) + + import uuid + import queue + import threading + plom_socket = PlomSocket(self.request) + print('CONNECTION FROM:', str(self.client_address)) + connection_id = uuid.uuid4() + queue_in = queue.Queue() + self.server.clients[connection_id] = queue_in + thread_alive = [True] + t = threading.Thread(target=send_queue_messages, + args=(plom_socket, queue_in, thread_alive)) + t.start() + for message in plom_socket.recv(): + if message is None: + plom_socket.send('BAD MESSAGE', True) + elif 'QUIT' == message: + plom_socket.send('BYE', True) + break + else: + self.server.queue_out.put((connection_id, message)) + del self.server.clients[connection_id] + thread_alive[0] = False + print('CONNECTION CLOSED FROM:', str(self.client_address)) + plom_socket.socket.close() + + + +class PlomTCPServer(socketserver.ThreadingTCPServer): + """Bind together threaded IO handling server and message queue.""" + + def __init__(self, queue, port, *args, **kwargs): + super().__init__(('localhost', port), IO_Handler, *args, **kwargs) + self.queue_out = queue + self.daemon_threads = True # Else, server's threads have daemon=False. + self.clients = {} +