home · contact · privacy
Refactor HTTP module.
authorChristian Heller <c.heller@plomlompom.de>
Wed, 12 Jun 2024 08:25:10 +0000 (10:25 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Wed, 12 Jun 2024 08:25:10 +0000 (10:25 +0200)
plomtask/http.py

index 583203e6eba7a69bbee893d502b13bd132eb69d9..72ad872befe0f663dc6435a5ee8584e66b31a29b 100644 (file)
@@ -1,7 +1,7 @@
 """Web server stuff."""
 from __future__ import annotations
 from dataclasses import dataclass
-from typing import Any
+from typing import Any, Callable
 from base64 import b64encode, b64decode
 from http.server import BaseHTTPRequestHandler
 from http.server import HTTPServer
@@ -112,32 +112,76 @@ class TaskHandler(BaseHTTPRequestHandler):
     # pylint: disable=too-many-public-methods
     server: TaskServer
     conn: DatabaseConnection
-    _site: str
-    _form_data: InputsParser
-    _params: InputsParser
+    site: str
+    form_data: InputsParser
+    params: InputsParser
 
-    def do_GET(self) -> None:
-        """Handle any GET request."""
-        try:
-            self._init_handling()
-            if hasattr(self, f'do_GET_{self._site}'):
-                template = f'{self._site}.html'
-                ctx = getattr(self, f'do_GET_{self._site}')()
-                html = self.server.jinja.get_template(template).render(**ctx)
-                self._send_html(html)
-            elif '' == self._site:
-                self._redirect('/day')
-            else:
-                raise NotFoundException(f'Unknown page: /{self._site}')
-        except HandledException as error:
-            self._send_msg(error, code=error.http_code)
-        finally:
-            self.conn.close()
+    def send_html(self, html: str, code: int = 200) -> None:
+        """Send HTML as proper HTTP response."""
+        self.send_response(code)
+        self.end_headers()
+        self.wfile.write(bytes(html, 'utf-8'))
+
+    @staticmethod
+    def _wrap(method: str, not_found_msg: str
+              ) -> Callable[..., Callable[..., None]]:
+        def decorator(http_method_handler: Callable[..., str]
+                      ) -> Callable[..., None]:
+            def wrapper(self: TaskHandler) -> None:
+                try:
+                    self.conn = DatabaseConnection(self.server.db)
+                    parsed_url = urlparse(self.path)
+                    self.site = path_split(parsed_url.path)[1]
+                    params = parse_qs(parsed_url.query, strict_parsing=True)
+                    self.params = InputsParser(params, False)
+                    handler_name = f'do_{method}_{self.site}'
+                    if hasattr(self, handler_name):
+                        handler = getattr(self, handler_name)
+                        redir_target = http_method_handler(self, handler)
+                        if redir_target:
+                            self.send_response(302)
+                            self.send_header('Location', redir_target)
+                            self.end_headers()
+                    else:
+                        msg = f'{not_found_msg}: {self.site}'
+                        raise NotFoundException(msg)
+                except HandledException as error:
+                    html = self.server.jinja.\
+                            get_template('msg.html').render(msg=error)
+                    self.send_html(html, error.http_code)
+                finally:
+                    self.conn.close()
+            return wrapper
+        return decorator
+
+    @_wrap('GET', 'Unknown page')
+    def do_GET(self, handler: Callable[..., str | dict[str, object]]
+               ) -> str | None:
+        """Render page with result of handler, or redirect if result is str."""
+        template = f'{self.site}.html'
+        ctx_or_redir = handler()
+        if str == type(ctx_or_redir):
+            return ctx_or_redir
+        assert isinstance(ctx_or_redir, dict)
+        html = self.server.jinja.get_template(template).render(**ctx_or_redir)
+        self.send_html(html)
+        return None
+
+    @_wrap('POST', 'Unknown POST target')
+    def do_POST(self, handler: Callable[..., str]) -> str:
+        """Handle POST with handler, prepare redirection to result."""
+        length = int(self.headers['content-length'])
+        postvars = parse_qs(self.rfile.read(length).decode(),
+                            keep_blank_values=True, strict_parsing=True)
+        self.form_data = InputsParser(postvars)
+        redir_target = handler()
+        self.conn.commit()
+        return redir_target
 
     def _do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end=."""
-        start = self._params.get_str('start')
-        end = self._params.get_str('end')
+        start = self.params.get_str('start')
+        end = self.params.get_str('end')
         if not end:
             end = date_in_n_days(366)
         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
@@ -148,6 +192,10 @@ class TaskHandler(BaseHTTPRequestHandler):
         today = date_in_n_days(0)
         return {'start': start, 'end': end, 'days': days, 'today': today}
 
+    def do_GET_(self) -> str:
+        """Return redirect target on GET /."""
+        return '/day'
+
     def do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end= – normal view."""
         return self._do_GET_calendar()
@@ -158,8 +206,8 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_day(self) -> dict[str, object]:
         """Show single Day of ?date=."""
-        date = self._params.get_str('date', date_in_n_days(0))
-        make_type = self._params.get_str('make_type')
+        date = self.params.get_str('date', date_in_n_days(0))
+        make_type = self.params.get_str('make_type')
         todays_todos = Todo.by_date(self.conn, date)
         total_effort = 0.0
         for todo in todays_todos:
@@ -231,7 +279,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                 ids = ids | collect_adoptables_keys(node.children)
             return ids
 
-        id_ = self._params.get_int('id')
+        id_ = self.params.get_int('id')
         todo = Todo.by_id(self.conn, id_)
         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
         process_tree = todo.process.get_steps(self.conn, None)
@@ -257,11 +305,11 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_todos(self) -> dict[str, object]:
         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
-        sort_by = self._params.get_str('sort_by')
-        start = self._params.get_str('start')
-        end = self._params.get_str('end')
-        process_id = self._params.get_int_or_none('process_id')
-        comment_pattern = self._params.get_str('comment_pattern')
+        sort_by = self.params.get_str('sort_by')
+        start = self.params.get_str('start')
+        end = self.params.get_str('end')
+        process_id = self.params.get_int_or_none('process_id')
+        comment_pattern = self.params.get_str('comment_pattern')
         todos = []
         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
         todos_by_date_range, start, end = ret
@@ -290,9 +338,9 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_conditions(self) -> dict[str, object]:
         """Show all Conditions."""
-        pattern = self._params.get_str('pattern')
+        pattern = self.params.get_str('pattern')
         conditions = Condition.matching(self.conn, pattern)
-        sort_by = self._params.get_str('sort_by')
+        sort_by = self.params.get_str('sort_by')
         if sort_by == 'is_active':
             conditions.sort(key=lambda c: c.is_active)
         elif sort_by == '-is_active':
@@ -307,7 +355,7 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_condition(self) -> dict[str, object]:
         """Show Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         c = Condition.by_id(self.conn, id_, create=True)
         ps = Process.all(self.conn)
         return {'condition': c, 'is_new': c.id_ is None,
@@ -318,29 +366,29 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_condition_titles(self) -> dict[str, object]:
         """Show title history of Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         condition = Condition.by_id(self.conn, id_)
         return {'condition': condition}
 
     def do_GET_condition_descriptions(self) -> dict[str, object]:
         """Show description historys of Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         condition = Condition.by_id(self.conn, id_)
         return {'condition': condition}
 
     def do_GET_process(self) -> dict[str, object]:
         """Show Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_, create=True)
