home · contact · privacy
b00ebeba3aa06b38332be265599e43d7e034a857
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from typing import Any
3 from http.server import BaseHTTPRequestHandler
4 from http.server import HTTPServer
5 from urllib.parse import urlparse, parse_qs
6 from os.path import split as path_split
7 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
8 from plomtask.days import Day, todays_date
9 from plomtask.exceptions import HandledException, BadFormatException, \
10         NotFoundException
11 from plomtask.db import DatabaseConnection, DatabaseFile
12 from plomtask.processes import Process
13 from plomtask.conditions import Condition
14 from plomtask.todos import Todo
15
16 TEMPLATES_DIR = 'templates'
17
18
19 class TaskServer(HTTPServer):
20     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
21
22     def __init__(self, db_file: DatabaseFile,
23                  *args: Any, **kwargs: Any) -> None:
24         super().__init__(*args, **kwargs)
25         self.db = db_file
26         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
27
28
29 class ParamsParser:
30     """Wrapper for validating and retrieving GET params."""
31
32     def __init__(self, params: dict[str, list[str]]) -> None:
33         self.params = params
34
35     def get_str(self, key: str, default: str = '') -> str:
36         """Retrieve string value of key from self.params."""
37         if key not in self.params or 0 == len(self.params[key]):
38             return default
39         return self.params[key][0]
40
41     def get_int_or_none(self, key: str) -> int | None:
42         """Retrieve int value of key from self.params, on empty return None."""
43         if key not in self.params or \
44                 0 == len(''.join(list(self.params[key]))):
45             return None
46         val_str = self.params[key][0]
47         try:
48             return int(val_str)
49         except ValueError as e:
50             raise BadFormatException(f'Bad ?{key}= value: {val_str}') from e
51
52
53 class PostvarsParser:
54     """Postvars wrapper for validating and retrieving form data."""
55
56     def __init__(self, postvars: dict[str, list[str]]) -> None:
57         self.postvars = postvars
58
59     def get_str(self, key: str) -> str:
60         """Retrieve string value of key from self.postvars."""
61         all_str = self.get_all_str(key)
62         if 0 == len(all_str):
63             raise BadFormatException(f'missing value for key: {key}')
64         return all_str[0]
65
66     def get_int(self, key: str) -> int:
67         """Retrieve int value of key from self.postvars."""
68         val = self.get_str(key)
69         try:
70             return int(val)
71         except ValueError as e:
72             msg = f'cannot int form field value: {val}'
73             raise BadFormatException(msg) from e
74
75     def get_int_or_none(self, key: str) -> int | None:
76         """Retrieve int value of key from self.postvars, or None."""
77         if key not in self.postvars or \
78                 0 == len(''.join(list(self.postvars[key]))):
79             return None
80         return self.get_int(key)
81
82     def get_float(self, key: str) -> float:
83         """Retrieve float value of key from self.postvars."""
84         val = self.get_str(key)
85         try:
86             return float(val)
87         except ValueError as e:
88             msg = f'cannot float form field value: {val}'
89             raise BadFormatException(msg) from e
90
91     def get_all_str(self, key: str) -> list[str]:
92         """Retrieve list of string values at key from self.postvars."""
93         if key not in self.postvars:
94             return []
95         return self.postvars[key]
96
97     def get_all_int(self, key: str) -> list[int]:
98         """Retrieve list of int values at key from self.postvars."""
99         all_str = self.get_all_str(key)
100         try:
101             return [int(s) for s in all_str if len(s) > 0]
102         except ValueError as e:
103             msg = f'cannot int a form field value: {all_str}'
104             raise BadFormatException(msg) from e
105
106
107 class TaskHandler(BaseHTTPRequestHandler):
108     """Handles single HTTP request."""
109     server: TaskServer
110
111     def do_GET(self) -> None:
112         """Handle any GET request."""
113         try:
114             conn, site, params = self._init_handling()
115             if site in {'calendar', 'day', 'process', 'processes', 'todo',
116                         'condition', 'conditions'}:
117                 html = getattr(self, f'do_GET_{site}')(conn, params)
118             elif '' == site:
119                 self._redirect('/day')
120                 return
121             else:
122                 raise NotFoundException(f'Unknown page: /{site}')
123             self._send_html(html)
124         except HandledException as error:
125             self._send_msg(error, code=error.http_code)
126         finally:
127             conn.close()
128
129     def do_GET_calendar(self, conn: DatabaseConnection,
130                         params: ParamsParser) -> str:
131         """Show Days from ?start= to ?end=."""
132         start = params.get_str('start')
133         end = params.get_str('end')
134         days = Day.all(conn, date_range=(start, end), fill_gaps=True)
135         return self.server.jinja.get_template('calendar.html').render(
136                 days=days, start=start, end=end)
137
138     def do_GET_day(self, conn: DatabaseConnection,
139                    params: ParamsParser) -> str:
140         """Show single Day of ?date=."""
141         date = params.get_str('date', todays_date())
142         day = Day.by_date(conn, date, create=True)
143         todos = Todo.by_date(conn, date)
144         conditions_listing = []
145         for condition in Condition.all(conn):
146             enablers = Todo.enablers_for_at(conn, condition, date)
147             disablers = Todo.disablers_for_at(conn, condition, date)
148             conditions_listing += [{
149                     'condition': condition,
150                     'enablers': enablers,
151                     'disablers': disablers}]
152         return self.server.jinja.get_template('day.html').render(
153                 day=day, processes=Process.all(conn), todos=todos,
154                 conditions_listing=conditions_listing)
155
156     def do_GET_todo(self, conn: DatabaseConnection, params:
157                     ParamsParser) -> str:
158         """Show single Todo of ?id=."""
159         id_ = params.get_int_or_none('id')
160         todo = Todo.by_id(conn, id_)
161         todo_candidates = Todo.by_date(conn, todo.day.date)
162         return self.server.jinja.get_template('todo.html').render(
163                 todo=todo, todo_candidates=todo_candidates,
164                 condition_candidates=Condition.all(conn))
165
166     def do_GET_conditions(self, conn: DatabaseConnection,
167                           _: ParamsParser) -> str:
168         """Show all Conditions."""
169         return self.server.jinja.get_template('conditions.html').render(
170                 conditions=Condition.all(conn))
171
172     def do_GET_condition(self, conn: DatabaseConnection,
173                          params: ParamsParser) -> str:
174         """Show Condition of ?id=."""
175         id_ = params.get_int_or_none('id')
176         condition = Condition.by_id(conn, id_, create=True)
177         return self.server.jinja.get_template('condition.html').render(
178                 condition=condition)
179
180     def do_GET_process(self, conn: DatabaseConnection,
181                        params: ParamsParser) -> str:
182         """Show process of ?id=."""
183         id_ = params.get_int_or_none('id')
184         process = Process.by_id(conn, id_, create=True)
185         owners = process.used_as_step_by(conn)
186         return self.server.jinja.get_template('process.html').render(
187                 process=process, steps=process.get_steps(conn),
188                 owners=owners, process_candidates=Process.all(conn),
189                 condition_candidates=Condition.all(conn))
190
191     def do_GET_processes(self, conn: DatabaseConnection,
192                          _: ParamsParser) -> str:
193         """Show all Processes."""
194         return self.server.jinja.get_template('processes.html').render(
195                 processes=Process.all(conn))
196
197     def do_POST(self) -> None:
198         """Handle any POST request."""
199         try:
200             conn, site, params = self._init_handling()
201             length = int(self.headers['content-length'])
202             postvars = parse_qs(self.rfile.read(length).decode(),
203                                 keep_blank_values=True, strict_parsing=True)
204             form_data = PostvarsParser(postvars)
205             if site in ('day', 'process', 'todo', 'condition'):
206                 getattr(self, f'do_POST_{site}')(conn, params, form_data)
207                 conn.commit()
208             else:
209                 msg = f'Page not known as POST target: /{site}'
210                 raise NotFoundException(msg)
211             self._redirect('/')
212         except HandledException as error:
213             self._send_msg(error, code=error.http_code)
214         finally:
215             conn.close()
216
217     def do_POST_day(self, conn: DatabaseConnection, params: ParamsParser,
218                     form_data: PostvarsParser) -> None:
219         """Update or insert Day of date and Todos mapped to it."""
220         date = params.get_str('date')
221         day = Day.by_date(conn, date, create=True)
222         day.comment = form_data.get_str('comment')
223         day.save(conn)
224         process_id = form_data.get_int_or_none('new_todo')
225         if process_id is not None:
226             process = Process.by_id(conn, process_id)
227             todo = Todo(None, process, False, day)
228             todo.save(conn)
229
230     def do_POST_todo(self, conn: DatabaseConnection, params: ParamsParser,
231                      form_data: PostvarsParser) -> None:
232         """Update Todo and its children."""
233         id_ = params.get_int_or_none('id')
234         todo = Todo.by_id(conn, id_)
235         child_id = form_data.get_int_or_none('adopt')
236         if child_id is not None:
237             child = Todo.by_id(conn, child_id)
238             todo.add_child(child)
239         todo.set_conditions(conn, form_data.get_all_int('condition'))
240         todo.set_fulfills(conn, form_data.get_all_int('fulfills'))
241         todo.set_undoes(conn, form_data.get_all_int('undoes'))
242         todo.is_done = len(form_data.get_all_str('done')) > 0
243         todo.save(conn)
244         for condition in todo.fulfills:
245             condition.save(conn)
246         for condition in todo.undoes:
247             condition.save(conn)
248
249     def do_POST_process(self, conn: DatabaseConnection, params: ParamsParser,
250                         form_data: PostvarsParser) -> None:
251         """Update or insert Process of ?id= and fields defined in postvars."""
252         id_ = params.get_int_or_none('id')
253         process = Process.by_id(conn, id_, create=True)
254         process.title.set(form_data.get_str('title'))
255         process.description.set(form_data.get_str('description'))
256         process.effort.set(form_data.get_float('effort'))
257         process.set_conditions(conn, form_data.get_all_int('condition'))
258         process.set_fulfills(conn, form_data.get_all_int('fulfills'))
259         process.set_undoes(conn, form_data.get_all_int('undoes'))
260         process.save_without_steps(conn)
261         assert process.id_ is not None  # for mypy
262         process.explicit_steps = []
263         for step_id in form_data.get_all_int('steps'):
264             for step_process_id in\
265                     form_data.get_all_int(f'new_step_to_{step_id}'):
266                 process.add_step(conn, None, step_process_id, step_id)
267             if step_id not in form_data.get_all_int('keep_step'):
268                 continue
269             step_process_id = form_data.get_int(f'step_{step_id}_process_id')
270             parent_id = form_data.get_int_or_none(f'step_{step_id}_parent_id')
271             process.add_step(conn, step_id, step_process_id, parent_id)
272         for step_process_id in form_data.get_all_int('new_top_step'):
273             process.add_step(conn, None, step_process_id, None)
274         process.fix_steps(conn)
275
276     def do_POST_condition(self, conn: DatabaseConnection, params: ParamsParser,
277                           form_data: PostvarsParser) -> None:
278         """Update/insert Condition of ?id= and fields defined in postvars."""
279         id_ = params.get_int_or_none('id')
280         condition = Condition.by_id(conn, id_, create=True)
281         condition.title.set(form_data.get_str('title'))
282         condition.description.set(form_data.get_str('description'))
283         condition.save(conn)
284
285     def _init_handling(self) -> tuple[DatabaseConnection, str, ParamsParser]:
286         conn = DatabaseConnection(self.server.db)
287         parsed_url = urlparse(self.path)
288         site = path_split(parsed_url.path)[1]
289         params = ParamsParser(parse_qs(parsed_url.query, strict_parsing=True))
290         return conn, site, params
291
292     def _redirect(self, target: str) -> None:
293         self.send_response(302)
294         self.send_header('Location', target)
295         self.end_headers()
296
297     def _send_html(self, html: str, code: int = 200) -> None:
298         """Send HTML as proper HTTP response."""
299         self.send_response(code)
300         self.end_headers()
301         self.wfile.write(bytes(html, 'utf-8'))
302
303     def _send_msg(self, msg: Exception, code: int = 400) -> None:
304         """Send message in HTML formatting as HTTP response."""
305         html = self.server.jinja.get_template('msg.html').render(msg=msg)
306         self._send_html(html, code)