home · contact · privacy
Refactor, fix typo, handle exceptions in client.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class World:
83     turn = 0
84     map_ = 'xxxxx\nx...x\nx.X.x\nx...x\nxxxxx'
85     player_pos = (3, 3)
86
87
88 def fib(n):
89     """Calculate n-th Fibonacci number. Very inefficiently."""
90     if n in (1, 2):
91         return 1
92     else:
93         return fib(n-1) + fib(n-2)
94
95
96 class CommandHandler:
97
98     def __init__(self, queues_out):
99         from multiprocessing import Pool
100         self.queues_out = queues_out
101         self.pool = Pool()
102         self.world = World()
103         self.pool_result = None
104
105     def send_to(self, connection_id, msg):
106         """Send msg to client of connection_id."""
107         self.queues_out[connection_id].put(msg)
108
109     def send_all(self, msg):
110         """Send msg to all clients."""
111         for connection_id in self.queues_out:
112             self.send_to(connection_id, msg)
113
114     def cmd_fib(self, tokens, connection_id):
115         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
116
117         Numbers are calculated in parallel as far as possible, using fib().
118         A 'CALCULATING …' message is sent to caller before the result.
119         """
120         fib_fail = 'MALFORMED FIB REQUEST'
121         if len(tokens) < 2:
122             self.send_to(connection_id, fib_fail)
123             return
124         numbers = []
125         for token in tokens[1:]:
126             if token != '0' and token.isdigit():
127                 numbers += [int(token)]
128             else:
129                 self.send_to(connection_id, fib_fail)
130                 return
131         self.send_to(connection_id, 'CALCULATING …')
132         results = self.pool.map(fib, numbers)
133         reply = ' '.join([str(r) for r in results])
134         self.send_to(connection_id, reply)
135
136     def cmd_inc(self, connection_id):
137         """Increment world.turn, send game turn data to everyone.
138
139         To simulate game processing waiting times, a one second delay between
140         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
141         calculations are started as pool processes that need to be finished
142         until a further INC finishes the turn.
143         """
144         from time import sleep
145         if self.pool_result is not None:
146             self.pool_result.wait()
147         self.send_all('TURN_FINISHED ' + str(self.world.turn))
148         sleep(1)
149         self.world.turn += 1
150         self.send_all('NEW_TURN ' + str(self.world.turn))
151         self.send_all('TERRAIN\n' + self.world.map_)
152         self.send_all('POSITION_Y ' + str(self.world.player_pos[0]))
153         self.send_all('POSITION_X ' + str(self.world.player_pos[1]))
154         self.pool_result = self.pool.map_async(fib, (35, 35))
155
156     def cmd_get_turn(self, connection_id):
157         """Send world.turn to caller."""
158         self.send_to(connection_id, str(self.world.turn))
159
160     def cmd_echo(self, tokens, input_, connection_id):
161         """Send message in input_ beyond tokens[0] to caller."""
162         msg = input_[len(tokens[0]) + 1:]
163         self.send_to(connection_id, msg)
164
165     def cmd_all(self, tokens, input_):
166         """Send message in input_ beyond tokens[0] to all clients."""
167         msg = input_[len(tokens[0]) + 1:]
168         self.send_all(msg)
169
170     def handle_input(self, input_, connection_id):
171         """Process input_ to command grammar, call command handler if found."""
172         tokens = [token for token in input_.split(' ') if len(token) > 0]
173         if len(tokens) == 0:
174             self.send_to(connection_id, 'EMPTY COMMAND')
175         elif len(tokens) == 1 and tokens[0] == 'INC':
176             self.cmd_inc(connection_id)
177         elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
178             self.cmd_get_turn(connection_id)
179         elif len(tokens) >= 1 and tokens[0] == 'ECHO':
180             self.cmd_echo(tokens, input_, connection_id)
181         elif len(tokens) >= 1 and tokens[0] == 'ALL':
182             self.cmd_all(tokens, input_)
183         elif len(tokens) >= 1 and tokens[0] == 'FIB':
184             # TODO: Should this really block the whole loop?
185             self.cmd_fib(tokens, connection_id)
186         else:
187             self.send_to(connection_id, 'UNKNOWN COMMAND')
188
189
190 def io_loop(q):
191     """Handle commands coming through queue q, send results back.
192
193     Commands from q are expected to be tuples, with the first element either
194     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
195     an optional third element of arbitrary type. The UUID identifies a
196     receiver for replies.
197
198     An 'ADD_QUEUE' command should contain as third element a queue through
199     which to send messages back to the sender of the command. A 'KILL_QUEUE'
200     command removes the queue for that receiver from the list of queues through
201     which to send replies.
202
203     A 'COMMAND' command is specified in greater detail by a string that is the
204     tuple's third element. CommandHandler takes care of processing this and
205     sending out replies.
206     """
207     queues_out = {}
208     command_handler = CommandHandler(queues_out)
209     while True:
210         x = q.get()
211         command_type = x[0]
212         connection_id = x[1]
213         content = None if len(x) == 2 else x[2]
214         if command_type == 'ADD_QUEUE':
215             queues_out[connection_id] = content
216         elif command_type == 'COMMAND':
217             command_handler.handle_input(content, connection_id)
218         elif command_type == 'KILL_QUEUE':
219             del queues_out[connection_id]
220
221
222 q = queue.Queue()
223 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
224 c.start()
225 server = Server(q, ('localhost', 5000), IO_Handler)
226 try:
227     server.serve_forever()
228 except KeyboardInterrupt:
229     pass
230 finally:
231     print('Killing server')
232     server.server_close()