home · contact · privacy
Fix minor bug.
[plomrogue2-experiments] / new / plomrogue / io.py
1 import queue
2 import threading
3 import socketserver
4 from plomrogue.errors import GameError, ArgError, BrokenSocketConnection
5 from plomrogue.parser import Parser
6 from plomrogue.misc import quote
7
8
9
10 # Avoid "Address already in use" errors.
11 socketserver.TCPServer.allow_reuse_address = True
12
13
14
15 class PlomSocket:
16
17     def __init__(self, socket):
18         self.socket = socket
19
20     def send(self, message, silent_connection_break=False):
21         """Send via self.socket, encoded/delimited as way recv() expects.
22
23         In detail, all \ and $ in message are escaped with prefixed \,
24         and an unescaped $ is appended as a message delimiter. Then,
25         socket.send() is called as often as necessary to ensure
26         message is sent fully, as socket.send() due to buffering may
27         not send all of it right away.
28
29         Assuming socket is blocking, it's rather improbable that
30         socket.send() will be partial / return a positive value less
31         than the (byte) length of msg – but not entirely out of the
32         question. See: - <http://stackoverflow.com/q/19697218> -
33         <http://stackoverflow.com/q/2618736> -
34         <http://stackoverflow.com/q/8900474>
35
36         This also handles a socket.send() return value of 0, which
37         might be possible or not (?) for blocking sockets: -
38         <http://stackoverflow.com/q/34919846>
39
40         """
41         escaped_message = ''
42         for char in message:
43             if char in ('\\', '$'):
44                 escaped_message += '\\'
45             escaped_message += char
46         escaped_message += '$'
47         data = escaped_message.encode()
48         totalsent = 0
49         while totalsent < len(data):
50             socket_broken = False
51             try:
52                 sent = self.socket.send(data[totalsent:])
53                 socket_broken = sent == 0
54             except OSError as err:
55                 if err.errno == 9:  # "Bad file descriptor", when connection broken
56                     socket_broken = True
57                 else:
58                     raise err
59             if socket_broken and not silent_connection_break:
60                 raise BrokenSocketConnection
61             totalsent = totalsent + sent
62
63     def recv(self):
64         """Get full send()-prepared message from self.socket.
65
66         In detail, socket.recv() is looped over for sequences of bytes
67         that can be decoded as a Unicode string delimited by an
68         unescaped $, with \ and $ escapable by \. If a sequence of
69         characters that ends in an unescaped $ cannot be decoded as
70         Unicode, None is returned as its representation. Stop once
71         socket.recv() returns nothing.
72
73         Under the hood, the TCP stack receives packets that construct
74         the input payload in an internal buffer; socket.recv(BUFSIZE)
75         pops up to BUFSIZE bytes from that buffer, without knowledge
76         either about the input's segmentation into packets, or whether
77         the input is segmented in any other meaningful way; that's why
78         we do our own message segmentation with $ as a delimiter.
79
80         """
81         esc = False
82         data = b''
83         msg = b''
84         while True:
85             data += self.socket.recv(1024)
86             if 0 == len(data):
87                 return
88             cut_off = 0
89             for c in data:
90                 cut_off += 1
91                 if esc:
92                     msg += bytes([c])
93                     esc = False
94                 elif chr(c) == '\\':
95                     esc = True
96                 elif chr(c) == '$':
97                     try:
98                         yield msg.decode()
99                     except UnicodeDecodeError:
100                         yield None
101                     data = data[cut_off:]
102                     msg = b''
103                 else:
104                     msg += bytes([c])
105
106
107
108 class Server(socketserver.ThreadingTCPServer):
109     """Bind together threaded IO handling server and message queue."""
110
111     def __init__(self, queue, port, *args, **kwargs):
112         super().__init__(('localhost', port), IO_Handler, *args, **kwargs)
113         self.queue_out = queue
114         self.daemon_threads = True  # Else, server's threads have daemon=False.
115
116
117
118 class IO_Handler(socketserver.BaseRequestHandler):
119
120     def handle(self):
121         """Move messages between network socket and game IO loop via queues.
122
123         On start (a new connection from client to server), sets up a
124         new queue, sends it via self.server.queue_out to the game IO
125         loop thread, and from then on receives messages to send back
126         from the game IO loop via that new queue.
127
128         At the same time, loops over socket's recv to get messages
129         from the outside into the game IO loop by way of
130         self.server.queue_out into the game IO. Ends connection once a
131         'QUIT' message is received from socket, and then also calls
132         for a kill of its own queue.
133
134         All messages to the game IO loop are tuples, with the first
135         element a meta command ('ADD_QUEUE' for queue creation,
136         'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
137         else), the second element a UUID that uniquely identifies the
138         thread (so that the game IO loop knows whom to send replies
139         back to), and optionally a third element for further
140         instructions.
141
142         """
143
144         def send_queue_messages(plom_socket, queue_in, thread_alive):
145             """Send messages via socket from queue_in while thread_alive[0]."""
146             while thread_alive[0]:
147                 try:
148                     msg = queue_in.get(timeout=1)
149                 except queue.Empty:
150                     continue
151                 plom_socket.send(msg, True)
152
153         import uuid
154         plom_socket = PlomSocket(self.request)
155         print('CONNECTION FROM:', str(self.client_address))
156         connection_id = uuid.uuid4()
157         queue_in = queue.Queue()
158         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
159         thread_alive = [True]
160         t = threading.Thread(target=send_queue_messages,
161                              args=(plom_socket, queue_in, thread_alive))
162         t.start()
163         for message in plom_socket.recv():
164             if message is None:
165                 plom_socket.send('BAD MESSAGE', True)
166             elif 'QUIT' == message:
167                 plom_socket.send('BYE', True)
168                 break
169             else:
170                 self.server.queue_out.put(('COMMAND', connection_id, message))
171         self.server.queue_out.put(('KILL_QUEUE', connection_id))
172         thread_alive[0] = False
173         print('CONNECTION CLOSED FROM:', str(self.client_address))
174         plom_socket.socket.close()
175
176
177
178 class GameIO():
179
180     def __init__(self, game_file_name, game):
181         self.game_file_name = game_file_name
182         self.queues_out = {}
183         self.parser = Parser(game)
184
185     def loop(self, q):
186         """Handle commands coming through queue q, send results back.
187
188         Commands from q are expected to be tuples, with the first element
189         either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
190         a UUID, and an optional third element of arbitrary type. The UUID
191         identifies a receiver for replies.
192
193         An 'ADD_QUEUE' command should contain as third element a queue
194         through which to send messages back to the sender of the
195         command. A 'KILL_QUEUE' command removes the queue for that
196         receiver from the list of queues through which to send replies.
197
198         A 'COMMAND' command is specified in greater detail by a string
199         that is the tuple's third element. The game_command_handler takes
200         care of processing this and sending out replies.
201
202         """
203         while True:
204             x = q.get()
205             command_type = x[0]
206             connection_id = x[1]
207             content = None if len(x) == 2 else x[2]
208             if command_type == 'ADD_QUEUE':
209                 self.queues_out[connection_id] = content
210             elif command_type == 'KILL_QUEUE':
211                 del self.queues_out[connection_id]
212             elif command_type == 'COMMAND':
213                 self.handle_input(content, connection_id)
214
215     def run_loop_with_server(self):
216         """Run connection of server talking to clients and game IO loop.
217
218         We have the TCP server (an instance of Server) and we have the
219         game IO loop, a thread running self.loop. Both communicate with
220         each other via a queue.Queue. While the TCP server may spawn
221         parallel threads to many clients, the IO loop works sequentially
222         through game commands received from the TCP server's threads (=
223         client connections to the TCP server). A processed command may
224         trigger messages to the commanding client or to all clients,
225         delivered from the IO loop to the TCP server via the queue.
226
227         """
228         q = queue.Queue()
229         c = threading.Thread(target=self.loop, daemon=True, args=(q,))
230         c.start()
231         server = Server(q, 5000)
232         try:
233             server.serve_forever()
234         except KeyboardInterrupt:
235             pass
236         finally:
237             print('Killing server')
238             server.server_close()
239
240     def handle_input(self, input_, connection_id=None, store=True):
241         """Process input_ to command grammar, call command handler if found."""
242         from inspect import signature
243
244         def answer(connection_id, msg):
245             if connection_id:
246                 self.send(msg, connection_id)
247             else:
248                 print(msg)
249
250         try:
251             command, args = self.parser.parse(input_)
252             if command is None:
253                 answer(connection_id, 'UNHANDLED_INPUT')
254             else:
255                 if 'connection_id' in list(signature(command).parameters):
256                     command(*args, connection_id=connection_id)
257                 else:
258                     command(*args)
259                     if store and not hasattr(command, 'dont_save'):
260                         with open(self.game_file_name, 'a') as f:
261                             f.write(input_ + '\n')
262         except ArgError as e:
263             answer(connection_id, 'ARGUMENT_ERROR ' + quote(str(e)))
264         except GameError as e:
265             answer(connection_id, 'GAME_ERROR ' + quote(str(e)))
266
267     def send(self, msg, connection_id=None):
268         """Send message msg to server's client(s) via self.queues_out.
269
270         If a specific client is identified by connection_id, only
271         sends msg to that one. Else, sends it to all clients
272         identified in self.queues_out.
273
274         """
275         if connection_id:
276             self.queues_out[connection_id].put(msg)
277         else:
278             for connection_id in self.queues_out:
279                 self.queues_out[connection_id].put(msg)