home · contact · privacy
Re-organize HTTP module code for better readability.
[plomtask] / plomtask / http.py
index 0aa692545d3e5ae38f4be7190a697df40259dcb0..ba79c5a5a929fb4d02975276a854e5b6de4c999b 100644 (file)
@@ -1,7 +1,7 @@
 """Web server stuff."""
 from __future__ import annotations
 from dataclasses import dataclass
 """Web server stuff."""
 from __future__ import annotations
 from dataclasses import dataclass
-from typing import Any
+from typing import Any, Callable
 from base64 import b64encode, b64decode
 from http.server import BaseHTTPRequestHandler
 from http.server import HTTPServer
 from base64 import b64encode, b64decode
 from http.server import BaseHTTPRequestHandler
 from http.server import HTTPServer
@@ -20,16 +20,6 @@ from plomtask.todos import Todo
 TEMPLATES_DIR = 'templates'
 
 
 TEMPLATES_DIR = 'templates'
 
 
-@dataclass
-class TodoStepsNode:
-    """Collect what's useful for Todo steps tree display."""
-    id_: int
-    todo: Todo | None
-    process: Process | None
-    children: list[TodoStepsNode]
-    fillable: bool = False
-
-
 class TaskServer(HTTPServer):
     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
 
 class TaskServer(HTTPServer):
     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
 
@@ -111,27 +101,86 @@ class TaskHandler(BaseHTTPRequestHandler):
     """Handles single HTTP request."""
     # pylint: disable=too-many-public-methods
     server: TaskServer
     """Handles single HTTP request."""
     # pylint: disable=too-many-public-methods
     server: TaskServer
+    conn: DatabaseConnection
+    site: str
+    form_data: InputsParser
+    params: InputsParser
 
 
-    def do_GET(self) -> None:
-        """Handle any GET request."""
-        try:
-            self._init_handling()
-            if hasattr(self, f'do_GET_{self.site}'):
-                template = f'{self.site}.html'
-                ctx = getattr(self, f'do_GET_{self.site}')()
-                html = self.server.jinja.get_template(template).render(**ctx)
-                self._send_html(html)
-            elif '' == self.site:
-                self._redirect('/day')
-            else:
-                raise NotFoundException(f'Unknown page: /{self.site}')
-        except HandledException as error:
-            self._send_msg(error, code=error.http_code)
-        finally:
-            self.conn.close()
+    def send_html(self, html: str, code: int = 200) -> None:
+        """Send HTML as proper HTTP response."""
+        self.send_response(code)
+        self.end_headers()
+        self.wfile.write(bytes(html, 'utf-8'))
+
+    @staticmethod
+    def _request_wrapper(http_method: str, not_found_msg: str
+                         ) -> Callable[..., Callable[[TaskHandler], None]]:
+        def decorator(f: Callable[..., str | None]
+                      ) -> Callable[[TaskHandler], None]:
+            def wrapper(self: TaskHandler) -> None:
+                try:
+                    self.conn = DatabaseConnection(self.server.db)
+                    parsed_url = urlparse(self.path)
+                    self.site = path_split(parsed_url.path)[1]
+                    params = parse_qs(parsed_url.query, strict_parsing=True)
+                    self.params = InputsParser(params, False)
+                    handler_name = f'do_{http_method}_{self.site}'
+                    if hasattr(self, handler_name):
+                        handler = getattr(self, handler_name)
+                        redir_target = f(self, handler)
+                        if redir_target:
+                            self.send_response(302)
+                            self.send_header('Location', redir_target)
+                            self.end_headers()
+                    else:
+                        msg = f'{not_found_msg}: {self.site}'
+                        raise NotFoundException(msg)
+                except HandledException as error:
+                    html = self.server.jinja.\
+                            get_template('msg.html').render(msg=error)
+                    self.send_html(html, error.http_code)
+                finally:
+                    self.conn.close()
+            return wrapper
+        return decorator
+
+    @_request_wrapper('GET', 'Unknown page')
+    def do_GET(self, handler: Callable[[], str | dict[str, object]]
+               ) -> str | None:
+        """Render page with result of handler, or redirect if result is str."""
+        template = f'{self.site}.html'
+        ctx_or_redir = handler()
+        if str == type(ctx_or_redir):
+            return ctx_or_redir
+        assert isinstance(ctx_or_redir, dict)
+        html = self.server.jinja.get_template(template).render(**ctx_or_redir)
+        self.send_html(html)
+        return None
+
+    @_request_wrapper('POST', 'Unknown POST target')
+    def do_POST(self, handler: Callable[[], str]) -> str:
+        """Handle POST with handler, prepare redirection to result."""
+        length = int(self.headers['content-length'])
+        postvars = parse_qs(self.rfile.read(length).decode(),
+                            keep_blank_values=True, strict_parsing=True)
+        self.form_data = InputsParser(postvars)
+        redir_target = handler()
+        self.conn.commit()
+        return redir_target
+
+    # GET handlers
+
+    def do_GET_(self) -> str:
+        """Return redirect target on GET /."""
+        return '/day'
 
     def _do_GET_calendar(self) -> dict[str, object]:
 
     def _do_GET_calendar(self) -> dict[str, object]:
