home · contact · privacy
Share parsing code between client and server.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6 from parser import ArgError, Parser
7
8
9 # Avoid "Address already in use" errors.
10 socketserver.TCPServer.allow_reuse_address = True
11
12
13 class Server(socketserver.ThreadingTCPServer):
14     """Bind together threaded IO handling server and message queue."""
15
16     def __init__(self, queue, *args, **kwargs):
17         super().__init__(*args, **kwargs)
18         self.queue_out = queue
19         self.daemon_threads = True  # Else, server's threads have daemon=False.
20
21
22 class IO_Handler(socketserver.BaseRequestHandler):
23
24     def handle(self):
25         """Move messages between network socket and main thread via queues.
26
27         On start, sets up new queue, sends it via self.server.queue_out to
28         main thread, and from then on receives messages to send back from the
29         main thread via that new queue.
30
31         At the same time, loops over socket's recv to get messages from the
32         outside via self.server.queue_out into the main thread. Ends connection
33         once a 'QUIT' message is received from socket, and then also kills its
34         own queue.
35
36         All messages to the main thread are tuples, with the first element a
37         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
38         deletion, and 'COMMAND' for everything else), the second element a UUID
39         that uniquely identifies the thread (so that the main thread knows whom
40         to send replies back to), and optionally a third element for further
41         instructions.
42         """
43         import plom_socket_io
44
45         def caught_send(socket, message):
46             """Send message by socket, catch broken socket connection error."""
47             try:
48                 plom_socket_io.send(socket, message)
49             except plom_socket_io.BrokenSocketConnection:
50                 pass
51
52         def send_queue_messages(socket, queue_in, thread_alive):
53             """Send messages via socket from queue_in while thread_alive[0]."""
54             while thread_alive[0]:
55                 try:
56                     msg = queue_in.get(timeout=1)
57                 except queue.Empty:
58                     continue
59                 caught_send(socket, msg)
60
61         import uuid
62         print('CONNECTION FROM:', str(self.client_address))
63         connection_id = uuid.uuid4()
64         queue_in = queue.Queue()
65         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
66         thread_alive = [True]
67         t = threading.Thread(target=send_queue_messages,
68                              args=(self.request, queue_in, thread_alive))
69         t.start()
70         for message in plom_socket_io.recv(self.request):
71             if message is None:
72                 caught_send(self.request, 'BAD MESSAGE')
73             elif 'QUIT' == message:
74                 caught_send(self.request, 'BYE')
75                 break
76             else:
77                 self.server.queue_out.put(('COMMAND', connection_id, message))
78         self.server.queue_out.put(('KILL_QUEUE', connection_id))
79         thread_alive[0] = False
80         print('CONNECTION CLOSED FROM:', str(self.client_address))
81         self.request.close()
82
83
84 class Task:
85
86     def __init__(self, name, args=(), kwargs={}):
87         self.name = name
88         self.args = args
89         self.kwargs = kwargs
90         self.todo = 1
91
92
93 class Thing:
94
95     def __init__(self, type_, position):
96         self.type = type_
97         self.position = position
98         self.task = Task('wait')
99
100     def task_wait(self):
101         pass
102
103     def task_move(self, direction):
104         if direction == 'UP':
105             self.position[0] -= 1
106         elif direction == 'DOWN':
107             self.position[0] += 1
108         elif direction == 'RIGHT':
109             self.position[1] += 1
110         elif direction == 'LEFT':
111             self.position[1] -= 1
112
113     def decide_task(self):
114         if self.position[1] > 1:
115             self.set_task('move', 'LEFT')
116         elif self.position[1] < 3:
117             self.set_task('move', 'RIGHT')
118         else:
119             self.set_task('wait')
120
121     def set_task(self, task, *args, **kwargs):
122         self.task = Task(task, args, kwargs)
123
124     def proceed(self, is_AI=True):
125         """Further the thing in its tasks.
126
127         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
128         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
129         .decide_task to decide a self.task.
130         """
131         self.task.todo -= 1
132         if self.task.todo <= 0:
133             task = getattr(self, 'task_' + self.task.name)
134             task(*self.task.args, **self.task.kwargs)
135             self.task = None
136         if is_AI and self.task is None:
137             self.decide_task()
138
139
140 class World:
141
142     def __init__(self):
143         self.turn = 0
144         self.map_size = (5, 5)
145         self.map_ = 'xxxxx\n' +\
146                     'x...x\n' +\
147                     'x.X.x\n' +\
148                     'x...x\n' +\
149                     'xxxxx'
150         self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
151         self.player_i = 0
152         self.player = self.things[self.player_i]
153
154
155 def fib(n):
156     """Calculate n-th Fibonacci number. Very inefficiently."""
157     if n in (1, 2):
158         return 1
159     else:
160         return fib(n-1) + fib(n-2)
161
162
163 class CommandHandler:
164
165     def __init__(self, queues_out):
166         from multiprocessing import Pool
167         self.queues_out = queues_out
168         self.world = World()
169         self.parser = Parser(self)
170         # self.pool and self.pool_result are currently only needed by the FIB
171         # command and the demo of a parallelized game loop in cmd_inc_p.
172         self.pool = Pool()
173         self.pool_result = None
174
175     def send_to(self, connection_id, msg):
176         """Send msg to client of connection_id."""
177         self.queues_out[connection_id].put(msg)
178
179     def send_all(self, msg):
180         """Send msg to all clients."""
181         for connection_id in self.queues_out:
182             self.send_to(connection_id, msg)
183
184     def stringify_yx(self, tuple_):
185         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
186         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
187
188     def quoted(self, string):
189         """Quote and escape string so client interprets it as single token."""
190         quoted = []
191         quoted += ['"']
192         for c in string:
193             if c in {'"', '\\'}:
194                 quoted += ['\\']
195             quoted += [c]
196         quoted += ['"']
197         return ''.join(quoted)
198
199     def proceed_to_next_player_turn(self, connection_id):
200         """Run game world turns until player can decide their next step.
201
202         Sends a 'TURN_FINISHED' message, then iterates through all non-player
203         things, on each step furthering them in their tasks (and letting them
204         decide new ones if they finish). The iteration order is: first all
205         things that come after the player in the world things list, then (after
206         incrementing the world turn) all that come before the player; then the
207         player's .proceed() is run, and if it does not finish his task, the
208         loop starts at the beginning. Once the player's task is finished, the
209         loop breaks, and client-relevant game data is sent.
210         """
211         self.send_all('TURN_FINISHED ' + str(self.world.turn))
212         while True:
213             for thing in self.world.things[self.world.player_i+1:]:
214                 thing.proceed()
215             self.world.turn += 1
216             for thing in self.world.things[:self.world.player_i]:
217                 thing.proceed()
218             self.world.player.proceed(is_AI=False)
219             if self.world.player.task is None:
220                 break
221         self.send_all('NEW_TURN ' + str(self.world.turn))
222         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
223         self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
224         for thing in self.world.things:
225             self.send_all('THING TYPE:' + thing.type + ' '
226                           + self.stringify_yx(thing.position))
227
228     def cmd_FIB(self, numbers, connection_id):
229         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
230
231         Numbers are calculated in parallel as far as possible, using fib().
232         A 'CALCULATING …' message is sent to caller before the result.
233         """
234         self.send_to(connection_id, 'CALCULATING …')
235         results = self.pool.map(fib, numbers)
236         reply = ' '.join([str(r) for r in results])
237         self.send_to(connection_id, reply)
238     cmd_FIB.argtypes = 'seq:int:nonneg'
239
240     def cmd_INC_P(self, connection_id):
241         """Increment world.turn, send game turn data to everyone.
242
243         To simulate game processing waiting times, a one second delay between
244         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
245         calculations are started as pool processes that need to be finished
246         until a further INC finishes the turn.
247
248         This is just a demo structure for how the game loop could work when
249         parallelized. One might imagine a two-step game turn, with a non-action
250         step determining actor tasks (the AI determinations would take the
251         place of the fib calculations here), and an action step wherein these
252         tasks are performed (where now sleep(1) is).
253         """
254         from time import sleep
255         if self.pool_result is not None:
256             self.pool_result.wait()
257         self.send_all('TURN_FINISHED ' + str(self.world.turn))
258         sleep(1)
259         self.world.turn += 1
260         self.send_all('NEW_TURN ' + str(self.world.turn))
261         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
262         self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
263         for thing in self.world.things:
264             self.send_all('THING TYPE:' + thing.type + ' '
265                           + self.stringify_yx(thing.position))
266         self.pool_result = self.pool.map_async(fib, (35, 35))
267
268     def cmd_GET_TURN(self, connection_id):
269         """Send world.turn to caller."""
270         self.send_to(connection_id, str(self.world.turn))
271
272     def cmd_MOVE(self, direction, connection_id):
273         """Set player task to 'move' with direction arg, finish player turn."""
274         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
275             raise ArgError('Move argument must be one of: '
276                            'UP, DOWN, RIGHT, LEFT')
277         self.world.player.set_task('move', direction=direction)
278         self.proceed_to_next_player_turn(connection_id)
279     cmd_MOVE.argtypes = 'string'
280
281     def cmd_WAIT(self, connection_id):
282         """Set player task to 'wait', finish player turn."""
283         self.world.player.set_task('wait')
284         self.proceed_to_next_player_turn(connection_id)
285
286     def cmd_ECHO(self, msg, connection_id):
287         """Send msg to caller."""
288         self.send_to(connection_id, msg)
289     cmd_ECHO.argtypes = 'string'
290
291     def cmd_ALL(self, msg, connection_id):
292         """Send msg to all clients."""
293         self.send_all(msg)
294     cmd_ALL.argtypes = 'string'
295
296     def handle_input(self, input_, connection_id):
297         """Process input_ to command grammar, call command handler if found."""
298         try:
299             command = self.parser.parse(input_)
300             if command is None:
301                 self.send_to(connection_id, 'UNHANDLED INPUT')
302             else:
303                 command(connection_id=connection_id)
304         except ArgError as e:
305             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
306
307
308 def io_loop(q):
309     """Handle commands coming through queue q, send results back.
310
311     Commands from q are expected to be tuples, with the first element either
312     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
313     an optional third element of arbitrary type. The UUID identifies a
314     receiver for replies.
315
316     An 'ADD_QUEUE' command should contain as third element a queue through
317     which to send messages back to the sender of the command. A 'KILL_QUEUE'
318     command removes the queue for that receiver from the list of queues through
319     which to send replies.
320
321     A 'COMMAND' command is specified in greater detail by a string that is the
322     tuple's third element. CommandHandler takes care of processing this and
323     sending out replies.
324     """
325     queues_out = {}
326     command_handler = CommandHandler(queues_out)
327     while True:
328         x = q.get()
329         command_type = x[0]
330         connection_id = x[1]
331         content = None if len(x) == 2 else x[2]
332         if command_type == 'ADD_QUEUE':
333             queues_out[connection_id] = content
334         elif command_type == 'COMMAND':
335             command_handler.handle_input(content, connection_id)
336         elif command_type == 'KILL_QUEUE':
337             del queues_out[connection_id]
338
339
340 q = queue.Queue()
341 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
342 c.start()
343 server = Server(q, ('localhost', 5000), IO_Handler)
344 try:
345     server.serve_forever()
346 except KeyboardInterrupt:
347     pass
348 finally:
349     print('Killing server')
350     server.server_close()