home · contact · privacy
Refactor.
[plomrogue2-experiments] / server_ / io.py
1 import socketserver
2 import threading
3 import queue
4 import sys
5 sys.path.append('../')
6 import parser
7
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13 # Our default server port.
14 SERVER_PORT=5000
15
16
17 class Server(socketserver.ThreadingTCPServer):
18     """Bind together threaded IO handling server and message queue."""
19
20     def __init__(self, queue, *args, **kwargs):
21         super().__init__(('localhost', SERVER_PORT), IO_Handler, *args, **kwargs)
22         self.queue_out = queue
23         self.daemon_threads = True  # Else, server's threads have daemon=False.
24
25
26 class IO_Handler(socketserver.BaseRequestHandler):
27
28     def handle(self):
29         """Move messages between network socket and game IO loop via queues.
30
31         On start (a new connection from client to server), sets up a
32         new queue, sends it via self.server.queue_out to the game IO
33         loop thread, and from then on receives messages to send back
34         from the game IO loop via that new queue.
35
36         At the same time, loops over socket's recv to get messages
37         from the outside into the game IO loop by way of
38         self.server.queue_out into the game IO. Ends connection once a
39         'QUIT' message is received from socket, and then also calls
40         for a kill of its own queue.
41
42         All messages to the game IO loop are tuples, with the first
43         element a meta command ('ADD_QUEUE' for queue creation,
44         'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
45         else), the second element a UUID that uniquely identifies the
46         thread (so that the game IO loop knows whom to send replies
47         back to), and optionally a third element for further
48         instructions.
49
50         """
51         import plom_socket_io
52
53         def caught_send(socket, message):
54             """Send message by socket, catch broken socket connection error."""
55             try:
56                 plom_socket_io.send(socket, message)
57             except plom_socket_io.BrokenSocketConnection:
58                 pass
59
60         def send_queue_messages(socket, queue_in, thread_alive):
61             """Send messages via socket from queue_in while thread_alive[0]."""
62             while thread_alive[0]:
63                 try:
64                     msg = queue_in.get(timeout=1)
65                 except queue.Empty:
66                     continue
67                 caught_send(socket, msg)
68
69         import uuid
70         print('CONNECTION FROM:', str(self.client_address))
71         connection_id = uuid.uuid4()
72         queue_in = queue.Queue()
73         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
74         thread_alive = [True]
75         t = threading.Thread(target=send_queue_messages,
76                              args=(self.request, queue_in, thread_alive))
77         t.start()
78         for message in plom_socket_io.recv(self.request):
79             if message is None:
80                 caught_send(self.request, 'BAD MESSAGE')
81             elif 'QUIT' == message:
82                 caught_send(self.request, 'BYE')
83                 break
84             else:
85                 self.server.queue_out.put(('COMMAND', connection_id, message))
86         self.server.queue_out.put(('KILL_QUEUE', connection_id))
87         thread_alive[0] = False
88         print('CONNECTION CLOSED FROM:', str(self.client_address))
89         self.request.close()
90
91
92 class GameIO():
93
94     def __init__(self, game_file_name, game):
95         self.game_file_name = game_file_name
96         self.queues_out = {}
97         self.parser = parser.Parser(game)
98
99     def loop(self, q):
100         """Handle commands coming through queue q, send results back.
101
102         Commands from q are expected to be tuples, with the first element
103         either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
104         a UUID, and an optional third element of arbitrary type. The UUID
105         identifies a receiver for replies.
106
107         An 'ADD_QUEUE' command should contain as third element a queue
108         through which to send messages back to the sender of the
109         command. A 'KILL_QUEUE' command removes the queue for that
110         receiver from the list of queues through which to send replies.
111
112         A 'COMMAND' command is specified in greater detail by a string
113         that is the tuple's third element. The game_command_handler takes
114         care of processing this and sending out replies.
115
116         """
117         while True:
118             x = q.get()
119             command_type = x[0]
120             connection_id = x[1]
121             content = None if len(x) == 2 else x[2]
122             if command_type == 'ADD_QUEUE':
123                 self.queues_out[connection_id] = content
124             elif command_type == 'KILL_QUEUE':
125                 del self.queues_out[connection_id]
126             elif command_type == 'COMMAND':
127                 self.handle_input(content, connection_id)
128
129     def handle_input(self, input_, connection_id=None, store=True):
130         """Process input_ to command grammar, call command handler if found."""
131         from inspect import signature
132         import server_.game
133
134         def answer(connection_id, msg):
135             if connection_id:
136                 self.send(msg, connection_id)
137             else:
138                 print(msg)
139
140         try:
141             command = self.parser.parse(input_)
142             if command is None:
143                 answer(connection_id, 'UNHANDLED_INPUT')
144             else:
145                 if 'connection_id' in list(signature(command).parameters):
146                     command(connection_id=connection_id)
147                 else:
148                     command()
149                     if store:
150                         with open(self.game_file_name, 'a') as f:
151                             f.write(input_ + '\n')
152         except parser.ArgError as e:
153             answer(connection_id, 'ARGUMENT_ERROR ' + self.quote(str(e)))
154         except server_.game.GameError as e:
155             answer(connection_id, 'GAME_ERROR ' + self.quote(str(e)))
156
157     def send(self, msg, connection_id=None):
158         """Send message msg to server's client(s) via self.queues_out.
159
160         If a specific client is identified by connection_id, only
161         sends msg to that one. Else, sends it to all clients
162         identified in self.queues_out.
163
164         """
165         if connection_id:
166             self.queues_out[connection_id].put(msg)
167         else:
168             for connection_id in self.queues_out:
169                 self.queues_out[connection_id].put(msg)
170
171     def quote(self, string):
172         """Quote & escape string so client interprets it as single token."""
173         # FIXME: Don't do this as a method, makes no sense.
174         quoted = []
175         quoted += ['"']
176         for c in string:
177             if c in {'"', '\\'}:
178                 quoted += ['\\']
179             quoted += [c]
180         quoted += ['"']
181         return ''.join(quoted)
182
183
184 def run_server_with_io_loop(game):
185     """Run connection of server talking to clients and game IO loop.
186
187     We have the TCP server (an instance of Server) and we have the
188     game IO loop, a thread running Game.io.loop. Both communicate with
189     each other via a queue.Queue. While the TCP server may spawn
190     parallel threads to many clients, the IO loop works sequentially
191     through game commands received from the TCP server's threads (=
192     client connections to the TCP server). A processed command may
193     trigger messages to the commanding client or to all clients,
194     delivered from the IO loop to the TCP server via the queue.
195
196     """
197     q = queue.Queue()
198     c = threading.Thread(target=game.io.loop, daemon=True, args=(q,))
199     c.start()
200     server = Server(q)
201     try:
202         server.serve_forever()
203     except KeyboardInterrupt:
204         pass
205     finally:
206         print('Killing server')
207         server.server_close()