8 # Avoid "Address already in use" errors.
9 socketserver.TCPServer.allow_reuse_address = True
12 class Server(socketserver.ThreadingTCPServer):
13 """Bind together threaded IO handling server and world state (counter)."""
15 def __init__(self, counter, *args, **kwargs):
16 super().__init__(*args, **kwargs)
17 self.counter = counter
18 self.daemon_threads = True # Else, server's threads have daemon=False.
22 """Calculate n-th Fibonacci number."""
26 return fib(n-1) + fib(n-2)
29 class IO_Handler(socketserver.BaseRequestHandler):
32 """Loop recv for input, send replies; also, send regular counter value.
34 If input is 'QUIT', send reply 'BYE' and end loop / connection.
35 Otherwise, use handle_message to interpret and enact commands.
37 def caught_send(socket, message):
38 """Send message by socket, catch broken socket connection error."""
40 plom_socket_io.send(socket, message)
41 except plom_socket_io.BrokenSocketConnection:
44 def send_counter_loop(socket, counter, kill):
45 """Every 5 seconds, send state of counter[0] until kill[0] set."""
47 caught_send(socket, 'COUNTER ' + str(counter[0]))
50 def handle_message(message):
51 """Evaluate message for tasks to perform, yield result.
53 Accepts one command: FIB, followed by positive integers, all tokens
54 separated by whitespace. Will calculate and return for each such
55 integer n the n-th Fibonacci number. Uses multiprocessing to
56 perform multiple such calculations in parallel. Yields a
57 'CALCULATING …' message before the calculation starts, and finally
58 yields a message containing the results. (The 'CALCULATING …'
59 message coming before the results message is currently the main
60 reason this works as a generator function using yield.)
62 When no command can be read into the message, just yields a 'NO
63 COMMAND UNDERSTOOD:', followed by the message.
65 from multiprocessing import Pool
66 tokens = message.split(' ')
67 if tokens[0] == 'FIB':
68 msg_fail_fib = 'MALFORMED FIB REQUEST'
73 for token in tokens[1:]:
74 if token != '0' and token.isdigit():
75 numbers += [int(token)]
83 with Pool(len(numbers)) as p:
84 results = p.map(fib, numbers)
85 reply = ' '.join([str(r) for r in results])
88 yield 'NO COMMAND UNDERSTOOD: %s' % message
90 print('CONNECTION FROM:', str(self.client_address))
91 counter_loop_killer = [False]
92 send_count = threading.Thread(target=send_counter_loop,
93 kwargs={'counter': self.server.counter,
94 'socket': self.request,
95 'kill': counter_loop_killer})
97 for message in plom_socket_io.recv(self.request):
99 print('RECEIVED MALFORMED MESSAGE')
100 caught_send(self.request, 'bad message')
101 elif 'QUIT' == message:
102 caught_send(self.request, 'BYE')
105 print('RECEIVED MESSAGE:', message)
106 for reply in handle_message(message):
107 caught_send(self.request, reply)
108 counter_loop_killer = [True]
109 print('CONNECTION CLOSED:', str(self.client_address))
113 def inc_loop(counter, interval):
114 """Loop incrementing counter every interval seconds."""
121 b = threading.Thread(target=inc_loop, daemon=True, kwargs={'counter': counter,
124 server = Server(counter, ('localhost', 5000), IO_Handler)
126 server.serve_forever()
127 except KeyboardInterrupt:
130 print('Killing server')
131 server.server_close()