home · contact · privacy
Refactoring.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 from parser import ArgError, Parser
7
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13 class GameError(Exception):
14     pass
15
16
17 class Server(socketserver.ThreadingTCPServer):
18     """Bind together threaded IO handling server and message queue."""
19
20     def __init__(self, queue, *args, **kwargs):
21         super().__init__(*args, **kwargs)
22         self.queue_out = queue
23         self.daemon_threads = True  # Else, server's threads have daemon=False.
24
25
26 class IO_Handler(socketserver.BaseRequestHandler):
27
28     def handle(self):
29         """Move messages between network socket and main thread via queues.
30
31         On start, sets up new queue, sends it via self.server.queue_out to
32         main thread, and from then on receives messages to send back from the
33         main thread via that new queue.
34
35         At the same time, loops over socket's recv to get messages from the
36         outside via self.server.queue_out into the main thread. Ends connection
37         once a 'QUIT' message is received from socket, and then also kills its
38         own queue.
39
40         All messages to the main thread are tuples, with the first element a
41         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
42         deletion, and 'COMMAND' for everything else), the second element a UUID
43         that uniquely identifies the thread (so that the main thread knows whom
44         to send replies back to), and optionally a third element for further
45         instructions.
46         """
47         import plom_socket_io
48
49         def caught_send(socket, message):
50             """Send message by socket, catch broken socket connection error."""
51             try:
52                 plom_socket_io.send(socket, message)
53             except plom_socket_io.BrokenSocketConnection:
54                 pass
55
56         def send_queue_messages(socket, queue_in, thread_alive):
57             """Send messages via socket from queue_in while thread_alive[0]."""
58             while thread_alive[0]:
59                 try:
60                     msg = queue_in.get(timeout=1)
61                 except queue.Empty:
62                     continue
63                 caught_send(socket, msg)
64
65         import uuid
66         print('CONNECTION FROM:', str(self.client_address))
67         connection_id = uuid.uuid4()
68         queue_in = queue.Queue()
69         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
70         thread_alive = [True]
71         t = threading.Thread(target=send_queue_messages,
72                              args=(self.request, queue_in, thread_alive))
73         t.start()
74         for message in plom_socket_io.recv(self.request):
75             if message is None:
76                 caught_send(self.request, 'BAD MESSAGE')
77             elif 'QUIT' == message:
78                 caught_send(self.request, 'BYE')
79                 break
80             else:
81                 self.server.queue_out.put(('COMMAND', connection_id, message))
82         self.server.queue_out.put(('KILL_QUEUE', connection_id))
83         thread_alive[0] = False
84         print('CONNECTION CLOSED FROM:', str(self.client_address))
85         self.request.close()
86
87
88 def move_pos(direction, pos_yx):
89     if direction == 'UP':
90         pos_yx[0] -= 1
91     elif direction == 'DOWN':
92         pos_yx[0] += 1
93     elif direction == 'RIGHT':
94         pos_yx[1] += 1
95     elif direction == 'LEFT':
96         pos_yx[1] -= 1
97
98
99 class Task:
100
101     def __init__(self, thing, name, args=(), kwargs={}):
102         self.name = name
103         self.thing = thing
104         self.args = args
105         self.kwargs = kwargs
106         self.todo = 1
107
108     def check(self):
109         if self.name == 'move':
110             if len(self.args) > 0:
111                 direction = self.args[0]
112             else:
113                 direction = self.kwargs['direction']
114             test_pos = self.thing.position[:]
115             move_pos(direction, test_pos)
116             if test_pos[0] < 0 or test_pos[1] < 0 or \
117                test_pos[0] >= self.thing.world.map_size[0] or \
118                test_pos[1] >= self.thing.world.map_size[1]:
119                 raise GameError('would move outside map bounds')
120             pos_i = test_pos[0] * self.thing.world.map_size[1] + test_pos[1]
121             map_tile = self.thing.world.map_[pos_i]
122             if map_tile != '.':
123                 raise GameError('would move into illegal terrain')
124
125
126 class Thing:
127
128     def __init__(self, world, type_, position):
129         self.world = world
130         self.type_ = type_
131         self.position = position
132         self.task = Task(self, 'wait')
133
134     def task_wait(self):
135         pass
136
137     def task_move(self, direction):
138         move_pos(direction, self.position)
139
140     def decide_task(self):
141         if self.position[1] > 1:
142             self.set_task('move', 'LEFT')
143         elif self.position[1] < 3:
144             self.set_task('move', 'RIGHT')
145         else:
146             self.set_task('wait')
147
148     def set_task(self, task, *args, **kwargs):
149         self.task = Task(self, task, args, kwargs)
150         self.task.check()
151
152     def proceed(self, is_AI=True):
153         """Further the thing in its tasks.
154
155         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
156         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
157         .decide_task to decide a self.task.
158         """
159         self.task.todo -= 1
160         if self.task.todo <= 0:
161             task = getattr(self, 'task_' + self.task.name)
162             task(*self.task.args, **self.task.kwargs)
163             self.task = None
164         if is_AI and self.task is None:
165             self.decide_task()
166
167
168 class World:
169
170     def __init__(self):
171         self.turn = 0
172         self.map_size = (5, 5)
173         self.map_ = 'xxxxx' +\
174                     'x...x' +\
175                     'x.X.x' +\
176                     'x...x' +\
177                     'xxxxx'
178         self.things = [
179             Thing(self, 'human', [3, 3]),
180             Thing(self, 'monster', [1, 1])
181         ]
182         self.player_i = 0
183         self.player = self.things[self.player_i]
184
185
186 def fib(n):
187     """Calculate n-th Fibonacci number. Very inefficiently."""
188     if n in (1, 2):
189         return 1
190     else:
191         return fib(n-1) + fib(n-2)
192
193
194 class CommandHandler:
195
196     def __init__(self, queues_out):
197         from multiprocessing import Pool
198         self.queues_out = queues_out
199         self.world = World()
200         self.parser = Parser(self)
201         # self.pool and self.pool_result are currently only needed by the FIB
202         # command and the demo of a parallelized game loop in cmd_inc_p.
203         self.pool = Pool()
204         self.pool_result = None
205
206     def handle_input(self, input_, connection_id):
207         """Process input_ to command grammar, call command handler if found."""
208         try:
209             command = self.parser.parse(input_)
210             if command is None:
211                 self.send_to(connection_id, 'UNHANDLED INPUT')
212             else:
213                 command(connection_id=connection_id)
214         except ArgError as e:
215             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
216         except GameError as e:
217             self.send_to(connection_id, 'GAME ERROR: ' + str(e))
218
219     def send_to(self, connection_id, msg):
220         """Send msg to client of connection_id."""
221         self.queues_out[connection_id].put(msg)
222
223     def send_all(self, msg):
224         """Send msg to all clients."""
225         for connection_id in self.queues_out:
226             self.send_to(connection_id, msg)
227
228     def stringify_yx(self, tuple_):
229         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
230         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
231
232     def quoted(self, string):
233         """Quote and escape string so client interprets it as single token."""
234         quoted = []
235         quoted += ['"']
236         for c in string:
237             if c in {'"', '\\'}:
238                 quoted += ['\\']
239             quoted += [c]
240         quoted += ['"']
241         return ''.join(quoted)
242
243     def quoted_map(self, map_string, map_width):
244         """Put \n into map_string at map_width intervals, return quoted whole."""
245         map_lines = []
246         map_size = len(map_string)
247         start_cut = 0
248         while start_cut < map_size:
249             limit = start_cut + map_width
250             map_lines += [map_string[start_cut:limit]]
251             start_cut = limit
252         return self.quoted("\n".join(map_lines))
253
254     def send_all_gamestate(self):
255         """Send out game state data relevant to clients."""
256         self.send_all('NEW_TURN ' + str(self.world.turn))
257         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
258         self.send_all('TERRAIN\n' + self.quoted_map(self.world.map_,
259                                                     self.world.map_size[1]))
260         for thing in self.world.things:
261             self.send_all('THING TYPE:' + thing.type_ + ' '
262                           + self.stringify_yx(thing.position))
263
264     def proceed_to_next_player_turn(self, connection_id):
265         """Run game world turns until player can decide their next step.
266
267         Sends a 'TURN_FINISHED' message, then iterates through all non-player
268         things, on each step furthering them in their tasks (and letting them
269         decide new ones if they finish). The iteration order is: first all
270         things that come after the player in the world things list, then (after
271         incrementing the world turn) all that come before the player; then the
272         player's .proceed() is run, and if it does not finish his task, the
273         loop starts at the beginning. Once the player's task is finished, the
274         loop breaks, and client-relevant game data is sent.
275         """
276         self.send_all('TURN_FINISHED ' + str(self.world.turn))
277         while True:
278             for thing in self.world.things[self.world.player_i+1:]:
279                 thing.proceed()
280             self.world.turn += 1
281             for thing in self.world.things[:self.world.player_i]:
282                 thing.proceed()
283             self.world.player.proceed(is_AI=False)
284             if self.world.player.task is None:
285                 break
286         self.send_all_gamestate()
287
288     def cmd_MOVE(self, direction, connection_id):
289         """Set player task to 'move' with direction arg, finish player turn."""
290         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
291             raise ArgError('Move argument must be one of: '
292                            'UP, DOWN, RIGHT, LEFT')
293         self.world.player.set_task('move', direction=direction)
294         self.proceed_to_next_player_turn(connection_id)
295     cmd_MOVE.argtypes = 'string'
296
297     def cmd_WAIT(self, connection_id):
298         """Set player task to 'wait', finish player turn."""
299         self.world.player.set_task('wait')
300         self.proceed_to_next_player_turn(connection_id)
301
302     def cmd_GET_TURN(self, connection_id):
303         """Send world.turn to caller."""
304         self.send_to(connection_id, str(self.world.turn))
305
306     def cmd_ECHO(self, msg, connection_id):
307         """Send msg to caller."""
308         self.send_to(connection_id, msg)
309     cmd_ECHO.argtypes = 'string'
310
311     def cmd_ALL(self, msg, connection_id):
312         """Send msg to all clients."""
313         self.send_all(msg)
314     cmd_ALL.argtypes = 'string'
315
316     def cmd_FIB(self, numbers, connection_id):
317         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
318
319         Numbers are calculated in parallel as far as possible, using fib().
320         A 'CALCULATING …' message is sent to caller before the result.
321         """
322         self.send_to(connection_id, 'CALCULATING …')
323         results = self.pool.map(fib, numbers)
324         reply = ' '.join([str(r) for r in results])
325         self.send_to(connection_id, reply)
326     cmd_FIB.argtypes = 'seq:int:nonneg'
327
328     def cmd_INC_P(self, connection_id):
329         """Increment world.turn, send game turn data to everyone.
330
331         To simulate game processing waiting times, a one second delay between
332         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
333         calculations are started as pool processes that need to be finished
334         until a further INC finishes the turn.
335
336         This is just a demo structure for how the game loop could work when
337         parallelized. One might imagine a two-step game turn, with a non-action
338         step determining actor tasks (the AI determinations would take the
339         place of the fib calculations here), and an action step wherein these
340         tasks are performed (where now sleep(1) is).
341         """
342         from time import sleep
343         if self.pool_result is not None:
344             self.pool_result.wait()
345         self.send_all('TURN_FINISHED ' + str(self.world.turn))
346         sleep(1)
347         self.world.turn += 1
348         self.send_all_gamestate()
349         self.pool_result = self.pool.map_async(fib, (35, 35))
350
351
352 def io_loop(q):
353     """Handle commands coming through queue q, send results back.
354
355     Commands from q are expected to be tuples, with the first element either
356     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
357     an optional third element of arbitrary type. The UUID identifies a
358     receiver for replies.
359
360     An 'ADD_QUEUE' command should contain as third element a queue through
361     which to send messages back to the sender of the command. A 'KILL_QUEUE'
362     command removes the queue for that receiver from the list of queues through
363     which to send replies.
364
365     A 'COMMAND' command is specified in greater detail by a string that is the
366     tuple's third element. CommandHandler takes care of processing this and
367     sending out replies.
368     """
369     queues_out = {}
370     command_handler = CommandHandler(queues_out)
371     while True:
372         x = q.get()
373         command_type = x[0]
374         connection_id = x[1]
375         content = None if len(x) == 2 else x[2]
376         if command_type == 'ADD_QUEUE':
377             queues_out[connection_id] = content
378         elif command_type == 'COMMAND':
379             command_handler.handle_input(content, connection_id)
380         elif command_type == 'KILL_QUEUE':
381             del queues_out[connection_id]
382
383
384 q = queue.Queue()
385 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
386 c.start()
387 server = Server(q, ('localhost', 5000), IO_Handler)
388 try:
389     server.serve_forever()
390 except KeyboardInterrupt:
391     pass
392 finally:
393     print('Killing server')
394     server.server_close()