home · contact · privacy
Refactor.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 import sys
7 import os
8 from parser import ArgError, Parser
9 from server_.game import World, GameError
10
11
12 # Avoid "Address already in use" errors.
13 socketserver.TCPServer.allow_reuse_address = True
14
15
16 class Server(socketserver.ThreadingTCPServer):
17     """Bind together threaded IO handling server and message queue."""
18
19     def __init__(self, queue, *args, **kwargs):
20         super().__init__(*args, **kwargs)
21         self.queue_out = queue
22         self.daemon_threads = True  # Else, server's threads have daemon=False.
23
24
25 class IO_Handler(socketserver.BaseRequestHandler):
26
27     def handle(self):
28         """Move messages between network socket and main thread via queues.
29
30         On start, sets up new queue, sends it via self.server.queue_out to
31         main thread, and from then on receives messages to send back from the
32         main thread via that new queue.
33
34         At the same time, loops over socket's recv to get messages from the
35         outside via self.server.queue_out into the main thread. Ends connection
36         once a 'QUIT' message is received from socket, and then also kills its
37         own queue.
38
39         All messages to the main thread are tuples, with the first element a
40         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
41         deletion, and 'COMMAND' for everything else), the second element a UUID
42         that uniquely identifies the thread (so that the main thread knows whom
43         to send replies back to), and optionally a third element for further
44         instructions.
45         """
46         import plom_socket_io
47
48         def caught_send(socket, message):
49             """Send message by socket, catch broken socket connection error."""
50             try:
51                 plom_socket_io.send(socket, message)
52             except plom_socket_io.BrokenSocketConnection:
53                 pass
54
55         def send_queue_messages(socket, queue_in, thread_alive):
56             """Send messages via socket from queue_in while thread_alive[0]."""
57             while thread_alive[0]:
58                 try:
59                     msg = queue_in.get(timeout=1)
60                 except queue.Empty:
61                     continue
62                 caught_send(socket, msg)
63
64         import uuid
65         print('CONNECTION FROM:', str(self.client_address))
66         connection_id = uuid.uuid4()
67         queue_in = queue.Queue()
68         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
69         thread_alive = [True]
70         t = threading.Thread(target=send_queue_messages,
71                              args=(self.request, queue_in, thread_alive))
72         t.start()
73         for message in plom_socket_io.recv(self.request):
74             if message is None:
75                 caught_send(self.request, 'BAD MESSAGE')
76             elif 'QUIT' == message:
77                 caught_send(self.request, 'BYE')
78                 break
79             else:
80                 self.server.queue_out.put(('COMMAND', connection_id, message))
81         self.server.queue_out.put(('KILL_QUEUE', connection_id))
82         thread_alive[0] = False
83         print('CONNECTION CLOSED FROM:', str(self.client_address))
84         self.request.close()
85
86
87 def fib(n):
88     """Calculate n-th Fibonacci number. Very inefficiently."""
89     if n in (1, 2):
90         return 1
91     else:
92         return fib(n-1) + fib(n-2)
93
94
95 class CommandHandler:
96
97     def __init__(self):
98         from multiprocessing import Pool
99         self.queues_out = {}
100         self.world = World()
101         self.parser = Parser(self)
102         # self.pool and self.pool_result are currently only needed by the FIB
103         # command and the demo of a parallelized game loop in cmd_inc_p.
104         self.pool = Pool()
105         self.pool_result = None
106
107     def handle_input(self, input_, connection_id=None, abort_on_error=False):
108         """Process input_ to command grammar, call command handler if found."""
109         from inspect import signature
110         try:
111             command = self.parser.parse(input_)
112             if command is None:
113                 self.send_to(connection_id, 'UNHANDLED INPUT')
114             else:
115                 if 'connection_id' in list(signature(command).parameters):
116                     command(connection_id=connection_id)
117                 else:
118                     command()
119         except ArgError as e:
120             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
121             if abort_on_error:
122                 exit(1)
123         except GameError as e:
124             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
125             if abort_on_error:
126                 exit(1)
127
128     def send_to(self, connection_id, msg):
129         """Send msg to client of connection_id; if no later, print instead."""
130         if connection_id:
131             self.queues_out[connection_id].put(msg)
132         else:
133             print(msg)
134
135     def send_all(self, msg):
136         """Send msg to all clients."""
137         for connection_id in self.queues_out:
138             self.send_to(connection_id, msg)
139
140     def stringify_yx(self, tuple_):
141         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
142         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
143
144     def quoted(self, string):
145         """Quote and escape string so client interprets it as single token."""
146         quoted = []
147         quoted += ['"']
148         for c in string:
149             if c in {'"', '\\'}:
150                 quoted += ['\\']
151             quoted += [c]
152         quoted += ['"']
153         return ''.join(quoted)
154
155     def send_all_gamestate(self):
156         """Send out game state data relevant to clients."""
157         self.send_all('NEW_TURN ' + str(self.world.turn))
158         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
159         for y in range(self.world.map_size[0]):
160             width = self.world.map_size[1]
161             terrain_line = self.world.terrain_map[y * width:(y + 1) * width]
162             self.send_all('TERRAIN_LINE %5s %s' % (y, self.quoted(terrain_line)))
163         for thing in self.world.things:
164             self.send_all('THING_TYPE %s %s' % (thing.id_, thing.type_))
165             self.send_all('THING_POS %s %s' % (thing.id_, self.stringify_yx(thing.position)))
166
167     def proceed(self):
168         """Send turn finish signal, run game world, send new world data.
169
170         First sends 'TURN_FINISHED' message, then runs game world
171         until new player input is needed, then sends game state.
172         """
173         self.send_all('TURN_FINISHED ' + str(self.world.turn))
174         self.world.proceed_to_next_player_turn()
175         self.send_all_gamestate()
176
177     def cmd_MOVE(self, direction):
178         """Set player task to 'move' with direction arg, finish player turn."""
179         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
180             raise ArgError('Move argument must be one of: '
181                            'UP, DOWN, RIGHT, LEFT')
182         self.world.get_player().set_task('move', direction=direction)
183         self.proceed()
184     cmd_MOVE.argtypes = 'string'
185
186     def cmd_WAIT(self):
187         """Set player task to 'wait', finish player turn."""
188         self.world.get_player().set_task('wait')
189         self.proceed()
190
191     def cmd_MAP_SIZE(self, yx):
192         self.world.set_map_size(yx)
193     cmd_MAP_SIZE.argtypes = 'yx_tuple:nonneg'
194
195     def cmd_TERRAIN_LINE(self, y, line):
196         self.world.set_map_line(y, line)
197     cmd_TERRAIN_LINE.argtypes = 'int:nonneg string'
198
199     def cmd_THING_TYPE(self, i, type_):
200         t = self.world.get_thing(i)
201         t.type_ = type_
202     cmd_THING_TYPE.argtypes = 'int:nonneg string'
203
204     def cmd_THING_POS(self, i, yx):
205         t = self.world.get_thing(i)
206         t.position = list(yx)
207     cmd_THING_POS.argtypes = 'int:nonneg yx_tuple:nonneg'
208
209     def cmd_GET_TURN(self, connection_id):
210         """Send world.turn to caller."""
211         self.send_to(connection_id, str(self.world.turn))
212
213     def cmd_ECHO(self, msg, connection_id):
214         """Send msg to caller."""
215         self.send_to(connection_id, msg)
216     cmd_ECHO.argtypes = 'string'
217
218     def cmd_ALL(self, msg, connection_id):
219         """Send msg to all clients."""
220         self.send_all(msg)
221     cmd_ALL.argtypes = 'string'
222
223     def cmd_FIB(self, numbers, connection_id):
224         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
225
226         Numbers are calculated in parallel as far as possible, using fib().
227         A 'CALCULATING …' message is sent to caller before the result.
228         """
229         self.send_to(connection_id, 'CALCULATING …')
230         results = self.pool.map(fib, numbers)
231         reply = ' '.join([str(r) for r in results])
232         self.send_to(connection_id, reply)
233     cmd_FIB.argtypes = 'seq:int:nonneg'
234
235     def cmd_INC_P(self, connection_id):
236         """Increment world.turn, send game turn data to everyone.
237
238         To simulate game processing waiting times, a one second delay between
239         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
240         calculations are started as pool processes that need to be finished
241         until a further INC finishes the turn.
242
243         This is just a demo structure for how the game loop could work when
244         parallelized. One might imagine a two-step game turn, with a non-action
245         step determining actor tasks (the AI determinations would take the
246         place of the fib calculations here), and an action step wherein these
247         tasks are performed (where now sleep(1) is).
248         """
249         from time import sleep
250         if self.pool_result is not None:
251             self.pool_result.wait()
252         self.send_all('TURN_FINISHED ' + str(self.world.turn))
253         sleep(1)
254         self.world.turn += 1
255         self.send_all_gamestate()
256         self.pool_result = self.pool.map_async(fib, (35, 35))
257
258
259 def io_loop(q, commander):
260     """Handle commands coming through queue q, send results back.
261
262     Commands from q are expected to be tuples, with the first element either
263     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
264     an optional third element of arbitrary type. The UUID identifies a
265     receiver for replies.
266
267     An 'ADD_QUEUE' command should contain as third element a queue through
268     which to send messages back to the sender of the command. A 'KILL_QUEUE'
269     command removes the queue for that receiver from the list of queues through
270     which to send replies.
271
272     A 'COMMAND' command is specified in greater detail by a string that is the
273     tuple's third element. The commander CommandHandler takes care of processing
274     this and sending out replies.
275     """
276     while True:
277         x = q.get()
278         command_type = x[0]
279         connection_id = x[1]
280         content = None if len(x) == 2 else x[2]
281         if command_type == 'ADD_QUEUE':
282             commander.queues_out[connection_id] = content
283         elif command_type == 'COMMAND':
284             commander.handle_input(content, connection_id)
285         elif command_type == 'KILL_QUEUE':
286             del commander.queues_out[connection_id]
287
288
289 if len(sys.argv) != 2:
290     print('wrong number of arguments, expected one (game file)')
291     exit(1)
292 game_file_name = sys.argv[1]
293 commander = CommandHandler()
294 if os.path.exists(game_file_name):
295     if not os.path.isfile(game_file_name):
296         print('game file name does not refer to a valid game file')
297     else:
298         with open(game_file_name, 'r') as f:
299             lines = f.readlines()
300         for i in range(len(lines)):
301             line = lines[i]
302             print("FILE INPUT LINE %s: %s" % (i, line), end='')
303             commander.handle_input(line, abort_on_error=True)
304 else:
305     commander.handle_input('MAP_SIZE Y:5,X:5')
306     commander.handle_input('TERRAIN_LINE 0 "xxxxx"')
307     commander.handle_input('TERRAIN_LINE 1 "x...x"')
308     commander.handle_input('TERRAIN_LINE 2 "x.X.x"')
309     commander.handle_input('TERRAIN_LINE 3 "x...x"')
310     commander.handle_input('TERRAIN_LINE 4 "xxxxx"')
311     commander.handle_input('THING_TYPE 0 human')
312     commander.handle_input('THING_POS 0 Y:3,X:3')
313     commander.handle_input('THING_TYPE 1 monster')
314     commander.handle_input('THING_POS 1 Y:1,X:1')
315 q = queue.Queue()
316 c = threading.Thread(target=io_loop, daemon=True, args=(q, commander))
317 c.start()
318 server = Server(q, ('localhost', 5000), IO_Handler)
319 try:
320     server.serve_forever()
321 except KeyboardInterrupt:
322     pass
323 finally:
324     print('Killing server')
325     server.server_close()