home · contact · privacy
Refactor client connection code.
[plomrogue2] / plomrogue_client / socket.py
1 #!/usr/bin/env python3
2 import queue
3 import threading
4 import time
5 import datetime
6 from plomrogue.errors import BrokenSocketConnection
7 from plomrogue.io_tcp import PlomSocket
8 from ws4py.client import WebSocketBaseClient
9
10
11
12 class WebSocketClient(WebSocketBaseClient):
13
14     def __init__(self, recv_handler, *args, **kwargs):
15         super().__init__(*args, **kwargs)
16         self.recv_handler = recv_handler
17         self.connect()
18
19     def received_message(self, message):
20         if message.is_text:
21             message = str(message)
22             self.recv_handler(message)
23
24     @property
25     def plom_closed(self):
26         return self.client_terminated
27
28
29
30 class PlomSocketClient(PlomSocket):
31
32     def __init__(self, recv_handler, url):
33         import socket
34         self.recv_handler = recv_handler
35         host, port = url.split(':')
36         super().__init__(socket.create_connection((host, port)))
37
38     def close(self):
39         self.socket.close()
40
41     def run(self):
42         import ssl
43         try:
44             for msg in self.recv():
45                 if msg == 'NEED_SSL':
46                     self.socket = ssl.wrap_socket(self.socket)
47                     continue
48                 self.recv_handler(msg)
49         except BrokenSocketConnection:
50             pass  # we assume socket will be known as dead by now
51
52
53
54 class ClientSocket():
55
56     def __init__(self, host, logger=None):
57         self.socket = None
58         self.host = host
59         self.queue = queue.Queue()
60         self.disconnected = True
61         self.force_instant_connect = True
62         self.interval = datetime.timedelta(seconds=5)
63         self.last_ping = datetime.datetime.now() - self.interval
64         self.logger = logger
65
66     def log(self, msg):
67         if self.logger:
68             self.logger(msg)
69
70     def connect(self):
71
72         def handle_recv(msg):
73             if msg == 'BYE':
74                 self.socket.close()
75             else:
76                 self.queue.put(msg)
77
78         self.log('attempting connect')
79         socket_client_class = PlomSocketClient
80         if self.host.startswith('ws://') or self.host.startswith('wss://'):
81             socket_client_class = WebSocketClient
82         try:
83             self.socket = socket_client_class(handle_recv, self.host)
84             self.socket_thread = threading.Thread(target=self.socket.run)
85             self.socket_thread.start()
86             self.disconnected = False
87             time.sleep(0.1)  # give potential SSL negotation some time …
88             self.log('connected')
89         except ConnectionRefusedError:
90             self.log('server connect failure')
91             self.disconnected = True
92
93     def send(self, msg):
94         try:
95             if self.socket is None:
96                 raise BrokenSocketConnection
97             if hasattr(self.socket, 'plom_closed') and self.socket.plom_closed:
98                 raise BrokenSocketConnection
99             self.socket.send(msg)
100         except (BrokenPipeError, BrokenSocketConnection):
101             self.log('server disconnected :(')
102             self.disconnected = True
103             self.force_instant_connect = True
104
105     def keep_connection_alive(self):
106         if self.disconnected and self.force_instant_connect:
107             self.force_instant_connect = False
108             self.connect()
109         now = datetime.datetime.now()
110         if now - self.last_ping > self.interval:
111             if self.disconnected:
112                 self.connect()
113             else:
114                 self.send('PING')
115             self.last_ping = now
116
117     def get_message(self):
118         while True:
119             try:
120                 msg = self.queue.get(block=False)
121                 yield msg 
122             except queue.Empty:
123                 break
124         return None