home · contact · privacy
Give feedback on aborted tasks.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 import sys
7 import os
8 import parser
9 import server_.game
10 import game_common
11
12
13 # Avoid "Address already in use" errors.
14 socketserver.TCPServer.allow_reuse_address = True
15
16
17 class Server(socketserver.ThreadingTCPServer):
18     """Bind together threaded IO handling server and message queue."""
19
20     def __init__(self, queue, *args, **kwargs):
21         super().__init__(*args, **kwargs)
22         self.queue_out = queue
23         self.daemon_threads = True  # Else, server's threads have daemon=False.
24
25
26 class IO_Handler(socketserver.BaseRequestHandler):
27
28     def handle(self):
29         """Move messages between network socket and main thread via queues.
30
31         On start, sets up new queue, sends it via self.server.queue_out to
32         main thread, and from then on receives messages to send back from the
33         main thread via that new queue.
34
35         At the same time, loops over socket's recv to get messages from the
36         outside via self.server.queue_out into the main thread. Ends connection
37         once a 'QUIT' message is received from socket, and then also kills its
38         own queue.
39
40         All messages to the main thread are tuples, with the first element a
41         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42         deletion, and 'COMMAND' for everything else), the second element a UUID
43         that uniquely identifies the thread (so that the main thread knows whom
44         to send replies back to), and optionally a third element for further
45         instructions.
46         """
47         import plom_socket_io
48
49         def caught_send(socket, message):
50             """Send message by socket, catch broken socket connection error."""
51             try:
52                 plom_socket_io.send(socket, message)
53             except plom_socket_io.BrokenSocketConnection:
54                 pass
55
56         def send_queue_messages(socket, queue_in, thread_alive):
57             """Send messages via socket from queue_in while thread_alive[0]."""
58             while thread_alive[0]:
59                 try:
60                     msg = queue_in.get(timeout=1)
61                 except queue.Empty:
62                     continue
63                 caught_send(socket, msg)
64
65         import uuid
66         print('CONNECTION FROM:', str(self.client_address))
67         connection_id = uuid.uuid4()
68         queue_in = queue.Queue()
69         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
70         thread_alive = [True]
71         t = threading.Thread(target=send_queue_messages,
72                              args=(self.request, queue_in, thread_alive))
73         t.start()
74         for message in plom_socket_io.recv(self.request):
75             if message is None:
76                 caught_send(self.request, 'BAD MESSAGE')
77             elif 'QUIT' == message:
78                 caught_send(self.request, 'BYE')
79                 break
80             else:
81                 self.server.queue_out.put(('COMMAND', connection_id, message))
82         self.server.queue_out.put(('KILL_QUEUE', connection_id))
83         thread_alive[0] = False
84         print('CONNECTION CLOSED FROM:', str(self.client_address))
85         self.request.close()
86
87
88 def fib(n):
89     """Calculate n-th Fibonacci number. Very inefficiently."""
90     if n in (1, 2):
91         return 1
92     else:
93         return fib(n-1) + fib(n-2)
94
95
96 class CommandHandler(game_common.Commander, server_.game.Commander):
97
98     def __init__(self, game_file_name):
99         self.queues_out = {}
100         self.world = server_.game.World()
101         self.parser = parser.Parser(self)
102         self.game_file_name = game_file_name
103         # self.pool and self.pool_result are currently only needed by the FIB
104         # command and the demo of a parallelized game loop in cmd_inc_p.
105         from multiprocessing import Pool
106         self.pool = Pool()
107         self.pool_result = None
108
109     def handle_input(self, input_, connection_id=None, abort_on_error=False,
110                      store=True):
111         """Process input_ to command grammar, call command handler if found."""
112         from inspect import signature
113         try:
114             command = self.parser.parse(input_)
115             if command is None:
116                 self.send_to(connection_id, 'UNHANDLED INPUT')
117             else:
118                 if 'connection_id' in list(signature(command).parameters):
119                     command(connection_id=connection_id)
120                 else:
121                     command()
122                     if store:
123                         with open(self.game_file_name, 'a') as f:
124                             f.write(input_ + '\n')
125         except parser.ArgError as e:
126             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
127             if abort_on_error:
128                 exit(1)
129         except server_.game.GameError as e:
130             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
131             if abort_on_error:
132                 exit(1)
133
134     def send_to(self, connection_id, msg):
135         """Send msg to client of connection_id; if no later, print instead."""
136         if connection_id:
137             self.queues_out[connection_id].put(msg)
138         else:
139             print(msg)
140
141     def send_all(self, msg):
142         """Send msg to all clients."""
143         for connection_id in self.queues_out:
144             self.send_to(connection_id, msg)
145
146     def send_all_gamestate(self):
147         """Send out game state data relevant to clients."""
148
149         def stringify_yx(tuple_):
150             """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
151             return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
152
153         def quoted(string):
154             """Quote & escape string so client interprets it as single token."""
155             quoted = []
156             quoted += ['"']
157             for c in string:
158                 if c in {'"', '\\'}:
159                     quoted += ['\\']
160                 quoted += [c]
161             quoted += ['"']
162             return ''.join(quoted)
163
164         self.send_all('NEW_TURN ' + str(self.world.turn))
165         self.send_all('MAP_SIZE ' + stringify_yx(self.world.map_size))
166         for y in range(self.world.map_size[0]):
167             width = self.world.map_size[1]
168             terrain_line = self.world.terrain_map[y * width:(y + 1) * width]
169             self.send_all('TERRAIN_LINE %5s %s' % (y, quoted(terrain_line)))
170         for thing in self.world.things:
171             self.send_all('THING_TYPE %s %s' % (thing.id_, thing.type_))
172             self.send_all('THING_POS %s %s' % (thing.id_,
173                                                stringify_yx(thing.position)))
174
175     def proceed(self):
176         """Send turn finish signal, run game world, send new world data.
177
178         First sends 'TURN_FINISHED' message, then runs game world
179         until new player input is needed, then sends game state.
180         """
181         self.send_all('TURN_FINISHED ' + str(self.world.turn))
182         self.world.proceed_to_next_player_turn()
183         msg = str(self.world.get_player().last_task_result)
184         self.send_all('LAST_PLAYER_TASK_RESULT ' + msg)
185         self.send_all_gamestate()
186
187     def cmd_FIB(self, numbers, connection_id):
188         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
189
190         Numbers are calculated in parallel as far as possible, using fib().
191         A 'CALCULATING …' message is sent to caller before the result.
192         """
193         self.send_to(connection_id, 'CALCULATING …')
194         results = self.pool.map(fib, numbers)
195         reply = ' '.join([str(r) for r in results])
196         self.send_to(connection_id, reply)
197     cmd_FIB.argtypes = 'seq:int:nonneg'
198
199     def cmd_INC_P(self, connection_id):
200         """Increment world.turn, send game turn data to everyone.
201
202         To simulate game processing waiting times, a one second delay between
203         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
204         calculations are started as pool processes that need to be finished
205         until a further INC finishes the turn.
206
207         This is just a demo structure for how the game loop could work when
208         parallelized. One might imagine a two-step game turn, with a non-action
209         step determining actor tasks (the AI determinations would take the
210         place of the fib calculations here), and an action step wherein these
211         tasks are performed (where now sleep(1) is).
212         """
213         from time import sleep
214         if self.pool_result is not None:
215             self.pool_result.wait()
216         self.send_all('TURN_FINISHED ' + str(self.world.turn))
217         sleep(1)
218         self.world.turn += 1
219         self.send_all_gamestate()
220         self.pool_result = self.pool.map_async(fib, (35, 35))
221
222
223 def io_loop(q, commander):
224     """Handle commands coming through queue q, send results back.
225
226     Commands from q are expected to be tuples, with the first element either
227     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
228     an optional third element of arbitrary type. The UUID identifies a
229     receiver for replies.
230
231     An 'ADD_QUEUE' command should contain as third element a queue through
232     which to send messages back to the sender of the command. A 'KILL_QUEUE'
233     command removes the queue for that receiver from the list of queues through
234     which to send replies.
235
236     A 'COMMAND' command is specified in greater detail by a string that is the
237     tuple's third element. The commander CommandHandler takes care of processing
238     this and sending out replies.
239     """
240     while True:
241         x = q.get()
242         command_type = x[0]
243         connection_id = x[1]
244         content = None if len(x) == 2 else x[2]
245         if command_type == 'ADD_QUEUE':
246             commander.queues_out[connection_id] = content
247         elif command_type == 'COMMAND':
248             commander.handle_input(content, connection_id)
249         elif command_type == 'KILL_QUEUE':
250             del commander.queues_out[connection_id]
251
252
253 if len(sys.argv) != 2:
254     print('wrong number of arguments, expected one (game file)')
255     exit(1)
256 game_file_name = sys.argv[1]
257 commander = CommandHandler(game_file_name)
258 if os.path.exists(game_file_name):
259     if not os.path.isfile(game_file_name):
260         print('game file name does not refer to a valid game file')
261     else:
262         with open(game_file_name, 'r') as f:
263             lines = f.readlines()
264         for i in range(len(lines)):
265             line = lines[i]
266             print("FILE INPUT LINE %s: %s" % (i, line), end='')
267             commander.handle_input(line, abort_on_error=True, store=False)
268 else:
269     commander.handle_input('MAP_SIZE Y:5,X:5')
270     commander.handle_input('TERRAIN_LINE 0 "xxxxx"')
271     commander.handle_input('TERRAIN_LINE 1 "x...x"')
272     commander.handle_input('TERRAIN_LINE 2 "x.X.x"')
273     commander.handle_input('TERRAIN_LINE 3 "x...x"')
274     commander.handle_input('TERRAIN_LINE 4 "xxxxx"')
275     commander.handle_input('THING_TYPE 0 human')
276     commander.handle_input('THING_POS 0 Y:3,X:3')
277     commander.handle_input('THING_TYPE 1 monster')
278     commander.handle_input('THING_POS 1 Y:1,X:1')
279 q = queue.Queue()
280 c = threading.Thread(target=io_loop, daemon=True, args=(q, commander))
281 c.start()
282 server = Server(q, ('localhost', 5000), IO_Handler)
283 try:
284     server.serve_forever()
285 except KeyboardInterrupt:
286     pass
287 finally:
288     print('Killing server')
289     server.server_close()