# Avoid "Address already in use" errors.  The flag is set on the TCPServer
# base class itself, so every TCPServer subclass used in this process (and
# not just one server instance) picks it up.
socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that also carries the queue to the main thread.

    Request handlers reach the queue as self.server.queue_out and use it to
    pass their messages on to whoever consumes that queue.
    """

    def __init__(self, queue, *args, **kwargs):
        """Store queue as .queue_out; hand all other arguments to the base."""
        # Neither attribute depends on the base class initialisation, so both
        # are set up front.  Without daemon_threads = True the server's
        # handler threads would be started with daemon=False.
        self.daemon_threads = True
        self.queue_out = queue
        super().__init__(*args, **kwargs)
20 class IO_Handler(socketserver.BaseRequestHandler):
23 """Move messages between network socket and main thread via queues.
25 On start, sets up new queue, sends it via self.server.queue_out to
26 main thread, and from then on receives messages to send back from the
27 main thread via that new queue.
29 At the same time, loops over socket's recv to get messages from the
30 outside via self.server.queue_out into the main thread. Ends connection
31 once a 'QUIT' message is received from socket, and then also kills its
34 All messages to the main thread are tuples, with the first element a
35 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36 deletion, and 'COMMAND' for everything else), the second element a UUID
37 that uniquely identifies the thread (so that the main thread knows whom
38 to send replies back to), and optionally a third element for further
43 def caught_send(socket, message):
44 """Send message by socket, catch broken socket connection error."""
46 plom_socket_io.send(socket, message)
47 except plom_socket_io.BrokenSocketConnection:
50 def send_queue_messages(socket, queue_in, thread_alive):
51 """Send messages via socket from queue_in while thread_alive[0]."""
52 while thread_alive[0]:
54 msg = queue_in.get(timeout=1)
57 caught_send(socket, msg)
60 print('CONNECTION FROM:', str(self.client_address))
61 connection_id = uuid.uuid4()
62 queue_in = queue.Queue()
63 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
65 t = threading.Thread(target=send_queue_messages,
66 args=(self.request, queue_in, thread_alive))
68 for message in plom_socket_io.recv(self.request):
70 caught_send(self.request, 'BAD MESSAGE')
71 elif 'QUIT' == message:
72 caught_send(self.request, 'BYE')
75 self.server.queue_out.put(('COMMAND', connection_id, message))
76 self.server.queue_out.put(('KILL_QUEUE', connection_id))
77 thread_alive[0] = False
78 print('CONNECTION CLOSED FROM:', str(self.client_address))
84 def __init__(self, name, args=(), kwargs={}):
def __init__(self, type_, position):
    """Set up a thing of kind type_ at position, initially waiting.

    type_ is kept as .type, the label that is put into the 'THING TYPE:'
    messages sent to clients.  position is a mutable [y, x] list; it is
    stored as-is (not copied) and later changed in place by task_move.
    """
    # Without this assignment every read of thing.type raises AttributeError.
    self.type = type_
    self.position = position
    self.task = Task('wait')
def task_move(self, direction):
    """Move one step in direction by changing self.position in place.

    self.position is indexed as [y, x]: 'UP' / 'DOWN' lower / raise index 0,
    'LEFT' / 'RIGHT' lower / raise index 1.  Any other direction value
    leaves the position untouched.
    """
    # (direction name, index into self.position, change applied to it)
    steps = (
        ('UP', 0, -1),
        ('DOWN', 0, 1),
        ('RIGHT', 1, 1),
        ('LEFT', 1, -1),
    )
    for name, axis, delta in steps:
        if direction == name:
            self.position[axis] += delta
            break
def decide_task(self):
    """Choose the next task from the x coordinate, self.position[1].

    An x above 1 yields a 'move' task to the 'LEFT'; otherwise an x below 3
    yields a 'move' task to the 'RIGHT'; only if neither holds is the task
    set to 'wait'.
    """
    x = self.position[1]
    if x > 1:
        self.set_task('move', 'LEFT')
        return
    if x < 3:
        self.set_task('move', 'RIGHT')
        return
    # Reached only when x is neither > 1 nor < 3, which is not the case for
    # any ordinary number.
    self.set_task('wait')
def set_task(self, task, *args, **kwargs):
    """Replace self.task with a new Task named task.

    The extra positional and keyword arguments are not unpacked: they are
    handed to Task as one args tuple and one kwargs dict.
    """
    new_task = Task(task, args, kwargs)
    self.task = new_task
