home · contact · privacy
Some more experiments with threading.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import plom_socket_io
5 import threading
6 import time
7
# Avoid "Address already in use" errors when the server is restarted shortly
# after a previous run. NOTE: this sets the attribute on the stdlib base class
# itself, so it applies to every socketserver.TCPServer (sub)class in this
# process, not only to the Server class defined in this file.
socketserver.TCPServer.allow_reuse_address = True
10
11
class Server(socketserver.ThreadingTCPServer):
    """Threaded TCP server that also carries the shared world state.

    The world state is the `counter` object handed to the constructor; it is
    stored on the server instance so request handlers can reach it through
    their `self.server` reference.
    """

    def __init__(self, counter, *args, **kwargs):
        """Set up the TCP server and attach counter to it.

        All arguments after counter are passed on unchanged to
        socketserver.ThreadingTCPServer.
        """
        super().__init__(*args, **kwargs)
        # Without this, the per-request threads are started with daemon=False.
        self.daemon_threads = True
        self.counter = counter
19
20
def fib(n):
    """Calculate n-th Fibonacci number (fib(1) == fib(2) == 1).

    Runs iteratively in n - 1 additions, so large n neither blow the
    recursion limit nor take exponential time.

    Raises ValueError if n is smaller than 1, since the sequence used here
    starts at index 1.
    """
    if n < 1:
        raise ValueError('fib() needs a positive integer, got %r' % (n,))
    previous, current = 0, 1
    # After k iterations, current holds fib(k + 1).
    for _ in range(n - 1):
        previous, current = current, previous + current
    return current
27
28
class IO_Handler(socketserver.BaseRequestHandler):
    """Handle one client connection: answer commands, stream the counter.

    While the connection is open, a helper thread regularly sends the current
    value of self.server.counter[0] to the client, and the handler thread
    itself answers the messages received via plom_socket_io.recv.
    """

    # Seconds between two unsolicited 'COUNTER …' messages.
    COUNTER_INTERVAL = 5

    def _caught_send(self, message):
        """Send message to the client, catch broken socket connection error."""
        try:
            plom_socket_io.send(self.request, message)
        except plom_socket_io.BrokenSocketConnection:
            # Deliberately best-effort: nothing useful can be done here if
            # the client is already gone.
            pass

    def _send_counter_loop(self, stop):
        """Send counter value every COUNTER_INTERVAL seconds until stop is set.

        stop is a threading.Event; waiting on it instead of sleeping lets the
        loop end as soon as the connection is being shut down.
        """
        while not stop.is_set():
            self._caught_send('COUNTER ' + str(self.server.counter[0]))
            stop.wait(self.COUNTER_INTERVAL)

    @staticmethod
    def _parse_positive_ints(tokens):
        """Return tokens as list of positive ints, or None if one is invalid.

        Empty tokens (the result of consecutive spaces) are skipped. Tokens
        that are not purely decimal digits, or whose value is below 1, make
        the whole input invalid.
        """
        numbers = []
        for token in tokens:
            if token == '':
                continue
            # isdecimal() only accepts characters int() can parse, unlike
            # isdigit(), which also accepts e.g. superscript digits.
            if not token.isdecimal():
                return None
            try:
                number = int(token)
            except ValueError:
                # Newer Python versions refuse to parse extremely long digit
                # strings.
                return None
            if number < 1:  # Also rejects '00', '000', ...
                return None
            numbers.append(number)
        return numbers

    @staticmethod
    def handle_message(message):
        """Evaluate message for tasks to perform, yield result.

        Accepts one command: FIB, followed by positive integers, all tokens
        separated by spaces. Will calculate and return for each such integer
        n the n-th Fibonacci number. Uses multiprocessing to perform multiple
        such calculations in parallel, with at most one worker process per
        CPU. Yields a 'CALCULATING …' message before the calculation starts,
        and finally yields a message containing the results. (The
        'CALCULATING …' message coming before the results message is the main
        reason this works as a generator function using yield.)

        A FIB command without any valid number, or with any invalid token,
        yields just 'MALFORMED FIB REQUEST'.

        When no command can be read into the message, just yields a 'NO
        COMMAND UNDERSTOOD:', followed by the message.
        """
        from multiprocessing import Pool, cpu_count
        tokens = message.split(' ')
        if tokens[0] != 'FIB':
            yield 'NO COMMAND UNDERSTOOD: %s' % message
            return
        numbers = IO_Handler._parse_positive_ints(tokens[1:])
        if not numbers:  # Invalid token, or no number given at all.
            yield 'MALFORMED FIB REQUEST'
            return
        yield 'CALCULATING …'
        # Cap the pool size: the amount of numbers is client-controlled, and
        # Pool.map distributes the work over the available workers anyway.
        with Pool(min(len(numbers), cpu_count())) as p:
            results = p.map(fib, numbers)
        yield ' '.join(str(r) for r in results)

    def handle(self):
        """Loop recv for input, send replies; also, send regular counter value.

        If input is 'QUIT', send reply 'BYE' and end loop / connection.
        Otherwise, use handle_message to interpret and enact commands. A
        message received as None is treated as malformed and answered with
        'bad message'.

        On the way out (including on errors), the counter-sending thread is
        told to stop and the socket is closed.
        """
        print('CONNECTION FROM:', str(self.client_address))
        stop_counter = threading.Event()
        send_count = threading.Thread(target=self._send_counter_loop,
                                      kwargs={'stop': stop_counter})
        send_count.start()
        try:
            for message in plom_socket_io.recv(self.request):
                if message is None:
                    print('RECEIVED MALFORMED MESSAGE')
                    self._caught_send('bad message')
                elif message == 'QUIT':
                    self._caught_send('BYE')
                    break
                else:
                    print('RECEIVED MESSAGE:', message)
                    for reply in self.handle_message(message):
                        self._caught_send(reply)
        finally:
            stop_counter.set()
            # Give the counter thread a moment to finish an in-flight send
            # before the socket goes away; bounded so that a stuck send
            # cannot hang this handler.
            send_count.join(timeout=1)
            print('CONNECTION CLOSED:', str(self.client_address))
            self.request.close()
111
112
def inc_loop(counter, interval):
    """Endlessly wait interval seconds, then add 1 to counter[0].

    Never returns by itself; the wait always happens before the increment.
    """
    pause = time.sleep
    while True:
        pause(interval)
        counter[0] += 1
118
119
# Only start the server when run as a script. The request handler uses
# multiprocessing, and with the "spawn" start method each worker process
# re-imports this module; without the guard, every worker would try to start
# its own counter thread and server.
if __name__ == '__main__':
    # Shared world state: a one-element list, so that the incrementing thread
    # and the request handlers all see the same, mutable value.
    counter = [0]
    b = threading.Thread(target=inc_loop, daemon=True,
                         kwargs={'counter': counter, 'interval': 1})
    b.start()
    server = Server(counter, ('localhost', 5000), IO_Handler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server; exit quietly.
        pass
    finally:
        print('Killing server')
        server.server_close()