home · contact · privacy
Refactor and extend new library.
[plomrogue2-experiments] / new / plomrogue / io.py
1 import queue
2 import threading
3 import socketserver
4 from plomrogue.errors import GameError, ArgError, BrokenSocketConnection
5 from plomrogue.parser import Parser
6
7
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13
14 class PlomSocket:
15
16     def __init__(self, socket):
17         self.socket = socket
18
19     def send(self, message, silent_connection_break=False):
20         """Send via self.socket, encoded/delimited as way recv() expects.
21
22         In detail, all \ and $ in message are escaped with prefixed \,
23         and an unescaped $ is appended as a message delimiter. Then,
24         socket.send() is called as often as necessary to ensure
25         message is sent fully, as socket.send() due to buffering may
26         not send all of it right away.
27
28         Assuming socket is blocking, it's rather improbable that
29         socket.send() will be partial / return a positive value less
30         than the (byte) length of msg – but not entirely out of the
31         question. See: - <http://stackoverflow.com/q/19697218> -
32         <http://stackoverflow.com/q/2618736> -
33         <http://stackoverflow.com/q/8900474>
34
35         This also handles a socket.send() return value of 0, which
36         might be possible or not (?) for blocking sockets: -
37         <http://stackoverflow.com/q/34919846>
38
39         """
40         escaped_message = ''
41         for char in message:
42             if char in ('\\', '$'):
43                 escaped_message += '\\'
44             escaped_message += char
45         escaped_message += '$'
46         data = escaped_message.encode()
47         totalsent = 0
48         while totalsent < len(data):
49             socket_broken = False
50             try:
51                 sent = self.socket.send(data[totalsent:])
52                 socket_broken = sent == 0
53             except OSError as err:
54                 if err.errno == 9:  # "Bad file descriptor", when connection broken
55                     socket_broken = True
56                 else:
57                     raise err
58             if socket_broken and not silent_connection_break:
59                 raise BrokenSocketConnection
60             totalsent = totalsent + sent
61
62     def recv(self):
63         """Get full send()-prepared message from self.socket.
64
65         In detail, socket.recv() is looped over for sequences of bytes
66         that can be decoded as a Unicode string delimited by an
67         unescaped $, with \ and $ escapable by \. If a sequence of
68         characters that ends in an unescaped $ cannot be decoded as
69         Unicode, None is returned as its representation. Stop once
70         socket.recv() returns nothing.
71
72         Under the hood, the TCP stack receives packets that construct
73         the input payload in an internal buffer; socket.recv(BUFSIZE)
74         pops up to BUFSIZE bytes from that buffer, without knowledge
75         either about the input's segmentation into packets, or whether
76         the input is segmented in any other meaningful way; that's why
77         we do our own message segmentation with $ as a delimiter.
78
79         """
80         esc = False
81         data = b''
82         msg = b''
83         while True:
84             data += self.socket.recv(1024)
85             if 0 == len(data):
86                 return
87             cut_off = 0
88             for c in data:
89                 cut_off += 1
90                 if esc:
91                     msg += bytes([c])
92                     esc = False
93                 elif chr(c) == '\\':
94                     esc = True
95                 elif chr(c) == '$':
96                     try:
97                         yield msg.decode()
98                     except UnicodeDecodeError:
99                         yield None
100                     data = data[cut_off:]
101                     msg = b''
102                 else:
103                     msg += bytes([c])
104
105
106
107 class Server(socketserver.ThreadingTCPServer):
108     """Bind together threaded IO handling server and message queue."""
109
110     def __init__(self, queue, port, *args, **kwargs):
111         super().__init__(('localhost', port), IO_Handler, *args, **kwargs)
112         self.queue_out = queue
113         self.daemon_threads = True  # Else, server's threads have daemon=False.
114
115
116
117 class IO_Handler(socketserver.BaseRequestHandler):
118
119     def handle(self):
120         """Move messages between network socket and game IO loop via queues.
121
122         On start (a new connection from client to server), sets up a
123         new queue, sends it via self.server.queue_out to the game IO
124         loop thread, and from then on receives messages to send back
125         from the game IO loop via that new queue.
126
127         At the same time, loops over socket's recv to get messages
128         from the outside into the game IO loop by way of
129         self.server.queue_out into the game IO. Ends connection once a
130         'QUIT' message is received from socket, and then also calls
131         for a kill of its own queue.
132
133         All messages to the game IO loop are tuples, with the first
134         element a meta command ('ADD_QUEUE' for queue creation,
135         'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
136         else), the second element a UUID that uniquely identifies the
137         thread (so that the game IO loop knows whom to send replies
138         back to), and optionally a third element for further
139         instructions.
140
141         """
142
143         def send_queue_messages(plom_socket, queue_in, thread_alive):
144             """Send messages via socket from queue_in while thread_alive[0]."""
145             while thread_alive[0]:
146                 try:
147                     msg = queue_in.get(timeout=1)
148                 except queue.Empty:
149                     continue
150                 plom_socket.send(msg, True)
151
152         import uuid
153         plom_socket = PlomSocket(self.request)
154         print('CONNECTION FROM:', str(self.client_address))
155         connection_id = uuid.uuid4()
156         queue_in = queue.Queue()
157         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
158         thread_alive = [True]
159         t = threading.Thread(target=send_queue_messages,
160                              args=(plom_socket, queue_in, thread_alive))
161         t.start()
162         for message in plom_socket.recv():
163             if message is None:
164                 plom_socket.send('BAD MESSAGE', True)
165             elif 'QUIT' == message:
166                 plom_socket.send('BYE', True)
167                 break
168             else:
169                 self.server.queue_out.put(('COMMAND', connection_id, message))
170         self.server.queue_out.put(('KILL_QUEUE', connection_id))
171         thread_alive[0] = False
172         print('CONNECTION CLOSED FROM:', str(self.client_address))
173         plom_socket.socket.close()
174
175
176
177 class GameIO():
178
179     def __init__(self, game_file_name, game):
180         self.game_file_name = game_file_name
181         self.queues_out = {}
182         self.parser = Parser(game)
183
184     def loop(self, q):
185         """Handle commands coming through queue q, send results back.
186
187         Commands from q are expected to be tuples, with the first element
188         either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
189         a UUID, and an optional third element of arbitrary type. The UUID
190         identifies a receiver for replies.
191
192         An 'ADD_QUEUE' command should contain as third element a queue
193         through which to send messages back to the sender of the
194         command. A 'KILL_QUEUE' command removes the queue for that
195         receiver from the list of queues through which to send replies.
196
197         A 'COMMAND' command is specified in greater detail by a string
198         that is the tuple's third element. The game_command_handler takes
199         care of processing this and sending out replies.
200
201         """
202         while True:
203             x = q.get()
204             command_type = x[0]
205             connection_id = x[1]
206             content = None if len(x) == 2 else x[2]
207             if command_type == 'ADD_QUEUE':
208                 self.queues_out[connection_id] = content
209             elif command_type == 'KILL_QUEUE':
210                 del self.queues_out[connection_id]
211             elif command_type == 'COMMAND':
212                 self.handle_input(content, connection_id)
213
214     def run_loop_with_server(self):
215         """Run connection of server talking to clients and game IO loop.
216
217         We have the TCP server (an instance of Server) and we have the
218         game IO loop, a thread running self.loop. Both communicate with
219         each other via a queue.Queue. While the TCP server may spawn
220         parallel threads to many clients, the IO loop works sequentially
221         through game commands received from the TCP server's threads (=
222         client connections to the TCP server). A processed command may
223         trigger messages to the commanding client or to all clients,
224         delivered from the IO loop to the TCP server via the queue.
225
226         """
227         q = queue.Queue()
228         c = threading.Thread(target=self.loop, daemon=True, args=(q,))
229         c.start()
230         server = Server(q, 5000)
231         try:
232             server.serve_forever()
233         except KeyboardInterrupt:
234             pass
235         finally:
236             print('Killing server')
237             server.server_close()
238
239     def handle_input(self, input_, connection_id=None, store=True):
240         """Process input_ to command grammar, call command handler if found."""
241         from inspect import signature
242
243         def answer(connection_id, msg):
244             if connection_id:
245                 self.send(msg, connection_id)
246             else:
247                 print(msg)
248
249         try:
250             command, args = self.parser.parse(input_)
251             if command is None:
252                 answer(connection_id, 'UNHANDLED_INPUT')
253             else:
254                 if 'connection_id' in list(signature(command).parameters):
255                     command(*args, connection_id=connection_id)
256                 else:
257                     command(*args)
258                     if store and not hasattr(command, 'dont_save'):
259                         with open(self.game_file_name, 'a') as f:
260                             f.write(input_ + '\n')
261         except ArgError as e:
262             answer(connection_id, 'ARGUMENT_ERROR ' + quote(str(e)))
263         except GameError as e:
264             answer(connection_id, 'GAME_ERROR ' + quote(str(e)))
265
266     def send(self, msg, connection_id=None):
267         """Send message msg to server's client(s) via self.queues_out.
268
269         If a specific client is identified by connection_id, only
270         sends msg to that one. Else, sends it to all clients
271         identified in self.queues_out.
272
273         """
274         if connection_id:
275             self.queues_out[connection_id].put(msg)
276         else:
277             for connection_id in self.queues_out:
278                 self.queues_out[connection_id].put(msg)