-        """Show Days from ?start= to ?end=."""
+        """Show Days from ?start= to ?end=.
+
+        Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
+        same, the only difference being the HTML template they are rendered to,
+        which .do_GET selects from their method name.
+        """
         start = self.params.get_str('start')
         end = self.params.get_str('end')
         if not end:
         start = self.params.get_str('start')
         end = self.params.get_str('end')
         if not end:
@@ -155,6 +204,7 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET_day(self) -> dict[str, object]:
         """Show single Day of ?date=."""
         date = self.params.get_str('date', date_in_n_days(0))
     def do_GET_day(self) -> dict[str, object]:
         """Show single Day of ?date=."""
         date = self.params.get_str('date', date_in_n_days(0))
+        make_type = self.params.get_str('make_type')
         todays_todos = Todo.by_date(self.conn, date)
         total_effort = 0.0
         for todo in todays_todos:
         todays_todos = Todo.by_date(self.conn, date)
         total_effort = 0.0
         for todo in todays_todos:
@@ -178,6 +228,7 @@ class TaskHandler(BaseHTTPRequestHandler):
         return {'day': Day.by_id(self.conn, date, create=True),
                 'total_effort': total_effort,
                 'top_nodes': top_nodes,
         return {'day': Day.by_id(self.conn, date, create=True),
                 'total_effort': total_effort,
                 'top_nodes': top_nodes,
+                'make_type': make_type,
                 'enablers_for': enablers_for,
                 'disablers_for': disablers_for,
                 'conditions_present': conditions_present,
                 'enablers_for': enablers_for,
                 'disablers_for': disablers_for,
                 'conditions_present': conditions_present,
@@ -186,6 +237,15 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET_todo(self) -> dict[str, object]:
         """Show single Todo of ?id=."""
 
     def do_GET_todo(self) -> dict[str, object]:
         """Show single Todo of ?id=."""
 
+        @dataclass
+        class TodoStepsNode:
+            """Collect what's useful for Todo steps tree display."""
+            id_: int
+            todo: Todo | None
+            process: Process | None
+            children: list[TodoStepsNode]  # pylint: disable=undefined-variable
+            fillable: bool = False
+
         def walk_process_steps(id_: int,
                                process_step_nodes: list[ProcessStepsNode],
                                steps_nodes: list[TodoStepsNode]) -> None:
         def walk_process_steps(id_: int,
                                process_step_nodes: list[ProcessStepsNode],
                                steps_nodes: list[TodoStepsNode]) -> None:
@@ -384,26 +444,20 @@ class TaskHandler(BaseHTTPRequestHandler):
             processes.sort(key=lambda p: p.title.newest)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
             processes.sort(key=lambda p: p.title.newest)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
-    def do_POST(self) -> None:
-        """Handle any POST request."""
-        # pylint: disable=attribute-defined-outside-init
-        try:
-            self._init_handling()
-            length = int(self.headers['content-length'])
-            postvars = parse_qs(self.rfile.read(length).decode(),
-                                keep_blank_values=True, strict_parsing=True)
-            self.form_data = InputsParser(postvars)
-            if hasattr(self, f'do_POST_{self.site}'):
-                redir_target = getattr(self, f'do_POST_{self.site}')()
-                self.conn.commit()
-            else:
-                msg = f'Page not known as POST target: /{self.site}'
-                raise NotFoundException(msg)
-            self._redirect(redir_target)
-        except HandledException as error:
-            self._send_msg(error, code=error.http_code)
-        finally:
-            self.conn.close()
+    # POST handlers
+
+    def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
+        """Update history timestamps for VersionedAttribute."""
+        id_ = self.params.get_int_or_none('id')
+        item = cls.by_id(self.conn, id_)
+        attr = getattr(item, attr_name)
+        for k, v in self.form_data.get_first_strings_starting('at:').items():
+            old = k[3:]
+            if old[19:] != v:
+                attr.reset_timestamp(old, f'{v}.0')
+        attr.save(self.conn)
+        cls_name = cls.__name__.lower()
+        return f'/{cls_name}_{attr_name}s?id={item.id_}'
 
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
 
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
@@ -411,8 +465,14 @@ class TaskHandler(BaseHTTPRequestHandler):
         day = Day.by_id(self.conn, date, create=True)
         day.comment = self.form_data.get_str('day_comment')
         day.save(self.conn)
         day = Day.by_id(self.conn, date, create=True)
         day.comment = self.form_data.get_str('day_comment')
         day.save(self.conn)
+        make_type = self.form_data.get_str('make_type')
         for process_id in sorted(self.form_data.get_all_int('new_todo')):
         for process_id in sorted(self.form_data.get_all_int('new_todo')):
-            Todo.create_with_children(self.conn, process_id, date)
+            if 'empty' == make_type:
+                process = Process.by_id(self.conn, process_id)
+                todo = Todo(None, process, False, date)
+                todo.save(self.conn)
+            else:
+                Todo.create_with_children(self.conn, process_id, date)
         done_ids = self.form_data.get_all_int('done')
         comments = self.form_data.get_all_str('comment')
         efforts = self.form_data.get_all_str('effort')
         done_ids = self.form_data.get_all_int('done')
         comments = self.form_data.get_all_str('comment')
         efforts = self.form_data.get_all_str('effort')
@@ -428,10 +488,12 @@ class TaskHandler(BaseHTTPRequestHandler):
                 condition.save(self.conn)
             for condition in todo.disables:
                 condition.save(self.conn)
                 condition.save(self.conn)
             for condition in todo.disables:
                 condition.save(self.conn)
-        return f'/day?date={date}'
+        return f'/day?date={date}&make_type={make_type}'
 
     def do_POST_todo(self) -> str:
         """Update Todo and its children."""
 
     def do_POST_todo(self) -> str:
         """Update Todo and its children."""
+        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-branches
         id_ = self.params.get_int('id')
         for _ in self.form_data.get_all_str('delete'):
             todo = Todo .by_id(self.conn, id_)
         id_ = self.params.get_int('id')
         for _ in self.form_data.get_all_str('delete'):
             todo = Todo .by_id(self.conn, id_)
@@ -439,11 +501,14 @@ class TaskHandler(BaseHTTPRequestHandler):
             return '/'
         todo = Todo.by_id(self.conn, id_)
         adopted_child_ids = self.form_data.get_all_int('adopt')
             return '/'
         todo = Todo.by_id(self.conn, id_)
         adopted_child_ids = self.form_data.get_all_int('adopt')
-        processes_to_make = self.form_data.get_all_int('make')
+        processes_to_make_full = self.form_data.get_all_int('make_full')
+        processes_to_make_empty = self.form_data.get_all_int('make_empty')
         fill_fors = self.form_data.get_first_strings_starting('fill_for_')
         for v in fill_fors.values():
         fill_fors = self.form_data.get_first_strings_starting('fill_for_')
         for v in fill_fors.values():
-            if v.startswith('make_'):
-                processes_to_make += [int(v[5:])]
+            if v.startswith('make_empty_'):
+                processes_to_make_empty += [int(v[11:])]
+            elif v.startswith('make_full_'):
+                processes_to_make_full += [int(v[10:])]
             elif v != 'ignore':
                 adopted_child_ids += [int(v)]
         to_remove = []
             elif v != 'ignore':
                 adopted_child_ids += [int(v)]
         to_remove = []
@@ -459,12 +524,18 @@ class TaskHandler(BaseHTTPRequestHandler):
                 continue
             child = Todo.by_id(self.conn, child_id)
             todo.add_child(child)
                 continue
             child = Todo.by_id(self.conn, child_id)
             todo.add_child(child)
-        for process_id in processes_to_make:
+        for process_id in processes_to_make_empty:
+            process = Process.by_id(self.conn, process_id)
+            made = Todo(None, process, False, todo.date)
+            made.save(self.conn)
+            todo.add_child(made)
+        for process_id in processes_to_make_full:
             made = Todo.create_with_children(self.conn, process_id, todo.date)
             todo.add_child(made)
         effort = self.form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
             made = Todo.create_with_children(self.conn, process_id, todo.date)
             todo.add_child(made)
         effort = self.form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
-        todo.set_conditions(self.conn, self.form_data.get_all_int('condition'))
+        todo.set_conditions(self.conn,
+                            self.form_data.get_all_int('condition'))
         todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
         todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
         todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
         todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
@@ -478,30 +549,17 @@ class TaskHandler(BaseHTTPRequestHandler):
             condition.save(self.conn)
         return f'/todo?id={todo.id_}'
 
             condition.save(self.conn)
         return f'/todo?id={todo.id_}'
 
-    def _do_POST_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
-        """Update history timestamps for VersionedAttribute."""
-        id_ = self.params.get_int_or_none('id')
-        item = cls.by_id(self.conn, id_)
-        attr = getattr(item, attr_name)
-        for k, v in self.form_data.get_first_strings_starting('at:').items():
-            old = k[3:]
-            if old[19:] != v:
-                attr.reset_timestamp(old, f'{v}.0')
-        attr.save(self.conn)
-        cls_name = cls.__name__.lower()
-        return f'/{cls_name}_{attr_name}s?id={item.id_}'
-
     def do_POST_process_descriptions(self) -> str:
         """Update history timestamps for Process.description."""
     def do_POST_process_descriptions(self) -> str:
         """Update history timestamps for Process.description."""
-        return self._do_POST_versioned_timestamps(Process, 'description')
+        return self._change_versioned_timestamps(Process, 'description')
 
     def do_POST_process_efforts(self) -> str:
         """Update history timestamps for Process.effort."""
 
     def do_POST_process_efforts(self) -> str:
         """Update history timestamps for Process.effort."""
-        return self._do_POST_versioned_timestamps(Process, 'effort')
+        return self._change_versioned_timestamps(Process, 'effort')
 
     def do_POST_process_titles(self) -> str:
         """Update history timestamps for Process.title."""
 
     def do_POST_process_titles(self) -> str:
         """Update history timestamps for Process.title."""
-        return self._do_POST_versioned_timestamps(Process, 'title')
+        return self._change_versioned_timestamps(Process, 'title')
 
     def do_POST_process(self) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
 
     def do_POST_process(self) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
@@ -519,7 +577,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                                self.form_data.get_all_int('condition'))
         process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         process.set_enables(self.conn, self.form_data.get_all_int('enables'))
                                self.form_data.get_all_int('condition'))
         process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
         process.set_enables(self.conn, self.form_data.get_all_int('enables'))
