From 822c68744ead480ba7a48b786148facc8ef1eb04 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Wed, 26 Apr 2017 03:26:26 +0200 Subject: [PATCH] Some more experiments with threading. --- README.md | 2 + client.py | 33 +++++------ plom_socket_io.py | 19 ++++-- server.py | 144 +++++++++++++++++++++++++++++----------------- 4 files changed, 120 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index b65f327..e5301c4 100644 --- a/README.md +++ b/README.md @@ -26,4 +26,6 @@ The following commands can be sent from client to server: such integer n, calculates th nn-th Fibonacci number (this allows for testing parallel CPU-heavy computation) +Additionally, the server regularly sends a counter state to the client. + See `./requirements.txt` for the dependencies. diff --git a/client.py b/client.py index 92d555e..3e50b2e 100755 --- a/client.py +++ b/client.py @@ -1,18 +1,18 @@ #!/usr/bin/env python3 import urwid -import plom_socket_io +import plom_socket_io import socket import threading -class RecvThread(threading.Thread): - """Background thread that delivers messages from the socket to urwid. +def recv_loop(socket, urwid_pipe_write_fd, server_output): + """Loop to receive messages from socket and deliver them to urwid. The message transfer to urwid is a bit weird. The urwid developers warn against sharing urwid resources among threads, and recommend using urwid's watch_pipe mechanism: using a pipe from non-urwid threads into a single - urwid thread. We could pipe the recv output directly, but then we get + urwid thread. We could pipe socket.recv output directly, but then we get complicated buffering situations here as well as in the urwid code that receives the pipe content. It's much easier to update a third resource (server_output, which references an object that's also known to the urwid @@ -20,24 +20,15 @@ class RecvThread(threading.Thread): (urwid_pipe_write_fd) to trigger the urwid code to pull the message in from that third resource. We send a single b' ' through the pipe to trigger it. """ - - def __init__(self, socket, urwid_pipe_write_fd, server_output): - super().__init__() - self.socket = socket - self.urwid_pipe = urwid_pipe_write_fd - self.server_output = server_output - - def run(self): - """On message receive, write to self.server_output, ping urwid pipe.""" - import os - for msg in plom_socket_io.recv(self.socket): - self.server_output[0] = msg - os.write(self.urwid_pipe, b' ') + import os + for msg in plom_socket_io.recv(socket): + server_output[0] = msg + os.write(urwid_pipe_write_fd, b' ') class InputHandler: """Helps delivering data from other thread to widget via message_container. - + The whole class only exists to provide handle_input as a bound method, with widget and message_container pre-set, as (bound) handle_input is used as a callback in urwid's watch_pipe – which merely provides its callback target @@ -63,7 +54,7 @@ class InputHandler: if self.message_container[0] == 'BYE': raise urwid.ExitMainLoop() return - self.widget.set_text('REPLY: ' + self.message_container[0]) + self.widget.set_text('SERVER: ' + self.message_container[0]) class SocketInputWidget(urwid.Filler): @@ -91,7 +82,9 @@ loop = urwid.MainLoop(fill) server_output = [''] write_fd = loop.watch_pipe(getattr(InputHandler(txt, server_output), 'handle_input')) -thread = RecvThread(s, write_fd, server_output) +thread = threading.Thread(target=recv_loop, + kwargs={'socket': s, 'server_output': server_output, + 'urwid_pipe_write_fd': write_fd}) thread.start() loop.run() diff --git a/plom_socket_io.py b/plom_socket_io.py index 07d41ce..ebde3c1 100644 --- a/plom_socket_io.py +++ b/plom_socket_io.py @@ -1,3 +1,7 @@ +class BrokenSocketConnection(Exception): + pass + + def send(socket, message): """Send message via socket, encoded and delimited the way recv() expects. @@ -26,9 +30,17 @@ def send(socket, message): data = escaped_message.encode() totalsent = 0 while totalsent < len(data): - sent = socket.send(data[totalsent:]) - if sent == 0: - raise RuntimeError('socket connection broken') + socket_broken = False + try: + sent = socket.send(data[totalsent:]) + socket_broken = sent == 0 + except OSError as err: + if err.errno == 9: # "Bad file descriptor", when connection broken + socket_broken = True + else: + raise err + if socket_broken: + raise BrokenSocketConnection totalsent = totalsent + sent @@ -48,7 +60,6 @@ def recv(socket): meaningful way; that's why we do our own message segmentation with $ as a delimiter. """ - quit = False esc = False data = b'' msg = b'' diff --git a/server.py b/server.py index b326768..b088206 100755 --- a/server.py +++ b/server.py @@ -1,12 +1,23 @@ #!/usr/bin/env python3 import socketserver -import plom_socket_io +import plom_socket_io +import threading +import time # Avoid "Address already in use" errors. socketserver.TCPServer.allow_reuse_address = True +class Server(socketserver.ThreadingTCPServer): + """Bind together threaded IO handling server and world state (counter).""" + + def __init__(self, counter, *args, **kwargs): + super().__init__(*args, **kwargs) + self.counter = counter + self.daemon_threads = True # Else, server's threads have daemon=False. + + def fib(n): """Calculate n-th Fibonacci number.""" if n in (1, 2): @@ -15,81 +26,106 @@ def fib(n): return fib(n-1) + fib(n-2) -def handle_message(message): - """Evaluate message for computing-heavy tasks to perform, yield result. - - Accepts one command: FIB, followed by positive integers, all tokens - separated by whitespace. Will calculate and return for each such integer n - the n-th Fibonacci number. Uses multiprocessing to perform multiple such - calculations in parallel. Yields a 'CALCULATING …' message before the - calculation starts, and finally yields a message containing the results. - (The 'CALCULATING …' message coming before the results message is currently - the main reason this works as a generator function using yield.) - - When no command can be read into the message, just yields a 'NO COMMAND - UNDERSTOOD:', followed by the message. - """ - tokens = message.split(' ') - if tokens[0] == 'FIB': - msg_fail_fib = 'MALFORMED FIB REQUEST' - if len(tokens) < 2: - yield msg_fail_fib - return - numbers = [] - fail = False - for token in tokens[1:]: - if token != '0' and token.isdigit(): - numbers += [int(token)] - elif token == '': - continue - else: - yield msg_fail_fib - return - yield 'CALCULATING …' - reply = '' - from multiprocessing import Pool - with Pool(len(numbers)) as p: - results = p.map(fib, numbers) - reply = ' '.join([str(r) for r in results]) - yield reply - return - yield 'NO COMMAND UNDERSTOOD: %s' % message - - -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): - """Enables threading on TCP server for asynchronous IO handling.""" - pass - - -class MyTCPHandler(socketserver.BaseRequestHandler): +class IO_Handler(socketserver.BaseRequestHandler): def handle(self): - """Loop recv for input, act on it, send reply. + """Loop recv for input, send replies; also, send regular counter value. If input is 'QUIT', send reply 'BYE' and end loop / connection. - Otherwise, use handle_message. + Otherwise, use handle_message to interpret and enact commands. """ + def caught_send(socket, message): + """Send message by socket, catch broken socket connection error.""" + try: + plom_socket_io.send(socket, message) + except plom_socket_io.BrokenSocketConnection: + pass + + def send_counter_loop(socket, counter, kill): + """Every 5 seconds, send state of counter[0] until kill[0] set.""" + while not kill[0]: + caught_send(socket, 'COUNTER ' + str(counter[0])) + time.sleep(5) + + def handle_message(message): + """Evaluate message for tasks to perform, yield result. + + Accepts one command: FIB, followed by positive integers, all tokens + separated by whitespace. Will calculate and return for each such + integer n the n-th Fibonacci number. Uses multiprocessing to + perform multiple such calculations in parallel. Yields a + 'CALCULATING …' message before the calculation starts, and finally + yields a message containing the results. (The 'CALCULATING …' + message coming before the results message is currently the main + reason this works as a generator function using yield.) + + When no command can be read into the message, just yields a 'NO + COMMAND UNDERSTOOD:', followed by the message. + """ + from multiprocessing import Pool + tokens = message.split(' ') + if tokens[0] == 'FIB': + msg_fail_fib = 'MALFORMED FIB REQUEST' + if len(tokens) < 2: + yield msg_fail_fib + return + numbers = [] + for token in tokens[1:]: + if token != '0' and token.isdigit(): + numbers += [int(token)] + elif token == '': + continue + else: + yield msg_fail_fib + return + yield 'CALCULATING …' + reply = '' + with Pool(len(numbers)) as p: + results = p.map(fib, numbers) + reply = ' '.join([str(r) for r in results]) + yield reply + return + yield 'NO COMMAND UNDERSTOOD: %s' % message print('CONNECTION FROM:', str(self.client_address)) + counter_loop_killer = [False] + send_count = threading.Thread(target=send_counter_loop, + kwargs={'counter': self.server.counter, + 'socket': self.request, + 'kill': counter_loop_killer}) + send_count.start() for message in plom_socket_io.recv(self.request): if message is None: print('RECEIVED MALFORMED MESSAGE') - plom_socket_io.send(self.request, 'bad message') + caught_send(self.request, 'bad message') elif 'QUIT' == message: - plom_socket_io.send(self.request, 'BYE') + caught_send(self.request, 'BYE') break else: print('RECEIVED MESSAGE:', message) for reply in handle_message(message): - plom_socket_io.send(self.request, reply) + caught_send(self.request, reply) + counter_loop_killer = [True] print('CONNECTION CLOSED:', str(self.client_address)) self.request.close() -server = ThreadedTCPServer(('localhost', 5000), MyTCPHandler) +def inc_loop(counter, interval): + """Loop incrementing counter every interval seconds.""" + while True: + time.sleep(interval) + counter[0] += 1 + + +counter = [0] +b = threading.Thread(target=inc_loop, daemon=True, kwargs={'counter': counter, + 'interval': 1}) +b.start() +server = Server(counter, ('localhost', 5000), IO_Handler) try: server.serve_forever() except KeyboardInterrupt: pass finally: + print('Killing server') server.server_close() -- 2.30.2