# Avoid "Address already in use" errors. The flag is set on the TCPServer
# base class, so every TCPServer subclass created in this process (including
# the threaded Server below) inherits it.
socketserver.TCPServer.allow_reuse_address = True
class Server(socketserver.ThreadingTCPServer):
    """Bind together threaded IO handling server and message queue."""

    def __init__(self, queue, port, *args, **kwargs):
        """Create a server on ('localhost', port) served by IO_Handler.

        queue is kept as self.queue_out; the request handlers reach it
        through self.server.queue_out to pass messages on to the game IO
        loop. Any further positional/keyword arguments are handed through
        unchanged to socketserver.ThreadingTCPServer.
        """
        super().__init__(('localhost', port), IO_Handler, *args, **kwargs)
        self.queue_out = queue
        self.daemon_threads = True  # Else, server's threads have daemon=False.
22 class IO_Handler(socketserver.BaseRequestHandler):
25 """Move messages between network socket and game IO loop via queues.
27 On start (a new connection from client to server), sets up a
28 new queue, sends it via self.server.queue_out to the game IO
29 loop thread, and from then on receives messages to send back
30 from the game IO loop via that new queue.
32 At the same time, loops over socket's recv to get messages
33 from the outside into the game IO loop by way of
34 self.server.queue_out into the game IO. Ends connection once a
35 'QUIT' message is received from socket, and then also calls
36 for a kill of its own queue.
38 All messages to the game IO loop are tuples, with the first
39 element a meta command ('ADD_QUEUE' for queue creation,
40 'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
41 else), the second element a UUID that uniquely identifies the
42 thread (so that the game IO loop knows whom to send replies
43 back to), and optionally a third element for further
49 def caught_send(socket, message):
50 """Send message by socket, catch broken socket connection error."""
52 plom_socket_io.send(socket, message)
53 except plom_socket_io.BrokenSocketConnection:
56 def send_queue_messages(socket, queue_in, thread_alive):
57 """Send messages via socket from queue_in while thread_alive[0]."""
58 while thread_alive[0]:
60 msg = queue_in.get(timeout=1)
63 caught_send(socket, msg)
66 print('CONNECTION FROM:', str(self.client_address))
67 connection_id = uuid.uuid4()
68 queue_in = queue.Queue()
69 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
71 t = threading.Thread(target=send_queue_messages,
72 args=(self.request, queue_in, thread_alive))
74 for message in plom_socket_io.recv(self.request):
76 caught_send(self.request, 'BAD MESSAGE')
77 elif 'QUIT' == message:
78 caught_send(self.request, 'BYE')
81 self.server.queue_out.put(('COMMAND', connection_id, message))
82 self.server.queue_out.put(('KILL_QUEUE', connection_id))
83 thread_alive[0] = False
84 print('CONNECTION CLOSED FROM:', str(self.client_address))
def __init__(self, game_file_name, game):
    """Store the game file name and set up reply queues and parser.

    game_file_name -- file that handle_input opens in append mode to
                      write processed input lines to.
    game -- handed to parser.Parser to build the command parser.
    """
    self.game_file_name = game_file_name
    # Maps connection IDs to the queues replies are put on. Entries are
    # added and deleted on 'ADD_QUEUE' / 'KILL_QUEUE' messages and read by
    # send(), so the dict has to exist before any of those run.
    self.queues_out = {}
    self.parser = parser.Parser(game)
96 """Handle commands coming through queue q, send results back.
98 Commands from q are expected to be tuples, with the first element
99 either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
100 a UUID, and an optional third element of arbitrary type. The UUID
101 identifies a receiver for replies.
103 An 'ADD_QUEUE' command should contain as third element a queue
104 through which to send messages back to the sender of the
105 command. A 'KILL_QUEUE' command removes the queue for that
106 receiver from the list of queues through which to send replies.
108 A 'COMMAND' command is specified in greater detail by a string
109 that is the tuple's third element. The game_command_handler takes
110 care of processing this and sending out replies.
117 content = None if len(x) == 2 else x[2]
118 if command_type == 'ADD_QUEUE':
119 self.queues_out[connection_id] = content
120 elif command_type == 'KILL_QUEUE':
121 del self.queues_out[connection_id]
122 elif command_type == 'COMMAND':
123 self.handle_input(content, connection_id)
125 def run_loop_with_server(self):
126 """Run connection of server talking to clients and game IO loop.
128 We have the TCP server (an instance of Server) and we have the
129 game IO loop, a thread running self.loop. Both communicate with
130 each other via a queue.Queue. While the TCP server may spawn
131 parallel threads to many clients, the IO loop works sequentially
132 through game commands received from the TCP server's threads (=
133 client connections to the TCP server). A processed command may
134 trigger messages to the commanding client or to all clients,
135 delivered from the IO loop to the TCP server via the queue.
139 c = threading.Thread(target=self.loop, daemon=True, args=(q,))
141 server = Server(q, 5000)
143 server.serve_forever()
144 except KeyboardInterrupt:
147 print('Killing server')
148 server.server_close()
150 def handle_input(self, input_, connection_id=None, store=True):
151 """Process input_ to command grammar, call command handler if found."""
152 from inspect import signature
155 def answer(connection_id, msg):
157 self.send(msg, connection_id)
162 command = self.parser.parse(input_)
164 answer(connection_id, 'UNHANDLED_INPUT')
166 if 'connection_id' in list(signature(command).parameters):
167 command(connection_id=connection_id)
171 with open(self.game_file_name, 'a') as f:
172 f.write(input_ + '\n')
173 except parser.ArgError as e:
174 answer(connection_id, 'ARGUMENT_ERROR ' + self.quote(str(e)))
175 except server_.game.GameError as e:
176 answer(connection_id, 'GAME_ERROR ' + self.quote(str(e)))
def send(self, msg, connection_id=None):
    """Send message msg to server's client(s) via self.queues_out.

    If a specific client is identified by connection_id, only
    sends msg to that one. Else, sends it to all clients
    identified in self.queues_out.

    Raises KeyError if connection_id is given but no queue is
    registered for it in self.queues_out.
    """
    if connection_id is not None:
        # Addressed reply: only the identified client's queue gets msg.
        self.queues_out[connection_id].put(msg)
        return
    # No receiver named: broadcast to every registered queue.
    for queue_out in self.queues_out.values():
        queue_out.put(msg)
192 def quote(self, string):
193 """Quote & escape string so client interprets it as single token."""
194 # FIXME: Don't do this as a method, makes no sense.
202 return ''.join(quoted)