home · contact · privacy
Extend INC command results.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class World:
83     turn = 0
84
85
86 def fib(n):
87     """Calculate n-th Fibonacci number. Very inefficiently."""
88     if n in (1, 2):
89         return 1
90     else:
91         return fib(n-1) + fib(n-2)
92
93
94 class CommandHandler:
95
96     def __init__(self, world, queues_out):
97         self.world = world
98         self.queues_out = queues_out
99
100     def send_to(self, connection_id, msg):
101         """Send msg to client of connection_id."""
102         self.queues_out[connection_id].put(msg)
103
104     def send_all(self, msg):
105         """Send msg to all clients."""
106         for connection_id in self.queues_out:
107             self.send_to(connection_id, msg)
108
109     def cmd_fib(self, tokens, connection_id):
110         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
111
112         Numbers are calculated in parallel as far as possible, using fib().
113         A 'CALCULATING …' message is sent to caller before the result.
114         """
115         from multiprocessing import Pool
116         fib_fail = 'MALFORMED FIB REQUEST'
117         if len(tokens) < 2:
118             self.send_to(connection_id, fib_fail)
119             return
120         numbers = []
121         for token in tokens[1:]:
122             if token != '0' and token.isdigit():
123                 numbers += [int(token)]
124             else:
125                 self.send_to(connection_id, fib_fail)
126                 return
127         self.send_to(connection_id, 'CALCULATING …')
128         with Pool(len(numbers)) as p:
129             results = p.map(fib, numbers)
130         reply = ' '.join([str(r) for r in results])
131         self.send_to(connection_id, reply)
132
133     def cmd_inc(self, connection_id):
134         """Increment world.turn, send TURN_FINISHED, NEW_TURN to everyone."""
135         self.send_all('TURN_FINISHED ' + str(self.world.turn))
136         self.world.turn += 1
137         self.send_all('NEW_TURN ' + str(self.world.turn))
138
139     def cmd_get_turn(self, connection_id):
140         """Send world.turn to caller."""
141         self.send_to(connection_id, str(self.world.turn))
142
143     def cmd_echo(self, tokens, input_, connection_id):
144         """Send message in input_ beyond tokens[0] to caller."""
145         msg = input_[len(tokens[0]) + 1:]
146         self.send_to(connection_id, msg)
147
148     def cmd_all(self, tokens, input_):
149         """Send message in input_ beyond tokens[0] to all clients."""
150         msg = input_[len(tokens[0]) + 1:]
151         self.send_all(msg)
152
153     def handle_input(self, input_, connection_id):
154         """Process input_ to command grammar, call command handler if found."""
155         tokens = [token for token in input_.split(' ') if len(token) > 0]
156         if len(tokens) == 0:
157             self.send_to(connection_id, 'EMPTY COMMAND')
158         elif len(tokens) == 1 and tokens[0] == 'INC':
159             self.cmd_inc(connection_id)
160         elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
161             self.cmd_get_turn(connection_id)
162         elif len(tokens) >= 1 and tokens[0] == 'ECHO':
163             self.cmd_echo(tokens, input_, connection_id)
164         elif len(tokens) >= 1 and tokens[0] == 'ALL':
165             self.cmd_all(tokens, input_)
166         elif len(tokens) >= 1 and tokens[0] == 'FIB':
167             # TODO: Should this really block the whole loop?
168             self.cmd_fib(tokens, connection_id)
169         else:
170             self.send_to(connection_id, 'UNKNOWN COMMAND')
171
172
173 def io_loop(q):
174     """Handle commands coming through queue q, send results back.
175
176     Commands from q are expected to be tuples, with the first element either
177     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
178     an optional third element of arbitrary type. The UUID identifies a
179     receiver for replies.
180
181     An 'ADD_QUEUE' command should contain as third element a queue through
182     which to send messages back to the sender of the command. A 'KILL_QUEUE'
183     command removes the queue for that receiver from the list of queues through
184     which to send replies.
185
186     A 'COMMAND' command is specified in greater detail by a string that is the
187     tuple's third element. CommandHandler takes care of processing this and
188     sending out replies.
189     """
190     queues_out = {}
191     world = World()
192     command_handler = CommandHandler(world, queues_out)
193     while True:
194         x = q.get()
195         command_type = x[0]
196         connection_id = x[1]
197         content = None if len(x) == 2 else x[2]
198         if command_type == 'ADD_QUEUE':
199             queues_out[connection_id] = content
200         elif command_type == 'COMMAND':
201             command_handler.handle_input(content, connection_id)
202         elif command_type == 'KILL_QUEUE':
203             del queues_out[connection_id]
204
205
206 q = queue.Queue()
207 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
208 c.start()
209 server = Server(q, ('localhost', 5000), IO_Handler)
210 try:
211     server.serve_forever()
212 except KeyboardInterrupt:
213     pass
214 finally:
215     print('Killing server')
216     server.server_close()