6 from parser import ArgError, Parser
7 from server_.game import World, GameError
10 # Avoid "Address already in use" errors.
11 socketserver.TCPServer.allow_reuse_address = True
14 class Server(socketserver.ThreadingTCPServer):
15 """Bind together threaded IO handling server and message queue."""
17 def __init__(self, queue, *args, **kwargs):
18 super().__init__(*args, **kwargs)
19 self.queue_out = queue
20 self.daemon_threads = True # Else, server's threads have daemon=False.
23 class IO_Handler(socketserver.BaseRequestHandler):
26 """Move messages between network socket and main thread via queues.
28 On start, sets up new queue, sends it via self.server.queue_out to
29 main thread, and from then on receives messages to send back from the
30 main thread via that new queue.
32 At the same time, loops over socket's recv to get messages from the
33 outside via self.server.queue_out into the main thread. Ends connection
34 once a 'QUIT' message is received from socket, and then also kills its
37 All messages to the main thread are tuples, with the first element a
38 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
39 deletion, and 'COMMAND' for everything else), the second element a UUID
40 that uniquely identifies the thread (so that the main thread knows whom
41 to send replies back to), and optionally a third element for further
46 def caught_send(socket, message):
47 """Send message by socket, catch broken socket connection error."""
49 plom_socket_io.send(socket, message)
50 except plom_socket_io.BrokenSocketConnection:
53 def send_queue_messages(socket, queue_in, thread_alive):
54 """Send messages via socket from queue_in while thread_alive[0]."""
55 while thread_alive[0]:
57 msg = queue_in.get(timeout=1)
60 caught_send(socket, msg)
63 print('CONNECTION FROM:', str(self.client_address))
64 connection_id = uuid.uuid4()
65 queue_in = queue.Queue()
66 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
68 t = threading.Thread(target=send_queue_messages,
69 args=(self.request, queue_in, thread_alive))
71 for message in plom_socket_io.recv(self.request):
73 caught_send(self.request, 'BAD MESSAGE')
74 elif 'QUIT' == message:
75 caught_send(self.request, 'BYE')
78 self.server.queue_out.put(('COMMAND', connection_id, message))
79 self.server.queue_out.put(('KILL_QUEUE', connection_id))
80 thread_alive[0] = False
81 print('CONNECTION CLOSED FROM:', str(self.client_address))
86 """Calculate n-th Fibonacci number. Very inefficiently."""
90 return fib(n-1) + fib(n-2)
95 def __init__(self, queues_out):
96 from multiprocessing import Pool
97 self.queues_out = queues_out
99 self.parser = Parser(self)
100 # self.pool and self.pool_result are currently only needed by the FIB
101 # command and the demo of a parallelized game loop in cmd_inc_p.
103 self.pool_result = None
105 def handle_input(self, input_, connection_id):
106 """Process input_ to command grammar, call command handler if found."""
108 command = self.parser.parse(input_)
110 self.send_to(connection_id, 'UNHANDLED INPUT')
112 command(connection_id=connection_id)
113 except ArgError as e:
114 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
115 except GameError as e:
116 self.send_to(connection_id, 'GAME ERROR: ' + str(e))
118 def send_to(self, connection_id, msg):
119 """Send msg to client of connection_id."""
120 self.queues_out[connection_id].put(msg)
122 def send_all(self, msg):
123 """Send msg to all clients."""
124 for connection_id in self.queues_out:
125 self.send_to(connection_id, msg)
127 def stringify_yx(self, tuple_):
128 """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
129 return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
131 def quoted(self, string):
132 """Quote and escape string so client interprets it as single token."""
140 return ''.join(quoted)
142 def quoted_map(self, map_string, map_width):
143 """Put \n into map_string at map_width intervals, return quoted whole."""
145 map_size = len(map_string)
147 while start_cut < map_size:
148 limit = start_cut + map_width
149 map_lines += [map_string[start_cut:limit]]
151 return self.quoted("\n".join(map_lines))
153 def send_all_gamestate(self):
154 """Send out game state data relevant to clients."""
155 self.send_all('NEW_TURN ' + str(self.world.turn))
156 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
157 self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
158 self.world.map_size[1]))
159 for thing in self.world.things:
160 self.send_all('THING TYPE:' + thing.type_ + ' '
161 + self.stringify_yx(thing.position))
163 def proceed_to_next_player_turn(self, connection_id):
164 """Run game world turns until player can decide their next step.
166 Sends a 'TURN_FINISHED' message, then iterates through all non-player
167 things, on each step furthering them in their tasks (and letting them
168 decide new ones if they finish). The iteration order is: first all
169 things that come after the player in the world things list, then (after
170 incrementing the world turn) all that come before the player; then the
171 player's .proceed() is run, and if it does not finish his task, the
172 loop starts at the beginning. Once the player's task is finished, the
173 loop breaks, and client-relevant game data is sent.
175 self.send_all('TURN_FINISHED ' + str(self.world.turn))
177 for thing in self.world.things[self.world.player_i+1:]:
180 for thing in self.world.things[:self.world.player_i]:
182 self.world.player.proceed(is_AI=False)
183 if self.world.player.task is None:
185 self.send_all_gamestate()
187 def cmd_MOVE(self, direction, connection_id):
188 """Set player task to 'move' with direction arg, finish player turn."""
189 if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
190 raise ArgError('Move argument must be one of: '
191 'UP, DOWN, RIGHT, LEFT')
192 self.world.player.set_task('move', direction=direction)
193 self.proceed_to_next_player_turn(connection_id)
194 cmd_MOVE.argtypes = 'string'
196 def cmd_WAIT(self, connection_id):
197 """Set player task to 'wait', finish player turn."""
198 self.world.player.set_task('wait')
199 self.proceed_to_next_player_turn(connection_id)
201 def cmd_GET_TURN(self, connection_id):
202 """Send world.turn to caller."""
203 self.send_to(connection_id, str(self.world.turn))
205 def cmd_ECHO(self, msg, connection_id):
206 """Send msg to caller."""
207 self.send_to(connection_id, msg)
208 cmd_ECHO.argtypes = 'string'
210 def cmd_ALL(self, msg, connection_id):
211 """Send msg to all clients."""
213 cmd_ALL.argtypes = 'string'
215 def cmd_FIB(self, numbers, connection_id):
216 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
218 Numbers are calculated in parallel as far as possible, using fib().
219 A 'CALCULATING …' message is sent to caller before the result.
221 self.send_to(connection_id, 'CALCULATING …')
222 results = self.pool.map(fib, numbers)
223 reply = ' '.join([str(r) for r in results])
224 self.send_to(connection_id, reply)
225 cmd_FIB.argtypes = 'seq:int:nonneg'
227 def cmd_INC_P(self, connection_id):
228 """Increment world.turn, send game turn data to everyone.
230 To simulate game processing waiting times, a one second delay between
231 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
232 calculations are started as pool processes that need to be finished
233 until a further INC finishes the turn.
235 This is just a demo structure for how the game loop could work when
236 parallelized. One might imagine a two-step game turn, with a non-action
237 step determining actor tasks (the AI determinations would take the
238 place of the fib calculations here), and an action step wherein these
239 tasks are performed (where now sleep(1) is).
241 from time import sleep
242 if self.pool_result is not None:
243 self.pool_result.wait()
244 self.send_all('TURN_FINISHED ' + str(self.world.turn))
247 self.send_all_gamestate()
248 self.pool_result = self.pool.map_async(fib, (35, 35))
252 """Handle commands coming through queue q, send results back.
254 Commands from q are expected to be tuples, with the first element either
255 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
256 an optional third element of arbitrary type. The UUID identifies a
257 receiver for replies.
259 An 'ADD_QUEUE' command should contain as third element a queue through
260 which to send messages back to the sender of the command. A 'KILL_QUEUE'
261 command removes the queue for that receiver from the list of queues through
262 which to send replies.
264 A 'COMMAND' command is specified in greater detail by a string that is the
265 tuple's third element. CommandHandler takes care of processing this and
269 command_handler = CommandHandler(queues_out)
274 content = None if len(x) == 2 else x[2]
275 if command_type == 'ADD_QUEUE':
276 queues_out[connection_id] = content
277 elif command_type == 'COMMAND':
278 command_handler.handle_input(content, connection_id)
279 elif command_type == 'KILL_QUEUE':
280 del queues_out[connection_id]
284 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
286 server = Server(q, ('localhost', 5000), IO_Handler)
288 server.serve_forever()
289 except KeyboardInterrupt:
292 print('Killing server')
293 server.server_close()