home · contact · privacy
Refactor.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 import sys
7 import os
8 from parser import ArgError, Parser
9 from server_.game import World, GameError
10 from game_common import Commander
11
12
13 # Avoid "Address already in use" errors.
14 socketserver.TCPServer.allow_reuse_address = True
15
16
17 class Server(socketserver.ThreadingTCPServer):
18     """Bind together threaded IO handling server and message queue."""
19
20     def __init__(self, queue, *args, **kwargs):
21         super().__init__(*args, **kwargs)
22         self.queue_out = queue
23         self.daemon_threads = True  # Else, server's threads have daemon=False.
24
25
26 class IO_Handler(socketserver.BaseRequestHandler):
27
28     def handle(self):
29         """Move messages between network socket and main thread via queues.
30
31         On start, sets up new queue, sends it via self.server.queue_out to
32         main thread, and from then on receives messages to send back from the
33         main thread via that new queue.
34
35         At the same time, loops over socket's recv to get messages from the
36         outside via self.server.queue_out into the main thread. Ends connection
37         once a 'QUIT' message is received from socket, and then also kills its
38         own queue.
39
40         All messages to the main thread are tuples, with the first element a
41         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42         deletion, and 'COMMAND' for everything else), the second element a UUID
43         that uniquely identifies the thread (so that the main thread knows whom
44         to send replies back to), and optionally a third element for further
45         instructions.
46         """
47         import plom_socket_io
48
49         def caught_send(socket, message):
50             """Send message by socket, catch broken socket connection error."""
51             try:
52                 plom_socket_io.send(socket, message)
53             except plom_socket_io.BrokenSocketConnection:
54                 pass
55
56         def send_queue_messages(socket, queue_in, thread_alive):
57             """Send messages via socket from queue_in while thread_alive[0]."""
58             while thread_alive[0]:
59                 try:
60                     msg = queue_in.get(timeout=1)
61                 except queue.Empty:
62                     continue
63                 caught_send(socket, msg)
64
65         import uuid
66         print('CONNECTION FROM:', str(self.client_address))
67         connection_id = uuid.uuid4()
68         queue_in = queue.Queue()
69         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
70         thread_alive = [True]
71         t = threading.Thread(target=send_queue_messages,
72                              args=(self.request, queue_in, thread_alive))
73         t.start()
74         for message in plom_socket_io.recv(self.request):
75             if message is None:
76                 caught_send(self.request, 'BAD MESSAGE')
77             elif 'QUIT' == message:
78                 caught_send(self.request, 'BYE')
79                 break
80             else:
81                 self.server.queue_out.put(('COMMAND', connection_id, message))
82         self.server.queue_out.put(('KILL_QUEUE', connection_id))
83         thread_alive[0] = False
84         print('CONNECTION CLOSED FROM:', str(self.client_address))
85         self.request.close()
86
87
88 def fib(n):
89     """Calculate n-th Fibonacci number. Very inefficiently."""
90     if n in (1, 2):
91         return 1
92     else:
93         return fib(n-1) + fib(n-2)
94
95
96 class CommandHandler(Commander):
97
98     def __init__(self):
99         from multiprocessing import Pool
100         self.queues_out = {}
101         self.world = World()
102         self.parser = Parser(self)
103         # self.pool and self.pool_result are currently only needed by the FIB
104         # command and the demo of a parallelized game loop in cmd_inc_p.
105         self.pool = Pool()
106         self.pool_result = None
107
108     def handle_input(self, input_, connection_id=None, abort_on_error=False):
109         """Process input_ to command grammar, call command handler if found."""
110         from inspect import signature
111         try:
112             command = self.parser.parse(input_)
113             if command is None:
114                 self.send_to(connection_id, 'UNHANDLED INPUT')
115             else:
116                 if 'connection_id' in list(signature(command).parameters):
117                     command(connection_id=connection_id)
118                 else:
119                     command()
120         except ArgError as e:
121             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
122             if abort_on_error:
123                 exit(1)
124         except GameError as e:
125             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
126             if abort_on_error:
127                 exit(1)
128
129     def send_to(self, connection_id, msg):
130         """Send msg to client of connection_id; if no later, print instead."""
131         if connection_id:
132             self.queues_out[connection_id].put(msg)
133         else:
134             print(msg)
135
136     def send_all(self, msg):
137         """Send msg to all clients."""
138         for connection_id in self.queues_out:
139             self.send_to(connection_id, msg)
140
141     def send_all_gamestate(self):
142         """Send out game state data relevant to clients."""
143
144         def stringify_yx(tuple_):
145             """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
146             return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
147
148         def quoted(string):
149             """Quote & escape string so client interprets it as single token."""
150             quoted = []
151             quoted += ['"']
152             for c in string:
153                 if c in {'"', '\\'}:
154                     quoted += ['\\']
155                 quoted += [c]
156             quoted += ['"']
157             return ''.join(quoted)
158
159         self.send_all('NEW_TURN ' + str(self.world.turn))
160         self.send_all('MAP_SIZE ' + stringify_yx(self.world.map_size))
161         for y in range(self.world.map_size[0]):
162             width = self.world.map_size[1]
163             terrain_line = self.world.terrain_map[y * width:(y + 1) * width]
164             self.send_all('TERRAIN_LINE %5s %s' % (y, quoted(terrain_line)))
165         for thing in self.world.things:
166             self.send_all('THING_TYPE %s %s' % (thing.id_, thing.type_))
167             self.send_all('THING_POS %s %s' % (thing.id_,
168                                                stringify_yx(thing.position)))
169
170     def proceed(self):
171         """Send turn finish signal, run game world, send new world data.
172
173         First sends 'TURN_FINISHED' message, then runs game world
174         until new player input is needed, then sends game state.
175         """
176         self.send_all('TURN_FINISHED ' + str(self.world.turn))
177         self.world.proceed_to_next_player_turn()
178         self.send_all_gamestate()
179
180     def cmd_MOVE(self, direction):
181         """Set player task to 'move' with direction arg, finish player turn."""
182         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
183             raise ArgError('Move argument must be one of: '
184                            'UP, DOWN, RIGHT, LEFT')
185         self.world.get_player().set_task('move', direction=direction)
186         self.proceed()
187     cmd_MOVE.argtypes = 'string'
188
189     def cmd_WAIT(self):
190         """Set player task to 'wait', finish player turn."""
191         self.world.get_player().set_task('wait')
192         self.proceed()
193
194     def cmd_GET_TURN(self, connection_id):
195         """Send world.turn to caller."""
196         self.send_to(connection_id, str(self.world.turn))
197
198     def cmd_ECHO(self, msg, connection_id):
199         """Send msg to caller."""
200         self.send_to(connection_id, msg)
201     cmd_ECHO.argtypes = 'string'
202
203     def cmd_ALL(self, msg, connection_id):
204         """Send msg to all clients."""
205         self.send_all(msg)
206     cmd_ALL.argtypes = 'string'
207
208     def cmd_FIB(self, numbers, connection_id):
209         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
210
211         Numbers are calculated in parallel as far as possible, using fib().
212         A 'CALCULATING …' message is sent to caller before the result.
213         """
214         self.send_to(connection_id, 'CALCULATING …')
215         results = self.pool.map(fib, numbers)
216         reply = ' '.join([str(r) for r in results])
217         self.send_to(connection_id, reply)
218     cmd_FIB.argtypes = 'seq:int:nonneg'
219
220     def cmd_INC_P(self, connection_id):
221         """Increment world.turn, send game turn data to everyone.
222
223         To simulate game processing waiting times, a one second delay between
224         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
225         calculations are started as pool processes that need to be finished
226         until a further INC finishes the turn.
227
228         This is just a demo structure for how the game loop could work when
229         parallelized. One might imagine a two-step game turn, with a non-action
230         step determining actor tasks (the AI determinations would take the
231         place of the fib calculations here), and an action step wherein these
232         tasks are performed (where now sleep(1) is).
233         """
234         from time import sleep
235         if self.pool_result is not None:
236             self.pool_result.wait()
237         self.send_all('TURN_FINISHED ' + str(self.world.turn))
238         sleep(1)
239         self.world.turn += 1
240         self.send_all_gamestate()
241         self.pool_result = self.pool.map_async(fib, (35, 35))
242
243
244 def io_loop(q, commander):
245     """Handle commands coming through queue q, send results back.
246
247     Commands from q are expected to be tuples, with the first element either
248     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
249     an optional third element of arbitrary type. The UUID identifies a
250     receiver for replies.
251
252     An 'ADD_QUEUE' command should contain as third element a queue through
253     which to send messages back to the sender of the command. A 'KILL_QUEUE'
254     command removes the queue for that receiver from the list of queues through
255     which to send replies.
256
257     A 'COMMAND' command is specified in greater detail by a string that is the
258     tuple's third element. The commander CommandHandler takes care of processing
259     this and sending out replies.
260     """
261     while True:
262         x = q.get()
263         command_type = x[0]
264         connection_id = x[1]
265         content = None if len(x) == 2 else x[2]
266         if command_type == 'ADD_QUEUE':
267             commander.queues_out[connection_id] = content
268         elif command_type == 'COMMAND':
269             commander.handle_input(content, connection_id)
270         elif command_type == 'KILL_QUEUE':
271             del commander.queues_out[connection_id]
272
273
274 if len(sys.argv) != 2:
275     print('wrong number of arguments, expected one (game file)')
276     exit(1)
277 game_file_name = sys.argv[1]
278 commander = CommandHandler()
279 if os.path.exists(game_file_name):
280     if not os.path.isfile(game_file_name):
281         print('game file name does not refer to a valid game file')
282     else:
283         with open(game_file_name, 'r') as f:
284             lines = f.readlines()
285         for i in range(len(lines)):
286             line = lines[i]
287             print("FILE INPUT LINE %s: %s" % (i, line), end='')
288             commander.handle_input(line, abort_on_error=True)
289 else:
290     commander.handle_input('MAP_SIZE Y:5,X:5')
291     commander.handle_input('TERRAIN_LINE 0 "xxxxx"')
292     commander.handle_input('TERRAIN_LINE 1 "x...x"')
293     commander.handle_input('TERRAIN_LINE 2 "x.X.x"')
294     commander.handle_input('TERRAIN_LINE 3 "x...x"')
295     commander.handle_input('TERRAIN_LINE 4 "xxxxx"')
296     commander.handle_input('THING_TYPE 0 human')
297     commander.handle_input('THING_POS 0 Y:3,X:3')
298     commander.handle_input('THING_TYPE 1 monster')
299     commander.handle_input('THING_POS 1 Y:1,X:1')
300 q = queue.Queue()
301 c = threading.Thread(target=io_loop, daemon=True, args=(q, commander))
302 c.start()
303 server = Server(q, ('localhost', 5000), IO_Handler)
304 try:
305     server.serve_forever()
306 except KeyboardInterrupt:
307     pass
308 finally:
309     print('Killing server')
310     server.server_close()