home · contact · privacy
Simulate world processing during INC calls.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class World:
83     turn = 0
84
85
86 def fib(n):
87     """Calculate n-th Fibonacci number. Very inefficiently."""
88     if n in (1, 2):
89         return 1
90     else:
91         return fib(n-1) + fib(n-2)
92
93
94 class CommandHandler:
95
96     def __init__(self, queues_out):
97         from multiprocessing import Pool
98         self.queues_out = queues_out
99         self.pool = Pool()
100         self.world = World()
101         self.pool_result = None
102
103     def send_to(self, connection_id, msg):
104         """Send msg to client of connection_id."""
105         self.queues_out[connection_id].put(msg)
106
107     def send_all(self, msg):
108         """Send msg to all clients."""
109         for connection_id in self.queues_out:
110             self.send_to(connection_id, msg)
111
112     def cmd_fib(self, tokens, connection_id):
113         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
114
115         Numbers are calculated in parallel as far as possible, using fib().
116         A 'CALCULATING …' message is sent to caller before the result.
117         """
118         fib_fail = 'MALFORMED FIB REQUEST'
119         if len(tokens) < 2:
120             self.send_to(connection_id, fib_fail)
121             return
122         numbers = []
123         for token in tokens[1:]:
124             if token != '0' and token.isdigit():
125                 numbers += [int(token)]
126             else:
127                 self.send_to(connection_id, fib_fail)
128                 return
129         self.send_to(connection_id, 'CALCULATING …')
130         results = self.pool.map(fib, numbers)
131         reply = ' '.join([str(r) for r in results])
132         self.send_to(connection_id, reply)
133
134     def cmd_inc(self, connection_id):
135         """Increment world.turn, send TURN_FINISHED, NEW_TURN to everyone.
136
137         To simulate game processing waiting times, a one second delay between
138         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
139         calculations are started as pool processes that need to be finished
140         until a further INC finishes the turn.
141         """
142         from time import sleep
143         if self.pool_result is not None:
144             self.pool_result.wait()
145         self.send_all('TURN_FINISHED ' + str(self.world.turn))
146         sleep(1)
147         self.world.turn += 1
148         self.send_all('NEW_TURN ' + str(self.world.turn))
149         self.pool_result = self.pool.map_async(fib, (35,35))
150
151     def cmd_get_turn(self, connection_id):
152         """Send world.turn to caller."""
153         self.send_to(connection_id, str(self.world.turn))
154
155     def cmd_echo(self, tokens, input_, connection_id):
156         """Send message in input_ beyond tokens[0] to caller."""
157         msg = input_[len(tokens[0]) + 1:]
158         self.send_to(connection_id, msg)
159
160     def cmd_all(self, tokens, input_):
161         """Send message in input_ beyond tokens[0] to all clients."""
162         msg = input_[len(tokens[0]) + 1:]
163         self.send_all(msg)
164
165     def handle_input(self, input_, connection_id):
166         """Process input_ to command grammar, call command handler if found."""
167         tokens = [token for token in input_.split(' ') if len(token) > 0]
168         if len(tokens) == 0:
169             self.send_to(connection_id, 'EMPTY COMMAND')
170         elif len(tokens) == 1 and tokens[0] == 'INC':
171             self.cmd_inc(connection_id)
172         elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
173             self.cmd_get_turn(connection_id)
174         elif len(tokens) >= 1 and tokens[0] == 'ECHO':
175             self.cmd_echo(tokens, input_, connection_id)
176         elif len(tokens) >= 1 and tokens[0] == 'ALL':
177             self.cmd_all(tokens, input_)
178         elif len(tokens) >= 1 and tokens[0] == 'FIB':
179             # TODO: Should this really block the whole loop?
180             self.cmd_fib(tokens, connection_id)
181         else:
182             self.send_to(connection_id, 'UNKNOWN COMMAND')
183
184
185 def io_loop(q):
186     """Handle commands coming through queue q, send results back.
187
188     Commands from q are expected to be tuples, with the first element either
189     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
190     an optional third element of arbitrary type. The UUID identifies a
191     receiver for replies.
192
193     An 'ADD_QUEUE' command should contain as third element a queue through
194     which to send messages back to the sender of the command. A 'KILL_QUEUE'
195     command removes the queue for that receiver from the list of queues through
196     which to send replies.
197
198     A 'COMMAND' command is specified in greater detail by a string that is the
199     tuple's third element. CommandHandler takes care of processing this and
200     sending out replies.
201     """
202     queues_out = {}
203     command_handler = CommandHandler(queues_out)
204     while True:
205         x = q.get()
206         command_type = x[0]
207         connection_id = x[1]
208         content = None if len(x) == 2 else x[2]
209         if command_type == 'ADD_QUEUE':
210             queues_out[connection_id] = content
211         elif command_type == 'COMMAND':
212             command_handler.handle_input(content, connection_id)
213         elif command_type == 'KILL_QUEUE':
214             del queues_out[connection_id]
215
216
217 q = queue.Queue()
218 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
219 c.start()
220 server = Server(q, ('localhost', 5000), IO_Handler)
221 try:
222     server.serve_forever()
223 except KeyboardInterrupt:
224     pass
225 finally:
226     print('Killing server')
227     server.server_close()