6 from parser import ArgError, Parser
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
13 class GameError(Exception):
17 class Server(socketserver.ThreadingTCPServer):
18 """Bind together threaded IO handling server and message queue."""
20 def __init__(self, queue, *args, **kwargs):
21 super().__init__(*args, **kwargs)
22 self.queue_out = queue
23 self.daemon_threads = True # Else, server's threads have daemon=False.
26 class IO_Handler(socketserver.BaseRequestHandler):
29 """Move messages between network socket and main thread via queues.
31 On start, sets up new queue, sends it via self.server.queue_out to
32 main thread, and from then on receives messages to send back from the
33 main thread via that new queue.
35 At the same time, loops over socket's recv to get messages from the
36 outside via self.server.queue_out into the main thread. Ends connection
37 once a 'QUIT' message is received from socket, and then also kills its
40 All messages to the main thread are tuples, with the first element a
41 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42 deletion, and 'COMMAND' for everything else), the second element a UUID
43 that uniquely identifies the thread (so that the main thread knows whom
44 to send replies back to), and optionally a third element for further
49 def caught_send(socket, message):
50 """Send message by socket, catch broken socket connection error."""
52 plom_socket_io.send(socket, message)
53 except plom_socket_io.BrokenSocketConnection:
56 def send_queue_messages(socket, queue_in, thread_alive):
57 """Send messages via socket from queue_in while thread_alive[0]."""
58 while thread_alive[0]:
60 msg = queue_in.get(timeout=1)
63 caught_send(socket, msg)
66 print('CONNECTION FROM:', str(self.client_address))
67 connection_id = uuid.uuid4()
68 queue_in = queue.Queue()
69 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
71 t = threading.Thread(target=send_queue_messages,
72 args=(self.request, queue_in, thread_alive))
74 for message in plom_socket_io.recv(self.request):
76 caught_send(self.request, 'BAD MESSAGE')
77 elif 'QUIT' == message:
78 caught_send(self.request, 'BYE')
81 self.server.queue_out.put(('COMMAND', connection_id, message))
82 self.server.queue_out.put(('KILL_QUEUE', connection_id))
83 thread_alive[0] = False
84 print('CONNECTION CLOSED FROM:', str(self.client_address))
90 def __init__(self, name, args=(), kwargs={}):
99 def __init__(self, world, type_, position):
102 self.position = position
103 self.task = Task('wait')
108 def task_move(self, direction):
109 if direction == 'UP':
110 self.position[0] -= 1
111 elif direction == 'DOWN':
112 self.position[0] += 1
113 elif direction == 'RIGHT':
114 self.position[1] += 1
115 elif direction == 'LEFT':
116 self.position[1] -= 1
118 def decide_task(self):
119 if self.position[1] > 1:
120 self.set_task('move', 'LEFT')
121 elif self.position[1] < 3:
122 self.set_task('move', 'RIGHT')
124 self.set_task('wait')
126 def check_task(self, task, *args, **kwargs):
131 direction = kwargs['direction']
132 test_pos = self.position[:]
133 if direction == 'UP':
135 elif direction == 'DOWN':
137 elif direction == 'RIGHT':
139 elif direction == 'LEFT':
141 if test_pos[0] < 0 or test_pos[1] < 0 or \
142 test_pos[0] >= self.world.map_size[0] or \
143 test_pos[1] >= self.world.map_size[1]:
144 raise GameError('would move outside map bounds')
145 pos_i = test_pos[0] * self.world.map_size[1] + test_pos[1]
146 map_tile = self.world.map_[pos_i]
148 raise GameError('would move into illegal terrain')
150 def set_task(self, task, *args, **kwargs):
151 self.check_task(task, *args, **kwargs)
152 self.task = Task(task, args, kwargs)
154 def proceed(self, is_AI=True):
155 """Further the thing in its tasks.
157 Decrements .task.todo; if it thus falls to <= 0, enacts method whose
158 name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
159 .decide_task to decide a self.task.
162 if self.task.todo <= 0:
163 task = getattr(self, 'task_' + self.task.name)
164 task(*self.task.args, **self.task.kwargs)
166 if is_AI and self.task is None:
174 self.map_size = (5, 5)
175 self.map_ = 'xxxxx' +\
181 Thing(self, 'human', [3, 3]),
182 Thing(self, 'monster', [1, 1])
185 self.player = self.things[self.player_i]
189 """Calculate n-th Fibonacci number. Very inefficiently."""
193 return fib(n-1) + fib(n-2)
196 class CommandHandler:
198 def __init__(self, queues_out):
199 from multiprocessing import Pool
200 self.queues_out = queues_out
202 self.parser = Parser(self)
203 # self.pool and self.pool_result are currently only needed by the FIB
204 # command and the demo of a parallelized game loop in cmd_inc_p.
206 self.pool_result = None
208 def handle_input(self, input_, connection_id):
209 """Process input_ to command grammar, call command handler if found."""
211 command = self.parser.parse(input_)
213 self.send_to(connection_id, 'UNHANDLED INPUT')
215 command(connection_id=connection_id)
216 except ArgError as e:
217 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
218 except GameError as e:
219 self.send_to(connection_id, 'GAME ERROR: ' + str(e))
221 def send_to(self, connection_id, msg):
222 """Send msg to client of connection_id."""
223 self.queues_out[connection_id].put(msg)
225 def send_all(self, msg):
226 """Send msg to all clients."""
227 for connection_id in self.queues_out:
228 self.send_to(connection_id, msg)
230 def stringify_yx(self, tuple_):
231 """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
232 return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
234 def quoted(self, string):
235 """Quote and escape string so client interprets it as single token."""
243 return ''.join(quoted)
245 def quoted_map(self, map_string, map_width):
246 """Put \n into map_string at map_width intervals, return quoted whole."""
248 map_size = len(map_string)
250 while start_cut < map_size:
251 limit = start_cut + map_width
252 map_lines += [map_string[start_cut:limit]]
254 return self.quoted("\n".join(map_lines))
256 def send_all_gamestate(self):
257 """Send out game state data relevant to clients."""
258 self.send_all('NEW_TURN ' + str(self.world.turn))
259 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
260 self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
261 self.world.map_size[1]))
262 for thing in self.world.things:
263 self.send_all('THING TYPE:' + thing.type_ + ' '
264 + self.stringify_yx(thing.position))
266 def proceed_to_next_player_turn(self, connection_id):
267 """Run game world turns until player can decide their next step.
269 Sends a 'TURN_FINISHED' message, then iterates through all non-player
270 things, on each step furthering them in their tasks (and letting them
271 decide new ones if they finish). The iteration order is: first all
272 things that come after the player in the world things list, then (after
273 incrementing the world turn) all that come before the player; then the
274 player's .proceed() is run, and if it does not finish his task, the
275 loop starts at the beginning. Once the player's task is finished, the
276 loop breaks, and client-relevant game data is sent.
278 self.send_all('TURN_FINISHED ' + str(self.world.turn))
280 for thing in self.world.things[self.world.player_i+1:]:
283 for thing in self.world.things[:self.world.player_i]:
285 self.world.player.proceed(is_AI=False)
286 if self.world.player.task is None:
288 self.send_all_gamestate()
290 def cmd_MOVE(self, direction, connection_id):
291 """Set player task to 'move' with direction arg, finish player turn."""
292 if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
293 raise ArgError('Move argument must be one of: '
294 'UP, DOWN, RIGHT, LEFT')
295 self.world.player.set_task('move', direction=direction)
296 self.proceed_to_next_player_turn(connection_id)
297 cmd_MOVE.argtypes = 'string'
299 def cmd_WAIT(self, connection_id):
300 """Set player task to 'wait', finish player turn."""
301 self.world.player.set_task('wait')
302 self.proceed_to_next_player_turn(connection_id)
304 def cmd_GET_TURN(self, connection_id):
305 """Send world.turn to caller."""
306 self.send_to(connection_id, str(self.world.turn))
308 def cmd_ECHO(self, msg, connection_id):
309 """Send msg to caller."""
310 self.send_to(connection_id, msg)
311 cmd_ECHO.argtypes = 'string'
313 def cmd_ALL(self, msg, connection_id):
314 """Send msg to all clients."""
316 cmd_ALL.argtypes = 'string'
318 def cmd_FIB(self, numbers, connection_id):
319 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
321 Numbers are calculated in parallel as far as possible, using fib().
322 A 'CALCULATING …' message is sent to caller before the result.
324 self.send_to(connection_id, 'CALCULATING …')
325 results = self.pool.map(fib, numbers)
326 reply = ' '.join([str(r) for r in results])
327 self.send_to(connection_id, reply)
328 cmd_FIB.argtypes = 'seq:int:nonneg'
330 def cmd_INC_P(self, connection_id):
331 """Increment world.turn, send game turn data to everyone.
333 To simulate game processing waiting times, a one second delay between
334 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
335 calculations are started as pool processes that need to be finished
336 until a further INC finishes the turn.
338 This is just a demo structure for how the game loop could work when
339 parallelized. One might imagine a two-step game turn, with a non-action
340 step determining actor tasks (the AI determinations would take the
341 place of the fib calculations here), and an action step wherein these
342 tasks are performed (where now sleep(1) is).
344 from time import sleep
345 if self.pool_result is not None:
346 self.pool_result.wait()
347 self.send_all('TURN_FINISHED ' + str(self.world.turn))
350 self.send_all_gamestate()
351 self.pool_result = self.pool.map_async(fib, (35, 35))
355 """Handle commands coming through queue q, send results back.
357 Commands from q are expected to be tuples, with the first element either
358 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
359 an optional third element of arbitrary type. The UUID identifies a
360 receiver for replies.
362 An 'ADD_QUEUE' command should contain as third element a queue through
363 which to send messages back to the sender of the command. A 'KILL_QUEUE'
364 command removes the queue for that receiver from the list of queues through
365 which to send replies.
367 A 'COMMAND' command is specified in greater detail by a string that is the
368 tuple's third element. CommandHandler takes care of processing this and
372 command_handler = CommandHandler(queues_out)
377 content = None if len(x) == 2 else x[2]
378 if command_type == 'ADD_QUEUE':
379 queues_out[connection_id] = content
380 elif command_type == 'COMMAND':
381 command_handler.handle_input(content, connection_id)
382 elif command_type == 'KILL_QUEUE':
383 del queues_out[connection_id]
387 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
389 server = Server(q, ('localhost', 5000), IO_Handler)
391 server.serve_forever()
392 except KeyboardInterrupt:
395 print('Killing server')
396 server.server_close()