home · contact · privacy
Improve server by using queues.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import plom_socket_io
5 import threading
6 import time
7 import queue
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13 class Server(socketserver.ThreadingTCPServer):
14     """Bind together threaded IO handling server and message queue."""
15
16     def __init__(self, queue, *args, **kwargs):
17         super().__init__(*args, **kwargs)
18         self.queue_out = queue
19         self.daemon_threads = True  # Else, server's threads have daemon=False.
20
21
22 def fib(n):
23     """Calculate n-th Fibonacci number. Very inefficiently."""
24     if n in (1, 2):
25         return 1
26     else:
27         return fib(n-1) + fib(n-2)
28
29
30 class IO_Handler(socketserver.BaseRequestHandler):
31
32     def handle(self):
33         """Move messages between network socket and main thread via queues.
34
35         On start, sets up new queue, sends it via self.server.queue_out to
36         main thread, and from then on receives messages to send back from the
37         main thread via that new queue.
38
39         At the same time, loops over socket's recv to get messages from the
40         outside via self.server.queue_out into the main thread. Ends connection
41         once a 'QUIT' message is received from socket, and then also kills its
42         own queue.
43
44         All messages to the main thread are tuples, with the first element a
45         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
46         deletion, and 'COMMAND' for everything else), the second element a UUID
47         that uniquely identifies the thread (so that the main thread knows whom
48         to send replies back to), and optionally a third element for further
49         instructions.
50         """
51         def caught_send(socket, message):
52             """Send message by socket, catch broken socket connection error."""
53             try:
54                 plom_socket_io.send(socket, message)
55             except plom_socket_io.BrokenSocketConnection:
56                 pass
57
58         def send_queue_messages(socket, queue_in, thread_alive):
59             """Send messages via socket from queue_in while thread_alive[0]."""
60             while thread_alive[0]:
61                 try:
62                     msg = queue_in.get(timeout=1)
63                 except queue.Empty:
64                     continue
65                 caught_send(socket, msg)
66
67         import uuid
68         print('CONNECTION FROM:', str(self.client_address))
69         connection_id = uuid.uuid4()
70         queue_in = queue.Queue()
71         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
72         thread_alive = [True]
73         t = threading.Thread(target=send_queue_messages,
74                              args=(self.request, queue_in, thread_alive))
75         t.start()
76         for message in plom_socket_io.recv(self.request):
77             if message is None:
78                 caught_send(self.request, 'BAD MESSAGE')
79             elif 'QUIT' == message:
80                 caught_send(self.request, 'BYE')
81                 break
82             else:
83                 self.server.queue_out.put(('COMMAND', connection_id, message))
84         self.server.queue_out.put(('KILL_QUEUE', connection_id))
85         thread_alive[0] = False
86         print('CONNECTION CLOSED:', str(self.client_address))
87         self.request.close()
88
89
90 def io_loop(q):
91     """Handle commands coming through queue q, send results back. 
92
93     Commands from q are expected to be tuples, with the first element either
94     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
95     an optional third element of arbitrary type. The UUID identifies a
96     receiver for replies.
97
98     An 'ADD_QUEUE' command should contain as third element a queue through
99     which to send messages back to the sender of the command. A 'KILL_QUEUE'
100     command removes the queue for that receiver from the list of queues through
101     which to send replies.
102
103     A 'COMMAND' command is specified in greater detail by a string that is the
104     tuple's third element. Here, the following commands are understood:
105     - A string starting with 'PRIVMSG' returns the space-separated tokens
106       following 'PRIVMSG' to the sender via its receiver queue.
107     - A string starting with 'ALL' sends the space-separated tokens following
108       'ALL' to all receiver queues.
109     - A string starting with 'FIB' followed by space-separated positive
110       integers returns to the receiver queue first a 'CALCULATING …' messsage,
111       and afterwards for each such integer n the n-th Fibonacci number as a
112       space-separated sequence of integers. Fibonacci numbers are calculated
113       in parallel if possible.
114     """
115     from multiprocessing import Pool
116     queues_out = {}
117     while True:
118         x = q.get()
119         command_type = x[0]
120         connection_id = x[1]
121         content = None if len(x) == 2 else x[2]
122         if command_type == 'ADD_QUEUE':
123             queues_out[connection_id] = content
124         elif command_type == 'COMMAND':
125             tokens = [token for token in content.split(' ') if len(token) > 0]
126             if len(tokens) == 0:
127                 queues_out[connection_id].put('EMPTY COMMAND')
128                 continue
129             if  tokens[0] == 'PRIVMSG':
130                 reply = ' '.join(tokens[1:])
131                 queues_out[connection_id].put(reply)
132             elif tokens[0] == 'ALL':
133                 reply = ' '.join(tokens[1:])
134                 for key in queues_out:
135                     queues_out[key].put(reply)
136             elif tokens[0] == 'FIB':
137                 fib_fail = 'MALFORMED FIB REQUEST'
138                 if len(tokens) < 2:
139                     queues_out[connection_id].put(fib_fail)
140                     continue
141                 numbers = []
142                 fail = False
143                 for token in tokens[1:]:
144                     if token != '0' and token.isdigit():
145                         numbers += [int(token)]
146                     else:
147                         queues_out[connection_id].put(fib_fail)
148                         fail = True
149                         break
150                 if fail:
151                     continue
152                 queues_out[connection_id].put('CALCULATING …')
153                 reply = ''
154                 # this blocks the whole loop, BAD
155                 with Pool(len(numbers)) as p:
156                     results = p.map(fib, numbers)
157                 reply = ' '.join([str(r) for r in results])
158                 queues_out[connection_id].put(reply)
159             else:
160                 queues_out[connection_id].put('UNKNOWN COMMAND')
161         elif command_type == 'KILL_QUEUE':
162             del queues_out[connection_id]
163
164
165 q = queue.Queue()
166 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
167 c.start()
168 server = Server(q, ('localhost', 5000), IO_Handler)
169 try:
170     server.serve_forever()
171 except KeyboardInterrupt:
172     pass
173 finally:
174     print('Killing server')
175     server.server_close()