#!/usr/bin/env python3
import urwid
-import plom_socket_io
+import plom_socket_io
import socket
import threading
-class RecvThread(threading.Thread):
- """Background thread that delivers messages from the socket to urwid.
+def recv_loop(socket, urwid_pipe_write_fd, server_output):
+ """Loop to receive messages from socket and deliver them to urwid.
The message transfer to urwid is a bit weird. The urwid developers warn
against sharing urwid resources among threads, and recommend using urwid's
watch_pipe mechanism: using a pipe from non-urwid threads into a single
- urwid thread. We could pipe the recv output directly, but then we get
+ urwid thread. We could pipe socket.recv output directly, but then we get
complicated buffering situations here as well as in the urwid code that
receives the pipe content. It's much easier to update a third resource
(server_output, which references an object that's also known to the urwid
(urwid_pipe_write_fd) to trigger the urwid code to pull the message in from
that third resource. We send a single b' ' through the pipe to trigger it.
"""
-
- def __init__(self, socket, urwid_pipe_write_fd, server_output):
- super().__init__()
- self.socket = socket
- self.urwid_pipe = urwid_pipe_write_fd
- self.server_output = server_output
-
- def run(self):
- """On message receive, write to self.server_output, ping urwid pipe."""
- import os
- for msg in plom_socket_io.recv(self.socket):
- self.server_output[0] = msg
- os.write(self.urwid_pipe, b' ')
+ import os
+ for msg in plom_socket_io.recv(socket):
+ server_output[0] = msg
+ os.write(urwid_pipe_write_fd, b' ')
class InputHandler:
"""Helps delivering data from other thread to widget via message_container.
-
+
The whole class only exists to provide handle_input as a bound method, with
widget and message_container pre-set, as (bound) handle_input is used as a
callback in urwid's watch_pipe – which merely provides its callback target
if self.message_container[0] == 'BYE':
raise urwid.ExitMainLoop()
return
- self.widget.set_text('REPLY: ' + self.message_container[0])
+ self.widget.set_text('SERVER: ' + self.message_container[0])
class SocketInputWidget(urwid.Filler):
server_output = ['']
write_fd = loop.watch_pipe(getattr(InputHandler(txt, server_output),
'handle_input'))
-thread = RecvThread(s, write_fd, server_output)
+thread = threading.Thread(target=recv_loop,
+ kwargs={'socket': s, 'server_output': server_output,
+ 'urwid_pipe_write_fd': write_fd})
thread.start()
loop.run()
#!/usr/bin/env python3
import socketserver
-import plom_socket_io
+import plom_socket_io
+import threading
+import time
# Avoid "Address already in use" errors.
socketserver.TCPServer.allow_reuse_address = True
+class Server(socketserver.ThreadingTCPServer):
+ """Bind together threaded IO handling server and world state (counter)."""
+
+ def __init__(self, counter, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.counter = counter
+ self.daemon_threads = True # Else, server's threads have daemon=False.
+
+
def fib(n):
"""Calculate n-th Fibonacci number."""
if n in (1, 2):
return fib(n-1) + fib(n-2)
-def handle_message(message):
- """Evaluate message for computing-heavy tasks to perform, yield result.
-
- Accepts one command: FIB, followed by positive integers, all tokens
- separated by whitespace. Will calculate and return for each such integer n
- the n-th Fibonacci number. Uses multiprocessing to perform multiple such
- calculations in parallel. Yields a 'CALCULATING …' message before the
- calculation starts, and finally yields a message containing the results.
- (The 'CALCULATING …' message coming before the results message is currently
- the main reason this works as a generator function using yield.)
-
- When no command can be read into the message, just yields a 'NO COMMAND
- UNDERSTOOD:', followed by the message.
- """
- tokens = message.split(' ')
- if tokens[0] == 'FIB':
- msg_fail_fib = 'MALFORMED FIB REQUEST'
- if len(tokens) < 2:
- yield msg_fail_fib
- return
- numbers = []
- fail = False
- for token in tokens[1:]:
- if token != '0' and token.isdigit():
- numbers += [int(token)]
- elif token == '':
- continue
- else:
- yield msg_fail_fib
- return
- yield 'CALCULATING …'
- reply = ''
- from multiprocessing import Pool
- with Pool(len(numbers)) as p:
- results = p.map(fib, numbers)
- reply = ' '.join([str(r) for r in results])
- yield reply
- return
- yield 'NO COMMAND UNDERSTOOD: %s' % message
-
-
-class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
- """Enables threading on TCP server for asynchronous IO handling."""
- pass
-
-
-class MyTCPHandler(socketserver.BaseRequestHandler):
+class IO_Handler(socketserver.BaseRequestHandler):
def handle(self):
- """Loop recv for input, act on it, send reply.
+ """Loop recv for input, send replies; also, send regular counter value.
If input is 'QUIT', send reply 'BYE' and end loop / connection.
- Otherwise, use handle_message.
+ Otherwise, use handle_message to interpret and enact commands.
"""
+ def caught_send(socket, message):
+ """Send message by socket, catch broken socket connection error."""
+ try:
+ plom_socket_io.send(socket, message)
+ except plom_socket_io.BrokenSocketConnection:
+ pass
+
+ def send_counter_loop(socket, counter, kill):
+ """Every 5 seconds, send state of counter[0] until kill[0] set."""
+ while not kill[0]:
+ caught_send(socket, 'COUNTER ' + str(counter[0]))
+ time.sleep(5)
+
+ def handle_message(message):
+ """Evaluate message for tasks to perform, yield result.
+
+ Accepts one command: FIB, followed by positive integers, all tokens
+ separated by whitespace. Will calculate and return for each such
+ integer n the n-th Fibonacci number. Uses multiprocessing to
+ perform multiple such calculations in parallel. Yields a
+ 'CALCULATING …' message before the calculation starts, and finally
+ yields a message containing the results. (The 'CALCULATING …'
+ message coming before the results message is currently the main
+ reason this works as a generator function using yield.)
+
+ When no command can be read into the message, just yields a 'NO
+ COMMAND UNDERSTOOD:', followed by the message.
+ """
+ from multiprocessing import Pool
+ tokens = message.split(' ')
+ if tokens[0] == 'FIB':
+ msg_fail_fib = 'MALFORMED FIB REQUEST'
+ if len(tokens) < 2:
+ yield msg_fail_fib
+ return
+ numbers = []
+ for token in tokens[1:]:
+ if token != '0' and token.isdigit():
+ numbers += [int(token)]
+ elif token == '':
+ continue
+ else:
+ yield msg_fail_fib
+ return
+ yield 'CALCULATING …'
+ reply = ''
+ with Pool(len(numbers)) as p:
+ results = p.map(fib, numbers)
+ reply = ' '.join([str(r) for r in results])
+ yield reply
+ return
+ yield 'NO COMMAND UNDERSTOOD: %s' % message
print('CONNECTION FROM:', str(self.client_address))
+ counter_loop_killer = [False]
+ send_count = threading.Thread(target=send_counter_loop,
+ kwargs={'counter': self.server.counter,
+ 'socket': self.request,
+ 'kill': counter_loop_killer})
+ send_count.start()
for message in plom_socket_io.recv(self.request):
if message is None:
print('RECEIVED MALFORMED MESSAGE')
- plom_socket_io.send(self.request, 'bad message')
+ caught_send(self.request, 'bad message')
elif 'QUIT' == message:
- plom_socket_io.send(self.request, 'BYE')
+ caught_send(self.request, 'BYE')
break
else:
print('RECEIVED MESSAGE:', message)
for reply in handle_message(message):
- plom_socket_io.send(self.request, reply)
+ caught_send(self.request, reply)
+ counter_loop_killer = [True]
print('CONNECTION CLOSED:', str(self.client_address))
self.request.close()
-server = ThreadedTCPServer(('localhost', 5000), MyTCPHandler)
+def inc_loop(counter, interval):
+ """Loop incrementing counter every interval seconds."""
+ while True:
+ time.sleep(interval)
+ counter[0] += 1
+
+
+counter = [0]
+b = threading.Thread(target=inc_loop, daemon=True, kwargs={'counter': counter,
+ 'interval': 1})
+b.start()
+server = Server(counter, ('localhost', 5000), IO_Handler)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
+ print('Killing server')
server.server_close()