# Avoid "Address already in use" errors by allowing every TCPServer
# (sub)class defined below to reuse its address.
socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that carries the queue towards the main thread.

    Request handlers reach that queue as self.server.queue_out.
    """

    def __init__(self, queue, *args, **kwargs):
        """Set up the TCP server, then attach queue as .queue_out.

        All arguments after queue are handed on unchanged to
        socketserver.ThreadingTCPServer.
        """
        super().__init__(*args, **kwargs)
        # Without this, the per-connection threads would be non-daemonic.
        self.daemon_threads = True
        self.queue_out = queue
class IO_Handler(socketserver.BaseRequestHandler):
    """Request handler relaying messages between one socket and main thread."""

    def handle(self):
        """Move messages between network socket and main thread via queues.

        On start, sets up new queue, sends it via self.server.queue_out to
        main thread, and from then on receives messages to send back from the
        main thread via that new queue.

        At the same time, loops over socket's recv to get messages from the
        outside via self.server.queue_out into the main thread. Ends connection
        once a 'QUIT' message is received from socket, and then also asks the
        main thread to drop the queue again.

        All messages to the main thread are tuples, with the first element a
        meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
        deletion, and 'COMMAND' for everything else), the second element a UUID
        that uniquely identifies the thread (so that the main thread knows whom
        to send replies back to), and optionally a third element (the new
        queue for 'ADD_QUEUE', the received message for 'COMMAND').
        """
        # Local imports are harmless if the module already imports these.
        import plom_socket_io
        import uuid

        def caught_send(socket, message):
            """Send message by socket, catch broken socket connection error."""
            try:
                plom_socket_io.send(socket, message)
            except plom_socket_io.BrokenSocketConnection:
                pass

        def send_queue_messages(socket, queue_in, thread_alive):
            """Send messages via socket from queue_in while thread_alive[0]."""
            while thread_alive[0]:
                try:
                    # The timeout lets the loop re-check thread_alive[0]
                    # regularly instead of blocking forever on an empty queue.
                    msg = queue_in.get(timeout=1)
                except queue.Empty:
                    continue
                caught_send(socket, msg)

        print('CONNECTION FROM:', str(self.client_address))
        connection_id = uuid.uuid4()
        queue_in = queue.Queue()
        self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
        # One-element list, so the sender thread sees changes to the flag.
        thread_alive = [True]
        t = threading.Thread(target=send_queue_messages,
                             args=(self.request, queue_in, thread_alive))
        t.start()
        try:
            for message in plom_socket_io.recv(self.request):
                # NOTE(review): the condition guarding 'BAD MESSAGE' is not
                # visible in SOURCE; this assumes plom_socket_io.recv yields
                # None for input it cannot decode. Confirm.
                if message is None:
                    caught_send(self.request, 'BAD MESSAGE')
                elif 'QUIT' == message:
                    caught_send(self.request, 'BYE')
                    break
                else:
                    self.server.queue_out.put(('COMMAND', connection_id,
                                               message))
        finally:
            # Clean up even if receiving fails: otherwise the main thread
            # keeps a dead queue and the sender thread never stops.
            self.server.queue_out.put(('KILL_QUEUE', connection_id))
            thread_alive[0] = False
        print('CONNECTION CLOSED FROM:', str(self.client_address))
84 def __init__(self, name, args=(), kwargs={}):
def __init__(self, type_, position):
    """Set up a thing of type_ at position, with 'wait' as its first task.

    position is stored as given, not copied, and is changed in place by
    task_move, so it must be a mutable sequence with the y coordinate at
    index 0.
    """
    # Read back as thing.type when game data is sent to the clients.
    self.type = type_
    self.position = position
    self.task = Task('wait')
def task_move(self, direction):
    """Shift the y coordinate (.position[0]) one step towards direction.

    'UP' decreases it, 'DOWN' increases it; any other direction leaves the
    position untouched.
    """
    y_delta = {'UP': -1, 'DOWN': 1}.get(direction)
    if y_delta is not None:
        self.position[0] += y_delta
def decide_task(self):
    """Choose the next task; currently this is always 'wait'."""
    self.set_task('wait')
def set_task(self, task, *args, **kwargs):
    """Replace .task with a new Task named task.

    The remaining positional and keyword arguments are stored in the Task,
    to be passed to the matching task_* method once the task is enacted.
    """
    new_task = Task(task, args, kwargs)
    self.task = new_task
def proceed(self, is_AI=True):
    """Further the thing in its tasks.

    Decrements .task.todo; if it thus falls to <= 0, enacts method whose
    name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
    .decide_task to decide a self.task.
    """
    self.task.todo -= 1
    if self.task.todo <= 0:
        # Resolve the method by name, e.g. a 'move' task runs task_move.
        task_method = getattr(self, 'task_' + self.task.name)
        task_method(*self.task.args, **self.task.kwargs)
        self.task = None
    # Only AI things pick their own follow-up task; a non-AI thing is left
    # with .task = None until someone else sets one.
    if is_AI and self.task is None:
        self.decide_task()
133 self.map_size = (5, 5)
134 self.map_ = 'xxxxx\n'+\
139 self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
141 self.player = self.things[self.player_i]
def fib(n):
    """Calculate n-th Fibonacci number. Very inefficiently.

    Counts from fib(1) == fib(2) == 1. The naive double recursion is kept on
    purpose: the command handler uses this function as an expensive
    calculation to keep pool processes busy.
    """
    # NOTE(review): the base-case lines are not visible in SOURCE; they are
    # restored here as fib(1) == fib(2) == 1. Confirm, especially for n < 1.
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
class ArgumentError(Exception):
    """Raised by command methods when a client command has bad arguments."""
class CommandHandler:
    """Parse client command strings, run them, and send replies.

    Replies are put into the queues of queues_out, a mapping from connection
    ID to the queue belonging to that connection.
    """

    def __init__(self, queues_out):
        """Store queues_out, create the game world and the process pool."""
        from multiprocessing import Pool
        self.queues_out = queues_out
        self.world = World()
        # self.pool and self.pool_result are currently only needed by the FIB
        # command and the demo of a parallelized game loop in cmd_inc_p.
        self.pool = Pool()
        self.pool_result = None

    def send_to(self, connection_id, msg):
        """Send msg to client of connection_id."""
        self.queues_out[connection_id].put(msg)

    def send_all(self, msg):
        """Send msg to all clients."""
        for connection_id in self.queues_out:
            self.send_to(connection_id, msg)

    def stringify_yx(self, tuple_):
        """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
        return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])

    @staticmethod
    def _message_after_command(tokens, input_):
        """Return what follows the command word tokens[0] in input_.

        Leading spaces are skipped first: tokens never contains empty
        strings, so without this the slice would cut at the wrong offset for
        input such as '  ECHO hi'.
        """
        return input_.lstrip(' ')[len(tokens[0]) + 1:]

    def _send_gamestate(self):
        """Send turn number, map size, terrain and all things to everyone."""
        self.send_all('NEW_TURN ' + str(self.world.turn))
        self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
        self.send_all('TERRAIN\n' + self.world.map_)
        for thing in self.world.things:
            self.send_all('THING TYPE:' + thing.type + ' '
                          + self.stringify_yx(thing.position))

    def proceed_to_next_player_turn(self, connection_id):
        """Run game world turns until player can decide their next step.

        Sends a 'TURN_FINISHED' message, then iterates through all non-player
        things, on each step furthering them in their tasks (and letting them
        decide new ones if they finish). The iteration order is: first all
        things that come after the player in the world things list, then (after
        incrementing the world turn) all that come before the player; then the
        player's .proceed() is run, and if it does not finish his task, the
        loop starts at the beginning. Once the player's task is finished, the
        loop breaks, and client-relevant game data is sent.

        connection_id is accepted for symmetry with the other command
        methods, but not used: all messages go to all clients.
        """
        self.send_all('TURN_FINISHED ' + str(self.world.turn))
        while True:
            for thing in self.world.things[self.world.player_i+1:]:
                thing.proceed()
            self.world.turn += 1
            for thing in self.world.things[:self.world.player_i]:
                thing.proceed()
            self.world.player.proceed(is_AI=False)
            if self.world.player.task is None:
                break
        self._send_gamestate()

    def cmd_fib(self, tokens, connection_id):
        """Reply with n-th Fibonacci numbers, n taken from tokens[1:].

        Numbers are calculated in parallel as far as possible, using fib().
        A 'CALCULATING …' message is sent to caller before the result.

        Raises ArgumentError if no number is given, or if any argument is
        not a decimal integer > 0.
        """
        if len(tokens) < 2:
            raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
        numbers = []
        for token in tokens[1:]:
            # isdecimal() (unlike isdigit()) only accepts what int() can
            # parse; testing the parsed value also rejects '00'.
            if not token.isdecimal() or int(token) < 1:
                raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
            numbers.append(int(token))
        self.send_to(connection_id, 'CALCULATING …')
        results = self.pool.map(fib, numbers)
        reply = ' '.join(str(r) for r in results)
        self.send_to(connection_id, reply)

    def cmd_inc_p(self, connection_id):
        """Increment world.turn, send game turn data to everyone.

        To simulate game processing waiting times, a one second delay between
        TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
        calculations are started as pool processes that need to be finished
        until a further INC finishes the turn.

        This is just a demo structure for how the game loop could work when
        parallelized. One might imagine a two-step game turn, with a non-action
        step determining actor tasks (the AI determinations would take the
        place of the fib calculations here), and an action step wherein these
        tasks are performed (where now sleep(1) is).
        """
        from time import sleep
        # Block until the calculations started by the previous call are done.
        if self.pool_result is not None:
            self.pool_result.wait()
        self.send_all('TURN_FINISHED ' + str(self.world.turn))
        sleep(1)
        self.world.turn += 1
        self._send_gamestate()
        self.pool_result = self.pool.map_async(fib, (35, 35))

    def cmd_get_turn(self, connection_id):
        """Send world.turn to caller."""
        self.send_to(connection_id, str(self.world.turn))

    def cmd_move(self, direction, connection_id):
        """Set player task to 'move' with direction arg, finish player turn.

        Raises ArgumentError if direction is neither 'UP' nor 'DOWN'.
        """
        if direction not in {'UP', 'DOWN'}:
            raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
        self.world.player.set_task('move', direction=direction)
        self.proceed_to_next_player_turn(connection_id)

    def cmd_wait(self, connection_id):
        """Set player task to 'wait', finish player turn."""
        self.world.player.set_task('wait')
        self.proceed_to_next_player_turn(connection_id)

    def cmd_echo(self, tokens, input_, connection_id):
        """Send message in input_ beyond tokens[0] to caller."""
        msg = self._message_after_command(tokens, input_)
        self.send_to(connection_id, msg)

    def cmd_all(self, tokens, input_):
        """Send message in input_ beyond tokens[0] to all clients."""
        msg = self._message_after_command(tokens, input_)
        self.send_all(msg)

    def handle_input(self, input_, connection_id):
        """Process input_ to command grammar, call command handler if found.

        input_ is split at single spaces into non-empty tokens; the first
        token selects the command. Empty input, unknown commands, and known
        commands with the wrong number of tokens are answered with an error
        message to the caller, as is any ArgumentError raised by a command.
        """
        tokens = [token for token in input_.split(' ') if len(token) > 0]
        try:
            if not tokens:
                self.send_to(connection_id, 'EMPTY COMMAND')
            elif len(tokens) == 1 and tokens[0] == 'INC_P':
                self.cmd_inc_p(connection_id)
            elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
                self.cmd_get_turn(connection_id)
            elif len(tokens) == 1 and tokens[0] == 'WAIT':
                self.cmd_wait(connection_id)
            elif len(tokens) == 2 and tokens[0] == 'MOVE':
                self.cmd_move(tokens[1], connection_id)
            elif tokens[0] == 'ECHO':
                self.cmd_echo(tokens, input_, connection_id)
            elif tokens[0] == 'ALL':
                self.cmd_all(tokens, input_)
            elif tokens[0] == 'FIB':
                # TODO: Should this really block the whole loop?
                self.cmd_fib(tokens, connection_id)
            else:
                self.send_to(connection_id, 'UNKNOWN COMMAND')
        except ArgumentError as e:
            self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