-        process.set_disables(self.conn, self.form_data.get_all_int('disables'))
+        process.set_disables(self.conn,
+                             self.form_data.get_all_int('disables'))
         process.calendarize = self.form_data.get_all_str('calendarize') != []
         process.save(self.conn)
         assert isinstance(process.id_, int)
         process.calendarize = self.form_data.get_all_str('calendarize') != []
         process.save(self.conn)
         assert isinstance(process.id_, int)
@@ -552,7 +611,8 @@ class TaskHandler(BaseHTTPRequestHandler):
         process.uncache()
         process.set_steps(self.conn, steps)
         process.set_step_suppressions(self.conn,
         process.uncache()
         process.set_steps(self.conn, steps)
         process.set_step_suppressions(self.conn,
-                                      self.form_data.get_all_int('suppresses'))
+                                      self.form_data.
+                                      get_all_int('suppresses'))
         process.save(self.conn)
         owners_to_set = []
         new_owner_title = None
         process.save(self.conn)
         owners_to_set = []
         new_owner_title = None
@@ -573,11 +633,11 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_POST_condition_descriptions(self) -> str:
         """Update history timestamps for Condition.description."""
 
     def do_POST_condition_descriptions(self) -> str:
         """Update history timestamps for Condition.description."""
-        return self._do_POST_versioned_timestamps(Condition, 'description')
+        return self._change_versioned_timestamps(Condition, 'description')
 
     def do_POST_condition_titles(self) -> str:
         """Update history timestamps for Condition.title."""
 
     def do_POST_condition_titles(self) -> str:
         """Update history timestamps for Condition.title."""
