home · contact · privacy
Add most basic Todo family relations.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from typing import Any
3 from http.server import BaseHTTPRequestHandler
4 from http.server import HTTPServer
5 from urllib.parse import urlparse, parse_qs
6 from os.path import split as path_split
7 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
8 from plomtask.days import Day, todays_date
9 from plomtask.exceptions import HandledException, BadFormatException, \
10         NotFoundException
11 from plomtask.db import DatabaseConnection, DatabaseFile
12 from plomtask.processes import Process
13 from plomtask.todos import Todo
14
15 TEMPLATES_DIR = 'templates'
16
17
18 class TaskServer(HTTPServer):
19     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
20
21     def __init__(self, db_file: DatabaseFile,
22                  *args: Any, **kwargs: Any) -> None:
23         super().__init__(*args, **kwargs)
24         self.db = db_file
25         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
26
27
28 class ParamsParser:
29     """Wrapper for validating and retrieving GET params."""
30
31     def __init__(self, params: dict[str, list[str]]) -> None:
32         self.params = params
33
34     def get_str(self, key: str, default: str = '') -> str:
35         """Retrieve string value of key from self.params."""
36         if key not in self.params or 0 == len(self.params[key]):
37             return default
38         return self.params[key][0]
39
40     def get_int_or_none(self, key: str) -> int | None:
41         """Retrieve int value of key from self.params, on empty return None."""
42         if key not in self.params or \
43                 0 == len(''.join(list(self.params[key]))):
44             return None
45         val_str = self.params[key][0]
46         try:
47             return int(val_str)
48         except ValueError as e:
49             raise BadFormatException(f'Bad ?{key}= value: {val_str}') from e
50
51
52 class PostvarsParser:
53     """Postvars wrapper for validating and retrieving form data."""
54
55     def __init__(self, postvars: dict[str, list[str]]) -> None:
56         self.postvars = postvars
57
58     def get_str(self, key: str) -> str:
59         """Retrieve string value of key from self.postvars."""
60         all_str = self.get_all_str(key)
61         if 0 == len(all_str):
62             raise BadFormatException(f'missing value for key: {key}')
63         return all_str[0]
64
65     def get_int(self, key: str) -> int:
66         """Retrieve int value of key from self.postvars."""
67         val = self.get_str(key)
68         try:
69             return int(val)
70         except ValueError as e:
71             msg = f'cannot int form field value: {val}'
72             raise BadFormatException(msg) from e
73
74     def get_int_or_none(self, key: str) -> int | None:
75         """Retrieve int value of key from self.postvars, or None."""
76         if key not in self.postvars or \
77                 0 == len(''.join(list(self.postvars[key]))):
78             return None
79         return self.get_int(key)
80
81     def get_float(self, key: str) -> float:
82         """Retrieve float value of key from self.postvars."""
83         val = self.get_str(key)
84         try:
85             return float(val)
86         except ValueError as e:
87             msg = f'cannot float form field value: {val}'
88             raise BadFormatException(msg) from e
89
90     def get_all_str(self, key: str) -> list[str]:
91         """Retrieve list of string values at key from self.postvars."""
92         if key not in self.postvars:
93             return []
94         return self.postvars[key]
95
96     def get_all_int(self, key: str) -> list[int]:
97         """Retrieve list of int values at key from self.postvars."""
98         all_str = self.get_all_str(key)
99         try:
100             return [int(s) for s in all_str if len(s) > 0]
101         except ValueError as e:
102             msg = f'cannot int a form field value: {all_str}'
103             raise BadFormatException(msg) from e
104
105
106 class TaskHandler(BaseHTTPRequestHandler):
107     """Handles single HTTP request."""
108     server: TaskServer
109
110     def do_GET(self) -> None:
111         """Handle any GET request."""
112         try:
113             conn, site, params = self._init_handling()
114             if site in {'calendar', 'day', 'process', 'processes', 'todo'}:
115                 html = getattr(self, f'do_GET_{site}')(conn, params)
116             elif '' == site:
117                 self._redirect('/day')
118                 return
119             else:
120                 raise NotFoundException(f'Unknown page: /{site}')
121             self._send_html(html)
122         except HandledException as error:
123             self._send_msg(error, code=error.http_code)
124         finally:
125             conn.close()
126
127     def do_GET_calendar(self, conn: DatabaseConnection,
128                         params: ParamsParser) -> str:
129         """Show Days from ?start= to ?end=."""
130         start = params.get_str('start')
131         end = params.get_str('end')
132         days = Day.all(conn, date_range=(start, end), fill_gaps=True)
133         return self.server.jinja.get_template('calendar.html').render(
134                 days=days, start=start, end=end)
135
136     def do_GET_day(self, conn: DatabaseConnection,
137                    params: ParamsParser) -> str:
138         """Show single Day of ?date=."""
139         date = params.get_str('date', todays_date())
140         day = Day.by_date(conn, date, create=True)
141         todos = Todo.by_date(conn, date)
142         return self.server.jinja.get_template('day.html').render(
143                 day=day, processes=Process.all(conn), todos=todos)
144
145     def do_GET_todo(self, conn: DatabaseConnection, params:
146                     ParamsParser) -> str:
147         """Show single Todo of ?id=."""
148         id_ = params.get_int_or_none('id')
149         todo = Todo.by_id(conn, id_)
150         candidates = Todo.by_date(conn, todo.day.date)
151         return self.server.jinja.get_template('todo.html').render(
152                 todo=todo, candidates=candidates)
153
154     def do_GET_process(self, conn: DatabaseConnection,
155                        params: ParamsParser) -> str:
156         """Show process of ?id=."""
157         id_ = params.get_int_or_none('id')
158         process = Process.by_id(conn, id_, create=True)
159         owners = process.used_as_step_by(conn)
160         return self.server.jinja.get_template('process.html').render(
161                 process=process, steps=process.get_steps(conn),
162                 owners=owners, candidates=Process.all(conn))
163
164     def do_GET_processes(self, conn: DatabaseConnection,
165                          _: ParamsParser) -> str:
166         """Show all Processes."""
167         return self.server.jinja.get_template('processes.html').render(
168                 processes=Process.all(conn))
169
170     def do_POST(self) -> None:
171         """Handle any POST request."""
172         try:
173             conn, site, params = self._init_handling()
174             length = int(self.headers['content-length'])
175             postvars = parse_qs(self.rfile.read(length).decode(),
176                                 keep_blank_values=True, strict_parsing=True)
177             form_data = PostvarsParser(postvars)
178             if site in ('day', 'process', 'todo'):
179                 getattr(self, f'do_POST_{site}')(conn, params, form_data)
180                 conn.commit()
181             else:
182                 msg = f'Page not known as POST target: /{site}'
183                 raise NotFoundException(msg)
184             self._redirect('/')
185         except HandledException as error:
186             self._send_msg(error, code=error.http_code)
187         finally:
188             conn.close()
189
190     def do_POST_day(self, conn: DatabaseConnection, params: ParamsParser,
191                     form_data: PostvarsParser) -> None:
192         """Update or insert Day of date and Todos mapped to it."""
193         date = params.get_str('date')
194         day = Day.by_date(conn, date, create=True)
195         day.comment = form_data.get_str('comment')
196         day.save(conn)
197         process_id = form_data.get_int_or_none('new_todo')
198         if process_id is not None:
199             process = Process.by_id(conn, process_id)
200             todo = Todo(None, process, False, day)
201             todo.save(conn)
202
203     def do_POST_todo(self, conn: DatabaseConnection, params: ParamsParser,
204                      form_data: PostvarsParser) -> None:
205         """Update Todo and its children."""
206         id_ = params.get_int_or_none('id')
207         todo = Todo.by_id(conn, id_)
208         child_id = form_data.get_int_or_none('adopt')
209         if child_id is not None:
210             child = Todo.by_id(conn, child_id)
211             todo.add_child(child)
212         todo.save(conn)
213
214     def do_POST_process(self, conn: DatabaseConnection, params: ParamsParser,
215                         form_data: PostvarsParser) -> None:
216         """Update or insert Process of ?id= and fields defined in postvars."""
217         id_ = params.get_int_or_none('id')
218         process = Process.by_id(conn, id_, create=True)
219         process.title.set(form_data.get_str('title'))
220         process.description.set(form_data.get_str('description'))
221         process.effort.set(form_data.get_float('effort'))
222         process.save_without_steps(conn)
223         assert process.id_ is not None  # for mypy
224         process.explicit_steps = []
225         for step_id in form_data.get_all_int('steps'):
226             for step_process_id in\
227                     form_data.get_all_int(f'new_step_to_{step_id}'):
228                 process.add_step(conn, None, step_process_id, step_id)
229             if step_id not in form_data.get_all_int('keep_step'):
230                 continue
231             step_process_id = form_data.get_int(f'step_{step_id}_process_id')
232             parent_id = form_data.get_int_or_none(f'step_{step_id}_parent_id')
233             process.add_step(conn, step_id, step_process_id, parent_id)
234         for step_process_id in form_data.get_all_int('new_top_step'):
235             process.add_step(conn, None, step_process_id, None)
236         process.fix_steps(conn)
237
238     def _init_handling(self) -> tuple[DatabaseConnection, str, ParamsParser]:
239         conn = DatabaseConnection(self.server.db)
240         parsed_url = urlparse(self.path)
241         site = path_split(parsed_url.path)[1]
242         params = ParamsParser(parse_qs(parsed_url.query, strict_parsing=True))
243         return conn, site, params
244
245     def _redirect(self, target: str) -> None:
246         self.send_response(302)
247         self.send_header('Location', target)
248         self.end_headers()
249
250     def _send_html(self, html: str, code: int = 200) -> None:
251         """Send HTML as proper HTTP response."""
252         self.send_response(code)
253         self.end_headers()
254         self.wfile.write(bytes(html, 'utf-8'))
255
256     def _send_msg(self, msg: Exception, code: int = 400) -> None:
257         """Send message in HTML formatting as HTTP response."""
258         html = self.server.jinja.get_template('msg.html').render(msg=msg)
259         self._send_html(html, code)