home · contact · privacy
Lots of refactoring.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class Task:
83
84     def __init__(self, name, args=(), kwargs={}):
85         self.name = name
86         self.args = args
87         self.kwargs = kwargs
88         self.todo = 1
89
90
91 class Thing:
92
93     def __init__(self, type_, position):
94         self.type = type_
95         self.position = position
96         self.task = Task('wait')
97
98     def task_wait(self):
99         pass
100
101     def task_move(self, direction):
102         if direction == 'UP':
103             self.position[0] -= 1
104         elif direction == 'DOWN':
105             self.position[0] += 1
106         elif direction == 'RIGHT':
107             self.position[1] += 1
108         elif direction == 'LEFT':
109             self.position[1] -= 1
110
111     def decide_task(self):
112         if self.position[1] > 1:
113             self.set_task('move', 'LEFT')
114         elif self.position[1] < 3:
115             self.set_task('move', 'RIGHT')
116         else:
117             self.set_task('wait')
118
119     def set_task(self, task, *args, **kwargs):
120         self.task = Task(task, args, kwargs)
121
122     def proceed(self, is_AI=True):
123         """Further the thing in its tasks.
124
125         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
126         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
127         .decide_task to decide a self.task.
128         """
129         self.task.todo -= 1
130         if self.task.todo <= 0:
131             task = getattr(self, 'task_' + self.task.name)
132             task(*self.task.args, **self.task.kwargs)
133             self.task = None
134         if is_AI and self.task is None:
135             self.decide_task()
136
137
138 class World:
139
140     def __init__(self):
141         self.turn = 0
142         self.map_size = (5, 5)
143         self.map_ = 'xxxxx\n' +\
144                     'x...x\n' +\
145                     'x.X.x\n' +\
146                     'x...x\n' +\
147                     'xxxxx'
148         self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
149         self.player_i = 0
150         self.player = self.things[self.player_i]
151
152
153 def fib(n):
154     """Calculate n-th Fibonacci number. Very inefficiently."""
155     if n in (1, 2):
156         return 1
157     else:
158         return fib(n-1) + fib(n-2)
159
160
161 class ArgumentError(Exception):
162     pass
163
164
165 class CommandHandler:
166
167     def __init__(self, queues_out):
168         from multiprocessing import Pool
169         self.queues_out = queues_out
170         self.world = World()
171         # self.pool and self.pool_result are currently only needed by the FIB
172         # command and the demo of a parallelized game loop in cmd_inc_p.
173         self.pool = Pool()
174         self.pool_result = None
175
176     def send_to(self, connection_id, msg):
177         """Send msg to client of connection_id."""
178         self.queues_out[connection_id].put(msg)
179
180     def send_all(self, msg):
181         """Send msg to all clients."""
182         for connection_id in self.queues_out:
183             self.send_to(connection_id, msg)
184
185     def stringify_yx(self, tuple_):
186         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
187         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
188
189     def quoted(self, string):
190         """Quote and escape string so client interprets it as single token."""
191         quoted = []
192         quoted += ['"']
193         for c in string:
194             if c in {'"', '\\'}:
195                 quoted += ['\\']
196             quoted += [c]
197         quoted += ['"']
198         return ''.join(quoted)
199
200     def proceed_to_next_player_turn(self, connection_id):
201         """Run game world turns until player can decide their next step.
202
203         Sends a 'TURN_FINISHED' message, then iterates through all non-player
204         things, on each step furthering them in their tasks (and letting them
205         decide new ones if they finish). The iteration order is: first all
206         things that come after the player in the world things list, then (after
207         incrementing the world turn) all that come before the player; then the
208         player's .proceed() is run, and if it does not finish his task, the
209         loop starts at the beginning. Once the player's task is finished, the
210         loop breaks, and client-relevant game data is sent.
211         """
212         self.send_all('TURN_FINISHED ' + str(self.world.turn))
213         while True:
214             for thing in self.world.things[self.world.player_i+1:]:
215                 thing.proceed()
216             self.world.turn += 1
217             for thing in self.world.things[:self.world.player_i]:
218                 thing.proceed()
219             self.world.player.proceed(is_AI=False)
220             if self.world.player.task is None:
221                 break
222         self.send_all('NEW_TURN ' + str(self.world.turn))
223         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
224         self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
225         for thing in self.world.things:
226             self.send_all('THING TYPE:' + thing.type + ' '
227                           + self.stringify_yx(thing.position))
228
229     def cmd_fib(self, tokens, connection_id):
230         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
231
232         Numbers are calculated in parallel as far as possible, using fib().
233         A 'CALCULATING …' message is sent to caller before the result.
234         """
235         if len(tokens) < 2:
236             raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
237         numbers = []
238         for token in tokens[1:]:
239             if token == '0' or not token.isdigit():
240                 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
241             numbers += [int(token)]
242         self.send_to(connection_id, 'CALCULATING …')
243         results = self.pool.map(fib, numbers)
244         reply = ' '.join([str(r) for r in results])
245         self.send_to(connection_id, reply)
246
247     def cmd_inc_p(self, connection_id):
248         """Increment world.turn, send game turn data to everyone.
249
250         To simulate game processing waiting times, a one second delay between
251         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
252         calculations are started as pool processes that need to be finished
253         until a further INC finishes the turn.
254
255         This is just a demo structure for how the game loop could work when
256         parallelized. One might imagine a two-step game turn, with a non-action
257         step determining actor tasks (the AI determinations would take the
258         place of the fib calculations here), and an action step wherein these
259         tasks are performed (where now sleep(1) is).
260         """
261         from time import sleep
262         if self.pool_result is not None:
263             self.pool_result.wait()
264         self.send_all('TURN_FINISHED ' + str(self.world.turn))
265         sleep(1)
266         self.world.turn += 1
267         self.send_all('NEW_TURN ' + str(self.world.turn))
268         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
269         self.send_all('TERRAIN\n' + self.quoted(self.world.map_))
270         for thing in self.world.things:
271             self.send_all('THING TYPE:' + thing.type + ' '
272                           + self.stringify_yx(thing.position))
273         self.pool_result = self.pool.map_async(fib, (35, 35))
274
275     def cmd_get_turn(self, connection_id):
276         """Send world.turn to caller."""
277         self.send_to(connection_id, str(self.world.turn))
278
279     def cmd_move(self, direction, connection_id):
280         """Set player task to 'move' with direction arg, finish player turn."""
281         if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
282             raise ArgumentError('MOVE ARGUMENT MUST BE ONE OF: '
283                                 'UP, DOWN, RIGHT, LEFT')
284         self.world.player.set_task('move', direction=direction)
285         self.proceed_to_next_player_turn(connection_id)
286
287     def cmd_wait(self, connection_id):
288         """Set player task to 'wait', finish player turn."""
289         self.world.player.set_task('wait')
290         self.proceed_to_next_player_turn(connection_id)
291
292     def cmd_echo(self, tokens, input_, connection_id):
293         """Send message in input_ beyond tokens[0] to caller."""
294         msg = input_[len(tokens[0]) + 1:]
295         self.send_to(connection_id, msg)
296
297     def cmd_all(self, tokens, input_):
298         """Send message in input_ beyond tokens[0] to all clients."""
299         msg = input_[len(tokens[0]) + 1:]
300         self.send_all(msg)
301
302     def handle_input(self, input_, connection_id):
303         """Process input_ to command grammar, call command handler if found."""
304         tokens = [token for token in input_.split(' ') if len(token) > 0]
305         try:
306             if len(tokens) == 0:
307                 self.send_to(connection_id, 'EMPTY COMMAND')
308             elif len(tokens) == 1 and tokens[0] == 'INC_P':
309                 self.cmd_inc_p(connection_id)
310             elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
311                 self.cmd_get_turn(connection_id)
312             elif len(tokens) == 1 and tokens[0] == 'WAIT':
313                 self.cmd_wait(connection_id)
314             elif len(tokens) == 2 and tokens[0] == 'MOVE':
315                 self.cmd_move(tokens[1], connection_id)
316             elif len(tokens) >= 1 and tokens[0] == 'ECHO':
317                 self.cmd_echo(tokens, input_, connection_id)
318             elif len(tokens) >= 1 and tokens[0] == 'ALL':
319                 self.cmd_all(tokens, input_)
320             elif len(tokens) >= 1 and tokens[0] == 'FIB':
321                 # TODO: Should this really block the whole loop?
322                 self.cmd_fib(tokens, connection_id)
323             else:
324                 self.send_to(connection_id, 'UNKNOWN COMMAND')
325         except ArgumentError as e:
326             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
327
328
329 def io_loop(q):
330     """Handle commands coming through queue q, send results back.
331
332     Commands from q are expected to be tuples, with the first element either
333     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
334     an optional third element of arbitrary type. The UUID identifies a
335     receiver for replies.
336
337     An 'ADD_QUEUE' command should contain as third element a queue through
338     which to send messages back to the sender of the command. A 'KILL_QUEUE'
339     command removes the queue for that receiver from the list of queues through
340     which to send replies.
341
342     A 'COMMAND' command is specified in greater detail by a string that is the
343     tuple's third element. CommandHandler takes care of processing this and
344     sending out replies.
345     """
346     queues_out = {}
347     command_handler = CommandHandler(queues_out)
348     while True:
349         x = q.get()
350         command_type = x[0]
351         connection_id = x[1]
352         content = None if len(x) == 2 else x[2]
353         if command_type == 'ADD_QUEUE':
354             queues_out[connection_id] = content
355         elif command_type == 'COMMAND':
356             command_handler.handle_input(content, connection_id)
357         elif command_type == 'KILL_QUEUE':
358             del queues_out[connection_id]
359
360
361 q = queue.Queue()
362 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
363 c.start()
364 server = Server(q, ('localhost', 5000), IO_Handler)
365 try:
366     server.serve_forever()
367 except KeyboardInterrupt:
368     pass
369 finally:
370     print('Killing server')
371     server.server_close()