6 from parser import ArgError, Parser
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
13 class GameError(Exception):
17 class Server(socketserver.ThreadingTCPServer):
18 """Bind together threaded IO handling server and message queue."""
20 def __init__(self, queue, *args, **kwargs):
21 super().__init__(*args, **kwargs)
22 self.queue_out = queue
23 self.daemon_threads = True # Else, server's threads have daemon=False.
26 class IO_Handler(socketserver.BaseRequestHandler):
29 """Move messages between network socket and main thread via queues.
31 On start, sets up new queue, sends it via self.server.queue_out to
32 main thread, and from then on receives messages to send back from the
33 main thread via that new queue.
35 At the same time, loops over socket's recv to get messages from the
36 outside via self.server.queue_out into the main thread. Ends connection
37 once a 'QUIT' message is received from socket, and then also kills its
40 All messages to the main thread are tuples, with the first element a
41 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42 deletion, and 'COMMAND' for everything else), the second element a UUID
43 that uniquely identifies the thread (so that the main thread knows whom
44 to send replies back to), and optionally a third element for further
49 def caught_send(socket, message):
50 """Send message by socket, catch broken socket connection error."""
52 plom_socket_io.send(socket, message)
53 except plom_socket_io.BrokenSocketConnection:
56 def send_queue_messages(socket, queue_in, thread_alive):
57 """Send messages via socket from queue_in while thread_alive[0]."""
58 while thread_alive[0]:
60 msg = queue_in.get(timeout=1)
63 caught_send(socket, msg)
66 print('CONNECTION FROM:', str(self.client_address))
67 connection_id = uuid.uuid4()
68 queue_in = queue.Queue()
69 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
71 t = threading.Thread(target=send_queue_messages,
72 args=(self.request, queue_in, thread_alive))
74 for message in plom_socket_io.recv(self.request):
76 caught_send(self.request, 'BAD MESSAGE')
77 elif 'QUIT' == message:
78 caught_send(self.request, 'BYE')
81 self.server.queue_out.put(('COMMAND', connection_id, message))
82 self.server.queue_out.put(('KILL_QUEUE', connection_id))
83 thread_alive[0] = False
84 print('CONNECTION CLOSED FROM:', str(self.client_address))
88 def move_pos(direction, pos_yx):
91 elif direction == 'DOWN':
93 elif direction == 'RIGHT':
95 elif direction == 'LEFT':
101 def __init__(self, thing, name, args=(), kwargs={}):
109 if self.name == 'move':
110 if len(self.args) > 0:
111 direction = self.args[0]
113 direction = self.kwargs['direction']
114 test_pos = self.thing.position[:]
115 move_pos(direction, test_pos)
116 if test_pos[0] < 0 or test_pos[1] < 0 or \
117 test_pos[0] >= self.thing.world.map_size[0] or \
118 test_pos[1] >= self.thing.world.map_size[1]:
119 raise GameError('would move outside map bounds')
120 pos_i = test_pos[0] * self.thing.world.map_size[1] + test_pos[1]
121 map_tile = self.thing.world.map_[pos_i]
123 raise GameError('would move into illegal terrain')
128 def __init__(self, world, type_, position):
131 self.position = position
132 self.task = Task(self, 'wait')
137 def task_move(self, direction):
138 move_pos(direction, self.position)
140 def decide_task(self):
141 if self.position[1] > 1:
142 self.set_task('move', 'LEFT')
143 elif self.position[1] < 3:
144 self.set_task('move', 'RIGHT')
146 self.set_task('wait')
148 def set_task(self, task, *args, **kwargs):
149 self.task = Task(self, task, args, kwargs)
152 def proceed(self, is_AI=True):
153 """Further the thing in its tasks.
155 Decrements .task.todo; if it thus falls to <= 0, enacts method whose
156 name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
157 .decide_task to decide a self.task.
160 if self.task.todo <= 0:
161 task = getattr(self, 'task_' + self.task.name)
162 task(*self.task.args, **self.task.kwargs)
164 if is_AI and self.task is None:
172 self.map_size = (5, 5)
173 self.map_ = 'xxxxx' +\
179 Thing(self, 'human', [3, 3]),
180 Thing(self, 'monster', [1, 1])
183 self.player = self.things[self.player_i]
187 """Calculate n-th Fibonacci number. Very inefficiently."""
191 return fib(n-1) + fib(n-2)
194 class CommandHandler:
196 def __init__(self, queues_out):
197 from multiprocessing import Pool
198 self.queues_out = queues_out
200 self.parser = Parser(self)
201 # self.pool and self.pool_result are currently only needed by the FIB
202 # command and the demo of a parallelized game loop in cmd_inc_p.
204 self.pool_result = None
206 def handle_input(self, input_, connection_id):
207 """Process input_ to command grammar, call command handler if found."""
209 command = self.parser.parse(input_)
211 self.send_to(connection_id, 'UNHANDLED INPUT')
213 command(connection_id=connection_id)
214 except ArgError as e:
215 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
216 except GameError as e:
217 self.send_to(connection_id, 'GAME ERROR: ' + str(e))
219 def send_to(self, connection_id, msg):
220 """Send msg to client of connection_id."""
221 self.queues_out[connection_id].put(msg)
223 def send_all(self, msg):
224 """Send msg to all clients."""
225 for connection_id in self.queues_out:
226 self.send_to(connection_id, msg)
228 def stringify_yx(self, tuple_):
229 """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
230 return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
232 def quoted(self, string):
233 """Quote and escape string so client interprets it as single token."""
241 return ''.join(quoted)
243 def quoted_map(self, map_string, map_width):
244 """Put \n into map_string at map_width intervals, return quoted whole."""
246 map_size = len(map_string)
248 while start_cut < map_size:
249 limit = start_cut + map_width
250 map_lines += [map_string[start_cut:limit]]
252 return self.quoted("\n".join(map_lines))
254 def send_all_gamestate(self):
255 """Send out game state data relevant to clients."""
256 self.send_all('NEW_TURN ' + str(self.world.turn))
257 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
258 self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
259 self.world.map_size[1]))
260 for thing in self.world.things:
261 self.send_all('THING TYPE:' + thing.type_ + ' '
262 + self.stringify_yx(thing.position))
264 def proceed_to_next_player_turn(self, connection_id):
265 """Run game world turns until player can decide their next step.
267 Sends a 'TURN_FINISHED' message, then iterates through all non-player
268 things, on each step furthering them in their tasks (and letting them
269 decide new ones if they finish). The iteration order is: first all
270 things that come after the player in the world things list, then (after
271 incrementing the world turn) all that come before the player; then the
272 player's .proceed() is run, and if it does not finish his task, the
273 loop starts at the beginning. Once the player's task is finished, the
274 loop breaks, and client-relevant game data is sent.
276 self.send_all('TURN_FINISHED ' + str(self.world.turn))
278 for thing in self.world.things[self.world.player_i+1:]:
281 for thing in self.world.things[:self.world.player_i]:
283 self.world.player.proceed(is_AI=False)
284 if self.world.player.task is None:
286 self.send_all_gamestate()
288 def cmd_MOVE(self, direction, connection_id):
289 """Set player task to 'move' with direction arg, finish player turn."""
290 if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
291 raise ArgError('Move argument must be one of: '
292 'UP, DOWN, RIGHT, LEFT')
293 self.world.player.set_task('move', direction=direction)
294 self.proceed_to_next_player_turn(connection_id)
295 cmd_MOVE.argtypes = 'string'
297 def cmd_WAIT(self, connection_id):
298 """Set player task to 'wait', finish player turn."""
299 self.world.player.set_task('wait')
300 self.proceed_to_next_player_turn(connection_id)
302 def cmd_GET_TURN(self, connection_id):
303 """Send world.turn to caller."""
304 self.send_to(connection_id, str(self.world.turn))
306 def cmd_ECHO(self, msg, connection_id):
307 """Send msg to caller."""
308 self.send_to(connection_id, msg)
309 cmd_ECHO.argtypes = 'string'
311 def cmd_ALL(self, msg, connection_id):
312 """Send msg to all clients."""
314 cmd_ALL.argtypes = 'string'
316 def cmd_FIB(self, numbers, connection_id):
317 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
319 Numbers are calculated in parallel as far as possible, using fib().
320 A 'CALCULATING …' message is sent to caller before the result.
322 self.send_to(connection_id, 'CALCULATING …')
323 results = self.pool.map(fib, numbers)
324 reply = ' '.join([str(r) for r in results])
325 self.send_to(connection_id, reply)
326 cmd_FIB.argtypes = 'seq:int:nonneg'
328 def cmd_INC_P(self, connection_id):
329 """Increment world.turn, send game turn data to everyone.
331 To simulate game processing waiting times, a one second delay between
332 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
333 calculations are started as pool processes that need to be finished
334 until a further INC finishes the turn.
336 This is just a demo structure for how the game loop could work when
337 parallelized. One might imagine a two-step game turn, with a non-action
338 step determining actor tasks (the AI determinations would take the
339 place of the fib calculations here), and an action step wherein these
340 tasks are performed (where now sleep(1) is).
342 from time import sleep
343 if self.pool_result is not None:
344 self.pool_result.wait()
345 self.send_all('TURN_FINISHED ' + str(self.world.turn))
348 self.send_all_gamestate()
349 self.pool_result = self.pool.map_async(fib, (35, 35))
353 """Handle commands coming through queue q, send results back.
355 Commands from q are expected to be tuples, with the first element either
356 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
357 an optional third element of arbitrary type. The UUID identifies a
358 receiver for replies.
360 An 'ADD_QUEUE' command should contain as third element a queue through
361 which to send messages back to the sender of the command. A 'KILL_QUEUE'
362 command removes the queue for that receiver from the list of queues through
363 which to send replies.
365 A 'COMMAND' command is specified in greater detail by a string that is the
366 tuple's third element. CommandHandler takes care of processing this and
370 command_handler = CommandHandler(queues_out)
375 content = None if len(x) == 2 else x[2]
376 if command_type == 'ADD_QUEUE':
377 queues_out[connection_id] = content
378 elif command_type == 'COMMAND':
379 command_handler.handle_input(content, connection_id)
380 elif command_type == 'KILL_QUEUE':
381 del queues_out[connection_id]
385 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
387 server = Server(q, ('localhost', 5000), IO_Handler)
389 server.serve_forever()
390 except KeyboardInterrupt:
393 print('Killing server')
394 server.server_close()