home · contact · privacy
Merge branch 'master' of github.com:plomlompom/url-catcher
[url-catcher] / url_catcher.py
1 #!/usr/bin/python3
2
3 import bottle
4 import validators
5 import html
6 import os
7 import os.path
8 import time
9 import json
10 import smtplib
11 import email.mime.text
12 import tempfile
13 import shutil
14
# Length in seconds of the per-IP slowdown window used by post_link().
slowdown_reset = 60 * 60 * 24
# Directory of per-IP slowdown state files (one file per remote address).
ips_dir = 'ips'
# Directory of per-page URL list files.
lists_dir = 'lists'
# Directory of per-page captcha files; the first line is the expected answer.
captchas_dir = 'captchas'
# Optional JSON file overriding the defaults defined in this section.
customizations_path = 'customizations.json'
# User-facing and mail texts; overridable via the 'translations' key.
messages = {
    'internalServerError': 'Internal server error.',
    'badPageName': 'Bad page name.',
    'wrongCaptcha': 'Wrong captcha.',
    'invalidURL': 'Invalid URL.',
    'recordedURL': 'Recorded URL: ',
    'pleaseWait': 'Too many attempts from your IP. Wait this many seconds: ',
    'mailSubject': '[url_catcher.py] New URL submitted',
    'mailBodyPage': 'New URL submitted for page: ',
    'mailBodyURL': 'URL is: ',
}
# Mail sender/recipient; overridable via the 'mailConfig' key.
mail_config = {
    'from': 'foo@example.org',
    'to': 'bar@example.org',
}
if os.path.isfile(customizations_path):
    with open(customizations_path) as customizations_file:
        customizations = json.load(customizations_file)
    # Every section is optional: a file that only sets e.g. 'slowdownReset'
    # must not fail on the sections it leaves out.
    messages.update(customizations.get('translations', {}))
    mail_config.update(customizations.get('mailConfig', {}))
    slowdown_reset = customizations.get('slowdownReset', slowdown_reset)
os.makedirs(ips_dir, exist_ok=True)
os.makedirs(lists_dir, exist_ok=True)
47
48
def atomic_write(path, content, mode):
    """Atomically write or append content to the file at path.

    The new file content is assembled in a temporary file, flushed to disk,
    and then renamed over path.

    Args:
        path: Target file path.
        content: Text to write.
        mode: 'w' to replace the file's content; 'a' to append, in which
            case an existing target is first copied into the temporary file.

    Raises:
        Any error from creating, filling or renaming the temporary file is
        re-raised after the temporary file has been removed.
    """
    # Create the temporary file next to the target: os.rename() only works
    # within one filesystem, which the system temp directory may not share.
    target_dir = os.path.dirname(path) or '.'
    fd, tmp_path = tempfile.mkstemp(dir=target_dir)
    # mkstemp() hands back an open descriptor; close it so it does not leak.
    os.close(fd)
    try:
        if 'a' == mode and os.path.exists(path):
            shutil.copy2(path, tmp_path)
        with open(tmp_path, mode) as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())
        os.rename(tmp_path, path)
    except BaseException:
        # Do not leave stray temporary files behind on failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
60
61
def send_mail(page, url):
    """Send mail telling about page URL list update.

    Builds a plain-text message from the configured mail texts, naming the
    page and the submitted URL, and hands it to the SMTP server on localhost.

    Args:
        page: Name of the page the URL was submitted for.
        url: The submitted URL.
    """
    body = messages['mailBodyPage'] + page + '\n' + messages['mailBodyURL'] + \
        url
    msg = email.mime.text.MIMEText(body)
    msg['Subject'] = messages['mailSubject']
    msg['From'] = mail_config['from']
    msg['To'] = mail_config['to']
    # The context manager issues QUIT and closes the connection even when
    # sending fails, so a failed delivery does not leak the socket.
    with smtplib.SMTP('localhost') as s:
        s.send_message(msg)
