1 """Web server stuff."""
3 from http.server import BaseHTTPRequestHandler
4 from http.server import HTTPServer
5 from urllib.parse import urlparse, parse_qs
6 from os.path import split as path_split
7 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
8 from plomtask.days import Day, todays_date
9 from plomtask.exceptions import HandledException, BadFormatException, \
11 from plomtask.db import DatabaseConnection, DatabaseFile
12 from plomtask.processes import Process
13 from plomtask.todos import Todo
15 TEMPLATES_DIR = 'templates'
18 class TaskServer(HTTPServer):
19 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
21 def __init__(self, db_file: DatabaseFile,
22 *args: Any, **kwargs: Any) -> None:
23 super().__init__(*args, **kwargs)
25 self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
29 """Wrapper for validating and retrieving GET params."""
31 def __init__(self, params: dict[str, list[str]]) -> None:
34 def get_str(self, key: str, default: str = '') -> str:
35 """Retrieve string value of key from self.params."""
36 if key not in self.params or 0 == len(self.params[key]):
38 return self.params[key][0]
40 def get_int_or_none(self, key: str) -> int | None:
41 """Retrieve int value of key from self.params, on empty return None."""
42 if key not in self.params or \
43 0 == len(''.join(list(self.params[key]))):
45 val_str = self.params[key][0]
48 except ValueError as e:
49 raise BadFormatException(f'Bad ?{key}= value: {val_str}') from e
53 """Postvars wrapper for validating and retrieving form data."""
55 def __init__(self, postvars: dict[str, list[str]]) -> None:
56 self.postvars = postvars
58 def get_str(self, key: str) -> str:
59 """Retrieve string value of key from self.postvars."""
60 all_str = self.get_all_str(key)
62 raise BadFormatException(f'missing value for key: {key}')
65 def get_int(self, key: str) -> int:
66 """Retrieve int value of key from self.postvars."""
67 val = self.get_str(key)
70 except ValueError as e:
71 msg = f'cannot int form field value: {val}'
72 raise BadFormatException(msg) from e
74 def get_int_or_none(self, key: str) -> int | None:
75 """Retrieve int value of key from self.postvars, or None."""
76 if key not in self.postvars or \
77 0 == len(''.join(list(self.postvars[key]))):
79 return self.get_int(key)
81 def get_float(self, key: str) -> float:
82 """Retrieve float value of key from self.postvars."""
83 val = self.get_str(key)
86 except ValueError as e:
87 msg = f'cannot float form field value: {val}'
88 raise BadFormatException(msg) from e
90 def get_all_str(self, key: str) -> list[str]:
91 """Retrieve list of string values at key from self.postvars."""
92 if key not in self.postvars:
94 return self.postvars[key]
96 def get_all_int(self, key: str) -> list[int]:
97 """Retrieve list of int values at key from self.postvars."""
98 all_str = self.get_all_str(key)
100 return [int(s) for s in all_str if len(s) > 0]
101 except ValueError as e:
102 msg = f'cannot int a form field value: {all_str}'
103 raise BadFormatException(msg) from e
106 class TaskHandler(BaseHTTPRequestHandler):
107 """Handles single HTTP request."""
110 def do_GET(self) -> None:
111 """Handle any GET request."""
113 conn, site, params = self._init_handling()
114 if site in {'calendar', 'day', 'process', 'processes', 'todo'}:
115 html = getattr(self, f'do_GET_{site}')(conn, params)
117 self._redirect('/day')
120 raise NotFoundException(f'Unknown page: /{site}')
121 self._send_html(html)
122 except HandledException as error:
123 self._send_msg(error, code=error.http_code)
127 def do_GET_calendar(self, conn: DatabaseConnection,
128 params: ParamsParser) -> str:
129 """Show Days from ?start= to ?end=."""
130 start = params.get_str('start')
131 end = params.get_str('end')
132 days = Day.all(conn, date_range=(start, end), fill_gaps=True)
133 return self.server.jinja.get_template('calendar.html').render(
134 days=days, start=start, end=end)
136 def do_GET_day(self, conn: DatabaseConnection,
137 params: ParamsParser) -> str:
138 """Show single Day of ?date=."""
139 date = params.get_str('date', todays_date())
140 day = Day.by_date(conn, date, create=True)
141 todos = Todo.by_date(conn, date)
142 return self.server.jinja.get_template('day.html').render(
143 day=day, processes=Process.all(conn), todos=todos)
145 def do_GET_todo(self, conn: DatabaseConnection, params:
146 ParamsParser) -> str:
147 """Show single Todo of ?id=."""
148 id_ = params.get_int_or_none('id')
149 todo = Todo.by_id(conn, id_)
150 candidates = Todo.by_date(conn, todo.day.date)
151 return self.server.jinja.get_template('todo.html').render(
152 todo=todo, candidates=candidates)
154 def do_GET_process(self, conn: DatabaseConnection,
155 params: ParamsParser) -> str:
156 """Show process of ?id=."""
157 id_ = params.get_int_or_none('id')
158 process = Process.by_id(conn, id_, create=True)
159 owners = process.used_as_step_by(conn)
160 return self.server.jinja.get_template('process.html').render(
161 process=process, steps=process.get_steps(conn),
162 owners=owners, candidates=Process.all(conn))
164 def do_GET_processes(self, conn: DatabaseConnection,
165 _: ParamsParser) -> str:
166 """Show all Processes."""
167 return self.server.jinja.get_template('processes.html').render(
168 processes=Process.all(conn))
170 def do_POST(self) -> None:
171 """Handle any POST request."""
173 conn, site, params = self._init_handling()
174 length = int(self.headers['content-length'])
175 postvars = parse_qs(self.rfile.read(length).decode(),
176 keep_blank_values=True, strict_parsing=True)
177 form_data = PostvarsParser(postvars)
178 if site in ('day', 'process', 'todo'):
179 getattr(self, f'do_POST_{site}')(conn, params, form_data)
182 msg = f'Page not known as POST target: /{site}'
183 raise NotFoundException(msg)
185 except HandledException as error:
186 self._send_msg(error, code=error.http_code)
190 def do_POST_day(self, conn: DatabaseConnection, params: ParamsParser,
191 form_data: PostvarsParser) -> None:
192 """Update or insert Day of date and Todos mapped to it."""
193 date = params.get_str('date')
194 day = Day.by_date(conn, date, create=True)
195 day.comment = form_data.get_str('comment')
197 process_id = form_data.get_int_or_none('new_todo')
198 if process_id is not None:
199 process = Process.by_id(conn, process_id)
200 todo = Todo(None, process, False, day)
203 def do_POST_todo(self, conn: DatabaseConnection, params: ParamsParser,
204 form_data: PostvarsParser) -> None:
205 """Update Todo and its children."""
206 id_ = params.get_int_or_none('id')
207 todo = Todo.by_id(conn, id_)
208 child_id = form_data.get_int_or_none('adopt')
209 if child_id is not None:
210 child = Todo.by_id(conn, child_id)
211 todo.add_child(child)
214 def do_POST_process(self, conn: DatabaseConnection, params: ParamsParser,
215 form_data: PostvarsParser) -> None:
216 """Update or insert Process of ?id= and fields defined in postvars."""
217 id_ = params.get_int_or_none('id')
218 process = Process.by_id(conn, id_, create=True)
219 process.title.set(form_data.get_str('title'))
220 process.description.set(form_data.get_str('description'))
221 process.effort.set(form_data.get_float('effort'))
222 process.save_without_steps(conn)
223 assert process.id_ is not None # for mypy
224 process.explicit_steps = []
225 for step_id in form_data.get_all_int('steps'):
226 for step_process_id in\
227 form_data.get_all_int(f'new_step_to_{step_id}'):
228 process.add_step(conn, None, step_process_id, step_id)
229 if step_id not in form_data.get_all_int('keep_step'):
231 step_process_id = form_data.get_int(f'step_{step_id}_process_id')
232 parent_id = form_data.get_int_or_none(f'step_{step_id}_parent_id')
233 process.add_step(conn, step_id, step_process_id, parent_id)
234 for step_process_id in form_data.get_all_int('new_top_step'):
235 process.add_step(conn, None, step_process_id, None)
236 process.fix_steps(conn)
238 def _init_handling(self) -> tuple[DatabaseConnection, str, ParamsParser]:
239 conn = DatabaseConnection(self.server.db)
240 parsed_url = urlparse(self.path)
241 site = path_split(parsed_url.path)[1]
242 params = ParamsParser(parse_qs(parsed_url.query, strict_parsing=True))
243 return conn, site, params
245 def _redirect(self, target: str) -> None:
246 self.send_response(302)
247 self.send_header('Location', target)
250 def _send_html(self, html: str, code: int = 200) -> None:
251 """Send HTML as proper HTTP response."""
252 self.send_response(code)
254 self.wfile.write(bytes(html, 'utf-8'))
256 def _send_msg(self, msg: Exception, code: int = 400) -> None:
257 """Send message in HTML formatting as HTTP response."""
258 html = self.server.jinja.get_template('msg.html').render(msg=msg)
259 self._send_html(html, code)