13 # Avoid "Address already in use" errors.
# The attribute is assigned on the TCPServer base class itself, so it applies
# to every TCPServer subclass created in this process, not only to Server.
14 socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that also carries the queue towards the main thread.

    Request handlers reach the queue as ``self.server.queue_out``.
    """

    def __init__(self, queue, *args, **kwargs):
        """Initialise the TCP server and attach the outgoing message queue.

        queue -- queue object on which handlers put messages for the main
                 thread; stored as ``self.queue_out``.
        *args, **kwargs -- passed unchanged to ThreadingTCPServer.
        """
        super().__init__(*args, **kwargs)
        # Without this, the server's handler threads have daemon=False.
        self.daemon_threads = True
        self.queue_out = queue
# NOTE(review): this listing has lost its indentation and a number of lines
# (the embedded line numbers skip, e.g. 26 -> 29, 37 -> 40, 44 -> 49, 50 -> 52,
# 58 -> 60, 78 -> 81). The handler method's `def` line, the end of its
# docstring and several control-flow lines are therefore not visible here;
# restore them from the original file before changing this class.
26 class IO_Handler(socketserver.BaseRequestHandler):
29 """Move messages between network socket and main thread via queues.
31 On start, sets up new queue, sends it via self.server.queue_out to
32 main thread, and from then on receives messages to send back from the
33 main thread via that new queue.
35 At the same time, loops over socket's recv to get messages from the
36 outside via self.server.queue_out into the main thread. Ends connection
37 once a 'QUIT' message is received from socket, and then also kills its
40 All messages to the main thread are tuples, with the first element a
41 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42 deletion, and 'COMMAND' for everything else), the second element a UUID
43 that uniquely identifies the thread (so that the main thread knows whom
44 to send replies back to), and optionally a third element for further
# Best-effort send helper: the `except` line below shows that
# plom_socket_io.BrokenSocketConnection is caught; what the handler does with
# it is not visible in this listing.
49 def caught_send(socket, message):
50 """Send message by socket, catch broken socket connection error."""
52 plom_socket_io.send(socket, message)
53 except plom_socket_io.BrokenSocketConnection:
# Sender loop; it is the target of the threading.Thread created further down.
# The 1-second timeout on queue_in.get() bounds each wait, presumably so the
# `while thread_alive[0]` condition is re-checked regularly -- confirm.
56 def send_queue_messages(socket, queue_in, thread_alive):
57 """Send messages via socket from queue_in while thread_alive[0]."""
58 while thread_alive[0]:
60 msg = queue_in.get(timeout=1)
63 caught_send(socket, msg)
# Connection setup: create a UUID and a reply queue for this connection and
# announce both to the main thread as ('ADD_QUEUE', id, queue).
66 print('CONNECTION FROM:', str(self.client_address))
67 connection_id = uuid.uuid4()
68 queue_in = queue.Queue()
69 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
# thread_alive is handed to the sender thread and its element 0 is set to
# False during teardown; its creation is not visible in this listing.
71 t = threading.Thread(target=send_queue_messages,
72 args=(self.request, queue_in, thread_alive))
# Receive loop over plom_socket_io.recv(): 'QUIT' is answered with 'BYE';
# messages are forwarded to the main thread as ('COMMAND', id, message). The
# condition guarding the 'BAD MESSAGE' reply is not visible in this listing.
74 for message in plom_socket_io.recv(self.request):
76 caught_send(self.request, 'BAD MESSAGE')
77 elif 'QUIT' == message:
78 caught_send(self.request, 'BYE')
81 self.server.queue_out.put(('COMMAND', connection_id, message))
# Teardown: ask the main thread to drop this connection's queue, then clear
# the flag that the sender loop polls.
82 self.server.queue_out.put(('KILL_QUEUE', connection_id))
83 thread_alive[0] = False
84 print('CONNECTION CLOSED FROM:', str(self.client_address))
# NOTE(review): the `def` line and the base case of fib() are missing from
# this listing (embedded numbers jump 89 -> 93); only the docstring and the
# recursive step remain.
89 """Calculate n-th Fibonacci number. Very inefficiently."""
# Recursive step: every call spawns two further recursive calls.
93 return fib(n-1) + fib(n-2)
# NOTE(review): indentation and some lines are missing from this listing
# (embedded numbers 99 and 106 are absent). Other methods use self.queues_out
# and self.pool, whose initialisation is not visible here -- presumably it
# sits on the missing lines; confirm against the original file.
96 class CommandHandler(game_common.Commander, server_.game.Commander):
98 def __init__(self, game_file_name):
# Game world object and the command parser bound to this commander.
100 self.world = server_.game.World()
101 self.parser = parser.Parser(self)
# Path of the file that handle_input() appends input lines to.
102 self.game_file_name = game_file_name
103 # self.pool and self.pool_result are currently only needed by the FIB
104 # command and the demo of a parallelized game loop in cmd_inc_p.
105 from multiprocessing import Pool
# Holds the result object of the latest pool.map_async() call made by
# cmd_INC_P; None until the first such call.
107 self.pool_result = None
# NOTE(review): the end of the signature (the start-up code passes a `store`
# keyword), the `try:` line and several branch lines are missing from this
# listing (embedded numbers 110, 113, 115, 117, 120-122, 127-128, 131-132 are
# absent). The comments below describe only what is visible.
109 def handle_input(self, input_, connection_id=None, abort_on_error=False,
111 """Process input_ to command grammar, call command handler if found."""
112 from inspect import signature
# parser.parse() yields the command callable that is invoked below.
114 command = self.parser.parse(input_)
116 self.send_to(connection_id, 'UNHANDLED INPUT')
# Pass connection_id only to commands whose signature declares that parameter.
118 if 'connection_id' in list(signature(command).parameters):
119 command(connection_id=connection_id)
# Append the raw input line to the game file (the guarding condition is not
# visible in this listing).
123 with open(self.game_file_name, 'a') as f:
124 f.write(input_ + '\n')
# Parser and game errors are reported to the sender via send_to(); any further
# handling (e.g. for abort_on_error) is not visible in this listing.
125 except parser.ArgError as e:
126 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
129 except server_.game.GameError as e:
130 self.send_to(connection_id, 'GAME ERROR: ' + str(e))
# NOTE(review): the branch that chooses between queueing and printing is
# missing from this listing (embedded numbers 136 and 138-139 are absent).
134 def send_to(self, connection_id, msg):
135 """Send msg to client of connection_id; if no latter, print instead."""
# Deliver msg through the reply queue registered for this connection.
137 self.queues_out[connection_id].put(msg)
def send_all(self, msg):
    """Broadcast msg to every connection registered in self.queues_out."""
    for receiver_id in self.queues_out:
        self.send_to(receiver_id, msg)
# NOTE(review): the `def` line and most of the body of the second nested
# helper (called as quoted() below) are missing from this listing (embedded
# numbers 153 and 155-161 are absent); only its docstring and return remain.
146 def send_all_gamestate(self):
147 """Send out game state data relevant to clients."""
149 def stringify_yx(tuple_):
150 """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
151 return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
154 """Quote & escape string so client interprets it as single token."""
162 return ''.join(quoted)
# Broadcast order: turn number, map size, one TERRAIN_LINE per map row, then
# type and position of every thing in the world.
164 self.send_all('NEW_TURN ' + str(self.world.turn))
165 self.send_all('MAP_SIZE ' + stringify_yx(self.world.map_size))
# terrain_map is sliced as one flat sequence: row y covers the cells from
# y * width up to (y + 1) * width.
166 for y in range(self.world.map_size[0]):
167 width = self.world.map_size[1]
168 terrain_line = self.world.terrain_map[y * width:(y + 1) * width]
169 self.send_all('TERRAIN_LINE %5s %s' % (y, quoted(terrain_line)))
170 for thing in self.world.things:
171 self.send_all('THING_TYPE %s %s' % (thing.id_, thing.type_))
172 self.send_all('THING_POS %s %s' % (thing.id_,
173 stringify_yx(thing.position)))
# NOTE(review): this method's `def` line is missing from the listing
# (embedded number 175 is absent), as is the end of its docstring.
176 """Send turn finish signal, run game world, send new world data.
178 First sends 'TURN_FINISHED' message, then runs game world
179 until new player input is needed, then sends game state.
# Sequence: announce the finished turn, let the world run up to the next
# player turn, report the player's last task result, then broadcast the state.
181 self.send_all('TURN_FINISHED ' + str(self.world.turn))
182 self.world.proceed_to_next_player_turn()
183 msg = str(self.world.get_player().last_task_result)
184 self.send_all('LAST_PLAYER_TASK_RESULT ' + msg)
185 self.send_all_gamestate()
def cmd_FIB(self, numbers, connection_id):
    """Answer the caller with the Fibonacci number for each n in numbers.

    A 'CALCULATING …' notice is sent to the caller first. The values are
    then computed with fib() through self.pool.map, and the results are
    sent back as one space-separated line.
    """
    self.send_to(connection_id, 'CALCULATING …')
    fib_values = self.pool.map(fib, numbers)
    self.send_to(connection_id, ' '.join(map(str, fib_values)))
# Argument grammar attached to the command function; presumably read by the
# command parser to validate and convert the arguments -- confirm in parser.
cmd_FIB.argtypes = 'seq:int:nonneg'
# NOTE(review): embedded numbers 217-218 are absent from this listing. The
# docstring describes a one-second delay and a turn increment between
# TURN_FINISHED and the game-state broadcast; presumably those are the
# missing lines -- confirm against the original file.
199 def cmd_INC_P(self, connection_id):
200 """Increment world.turn, send game turn data to everyone.
202 To simulate game processing waiting times, a one second delay between
203 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
204 calculations are started as pool processes that need to be finished
205 until a further INC finishes the turn.
207 This is just a demo structure for how the game loop could work when
208 parallelized. One might imagine a two-step game turn, with a non-action
209 step determining actor tasks (the AI determinations would take the
210 place of the fib calculations here), and an action step wherein these
211 tasks are performed (where now sleep(1) is).
213 from time import sleep
# Block until the calculations started by the previous call have finished.
214 if self.pool_result is not None:
215 self.pool_result.wait()
216 self.send_all('TURN_FINISHED ' + str(self.world.turn))
219 self.send_all_gamestate()
# Start the next batch of background work without waiting for its result.
220 self.pool_result = self.pool.map_async(fib, (35, 35))
# NOTE(review): the lines that read from q and unpack command_type and
# connection_id are missing from this listing (embedded numbers 239-243 are
# absent), as is the end of the docstring.
223 def io_loop(q, commander):
224 """Handle commands coming through queue q, send results back.
226 Commands from q are expected to be tuples, with the first element either
227 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
228 an optional third element of arbitrary type. The UUID identifies a
229 receiver for replies.
231 An 'ADD_QUEUE' command should contain as third element a queue through
232 which to send messages back to the sender of the command. A 'KILL_QUEUE'
233 command removes the queue for that receiver from the list of queues through
234 which to send replies.
236 A 'COMMAND' command is specified in greater detail by a string that is the
237 tuple's third element. The commander CommandHandler takes care of processing
238 this and sending out replies.
# A two-element tuple carries no payload, so content becomes None.
244 content = None if len(x) == 2 else x[2]
# ADD_QUEUE registers the sender's reply queue under its connection id,
# COMMAND hands the payload to the commander, KILL_QUEUE drops the queue.
245 if command_type == 'ADD_QUEUE':
246 commander.queues_out[connection_id] = content
247 elif command_type == 'COMMAND':
248 commander.handle_input(content, connection_id)
249 elif command_type == 'KILL_QUEUE':
250 del commander.queues_out[connection_id]
# NOTE(review): several lines are missing from this listing (embedded numbers
# 255, 261, 265, 268, 279, 281, 283 and 286-287 are absent), including
# whatever follows the two error messages and the creation of q.
253 if len(sys.argv) != 2:
254 print('wrong number of arguments, expected one (game file)')
256 game_file_name = sys.argv[1]
257 commander = CommandHandler(game_file_name)
# Replay an existing game file line by line. store=False is passed, presumably
# so that replayed lines are not appended to the same file again -- confirm.
258 if os.path.exists(game_file_name):
259 if not os.path.isfile(game_file_name):
260 print('game file name does not refer to a valid game file')
262 with open(game_file_name, 'r') as f:
263 lines = f.readlines()
264 for i in range(len(lines)):
266 print("FILE INPUT LINE %s: %s" % (i, line), end='')
267 commander.handle_input(line, abort_on_error=True, store=False)
# Default world set-up commands: a 5x5 map and two things (the branch line
# that selects this path is not visible in this listing).
269 commander.handle_input('MAP_SIZE Y:5,X:5')
270 commander.handle_input('TERRAIN_LINE 0 "xxxxx"')
271 commander.handle_input('TERRAIN_LINE 1 "x...x"')
272 commander.handle_input('TERRAIN_LINE 2 "x.X.x"')
273 commander.handle_input('TERRAIN_LINE 3 "x...x"')
274 commander.handle_input('TERRAIN_LINE 4 "xxxxx"')
275 commander.handle_input('THING_TYPE 0 human')
276 commander.handle_input('THING_POS 0 Y:3,X:3')
277 commander.handle_input('THING_TYPE 1 monster')
278 commander.handle_input('THING_POS 1 Y:1,X:1')
# The command loop is set up as a daemon thread.
280 c = threading.Thread(target=io_loop, daemon=True, args=(q, commander))
# TCP server on localhost port 5000, using IO_Handler as its request handler.
282 server = Server(q, ('localhost', 5000), IO_Handler)
284 server.serve_forever()
285 except KeyboardInterrupt:
288 print('Killing server')
289 server.server_close()