home · contact · privacy
Add game loop, task progress logic.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class Thing:
83
84     def __init__(self, position):
85         self.position = position
86         self.task = 'wait'
87         self.todo = 0
88
89     def task_wait(self):
90         pass
91
92     def task_moveup(self):
93         self.position[0] -= 1
94
95     def task_movedown(self):
96         self.position[0] += 1
97
98     def decide_task(self):
99         self.set_task('wait')
100
101     def set_task(self, task):
102         self.task = task
103         self.todo = 1
104
105     def proceed(self, is_AI=True):
106         """Further the thing in its tasks.
107
108         Decrements self.todo; if it thus falls to <= 0, enacts the method whose
109         name is 'task_' + self.task and sets self.task = None. If is_AI, calls
110         self.decide_task to decide a new self.task.
111         """
112         self.todo -= 1
113         if self.todo <= 0:
114             task= getattr(self, 'task_' + self.task)
115             task()
116             self.task = None
117         if is_AI and self.task is None:
118             self.decide_task()
119
120
121 class World:
122
123     def __init__(self):
124         self.turn = 0
125         self.map_size = (5, 5)
126         self.map_ = 'xxxxx\n'+\
127                     'x...x\n'+\
128                     'x.X.x\n'+\
129                     'x...x\n'+\
130                     'xxxxx'
131         self.things = [Thing(position=[3, 3]), Thing([1, 1])]
132         self.player_i = 0
133         self.player = self.things[self.player_i]
134
135
136 def fib(n):
137     """Calculate n-th Fibonacci number. Very inefficiently."""
138     if n in (1, 2):
139         return 1
140     else:
141         return fib(n-1) + fib(n-2)
142
143
144 class ArgumentError(Exception):
145     pass
146
147
148 class CommandHandler:
149
150     def __init__(self, queues_out):
151         from multiprocessing import Pool
152         self.queues_out = queues_out
153         self.world = World()
154         # self.pool and self.pool_result are currently only needed by the FIB
155         # command and the demo of a parallelized game loop in cmd_inc_p.
156         self.pool = Pool()
157         self.pool_result = None
158
159     def send_to(self, connection_id, msg):
160         """Send msg to client of connection_id."""
161         self.queues_out[connection_id].put(msg)
162
163     def send_all(self, msg):
164         """Send msg to all clients."""
165         for connection_id in self.queues_out:
166             self.send_to(connection_id, msg)
167
168     def stringify_yx(self, tuple_):
169         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
170         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
171
172     def proceed_to_next_player_turn(self, connection_id):
173         """Run game world turns until player can decide their next step.
174
175         Sends a 'TURN_FINISHED' message, then iterates through all non-player
176         things, on each step furthering them in their tasks (and letting them
177         decide new ones if they finish). The iteration order is: first all
178         things that come after the player in the world things list, then (after
179         incrementing the world turn) all that come before the player; then the
180         player's .proceed() is run, and if it does not finish his task, the
181         loop starts at the beginning. Once the player's task is finished, the
182         loop breaks, and client-relevant game data is sent.
183         """
184         self.send_all('TURN_FINISHED ' + str(self.world.turn))
185         while True:
186             for thing in self.world.things[self.world.player_i+1:]:
187                 thing.proceed()
188             self.world.turn += 1
189             for thing  in self.world.things[:self.world.player_i]:
190                 thing.proceed()
191             self.world.player.proceed(is_AI=False)
192             if self.world.player.task is None:
193                 break
194         self.send_all('NEW_TURN ' + str(self.world.turn))
195         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
196         self.send_all('TERRAIN\n' + self.world.map_)
197         self.send_all('POSITION ' + self.stringify_yx(self.world.player.position))
198
199     def cmd_fib(self, tokens, connection_id):
200         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
201
202         Numbers are calculated in parallel as far as possible, using fib().
203         A 'CALCULATING …' message is sent to caller before the result.
204         """
205         if len(tokens) < 2:
206             raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
207         numbers = []
208         for token in tokens[1:]:
209             if token == '0' or not token.isdigit():
210                 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
211             numbers += [int(token)]
212         self.send_to(connection_id, 'CALCULATING …')
213         results = self.pool.map(fib, numbers)
214         reply = ' '.join([str(r) for r in results])
215         self.send_to(connection_id, reply)
216
217     def cmd_inc_p(self, connection_id):
218         """Increment world.turn, send game turn data to everyone.
219
220         To simulate game processing waiting times, a one second delay between
221         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
222         calculations are started as pool processes that need to be finished
223         until a further INC finishes the turn.
224
225         This is just a demo structure for how the game loop could work when
226         parallelized. One might imagine a two-step game turn, with a non-action
227         step determining actor tasks (the AI determinations would take the
228         place of the fib calculations here), and an action step wherein these
229         tasks are performed (where now sleep(1) is).
230         """
231         from time import sleep
232         if self.pool_result is not None:
233             self.pool_result.wait()
234         self.send_all('TURN_FINISHED ' + str(self.world.turn))
235         sleep(1)
236         self.world.turn += 1
237         self.send_all('NEW_TURN ' + str(self.world.turn))
238         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
239         self.send_all('TERRAIN\n' + self.world.map_)
240         self.send_all('POSITION ' + self.stringify_yx(self.world.player.position))
241         self.pool_result = self.pool.map_async(fib, (35, 35))
242
243     def cmd_get_turn(self, connection_id):
244         """Send world.turn to caller."""
245         self.send_to(connection_id, str(self.world.turn))
246
247     def cmd_move(self, direction, connection_id):
248         """Set player task to 'moveup' or 'movedown', finish player turn."""
249         if not direction in {'UP', 'DOWN'}:
250             raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
251         if direction == 'UP':
252             self.world.player.set_task('moveup')
253         else:
254             self.world.player.set_task('movedown')
255         self.proceed_to_next_player_turn(connection_id)
256
257     def cmd_wait(self, connection_id):
258         """Set player task to 'wait', finish player turn."""
259         self.world.player.set_task('wait')
260         self.proceed_to_next_player_turn(connection_id)
261
262     def cmd_echo(self, tokens, input_, connection_id):
263         """Send message in input_ beyond tokens[0] to caller."""
264         msg = input_[len(tokens[0]) + 1:]
265         self.send_to(connection_id, msg)
266
267     def cmd_all(self, tokens, input_):
268         """Send message in input_ beyond tokens[0] to all clients."""
269         msg = input_[len(tokens[0]) + 1:]
270         self.send_all(msg)
271
272     def handle_input(self, input_, connection_id):
273         """Process input_ to command grammar, call command handler if found."""
274         tokens = [token for token in input_.split(' ') if len(token) > 0]
275         try:
276             if len(tokens) == 0:
277                 self.send_to(connection_id, 'EMPTY COMMAND')
278             elif len(tokens) == 1 and tokens[0] == 'INC_P':
279                 self.cmd_inc_p(connection_id)
280             elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
281                 self.cmd_get_turn(connection_id)
282             elif len(tokens) == 1 and tokens[0] == 'WAIT':
283                 self.cmd_wait(connection_id)
284             elif len(tokens) == 2 and tokens[0] == 'MOVE':
285                 self.cmd_move(tokens[1], connection_id)
286             elif len(tokens) >= 1 and tokens[0] == 'ECHO':
287                 self.cmd_echo(tokens, input_, connection_id)
288             elif len(tokens) >= 1 and tokens[0] == 'ALL':
289                 self.cmd_all(tokens, input_)
290             elif len(tokens) >= 1 and tokens[0] == 'FIB':
291                 # TODO: Should this really block the whole loop?
292                 self.cmd_fib(tokens, connection_id)
293             else:
294                 self.send_to(connection_id, 'UNKNOWN COMMAND')
295         except ArgumentError as e:
296             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
297
298
299 def io_loop(q):
300     """Handle commands coming through queue q, send results back.
301
302     Commands from q are expected to be tuples, with the first element either
303     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
304     an optional third element of arbitrary type. The UUID identifies a
305     receiver for replies.
306
307     An 'ADD_QUEUE' command should contain as third element a queue through
308     which to send messages back to the sender of the command. A 'KILL_QUEUE'
309     command removes the queue for that receiver from the list of queues through
310     which to send replies.
311
312     A 'COMMAND' command is specified in greater detail by a string that is the
313     tuple's third element. CommandHandler takes care of processing this and
314     sending out replies.
315     """
316     queues_out = {}
317     command_handler = CommandHandler(queues_out)
318     while True:
319         x = q.get()
320         command_type = x[0]
321         connection_id = x[1]
322         content = None if len(x) == 2 else x[2]
323         if command_type == 'ADD_QUEUE':
324             queues_out[connection_id] = content
325         elif command_type == 'COMMAND':
326             command_handler.handle_input(content, connection_id)
327         elif command_type == 'KILL_QUEUE':
328             del queues_out[connection_id]
329
330
331 q = queue.Queue()
332 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
333 c.start()
334 server = Server(q, ('localhost', 5000), IO_Handler)
335 try:
336     server.serve_forever()
337 except KeyboardInterrupt:
338     pass
339 finally:
340     print('Killing server')
341     server.server_close()