home · contact · privacy
Improve clarity of request wrapping code.
[plomtask] / plomtask / http.py
index e9e23633a92ddd5b02d9402dfaefae357476b125..d61588a9f7627eb7e0ce1acff8fe211d7065ba1c 100644 (file)
@@ -1,7 +1,7 @@
 """Web server stuff."""
 from __future__ import annotations
 from dataclasses import dataclass
 """Web server stuff."""
 from __future__ import annotations
 from dataclasses import dataclass
-from typing import Any
+from typing import Any, Callable
 from base64 import b64encode, b64decode
 from http.server import BaseHTTPRequestHandler
 from http.server import HTTPServer
 from base64 import b64encode, b64decode
 from http.server import BaseHTTPRequestHandler
 from http.server import HTTPServer
@@ -111,24 +111,72 @@ class TaskHandler(BaseHTTPRequestHandler):
     """Handles single HTTP request."""
     # pylint: disable=too-many-public-methods
     server: TaskServer
     """Handles single HTTP request."""
     # pylint: disable=too-many-public-methods
     server: TaskServer
+    conn: DatabaseConnection
+    site: str
+    form_data: InputsParser
+    params: InputsParser
 
 
-    def do_GET(self) -> None:
-        """Handle any GET request."""
-        try:
-            self._init_handling()
-            if hasattr(self, f'do_GET_{self.site}'):
-                template = f'{self.site}.html'
-                ctx = getattr(self, f'do_GET_{self.site}')()
-                html = self.server.jinja.get_template(template).render(**ctx)
-                self._send_html(html)
-            elif '' == self.site:
-                self._redirect('/day')
-            else:
-                raise NotFoundException(f'Unknown page: /{self.site}')
-        except HandledException as error:
-            self._send_msg(error, code=error.http_code)
-        finally:
-            self.conn.close()
+    def send_html(self, html: str, code: int = 200) -> None:
+        """Send HTML as proper HTTP response."""
+        self.send_response(code)
+        self.end_headers()
+        self.wfile.write(bytes(html, 'utf-8'))
+
+    @staticmethod
+    def _request_wrapper(http_method: str, not_found_msg: str
+                         ) -> Callable[..., Callable[[TaskHandler], None]]:
+        def decorator(f: Callable[..., str | None]
+                      ) -> Callable[[TaskHandler], None]:
+            def wrapper(self: TaskHandler) -> None:
+                try:
+                    self.conn = DatabaseConnection(self.server.db)
+                    parsed_url = urlparse(self.path)
+                    self.site = path_split(parsed_url.path)[1]
+                    params = parse_qs(parsed_url.query, strict_parsing=True)
+                    self.params = InputsParser(params, False)
+                    handler_name = f'do_{http_method}_{self.site}'
+                    if hasattr(self, handler_name):
+                        handler = getattr(self, handler_name)
+                        redir_target = f(self, handler)
+                        if redir_target:
+                            self.send_response(302)
+                            self.send_header('Location', redir_target)
+                            self.end_headers()
+                    else:
+                        msg = f'{not_found_msg}: {self.site}'
+                        raise NotFoundException(msg)
+                except HandledException as error:
+                    html = self.server.jinja.\
+                            get_template('msg.html').render(msg=error)
+                    self.send_html(html, error.http_code)
+                finally:
+                    self.conn.close()
+            return wrapper
+        return decorator
+
+    @_request_wrapper('GET', 'Unknown page')
+    def do_GET(self, handler: Callable[[], str | dict[str, object]]
+               ) -> str | None:
+        """Render page with result of handler, or redirect if result is str."""
+        template = f'{self.site}.html'
+        ctx_or_redir = handler()
+        if str == type(ctx_or_redir):
+            return ctx_or_redir
+        assert isinstance(ctx_or_redir, dict)
+        html = self.server.jinja.get_template(template).render(**ctx_or_redir)
+        self.send_html(html)
+        return None
+
+    @_request_wrapper('POST', 'Unknown POST target')
+    def do_POST(self, handler: Callable[[], str]) -> str:
+        """Handle POST with handler, prepare redirection to result."""
+        length = int(self.headers['content-length'])
+        postvars = parse_qs(self.rfile.read(length).decode(),
+                            keep_blank_values=True, strict_parsing=True)
+        self.form_data = InputsParser(postvars)
+        redir_target = handler()
+        self.conn.commit()
+        return redir_target
 
     def _do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end=."""
 
     def _do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end=."""