122 def proceed(self, is_AI=True):
123 """Further the thing in its tasks.
125 Decrements .task.todo; if it thus falls to <= 0, enacts method whose
126 name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
127 .decide_task to decide a self.task.
130 if self.task.todo <= 0:
131 task = getattr(self, 'task_' + self.task.name)
132 task(*self.task.args, **self.task.kwargs)
134 if is_AI and self.task is None:
142 self.map_size = (5, 5)
143 self.map_ = 'xxxxx\n' +\
148 self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
150 self.player = self.things[self.player_i]
154 """Calculate n-th Fibonacci number. Very inefficiently."""
158 return fib(n-1) + fib(n-2)
class ArgumentError(Exception):
    """Signal that a client command was given invalid arguments.

    Raised by the cmd_fib and cmd_move handlers; handle_input catches it and
    reports it to the client as 'ARGUMENT ERROR: ' followed by the message.
    """
165 class CommandHandler:
def __init__(self, queues_out):
    """Remember the reply queues and reset the pool bookkeeping.

    queues_out maps connection IDs to the queues through which send_to
    delivers messages to the respective clients.
    """
    from multiprocessing import Pool
    self.queues_out = queues_out
    # NOTE(review): other methods read self.world and self.pool, and Pool is
    # imported above, but neither attribute is assigned in the lines visible
    # here -- confirm that both are set up during construction.
    # self.pool and self.pool_result are currently only needed by the FIB
    # command and the demo of a parallelized game loop in cmd_inc_p.
    # pool_result stays None until cmd_inc_p stores the result handle of the
    # pool job it starts.
    self.pool_result = None
def send_to(self, connection_id, msg):
    """Put msg on the reply queue registered for connection_id.

    Raises KeyError if self.queues_out holds no queue for connection_id.
    """
    reply_queue = self.queues_out[connection_id]
    reply_queue.put(msg)
def send_all(self, msg):
    """Broadcast msg to every client that currently has a reply queue.

    Delegates to send_to once per connection ID found in self.queues_out.
    """
    deliver = self.send_to
    for client_id in self.queues_out:
        deliver(client_id, msg)
def stringify_yx(self, tuple_):
    """Render a (y, x) pair as the string 'Y:<y>,X:<x>'.

    Only the first two items of tuple_ are read; each goes through str().
    """
    y_part = str(tuple_[0])
    x_part = str(tuple_[1])
    return ''.join(('Y:', y_part, ',X:', x_part))
189 def quoted(self, string):
190 """Quote and escape string so client interprets it as single token."""
198 return ''.join(quoted)
200 def proceed_to_next_player_turn(self, connection_id):
201 """Run game world turns until player can decide their next step.
203 Sends a 'TURN_FINISHED' message, then iterates through all non-player
204 things, on each step furthering them in their tasks (and letting them
205 decide new ones if they finish). The iteration order is: first all
206 things that come after the player in the world things list, then (after
207 incrementing the world turn) all that come before the player; then the
208 player's .proceed() is run, and if it does not finish his task, the
209 loop starts at the beginning. Once the player's task is finished, the
210 loop breaks, and client-relevant game data is sent.
212 self.send_all('TURN_FINISHED ' + str(self.world.turn))
214 for thing in self.world.things[self.world.player_i+1:]:
217 for thing in self.world.things[:self.world.player_i]:
219 self.world.player.proceed(is_AI=False)
220 if self.world.player.task is None:
222 self.send_all('NEW_TURN ' + str(self.world.turn))
223 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
224 self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
225 for thing in self.world.things:
226 self.send_all('THING TYPE:' + thing.type + ' '
227 + self.stringify_yx(thing.position))
229 def cmd_fib(self, tokens, connection_id):
230 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
232 Numbers are calculated in parallel as far as possible, using fib().
233 A 'CALCULATING …' message is sent to caller before the result.
236 raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
238 for token in tokens[1:]:
239 if token == '0' or not token.isdigit():
240 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
241 numbers += [int(token)]
242 self.send_to(connection_id, 'CALCULATING …')
243 results = self.pool.map(fib, numbers)
244 reply = ' '.join([str(r) for r in results])
245 self.send_to(connection_id, reply)
247 def cmd_inc_p(self, connection_id):
248 """Increment world.turn, send game turn data to everyone.
250 To simulate game processing waiting times, a one second delay between
251 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
252 calculations are started as pool processes that need to be finished
253 until a further INC finishes the turn.
255 This is just a demo structure for how the game loop could work when
256 parallelized. One might imagine a two-step game turn, with a non-action
257 step determining actor tasks (the AI determinations would take the
258 place of the fib calculations here), and an action step wherein these
259 tasks are performed (where now sleep(1) is).
261 from time import sleep
262 if self.pool_result is not None:
263 self.pool_result.wait()
264 self.send_all('TURN_FINISHED ' + str(self.world.turn))
267 self.send_all('NEW_TURN ' + str(self.world.turn))
268 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
269 self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
270 for thing in self.world.things:
271 self.send_all('THING TYPE:' + thing.type + ' '
272 + self.stringify_yx(thing.position))
273 self.pool_result = self.pool.map_async(fib, (35, 35))
def cmd_get_turn(self, connection_id):
    """Reply to the caller with the current world turn, as a string."""
    turn_text = str(self.world.turn)
    self.send_to(connection_id, turn_text)
def cmd_move(self, direction, connection_id):
    """Give the player a 'move' task towards direction and play the turn.

    Raises ArgumentError unless direction is one of 'UP', 'DOWN', 'RIGHT'
    or 'LEFT'.  On success the task is set with direction as keyword
    argument and the game is run on via proceed_to_next_player_turn.
    """
    allowed = {'UP', 'DOWN', 'RIGHT', 'LEFT'}
    if direction in allowed:
        self.world.player.set_task('move', direction=direction)
        self.proceed_to_next_player_turn(connection_id)
        return
    raise ArgumentError('MOVE ARGUMENT MUST BE ONE OF: '
                        'UP, DOWN, RIGHT, LEFT')
def cmd_wait(self, connection_id):
    """Give the player a 'wait' task, then run the game on.

    The game is advanced via proceed_to_next_player_turn.
    """
    player = self.world.player
    player.set_task('wait')
    self.proceed_to_next_player_turn(connection_id)
def cmd_echo(self, tokens, input_, connection_id):
    """Send the text of input_ that follows the command word to the caller.

    tokens is input_ split at spaces with empty tokens dropped, so
    tokens[0] is the command word.  input_ may start with spaces that belong
    to no token; they are skipped before the command word and the one
    character separating it from the rest are cut off, so that the reply
    never contains remains of the command word.  Everything after that is
    echoed verbatim, repeated spaces included.
    """
    body_start = len(tokens[0]) + 1
    # Slicing input_ directly would be off by the number of leading spaces.
    msg = input_.lstrip(' ')[body_start:]
    self.send_to(connection_id, msg)
def cmd_all(self, tokens, input_):
    """Send the text of input_ that follows the command word to all clients.

    tokens is input_ split at spaces with empty tokens dropped, so
    tokens[0] is the command word.  Spaces in front of the command word are
    skipped before the command word and the one character separating it
    from the rest are cut off; everything after that is broadcast verbatim
    through send_all.
    """
    body_start = len(tokens[0]) + 1
    # Slicing input_ directly would be off by the number of leading spaces.
    msg = input_.lstrip(' ')[body_start:]
    self.send_all(msg)
302 def handle_input(self, input_, connection_id):
303 """Process input_ to command grammar, call command handler if found."""
304 tokens = [token for token in input_.split(' ') if len(token) > 0]
307 self.send_to(connection_id, 'EMPTY COMMAND')
308 elif len(tokens) == 1 and tokens[0] == 'INC_P':
309 self.cmd_inc_p(connection_id)
310 elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
311 self.cmd_get_turn(connection_id)
312 elif len(tokens) == 1 and tokens[0] == 'WAIT':
313 self.cmd_wait(connection_id)
314 elif len(tokens) == 2 and tokens[0] == 'MOVE':
315 self.cmd_move(tokens[1], connection_id)
316 elif len(tokens) >= 1 and tokens[0] == 'ECHO':
317 self.cmd_echo(tokens, input_, connection_id)
318 elif len(tokens) >= 1 and tokens[0] == 'ALL':
319 self.cmd_all(tokens, input_)
320 elif len(tokens) >= 1 and tokens[0] == 'FIB':
321 # TODO: Should this really block the whole loop?
322 self.cmd_fib(tokens, connection_id)
324 self.send_to(connection_id, 'UNKNOWN COMMAND')
325 except ArgumentError as e:
326 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
330 """Handle commands coming through queue q, send results back.
332 Commands from q are expected to be tuples, with the first element either
333 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
334 an optional third element of arbitrary type. The UUID identifies a
335 receiver for replies.
337 An 'ADD_QUEUE' command should contain as third element a queue through
338 which to send messages back to the sender of the command. A 'KILL_QUEUE'
339 command removes the queue for that receiver from the list of queues through
340 which to send replies.
342 A 'COMMAND' command is specified in greater detail by a string that is the
343 tuple's third element. CommandHandler takes care of processing this and
347 command_handler = CommandHandler(queues_out)
352 content = None if len(x) == 2 else x[2]
353 if command_type == 'ADD_QUEUE':
354 queues_out[connection_id] = content
355 elif command_type == 'COMMAND':
356 command_handler.handle_input(content, connection_id)
357 elif command_type == 'KILL_QUEUE':
358 del queues_out[connection_id]
362 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
364 server = Server(q, ('localhost', 5000), IO_Handler)
366 server.serve_forever()
367 except KeyboardInterrupt:
370 print('Killing server')
371 server.server_close()