home · contact · privacy
Refactor.
[plomrogue2-experiments] / server_ / io.py
1 import socketserver
2 import threading
3 import queue
4
5
6 # Avoid "Address already in use" errors.
7 socketserver.TCPServer.allow_reuse_address = True
8
9
10 # Our default server port.
11 SERVER_PORT=5000
12
13
14 class Server(socketserver.ThreadingTCPServer):
15     """Bind together threaded IO handling server and message queue."""
16
17     def __init__(self, queue, *args, **kwargs):
18         super().__init__(('localhost', SERVER_PORT), IO_Handler, *args, **kwargs)
19         self.queue_out = queue
20         self.daemon_threads = True  # Else, server's threads have daemon=False.
21
22
23 class IO_Handler(socketserver.BaseRequestHandler):
24
25     def handle(self):
26         """Move messages between network socket and game IO loop via queues.
27
28         On start (a new connection from client to server), sets up a
29         new queue, sends it via self.server.queue_out to the game IO
30         loop thread, and from then on receives messages to send back
31         from the game IO loop via that new queue.
32
33         At the same time, loops over socket's recv to get messages
34         from the outside via self.server.queue_out into the game IO
35         loop. Ends connection once a 'QUIT' message is received from
36         socket, and then also calls for a kill of its own queue.
37
38         All messages to the game IO loop are tuples, with the first
39         element a meta command ('ADD_QUEUE' for queue creation,
40         'KILL_QUEUE' for queue deletion, and 'COMMAND' for everything
41         else), the second element a UUID that uniquely identifies the
42         thread (so that the game IO loop knows whom to send replies
43         back to), and optionally a third element for further
44         instructions.
45
46         """
47         import plom_socket_io
48
49         def caught_send(socket, message):
50             """Send message by socket, catch broken socket connection error."""
51             try:
52                 plom_socket_io.send(socket, message)
53             except plom_socket_io.BrokenSocketConnection:
54                 pass
55
56         def send_queue_messages(socket, queue_in, thread_alive):
57             """Send messages via socket from queue_in while thread_alive[0]."""
58             while thread_alive[0]:
59                 try:
60                     msg = queue_in.get(timeout=1)
61                 except queue.Empty:
62                     continue
63                 caught_send(socket, msg)
64
65         import uuid
66         print('CONNECTION FROM:', str(self.client_address))
67         connection_id = uuid.uuid4()
68         queue_in = queue.Queue()
69         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
70         thread_alive = [True]
71         t = threading.Thread(target=send_queue_messages,
72                              args=(self.request, queue_in, thread_alive))
73         t.start()
74         for message in plom_socket_io.recv(self.request):
75             if message is None:
76                 caught_send(self.request, 'BAD MESSAGE')
77             elif 'QUIT' == message:
78                 caught_send(self.request, 'BYE')
79                 break
80             else:
81                 self.server.queue_out.put(('COMMAND', connection_id, message))
82         self.server.queue_out.put(('KILL_QUEUE', connection_id))
83         thread_alive[0] = False
84         print('CONNECTION CLOSED FROM:', str(self.client_address))
85         self.request.close()
86
87
88 def io_loop(q, game_command_handler):
89     """Handle commands coming through queue q, send results back.
90
91     Commands from q are expected to be tuples, with the first element
92     either 'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element
93     a UUID, and an optional third element of arbitrary type. The UUID
94     identifies a receiver for replies.
95
96     An 'ADD_QUEUE' command should contain as third element a queue
97     through which to send messages back to the sender of the
98     command. A 'KILL_QUEUE' command removes the queue for that
99     receiver from the list of queues through which to send replies.
100
101     A 'COMMAND' command is specified in greater detail by a string
102     that is the tuple's third element. The game_command_handler takes
103     care of processing this and sending out replies.
104
105     """
106     while True:
107         x = q.get()
108         command_type = x[0]
109         connection_id = x[1]
110         content = None if len(x) == 2 else x[2]
111         if command_type == 'ADD_QUEUE':
112             game_command_handler.queues_out[connection_id] = content
113         elif command_type == 'COMMAND':
114             game_command_handler.handle_input(content, connection_id)
115         elif command_type == 'KILL_QUEUE':
116             del game_command_handler.queues_out[connection_id]
117
118
119 def run_server_with_io_loop(command_handler):
120     """Run connection of server talking to clients and game IO loop.
121
122     We have the TCP server (an instance of Server) and we have the
123     game IO loop, a thread running io_loop. Both communicate with each
124     other via a queue.Queue. While the TCP server may spawn parallel
125     threads to many clients, the IO loop works sequentially through
126     game commands received from the TCP server's threads (= client
127     connections to the TCP server), calling command_handler to process
128     them. A processed command may trigger messages to the commanding
129     client or to all clients, delivered from the IO loop to the TCP
130     server via the queue.
131
132     """
133     q = queue.Queue()
134     c = threading.Thread(target=io_loop, daemon=True, args=(q, command_handler))
135     c.start()
136     server = Server(q)
137     try:
138         server.serve_forever()
139     except KeyboardInterrupt:
140         pass
141     finally:
142         print('Killing server')
143         server.server_close()