home · contact · privacy
Add some more checks against evil URLs.
[plomlombot-irc.git] / plomlombot.py
import codecs
import datetime
import html
import http.client
import re
import select
import socket
import time
import urllib.request
8
# IRC server to connect to, and the TCP port to use.
servernet = "irc.freenode.net"
port = 6667

# Nick and user name the bot registers with, and the channel it joins.
username = nickname = "plomlombot"
channel = "#zrolaps-test"

# Seconds without any data from the server before the connection is
# considered dead; the receive loop waits half of this before pinging.
timeout = 4 * 60

# Empty until the script overwrites it with the prefix of the first line
# received from the server; used as the target of keep-alive PINGs.
servername = ""
16
class IO:
    """Line-oriented connection to an IRC server over a non-blocking socket.

    Outgoing lines are sanitised, logged and terminated with CR LF.
    Incoming bytes are decoded and buffered, and handed out one complete
    line at a time.
    """

    def __init__(self, server, port):
        """Connect to *server* on *port* and set up the receive buffers."""
        self.socket = socket.socket()
        self.socket.connect((server, port))
        self.socket.setblocking(0)
        # Complete lines received but not yet handed to the caller.
        self.line_buffer = []
        # Decoded text after the last CR LF, i.e. an unfinished line.
        self.rune_buffer = ""
        # Time of the last data received from the server.
        self.last_pong = time.time()
        # Incremental decoder: keeps the bytes of a multi-byte character
        # that was split across two recv() calls, and replaces bytes that
        # are not valid UTF-8 instead of raising.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def _pingtest(self):
        """Send a PING, or raise if the server has been silent too long.

        Raises:
            RuntimeError: no data arrived within the last ``timeout`` seconds.
        """
        if self.last_pong + timeout < time.time():
            raise RuntimeError("server not answering")
        self.send_line("PING " + nickname + " " + servername)

    def send_line(self, msg):
        """Send *msg* to the server as a single line.

        CR and LF inside *msg* are replaced by spaces so that it cannot
        smuggle additional commands. A line longer than 510 bytes of UTF-8
        (512 with the CR LF terminator) is logged and dropped.

        Raises:
            RuntimeError: the socket accepted zero bytes.
        """
        msg = msg.replace("\r", " ").replace("\n", " ")
        if len(msg.encode("utf-8")) > 510:
            print("NOT SENT LINE TO SERVER (too long): " + msg)
            return
        print("LINE TO SERVER: "
            + str(datetime.datetime.now()) + ": " + msg)
        # Encode once and count in bytes: socket.send() reports bytes sent,
        # which differs from the character count for non-ASCII text.
        payload = (msg + "\r\n").encode("utf-8")
        total_sent_len = 0
        while total_sent_len < len(payload):
            sent_len = self.socket.send(payload[total_sent_len:])
            if sent_len == 0:
                raise RuntimeError("socket connection broken")
            total_sent_len += sent_len

    def recv_line_wrapped(self):
        """Return the next complete line, or None after a quiet period.

        Waits up to half of ``timeout`` for data. If none arrives, a PING
        is sent via ``_pingtest`` and None is returned.

        Raises:
            RuntimeError: the peer closed the connection, or ``_pingtest``
                found the server unresponsive.
        """
        if self.line_buffer:
            return self.line_buffer.pop(0)
        while True:
            ready = select.select([self.socket], [], [], int(timeout / 2))
            if not ready[0]:
                self._pingtest()
                return None
            self.last_pong = time.time()
            received_bytes = self.socket.recv(1024)
            # Test the raw bytes: a chunk holding only part of a multi-byte
            # character legitimately decodes to an empty string.
            if not received_bytes:
                raise RuntimeError("socket connection broken")
            self.rune_buffer += self._decoder.decode(received_bytes)
            lines_split = self.rune_buffer.split("\r\n")
            self.line_buffer += lines_split[:-1]
            self.rune_buffer = lines_split[-1]
            if self.line_buffer:
                return self.line_buffer.pop(0)

    def recv_line(self):
        """Return the next line like ``recv_line_wrapped``, logging it."""
        line = self.recv_line_wrapped()
        if line:
            print("LINE FROM SERVER " + str(datetime.datetime.now()) + ": " +
            line)
        return line
73
# Matches every http:// or https:// URL in a chat message.
_URL_PATTERN = re.compile(r"(https?://[^\s]+)")
# Captures the text of the first <title> element, in any letter case and
# with optional attributes on the opening tag.
_TITLE_PATTERN = re.compile(r"<title(?:\s[^>]*)?>(.*?)</title>",
                            re.IGNORECASE | re.DOTALL)
# Only pages of these content types are searched for a title.
_TITLE_CONTENT_TYPES = ('text/html', 'text/xml', 'application/xhtml+xml')
# Limits that keep a slow or huge page from stalling the bot; the exact
# values are arbitrary.
_URL_TIMEOUT = 15
_MAX_PAGE_BYTES = 1024 * 1024


def _nick_from_prefix(token):
    """Return the part of *token* before any "!", with ":" removed."""
    return token.split("!", 1)[0].replace(":", "")


def _fetch_page_title(url):
    """Return the unescaped <title> text of *url*, or None.

    None is returned when the page cannot be fetched, declares no charset,
    is not HTML/XML, or contains no title element. Fetch problems are
    logged rather than raised, so a posted URL cannot crash the bot.
    """
    try:
        with urllib.request.urlopen(url, timeout=_URL_TIMEOUT) as webpage:
            info = webpage.info()
            charset = info.get_content_charset()
            if not charset or info.get_content_type() not in _TITLE_CONTENT_TYPES:
                return None
            # Read a bounded prefix only; the title sits near the top.
            content = webpage.read(_MAX_PAGE_BYTES).decode(charset,
                                                           errors="replace")
    except (OSError, ValueError, LookupError,
            http.client.HTTPException) as err:
        # OSError covers URLError/HTTPError and socket timeouts, ValueError
        # malformed URLs, LookupError a charset Python does not know.
        print("TROUBLE FETCHING URL " + url + ": " + str(err))
        return None
    match = _TITLE_PATTERN.search(content)
    if match is None:
        return None
    return html.unescape(match.group(1))


io = IO(servernet, port)
io.send_line("NICK " + nickname)
io.send_line("USER " + username + " 0 * : ")
io.send_line("JOIN " + channel)
# recv_line() returns None when the wait times out, so keep asking until
# the server's first line arrives; its prefix is the server's name.
first_line = io.recv_line()
while not first_line:
    first_line = io.recv_line()
servername = first_line.split(" ")[0][1:]
while True:
    line = io.recv_line()
    if not line:
        continue
    tokens = line.split(" ")
    if len(tokens) < 2:
        continue
    if tokens[1] == "PRIVMSG" and len(tokens) > 2:
        sender = _nick_from_prefix(tokens[0])
        receiver = _nick_from_prefix(tokens[2])
        # Answer in the channel, or to the sender for a private message.
        target = receiver if receiver != nickname else sender
        # Re-join the message text and drop its leading ":".
        msg = " ".join(tokens[3:])[1:]
        for url in _URL_PATTERN.findall(msg):
            title = _fetch_page_title(url)
            if title is not None:
                io.send_line("PRIVMSG "
                    + target + " :page title for url: " + title)
    if tokens[0] == "PING":
        io.send_line("PONG " + tokens[1])