-        return self._do_POST_versioned_timestamps(Condition, 'title')
+        return self._change_versioned_timestamps(Condition, 'title')
 
     def do_POST_condition(self) -> str:
         """Update/insert Condition of ?id= and fields defined in postvars."""
 
     def do_POST_condition(self) -> str:
         """Update/insert Condition of ?id= and fields defined in postvars."""
@@ -592,27 +652,3 @@ class TaskHandler(BaseHTTPRequestHandler):
         condition.description.set(self.form_data.get_str('description'))
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'
         condition.description.set(self.form_data.get_str('description'))
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'
-
-    def _init_handling(self) -> None:
-        # pylint: disable=attribute-defined-outside-init
-        self.conn = DatabaseConnection(self.server.db)
-        parsed_url = urlparse(self.path)
-        self.site = path_split(parsed_url.path)[1]
-        params = parse_qs(parsed_url.query, strict_parsing=True)
-        self.params = InputsParser(params, False)
-
-    def _redirect(self, target: str) -> None:
-        self.send_response(302)
-        self.send_header('Location', target)
-        self.end_headers()
-
-    def _send_html(self, html: str, code: int = 200) -> None:
-        """Send HTML as proper HTTP response."""
-        self.send_response(code)
-        self.end_headers()
-        self.wfile.write(bytes(html, 'utf-8'))
-
-    def _send_msg(self, msg: Exception, code: int = 400) -> None:
-        """Send message in HTML formatting as HTTP response."""
-        html = self.server.jinja.get_template('msg.html').render(msg=msg)
-        self._send_html(html, code)