73
74
@bottle.error(500)
def internal_error(error):
    """Answer HTTP 500 errors with the generic configured message.

    The error object passed in is deliberately ignored so that none of
    bottle.py's detailed error description reaches the client.
    """
    generic_reply = messages['internalServerError']
    return generic_reply
79
80
@bottle.post('/uwsgi/post_link')
def post_link():
    """Record URL if all sane, send mail to curator.

    Reads the form fields 'page', 'captcha' and 'url' from the request and
    proceeds in this order:

    1. Per-IP slowdown. The state file for the remote address holds the
       window start time and an attempt counter. While the window is active
       a request is refused until 2**attempts seconds after the window
       start have passed; accepted requests increase the counter.
    2. Page name check: must be non-empty, contain no NUL, '/' or '.', and
       be at most 255 bytes when encoded.
    3. Captcha check against the first line of the page's captcha file.
    4. URL validation, mail notification, and appending the URL to the
       page's list file.

    Returns:
        The success message followed by the HTML-escaped URL, or a
        bottle.HTTPResponse with status 429 (slowdown, with a Retry-After
        header) or 400 (bad page name, wrong captcha, invalid URL).
    """

    # Slow down repeat requests.
    now = int(time.time())
    start_date = now
    attempts = 0
    rewrite = True
    ip = bottle.request.environ.get('REMOTE_ADDR')
    ip_file_path = ips_dir + '/' + ip
    try:
        if os.path.isfile(ip_file_path):
            with open(ip_file_path, 'r') as ip_file:
                ip_data = ip_file.readlines()
            old_start_date = int(ip_data[0])
            if old_start_date + slowdown_reset > now:
                attempts = int(ip_data[1])
                start_date = old_start_date
                wait_period = 2**attempts
                if start_date + wait_period > now:
                    limit = min(start_date + wait_period,
                                start_date + slowdown_reset)
                    # A refused request leaves the stored state untouched.
                    rewrite = False
                    remaining_wait = limit - now
                    msg = messages['pleaseWait'] + str(remaining_wait)
                    return bottle.HTTPResponse(
                        msg, 429, {'Retry-After': str(remaining_wait)})
                attempts += 1
    finally:
        # Runs on errors too: an unreadable state file is replaced by a
        # fresh one (start_date/attempts still hold their defaults then).
        if rewrite:
            atomic_write(ip_file_path,
                         str(start_date) + '\n' + str(attempts), 'w')

    # Derive page / page file name.
    page = bottle.request.forms.get('page')
    # A missing or empty field is rejected up front; the remaining checks
    # keep the name from leaving the captcha/list directories.
    if (not page or '\0' in page or '/' in page or '.' in page
            or len(page.encode()) > 255):
        return bottle.HTTPResponse(messages['badPageName'], 400)

    # Test captcha.
    try:
        with open(captchas_dir + '/' + page, 'r') as captcha_file:
            captcha_correct = captcha_file.readline().rstrip()
    except FileNotFoundError:
        # No captcha file for this name: treat it as an unknown page
        # rather than answering with an internal server error.
        return bottle.HTTPResponse(messages['badPageName'], 400)
    captcha_input = bottle.request.forms.get('captcha')
    if captcha_correct != captcha_input:
        return bottle.HTTPResponse(messages['wrongCaptcha'], 400)

    # Record URL.
    url = bottle.request.forms.get('url')
    # Reject a missing field before handing it to the validator.
    if not url or not validators.url(url):
        return bottle.HTTPResponse(messages['invalidURL'], 400)
    send_mail(page, url)
    atomic_write(lists_dir + '/' + page, url + '\n', 'a')
    url_html = html.escape(url)

    # Response body.
    return messages['recordedURL'] + url_html
141
142
# NOTE(review): debug mode is switched on unconditionally, i.e. also when
# served through uWSGI -- confirm this is intended outside development.
bottle.debug(True)
if __name__ != '__main__':
    # uWSGI mode: expose the WSGI callable under both conventional names.
    application = bottle.default_app()
    app = application
else:
    # Non-uWSGI mode: run bottle's own server for local use.
    bottle.run(host='localhost', port=8080)