6 from parser import ArgError, Parser
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that also carries the queue to the main thread.

    Request handlers can reach that queue as self.server.queue_out.
    """

    # Without this override, the server's handler threads are started with
    # daemon=False.
    daemon_threads = True

    def __init__(self, queue, *args, **kwargs):
        """Keep queue as .queue_out, hand all else to the parent class."""
        self.queue_out = queue
        super().__init__(*args, **kwargs)
22 class IO_Handler(socketserver.BaseRequestHandler):
25 """Move messages between network socket and main thread via queues.
27 On start, sets up new queue, sends it via self.server.queue_out to
28 main thread, and from then on receives messages to send back from the
29 main thread via that new queue.
31 At the same time, loops over socket's recv to get messages from the
32 outside via self.server.queue_out into the main thread. Ends connection
33 once a 'QUIT' message is received from socket, and then also kills its
36 All messages to the main thread are tuples, with the first element a
37 meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
38 deletion, and 'COMMAND' for everything else), the second element a UUID
39 that uniquely identifies the thread (so that the main thread knows whom
40 to send replies back to), and optionally a third element for further
45 def caught_send(socket, message):
46 """Send message by socket, catch broken socket connection error."""
48 plom_socket_io.send(socket, message)
49 except plom_socket_io.BrokenSocketConnection:
52 def send_queue_messages(socket, queue_in, thread_alive):
53 """Send messages via socket from queue_in while thread_alive[0]."""
54 while thread_alive[0]:
56 msg = queue_in.get(timeout=1)
59 caught_send(socket, msg)
62 print('CONNECTION FROM:', str(self.client_address))
63 connection_id = uuid.uuid4()
64 queue_in = queue.Queue()
65 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
67 t = threading.Thread(target=send_queue_messages,
68 args=(self.request, queue_in, thread_alive))
70 for message in plom_socket_io.recv(self.request):
72 caught_send(self.request, 'BAD MESSAGE')
73 elif 'QUIT' == message:
74 caught_send(self.request, 'BYE')
77 self.server.queue_out.put(('COMMAND', connection_id, message))
78 self.server.queue_out.put(('KILL_QUEUE', connection_id))
79 thread_alive[0] = False
80 print('CONNECTION CLOSED FROM:', str(self.client_address))
86 def __init__(self, name, args=(), kwargs={}):
95 def __init__(self, type_, position):
97 self.position = position
98 self.task = Task('wait')
def task_move(self, direction):
    """Shift self.position by one step into direction.

    self.position is changed in place: 'UP' / 'DOWN' decrement / increment
    its index 0, 'LEFT' / 'RIGHT' decrement / increment its index 1. Any
    other direction leaves the position untouched.
    """
    # (direction name, index into position, change applied to that index)
    steps = (('UP', 0, -1), ('DOWN', 0, 1), ('RIGHT', 1, 1), ('LEFT', 1, -1))
    for name, axis, delta in steps:
        if direction == name:
            self.position[axis] += delta
            return
113 def decide_task(self):
114 if self.position[1] > 1:
115 self.set_task('move', 'LEFT')
116 elif self.position[1] < 3:
117 self.set_task('move', 'RIGHT')
119 self.set_task('wait')
def set_task(self, task, *args, **kwargs):
    """Replace self.task with a new Task named task.

    Any further positional and keyword arguments are handed to Task as its
    args tuple and kwargs dict.
    """
    new_task = Task(task, args, kwargs)
    self.task = new_task
124 def proceed(self, is_AI=True):
125 """Further the thing in its tasks.
127 Decrements .task.todo; if it thus falls to <= 0, enacts method whose
128 name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
129 .decide_task to decide a self.task.
132 if self.task.todo <= 0:
133 task = getattr(self, 'task_' + self.task.name)
134 task(*self.task.args, **self.task.kwargs)
136 if is_AI and self.task is None:
144 self.map_size = (5, 5)
145 self.map_ = 'xxxxx\n' +\
150 self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
152 self.player = self.things[self.player_i]
156 """Calculate n-th Fibonacci number. Very inefficiently."""
160 return fib(n-1) + fib(n-2)
163 class CommandHandler:
165 def __init__(self, queues_out):
166 from multiprocessing import Pool
167 self.queues_out = queues_out
169 self.parser = Parser(self)
170 # self.pool and self.pool_result are currently only needed by the FIB
171 # command and the demo of a parallelized game loop in cmd_inc_p.
173 self.pool_result = None
def send_to(self, connection_id, msg):
    """Put msg onto the reply queue registered under connection_id."""
    target_queue = self.queues_out[connection_id]
    target_queue.put(msg)
def send_all(self, msg):
    """Broadcast msg: send it once to every connection in self.queues_out."""
    for recipient_id in self.queues_out:
        self.send_to(recipient_id, msg)
def stringify_yx(self, tuple_):
    """Render the (y, x) pair tuple_ as a string of form 'Y:<y>,X:<x>'."""
    y, x = tuple_[0], tuple_[1]
    return 'Y:%s,X:%s' % (y, x)
188 def quoted(self, string):
189 """Quote and escape string so client interprets it as single token."""
197 return ''.join(quoted)
199 def proceed_to_next_player_turn(self, connection_id):
200 """Run game world turns until player can decide their next step.
202 Sends a 'TURN_FINISHED' message, then iterates through all non-player
203 things, on each step furthering them in their tasks (and letting them
204 decide new ones if they finish). The iteration order is: first all
205 things that come after the player in the world things list, then (after
206 incrementing the world turn) all that come before the player; then the
207 player's .proceed() is run, and if it does not finish his task, the
208 loop starts at the beginning. Once the player's task is finished, the
209 loop breaks, and client-relevant game data is sent.
211 self.send_all('TURN_FINISHED ' + str(self.world.turn))
213 for thing in self.world.things[self.world.player_i+1:]:
216 for thing in self.world.things[:self.world.player_i]:
218 self.world.player.proceed(is_AI=False)
219 if self.world.player.task is None:
221 self.send_all('NEW_TURN ' + str(self.world.turn))
222 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
223 self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
224 for thing in self.world.things:
225 self.send_all('THING TYPE:' + thing.type + ' '
226 + self.stringify_yx(thing.position))
def cmd_FIB(self, numbers, connection_id):
    """Reply to caller with the Fibonacci numbers for each n in numbers.

    First sends a 'CALCULATING …' notice to the caller, then maps fib()
    over numbers by way of self.pool, and finally sends the results back
    as a single message, separated by spaces.
    """
    self.send_to(connection_id, 'CALCULATING …')
    fib_values = self.pool.map(fib, numbers)
    self.send_to(connection_id, ' '.join(map(str, fib_values)))
