# Avoid "Address already in use" errors: this class-level switch applies to
# every TCPServer, and thereby to the Server subclass defined below.
socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that carries along the queue to the main thread.

    Request handlers reach that queue as self.server.queue_out.
    """

    # Else, server's threads have daemon=False.
    daemon_threads = True

    def __init__(self, queue, *args, **kwargs):
        """Set up the TCP server from args/kwargs, store queue as .queue_out."""
        super().__init__(*args, **kwargs)
        self.queue_out = queue
20 class IO_Handler(socketserver.BaseRequestHandler):
23 """Move messages between network socket and main thread via queues.
25 On start, sets up new queue, sends it via self.server.queue_out to
26 main thread, and from then on receives messages to send back from the
27 main thread via that new queue.
29 At the same time, loops over socket's recv to get messages from the
30 outside via self.server.queue_out into the main thread. Ends connection
31 once a 'QUIT' message is received from socket, and then also kills its
34 All messages to the main thread are tuples, with the first element a
35 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36 deletion, and 'COMMAND' for everything else), the second element a UUID
37 that uniquely identifies the thread (so that the main thread knows whom
38 to send replies back to), and optionally a third element for further
43 def caught_send(socket, message):
44 """Send message by socket, catch broken socket connection error."""
46 plom_socket_io.send(socket, message)
47 except plom_socket_io.BrokenSocketConnection:
50 def send_queue_messages(socket, queue_in, thread_alive):
51 """Send messages via socket from queue_in while thread_alive[0]."""
52 while thread_alive[0]:
54 msg = queue_in.get(timeout=1)
57 caught_send(socket, msg)
60 print('CONNECTION FROM:', str(self.client_address))
61 connection_id = uuid.uuid4()
62 queue_in = queue.Queue()
63 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
65 t = threading.Thread(target=send_queue_messages,
66 args=(self.request, queue_in, thread_alive))
68 for message in plom_socket_io.recv(self.request):
70 caught_send(self.request, 'BAD MESSAGE')
71 elif 'QUIT' == message:
72 caught_send(self.request, 'BYE')
75 self.server.queue_out.put(('COMMAND', connection_id, message))
76 self.server.queue_out.put(('KILL_QUEUE', connection_id))
77 thread_alive[0] = False
78 print('CONNECTION CLOSED FROM:', str(self.client_address))
84 def __init__(self, name, args=(), kwargs={}):
def __init__(self, position):
    """Place the thing at position and start it off with a 'wait' task."""
    self.task = Task('wait')
    self.position = position
def task_move(self, direction):
    """Shift self.position[0] by one: down by 1 for 'UP', up by 1 for 'DOWN'.

    Any other direction leaves the position untouched.
    """
    if direction == 'UP':
        step = -1
    elif direction == 'DOWN':
        step = 1
    else:
        return
    self.position[0] += step
def decide_task(self):
    """Pick this thing's next task -- currently always 'wait'."""
    next_task = 'wait'
    self.set_task(next_task)
def set_task(self, task, *args, **kwargs):
    """Replace self.task with a new Task named task.

    Further positional and keyword arguments are stored in the Task as its
    args and kwargs.
    """
    new_task = Task(task, args, kwargs)
    self.task = new_task
def proceed(self, is_AI=True):
    """Advance the thing by one step of its current task.

    Counts .task.todo down by one. Once that reaches <= 0, the method named
    'task_' + .task.name is called with the task's args and kwargs, and
    .task is reset to None. If is_AI, a thing left without a task then calls
    .decide_task to get a new one.
    """
    current = self.task
    current.todo -= 1
    if current.todo <= 0:
        action = getattr(self, 'task_' + current.name)
        action(*current.args, **current.kwargs)
        self.task = None
        # Only a thing whose task just finished can be without one here.
        if is_AI:
            self.decide_task()
132 self.map_size = (5, 5)
133 self.map_ = 'xxxxx\n'+\
138 self.things = [Thing(position=[3, 3]), Thing([1, 1])]
140 self.player = self.things[self.player_i]
144 """Calculate n-th Fibonacci number. Very inefficiently."""
148 return fib(n-1) + fib(n-2)
151 class ArgumentError(Exception):
155 class CommandHandler:
157 def __init__(self, queues_out):
158 from multiprocessing import Pool
159 self.queues_out = queues_out
161 # self.pool and self.pool_result are currently only needed by the FIB
162 # command and the demo of a parallelized game loop in cmd_inc_p.
164 self.pool_result = None
def send_to(self, connection_id, msg):
    """Put msg into the reply queue registered for connection_id."""
    reply_queue = self.queues_out[connection_id]
    reply_queue.put(msg)
