# Permit address reuse on every TCPServer (sub)class; this avoids
# "Address already in use" errors.
socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that also carries the queue to the main thread."""

    def __init__(self, queue, *args, **kwargs):
        """Set up the TCP server and keep queue as self.queue_out.

        All arguments after queue are handed on unchanged to
        socketserver.ThreadingTCPServer.
        """
        super().__init__(*args, **kwargs)
        # Without this, the server's threads would have daemon=False.
        self.daemon_threads = True
        self.queue_out = queue
class IO_Handler(socketserver.BaseRequestHandler):
    """Per-connection handler relaying messages between socket and queues."""

    def handle(self):
        """Move messages between network socket and main thread via queues.

        On start, sets up a new queue, sends it via self.server.queue_out to
        the main thread, and from then on receives messages to send back
        from the main thread via that new queue.

        At the same time, loops over the socket's recv to pass messages from
        the outside via self.server.queue_out into the main thread. Ends the
        connection once a 'QUIT' message is received from the socket, and
        then also asks the main thread to drop its queue ('KILL_QUEUE').

        All messages to the main thread are tuples, with the first element a
        meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
        deletion, and 'COMMAND' for everything else), the second element a
        UUID that uniquely identifies the thread (so that the main thread
        knows whom to send replies back to), and optionally a third element
        (the new queue for 'ADD_QUEUE', the received message for 'COMMAND').
        """

        def caught_send(socket, message):
            """Send message by socket, catch broken socket connection error."""
            try:
                plom_socket_io.send(socket, message)
            except plom_socket_io.BrokenSocketConnection:
                # Deliberate best effort: a failed send is not reported.
                pass

        def send_queue_messages(socket, queue_in, thread_alive):
            """Send messages via socket from queue_in while thread_alive[0]."""
            while thread_alive[0]:
                try:
                    # Time out regularly so thread_alive[0] gets re-checked.
                    msg = queue_in.get(timeout=1)
                except queue.Empty:
                    continue
                caught_send(socket, msg)

        print('CONNECTION FROM:', str(self.client_address))
        connection_id = uuid.uuid4()
        queue_in = queue.Queue()
        self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
        # One-element list: a mutable flag shared with the sender thread.
        thread_alive = [True]
        t = threading.Thread(target=send_queue_messages,
                             args=(self.request, queue_in, thread_alive))
        t.start()
        try:
            for message in plom_socket_io.recv(self.request):
                # NOTE(review): the guard for 'BAD MESSAGE' is not visible in
                # the source; this assumes plom_socket_io.recv yields None
                # for a message it could not decode -- confirm.
                if message is None:
                    caught_send(self.request, 'BAD MESSAGE')
                elif 'QUIT' == message:
                    caught_send(self.request, 'BYE')
                    break
                else:
                    self.server.queue_out.put(('COMMAND', connection_id,
                                               message))
        finally:
            # Always deregister the queue and stop the sender thread, even
            # if receiving fails with an unexpected error.
            self.server.queue_out.put(('KILL_QUEUE', connection_id))
            thread_alive[0] = False
        print('CONNECTION CLOSED FROM:', str(self.client_address))
87 """Calculate n-th Fibonacci number. Very inefficiently."""
91 return fib(n-1) + fib(n-2)
def __init__(self, world, queues_out):
    """Keep references to the game world and to the reply queues.

    world is the object whose .turn the command methods read and change;
    queues_out maps connection IDs to the queues through which messages
    are sent back to the clients.
    """
    # Without this assignment every access to self.world.turn would fail.
    self.world = world
    self.queues_out = queues_out
def send_to(self, connection_id, msg):
    """Put msg into the reply queue registered for connection_id."""
    target_queue = self.queues_out[connection_id]
    target_queue.put(msg)
def send_all(self, msg):
    """Deliver msg to every client that currently has a reply queue."""
    recipients = self.queues_out
    for recipient_id in recipients:
        self.send_to(recipient_id, msg)
def cmd_fib(self, tokens, connection_id):
    """Reply with n-th Fibonacci numbers, n taken from tokens[1:].

    Every token after the command name must be a positive decimal integer;
    if one is not, or if there is none at all, 'MALFORMED FIB REQUEST' is
    sent to the caller and nothing is calculated.

    Numbers are calculated in parallel as far as possible, using fib().
    A 'CALCULATING …' message is sent to caller before the result, which
    is a single message of space-separated numbers.
    """
    import os
    from multiprocessing import Pool
    fib_fail = 'MALFORMED FIB REQUEST'
    if len(tokens) < 2:
        self.send_to(connection_id, fib_fail)
        return
    numbers = []
    for token in tokens[1:]:
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as '²' which int() refuses. Testing the parsed
        # value rejects zero in every spelling ('0', '00', ...).
        if not token.isdecimal() or int(token) == 0:
            self.send_to(connection_id, fib_fail)
            return
        numbers.append(int(token))
    self.send_to(connection_id, 'CALCULATING …')
    # One worker per number, but never more workers than CPUs, so that a
    # long request cannot spawn an unbounded number of processes.
    workers = min(len(numbers), os.cpu_count() or 1)
    with Pool(workers) as p:
        results = p.map(fib, numbers)
    reply = ' '.join(str(r) for r in results)
    self.send_to(connection_id, reply)
def cmd_inc(self, connection_id):
    """Increment world.turn, send NEW_TURN message to all clients.

    connection_id is accepted for a uniform command signature but is not
    used: the announcement goes to every client.
    """
    self.world.turn += 1
    self.send_all('NEW_TURN ' + str(self.world.turn))
def cmd_get_turn(self, connection_id):
    """Reply to the caller with the current world.turn as a string."""
    turn_text = str(self.world.turn)
    self.send_to(connection_id, turn_text)
def cmd_echo(self, tokens, input_, connection_id):
    """Send message in input_ beyond tokens[0] to caller.

    tokens[0] is the command word as found in input_; the reply is
    everything that follows it and the one space after it, unchanged.
    """
    # Spaces before the command word are not part of any token, so they
    # have to be skipped before slicing by the length of tokens[0].
    msg = input_.lstrip(' ')[len(tokens[0]) + 1:]
    self.send_to(connection_id, msg)
def cmd_all(self, tokens, input_):
    """Send message in input_ beyond tokens[0] to all clients.

    tokens[0] is the command word as found in input_; the message is
    everything that follows it and the one space after it, unchanged.
    """
    # Spaces before the command word are not part of any token, so they
    # have to be skipped before slicing by the length of tokens[0].
    msg = input_.lstrip(' ')[len(tokens[0]) + 1:]
    self.send_all(msg)
def handle_input(self, input_, connection_id):
    """Process input_ to command grammar, call command handler if found.

    input_ is split on single spaces, empty pieces being ignored. The
    first token selects the command: 'INC' and 'GET_TURN' take no further
    tokens, 'ECHO', 'ALL' and 'FIB' take any number. The caller gets
    'EMPTY COMMAND' if there is no token at all and 'UNKNOWN COMMAND' if
    nothing matches.
    """
    tokens = [token for token in input_.split(' ') if token]
    if not tokens:
        self.send_to(connection_id, 'EMPTY COMMAND')
        return
    command = tokens[0]
    if len(tokens) == 1 and command == 'INC':
        self.cmd_inc(connection_id)
    elif len(tokens) == 1 and command == 'GET_TURN':
        self.cmd_get_turn(connection_id)
    elif command == 'ECHO':
        self.cmd_echo(tokens, input_, connection_id)
    elif command == 'ALL':
        self.cmd_all(tokens, input_)
    elif command == 'FIB':
        # TODO: Should this really block the whole loop?
        self.cmd_fib(tokens, connection_id)
    else:
        self.send_to(connection_id, 'UNKNOWN COMMAND')
173 """Handle commands coming through queue q, send results back.
175 Commands from q are expected to be tuples, with the first element either
176 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
177 an optional third element of arbitrary type. The UUID identifies a
178 receiver for replies.
180 An 'ADD_QUEUE' command should contain as third element a queue through
181 which to send messages back to the sender of the command. A 'KILL_QUEUE'
182 command removes the queue for that receiver from the list of queues through
183 which to send replies.
185 A 'COMMAND' command is specified in greater detail by a string that is the
186 tuple's third element. CommandHandler takes care of processing this and
191 command_handler = CommandHandler(world, queues_out)
196 content = None if len(x) == 2 else x[2]
197 if command_type == 'ADD_QUEUE':
198 queues_out[connection_id] = content
199 elif command_type == 'COMMAND':
200 command_handler.handle_input(content, connection_id)
201 elif command_type == 'KILL_QUEUE':
202 del queues_out[connection_id]
# Queue through which the connection handler threads reach io_loop.
q = queue.Queue()
# Run the command loop in a daemon thread so it cannot keep the process
# alive once the server below has stopped.
c = threading.Thread(target=io_loop, daemon=True, args=(q,))
c.start()
server = Server(q, ('localhost', 5000), IO_Handler)
try:
    server.serve_forever()
except KeyboardInterrupt:
    # Ctrl-C is the regular way to stop the server; no traceback wanted.
    pass
finally:
    print('Killing server')
    server.server_close()