home · contact · privacy
Add MAP_SIZE reply, add constraint checks.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class World:
83     turn = 0
84     map_size = (5, 5)
85     map_ = 'xxxxx\nx...x\nx.X.x\nx...x\nxxxxx'
86     player_pos = (3, 3)
87
88
89 def fib(n):
90     """Calculate n-th Fibonacci number. Very inefficiently."""
91     if n in (1, 2):
92         return 1
93     else:
94         return fib(n-1) + fib(n-2)
95
96
97 class CommandHandler:
98
99     def __init__(self, queues_out):
100         from multiprocessing import Pool
101         self.queues_out = queues_out
102         self.pool = Pool()
103         self.world = World()
104         self.pool_result = None
105
106     def send_to(self, connection_id, msg):
107         """Send msg to client of connection_id."""
108         self.queues_out[connection_id].put(msg)
109
110     def send_all(self, msg):
111         """Send msg to all clients."""
112         for connection_id in self.queues_out:
113             self.send_to(connection_id, msg)
114
115     def cmd_fib(self, tokens, connection_id):
116         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
117
118         Numbers are calculated in parallel as far as possible, using fib().
119         A 'CALCULATING …' message is sent to caller before the result.
120         """
121         fib_fail = 'MALFORMED FIB REQUEST'
122         if len(tokens) < 2:
123             self.send_to(connection_id, fib_fail)
124             return
125         numbers = []
126         for token in tokens[1:]:
127             if token != '0' and token.isdigit():
128                 numbers += [int(token)]
129             else:
130                 self.send_to(connection_id, fib_fail)
131                 return
132         self.send_to(connection_id, 'CALCULATING …')
133         results = self.pool.map(fib, numbers)
134         reply = ' '.join([str(r) for r in results])
135         self.send_to(connection_id, reply)
136
137     def cmd_inc(self, connection_id):
138         """Increment world.turn, send game turn data to everyone.
139
140         To simulate game processing waiting times, a one second delay between
141         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
142         calculations are started as pool processes that need to be finished
143         until a further INC finishes the turn.
144         """
145         from time import sleep
146
147         def stringify_yx(tuple_):
148             return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
149
150         if self.pool_result is not None:
151             self.pool_result.wait()
152         self.send_all('TURN_FINISHED ' + str(self.world.turn))
153         sleep(1)
154         self.world.turn += 1
155         self.send_all('NEW_TURN ' + str(self.world.turn))
156         self.send_all('MAP_SIZE ' + stringify_yx(self.world.map_size))
157         self.send_all('TERRAIN\n' + self.world.map_)
158         self.send_all('POSITION ' + stringify_yx(self.world.player_pos))
159         self.pool_result = self.pool.map_async(fib, (35, 35))
160
161     def cmd_get_turn(self, connection_id):
162         """Send world.turn to caller."""
163         self.send_to(connection_id, str(self.world.turn))
164
165     def cmd_echo(self, tokens, input_, connection_id):
166         """Send message in input_ beyond tokens[0] to caller."""
167         msg = input_[len(tokens[0]) + 1:]
168         self.send_to(connection_id, msg)
169
170     def cmd_all(self, tokens, input_):
171         """Send message in input_ beyond tokens[0] to all clients."""
172         msg = input_[len(tokens[0]) + 1:]
173         self.send_all(msg)
174
175     def handle_input(self, input_, connection_id):
176         """Process input_ to command grammar, call command handler if found."""
177         tokens = [token for token in input_.split(' ') if len(token) > 0]
178         if len(tokens) == 0:
179             self.send_to(connection_id, 'EMPTY COMMAND')
180         elif len(tokens) == 1 and tokens[0] == 'INC':
181             self.cmd_inc(connection_id)
182         elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
183             self.cmd_get_turn(connection_id)
184         elif len(tokens) >= 1 and tokens[0] == 'ECHO':
185             self.cmd_echo(tokens, input_, connection_id)
186         elif len(tokens) >= 1 and tokens[0] == 'ALL':
187             self.cmd_all(tokens, input_)
188         elif len(tokens) >= 1 and tokens[0] == 'FIB':
189             # TODO: Should this really block the whole loop?
190             self.cmd_fib(tokens, connection_id)
191         else:
192             self.send_to(connection_id, 'UNKNOWN COMMAND')
193
194
195 def io_loop(q):
196     """Handle commands coming through queue q, send results back.
197
198     Commands from q are expected to be tuples, with the first element either
199     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
200     an optional third element of arbitrary type. The UUID identifies a
201     receiver for replies.
202
203     An 'ADD_QUEUE' command should contain as third element a queue through
204     which to send messages back to the sender of the command. A 'KILL_QUEUE'
205     command removes the queue for that receiver from the list of queues through
206     which to send replies.
207
208     A 'COMMAND' command is specified in greater detail by a string that is the
209     tuple's third element. CommandHandler takes care of processing this and
210     sending out replies.
211     """
212     queues_out = {}
213     command_handler = CommandHandler(queues_out)
214     while True:
215         x = q.get()
216         command_type = x[0]
217         connection_id = x[1]
218         content = None if len(x) == 2 else x[2]
219         if command_type == 'ADD_QUEUE':
220             queues_out[connection_id] = content
221         elif command_type == 'COMMAND':
222             command_handler.handle_input(content, connection_id)
223         elif command_type == 'KILL_QUEUE':
224             del queues_out[connection_id]
225
226
227 q = queue.Queue()
228 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
229 c.start()
230 server = Server(q, ('localhost', 5000), IO_Handler)
231 try:
232     server.serve_forever()
233 except KeyboardInterrupt:
234     pass
235 finally:
236     print('Killing server')
237     server.server_close()