def send_all(self, msg):
    """Send msg to every client that has a reply queue registered."""
    deliver = self.send_to
    for recipient in self.queues_out:
        deliver(recipient, msg)
def stringify_yx(self, tuple_):
    """Render the first two items (y, x) of tuple_ as 'Y:<y>,X:<x>'."""
    y, x = tuple_[0], tuple_[1]
    return 'Y:%s,X:%s' % (str(y), str(x))
def proceed_to_next_player_turn(self, connection_id):
    """Advance the game world until the player may choose a new task.

    Announces 'TURN_FINISHED' to all clients first. Then repeats this round:
    every thing listed after the player proceeds, the world turn is
    incremented, every thing listed before the player proceeds, and finally
    the player proceeds (without AI task selection). The rounds stop as soon
    as the player is left without a task; then the new turn number, map
    size, terrain and player position are sent to all clients.
    """
    world = self.world
    self.send_all('TURN_FINISHED ' + str(world.turn))
    player_is_free = False
    while not player_is_free:
        for later_thing in world.things[world.player_i+1:]:
            later_thing.proceed()
        world.turn += 1
        for earlier_thing in world.things[:world.player_i]:
            earlier_thing.proceed()
        world.player.proceed(is_AI=False)
        player_is_free = world.player.task is None
    self.send_all('NEW_TURN ' + str(world.turn))
    self.send_all('MAP_SIZE ' + self.stringify_yx(world.map_size))
    self.send_all('TERRAIN\n' + world.map_)
    self.send_all('POSITION ' + self.stringify_yx(world.player.position))
def cmd_fib(self, tokens, connection_id):
    """Reply with n-th Fibonacci numbers, n taken from tokens[1:].

    Numbers are calculated in parallel as far as possible, using fib().
    A 'CALCULATING …' message is sent to caller before the result, which is
    one message holding the results separated by spaces.

    Raises ArgumentError if tokens holds no argument beyond tokens[0], or if
    any argument is not a decimal integer > 0.
    """
    arguments = tokens[1:]
    if not arguments:
        raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
    numbers = []
    for token in arguments:
        # str.isdigit() also accepts characters such as '²' that int() then
        # rejects with a ValueError, and comparing with '0' misses a zero
        # spelled '00'. So accept decimal characters only, and test the
        # parsed value against the "> 0" rule.
        if not token.isdecimal() or int(token) < 1:
            raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
        numbers.append(int(token))
    self.send_to(connection_id, 'CALCULATING …')
    results = self.pool.map(fib, numbers)
    reply = ' '.join(str(r) for r in results)
    self.send_to(connection_id, reply)
def cmd_inc_p(self, connection_id):
    """Increment world.turn, send game turn data to everyone.

    To simulate game processing waiting times, there is a one second delay
    between TURN_FINISHED and NEW_TURN; after NEW_TURN, some expensive
    calculations are started as pool processes, and a further INC_P waits
    for them to be done before it finishes the turn.

    This is only a demo of how a parallelized game loop could be shaped.
    Think of a game turn in two steps: a non-action step that determines
    actor tasks (AI decisions would stand where the fib calculations are
    now), and an action step that performs those tasks (where the one
    second sleep is now).
    """
    import time
    pending = self.pool_result
    if pending is not None:
        # Calculations started by the previous INC_P must be done first.
        pending.wait()
    world = self.world
    self.send_all('TURN_FINISHED ' + str(world.turn))
    time.sleep(1)
    world.turn += 1
    self.send_all('NEW_TURN ' + str(world.turn))
    self.send_all('MAP_SIZE ' + self.stringify_yx(world.map_size))
    self.send_all('TERRAIN\n' + world.map_)
    self.send_all('POSITION ' + self.stringify_yx(world.player.position))
    self.pool_result = self.pool.map_async(fib, (35, 35))
def cmd_get_turn(self, connection_id):
    """Reply to caller with the current world.turn, as a string."""
    turn_text = str(self.world.turn)
    self.send_to(connection_id, turn_text)