-        title_64 = self._params.get_str('title_b64')
+        title_64 = self.params.get_str('title_b64')
         if title_64:
             title = b64decode(title_64.encode()).decode()
             process.title.set(title)
         owners = process.used_as_step_by(self.conn)
-        for step_id in self._params.get_all_int('step_to'):
+        for step_id in self.params.get_all_int('step_to'):
             owners += [Process.by_id(self.conn, step_id)]
         preset_top_step = None
-        for process_id in self._params.get_all_int('has_step'):
+        for process_id in self.params.get_all_int('has_step'):
             preset_top_step = process_id
         return {'process': process, 'is_new': process.id_ is None,
                 'preset_top_step': preset_top_step,
@@ -351,27 +399,27 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_process_titles(self) -> dict[str, object]:
         """Show title history of Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_)
         return {'process': process}
 
     def do_GET_process_descriptions(self) -> dict[str, object]:
         """Show description historys of Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_)
         return {'process': process}
 
     def do_GET_process_efforts(self) -> dict[str, object]:
         """Show default effort history of Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_)
         return {'process': process}
 
     def do_GET_processes(self) -> dict[str, object]:
         """Show all Processes."""
-        pattern = self._params.get_str('pattern')
+        pattern = self.params.get_str('pattern')
         processes = Process.matching(self.conn, pattern)
-        sort_by = self._params.get_str('sort_by')
+        sort_by = self.params.get_str('sort_by')
         if sort_by == 'steps':
             processes.sort(key=lambda p: len(p.explicit_steps))
         elif sort_by == '-steps':
@@ -390,44 +438,24 @@ class TaskHandler(BaseHTTPRequestHandler):
             processes.sort(key=lambda p: p.title.newest)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
-    def do_POST(self) -> None:
-        """Handle any POST request."""
-        try:
-            self._init_handling()
-            length = int(self.headers['content-length'])
-            postvars = parse_qs(self.rfile.read(length).decode(),
-                                keep_blank_values=True, strict_parsing=True)
-            self._form_data = InputsParser(postvars)
-            if hasattr(self, f'do_POST_{self._site}'):
-                redir_target = getattr(self, f'do_POST_{self._site}')()
-                self.conn.commit()
-            else:
-                msg = f'Page not known as POST target: /{self._site}'
-                raise NotFoundException(msg)
-            self._redirect(redir_target)
-        except HandledException as error:
-            self._send_msg(error, code=error.http_code)
-        finally:
-            self.conn.close()
-
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
-        date = self._params.get_str('date')
+        date = self.params.get_str('date')
         day = Day.by_id(self.conn, date, create=True)
-        day.comment = self._form_data.get_str('day_comment')
+        day.comment = self.form_data.get_str('day_comment')
         day.save(self.conn)
-        make_type = self._form_data.get_str('make_type')
-        for process_id in sorted(self._form_data.get_all_int('new_todo')):
+        make_type = self.form_data.get_str('make_type')
+        for process_id in sorted(self.form_data.get_all_int('new_todo')):
             if 'empty' == make_type:
                 process = Process.by_id(self.conn, process_id)
                 todo = Todo(None, process, False, date)
                 todo.save(self.conn)
             else:
                 Todo.create_with_children(self.conn, process_id, date)
-        done_ids = self._form_data.get_all_int('done')
-        comments = self._form_data.get_all_str('comment')
-        efforts = self._form_data.get_all_str('effort')
-        for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
+        done_ids = self.form_data.get_all_int('done')
+        comments = self.form_data.get_all_str('comment')
+        efforts = self.form_data.get_all_str('effort')
+        for i, todo_id in enumerate(self.form_data.get_all_int('todo_id')):
             todo = Todo.by_id(self.conn, todo_id)
             todo.is_done = todo_id in done_ids
             if len(comments) > 0:
@@ -445,16 +473,16 @@ class TaskHandler(BaseHTTPRequestHandler):
         """Update Todo and its children."""
         # pylint: disable=too-many-locals
         # pylint: disable=too-many-branches
-        id_ = self._params.get_int('id')
-        for _ in self._form_data.get_all_str('delete'):
+        id_ = self.params.get_int('id')
+        for _ in self.form_data.get_all_str('delete'):
             todo = Todo .by_id(self.conn, id_)
             todo.remove(self.conn)
             return '/'
         todo = Todo.by_id(self.conn, id_)
-        adopted_child_ids = self._form_data.get_all_int('adopt')
-        processes_to_make_full = self._form_data.get_all_int('make_full')
-        processes_to_make_empty = self._form_data.get_all_int('make_empty')
-        fill_fors = self._form_data.get_first_strings_starting('fill_for_')
+        adopted_child_ids = self.form_data.get_all_int('adopt')
+        processes_to_make_full = self.form_data.get_all_int('make_full')
+        processes_to_make_empty = self.form_data.get_all_int('make_empty')
+        fill_fors = self.form_data.get_first_strings_starting('fill_for_')
         for v in fill_fors.values():
             if v.startswith('make_empty_'):
                 processes_to_make_empty += [int(v[11:])]
@@ -483,16 +511,16 @@ class TaskHandler(BaseHTTPRequestHandler):
         for process_id in processes_to_make_full:
             made = Todo.create_with_children(self.conn, process_id, todo.date)
             todo.add_child(made)
-        effort = self._form_data.get_str('effort', ignore_strict=True)
+        effort = self.form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
         todo.set_conditions(self.conn,
-                            self._form_data.get_all_int('condition'))
-        todo.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
-        todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
-        todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
-        todo.is_done = len(self._form_data.get_all_str('done')) > 0
-        todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
-        todo.comment = self._form_data.get_str('comment', ignore_strict=True)
+                            self.form_data.get_all_int('condition'))
+        todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
+        todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
+        todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
+        todo.is_done = len(self.form_data.get_all_str('done')) > 0
+        todo.calendarize = len(self.form_data.get_all_str('calendarize')) > 0
+        todo.comment = self.form_data.get_str('comment', ignore_strict=True)
         todo.save(self.conn)
         for condition in todo.enables:
             condition.save(self.conn)
@@ -502,10 +530,10 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def _do_POST_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
         """Update history timestamps for VersionedAttribute."""
-        id_ = self._params.get_int_or_none('id')
+        id_ = self.params.get_int_or_none('id')
         item = cls.by_id(self.conn, id_)
         attr = getattr(item, attr_name)
-        for k, v in self._form_data.get_first_strings_starting('at:').items():
+        for k, v in self.form_data.get_first_strings_starting('at:').items():
             old = k[3:]
             if old[19:] != v:
                 attr.reset_timestamp(old, f'{v}.0')
@@ -528,44 +556,44 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_POST_process(self) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
         # pylint: disable=too-many-branches
-        id_ = self._params.get_int_or_none('id')
-        for _ in self._form_data.get_all_str('delete'):
+        id_ = self.params.get_int_or_none('id')
+        for _ in self.form_data.get_all_str('delete'):
             process = Process.by_id(self.conn, id_)
             process.remove(self.conn)
             return '/processes'
         process = Process.by_id(self.conn, id_, create=True)
-        process.title.set(self._form_data.get_str('title'))
-        process.description.set(self._form_data.get_str('description'))
-        process.effort.set(self._form_data.get_float('effort'))
+        process.title.set(self.form_data.get_str('title'))
+        process.description.set(self.form_data.get_str('description'))
+        process.effort.set(self.form_data.get_float('effort'))
         process.set_conditions(self.conn,
-                               self._form_data.get_all_int('condition'))
-        process.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
-        process.set_enables(self.conn, self._form_data.get_all_int('enables'))
+                               self.form_data.get_all_int('condition'))
+        process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
+        process.set_enables(self.conn, self.form_data.get_all_int('enables'))
         process.set_disables(self.conn,
-                             self._form_data.get_all_int('disables'))
-        process.calendarize = self._form_data.get_all_str('calendarize') != []
+                             self.form_data.get_all_int('disables'))
+        process.calendarize = self.form_data.get_all_str('calendarize') != []
         process.save(self.conn)
         assert isinstance(process.id_, int)
         steps: list[ProcessStep] = []
-        for step_id in self._form_data.get_all_int('keep_step'):
-            if step_id not in self._form_data.get_all_int('steps'):
+        for step_id in self.form_data.get_all_int('keep_step'):
+            if step_id not in self.form_data.get_all_int('steps'):
                 raise BadFormatException('trying to keep unknown step')
-        for step_id in self._form_data.get_all_int('steps'):
-            if step_id not in self._form_data.get_all_int('keep_step'):
+        for step_id in self.form_data.get_all_int('steps'):
+            if step_id not in self.form_data.get_all_int('keep_step'):
                 continue
-            step_process_id = self._form_data.get_int(
+            step_process_id = self.form_data.get_int(
                     f'step_{step_id}_process_id')
-            parent_id = self._form_data.get_int_or_none(
+            parent_id = self.form_data.get_int_or_none(
                     f'step_{step_id}_parent_id')
             steps += [ProcessStep(step_id, process.id_, step_process_id,
                                   parent_id)]
-        for step_id in self._form_data.get_all_int('steps'):
-            for step_process_id in self._form_data.get_all_int(
+        for step_id in self.form_data.get_all_int('steps'):
+            for step_process_id in self.form_data.get_all_int(
                     f'new_step_to_{step_id}'):
                 steps += [ProcessStep(None, process.id_, step_process_id,
                                       step_id)]
         new_step_title = None
-        for step_identifier in self._form_data.get_all_str('new_top_step'):
+        for step_identifier in self.form_data.get_all_str('new_top_step'):
             try:
                 step_process_id = int(step_identifier)
                 steps += [ProcessStep(None, process.id_, step_process_id,
@@ -575,12 +603,12 @@ class TaskHandler(BaseHTTPRequestHandler):
         process.uncache()
         process.set_steps(self.conn, steps)
         process.set_step_suppressions(self.conn,
-                                      self._form_data.
+                                      self.form_data.
                                       get_all_int('suppresses'))
         process.save(self.conn)
         owners_to_set = []
         new_owner_title = None
-        for owner_identifier in self._form_data.get_all_str('step_of'):
+        for owner_identifier in self.form_data.get_all_str('step_of'):
             try:
                 owners_to_set += [int(owner_identifier)]
             except ValueError:
@@ -605,39 +633,14 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_POST_condition(self) -> str:
         """Update/insert Condition of ?id= and fields defined in postvars."""
-        id_ = self._params.get_int_or_none('id')
-        for _ in self._form_data.get_all_str('delete'):
+        id_ = self.params.get_int_or_none('id')
+        for _ in self.form_data.get_all_str('delete'):
             condition = Condition.by_id(self.conn, id_)
             condition.remove(self.conn)
             return '/conditions'
         condition = Condition.by_id(self.conn, id_, create=True)
-        condition.is_active = self._form_data.get_all_str('is_active') != []
-        condition.title.set(self._form_data.get_str('title'))
-        condition.description.set(self._form_data.get_str('description'))
+        condition.is_active = self.form_data.get_all_str('is_active') != []
+        condition.title.set(self.form_data.get_str('title'))
+        condition.description.set(self.form_data.get_str('description'))
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'
-
-    def _init_handling(self) -> None:
-        """Our own __init__, as we're not supposed to use the original."""
-        self.conn = DatabaseConnection(self.server.db)
-        parsed_url = urlparse(self.path)
-        self._site = path_split(parsed_url.path)[1]
-        params = parse_qs(parsed_url.query, strict_parsing=True)
-        self._params = InputsParser(params, False)
-
-    def _redirect(self, target: str) -> None:
-        """Redirect to target."""
-        self.send_response(302)
-        self.send_header('Location', target)
-        self.end_headers()
-
-    def _send_html(self, html: str, code: int = 200) -> None:
-        """Send HTML as proper HTTP response."""
-        self.send_response(code)
-        self.end_headers()
-        self.wfile.write(bytes(html, 'utf-8'))
-
-    def _send_msg(self, msg: Exception, code: int = 400) -> None:
-        """Send message in HTML formatting as HTTP response."""
-        html = self.server.jinja.get_template('msg.html').render(msg=msg)
-        self._send_html(html, code)