# Let listening sockets rebind an address that was in use a moment ago, so
# that quick restarts do not fail with "Address already in use" errors.
socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Bind together threaded IO handling server and message queue.

    Every request handler reaches the shared queue as self.server.queue_out
    and uses it to pass messages on to whoever consumes that queue.
    """

    # Declared on the class so that both settings are already in effect while
    # the base class initialiser creates and binds the listening socket.
    allow_reuse_address = True  # Avoid "Address already in use" errors.
    daemon_threads = True  # Else, server's threads have daemon=False.

    def __init__(self, queue, *args, **kwargs):
        """Store queue as self.queue_out, then initialise the TCP server.

        queue is the message queue shared with the request handlers; all
        remaining arguments are handed unchanged to ThreadingTCPServer.
        """
        self.queue_out = queue
        super().__init__(*args, **kwargs)
def fib(n):
    """Calculate n-th Fibonacci number. Very inefficiently.

    Uses plain double recursion without any caching, so the running time
    grows exponentially with n.

    n is counted from 1, i.e. fib(1) == fib(2) == 1.
    NOTE(review): the function header and base case were missing from the
    listing this was restored from; the 1-based base case is assumed from
    the callers accepting only positive integers -- confirm.

    Raises ValueError if n is smaller than 1, where the recursion would
    otherwise never reach its base case.
    """
    if n < 1:
        raise ValueError('n must be a positive integer')
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
class IO_Handler(socketserver.BaseRequestHandler):
    """Request handler relaying messages between one socket and the server."""

    def handle(self):
        """Move messages between network socket and main thread via queues.

        On start, sets up new queue, sends it via self.server.queue_out to
        main thread, and from then on receives messages to send back from the
        main thread via that new queue.

        At the same time, loops over socket's recv to get messages from the
        outside via self.server.queue_out into the main thread. Ends
        connection once a 'QUIT' message is received from socket, and then
        also has its queue removed by the main thread.

        All messages to the main thread are tuples, with the first element a
        meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
        deletion, and 'COMMAND' for everything else), the second element a
        UUID that uniquely identifies the thread (so that the main thread
        knows whom to send replies back to), and optionally a third element
        carrying the payload (the new queue for 'ADD_QUEUE', the received
        message for 'COMMAND').
        """

        def caught_send(sock, message):
            """Send message by socket, catch broken socket connection error."""
            try:
                plom_socket_io.send(sock, message)
            except plom_socket_io.BrokenSocketConnection:
                # The peer is gone; there is nobody left to notify.
                pass

        def send_queue_messages(sock, queue_in, thread_alive):
            """Send messages via socket from queue_in while thread_alive[0]."""
            while thread_alive[0]:
                try:
                    # The timeout makes the loop re-check thread_alive[0]
                    # regularly even when no message is waiting.
                    msg = queue_in.get(timeout=1)
                except queue.Empty:
                    continue
                caught_send(sock, msg)

        print('CONNECTION FROM:', str(self.client_address))
        connection_id = uuid.uuid4()
        queue_in = queue.Queue()
        self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
        # One-element list used as a mutable flag shared with the sender
        # thread.
        thread_alive = [True]
        t = threading.Thread(target=send_queue_messages,
                             args=(self.request, queue_in, thread_alive))
        t.start()
        try:
            for message in plom_socket_io.recv(self.request):
                # NOTE(review): the condition guarding 'BAD MESSAGE' was not
                # part of the listing this was restored from; it is assumed
                # that recv yields None for undecodable input -- confirm
                # against plom_socket_io.recv.
                if message is None:
                    caught_send(self.request, 'BAD MESSAGE')
                elif 'QUIT' == message:
                    caught_send(self.request, 'BYE')
                    break
                else:
                    self.server.queue_out.put(('COMMAND', connection_id,
                                               message))
        finally:
            # Runs even if recv raises, so that neither the sender thread
            # nor the main thread's queue entry outlives the connection.
            self.server.queue_out.put(('KILL_QUEUE', connection_id))
            thread_alive[0] = False
            print('CONNECTION CLOSED:', str(self.client_address))
def _parse_positive_int(token):
    """Return token as an int greater than zero, or None if it is not one."""
    if not token.isdigit():
        return None
    try:
        value = int(token)
    except ValueError:
        # str.isdigit() also accepts characters such as superscript digits,
        # which int() refuses to convert.
        return None
    # Rejects '0' as well as zero spelled with leading zeros ('00').
    return value if value > 0 else None


def _send_fib_reply(numbers, queue_out):
    """Calculate fib() of all numbers in a process pool, put result line."""
    import os
    from multiprocessing import Pool
    # Never start more worker processes than there are numbers or CPUs.
    workers = min(len(numbers), os.cpu_count() or 1)
    with Pool(workers) as p:
        results = p.map(fib, numbers)
    queue_out.put(' '.join(str(r) for r in results))


def io_loop(q):
    """Handle commands coming through queue q, send results back.

    Commands from q are expected to be tuples, with the first element either
    'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
    an optional third element of arbitrary type. The UUID identifies the
    sender of the command, which is also the receiver of any reply.

    An 'ADD_QUEUE' command should contain as third element a queue through
    which to send messages back to the sender of the command. A 'KILL_QUEUE'
    command removes the queue for that receiver from the collection of queues
    through which to send replies; unknown UUIDs are ignored. A 'COMMAND'
    from a UUID without a registered queue is dropped.

    A 'COMMAND' command is specified in greater detail by a string that is
    the tuple's third element. Here, the following commands are understood:
    - A string starting with 'PRIVMSG' returns the space-separated tokens
      following 'PRIVMSG' to the sender via its receiver queue.
    - A string starting with 'ALL' sends the space-separated tokens following
      'ALL' to all receiver queues.
    - A string starting with 'FIB' followed by space-separated positive
      integers returns to the receiver queue first a 'CALCULATING …' message,
      and afterwards for each such integer n the n-th Fibonacci number as a
      space-separated sequence of integers. Fibonacci numbers are calculated
      in parallel if possible, on a separate thread so that other commands
      keep being handled in the meantime. Anything else after 'FIB' is
      answered with 'MALFORMED FIB REQUEST'.
    A string without any tokens is answered with 'EMPTY COMMAND', any other
    string with 'UNKNOWN COMMAND'.

    This function loops forever and never returns.
    """
    import threading
    queues_out = {}
    while True:
        x = q.get()
        command_type = x[0]
        connection_id = x[1]
        content = None if len(x) == 2 else x[2]
        if command_type == 'ADD_QUEUE':
            queues_out[connection_id] = content
        elif command_type == 'KILL_QUEUE':
            # pop() instead of del: a stray KILL_QUEUE must not end the loop.
            queues_out.pop(connection_id, None)
        elif command_type == 'COMMAND':
            reply_queue = queues_out.get(connection_id)
            if reply_queue is None:
                # Sender is unknown or already gone; nowhere to reply to.
                continue
            # Split on single spaces only and drop the empty tokens that
            # runs of spaces produce.
            tokens = [token for token in content.split(' ') if token]
            if not tokens:
                reply_queue.put('EMPTY COMMAND')
            elif tokens[0] == 'PRIVMSG':
                reply_queue.put(' '.join(tokens[1:]))
            elif tokens[0] == 'ALL':
                reply = ' '.join(tokens[1:])
                for receiver in queues_out.values():
                    receiver.put(reply)
            elif tokens[0] == 'FIB':
                numbers = [_parse_positive_int(token) for token in tokens[1:]]
                if not numbers or None in numbers:
                    reply_queue.put('MALFORMED FIB REQUEST')
                    continue
                reply_queue.put('CALCULATING …')
                # Waiting for the process pool right here would stall every
                # other connection, so the wait happens on its own thread.
                threading.Thread(target=_send_fib_reply,
                                 args=(numbers, reply_queue),
                                 daemon=True).start()
            else:
                reply_queue.put('UNKNOWN COMMAND')
# Guarded so that importing this module -- as multiprocessing worker
# processes may do when they are started -- does not launch another server.
if __name__ == '__main__':
    # Queue carrying messages from the connection handlers to io_loop.
    q = queue.Queue()
    # Daemon thread: io_loop never returns and must not keep the process
    # alive once the server below has been shut down.
    c = threading.Thread(target=io_loop, daemon=True, args=(q,))
    c.start()
    server = Server(q, ('localhost', 5000), IO_Handler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way of stopping the server.
        pass
    finally:
        print('Killing server')
        server.server_close()