9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
13 # Our default server port.
17 class Server(socketserver.ThreadingTCPServer):
18 """Bind together threaded IO handling server and message queue."""
20 def __init__(self, queue, *args, **kwargs):
21 super().__init__(('localhost', SERVER_PORT), IO_Handler, *args, **kwargs)
22 self.queue_out = queue
23 self.daemon_threads = True # Else, server's threads have daemon=False.
26 class IO_Handler(socketserver.BaseRequestHandler):
29 """Move messages between network socket and game IO loop via queues.
31 On start (a new connection from client to server), sets up a
32 new queue, sends it via self.server.queue_out to the game IO
33 loop thread, and from then on receives messages to send back
34 from the game IO loop via that new queue.
36 At the same time, loops over socket's recv to get messages
37 from the outside into the game IO loop by way of
38 self.server.queue_out into the game IO. Ends connection once a
39 'QUIT' message is received from socket, and then also calls
40 for a kill of its own queue.
42 All messages to the game IO loop are tuples, with the first
43 element a meta command ('ADD_QUEUE' for queue creation,
44 'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
45 else), the second element a UUID that uniquely identifies the
46 thread (so that the game IO loop knows whom to send replies
47 back to), and optionally a third element for further
53 def caught_send(socket, message):
54 """Send message by socket, catch broken socket connection error."""
56 plom_socket_io.send(socket, message)
57 except plom_socket_io.BrokenSocketConnection:
60 def send_queue_messages(socket, queue_in, thread_alive):
61 """Send messages via socket from queue_in while thread_alive[0]."""
62 while thread_alive[0]:
64 msg = queue_in.get(timeout=1)
67 caught_send(socket, msg)
70 print('CONNECTION FROM:', str(self.client_address))
71 connection_id = uuid.uuid4()
72 queue_in = queue.Queue()
73 self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
75 t = threading.Thread(target=send_queue_messages,
76 args=(self.request, queue_in, thread_alive))
78 for message in plom_socket_io.recv(self.request):
80 caught_send(self.request, 'BAD MESSAGE')
81 elif 'QUIT' == message:
82 caught_send(self.request, 'BYE')
85 self.server.queue_out.put(('COMMAND', connection_id, message))
86 self.server.queue_out.put(('KILL_QUEUE', connection_id))
87 thread_alive[0] = False
88 print('CONNECTION CLOSED FROM:', str(self.client_address))
94 def __init__(self, game_file_name, game):
95 self.game_file_name = game_file_name
97 self.parser = parser.Parser(game)
100 """Handle commands coming through queue q, send results back.
102 Commands from q are expected to be tuples, with the first element
103 either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
104 a UUID, and an optional third element of arbitrary type. The UUID
105 identifies a receiver for replies.
107 An 'ADD_QUEUE' command should contain as third element a queue
108 through which to send messages back to the sender of the
109 command. A 'KILL_QUEUE' command removes the queue for that
110 receiver from the list of queues through which to send replies.
112 A 'COMMAND' command is specified in greater detail by a string
113 that is the tuple's third element. The game_command_handler takes
114 care of processing this and sending out replies.
121 content = None if len(x) == 2 else x[2]
122 if command_type == 'ADD_QUEUE':
123 self.queues_out[connection_id] = content
124 elif command_type == 'KILL_QUEUE':
125 del self.queues_out[connection_id]
126 elif command_type == 'COMMAND':
127 self.handle_input(content, connection_id)
129 def handle_input(self, input_, connection_id=None, store=True):
130 """Process input_ to command grammar, call command handler if found."""
131 from inspect import signature
134 def answer(connection_id, msg):
136 self.send(msg, connection_id)
141 command = self.parser.parse(input_)
143 answer(connection_id, 'UNHANDLED_INPUT')
145 if 'connection_id' in list(signature(command).parameters):
146 command(connection_id=connection_id)
150 with open(self.game_file_name, 'a') as f:
151 f.write(input_ + '\n')
152 except parser.ArgError as e:
153 answer(connection_id, 'ARGUMENT_ERROR ' + self.quote(str(e)))
154 except server_.game.GameError as e:
155 answer(connection_id, 'GAME_ERROR ' + self.quote(str(e)))
157 def send(self, msg, connection_id=None):
158 """Send message msg to server's client(s) via self.queues_out.
160 If a specific client is identified by connection_id, only
161 sends msg to that one. Else, sends it to all clients
162 identified in self.queues_out.
166 self.queues_out[connection_id].put(msg)
168 for connection_id in self.queues_out:
169 self.queues_out[connection_id].put(msg)
171 def quote(self, string):
172 """Quote & escape string so client interprets it as single token."""
173 # FIXME: Don't do this as a method, makes no sense.
181 return ''.join(quoted)
184 def run_server_with_io_loop(game):
185 """Run connection of server talking to clients and game IO loop.
187 We have the TCP server (an instance of Server) and we have the
188 game IO loop, a thread running Game.io.loop. Both communicate with
189 each other via a queue.Queue. While the TCP server may spawn
190 parallel threads to many clients, the IO loop works sequentially
191 through game commands received from the TCP server's threads (=
192 client connections to the TCP server). A processed command may
193 trigger messages to the commanding client or to all clients,
194 delivered from the IO loop to the TCP server via the queue.
198 c = threading.Thread(target=game.io.loop, daemon=True, args=(q,))
202 server.serve_forever()
203 except KeyboardInterrupt:
206 print('Killing server')
207 server.server_close()