def io_loop(q):
    """Handle commands coming through queue q, send results back.

    Commands from q are expected to be tuples, with the first element either
    'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
    an optional third element of arbitrary type. The UUID identifies a
    receiver for replies.

    An 'ADD_QUEUE' command should contain as third element a queue through
    which to send messages back to the sender of the command. A 'KILL_QUEUE'
    command removes the queue for that receiver from the list of queues through
    which to send replies.

    A 'COMMAND' command is specified in greater detail by a string that is the
    tuple's third element. CommandHandler takes care of processing this and
    of sending any replies.

    Never returns; meant to run in a thread of its own.
    """
    queues_out = {}
    command_handler = CommandHandler(queues_out)
    while True:
        x = q.get()
        command_type = x[0]
        connection_id = x[1]
        content = None if len(x) == 2 else x[2]
        if command_type == 'ADD_QUEUE':
            queues_out[connection_id] = content
        elif command_type == 'COMMAND':
            command_handler.handle_input(content, connection_id)
        elif command_type == 'KILL_QUEUE':
            # pop() instead of del: a KILL_QUEUE for an unknown connection
            # must not take down the loop that serves all other clients.
            queues_out.pop(connection_id, None)
# Guarded, since CommandHandler creates a multiprocessing Pool: under the
# "spawn" start method each pool process re-imports this module and would
# otherwise try to start a server of its own.
if __name__ == '__main__':
    # Queue through which the connection handlers reach the command loop.
    q = queue.Queue()
    c = threading.Thread(target=io_loop, daemon=True, args=(q,))
    c.start()
    server = Server(q, ('localhost', 5000), IO_Handler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print('Killing server')
        server.server_close()