@@ -144,6 +192,10 @@ class TaskHandler(BaseHTTPRequestHandler):
         today = date_in_n_days(0)
         return {'start': start, 'end': end, 'days': days, 'today': today}
 
         today = date_in_n_days(0)
         return {'start': start, 'end': end, 'days': days, 'today': today}
 
+    def do_GET_(self) -> str:
+        """Return redirect target on GET /."""
+        return '/day'
+
     def do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end= – normal view."""
         return self._do_GET_calendar()
     def do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end= – normal view."""
         return self._do_GET_calendar()
@@ -386,27 +438,6 @@ class TaskHandler(BaseHTTPRequestHandler):
             processes.sort(key=lambda p: p.title.newest)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
             processes.sort(key=lambda p: p.title.newest)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
-    def do_POST(self) -> None:
-        """Handle any POST request."""
-        # pylint: disable=attribute-defined-outside-init
-        try:
-            self._init_handling()
-            length = int(self.headers['content-length'])
-            postvars = parse_qs(self.rfile.read(length).decode(),
-                                keep_blank_values=True, strict_parsing=True)
-            self.form_data = InputsParser(postvars)
-            if hasattr(self, f'do_POST_{self.site}'):
-                redir_target = getattr(self, f'do_POST_{self.site}')()
-                self.conn.commit()
-            else:
-                msg = f'Page not known as POST target: /{self.site}'
-                raise NotFoundException(msg)
-            self._redirect(redir_target)
-        except HandledException as error:
-            self._send_msg(error, code=error.http_code)
-        finally:
-            self.conn.close()
-
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
         date = self.params.get_str('date')
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
         date = self.params.get_str('date')
@@ -482,7 +513,8 @@ class TaskHandler(BaseHTTPRequestHandler):
             todo.add_child(made)
         effort = self.form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
             todo.add_child(made)
         effort = self.form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
-        todo.set_conditions(self.conn, self.form_data.get_all_int('condition'))
+        todo.set_conditions(self.conn,
+                            self.form_data.get_all_int('condition'))
         todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
         todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
         todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
         todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
@@ -537,7 +569,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                                self.form_data.get_all_int('condition'))
         process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         process.set_enables(self.conn, self.form_data.get_all_int('enables'))
                                self.form_data.get_all_int('condition'))
         process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         process.set_enables(self.conn, self.form_data.get_all_int('enables'))
-        process.set_disables(self.conn, self.form_data.get_all_int('disables'))
+        process.set_disables(self.conn,
+                             self.form_data.get_all_int('disables'))
         process.calendarize = self.form_data.get_all_str('calendarize') != []
         process.save(self.conn)
         assert isinstance(process.id_, int)
         process.calendarize = self.form_data.get_all_str('calendarize') != []
         process.save(self.conn)
         assert isinstance(process.id_, int)
@@ -570,7 +603,8 @@ class TaskHandler(BaseHTTPRequestHandler):
         process.uncache()
         process.set_steps(self.conn, steps)
         process.set_step_suppressions(self.conn,
         process.uncache()
         process.set_steps(self.conn, steps)
         process.set_step_suppressions(self.conn,
-                                      self.form_data.get_all_int('suppresses'))
+                                      self.form_data.
+                                      get_all_int('suppresses'))
         process.save(self.conn)
         owners_to_set = []
         new_owner_title = None
         process.save(self.conn)
         owners_to_set = []
         new_owner_title = None
@@ -610,27 +644,3 @@ class TaskHandler(BaseHTTPRequestHandler):
         condition.description.set(self.form_data.get_str('description'))
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'
         condition.description.set(self.form_data.get_str('description'))
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'
-
-    def _init_handling(self) -> None:
-        # pylint: disable=attribute-defined-outside-init
-        self.conn = DatabaseConnection(self.server.db)
-        parsed_url = urlparse(self.path)
-        self.site = path_split(parsed_url.path)[1]
-        params = parse_qs(parsed_url.query, strict_parsing=True)
-        self.params = InputsParser(params, False)
-
-    def _redirect(self, target: str) -> None:
-        self.send_response(302)
-        self.send_header('Location', target)
-        self.end_headers()
-
-    def _send_html(self, html: str, code: int = 200) -> None:
-        """Send HTML as proper HTTP response."""
-        self.send_response(code)
-        self.end_headers()
-        self.wfile.write(bytes(html, 'utf-8'))
-
-    def _send_msg(self, msg: Exception, code: int = 400) -> None:
-        """Send message in HTML formatting as HTTP response."""
-        html = self.server.jinja.get_template('msg.html').render(msg=msg)
-        self._send_html(html, code)