X-Git-Url: https://plomlompom.com/repos/foo.html?a=blobdiff_plain;f=server.py;h=b088206ef07ea35a1b91372ca4931c196c74f89f;hb=822c68744ead480ba7a48b786148facc8ef1eb04;hp=b326768ea4a5499818f141afe8b4b7a1e8d3d1ad;hpb=47dc6843988096102d5424d00c0e411a608792e2;p=plomrogue2-experiments diff --git a/server.py b/server.py index b326768..b088206 100755 --- a/server.py +++ b/server.py @@ -1,12 +1,23 @@ #!/usr/bin/env python3 import socketserver -import plom_socket_io +import plom_socket_io +import threading +import time # Avoid "Address already in use" errors. socketserver.TCPServer.allow_reuse_address = True +class Server(socketserver.ThreadingTCPServer): + """Bind together threaded IO handling server and world state (counter).""" + + def __init__(self, counter, *args, **kwargs): + super().__init__(*args, **kwargs) + self.counter = counter + self.daemon_threads = True # Else, server's threads have daemon=False. + + def fib(n): """Calculate n-th Fibonacci number.""" if n in (1, 2): @@ -15,81 +26,106 @@ def fib(n): return fib(n-1) + fib(n-2) -def handle_message(message): - """Evaluate message for computing-heavy tasks to perform, yield result. - - Accepts one command: FIB, followed by positive integers, all tokens - separated by whitespace. Will calculate and return for each such integer n - the n-th Fibonacci number. Uses multiprocessing to perform multiple such - calculations in parallel. Yields a 'CALCULATING …' message before the - calculation starts, and finally yields a message containing the results. - (The 'CALCULATING …' message coming before the results message is currently - the main reason this works as a generator function using yield.) - - When no command can be read into the message, just yields a 'NO COMMAND - UNDERSTOOD:', followed by the message. - """ - tokens = message.split(' ') - if tokens[0] == 'FIB': - msg_fail_fib = 'MALFORMED FIB REQUEST' - if len(tokens) < 2: - yield msg_fail_fib - return - numbers = [] - fail = False - for token in tokens[1:]: - if token != '0' and token.isdigit(): - numbers += [int(token)] - elif token == '': - continue - else: - yield msg_fail_fib - return - yield 'CALCULATING …' - reply = '' - from multiprocessing import Pool - with Pool(len(numbers)) as p: - results = p.map(fib, numbers) - reply = ' '.join([str(r) for r in results]) - yield reply - return - yield 'NO COMMAND UNDERSTOOD: %s' % message - - -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): - """Enables threading on TCP server for asynchronous IO handling.""" - pass - - -class MyTCPHandler(socketserver.BaseRequestHandler): +class IO_Handler(socketserver.BaseRequestHandler): def handle(self): - """Loop recv for input, act on it, send reply. + """Loop recv for input, send replies; also, send regular counter value. If input is 'QUIT', send reply 'BYE' and end loop / connection. - Otherwise, use handle_message. + Otherwise, use handle_message to interpret and enact commands. """ + def caught_send(socket, message): + """Send message by socket, catch broken socket connection error.""" + try: + plom_socket_io.send(socket, message) + except plom_socket_io.BrokenSocketConnection: + pass + + def send_counter_loop(socket, counter, kill): + """Every 5 seconds, send state of counter[0] until kill[0] set.""" + while not kill[0]: + caught_send(socket, 'COUNTER ' + str(counter[0])) + time.sleep(5) + + def handle_message(message): + """Evaluate message for tasks to perform, yield result. + + Accepts one command: FIB, followed by positive integers, all tokens + separated by whitespace. Will calculate and return for each such + integer n the n-th Fibonacci number. Uses multiprocessing to + perform multiple such calculations in parallel. Yields a + 'CALCULATING …' message before the calculation starts, and finally + yields a message containing the results. (The 'CALCULATING …' + message coming before the results message is currently the main + reason this works as a generator function using yield.) + + When no command can be read into the message, just yields a 'NO + COMMAND UNDERSTOOD:', followed by the message. + """ + from multiprocessing import Pool + tokens = message.split(' ') + if tokens[0] == 'FIB': + msg_fail_fib = 'MALFORMED FIB REQUEST' + if len(tokens) < 2: + yield msg_fail_fib + return + numbers = [] + for token in tokens[1:]: + if token != '0' and token.isdigit(): + numbers += [int(token)] + elif token == '': + continue + else: + yield msg_fail_fib + return + yield 'CALCULATING …' + reply = '' + with Pool(len(numbers)) as p: + results = p.map(fib, numbers) + reply = ' '.join([str(r) for r in results]) + yield reply + return + yield 'NO COMMAND UNDERSTOOD: %s' % message print('CONNECTION FROM:', str(self.client_address)) + counter_loop_killer = [False] + send_count = threading.Thread(target=send_counter_loop, + kwargs={'counter': self.server.counter, + 'socket': self.request, + 'kill': counter_loop_killer}) + send_count.start() for message in plom_socket_io.recv(self.request): if message is None: print('RECEIVED MALFORMED MESSAGE') - plom_socket_io.send(self.request, 'bad message') + caught_send(self.request, 'bad message') elif 'QUIT' == message: - plom_socket_io.send(self.request, 'BYE') + caught_send(self.request, 'BYE') break else: print('RECEIVED MESSAGE:', message) for reply in handle_message(message): - plom_socket_io.send(self.request, reply) + caught_send(self.request, reply) + counter_loop_killer = [True] print('CONNECTION CLOSED:', str(self.client_address)) self.request.close() -server = ThreadedTCPServer(('localhost', 5000), MyTCPHandler) +def inc_loop(counter, interval): + """Loop incrementing counter every interval seconds.""" + while True: + time.sleep(interval) + counter[0] += 1 + + +counter = [0] +b = threading.Thread(target=inc_loop, daemon=True, kwargs={'counter': counter, + 'interval': 1}) +b.start() +server = Server(counter, ('localhost', 5000), IO_Handler) try: server.serve_forever() except KeyboardInterrupt: pass finally: + print('Killing server') server.server_close()