home · contact · privacy
Some more experiments with threading.
authorChristian Heller <c.heller@plomlompom.de>
Wed, 26 Apr 2017 01:26:26 +0000 (03:26 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Wed, 26 Apr 2017 01:26:26 +0000 (03:26 +0200)
README.md
client.py
plom_socket_io.py
server.py

index b65f327805da2c2af0fbcdbd759c3c58714a7815..e5301c4731d980c836d142eb85662876ca925f74 100644 (file)
--- a/README.md
+++ b/README.md
@@ -26,4 +26,6 @@ The following commands can be sent from client to server:
   such integer n, calculates th nn-th Fibonacci number (this allows for testing
   parallel CPU-heavy computation)
 
+Additionally, the server regularly sends a counter state to the client.
+
 See `./requirements.txt` for the dependencies.
index 92d555e359cd757a985748b2eba72120eee1a045..3e50b2ec43dfd7ef6427ca864e05b174fface59d 100755 (executable)
--- a/client.py
+++ b/client.py
@@ -1,18 +1,18 @@
 #!/usr/bin/env python3
 
 import urwid
-import plom_socket_io 
+import plom_socket_io
 import socket
 import threading
 
 
-class RecvThread(threading.Thread):
-    """Background thread that delivers messages from the socket to urwid.
+def recv_loop(socket, urwid_pipe_write_fd, server_output):
+    """Loop to receive messages from socket and deliver them to urwid.
 
     The message transfer to urwid is a bit weird. The urwid developers warn
     against sharing urwid resources among threads, and recommend using urwid's
     watch_pipe mechanism: using a pipe from non-urwid threads into a single
-    urwid thread. We could pipe the recv output directly, but then we get
+    urwid thread. We could pipe socket.recv output directly, but then we get
     complicated buffering situations here as well as in the urwid code that
     receives the pipe content. It's much easier to update a third resource
     (server_output, which references an object that's also known to the urwid
@@ -20,24 +20,15 @@ class RecvThread(threading.Thread):
     (urwid_pipe_write_fd) to trigger the urwid code to pull the message in from
     that third resource. We send a single b' ' through the pipe to trigger it.
     """
-
-    def __init__(self, socket, urwid_pipe_write_fd, server_output):
-        super().__init__()
-        self.socket = socket
-        self.urwid_pipe = urwid_pipe_write_fd
-        self.server_output = server_output
-
-    def run(self):
-        """On message receive, write to self.server_output, ping urwid pipe."""
-        import os
-        for msg in plom_socket_io.recv(self.socket):
-            self.server_output[0] = msg
-            os.write(self.urwid_pipe, b' ')
+    import os
+    for msg in plom_socket_io.recv(socket):
+        server_output[0] = msg
+        os.write(urwid_pipe_write_fd, b' ')
 
 
 class InputHandler:
     """Helps delivering data from other thread to widget via message_container.
-    
+
     The whole class only exists to provide handle_input as a bound method, with
     widget and message_container pre-set, as (bound) handle_input is used as a
     callback in urwid's watch_pipe – which merely provides its callback target
@@ -63,7 +54,7 @@ class InputHandler:
         if self.message_container[0] == 'BYE':
             raise urwid.ExitMainLoop()
             return
-        self.widget.set_text('REPLY: ' + self.message_container[0])
+        self.widget.set_text('SERVER: ' + self.message_container[0])
 
 
 class SocketInputWidget(urwid.Filler):
@@ -91,7 +82,9 @@ loop = urwid.MainLoop(fill)
 server_output = ['']
 write_fd = loop.watch_pipe(getattr(InputHandler(txt, server_output),
                                    'handle_input'))
-thread = RecvThread(s, write_fd, server_output)
+thread = threading.Thread(target=recv_loop,
+                          kwargs={'socket': s, 'server_output': server_output,
+                                  'urwid_pipe_write_fd': write_fd})
 thread.start()
 
 loop.run()
index 07d41ce8bd909bb5e53fd23c829e0b1f7685239f..ebde3c18f5f37af6beeef0ff025593d106ddc1bc 100644 (file)
@@ -1,3 +1,7 @@
+class BrokenSocketConnection(Exception):
+    pass
+
+
 def send(socket, message):
     """Send message via socket, encoded and delimited the way recv() expects.
 
@@ -26,9 +30,17 @@ def send(socket, message):
     data = escaped_message.encode()
     totalsent = 0
     while totalsent < len(data):
-        sent = socket.send(data[totalsent:])
-        if sent == 0:
-            raise RuntimeError('socket connection broken')
+        socket_broken = False
+        try:
+            sent = socket.send(data[totalsent:])
+            socket_broken = sent == 0
+        except OSError as err:
+            if err.errno == 9:  # "Bad file descriptor", when connection broken
+                socket_broken = True
+            else:
+                raise err
+        if socket_broken:
+            raise BrokenSocketConnection
         totalsent = totalsent + sent
 
 
@@ -48,7 +60,6 @@ def recv(socket):
     meaningful way; that's why we do our own message segmentation with $ as a
     delimiter.
     """
-    quit = False
     esc = False
     data = b''
     msg = b''
index b326768ea4a5499818f141afe8b4b7a1e8d3d1ad..b088206ef07ea35a1b91372ca4931c196c74f89f 100755 (executable)
--- a/server.py
+++ b/server.py
@@ -1,12 +1,23 @@
 #!/usr/bin/env python3
 
 import socketserver
-import plom_socket_io 
+import plom_socket_io
+import threading
+import time
 
 # Avoid "Address already in use" errors.
 socketserver.TCPServer.allow_reuse_address = True
 
 
+class Server(socketserver.ThreadingTCPServer):
+    """Bind together threaded IO handling server and world state (counter)."""
+
+    def __init__(self, counter, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.counter = counter
+        self.daemon_threads = True  # Else, server's threads have daemon=False.
+
+
 def fib(n):
     """Calculate n-th Fibonacci number."""
     if n in (1, 2):
@@ -15,81 +26,106 @@ def fib(n):
         return fib(n-1) + fib(n-2)
 
 
-def handle_message(message):
-    """Evaluate message for computing-heavy tasks to perform, yield result.
-
-    Accepts one command: FIB, followed by positive integers, all tokens
-    separated by whitespace. Will calculate and return for each such integer n
-    the n-th Fibonacci number. Uses multiprocessing to perform multiple such
-    calculations in parallel. Yields a 'CALCULATING …' message before the
-    calculation starts, and finally yields a message containing the results.
-    (The 'CALCULATING …' message coming before the results message is currently
-    the main reason this works as a generator function using yield.)
-
-    When no command can be read into the message, just yields a 'NO COMMAND
-    UNDERSTOOD:', followed by the message.
-    """
-    tokens = message.split(' ')
-    if tokens[0] == 'FIB':
-        msg_fail_fib = 'MALFORMED FIB REQUEST'
-        if len(tokens) < 2:
-            yield msg_fail_fib
-            return
-        numbers = []
-        fail = False
-        for token in tokens[1:]:
-            if token != '0' and token.isdigit():
-                numbers += [int(token)]
-            elif token == '':
-                continue
-            else:
-                yield msg_fail_fib
-                return
-        yield 'CALCULATING …'
-        reply = ''
-        from multiprocessing import Pool
-        with Pool(len(numbers)) as p:
-            results = p.map(fib, numbers)
-        reply = ' '.join([str(r) for r in results])
-        yield reply
-        return
-    yield 'NO COMMAND UNDERSTOOD: %s' % message
-
-
-class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
-    """Enables threading on TCP server for asynchronous IO handling."""
-    pass
-
-
-class MyTCPHandler(socketserver.BaseRequestHandler):
+class IO_Handler(socketserver.BaseRequestHandler):
 
     def handle(self):
-        """Loop recv for input, act on it, send reply.
+        """Loop recv for input, send replies; also, send regular counter value.
 
         If input is 'QUIT', send reply 'BYE' and end loop / connection.
-        Otherwise, use handle_message.
+        Otherwise, use handle_message to interpret and enact commands.
         """
+        def caught_send(socket, message):
+            """Send message by socket, catch broken socket connection error."""
+            try:
+                plom_socket_io.send(socket, message)
+            except plom_socket_io.BrokenSocketConnection:
+                pass
+
+        def send_counter_loop(socket, counter, kill):
+            """Every 5 seconds, send state of counter[0] until kill[0] set."""
+            while not kill[0]:
+                caught_send(socket, 'COUNTER ' + str(counter[0]))
+                time.sleep(5)
+
+        def handle_message(message):
+            """Evaluate message for tasks to perform, yield result.
+
+            Accepts one command: FIB, followed by positive integers, all tokens
+            separated by whitespace. Will calculate and return for each such
+            integer n the n-th Fibonacci number. Uses multiprocessing to
+            perform multiple such calculations in parallel. Yields a
+            'CALCULATING …' message before the calculation starts, and finally
+            yields a message containing the results. (The 'CALCULATING …'
+            message coming before the results message is currently the main
+            reason this works as a generator function using yield.)
+
+            When no command can be read into the message, just yields a 'NO
+            COMMAND UNDERSTOOD:', followed by the message.
+            """
+            from multiprocessing import Pool
+            tokens = message.split(' ')
+            if tokens[0] == 'FIB':
+                msg_fail_fib = 'MALFORMED FIB REQUEST'
+                if len(tokens) < 2:
+                    yield msg_fail_fib
+                    return
+                numbers = []
+                for token in tokens[1:]:
+                    if token != '0' and token.isdigit():
+                        numbers += [int(token)]
+                    elif token == '':
+                        continue
+                    else:
+                        yield msg_fail_fib
+                        return
+                yield 'CALCULATING …'
+                reply = ''
+                with Pool(len(numbers)) as p:
+                    results = p.map(fib, numbers)
+                reply = ' '.join([str(r) for r in results])
+                yield reply
+                return
+            yield 'NO COMMAND UNDERSTOOD: %s' % message
 
         print('CONNECTION FROM:', str(self.client_address))
+        counter_loop_killer = [False]
+        send_count = threading.Thread(target=send_counter_loop,
+                                      kwargs={'counter': self.server.counter,
+                                              'socket': self.request,
+                                              'kill': counter_loop_killer})
+        send_count.start()
         for message in plom_socket_io.recv(self.request):
             if message is None:
                 print('RECEIVED MALFORMED MESSAGE')
-                plom_socket_io.send(self.request, 'bad message')
+                caught_send(self.request, 'bad message')
             elif 'QUIT' == message:
-                plom_socket_io.send(self.request, 'BYE')
+                caught_send(self.request, 'BYE')
                 break
             else:
                 print('RECEIVED MESSAGE:', message)
                 for reply in handle_message(message):
-                    plom_socket_io.send(self.request, reply)
+                    caught_send(self.request, reply)
+        counter_loop_killer = [True]
         print('CONNECTION CLOSED:', str(self.client_address))
         self.request.close()
 
 
-server = ThreadedTCPServer(('localhost', 5000), MyTCPHandler)
+def inc_loop(counter, interval):
+    """Loop incrementing counter every interval seconds."""
+    while True:
+        time.sleep(interval)
+        counter[0] += 1
+
+
+counter = [0]
+b = threading.Thread(target=inc_loop, daemon=True, kwargs={'counter': counter,
+                                                           'interval': 1})
+b.start()
+server = Server(counter, ('localhost', 5000), IO_Handler)
 try:
     server.serve_forever()
 except KeyboardInterrupt:
     pass
 finally:
+    print('Killing server')
     server.server_close()