home · contact · privacy
Minor refactoring.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 from parser import ArgError, Parser
7
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13 class GameError(Exception):
14     pass
15
16
17 class Server(socketserver.ThreadingTCPServer):
18     """Bind together threaded IO handling server and message queue."""
19
20     def __init__(self, queue, *args, **kwargs):
21         super().__init__(*args, **kwargs)
22         self.queue_out = queue
23         self.daemon_threads = True  # Else, server's threads have daemon=False.
24
25
26 class IO_Handler(socketserver.BaseRequestHandler):
27
28     def handle(self):
29         """Move messages between network socket and main thread via queues.
30
31         On start, sets up new queue, sends it via self.server.queue_out to
32         main thread, and from then on receives messages to send back from the
33         main thread via that new queue.
34
35         At the same time, loops over socket's recv to get messages from the
36         outside via self.server.queue_out into the main thread. Ends connection
37         once a 'QUIT' message is received from socket, and then also kills its
38         own queue.
39
40         All messages to the main thread are tuples, with the first element a
41         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42         deletion, and 'COMMAND' for everything else), the second element a UUID
43         that uniquely identifies the thread (so that the main thread knows whom
44         to send replies back to), and optionally a third element for further
45         instructions.
46         """
47         import plom_socket_io
48
49         def caught_send(socket, message):
50             """Send message by socket, catch broken socket connection error."""
51             try:
52                 plom_socket_io.send(socket, message)
53             except plom_socket_io.BrokenSocketConnection:
54                 pass
55
56         def send_queue_messages(socket, queue_in, thread_alive):
57             """Send messages via socket from queue_in while thread_alive[0]."""
58             while thread_alive[0]:
59                 try:
60                     msg = queue_in.get(timeout=1)
61                 except queue.Empty:
62                     continue
63                 caught_send(socket, msg)
64
65         import uuid
66         print('CONNECTION FROM:', str(self.client_address))
67         connection_id = uuid.uuid4()
68         queue_in = queue.Queue()
69         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
70         thread_alive = [True]
71         t = threading.Thread(target=send_queue_messages,
72                              args=(self.request, queue_in, thread_alive))
73         t.start()
74         for message in plom_socket_io.recv(self.request):
75             if message is None:
76                 caught_send(self.request, 'BAD MESSAGE')
77             elif 'QUIT' == message:
78                 caught_send(self.request, 'BYE')
79                 break
80             else:
81                 self.server.queue_out.put(('COMMAND', connection_id, message))
82         self.server.queue_out.put(('KILL_QUEUE', connection_id))
83         thread_alive[0] = False
84         print('CONNECTION CLOSED FROM:', str(self.client_address))
85         self.request.close()
86
87
88 class Task:
89
90     def __init__(self, name, args=(), kwargs={}):
91         self.name = name
92         self.args = args
93         self.kwargs = kwargs
94         self.todo = 1
95
96
97 class Thing:
98
99     def __init__(self, world, type_, position):
100         self.world = world
101         self.type_ = type_
102         self.position = position
103         self.task = Task('wait')
104
105     def _move_pos(self, direction, pos_yx):
106         if direction == 'UP':
107             pos_yx[0] -= 1
108         elif direction == 'DOWN':
109             pos_yx[0] += 1
110         elif direction == 'RIGHT':
111             pos_yx[1] += 1
112         elif direction == 'LEFT':
113             pos_yx[1] -= 1
114
115     def task_wait(self):
116         pass
117
118     def task_move(self, direction):
119         self._move_pos(direction, self.position)
120
121     def decide_task(self):
122         if self.position[1] > 1:
123             self.set_task('move', 'LEFT')
124         elif self.position[1] < 3:
125             self.set_task('move', 'RIGHT')
126         else:
127             self.set_task('wait')
128
129     def check_task(self, task, *args, **kwargs):
130         if task == 'move':
131             if len(args) > 0:
132                 direction = args[0]
133             else:
134                 direction = kwargs['direction']
135             test_pos = self.position[:]
136             self._move_pos(direction, test_pos)
137             if test_pos[0] < 0 or test_pos[1] < 0 or \
138                test_pos[0] >= self.world.map_size[0] or \
139                test_pos[1] >= self.world.map_size[1]:
140                 raise GameError('would move outside map bounds')
141             pos_i = test_pos[0] * self.world.map_size[1] + test_pos[1]
142             map_tile = self.world.map_[pos_i]
143             if map_tile != '.':
144                 raise GameError('would move into illegal terrain')
145
146     def set_task(self, task, *args, **kwargs):
147         self.check_task(task, *args, **kwargs)
148         self.task = Task(task, args, kwargs)
149
150     def proceed(self, is_AI=True):
151         """Further the thing in its tasks.
152
153         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
154         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
155         .decide_task to decide a self.task.
156         """
157         self.task.todo -= 1
158         if self.task.todo <= 0:
159             task = getattr(self, 'task_' + self.task.name)
160             task(*self.task.args, **self.task.kwargs)
161             self.task = None
162         if is_AI and self.task is None:
163             self.decide_task()
164
165
166 class World:
167
168     def __init__(self):
169         self.turn = 0
170         self.map_size = (5, 5)
171         self.map_ = 'xxxxx' +\
172                     'x...x' +\
173                     'x.X.x' +\
174                     'x...x' +\
175                     'xxxxx'
176         self.things = [
177             Thing(self, 'human', [3, 3]),
178             Thing(self, 'monster', [1, 1])
179         ]
180         self.player_i = 0
181         self.player = self.things[self.player_i]
182
183
184 def fib(n):
185     """Calculate n-th Fibonacci number. Very inefficiently."""
186     if n in (1, 2):
187         return 1
188     else:
189         return fib(n-1) + fib(n-2)
190
191
192 class CommandHandler:
193
194     def __init__(self, queues_out):
195         from multiprocessing import Pool
196         self.queues_out = queues_out
197         self.world = World()
198         self.parser = Parser(self)
199         # self.pool and self.pool_result are currently only needed by the FIB
200         # command and the demo of a parallelized game loop in cmd_inc_p.
201         self.pool = Pool()
202         self.pool_result = None
203
204     def handle_input(self, input_, connection_id):
205         """Process input_ to command grammar, call command handler if found."""
206         try:
207             command = self.parser.parse(input_)
208             if command is None:
209                 self.send_to(connection_id, 'UNHANDLED INPUT')
210             else:
211                 command(connection_id=connection_id)
212         except ArgError as e:
213             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
214         except GameError as e:
215             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
216
217     def send_to(self, connection_id, msg):
218         """Send msg to client of connection_id."""
219         self.queues_out[connection_id].put(msg)
220
221     def send_all(self, msg):
222         """Send msg to all clients."""
223         for connection_id in self.queues_out:
224             self.send_to(connection_id, msg)
225
226     def stringify_yx(self, tuple_):
227         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
228         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
229
230     def quoted(self, string):
231         """Quote and escape string so client interprets it as single token."""
232         quoted = []
233         quoted += ['"']
234         for c in string:
235             if c in {'"', '\\'}:
236                 quoted += ['\\']
237             quoted += [c]
238         quoted += ['"']
239         return ''.join(quoted)
240
241     def quoted_map(self, map_string, map_width):
242         """Put \n into map_string at map_width intervals, return quoted whole."""
243         map_lines = []
244         map_size = len(map_string)
245         start_cut = 0
246         while start_cut < map_size:
247             limit = start_cut + map_width
248             map_lines += [map_string[start_cut:limit]]
249             start_cut = limit
250         return self.quoted("\n".join(map_lines))
251
252     def send_all_gamestate(self):
253         """Send out game state data relevant to clients."""
254         self.send_all('NEW_TURN ' + str(self.world.turn))
255         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
256         self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
257                                                     self.world.map_size[1]))
258         for thing in self.world.things:
259             self.send_all('THING TYPE:' + thing.type_ + ' '
260                           + self.stringify_yx(thing.position))
261
262     def proceed_to_next_player_turn(self, connection_id):
263         """Run game world turns until player can decide their next step.
264
265         Sends a 'TURN_FINISHED' message, then iterates through all non-player
266         things, on each step furthering them in their tasks (and letting them
267         decide new ones if they finish). The iteration order is: first all
268         things that come after the player in the world things list, then (after
269         incrementing the world turn) all that come before the player; then the
270         player's .proceed() is run, and if it does not finish his task, the
271         loop starts at the beginning. Once the player's task is finished, the
272         loop breaks, and client-relevant game data is sent.
273         """
274         self.send_all('TURN_FINISHED ' + str(self.world.turn))
275         while True:
276             for thing in self.world.things[self.world.player_i+1:]:
277                 thing.proceed()
278             self.world.turn += 1
279             for thing in self.world.things[:self.world.player_i]:
280                 thing.proceed()
281             self.world.player.proceed(is_AI=False)
282             if self.world.player.task is None:
283                 break
284         self.send_all_gamestate()
285
286     def cmd_MOVE(self, direction, connection_id):
287         """Set player task to 'move' with direction arg, finish player turn."""
288         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
289             raise ArgError('Move argument must be one of: '
290                            'UP, DOWN, RIGHT, LEFT')
291         self.world.player.set_task('move', direction=direction)
292         self.proceed_to_next_player_turn(connection_id)
293     cmd_MOVE.argtypes = 'string'
294
295     def cmd_WAIT(self, connection_id):
296         """Set player task to 'wait', finish player turn."""
297         self.world.player.set_task('wait')
298         self.proceed_to_next_player_turn(connection_id)
299
300     def cmd_GET_TURN(self, connection_id):
301         """Send world.turn to caller."""
302         self.send_to(connection_id, str(self.world.turn))
303
304     def cmd_ECHO(self, msg, connection_id):
305         """Send msg to caller."""
306         self.send_to(connection_id, msg)
307     cmd_ECHO.argtypes = 'string'
308
309     def cmd_ALL(self, msg, connection_id):
310         """Send msg to all clients."""
311         self.send_all(msg)
312     cmd_ALL.argtypes = 'string'
313
314     def cmd_FIB(self, numbers, connection_id):
315         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
316
317         Numbers are calculated in parallel as far as possible, using fib().
318         A 'CALCULATING …' message is sent to caller before the result.
319         """
320         self.send_to(connection_id, 'CALCULATING …')
321         results = self.pool.map(fib, numbers)
322         reply = ' '.join([str(r) for r in results])
323         self.send_to(connection_id, reply)
324     cmd_FIB.argtypes = 'seq:int:nonneg'
325
326     def cmd_INC_P(self, connection_id):
327         """Increment world.turn, send game turn data to everyone.
328
329         To simulate game processing waiting times, a one second delay between
330         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
331         calculations are started as pool processes that need to be finished
332         until a further INC finishes the turn.
333
334         This is just a demo structure for how the game loop could work when
335         parallelized. One might imagine a two-step game turn, with a non-action
336         step determining actor tasks (the AI determinations would take the
337         place of the fib calculations here), and an action step wherein these
338         tasks are performed (where now sleep(1) is).
339         """
340         from time import sleep
341         if self.pool_result is not None:
342             self.pool_result.wait()
343         self.send_all('TURN_FINISHED ' + str(self.world.turn))
344         sleep(1)
345         self.world.turn += 1
346         self.send_all_gamestate()
347         self.pool_result = self.pool.map_async(fib, (35, 35))
348
349
350 def io_loop(q):
351     """Handle commands coming through queue q, send results back.
352
353     Commands from q are expected to be tuples, with the first element either
354     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
355     an optional third element of arbitrary type. The UUID identifies a
356     receiver for replies.
357
358     An 'ADD_QUEUE' command should contain as third element a queue through
359     which to send messages back to the sender of the command. A 'KILL_QUEUE'
360     command removes the queue for that receiver from the list of queues through
361     which to send replies.
362
363     A 'COMMAND' command is specified in greater detail by a string that is the
364     tuple's third element. CommandHandler takes care of processing this and
365     sending out replies.
366     """
367     queues_out = {}
368     command_handler = CommandHandler(queues_out)
369     while True:
370         x = q.get()
371         command_type = x[0]
372         connection_id = x[1]
373         content = None if len(x) == 2 else x[2]
374         if command_type == 'ADD_QUEUE':
375             queues_out[connection_id] = content
376         elif command_type == 'COMMAND':
377             command_handler.handle_input(content, connection_id)
378         elif command_type == 'KILL_QUEUE':
379             del queues_out[connection_id]
380
381
382 q = queue.Queue()
383 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
384 c.start()
385 server = Server(q, ('localhost', 5000), IO_Handler)
386 try:
387     server.serve_forever()
388 except KeyboardInterrupt:
389     pass
390 finally:
391     print('Killing server')
392     server.server_close()