home · contact · privacy
Add basic movement AI test, extend move command directions.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class Task:
83
84     def __init__(self, name, args=(), kwargs={}):
85         self.name = name
86         self.args = args
87         self.kwargs = kwargs
88         self.todo = 1
89
90
91 class Thing:
92
93     def __init__(self, type_, position):
94         self.type = type_
95         self.position = position
96         self.task = Task('wait')
97
98     def task_wait(self):
99         pass
100
101     def task_move(self, direction):
102         if direction == 'UP':
103             self.position[0] -= 1
104         elif direction == 'DOWN':
105             self.position[0] += 1
106         elif direction == 'RIGHT':
107             self.position[1] += 1
108         elif direction == 'LEFT':
109             self.position[1] -= 1
110
111     def decide_task(self):
112         if self.position[1] > 1:
113             self.set_task('move', 'LEFT')
114         elif self.position[1] < 3:
115             self.set_task('move', 'RIGHT')
116         else:
117             self.set_task('wait')
118
119     def set_task(self, task, *args, **kwargs):
120         self.task = Task(task, args, kwargs)
121
122     def proceed(self, is_AI=True):
123         """Further the thing in its tasks.
124
125         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
126         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
127         .decide_task to decide a self.task.
128         """
129         self.task.todo -= 1
130         if self.task.todo <= 0:
131             task= getattr(self, 'task_' + self.task.name)
132             task(*self.task.args, **self.task.kwargs)
133             self.task = None
134         if is_AI and self.task is None:
135             self.decide_task()
136
137
138 class World:
139
140     def __init__(self):
141         self.turn = 0
142         self.map_size = (5, 5)
143         self.map_ = 'xxxxx\n'+\
144                     'x...x\n'+\
145                     'x.X.x\n'+\
146                     'x...x\n'+\
147                     'xxxxx'
148         self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
149         self.player_i = 0
150         self.player = self.things[self.player_i]
151
152
153 def fib(n):
154     """Calculate n-th Fibonacci number. Very inefficiently."""
155     if n in (1, 2):
156         return 1
157     else:
158         return fib(n-1) + fib(n-2)
159
160
161 class ArgumentError(Exception):
162     pass
163
164
165 class CommandHandler:
166
167     def __init__(self, queues_out):
168         from multiprocessing import Pool
169         self.queues_out = queues_out
170         self.world = World()
171         # self.pool and self.pool_result are currently only needed by the FIB
172         # command and the demo of a parallelized game loop in cmd_inc_p.
173         self.pool = Pool()
174         self.pool_result = None
175
176     def send_to(self, connection_id, msg):
177         """Send msg to client of connection_id."""
178         self.queues_out[connection_id].put(msg)
179
180     def send_all(self, msg):
181         """Send msg to all clients."""
182         for connection_id in self.queues_out:
183             self.send_to(connection_id, msg)
184
185     def stringify_yx(self, tuple_):
186         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
187         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
188
189     def proceed_to_next_player_turn(self, connection_id):
190         """Run game world turns until player can decide their next step.
191
192         Sends a 'TURN_FINISHED' message, then iterates through all non-player
193         things, on each step furthering them in their tasks (and letting them
194         decide new ones if they finish). The iteration order is: first all
195         things that come after the player in the world things list, then (after
196         incrementing the world turn) all that come before the player; then the
197         player's .proceed() is run, and if it does not finish his task, the
198         loop starts at the beginning. Once the player's task is finished, the
199         loop breaks, and client-relevant game data is sent.
200         """
201         self.send_all('TURN_FINISHED ' + str(self.world.turn))
202         while True:
203             for thing in self.world.things[self.world.player_i+1:]:
204                 thing.proceed()
205             self.world.turn += 1
206             for thing  in self.world.things[:self.world.player_i]:
207                 thing.proceed()
208             self.world.player.proceed(is_AI=False)
209             if self.world.player.task is None:
210                 break
211         self.send_all('NEW_TURN ' + str(self.world.turn))
212         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
213         self.send_all('TERRAIN\n' + self.world.map_)
214         for thing in self.world.things:
215             self.send_all('THING TYPE:' + thing.type + ' '
216                           + self.stringify_yx(thing.position))
217
218     def cmd_fib(self, tokens, connection_id):
219         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
220
221         Numbers are calculated in parallel as far as possible, using fib().
222         A 'CALCULATING …' message is sent to caller before the result.
223         """
224         if len(tokens) < 2:
225             raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
226         numbers = []
227         for token in tokens[1:]:
228             if token == '0' or not token.isdigit():
229                 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
230             numbers += [int(token)]
231         self.send_to(connection_id, 'CALCULATING …')
232         results = self.pool.map(fib, numbers)
233         reply = ' '.join([str(r) for r in results])
234         self.send_to(connection_id, reply)
235
236     def cmd_inc_p(self, connection_id):
237         """Increment world.turn, send game turn data to everyone.
238
239         To simulate game processing waiting times, a one second delay between
240         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
241         calculations are started as pool processes that need to be finished
242         until a further INC finishes the turn.
243
244         This is just a demo structure for how the game loop could work when
245         parallelized. One might imagine a two-step game turn, with a non-action
246         step determining actor tasks (the AI determinations would take the
247         place of the fib calculations here), and an action step wherein these
248         tasks are performed (where now sleep(1) is).
249         """
250         from time import sleep
251         if self.pool_result is not None:
252             self.pool_result.wait()
253         self.send_all('TURN_FINISHED ' + str(self.world.turn))
254         sleep(1)
255         self.world.turn += 1
256         self.send_all('NEW_TURN ' + str(self.world.turn))
257         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
258         self.send_all('TERRAIN\n' + self.world.map_)
259         for thing in self.world.things:
260             self.send_all('THING TYPE:' + thing.type + ' '
261                           + self.stringify_yx(thing.position))
262         self.pool_result = self.pool.map_async(fib, (35, 35))
263
264     def cmd_get_turn(self, connection_id):
265         """Send world.turn to caller."""
266         self.send_to(connection_id, str(self.world.turn))
267
268     def cmd_move(self, direction, connection_id):
269         """Set player task to 'move' with direction arg, finish player turn."""
270         if not direction in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
271             raise ArgumentError('MOVE ARGUMENT MUST BE ONE OF: '
272                                 'UP, DOWN, RIGHT, LEFT')
273         self.world.player.set_task('move', direction=direction)
274         self.proceed_to_next_player_turn(connection_id)
275
276     def cmd_wait(self, connection_id):
277         """Set player task to 'wait', finish player turn."""
278         self.world.player.set_task('wait')
279         self.proceed_to_next_player_turn(connection_id)
280
281     def cmd_echo(self, tokens, input_, connection_id):
282         """Send message in input_ beyond tokens[0] to caller."""
283         msg = input_[len(tokens[0]) + 1:]
284         self.send_to(connection_id, msg)
285
286     def cmd_all(self, tokens, input_):
287         """Send message in input_ beyond tokens[0] to all clients."""
288         msg = input_[len(tokens[0]) + 1:]
289         self.send_all(msg)
290
291     def handle_input(self, input_, connection_id):
292         """Process input_ to command grammar, call command handler if found."""
293         tokens = [token for token in input_.split(' ') if len(token) > 0]
294         try:
295             if len(tokens) == 0:
296                 self.send_to(connection_id, 'EMPTY COMMAND')
297             elif len(tokens) == 1 and tokens[0] == 'INC_P':
298                 self.cmd_inc_p(connection_id)
299             elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
300                 self.cmd_get_turn(connection_id)
301             elif len(tokens) == 1 and tokens[0] == 'WAIT':
302                 self.cmd_wait(connection_id)
303             elif len(tokens) == 2 and tokens[0] == 'MOVE':
304                 self.cmd_move(tokens[1], connection_id)
305             elif len(tokens) >= 1 and tokens[0] == 'ECHO':
306                 self.cmd_echo(tokens, input_, connection_id)
307             elif len(tokens) >= 1 and tokens[0] == 'ALL':
308                 self.cmd_all(tokens, input_)
309             elif len(tokens) >= 1 and tokens[0] == 'FIB':
310                 # TODO: Should this really block the whole loop?
311                 self.cmd_fib(tokens, connection_id)
312             else:
313                 self.send_to(connection_id, 'UNKNOWN COMMAND')
314         except ArgumentError as e:
315             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
316
317
318 def io_loop(q):
319     """Handle commands coming through queue q, send results back.
320
321     Commands from q are expected to be tuples, with the first element either
322     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
323     an optional third element of arbitrary type. The UUID identifies a
324     receiver for replies.
325
326     An 'ADD_QUEUE' command should contain as third element a queue through
327     which to send messages back to the sender of the command. A 'KILL_QUEUE'
328     command removes the queue for that receiver from the list of queues through
329     which to send replies.
330
331     A 'COMMAND' command is specified in greater detail by a string that is the
332     tuple's third element. CommandHandler takes care of processing this and
333     sending out replies.
334     """
335     queues_out = {}
336     command_handler = CommandHandler(queues_out)
337     while True:
338         x = q.get()
339         command_type = x[0]
340         connection_id = x[1]
341         content = None if len(x) == 2 else x[2]
342         if command_type == 'ADD_QUEUE':
343             queues_out[connection_id] = content
344         elif command_type == 'COMMAND':
345             command_handler.handle_input(content, connection_id)
346         elif command_type == 'KILL_QUEUE':
347             del queues_out[connection_id]
348
349
350 q = queue.Queue()
351 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
352 c.start()
353 server = Server(q, ('localhost', 5000), IO_Handler)
354 try:
355     server.serve_forever()
356 except KeyboardInterrupt:
357     pass
358 finally:
359     print('Killing server')
360     server.server_close()