home · contact · privacy
Refactor transfer of task command arguments.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class Task:
83
84     def __init__(self, name, args=(), kwargs={}):
85         self.name = name
86         self.args = args
87         self.kwargs = kwargs
88         self.todo = 1
89
90
91 class Thing:
92
93     def __init__(self, position):
94         self.position = position
95         self.task = Task('wait')
96
97     def task_wait(self):
98         pass
99
100     def task_move(self, direction):
101         if direction == 'UP':
102             self.position[0] -= 1
103         elif direction == 'DOWN':
104             self.position[0] += 1
105
106     def decide_task(self):
107         self.set_task('wait')
108
109     def set_task(self, task, *args, **kwargs):
110         self.task = Task(task, args, kwargs)
111
112     def proceed(self, is_AI=True):
113         """Further the thing in its tasks.
114
115         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
116         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
117         .decide_task to decide a self.task.
118         """
119         self.task.todo -= 1
120         if self.task.todo <= 0:
121             task= getattr(self, 'task_' + self.task.name)
122             task(*self.task.args, **self.task.kwargs)
123             self.task = None
124         if is_AI and self.task is None:
125             self.decide_task()
126
127
128 class World:
129
130     def __init__(self):
131         self.turn = 0
132         self.map_size = (5, 5)
133         self.map_ = 'xxxxx\n'+\
134                     'x...x\n'+\
135                     'x.X.x\n'+\
136                     'x...x\n'+\
137                     'xxxxx'
138         self.things = [Thing(position=[3, 3]), Thing([1, 1])]
139         self.player_i = 0
140         self.player = self.things[self.player_i]
141
142
143 def fib(n):
144     """Calculate n-th Fibonacci number. Very inefficiently."""
145     if n in (1, 2):
146         return 1
147     else:
148         return fib(n-1) + fib(n-2)
149
150
151 class ArgumentError(Exception):
152     pass
153
154
155 class CommandHandler:
156
157     def __init__(self, queues_out):
158         from multiprocessing import Pool
159         self.queues_out = queues_out
160         self.world = World()
161         # self.pool and self.pool_result are currently only needed by the FIB
162         # command and the demo of a parallelized game loop in cmd_inc_p.
163         self.pool = Pool()
164         self.pool_result = None
165
166     def send_to(self, connection_id, msg):
167         """Send msg to client of connection_id."""
168         self.queues_out[connection_id].put(msg)
169
170     def send_all(self, msg):
171         """Send msg to all clients."""
172         for connection_id in self.queues_out:
173             self.send_to(connection_id, msg)
174
175     def stringify_yx(self, tuple_):
176         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
177         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
178
179     def proceed_to_next_player_turn(self, connection_id):
180         """Run game world turns until player can decide their next step.
181
182         Sends a 'TURN_FINISHED' message, then iterates through all non-player
183         things, on each step furthering them in their tasks (and letting them
184         decide new ones if they finish). The iteration order is: first all
185         things that come after the player in the world things list, then (after
186         incrementing the world turn) all that come before the player; then the
187         player's .proceed() is run, and if it does not finish his task, the
188         loop starts at the beginning. Once the player's task is finished, the
189         loop breaks, and client-relevant game data is sent.
190         """
191         self.send_all('TURN_FINISHED ' + str(self.world.turn))
192         while True:
193             for thing in self.world.things[self.world.player_i+1:]:
194                 thing.proceed()
195             self.world.turn += 1
196             for thing  in self.world.things[:self.world.player_i]:
197                 thing.proceed()
198             self.world.player.proceed(is_AI=False)
199             if self.world.player.task is None:
200                 break
201         self.send_all('NEW_TURN ' + str(self.world.turn))
202         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
203         self.send_all('TERRAIN\n' + self.world.map_)
204         self.send_all('POSITION ' + self.stringify_yx(self.world.player.position))
205
206     def cmd_fib(self, tokens, connection_id):
207         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
208
209         Numbers are calculated in parallel as far as possible, using fib().
210         A 'CALCULATING …' message is sent to caller before the result.
211         """
212         if len(tokens) < 2:
213             raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
214         numbers = []
215         for token in tokens[1:]:
216             if token == '0' or not token.isdigit():
217                 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
218             numbers += [int(token)]
219         self.send_to(connection_id, 'CALCULATING …')
220         results = self.pool.map(fib, numbers)
221         reply = ' '.join([str(r) for r in results])
222         self.send_to(connection_id, reply)
223
224     def cmd_inc_p(self, connection_id):
225         """Increment world.turn, send game turn data to everyone.
226
227         To simulate game processing waiting times, a one second delay between
228         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
229         calculations are started as pool processes that need to be finished
230         until a further INC finishes the turn.
231
232         This is just a demo structure for how the game loop could work when
233         parallelized. One might imagine a two-step game turn, with a non-action
234         step determining actor tasks (the AI determinations would take the
235         place of the fib calculations here), and an action step wherein these
236         tasks are performed (where now sleep(1) is).
237         """
238         from time import sleep
239         if self.pool_result is not None:
240             self.pool_result.wait()
241         self.send_all('TURN_FINISHED ' + str(self.world.turn))
242         sleep(1)
243         self.world.turn += 1
244         self.send_all('NEW_TURN ' + str(self.world.turn))
245         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
246         self.send_all('TERRAIN\n' + self.world.map_)
247         self.send_all('POSITION ' + self.stringify_yx(self.world.player.position))
248         self.pool_result = self.pool.map_async(fib, (35, 35))
249
250     def cmd_get_turn(self, connection_id):
251         """Send world.turn to caller."""
252         self.send_to(connection_id, str(self.world.turn))
253
254     def cmd_move(self, direction, connection_id):
255         """Set player task to 'move' with direction arg, finish player turn."""
256         if not direction in {'UP', 'DOWN'}:
257             raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
258         self.world.player.set_task('move', direction=direction)
259         self.proceed_to_next_player_turn(connection_id)
260
261     def cmd_wait(self, connection_id):
262         """Set player task to 'wait', finish player turn."""
263         self.world.player.set_task('wait')
264         self.proceed_to_next_player_turn(connection_id)
265
266     def cmd_echo(self, tokens, input_, connection_id):
267         """Send message in input_ beyond tokens[0] to caller."""
268         msg = input_[len(tokens[0]) + 1:]
269         self.send_to(connection_id, msg)
270
271     def cmd_all(self, tokens, input_):
272         """Send message in input_ beyond tokens[0] to all clients."""
273         msg = input_[len(tokens[0]) + 1:]
274         self.send_all(msg)
275
276     def handle_input(self, input_, connection_id):
277         """Process input_ to command grammar, call command handler if found."""
278         tokens = [token for token in input_.split(' ') if len(token) > 0]
279         try:
280             if len(tokens) == 0:
281                 self.send_to(connection_id, 'EMPTY COMMAND')
282             elif len(tokens) == 1 and tokens[0] == 'INC_P':
283                 self.cmd_inc_p(connection_id)
284             elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
285                 self.cmd_get_turn(connection_id)
286             elif len(tokens) == 1 and tokens[0] == 'WAIT':
287                 self.cmd_wait(connection_id)
288             elif len(tokens) == 2 and tokens[0] == 'MOVE':
289                 self.cmd_move(tokens[1], connection_id)
290             elif len(tokens) >= 1 and tokens[0] == 'ECHO':
291                 self.cmd_echo(tokens, input_, connection_id)
292             elif len(tokens) >= 1 and tokens[0] == 'ALL':
293                 self.cmd_all(tokens, input_)
294             elif len(tokens) >= 1 and tokens[0] == 'FIB':
295                 # TODO: Should this really block the whole loop?
296                 self.cmd_fib(tokens, connection_id)
297             else:
298                 self.send_to(connection_id, 'UNKNOWN COMMAND')
299         except ArgumentError as e:
300             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
301
302
303 def io_loop(q):
304     """Handle commands coming through queue q, send results back.
305
306     Commands from q are expected to be tuples, with the first element either
307     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
308     an optional third element of arbitrary type. The UUID identifies a
309     receiver for replies.
310
311     An 'ADD_QUEUE' command should contain as third element a queue through
312     which to send messages back to the sender of the command. A 'KILL_QUEUE'
313     command removes the queue for that receiver from the list of queues through
314     which to send replies.
315
316     A 'COMMAND' command is specified in greater detail by a string that is the
317     tuple's third element. CommandHandler takes care of processing this and
318     sending out replies.
319     """
320     queues_out = {}
321     command_handler = CommandHandler(queues_out)
322     while True:
323         x = q.get()
324         command_type = x[0]
325         connection_id = x[1]
326         content = None if len(x) == 2 else x[2]
327         if command_type == 'ADD_QUEUE':
328             queues_out[connection_id] = content
329         elif command_type == 'COMMAND':
330             command_handler.handle_input(content, connection_id)
331         elif command_type == 'KILL_QUEUE':
332             del queues_out[connection_id]
333
334
335 q = queue.Queue()
336 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
337 c.start()
338 server = Server(q, ('localhost', 5000), IO_Handler)
339 try:
340     server.serve_forever()
341 except KeyboardInterrupt:
342     pass
343 finally:
344     print('Killing server')
345     server.server_close()