From 4c633486d35862198e0ed6553ee52e43a0401a3b Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Sat, 19 Aug 2017 23:53:12 +0200 Subject: [PATCH] Extend, reorganize commands server is capable of. --- client.py | 9 ++-- server.py | 154 ++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 101 insertions(+), 62 deletions(-) diff --git a/client.py b/client.py index 9092fe6..97e0588 100755 --- a/client.py +++ b/client.py @@ -6,7 +6,7 @@ import socket import threading -class UrwidSetup(): +class UrwidSetup: def __init__(self, socket): """Build client urwid interface around socket communication. @@ -23,13 +23,12 @@ class UrwidSetup(): mechanism instead: using a pipe from non-urwid threads into a single urwid thread. We use self.recv_loop_thread to poll the socket, therein write socket.recv output to an object that is then linked to by - self.server_output (which is known the urwid thread), and then use the + self.server_output (which is known to the urwid thread), then use the pipe to urwid to trigger it pulling new data from self.server_output to handle via self.InputHandler. (We *could* pipe socket.recv output directly, but then we get complicated buffering situations here as well - as in the urwid code that receives the pipe output. It's much easier to - just tell the urwid code where it finds a full new server message to - handle.) + as in the urwid code that receives the pipe output. It's easier to just + tell the urwid code where it finds full new server messages to handle.) """ self.socket = socket self.main_loop = urwid.MainLoop(self.setup_widgets()) diff --git a/server.py b/server.py index 6e3564b..0a0ac88 100755 --- a/server.py +++ b/server.py @@ -17,14 +17,6 @@ class Server(socketserver.ThreadingTCPServer): self.daemon_threads = True # Else, server's threads have daemon=False. -def fib(n): - """Calculate n-th Fibonacci number. Very inefficiently.""" - if n in (1, 2): - return 1 - else: - return fib(n-1) + fib(n-2) - - class IO_Handler(socketserver.BaseRequestHandler): def handle(self): @@ -83,12 +75,102 @@ class IO_Handler(socketserver.BaseRequestHandler): self.server.queue_out.put(('COMMAND', connection_id, message)) self.server.queue_out.put(('KILL_QUEUE', connection_id)) thread_alive[0] = False - print('CONNECTION CLOSED:', str(self.client_address)) + print('CONNECTION CLOSED FROM:', str(self.client_address)) self.request.close() +class World: + turn = 0 + + +def fib(n): + """Calculate n-th Fibonacci number. Very inefficiently.""" + if n in (1, 2): + return 1 + else: + return fib(n-1) + fib(n-2) + + +class CommandHandler: + + def __init__(self, world, queues_out): + self.world = world + self.queues_out = queues_out + + def send_to(self, connection_id, msg): + """Send msg to client of connection_id.""" + self.queues_out[connection_id].put(msg) + + def send_all(self, msg): + """Send msg to all clients.""" + for connection_id in self.queues_out: + self.send_to(connection_id, msg) + + def cmd_fib(self, tokens, connection_id): + """Reply with n-th Fibonacci numbers, n taken from tokens[1:]. + + Numbers are calculated in parallel as far as possible, using fib(). + A 'CALCULATING …' message is sent to caller before the result. + """ + from multiprocessing import Pool + fib_fail = 'MALFORMED FIB REQUEST' + if len(tokens) < 2: + self.send_to(connection_id, fib_fail) + return + numbers = [] + for token in tokens[1:]: + if token != '0' and token.isdigit(): + numbers += [int(token)] + else: + self.send_to(connection_id, fib_fail) + return + self.send_to(connection_id, 'CALCULATING …') + with Pool(len(numbers)) as p: + results = p.map(fib, numbers) + reply = ' '.join([str(r) for r in results]) + self.send_to(connection_id, reply) + + def cmd_inc(self, connection_id): + """Increment world.turn, send NEW_TURN message to all clients.""" + self.world.turn += 1 + self.send_all('NEW_TURN ' + str(self.world.turn)) + + def cmd_get_turn(self, connection_id): + """Send world.turn to caller.""" + self.send_to(connection_id, str(self.world.turn)) + + def cmd_echo(self, tokens, input_, connection_id): + """Send message in input_ beyond tokens[0] to caller.""" + msg = input_[len(tokens[0]) + 1:] + self.send_to(connection_id, msg) + + def cmd_all(self, tokens, input_): + """Send message in input_ beyond tokens[0] to all clients.""" + msg = input_[len(tokens[0]) + 1:] + self.send_all(msg) + + def handle_input(self, input_, connection_id): + """Process input_ to command grammar, call command handler if found.""" + tokens = [token for token in input_.split(' ') if len(token) > 0] + if len(tokens) == 0: + self.send_to(connection_id, 'EMPTY COMMAND') + elif len(tokens) == 1 and tokens[0] == 'INC': + self.cmd_inc(connection_id) + elif len(tokens) == 1 and tokens[0] == 'GET_TURN': + self.cmd_get_turn(connection_id) + elif len(tokens) >= 1 and tokens[0] == 'ECHO': + self.cmd_echo(tokens, input_, connection_id) + elif len(tokens) >= 1 and tokens[0] == 'ALL': + self.cmd_all(tokens, input_) + elif len(tokens) >= 1 and tokens[0] == 'FIB': + # TODO: Should this really block the whole loop? + self.cmd_fib(tokens, connection_id) + else: + self.send_to(connection_id, 'UNKNOWN COMMAND') + + def io_loop(q): - """Handle commands coming through queue q, send results back. + """Handle commands coming through queue q, send results back. Commands from q are expected to be tuples, with the first element either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and @@ -101,19 +183,12 @@ def io_loop(q): which to send replies. A 'COMMAND' command is specified in greater detail by a string that is the - tuple's third element. Here, the following commands are understood: - - A string starting with 'PRIVMSG' returns the space-separated tokens - following 'PRIVMSG' to the sender via its receiver queue. - - A string starting with 'ALL' sends the space-separated tokens following - 'ALL' to all receiver queues. - - A string starting with 'FIB' followed by space-separated positive - integers returns to the receiver queue first a 'CALCULATING …' messsage, - and afterwards for each such integer n the n-th Fibonacci number as a - space-separated sequence of integers. Fibonacci numbers are calculated - in parallel if possible. + tuple's third element. CommandHandler takes care of processing this and + sending out replies. """ - from multiprocessing import Pool queues_out = {} + world = World() + command_handler = CommandHandler(world, queues_out) while True: x = q.get() command_type = x[0] @@ -122,42 +197,7 @@ def io_loop(q): if command_type == 'ADD_QUEUE': queues_out[connection_id] = content elif command_type == 'COMMAND': - tokens = [token for token in content.split(' ') if len(token) > 0] - if len(tokens) == 0: - queues_out[connection_id].put('EMPTY COMMAND') - continue - if tokens[0] == 'PRIVMSG': - reply = ' '.join(tokens[1:]) - queues_out[connection_id].put(reply) - elif tokens[0] == 'ALL': - reply = ' '.join(tokens[1:]) - for key in queues_out: - queues_out[key].put(reply) - elif tokens[0] == 'FIB': - fib_fail = 'MALFORMED FIB REQUEST' - if len(tokens) < 2: - queues_out[connection_id].put(fib_fail) - continue - numbers = [] - fail = False - for token in tokens[1:]: - if token != '0' and token.isdigit(): - numbers += [int(token)] - else: - queues_out[connection_id].put(fib_fail) - fail = True - break - if fail: - continue - queues_out[connection_id].put('CALCULATING …') - reply = '' - # this blocks the whole loop, BAD - with Pool(len(numbers)) as p: - results = p.map(fib, numbers) - reply = ' '.join([str(r) for r in results]) - queues_out[connection_id].put(reply) - else: - queues_out[connection_id].put('UNKNOWN COMMAND') + command_handler.handle_input(content, connection_id) elif command_type == 'KILL_QUEUE': del queues_out[connection_id] -- 2.30.2