home · contact · privacy
Refactor.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 from parser import ArgError, Parser
7 from server_.game import World, GameError
8
9
10 # Avoid "Address already in use" errors.
11 socketserver.TCPServer.allow_reuse_address = True
12
13
14 class Server(socketserver.ThreadingTCPServer):
15     """Bind together threaded IO handling server and message queue."""
16
17     def __init__(self, queue, *args, **kwargs):
18         super().__init__(*args, **kwargs)
19         self.queue_out = queue
20         self.daemon_threads = True  # Else, server's threads have daemon=False.
21
22
23 class IO_Handler(socketserver.BaseRequestHandler):
24
25     def handle(self):
26         """Move messages between network socket and main thread via queues.
27
28         On start, sets up new queue, sends it via self.server.queue_out to
29         main thread, and from then on receives messages to send back from the
30         main thread via that new queue.
31
32         At the same time, loops over socket's recv to get messages from the
33         outside via self.server.queue_out into the main thread. Ends connection
34         once a 'QUIT' message is received from socket, and then also kills its
35         own queue.
36
37         All messages to the main thread are tuples, with the first element a
38         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
39         deletion, and 'COMMAND' for everything else), the second element a UUID
40         that uniquely identifies the thread (so that the main thread knows whom
41         to send replies back to), and optionally a third element for further
42         instructions.
43         """
44         import plom_socket_io
45
46         def caught_send(socket, message):
47             """Send message by socket, catch broken socket connection error."""
48             try:
49                 plom_socket_io.send(socket, message)
50             except plom_socket_io.BrokenSocketConnection:
51                 pass
52
53         def send_queue_messages(socket, queue_in, thread_alive):
54             """Send messages via socket from queue_in while thread_alive[0]."""
55             while thread_alive[0]:
56                 try:
57                     msg = queue_in.get(timeout=1)
58                 except queue.Empty:
59                     continue
60                 caught_send(socket, msg)
61
62         import uuid
63         print('CONNECTION FROM:', str(self.client_address))
64         connection_id = uuid.uuid4()
65         queue_in = queue.Queue()
66         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
67         thread_alive = [True]
68         t = threading.Thread(target=send_queue_messages,
69                              args=(self.request, queue_in, thread_alive))
70         t.start()
71         for message in plom_socket_io.recv(self.request):
72             if message is None:
73                 caught_send(self.request, 'BAD MESSAGE')
74             elif 'QUIT' == message:
75                 caught_send(self.request, 'BYE')
76                 break
77             else:
78                 self.server.queue_out.put(('COMMAND', connection_id, message))
79         self.server.queue_out.put(('KILL_QUEUE', connection_id))
80         thread_alive[0] = False
81         print('CONNECTION CLOSED FROM:', str(self.client_address))
82         self.request.close()
83
84
85 def fib(n):
86     """Calculate n-th Fibonacci number. Very inefficiently."""
87     if n in (1, 2):
88         return 1
89     else:
90         return fib(n-1) + fib(n-2)
91
92
93 class CommandHandler:
94
95     def __init__(self, queues_out):
96         from multiprocessing import Pool
97         self.queues_out = queues_out
98         self.world = World()
99         self.parser = Parser(self)
100         # self.pool and self.pool_result are currently only needed by the FIB
101         # command and the demo of a parallelized game loop in cmd_inc_p.
102         self.pool = Pool()
103         self.pool_result = None
104
105     def handle_input(self, input_, connection_id):
106         """Process input_ to command grammar, call command handler if found."""
107         try:
108             command = self.parser.parse(input_)
109             if command is None:
110                 self.send_to(connection_id, 'UNHANDLED INPUT')
111             else:
112                 command(connection_id=connection_id)
113         except ArgError as e:
114             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
115         except GameError as e:
116             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
117
118     def send_to(self, connection_id, msg):
119         """Send msg to client of connection_id."""
120         self.queues_out[connection_id].put(msg)
121
122     def send_all(self, msg):
123         """Send msg to all clients."""
124         for connection_id in self.queues_out:
125             self.send_to(connection_id, msg)
126
127     def stringify_yx(self, tuple_):
128         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
129         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
130
131     def quoted(self, string):
132         """Quote and escape string so client interprets it as single token."""
133         quoted = []
134         quoted += ['"']
135         for c in string:
136             if c in {'"', '\\'}:
137                 quoted += ['\\']
138             quoted += [c]
139         quoted += ['"']
140         return ''.join(quoted)
141
142     def quoted_map(self, map_string, map_width):
143         """Put \n into map_string at map_width intervals, return quoted whole."""
144         map_lines = []
145         map_size = len(map_string)
146         start_cut = 0
147         while start_cut < map_size:
148             limit = start_cut + map_width
149             map_lines += [map_string[start_cut:limit]]
150             start_cut = limit
151         return self.quoted("\n".join(map_lines))
152
153     def send_all_gamestate(self):
154         """Send out game state data relevant to clients."""
155         self.send_all('NEW_TURN ' + str(self.world.turn))
156         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
157         self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
158                                                     self.world.map_size[1]))
159         for thing in self.world.things:
160             self.send_all('THING TYPE:' + thing.type_ + ' '
161                           + self.stringify_yx(thing.position))
162
163     def proceed_to_next_player_turn(self, connection_id):
164         """Run game world turns until player can decide their next step.
165
166         Sends a 'TURN_FINISHED' message, then iterates through all non-player
167         things, on each step furthering them in their tasks (and letting them
168         decide new ones if they finish). The iteration order is: first all
169         things that come after the player in the world things list, then (after
170         incrementing the world turn) all that come before the player; then the
171         player's .proceed() is run, and if it does not finish his task, the
172         loop starts at the beginning. Once the player's task is finished, the
173         loop breaks, and client-relevant game data is sent.
174         """
175         self.send_all('TURN_FINISHED ' + str(self.world.turn))
176         while True:
177             for thing in self.world.things[self.world.player_i+1:]:
178                 thing.proceed()
179             self.world.turn += 1
180             for thing in self.world.things[:self.world.player_i]:
181                 thing.proceed()
182             self.world.player.proceed(is_AI=False)
183             if self.world.player.task is None:
184                 break
185         self.send_all_gamestate()
186
187     def cmd_MOVE(self, direction, connection_id):
188         """Set player task to 'move' with direction arg, finish player turn."""
189         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
190             raise ArgError('Move argument must be one of: '
191                            'UP, DOWN, RIGHT, LEFT')
192         self.world.player.set_task('move', direction=direction)
193         self.proceed_to_next_player_turn(connection_id)
194     cmd_MOVE.argtypes = 'string'
195
196     def cmd_WAIT(self, connection_id):
197         """Set player task to 'wait', finish player turn."""
198         self.world.player.set_task('wait')
199         self.proceed_to_next_player_turn(connection_id)
200
201     def cmd_GET_TURN(self, connection_id):
202         """Send world.turn to caller."""
203         self.send_to(connection_id, str(self.world.turn))
204
205     def cmd_ECHO(self, msg, connection_id):
206         """Send msg to caller."""
207         self.send_to(connection_id, msg)
208     cmd_ECHO.argtypes = 'string'
209
210     def cmd_ALL(self, msg, connection_id):
211         """Send msg to all clients."""
212         self.send_all(msg)
213     cmd_ALL.argtypes = 'string'
214
215     def cmd_FIB(self, numbers, connection_id):
216         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
217
218         Numbers are calculated in parallel as far as possible, using fib().
219         A 'CALCULATING …' message is sent to caller before the result.
220         """
221         self.send_to(connection_id, 'CALCULATING …')
222         results = self.pool.map(fib, numbers)
223         reply = ' '.join([str(r) for r in results])
224         self.send_to(connection_id, reply)
225     cmd_FIB.argtypes = 'seq:int:nonneg'
226
227     def cmd_INC_P(self, connection_id):
228         """Increment world.turn, send game turn data to everyone.
229
230         To simulate game processing waiting times, a one second delay between
231         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
232         calculations are started as pool processes that need to be finished
233         until a further INC finishes the turn.
234
235         This is just a demo structure for how the game loop could work when
236         parallelized. One might imagine a two-step game turn, with a non-action
237         step determining actor tasks (the AI determinations would take the
238         place of the fib calculations here), and an action step wherein these
239         tasks are performed (where now sleep(1) is).
240         """
241         from time import sleep
242         if self.pool_result is not None:
243             self.pool_result.wait()
244         self.send_all('TURN_FINISHED ' + str(self.world.turn))
245         sleep(1)
246         self.world.turn += 1
247         self.send_all_gamestate()
248         self.pool_result = self.pool.map_async(fib, (35, 35))
249
250
251 def io_loop(q):
252     """Handle commands coming through queue q, send results back.
253
254     Commands from q are expected to be tuples, with the first element either
255     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
256     an optional third element of arbitrary type. The UUID identifies a
257     receiver for replies.
258
259     An 'ADD_QUEUE' command should contain as third element a queue through
260     which to send messages back to the sender of the command. A 'KILL_QUEUE'
261     command removes the queue for that receiver from the list of queues through
262     which to send replies.
263
264     A 'COMMAND' command is specified in greater detail by a string that is the
265     tuple's third element. CommandHandler takes care of processing this and
266     sending out replies.
267     """
268     queues_out = {}
269     command_handler = CommandHandler(queues_out)
270     while True:
271         x = q.get()
272         command_type = x[0]
273         connection_id = x[1]
274         content = None if len(x) == 2 else x[2]
275         if command_type == 'ADD_QUEUE':
276             queues_out[connection_id] = content
277         elif command_type == 'COMMAND':
278             command_handler.handle_input(content, connection_id)
279         elif command_type == 'KILL_QUEUE':
280             del queues_out[connection_id]
281
282
283 q = queue.Queue()
284 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
285 c.start()
286 server = Server(q, ('localhost', 5000), IO_Handler)
287 try:
288     server.serve_forever()
289 except KeyboardInterrupt:
290     pass
291 finally:
292     print('Killing server')
293     server.server_close()