6 from parser import ArgError, Parser
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
13 class GameError(Exception):
17 class Server(socketserver.ThreadingTCPServer):
18 """Bind together threaded IO handling server and message queue."""
20 def __init__(self, queue, *args, **kwargs):
21 super().__init__(*args, **kwargs)
22 self.queue_out = queue
23 self.daemon_threads = True # Else, server's threads have daemon=False.
26 class IO_Handler(socketserver.BaseRequestHandler):
29 """Move messages between network socket and main thread via queues.
31 On start, sets up new queue, sends it via self.server.queue_out to
32 main thread, and from then on receives messages to send back from the
33 main thread via that new queue.
35 At the same time, loops over socket's recv to get messages from the
36 outside via self.server.queue_out into the main thread. Ends connection
37 once a 'QUIT' message is received from socket, and then also kills its
40 All messages to the main thread are tuples, with the first element a
41 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42 deletion, and 'COMMAND' for everything else), the second element a UUID
43 that uniquely identifies the thread (so that the main thread knows whom
44 to send replies back to), and optionally a third element for further
49 def caught_send(socket, message):
50 """Send message by socket, catch broken socket connection error."""
52 plom_socket_io.send(socket, message)
53 except plom_socket_io.BrokenSocketConnection:
56 def send_queue_messages(socket, queue_in, thread_alive):
57 """Send messages via socket from queue_in while thread_alive[0]."""
58 while thread_alive[0]:
60 msg = queue_in.get(timeout=1)
63 caught_send(socket, msg)
66 print('CONNECTION FROM:', str(self.client_address))
67 connection_id = uuid.uuid4()
68 queue_in = queue.Queue()
69 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
71 t = threading.Thread(target=send_queue_messages,
72 args=(self.request, queue_in, thread_alive))
74 for message in plom_socket_io.recv(self.request):
76 caught_send(self.request, 'BAD MESSAGE')
77 elif 'QUIT' == message:
78 caught_send(self.request, 'BYE')
81 self.server.queue_out.put(('COMMAND', connection_id, message))
82 self.server.queue_out.put(('KILL_QUEUE', connection_id))
83 thread_alive[0] = False
84 print('CONNECTION CLOSED FROM:', str(self.client_address))
90 def __init__(self, name, args=(), kwargs={}):
99 def __init__(self, world, type_, position):
102 self.position = position
103 self.task = Task('wait')
105 def _move_pos(self, direction, pos_yx):
106 if direction == 'UP':
108 elif direction == 'DOWN':
110 elif direction == 'RIGHT':
112 elif direction == 'LEFT':
118 def task_move(self, direction):
119 self._move_pos(direction, self.position)
121 def decide_task(self):
122 if self.position[1] > 1:
123 self.set_task('move', 'LEFT')
124 elif self.position[1] < 3:
125 self.set_task('move', 'RIGHT')
127 self.set_task('wait')
129 def check_task(self, task, *args, **kwargs):
134 direction = kwargs['direction']
135 test_pos = self.position[:]
136 self._move_pos(direction, test_pos)
137 if test_pos[0] < 0 or test_pos[1] < 0 or \
138 test_pos[0] >= self.world.map_size[0] or \
139 test_pos[1] >= self.world.map_size[1]:
140 raise GameError('would move outside map bounds')
141 pos_i = test_pos[0] * self.world.map_size[1] + test_pos[1]
142 map_tile = self.world.map_[pos_i]
144 raise GameError('would move into illegal terrain')
146 def set_task(self, task, *args, **kwargs):
147 self.check_task(task, *args, **kwargs)
148 self.task = Task(task, args, kwargs)
150 def proceed(self, is_AI=True):
151 """Further the thing in its tasks.
153 Decrements .task.todo; if it thus falls to <= 0, enacts method whose
154 name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
155 .decide_task to decide a self.task.
158 if self.task.todo <= 0:
159 task = getattr(self, 'task_' + self.task.name)
160 task(*self.task.args, **self.task.kwargs)
162 if is_AI and self.task is None:
170 self.map_size = (5, 5)
171 self.map_ = 'xxxxx' +\
177 Thing(self, 'human', [3, 3]),
178 Thing(self, 'monster', [1, 1])
181 self.player = self.things[self.player_i]
185 """Calculate n-th Fibonacci number. Very inefficiently."""
189 return fib(n-1) + fib(n-2)
192 class CommandHandler:
194 def __init__(self, queues_out):
195 from multiprocessing import Pool
196 self.queues_out = queues_out
198 self.parser = Parser(self)
199 # self.pool and self.pool_result are currently only needed by the FIB
200 # command and the demo of a parallelized game loop in cmd_inc_p.
202 self.pool_result = None
204 def handle_input(self, input_, connection_id):
205 """Process input_ to command grammar, call command handler if found."""
207 command = self.parser.parse(input_)
209 self.send_to(connection_id, 'UNHANDLED INPUT')
211 command(connection_id=connection_id)
212 except ArgError as e:
213 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
214 except GameError as e:
215 self.send_to(connection_id, 'GAME ERROR: ' + str(e))
217 def send_to(self, connection_id, msg):
218 """Send msg to client of connection_id."""
219 self.queues_out[connection_id].put(msg)
221 def send_all(self, msg):
222 """Send msg to all clients."""
223 for connection_id in self.queues_out:
224 self.send_to(connection_id, msg)
226 def stringify_yx(self, tuple_):
227 """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
228 return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
230 def quoted(self, string):
231 """Quote and escape string so client interprets it as single token."""
239 return ''.join(quoted)
241 def quoted_map(self, map_string, map_width):
242 """Put \n into map_string at map_width intervals, return quoted whole."""
244 map_size = len(map_string)
246 while start_cut < map_size:
247 limit = start_cut + map_width
248 map_lines += [map_string[start_cut:limit]]
250 return self.quoted("\n".join(map_lines))
252 def send_all_gamestate(self):
253 """Send out game state data relevant to clients."""
254 self.send_all('NEW_TURN ' + str(self.world.turn))
255 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
256 self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
257 self.world.map_size[1]))
258 for thing in self.world.things:
259 self.send_all('THING TYPE:' + thing.type_ + ' '
260 + self.stringify_yx(thing.position))
262 def proceed_to_next_player_turn(self, connection_id):
263 """Run game world turns until player can decide their next step.
265 Sends a 'TURN_FINISHED' message, then iterates through all non-player
266 things, on each step furthering them in their tasks (and letting them
267 decide new ones if they finish). The iteration order is: first all
268 things that come after the player in the world things list, then (after
269 incrementing the world turn) all that come before the player; then the
270 player's .proceed() is run, and if it does not finish his task, the
271 loop starts at the beginning. Once the player's task is finished, the
272 loop breaks, and client-relevant game data is sent.
274 self.send_all('TURN_FINISHED ' + str(self.world.turn))
276 for thing in self.world.things[self.world.player_i+1:]:
279 for thing in self.world.things[:self.world.player_i]:
281 self.world.player.proceed(is_AI=False)
282 if self.world.player.task is None:
284 self.send_all_gamestate()
286 def cmd_MOVE(self, direction, connection_id):
287 """Set player task to 'move' with direction arg, finish player turn."""
288 if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
289 raise ArgError('Move argument must be one of: '
290 'UP, DOWN, RIGHT, LEFT')
291 self.world.player.set_task('move', direction=direction)
292 self.proceed_to_next_player_turn(connection_id)
293 cmd_MOVE.argtypes = 'string'
295 def cmd_WAIT(self, connection_id):
296 """Set player task to 'wait', finish player turn."""
297 self.world.player.set_task('wait')
298 self.proceed_to_next_player_turn(connection_id)
300 def cmd_GET_TURN(self, connection_id):
301 """Send world.turn to caller."""
302 self.send_to(connection_id, str(self.world.turn))
304 def cmd_ECHO(self, msg, connection_id):
305 """Send msg to caller."""
306 self.send_to(connection_id, msg)
307 cmd_ECHO.argtypes = 'string'
309 def cmd_ALL(self, msg, connection_id):
310 """Send msg to all clients."""
312 cmd_ALL.argtypes = 'string'
314 def cmd_FIB(self, numbers, connection_id):
315 """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
317 Numbers are calculated in parallel as far as possible, using fib().
318 A 'CALCULATING …' message is sent to caller before the result.
320 self.send_to(connection_id, 'CALCULATING …')
321 results = self.pool.map(fib, numbers)
322 reply = ' '.join([str(r) for r in results])
323 self.send_to(connection_id, reply)
324 cmd_FIB.argtypes = 'seq:int:nonneg'
326 def cmd_INC_P(self, connection_id):
327 """Increment world.turn, send game turn data to everyone.
329 To simulate game processing waiting times, a one second delay between
330 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
331 calculations are started as pool processes that need to be finished
332 until a further INC finishes the turn.
334 This is just a demo structure for how the game loop could work when
335 parallelized. One might imagine a two-step game turn, with a non-action
336 step determining actor tasks (the AI determinations would take the
337 place of the fib calculations here), and an action step wherein these
338 tasks are performed (where now sleep(1) is).
340 from time import sleep
341 if self.pool_result is not None:
342 self.pool_result.wait()
343 self.send_all('TURN_FINISHED ' + str(self.world.turn))
346 self.send_all_gamestate()
347 self.pool_result = self.pool.map_async(fib, (35, 35))
351 """Handle commands coming through queue q, send results back.
353 Commands from q are expected to be tuples, with the first element either
354 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
355 an optional third element of arbitrary type. The UUID identifies a
356 receiver for replies.
358 An 'ADD_QUEUE' command should contain as third element a queue through
359 which to send messages back to the sender of the command. A 'KILL_QUEUE'
360 command removes the queue for that receiver from the list of queues through
361 which to send replies.
363 A 'COMMAND' command is specified in greater detail by a string that is the
364 tuple's third element. CommandHandler takes care of processing this and
368 command_handler = CommandHandler(queues_out)
373 content = None if len(x) == 2 else x[2]
374 if command_type == 'ADD_QUEUE':
375 queues_out[connection_id] = content
376 elif command_type == 'COMMAND':
377 command_handler.handle_input(content, connection_id)
378 elif command_type == 'KILL_QUEUE':
379 del queues_out[connection_id]
383 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
385 server = Server(q, ('localhost', 5000), IO_Handler)
387 server.serve_forever()
388 except KeyboardInterrupt:
391 print('Killing server')
392 server.server_close()