home · contact · privacy
Add Thing types and map them to symbols to display in client.
[plomrogue2-experiments] / server.py
1 #!/usr/bin/env python3
2
3 import socketserver
4 import threading
5 import queue
6
7 # Avoid "Address already in use" errors.
8 socketserver.TCPServer.allow_reuse_address = True
9
10
11 class Server(socketserver.ThreadingTCPServer):
12     """Bind together threaded IO handling server and message queue."""
13
14     def __init__(self, queue, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.queue_out = queue
17         self.daemon_threads = True  # Else, server's threads have daemon=False.
18
19
20 class IO_Handler(socketserver.BaseRequestHandler):
21
22     def handle(self):
23         """Move messages between network socket and main thread via queues.
24
25         On start, sets up new queue, sends it via self.server.queue_out to
26         main thread, and from then on receives messages to send back from the
27         main thread via that new queue.
28
29         At the same time, loops over socket's recv to get messages from the
30         outside via self.server.queue_out into the main thread. Ends connection
31         once a 'QUIT' message is received from socket, and then also kills its
32         own queue.
33
34         All messages to the main thread are tuples, with the first element a
35         meta command ('ADD_QUEUE' for queue creation, 'KILL_QUEUE' for queue
36         deletion, and 'COMMAND' for everything else), the second element a UUID
37         that uniquely identifies the thread (so that the main thread knows whom
38         to send replies back to), and optionally a third element for further
39         instructions.
40         """
41         import plom_socket_io
42
43         def caught_send(socket, message):
44             """Send message by socket, catch broken socket connection error."""
45             try:
46                 plom_socket_io.send(socket, message)
47             except plom_socket_io.BrokenSocketConnection:
48                 pass
49
50         def send_queue_messages(socket, queue_in, thread_alive):
51             """Send messages via socket from queue_in while thread_alive[0]."""
52             while thread_alive[0]:
53                 try:
54                     msg = queue_in.get(timeout=1)
55                 except queue.Empty:
56                     continue
57                 caught_send(socket, msg)
58
59         import uuid
60         print('CONNECTION FROM:', str(self.client_address))
61         connection_id = uuid.uuid4()
62         queue_in = queue.Queue()
63         self.server.queue_out.put(('ADD_QUEUE', connection_id, queue_in))
64         thread_alive = [True]
65         t = threading.Thread(target=send_queue_messages,
66                              args=(self.request, queue_in, thread_alive))
67         t.start()
68         for message in plom_socket_io.recv(self.request):
69             if message is None:
70                 caught_send(self.request, 'BAD MESSAGE')
71             elif 'QUIT' == message:
72                 caught_send(self.request, 'BYE')
73                 break
74             else:
75                 self.server.queue_out.put(('COMMAND', connection_id, message))
76         self.server.queue_out.put(('KILL_QUEUE', connection_id))
77         thread_alive[0] = False
78         print('CONNECTION CLOSED FROM:', str(self.client_address))
79         self.request.close()
80
81
82 class Task:
83
84     def __init__(self, name, args=(), kwargs={}):
85         self.name = name
86         self.args = args
87         self.kwargs = kwargs
88         self.todo = 1
89
90
91 class Thing:
92
93     def __init__(self, type_, position):
94         self.type = type_
95         self.position = position
96         self.task = Task('wait')
97
98     def task_wait(self):
99         pass
100
101     def task_move(self, direction):
102         if direction == 'UP':
103             self.position[0] -= 1
104         elif direction == 'DOWN':
105             self.position[0] += 1
106
107     def decide_task(self):
108         self.set_task('wait')
109
110     def set_task(self, task, *args, **kwargs):
111         self.task = Task(task, args, kwargs)
112
113     def proceed(self, is_AI=True):
114         """Further the thing in its tasks.
115
116         Decrements .task.todo; if it thus falls to <= 0, enacts method whose
117         name is 'task_' + self.task.name and sets .task = None. If is_AI, calls
118         .decide_task to decide a self.task.
119         """
120         self.task.todo -= 1
121         if self.task.todo <= 0:
122             task= getattr(self, 'task_' + self.task.name)
123             task(*self.task.args, **self.task.kwargs)
124             self.task = None
125         if is_AI and self.task is None:
126             self.decide_task()
127
128
129 class World:
130
131     def __init__(self):
132         self.turn = 0
133         self.map_size = (5, 5)
134         self.map_ = 'xxxxx\n'+\
135                     'x...x\n'+\
136                     'x.X.x\n'+\
137                     'x...x\n'+\
138                     'xxxxx'
139         self.things = [Thing('human', [3, 3]), Thing('monster', [1, 1])]
140         self.player_i = 0
141         self.player = self.things[self.player_i]
142
143
144 def fib(n):
145     """Calculate n-th Fibonacci number. Very inefficiently."""
146     if n in (1, 2):
147         return 1
148     else:
149         return fib(n-1) + fib(n-2)
150
151
152 class ArgumentError(Exception):
153     pass
154
155
156 class CommandHandler:
157
158     def __init__(self, queues_out):
159         from multiprocessing import Pool
160         self.queues_out = queues_out
161         self.world = World()
162         # self.pool and self.pool_result are currently only needed by the FIB
163         # command and the demo of a parallelized game loop in cmd_inc_p.
164         self.pool = Pool()
165         self.pool_result = None
166
167     def send_to(self, connection_id, msg):
168         """Send msg to client of connection_id."""
169         self.queues_out[connection_id].put(msg)
170
171     def send_all(self, msg):
172         """Send msg to all clients."""
173         for connection_id in self.queues_out:
174             self.send_to(connection_id, msg)
175
176     def stringify_yx(self, tuple_):
177         """Transform tuple (y,x) into string 'Y:'+str(y)+',X:'+str(x)."""
178         return 'Y:' + str(tuple_[0]) + ',X:' + str(tuple_[1])
179
180     def proceed_to_next_player_turn(self, connection_id):
181         """Run game world turns until player can decide their next step.
182
183         Sends a 'TURN_FINISHED' message, then iterates through all non-player
184         things, on each step furthering them in their tasks (and letting them
185         decide new ones if they finish). The iteration order is: first all
186         things that come after the player in the world things list, then (after
187         incrementing the world turn) all that come before the player; then the
188         player's .proceed() is run, and if it does not finish his task, the
189         loop starts at the beginning. Once the player's task is finished, the
190         loop breaks, and client-relevant game data is sent.
191         """
192         self.send_all('TURN_FINISHED ' + str(self.world.turn))
193         while True:
194             for thing in self.world.things[self.world.player_i+1:]:
195                 thing.proceed()
196             self.world.turn += 1
197             for thing  in self.world.things[:self.world.player_i]:
198                 thing.proceed()
199             self.world.player.proceed(is_AI=False)
200             if self.world.player.task is None:
201                 break
202         self.send_all('NEW_TURN ' + str(self.world.turn))
203         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
204         self.send_all('TERRAIN\n' + self.world.map_)
205         for thing in self.world.things:
206             self.send_all('THING TYPE:' + thing.type + ' '
207                           + self.stringify_yx(thing.position))
208
209     def cmd_fib(self, tokens, connection_id):
210         """Reply with n-th Fibonacci numbers, n taken from tokens[1:].
211
212         Numbers are calculated in parallel as far as possible, using fib().
213         A 'CALCULATING …' message is sent to caller before the result.
214         """
215         if len(tokens) < 2:
216             raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
217         numbers = []
218         for token in tokens[1:]:
219             if token == '0' or not token.isdigit():
220                 raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
221             numbers += [int(token)]
222         self.send_to(connection_id, 'CALCULATING …')
223         results = self.pool.map(fib, numbers)
224         reply = ' '.join([str(r) for r in results])
225         self.send_to(connection_id, reply)
226
227     def cmd_inc_p(self, connection_id):
228         """Increment world.turn, send game turn data to everyone.
229
230         To simulate game processing waiting times, a one second delay between
231         TURN_FINISHED and NEW_TURN occurs; after NEW_TURN, some expensive
232         calculations are started as pool processes that need to be finished
233         until a further INC finishes the turn.
234
235         This is just a demo structure for how the game loop could work when
236         parallelized. One might imagine a two-step game turn, with a non-action
237         step determining actor tasks (the AI determinations would take the
238         place of the fib calculations here), and an action step wherein these
239         tasks are performed (where now sleep(1) is).
240         """
241         from time import sleep
242         if self.pool_result is not None:
243             self.pool_result.wait()
244         self.send_all('TURN_FINISHED ' + str(self.world.turn))
245         sleep(1)
246         self.world.turn += 1
247         self.send_all('NEW_TURN ' + str(self.world.turn))
248         self.send_all('MAP_SIZE ' + self.stringify_yx(self.world.map_size))
249         self.send_all('TERRAIN\n' + self.world.map_)
250         for thing in self.world.things:
251             self.send_all('THING TYPE:' + thing.type + ' '
252                           + self.stringify_yx(thing.position))
253         self.pool_result = self.pool.map_async(fib, (35, 35))
254
255     def cmd_get_turn(self, connection_id):
256         """Send world.turn to caller."""
257         self.send_to(connection_id, str(self.world.turn))
258
259     def cmd_move(self, direction, connection_id):
260         """Set player task to 'move' with direction arg, finish player turn."""
261         if not direction in {'UP', 'DOWN'}:
262             raise ArgumentError('MOVE ARGUMENT MUST BE "UP" or "DOWN"')
263         self.world.player.set_task('move', direction=direction)
264         self.proceed_to_next_player_turn(connection_id)
265
266     def cmd_wait(self, connection_id):
267         """Set player task to 'wait', finish player turn."""
268         self.world.player.set_task('wait')
269         self.proceed_to_next_player_turn(connection_id)
270
271     def cmd_echo(self, tokens, input_, connection_id):
272         """Send message in input_ beyond tokens[0] to caller."""
273         msg = input_[len(tokens[0]) + 1:]
274         self.send_to(connection_id, msg)
275
276     def cmd_all(self, tokens, input_):
277         """Send message in input_ beyond tokens[0] to all clients."""
278         msg = input_[len(tokens[0]) + 1:]
279         self.send_all(msg)
280
281     def handle_input(self, input_, connection_id):
282         """Process input_ to command grammar, call command handler if found."""
283         tokens = [token for token in input_.split(' ') if len(token) > 0]
284         try:
285             if len(tokens) == 0:
286                 self.send_to(connection_id, 'EMPTY COMMAND')
287             elif len(tokens) == 1 and tokens[0] == 'INC_P':
288                 self.cmd_inc_p(connection_id)
289             elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
290                 self.cmd_get_turn(connection_id)
291             elif len(tokens) == 1 and tokens[0] == 'WAIT':
292                 self.cmd_wait(connection_id)
293             elif len(tokens) == 2 and tokens[0] == 'MOVE':
294                 self.cmd_move(tokens[1], connection_id)
295             elif len(tokens) >= 1 and tokens[0] == 'ECHO':
296                 self.cmd_echo(tokens, input_, connection_id)
297             elif len(tokens) >= 1 and tokens[0] == 'ALL':
298                 self.cmd_all(tokens, input_)
299             elif len(tokens) >= 1 and tokens[0] == 'FIB':
300                 # TODO: Should this really block the whole loop?
301                 self.cmd_fib(tokens, connection_id)
302             else:
303                 self.send_to(connection_id, 'UNKNOWN COMMAND')
304         except ArgumentError as e:
305             self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))
306
307
308 def io_loop(q):
309     """Handle commands coming through queue q, send results back.
310
311     Commands from q are expected to be tuples, with the first element either
312     'ADD_QUEUE', 'COMMAND', or 'KILL_QUEUE', the second element a UUID, and
313     an optional third element of arbitrary type. The UUID identifies a
314     receiver for replies.
315
316     An 'ADD_QUEUE' command should contain as third element a queue through
317     which to send messages back to the sender of the command. A 'KILL_QUEUE'
318     command removes the queue for that receiver from the list of queues through
319     which to send replies.
320
321     A 'COMMAND' command is specified in greater detail by a string that is the
322     tuple's third element. CommandHandler takes care of processing this and
323     sending out replies.
324     """
325     queues_out = {}
326     command_handler = CommandHandler(queues_out)
327     while True:
328         x = q.get()
329         command_type = x[0]
330         connection_id = x[1]
331         content = None if len(x) == 2 else x[2]
332         if command_type == 'ADD_QUEUE':
333             queues_out[connection_id] = content
334         elif command_type == 'COMMAND':
335             command_handler.handle_input(content, connection_id)
336         elif command_type == 'KILL_QUEUE':
337             del queues_out[connection_id]
338
339
340 q = queue.Queue()
341 c = threading.Thread(target=io_loop, daemon=True, args=(q,))
342 c.start()
343 server = Server(q, ('localhost', 5000), IO_Handler)
344 try:
345     server.serve_forever()
346 except KeyboardInterrupt:
347     pass
348 finally:
349     print('Killing server')
350     server.server_close()