cmd_FIB.argtypes = 'seq:int:nonneg'
240 def cmd_INC_P(self, connection_id):
241 """Increment world.turn, send game turn data to everyone.
243 To simulate game processing waiting times, a one second delay between
244 TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
245 calculations are started as pool processes that need to be finished
246 until a further INC finishes the turn.
248 This is just a demo structure for how the game loop could work when
249 parallelized. One might imagine a two-step game turn, with a non-action
250 step determining actor tasks (the AI determinations would take the
251 place of the fib calculations here), and an action step wherein these
252 tasks are performed (where now sleep(1) is).
254 from time import sleep
255 if self.pool_result is not None:
256 self.pool_result.wait()
257 self.send_all('TURN_FINISHED ' + str(self.world.turn))
260 self.send_all('NEW_TURN ' + str(self.world.turn))
261 self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
262 self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
263 for thing in self.world.things:
264 self.send_all('THING TYPE:' + thing.type + ' '
265 + self.stringify_yx(thing.position))
266 self.pool_result = self.pool.map_async(fib, (35, 35))
def cmd_GET_TURN(self, connection_id):
    """Reply to caller with the current world turn, as a string."""
    turn_text = str(self.world.turn)
    self.send_to(connection_id, turn_text)
def cmd_MOVE(self, direction, connection_id):
    """Make player move into direction, then finish the player's turn.

    Sets the player's task to 'move' with direction as keyword argument
    and runs self.proceed_to_next_player_turn. Raises ArgError if
    direction is none of UP, DOWN, RIGHT, LEFT.
    """
    legal_directions = {'UP', 'DOWN', 'RIGHT', 'LEFT'}
    if direction in legal_directions:
        self.world.player.set_task('move', direction=direction)
        self.proceed_to_next_player_turn(connection_id)
        return
    raise ArgError('Move argument must be one of: '
                   'UP, DOWN, RIGHT, LEFT')
cmd_MOVE.argtypes = 'string'
def cmd_WAIT(self, connection_id):
    """Make player wait, then finish the player's turn."""
    player = self.world.player
    player.set_task('wait')
    self.proceed_to_next_player_turn(connection_id)
def cmd_ECHO(self, msg, connection_id):
    """Send msg back unchanged to the client that issued the command."""
    recipient, reply = connection_id, msg
    self.send_to(recipient, reply)
cmd_ECHO.argtypes = 'string'
291 def cmd_ALL(self, msg, connection_id):
292 """Send msg to all clients."""
294 cmd_ALL.argtypes = 'string'
296 def handle_input(self, input_, connection_id):
297 """Process input_ to command grammar, call command handler if found."""
299 command = self.parser.parse(input_)
301 self.send_to(connection_id, 'UNHANDLED INPUT')
303 command(connection_id=connection_id)
304 except ArgError as e:
305 self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
309 """Handle commands coming through queue q, send results back.
311 Commands from q are expected to be tuples, with the first element either
312 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
313 an optional third element of arbitrary type. The UUID identifies a
314 receiver for replies.
316 An 'ADD_QUEUE' command should contain as third element a queue through
317 which to send messages back to the sender of the command. A 'KILL_QUEUE'
318 command removes the queue for that receiver from the list of queues through
319 which to send replies.
321 A 'COMMAND' command is specified in greater detail by a string that is the
322 tuple's third element. CommandHandler takes care of processing this and
326 command_handler = CommandHandler(queues_out)
331 content = None if len(x) == 2 else x[2]
332 if command_type == 'ADD_QUEUE':
333 queues_out[connection_id] = content
334 elif command_type == 'COMMAND':
335 command_handler.handle_input(content, connection_id)
336 elif command_type == 'KILL_QUEUE':
337 del queues_out[connection_id]
341 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
343 server = Server(q, ('localhost', 5000), IO_Handler)
345 server.serve_forever()
346 except KeyboardInterrupt:
349 print('Killing server')
350 server.server_close()