4 from plomrogue.errors import GameError, ArgError, BrokenSocketConnection
5 from plomrogue.parser import Parser
6 from plomrogue.misc import quote
# Avoid "Address already in use" errors when the server is restarted and
# tries to bind its port again. Set on the TCPServer base class, so it
# applies to every server class derived from it in this module.
socketserver.TCPServer.allow_reuse_address = True
class PlomSocket:
    r"""Wrap a connected socket in a $-delimited text message protocol.

    Messages are strings. On the wire a message is encoded to bytes,
    every \ and $ in it is escaped with a prefixed \, and an unescaped
    $ terminates it. send() writes this format, recv() parses it.
    """

    def __init__(self, socket):
        """Store socket, the connection that send() and recv() operate on."""
        self.socket = socket

    def send(self, message, silent_connection_break=False):
        r"""Send via self.socket, encoded/delimited as way recv() expects.

        In detail, all \ and $ in message are escaped with prefixed \,
        and an unescaped $ is appended as a message delimiter. Then,
        socket.send() is called as often as necessary to ensure
        message is sent fully, as socket.send() due to buffering may
        not send all of it right away.

        Assuming socket is blocking, it's rather improbable that
        socket.send() will be partial / return a positive value less
        than the (byte) length of msg - but not entirely out of the
        question. See:
        - http://stackoverflow.com/q/19697218
        - http://stackoverflow.com/q/2618736
        - http://stackoverflow.com/q/8900474

        This also handles a socket.send() return value of 0, which
        might be possible or not (?) for blocking sockets:
        - http://stackoverflow.com/q/34919846

        The connection counts as broken when socket.send() returns 0
        or raises OSError with errno EBADF ("Bad file descriptor").
        Then BrokenSocketConnection is raised, unless
        silent_connection_break is true, in which case the rest of the
        message is dropped and the method returns quietly. Any other
        OSError propagates unchanged.
        """
        import errno

        # Escape backslashes first, so the backslashes added in front
        # of each $ are not escaped a second time.
        escaped_message = message.replace('\\', '\\\\').replace('$', '\\$')
        data = (escaped_message + '$').encode()
        totalsent = 0
        while totalsent < len(data):
            try:
                sent = self.socket.send(data[totalsent:])
            except OSError as err:
                if err.errno != errno.EBADF:
                    raise
                sent = 0  # treat like a send() that transmitted nothing
            if sent == 0:
                if silent_connection_break:
                    # Nothing more can be delivered; stop instead of
                    # retrying forever on a dead connection.
                    return
                raise BrokenSocketConnection
            totalsent += sent

    def recv(self):
        r"""Get full send()-prepared message from self.socket.

        In detail, socket.recv() is looped over for sequences of bytes
        that can be decoded as a Unicode string delimited by an
        unescaped $, with \ and $ escapable by \. If a sequence of
        characters that ends in an unescaped $ cannot be decoded as
        Unicode, None is returned as its representation. Stop once
        socket.recv() returns nothing.

        Under the hood, the TCP stack receives packets that construct
        the input payload in an internal buffer; socket.recv(BUFSIZE)
        pops up to BUFSIZE bytes from that buffer, without knowledge
        either about the input's segmentation into packets, or whether
        the input is segmented in any other meaningful way; that's why
        we do our own message segmentation with $ as a delimiter.

        This is a generator: it yields one str (or None) per complete
        message. Bytes of a message still unfinished when the
        connection ends are discarded.
        """
        backslash = ord('\\')
        delimiter = ord('$')
        esc = False  # True while the previous byte was an unescaped \
        msg = bytearray()  # unescaped bytes of the message in progress
        while True:
            data = self.socket.recv(1024)
            if not data:
                return
            # Each received byte is consumed exactly once; parser state
            # (esc, msg) carries over to the next chunk, so messages
            # and escape pairs may be split across recv() calls.
            for c in data:
                if esc:
                    msg.append(c)
                    esc = False
                elif c == backslash:
                    esc = True
                elif c == delimiter:
                    try:
                        decoded = msg.decode()
                    except UnicodeDecodeError:
                        decoded = None
                    msg = bytearray()
                    yield decoded
                else:
                    msg.append(c)
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that carries the queue towards the game IO loop.

    Connections are handled by IO_Handler; each handler reaches the
    shared queue through self.server.queue_out.
    """

    def __init__(self, queue, port, *args, **kwargs):
        """Listen on localhost:port and keep queue as self.queue_out.

        Further positional and keyword arguments are handed on to
        socketserver.ThreadingTCPServer unchanged.
        """
        address = ('localhost', port)
        super().__init__(address, IO_Handler, *args, **kwargs)
        self.queue_out = queue
        # Without this, the per-connection threads are non-daemon threads.
        self.daemon_threads = True
class IO_Handler(socketserver.BaseRequestHandler):
    """Request handler run by Server for each client connection."""

    def handle(self):
        """Move messages between network socket and game IO loop via queues.

        On start (a new connection from client to server), sets up a
        new queue, sends it via self.server.queue_out to the game IO
        loop thread, and from then on receives messages to send back
        from the game IO loop via that new queue.

        At the same time, loops over socket's recv to get messages
        from the outside into the game IO loop by way of
        self.server.queue_out into the game IO. Ends connection once a
        'QUIT' message is received from socket, and then also calls
        for a kill of its own queue.

        All messages to the game IO loop are tuples, with the first
        element a meta command ('ADD_QUEUE' for queue creation,
        'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
        else), the second element a UUID that uniquely identifies the
        thread (so that the game IO loop knows whom to send replies
        back to), and optionally a third element for further
        instructions.

        The queue kill, the stop signal for the sender thread and the
        socket close also happen when receiving ends for any other
        reason, including an exception raised while reading.
        """
        import uuid

        def send_queue_messages(plom_socket, queue_in, thread_alive):
            """Send messages via socket from queue_in while thread_alive[0]."""
            while thread_alive[0]:
                try:
                    # The timeout makes the loop re-check thread_alive[0]
                    # about once per second while the queue is idle.
                    msg = queue_in.get(timeout=1)
                except queue.Empty:
                    continue
                plom_socket.send(msg, True)

        plom_socket = PlomSocket(self.request)
        print('CONNECTION FROM:', str(self.client_address))
        connection_id = uuid.uuid4()
        queue_in = queue.Queue()
        self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
        # One-element list so the sender thread sees the flag change.
        thread_alive = [True]
        t = threading.Thread(target=send_queue_messages,
                             args=(plom_socket, queue_in, thread_alive))
        t.start()
        try:
            for message in plom_socket.recv():
                if message is None:
                    # recv() yields None for input it could not decode.
                    plom_socket.send('BAD MESSAGE', True)
                elif 'QUIT' == message:
                    plom_socket.send('BYE', True)
                    break
                else:
                    self.server.queue_out.put(
                        ('COMMAND', connection_id, message))
        finally:
            self.server.queue_out.put(('KILL_QUEUE', connection_id))
            thread_alive[0] = False
            print('CONNECTION CLOSED FROM:', str(self.client_address))
            plom_socket.socket.close()
def __init__(self, game_file_name, game):
    """Set up state for the game IO loop.

    game_file_name is the path that handle_input() appends stored
    commands to. game is handed to Parser, whose parse() results
    handle_input() executes.
    """
    self.game_file_name = game_file_name
    # Maps connection id -> queue for replies to that client; filled and
    # emptied by the 'ADD_QUEUE' / 'KILL_QUEUE' handling, read by send().
    self.queues_out = {}
    self.parser = Parser(game)
def loop(self, q):
    """Handle commands coming through queue q, send results back.

    Commands from q are expected to be tuples, with the first element
    either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
    a UUID, and an optional third element of arbitrary type. The UUID
    identifies a receiver for replies.

    An 'ADD_QUEUE' command should contain as third element a queue
    through which to send messages back to the sender of the
    command. A 'KILL_QUEUE' command removes the queue for that
    receiver from the list of queues through which to send replies;
    for a receiver that is not registered it does nothing.

    A 'COMMAND' command is specified in greater detail by a string
    that is the tuple's third element. The game_command_handler takes
    care of processing this and sending out replies.

    Runs until q.get() or a command handler raises; tuples with any
    other first element are ignored.
    """
    while True:
        x = q.get()
        command_type, connection_id = x[0], x[1]
        content = None if len(x) == 2 else x[2]
        if command_type == 'ADD_QUEUE':
            self.queues_out[connection_id] = content
        elif command_type == 'KILL_QUEUE':
            # pop() rather than del: an unknown or already removed id
            # must not take the whole IO loop down with a KeyError.
            self.queues_out.pop(connection_id, None)
        elif command_type == 'COMMAND':
            self.handle_input(content, connection_id)
def run_loop_with_server(self, port=5000):
    """Run connection of server talking to clients and game IO loop.

    We have the TCP server (an instance of Server) and we have the
    game IO loop, a thread running self.loop. Both communicate with
    each other via a queue.Queue. While the TCP server may spawn
    parallel threads to many clients, the IO loop works sequentially
    through game commands received from the TCP server's threads (=
    client connections to the TCP server). A processed command may
    trigger messages to the commanding client or to all clients,
    delivered from the IO loop to the TCP server via the queue.

    port is the TCP port the server listens on (default 5000). The
    call blocks in serve_forever(); a KeyboardInterrupt ends it
    quietly, and the server socket is closed on every way out.
    """
    q = queue.Queue()
    # Daemon thread: the IO loop never returns by itself and must not
    # keep the process alive once the server is done.
    c = threading.Thread(target=self.loop, daemon=True, args=(q,))
    c.start()
    server = Server(q, port)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print('Killing server')
        server.server_close()
def handle_input(self, input_, connection_id=None, store=True):
    """Process input_ to command grammar, call command handler if found.

    input_ is parsed with self.parser.parse(), which returns a command
    callable (or None) and its arguments. A command whose signature
    has a connection_id parameter is called with it as a keyword
    argument; any other command is called with the parsed arguments
    only.

    With store true, input_ is appended as one line to the file
    self.game_file_name, unless the command carries a dont_save
    attribute.

    Unparsable input, ArgError and GameError are reported as
    'UNHANDLED_INPUT', 'ARGUMENT_ERROR ...' and 'GAME_ERROR ...'
    messages rather than raised.
    """
    from inspect import signature

    def answer(connection_id, msg):
        """Report msg to the client that sent the input, if there is one."""
        if connection_id:
            self.send(msg, connection_id)
        else:
            # NOTE(review): fallback for input without a client connection
            # is reconstructed as printing to stdout - confirm.
            print(msg)

    try:
        command, args = self.parser.parse(input_)
        if command is None:
            answer(connection_id, 'UNHANDLED_INPUT')
        elif 'connection_id' in signature(command).parameters:
            command(*args, connection_id=connection_id)
        else:
            command(*args)
            # NOTE(review): storing is applied to commands without a
            # connection_id parameter only; the nesting is not
            # recoverable from the flattened source - confirm.
            if store and not hasattr(command, 'dont_save'):
                with open(self.game_file_name, 'a') as f:
                    f.write(input_ + '\n')
    except ArgError as e:
        answer(connection_id, 'ARGUMENT_ERROR ' + quote(str(e)))
    except GameError as e:
        answer(connection_id, 'GAME_ERROR ' + quote(str(e)))
def send(self, msg, connection_id=None):
    """Send message msg to server's client(s) via self.queues_out.

    If a specific client is identified by connection_id, only
    sends msg to that one. Else, sends it to all clients
    identified in self.queues_out.

    Raises KeyError if connection_id is given but has no queue in
    self.queues_out.
    """
    if connection_id:
        self.queues_out[connection_id].put(msg)
        return
    # Broadcast: iterate the queues directly, no need to go through
    # the keys (which would also shadow the connection_id parameter).
    for queue_out in self.queues_out.values():
        queue_out.put(msg)