def cmd_move(self, direction, connection_id):
    """Give the player a 'move' task towards direction, finish player turn.

    Raises ArgumentError unless direction is 'UP' or 'DOWN'.
    """
    allowed = {'UP', 'DOWN'}
    if direction not in allowed:
        raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
    player = self.world.player
    player.set_task('move', direction=direction)
    self.proceed_to_next_player_turn(connection_id)
def cmd_wait(self, connection_id):
    """Give the player a 'wait' task, then finish the player turn."""
    player = self.world.player
    player.set_task('wait')
    self.proceed_to_next_player_turn(connection_id)
def cmd_echo(self, tokens, input_, connection_id):
    """Send message in input_ beyond tokens[0] to caller.

    The message is whatever follows the command word tokens[0] and the one
    space after it. Spaces in front of the command word are skipped first:
    handle_input drops them when it splits input_ into tokens, so without
    this the slice would start inside the command word.
    """
    from_command_on = input_.lstrip(' ')
    msg = from_command_on[len(tokens[0]) + 1:]
    self.send_to(connection_id, msg)
def cmd_all(self, tokens, input_):
    """Send message in input_ beyond tokens[0] to all clients.

    The message is whatever follows the command word tokens[0] and the one
    space after it. Spaces in front of the command word are skipped first:
    handle_input drops them when it splits input_ into tokens, so without
    this the slice would start inside the command word.
    """
    from_command_on = input_.lstrip(' ')
    msg = from_command_on[len(tokens[0]) + 1:]
    self.send_all(msg)
def handle_input(self, input_, connection_id):
    """Split input_ into tokens and run the command they spell out.

    Tokens are the non-empty pieces of input_ between spaces. The first
    token names the command; INC_P, GET_TURN and WAIT take no argument, MOVE
    takes exactly one, ECHO, ALL and FIB take any number. The caller is sent
    'EMPTY COMMAND' if there are no tokens, 'UNKNOWN COMMAND' if nothing
    matches, and 'ARGUMENT ERROR: ' plus the error text if a command handler
    raises ArgumentError.
    """
    tokens = [piece for piece in input_.split(' ') if piece]
    try:
        if not tokens:
            self.send_to(connection_id, 'EMPTY COMMAND')
            return
        command = tokens[0]
        argc = len(tokens) - 1
        if command == 'INC_P' and argc == 0:
            self.cmd_inc_p(connection_id)
        elif command == 'GET_TURN' and argc == 0:
            self.cmd_get_turn(connection_id)
        elif command == 'WAIT' and argc == 0:
            self.cmd_wait(connection_id)
        elif command == 'MOVE' and argc == 1:
            self.cmd_move(tokens[1], connection_id)
        elif command == 'ECHO':
            self.cmd_echo(tokens, input_, connection_id)
        elif command == 'ALL':
            self.cmd_all(tokens, input_)
        elif command == 'FIB':
            # TODO: Should this really block the whole loop?
            self.cmd_fib(tokens, connection_id)
        else:
            self.send_to(connection_id, 'UNKNOWN COMMAND')
    except ArgumentError as e:
        self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
304 """Handle commands coming through queue q, send results back.
306 Commands from q are expected to be tuples, with the first element either
307 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
308 an optional third element of arbitrary type. The UUID identifies a
309 receiver for replies.
311 An 'ADD_QUEUE' command should contain as third element a queue through
312 which to send messages back to the sender of the command. A 'KILL_QUEUE'
313 command removes the queue for that receiver from the list of queues through
314 which to send replies.
316 A 'COMMAND' command is specified in greater detail by a string that is the
317 tuple's third element. CommandHandler takes care of processing this and
321 command_handler = CommandHandler(queues_out)
326 content = None if len(x) == 2 else x[2]
327 if command_type == 'ADD_QUEUE':
328 queues_out[connection_id] = content
329 elif command_type == 'COMMAND':
330 command_handler.handle_input(content, connection_id)
331 elif command_type == 'KILL_QUEUE':
332 del queues_out[connection_id]
336 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
338 server = Server(q, ('localhost', 5000), IO_Handler)
340 server.serve_forever()
341 except KeyboardInterrupt:
344 print('Killing server')
345 server.server_close()