7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
11 class Server(socketserver.ThreadingTCPServer):
12 """Bind together threaded IO handling server and message queue."""
14 def __init__(self, queue, *args, **kwargs):
15 super().__init__(*args, **kwargs)
16 self.queue_out = queue
17 self.daemon_threads = True # Else, server's threads have daemon=False.
20 class IO_Handler(socketserver.BaseRequestHandler):
23 """Move messages between network socket and main thread via queues.
25 On start, sets up new queue, sends it via self.server.queue_out to
26 main thread, and from then on receives messages to send back from the
27 main thread via that new queue.
29 At the same time, loops over socket's recv to get messages from the
30 outside via self.server.queue_out into the main thread. Ends connection
31 once a 'QUIT' message is received from socket, and then also kills its
34 All messages to the main thread are tuples, with the first element a
35 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36 deletion, and 'COMMAND' for everything else), the second element a UUID
37 that uniquely identifies the thread (so that the main thread knows whom
38 to send replies back to), and optionally a third element for further
43 def caught_send(socket, message):
44 """Send message by socket, catch broken socket connection error."""
46 plom_socket_io.send(socket, message)
47 except plom_socket_io.BrokenSocketConnection:
50 def send_queue_messages(socket, queue_in, thread_alive):
51 """Send messages via socket from queue_in while thread_alive[0]."""
52 while thread_alive[0]:
54 msg = queue_in.get(timeout=1)
57 caught_send(socket, msg)
60 print('CONNECTION FROM:', str(self.client_address))
61 connection_id = uuid.uuid4()
62 queue_in = queue.Queue()
63 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
65 t = threading.Thread(target=send_queue_messages,
66 args=(self.request, queue_in, thread_alive))
68 for message in plom_socket_io.recv(self.request):
70 caught_send(self.request, 'BAD MESSAGE')
71 elif 'QUIT' == message:
72 caught_send(self.request, 'BYE')
75 self.server.queue_out.put(('COMMAND', connection_id, message))
76 self.server.queue_out.put(('KILL_QUEUE', connection_id))
77 thread_alive[0] = False
78 print('CONNECTION CLOSED FROM:', str(self.client_address))
84 def __init__(self, position):
85 self.position = position
92 def task_moveup(self):
95 def task_movedown(self):
98 def decide_task(self):
101 def set_task(self, task):
105 def proceed(self, is_AI=True):
106 """Further the thing in its tasks.
108 Decrements self.todo; if it thus falls to <= 0, enacts the method whose
109 name is 'task_' + self.task and sets self.task = None. If is_AI, calls
110 self.decide_task to decide a new self.task.
114 task= getattr(self, 'task_' + self.task)
117 if is_AI and self.task is None:
125 self.map_size = (5, 5)
126 self.map_ = 'xxxxx\n'+\
131 self.things = [Thing(position=[3, 3]), Thing([1, 1])]
133 self.player = self.things[self.player_i]
137 """Calculate n-th Fibonacci number. Very inefficiently."""
141 return fib(n-1) + fib(n-2)
144 class ArgumentError(Exception):
148 class CommandHandler:
150 def __init__(self, queues_out):
151 from multiprocessing import Pool
152 self.queues_out = queues_out
154 # self.pool and self.pool_result are currently only needed by the FIB
155 # command and the demo of a parallelized game loop in cmd_inc_p.
157 self.pool_result = None
159 def send_to(self, connection_id, msg):
160 """Send msg to client of connection_id."""
161 self.queues_out[connection_id].put(msg)
163 def send_all(self, msg):
164 """Send msg to all clients."""
165 for connection_id in self.queues_out:
166 self.send_to(connection_id, msg)
168 def stringify_yx(self, tuple_):
169 """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
170 return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
172 def proceed_to_next_player_turn(self, connection_id):
173 """Run game world turns until player can decide their next step.
175 Sends a 'TURN_FINISHED' message, then iterates through all non-player
176 things, on each step furthering them in their tasks (and letting them
177 decide new ones if they finish). The iteration order is: first all
178 things that come after the player in the world things list, then (after
179 incrementing the world turn) all that come before the player; then the
180 player's .proceed() is run, and if it does not finish his task, the
181 loop starts at the beginning. Once the player's task is finished, the
182 loop breaks, and client-relevant game data is sent.
184 self.send_all('TURN_FINISHED ' + str(self.world.turn))
186 for thing in self.world.things[self.world.player_i+1:]:
189 for thing in self.world.things[:self.world.player_i]:
191 self.world.player.proceed(is_AI=False)
192 if self.world.player.task is None:
194 self.send_all('NEW_TURN ' + str(self.world.turn))
195 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
196 self.send_all('TERRAIN\n' + self.world.map_)
197 self.send_all('POSITION ' + self.stringify_yx(self.world.player.position))
199 def cmd_fib(self, tokens, connection_id):
200 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
202 Numbers are calculated in parallel as far as possible, using fib().
203 A 'CALCULATING …' message is sent to caller before the result.
206 raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
208 for token in tokens[1:]:
209 if token == '0' or not token.isdigit():
210 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
211 numbers += [int(token)]
212 self.send_to(connection_id, 'CALCULATING …')
213 results = self.pool.map(fib, numbers)
214 reply = ' '.join([str(r) for r in results])
215 self.send_to(connection_id, reply)
217 def cmd_inc_p(self, connection_id):
218 """Increment world.turn, send game turn data to everyone.
220 To simulate game processing waiting times, a one second delay between
221 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
222 calculations are started as pool processes that need to be finished
223 until a further INC finishes the turn.
225 This is just a demo structure for how the game loop could work when
226 parallelized. One might imagine a two-step game turn, with a non-action
227 step determining actor tasks (the AI determinations would take the
228 place of the fib calculations here), and an action step wherein these
229 tasks are performed (where now sleep(1) is).
231 from time import sleep
232 if self.pool_result is not None:
233 self.pool_result.wait()
234 self.send_all('TURN_FINISHED ' + str(self.world.turn))
237 self.send_all('NEW_TURN ' + str(self.world.turn))
238 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
239 self.send_all('TERRAIN\n' + self.world.map_)
240 self.send_all('POSITION ' + self.stringify_yx(self.world.player.position))
241 self.pool_result = self.pool.map_async(fib, (35, 35))
243 def cmd_get_turn(self, connection_id):
244 """Send world.turn to caller."""
245 self.send_to(connection_id, str(self.world.turn))
247 def cmd_move(self, direction, connection_id):
248 """Set player task to 'moveup' or 'movedown', finish player turn."""
249 if not direction in {'UP', 'DOWN'}:
250 raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
251 if direction == 'UP':
252 self.world.player.set_task('moveup')
254 self.world.player.set_task('movedown')
255 self.proceed_to_next_player_turn(connection_id)
257 def cmd_wait(self, connection_id):
258 """Set player task to 'wait', finish player turn."""
259 self.world.player.set_task('wait')
260 self.proceed_to_next_player_turn(connection_id)
262 def cmd_echo(self, tokens, input_, connection_id):
263 """Send message in input_ beyond tokens[0] to caller."""
264 msg = input_[len(tokens[0]) + 1:]
265 self.send_to(connection_id, msg)
267 def cmd_all(self, tokens, input_):
268 """Send message in input_ beyond tokens[0] to all clients."""
269 msg = input_[len(tokens[0]) + 1:]
272 def handle_input(self, input_, connection_id):
273 """Process input_ to command grammar, call command handler if found."""
274 tokens = [token for token in input_.split(' ') if len(token) > 0]
277 self.send_to(connection_id, 'EMPTY COMMAND')
278 elif len(tokens) == 1 and tokens[0] == 'INC_P':
279 self.cmd_inc_p(connection_id)
280 elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
281 self.cmd_get_turn(connection_id)
282 elif len(tokens) == 1 and tokens[0] == 'WAIT':
283 self.cmd_wait(connection_id)
284 elif len(tokens) == 2 and tokens[0] == 'MOVE':
285 self.cmd_move(tokens[1], connection_id)
286 elif len(tokens) >= 1 and tokens[0] == 'ECHO':
287 self.cmd_echo(tokens, input_, connection_id)
288 elif len(tokens) >= 1 and tokens[0] == 'ALL':
289 self.cmd_all(tokens, input_)
290 elif len(tokens) >= 1 and tokens[0] == 'FIB':
291 # TODO: Should this really block the whole loop?
292 self.cmd_fib(tokens, connection_id)
294 self.send_to(connection_id, 'UNKNOWN COMMAND')
295 except ArgumentError as e:
296 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
300 """Handle commands coming through queue q, send results back.
302 Commands from q are expected to be tuples, with the first element either
303 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
304 an optional third element of arbitrary type. The UUID identifies a
305 receiver for replies.
307 An 'ADD_QUEUE' command should contain as third element a queue through
308 which to send messages back to the sender of the command. A 'KILL_QUEUE'
309 command removes the queue for that receiver from the list of queues through
310 which to send replies.
312 A 'COMMAND' command is specified in greater detail by a string that is the
313 tuple's third element. CommandHandler takes care of processing this and
317 command_handler = CommandHandler(queues_out)
322 content = None if len(x) == 2 else x[2]
323 if command_type == 'ADD_QUEUE':
324 queues_out[connection_id] = content
325 elif command_type == 'COMMAND':
326 command_handler.handle_input(content, connection_id)
327 elif command_type == 'KILL_QUEUE':
328 del queues_out[connection_id]
332 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
334 server = Server(q, ('localhost', 5000), IO_Handler)
336 server.serve_forever()
337 except KeyboardInterrupt:
340 print('Killing server')
341 server.server_close()