home · contact · privacy
Remove newlines from server data, add action initiation checks.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 from parser import ArgError, Parser
7
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13 class GameError(Exception):
14     pass
15
16
17 class Server(socketserver.ThreadingTCPServer):
18     """Bind together threaded IO handling server and message queue."""
19
20     def __init__(self, queue, *args, **kwargs):
21         super().__init__(*args, **kwargs)
22         self.queue_out = queue
23         self.daemon_threads = True  # Else, server's threads have daemon=False.
24
25
26 class IO_Handler(socketserver.BaseRequestHandler):
27
28     def handle(self):
29         """Move messages between network socket and main thread via queues.
30
31         On start, sets up new queue, sends it via self.server.queue_out to
32         main thread, and from then on receives messages to send back from the
33         main thread via that new queue.
34
35         At the same time, loops over socket's recv to get messages from the
36         outside via self.server.queue_out into the main thread. Ends connection
37         once a 'QUIT' message is received from socket, and then also kills its
38         own queue.
39
40         All messages to the main thread are tuples, with the first element a
41         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42         deletion, and 'COMMAND' for everything else), the second element a UUID
43         that uniquely identifies the thread (so that the main thread knows whom
44         to send replies back to), and optionally a third element for further
45         instructions.
46         """
47         import plom_socket_io
48
49         def caught_send(socket, message):
50             """Send message by socket, catch broken socket connection error."""
51             try:
52                 plom_socket_io.send(socket, message)
53             except plom_socket_io.BrokenSocketConnection:
54                 pass
55
56         def send_queue_messages(socket, queue_in, thread_alive):
57             """Send messages via socket from queue_in while thread_alive[0]."""
58             while thread_alive[0]:
59                 try:
60                     msg = queue_in.get(timeout=1)
61                 except queue.Empty:
62                     continue
63                 caught_send(socket, msg)
64
65         import uuid
66         print('CONNECTION FROM:', str(self.client_address))
67         connection_id = uuid.uuid4()
68         queue_in = queue.Queue()
69         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
70         thread_alive = [True]
71         t = threading.Thread(target=send_queue_messages,
72                              args=(self.request, queue_in, thread_alive))
73         t.start()
74         for message in plom_socket_io.recv(self.request):
75             if message is None:
76                 caught_send(self.request, 'BAD MESSAGE')
77             elif 'QUIT' == message:
78                 caught_send(self.request, 'BYE')
79                 break
80             else:
81                 self.server.queue_out.put(('COMMAND', connection_id, message))
82         self.server.queue_out.put(('KILL_QUEUE', connection_id))
83         thread_alive[0] = False
84         print('CONNECTION CLOSED FROM:', str(self.client_address))
85         self.request.close()
86
87
88 class Task:
89
90     def __init__(self, name, args=(), kwargs={}):
91         self.name = name
92         self.args = args
93         self.kwargs = kwargs
94         self.todo = 1
95
96
97 class Thing:
98
99     def __init__(self, world, type_, position):
100         self.world = world
101         self.type_ = type_
102         self.position = position
103         self.task = Task('wait')
104
105     def task_wait(self):
106         pass
107
108     def task_move(self, direction):
109         if direction == 'UP':
110             self.position[0] -= 1
111         elif direction == 'DOWN':
112             self.position[0] += 1
113         elif direction == 'RIGHT':
114             self.position[1] += 1
115         elif direction == 'LEFT':
116             self.position[1] -= 1
117
118     def decide_task(self):
119         if self.position[1] > 1:
120             self.set_task('move', 'LEFT')
121         elif self.position[1] < 3:
122             self.set_task('move', 'RIGHT')
123         else:
124             self.set_task('wait')
125
126     def check_task(self, task, *args, **kwargs):
127         if task == 'move':
128             if len(args) > 0:
129                 direction = args[0]
130             else:
131                 direction = kwargs['direction']
132             test_pos = self.position[:]
133             if direction == 'UP':
134                 test_pos[0] -= 1
135             elif direction == 'DOWN':
136                 test_pos[0] += 1
137             elif direction == 'RIGHT':
138                 test_pos[1] += 1
139             elif direction == 'LEFT':
140                 test_pos[1] -= 1
141             if test_pos[0] < 0 or test_pos[1] < 0 or \
142                test_pos[0] >= self.world.map_size[0] or \
143                test_pos[1] >= self.world.map_size[1]:
144                 raise GameError('would move outside map bounds')
145             pos_i = test_pos[0] * self.world.map_size[1] + test_pos[1]
146             map_tile = self.world.map_[pos_i]
147             if map_tile != '.':
148                 raise GameError('would move into illegal terrain')
149
150     def set_task(self, task, *args, **kwargs):
151         self.check_task(task, *args, **kwargs)
152         self.task = Task(task, args, kwargs)
153
154     def proceed(self, is_AI=True):
155         """Further the thing in its tasks.
156
157         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
158         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
159         .decide_task to decide a self.task.
160         """
161         self.task.todo -= 1
162         if self.task.todo <= 0:
163             task = getattr(self, 'task_' + self.task.name)
164             task(*self.task.args, **self.task.kwargs)
165             self.task = None
166         if is_AI and self.task is None:
167             self.decide_task()
168
169
170 class World:
171
172     def __init__(self):
173         self.turn = 0
174         self.map_size = (5, 5)
175         self.map_ = 'xxxxx' +\
176                     'x...x' +\
177                     'x.X.x' +\
178                     'x...x' +\
179                     'xxxxx'
180         self.things = [
181             Thing(self, 'human', [3, 3]),
182             Thing(self, 'monster', [1, 1])
183         ]
184         self.player_i = 0
185         self.player = self.things[self.player_i]
186
187
188 def fib(n):
189     """Calculate n-th Fibonacci number. Very inefficiently."""
190     if n in (1, 2):
191         return 1
192     else:
193         return fib(n-1) + fib(n-2)
194
195
196 class CommandHandler:
197
198     def __init__(self, queues_out):
199         from multiprocessing import Pool
200         self.queues_out = queues_out
201         self.world = World()
202         self.parser = Parser(self)
203         # self.pool and self.pool_result are currently only needed by the FIB
204         # command and the demo of a parallelized game loop in cmd_inc_p.
205         self.pool = Pool()
206         self.pool_result = None
207
208     def handle_input(self, input_, connection_id):
209         """Process input_ to command grammar, call command handler if found."""
210         try:
211             command = self.parser.parse(input_)
212             if command is None:
213                 self.send_to(connection_id, 'UNHANDLED INPUT')
214             else:
215                 command(connection_id=connection_id)
216         except ArgError as e:
217             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
218         except GameError as e:
219             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
220
221     def send_to(self, connection_id, msg):
222         """Send msg to client of connection_id."""
223         self.queues_out[connection_id].put(msg)
224
225     def send_all(self, msg):
226         """Send msg to all clients."""
227         for connection_id in self.queues_out:
228             self.send_to(connection_id, msg)
229
230     def stringify_yx(self, tuple_):
231         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
232         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
233
234     def quoted(self, string):
235         """Quote and escape string so client interprets it as single token."""
236         quoted = []
237         quoted += ['"']
238         for c in string:
239             if c in {'"', '\\'}:
240                 quoted += ['\\']
241             quoted += [c]
242         quoted += ['"']
243         return ''.join(quoted)
244
245     def quoted_map(self, map_string, map_width):
246         """Put \n into map_string at map_width intervals, return quoted whole."""
247         map_lines = []
248         map_size = len(map_string)
249         start_cut = 0
250         while start_cut < map_size:
251             limit = start_cut + map_width
252             map_lines += [map_string[start_cut:limit]]
253             start_cut = limit
254         return self.quoted("\n".join(map_lines))
255
256     def send_all_gamestate(self):
257         """Send out game state data relevant to clients."""
258         self.send_all('NEW_TURN ' + str(self.world.turn))
259         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
260         self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
261                                                     self.world.map_size[1]))
262         for thing in self.world.things:
263             self.send_all('THING TYPE:' + thing.type_ + ' '
264                           + self.stringify_yx(thing.position))
265
266     def proceed_to_next_player_turn(self, connection_id):
267         """Run game world turns until player can decide their next step.
268
269         Sends a 'TURN_FINISHED' message, then iterates through all non-player
270         things, on each step furthering them in their tasks (and letting them
271         decide new ones if they finish). The iteration order is: first all
272         things that come after the player in the world things list, then (after
273         incrementing the world turn) all that come before the player; then the
274         player's .proceed() is run, and if it does not finish his task, the
275         loop starts at the beginning. Once the player's task is finished, the
276         loop breaks, and client-relevant game data is sent.
277         """
278         self.send_all('TURN_FINISHED ' + str(self.world.turn))
279         while True:
280             for thing in self.world.things[self.world.player_i+1:]:
281                 thing.proceed()
282             self.world.turn += 1
283             for thing in self.world.things[:self.world.player_i]:
284                 thing.proceed()
285             self.world.player.proceed(is_AI=False)
286             if self.world.player.task is None:
287                 break
288         self.send_all_gamestate()
289
290     def cmd_MOVE(self, direction, connection_id):
291         """Set player task to 'move' with direction arg, finish player turn."""
292         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
293             raise ArgError('Move argument must be one of: '
294                            'UP, DOWN, RIGHT, LEFT')
295         self.world.player.set_task('move', direction=direction)
296         self.proceed_to_next_player_turn(connection_id)
297     cmd_MOVE.argtypes = 'string'
298
299     def cmd_WAIT(self, connection_id):
300         """Set player task to 'wait', finish player turn."""
301         self.world.player.set_task('wait')
302         self.proceed_to_next_player_turn(connection_id)
303
304     def cmd_GET_TURN(self, connection_id):
305         """Send world.turn to caller."""
306         self.send_to(connection_id, str(self.world.turn))
307
308     def cmd_ECHO(self, msg, connection_id):
309         """Send msg to caller."""
310         self.send_to(connection_id, msg)
311     cmd_ECHO.argtypes = 'string'
312
313     def cmd_ALL(self, msg, connection_id):
314         """Send msg to all clients."""
315         self.send_all(msg)
316     cmd_ALL.argtypes = 'string'
317
318     def cmd_FIB(self, numbers, connection_id):
319         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
320
321         Numbers are calculated in parallel as far as possible, using fib().
322         A 'CALCULATING …' message is sent to caller before the result.
323         """
324         self.send_to(connection_id, 'CALCULATING …')
325         results = self.pool.map(fib, numbers)
326         reply = ' '.join([str(r) for r in results])
327         self.send_to(connection_id, reply)
328     cmd_FIB.argtypes = 'seq:int:nonneg'
329
330     def cmd_INC_P(self, connection_id):
331         """Increment world.turn, send game turn data to everyone.
332
333         To simulate game processing waiting times, a one second delay between
334         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
335         calculations are started as pool processes that need to be finished
336         until a further INC finishes the turn.
337
338         This is just a demo structure for how the game loop could work when
339         parallelized. One might imagine a two-step game turn, with a non-action
340         step determining actor tasks (the AI determinations would take the
341         place of the fib calculations here), and an action step wherein these
342         tasks are performed (where now sleep(1) is).
343         """
344         from time import sleep
345         if self.pool_result is not None:
346             self.pool_result.wait()
347         self.send_all('TURN_FINISHED ' + str(self.world.turn))
348         sleep(1)
349         self.world.turn += 1
350         self.send_all_gamestate()
351         self.pool_result = self.pool.map_async(fib, (35, 35))
352
353
354 def io_loop(q):
355     """Handle commands coming through queue q, send results back.
356
357     Commands from q are expected to be tuples, with the first element either
358     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
359     an optional third element of arbitrary type. The UUID identifies a
360     receiver for replies.
361
362     An 'ADD_QUEUE' command should contain as third element a queue through
363     which to send messages back to the sender of the command. A 'KILL_QUEUE'
364     command removes the queue for that receiver from the list of queues through
365     which to send replies.
366
367     A 'COMMAND' command is specified in greater detail by a string that is the
368     tuple's third element. CommandHandler takes care of processing this and
369     sending out replies.
370     """
371     queues_out = {}
372     command_handler = CommandHandler(queues_out)
373     while True:
374         x = q.get()
375         command_type = x[0]
376         connection_id = x[1]
377         content = None if len(x) == 2 else x[2]
378         if command_type == 'ADD_QUEUE':
379             queues_out[connection_id] = content
380         elif command_type == 'COMMAND':
381             command_handler.handle_input(content, connection_id)
382         elif command_type == 'KILL_QUEUE':
383             del queues_out[connection_id]
384
385
386 q = queue.Queue()
387 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
388 c.start()
389 server = Server(q, ('localhost', 5000), IO_Handler)
390 try:
391     server.serve_forever()
392 except KeyboardInterrupt:
393     pass
394 finally:
395     print('Killing server')
396     server.server_close()