7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
11 class Server(socketserver.ThreadingTCPServer):
12 """Bind together threaded IO handling server and message queue."""
14 def __init__(self, queue, *args, **kwargs):
15 super().__init__(*args, **kwargs)
16 self.queue_out = queue
17 self.daemon_threads = True # Else, server's threads have daemon=False.
20 class IO_Handler(socketserver.BaseRequestHandler):
23 """Move messages between network socket and main thread via queues.
25 On start, sets up new queue, sends it via self.server.queue_out to
26 main thread, and from then on receives messages to send back from the
27 main thread via that new queue.
29 At the same time, loops over socket's recv to get messages from the
30 outside via self.server.queue_out into the main thread. Ends connection
31 once a 'QUIT' message is received from socket, and then also kills its
34 All messages to the main thread are tuples, with the first element a
35 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36 deletion, and 'COMMAND' for everything else), the second element a UUID
37 that uniquely identifies the thread (so that the main thread knows whom
38 to send replies back to), and optionally a third element for further
43 def caught_send(socket, message):
44 """Send message by socket, catch broken socket connection error."""
46 plom_socket_io.send(socket, message)
47 except plom_socket_io.BrokenSocketConnection:
50 def send_queue_messages(socket, queue_in, thread_alive):
51 """Send messages via socket from queue_in while thread_alive[0]."""
52 while thread_alive[0]:
54 msg = queue_in.get(timeout=1)
57 caught_send(socket, msg)
60 print('CONNECTION FROM:', str(self.client_address))
61 connection_id = uuid.uuid4()
62 queue_in = queue.Queue()
63 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
65 t = threading.Thread(target=send_queue_messages,
66 args=(self.request, queue_in, thread_alive))
68 for message in plom_socket_io.recv(self.request):
70 caught_send(self.request, 'BAD MESSAGE')
71 elif 'QUIT' == message:
72 caught_send(self.request, 'BYE')
75 self.server.queue_out.put(('COMMAND', connection_id, message))
76 self.server.queue_out.put(('KILL_QUEUE', connection_id))
77 thread_alive[0] = False
78 print('CONNECTION CLOSED FROM:', str(self.client_address))
84 map_ = 'xxxxx\nx...x\nx.X.x\nx...x\nxxxxx'
89 """Calculate n-th Fibonacci number. Very inefficiently."""
93 return fib(n-1) + fib(n-2)
98 def __init__(self, queues_out):
99 from multiprocessing import Pool
100 self.queues_out = queues_out
103 self.pool_result = None
105 def send_to(self, connection_id, msg):
106 """Send msg to client of connection_id."""
107 self.queues_out[connection_id].put(msg)
109 def send_all(self, msg):
110 """Send msg to all clients."""
111 for connection_id in self.queues_out:
112 self.send_to(connection_id, msg)
114 def cmd_fib(self, tokens, connection_id):
115 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
117 Numbers are calculated in parallel as far as possible, using fib().
118 A 'CALCULATING …' message is sent to caller before the result.
120 fib_fail = 'MALFORMED FIB REQUEST'
122 self.send_to(connection_id, fib_fail)
125 for token in tokens[1:]:
126 if token != '0' and token.isdigit():
127 numbers += [int(token)]
129 self.send_to(connection_id, fib_fail)
131 self.send_to(connection_id, 'CALCULATING …')
132 results = self.pool.map(fib, numbers)
133 reply = ' '.join([str(r) for r in results])
134 self.send_to(connection_id, reply)
136 def cmd_inc(self, connection_id):
137 """Increment world.turn, send game turn data to everyone.
139 To simulate game processing waiting times, a one second delay between
140 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
141 calculations are started as pool processes that need to be finished
142 until a further INC finishes the turn.
144 from time import sleep
145 if self.pool_result is not None:
146 self.pool_result.wait()
147 self.send_all('TURN_FINISHED ' + str(self.world.turn))
150 self.send_all('NEW_TURN ' + str(self.world.turn))
151 self.send_all('TERRAIN\n' + self.world.map_)
152 self.send_all('POSITION y:' + str(self.world.player_pos[0]) +
153 ',x:' + str(self.world.player_pos[1]))
154 self.pool_result = self.pool.map_async(fib, (35, 35))
156 def cmd_get_turn(self, connection_id):
157 """Send world.turn to caller."""
158 self.send_to(connection_id, str(self.world.turn))
160 def cmd_echo(self, tokens, input_, connection_id):
161 """Send message in input_ beyond tokens[0] to caller."""
162 msg = input_[len(tokens[0]) + 1:]
163 self.send_to(connection_id, msg)
165 def cmd_all(self, tokens, input_):
166 """Send message in input_ beyond tokens[0] to all clients."""
167 msg = input_[len(tokens[0]) + 1:]
170 def handle_input(self, input_, connection_id):
171 """Process input_ to command grammar, call command handler if found."""
172 tokens = [token for token in input_.split(' ') if len(token) > 0]
174 self.send_to(connection_id, 'EMPTY COMMAND')
175 elif len(tokens) == 1 and tokens[0] == 'INC':
176 self.cmd_inc(connection_id)
177 elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
178 self.cmd_get_turn(connection_id)
179 elif len(tokens) >= 1 and tokens[0] == 'ECHO':
180 self.cmd_echo(tokens, input_, connection_id)
181 elif len(tokens) >= 1 and tokens[0] == 'ALL':
182 self.cmd_all(tokens, input_)
183 elif len(tokens) >= 1 and tokens[0] == 'FIB':
184 # TODO: Should this really block the whole loop?
185 self.cmd_fib(tokens, connection_id)
187 self.send_to(connection_id, 'UNKNOWN COMMAND')
191 """Handle commands coming through queue q, send results back.
193 Commands from q are expected to be tuples, with the first element either
194 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
195 an optional third element of arbitrary type. The UUID identifies a
196 receiver for replies.
198 An 'ADD_QUEUE' command should contain as third element a queue through
199 which to send messages back to the sender of the command. A 'KILL_QUEUE'
200 command removes the queue for that receiver from the list of queues through
201 which to send replies.
203 A 'COMMAND' command is specified in greater detail by a string that is the
204 tuple's third element. CommandHandler takes care of processing this and
208 command_handler = CommandHandler(queues_out)
213 content = None if len(x) == 2 else x[2]
214 if command_type == 'ADD_QUEUE':
215 queues_out[connection_id] = content
216 elif command_type == 'COMMAND':
217 command_handler.handle_input(content, connection_id)
218 elif command_type == 'KILL_QUEUE':
219 del queues_out[connection_id]
223 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
225 server = Server(q, ('localhost', 5000), IO_Handler)
227 server.serve_forever()
228 except KeyboardInterrupt:
231 print('Killing server')
232 server.server_close()