+def io_loop(q):
+ """Handle commands coming through queue q, send results back.
+
+ Commands from q are expected to be tuples, with the first element either
+ 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
+ an optional third element of arbitrary type. The UUID identifies a
+ receiver for replies.
+
+ An 'ADD_QUEUE' command should contain as third element a queue through
+ which to send messages back to the sender of the command. A 'KILL_QUEUE'
+ command removes the queue for that receiver from the list of queues through
+ which to send replies.
+
+ A 'COMMAND' command is specified in greater detail by a string that is the
+ tuple's third element. Here, the following commands are understood:
+ - A string starting with 'PRIVMSG' returns the space-separated tokens
+ following 'PRIVMSG' to the sender via its receiver queue.
+ - A string starting with 'ALL' sends the space-separated tokens following
+ 'ALL' to all receiver queues.
+ - A string starting with 'FIB' followed by space-separated positive
+ integers returns to the receiver queue first a 'CALCULATING …' messsage,
+ and afterwards for each such integer n the n-th Fibonacci number as a
+ space-separated sequence of integers. Fibonacci numbers are calculated
+ in parallel if possible.
+ """
+ from multiprocessing import Pool
+ queues_out = {}
+ while True:
+ x = q.get()
+ command_type = x[0]
+ connection_id = x[1]
+ content = None if len(x) == 2 else x[2]
+ if command_type == 'ADD_QUEUE':
+ queues_out[connection_id] = content
+ elif command_type == 'COMMAND':
+ tokens = [token for token in content.split(' ') if len(token) > 0]
+ if len(tokens) == 0:
+ queues_out[connection_id].put('EMPTY COMMAND')
+ continue
+ if tokens[0] == 'PRIVMSG':
+ reply = ' '.join(tokens[1:])
+ queues_out[connection_id].put(reply)
+ elif tokens[0] == 'ALL':
+ reply = ' '.join(tokens[1:])
+ for key in queues_out:
+ queues_out[key].put(reply)
+ elif tokens[0] == 'FIB':
+ fib_fail = 'MALFORMED FIB REQUEST'
+ if len(tokens) < 2:
+ queues_out[connection_id].put(fib_fail)
+ continue
+ numbers = []
+ fail = False
+ for token in tokens[1:]:
+ if token != '0' and token.isdigit():
+ numbers += [int(token)]
+ else:
+ queues_out[connection_id].put(fib_fail)
+ fail = True
+ break
+ if fail:
+ continue
+ queues_out[connection_id].put('CALCULATING …')
+ reply = ''
+ # this blocks the whole loop, BAD
+ with Pool(len(numbers)) as p:
+ results = p.map(fib, numbers)
+ reply = ' '.join([str(r) for r in results])
+ queues_out[connection_id].put(reply)
+ else:
+ queues_out[connection_id].put('UNKNOWN COMMAND')
+ elif command_type == 'KILL_QUEUE':
+ del queues_out[connection_id]
+
+
+q = queue.Queue()
+c = threading.Thread(target=io_loop, daemon=True, args=(q,))
+c.start()
+server = Server(q, ('localhost', 5000), IO_Handler)