home · contact · privacy
Specify widget classes, names by their use.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class World:
83     turn = 0
84     map_size = (5, 5)
85     map_ = 'xxxxx\n'+\
86            'x...x\n'+\
87            'x.X.x\n'+\
88            'x...x\n'+\
89            'xxxxx'
90     player_pos = [3, 3]
91
92
93 def fib(n):
94     """Calculate n-th Fibonacci number. Very inefficiently."""
95     if n in (1, 2):
96         return 1
97     else:
98         return fib(n-1) + fib(n-2)
99
100
101 class ArgumentError(Exception):
102     pass
103
104
105 class CommandHandler:
106
107     def __init__(self, queues_out):
108         from multiprocessing import Pool
109         self.queues_out = queues_out
110         self.pool = Pool()
111         self.world = World()
112         self.pool_result = None
113
114     def send_to(self, connection_id, msg):
115         """Send msg to client of connection_id."""
116         self.queues_out[connection_id].put(msg)
117
118     def send_all(self, msg):
119         """Send msg to all clients."""
120         for connection_id in self.queues_out:
121             self.send_to(connection_id, msg)
122
123     def stringify_yx(self, tuple_):
124         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
125         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
126
127     def cmd_fib(self, tokens, connection_id):
128         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
129
130         Numbers are calculated in parallel as far as possible, using fib().
131         A 'CALCULATING …' message is sent to caller before the result.
132         """
133         if len(tokens) < 2:
134             raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
135         numbers = []
136         for token in tokens[1:]:
137             if token == '0' or not token.isdigit():
138                 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
139             numbers += [int(token)]
140         self.send_to(connection_id, 'CALCULATING …')
141         results = self.pool.map(fib, numbers)
142         reply = ' '.join([str(r) for r in results])
143         self.send_to(connection_id, reply)
144
145     def cmd_inc(self, connection_id):
146         """Increment world.turn, send game turn data to everyone.
147
148         To simulate game processing waiting times, a one second delay between
149         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
150         calculations are started as pool processes that need to be finished
151         until a further INC finishes the turn.
152         """
153         from time import sleep
154         if self.pool_result is not None:
155             self.pool_result.wait()
156         self.send_all('TURN_FINISHED ' + str(self.world.turn))
157         sleep(1)
158         self.world.turn += 1
159         self.send_all('NEW_TURN ' + str(self.world.turn))
160         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
161         self.send_all('TERRAIN\n' + self.world.map_)
162         self.send_all('POSITION ' + self.stringify_yx(self.world.player_pos))
163         self.pool_result = self.pool.map_async(fib, (35, 35))
164
165     def cmd_get_turn(self, connection_id):
166         """Send world.turn to caller."""
167         self.send_to(connection_id, str(self.world.turn))
168
169     def cmd_move(self, direction):
170         """Move player 'UP' or 'DOWN' depending on direction string."""
171         if not direction in {'UP', 'DOWN'}:
172             raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
173         if direction == 'UP':
174             self.world.player_pos[0] -= 1
175         else:
176             self.world.player_pos[0] += 1
177         self.send_all('POSITION ' + self.stringify_yx(self.world.player_pos))
178
179     def cmd_echo(self, tokens, input_, connection_id):
180         """Send message in input_ beyond tokens[0] to caller."""
181         msg = input_[len(tokens[0]) + 1:]
182         self.send_to(connection_id, msg)
183
184     def cmd_all(self, tokens, input_):
185         """Send message in input_ beyond tokens[0] to all clients."""
186         msg = input_[len(tokens[0]) + 1:]
187         self.send_all(msg)
188
189     def handle_input(self, input_, connection_id):
190         """Process input_ to command grammar, call command handler if found."""
191         tokens = [token for token in input_.split(' ') if len(token) > 0]
192         try:
193             if len(tokens) == 0:
194                 self.send_to(connection_id, 'EMPTY COMMAND')
195             elif len(tokens) == 1 and tokens[0] == 'INC':
196                 self.cmd_inc(connection_id)
197             elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
198                 self.cmd_get_turn(connection_id)
199             elif len(tokens) == 2 and tokens[0] == 'MOVE':
200                 self.cmd_move(tokens[1])
201             elif len(tokens) >= 1 and tokens[0] == 'ECHO':
202                 self.cmd_echo(tokens, input_, connection_id)
203             elif len(tokens) >= 1 and tokens[0] == 'ALL':
204                 self.cmd_all(tokens, input_)
205             elif len(tokens) >= 1 and tokens[0] == 'FIB':
206                 # TODO: Should this really block the whole loop?
207                 self.cmd_fib(tokens, connection_id)
208             else:
209                 self.send_to(connection_id, 'UNKNOWN COMMAND')
210         except ArgumentError as e:
211             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
212
213
214 def io_loop(q):
215     """Handle commands coming through queue q, send results back.
216
217     Commands from q are expected to be tuples, with the first element either
218     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
219     an optional third element of arbitrary type. The UUID identifies a
220     receiver for replies.
221
222     An 'ADD_QUEUE' command should contain as third element a queue through
223     which to send messages back to the sender of the command. A 'KILL_QUEUE'
224     command removes the queue for that receiver from the list of queues through
225     which to send replies.
226
227     A 'COMMAND' command is specified in greater detail by a string that is the
228     tuple's third element. CommandHandler takes care of processing this and
229     sending out replies.
230     """
231     queues_out = {}
232     command_handler = CommandHandler(queues_out)
233     while True:
234         x = q.get()
235         command_type = x[0]
236         connection_id = x[1]
237         content = None if len(x) == 2 else x[2]
238         if command_type == 'ADD_QUEUE':
239             queues_out[connection_id] = content
240         elif command_type == 'COMMAND':
241             command_handler.handle_input(content, connection_id)
242         elif command_type == 'KILL_QUEUE':
243             del queues_out[connection_id]
244
245
246 q = queue.Queue()
247 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
248 c.start()
249 server = Server(q, ('localhost', 5000), IO_Handler)
250 try:
251     server.serve_forever()
252 except KeyboardInterrupt:
253     pass
254 finally:
255     print('Killing server')
256     server.server_close()