home · contact · privacy
Improve POST /day input validation.
[plomtask] / plomtask / http.py
index 4c0d6a38dddd2b43a0abd0052a2e7dab7386923d..27066036b2c300e23f41f7d348e5090905c9f2f8 100644 (file)
@@ -137,6 +137,20 @@ class InputsParser:
             msg = f'cannot int a form field value for key {key} in: {all_str}'
             raise BadFormatException(msg) from e
 
+    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
+        """Retrieve list of float value at key, None if empty strings."""
+        ret: list[float | None] = []
+        for val in self.get_all_str(key):
+            if '' == val:
+                ret += [None]
+            else:
+                try:
+                    ret += [float(val)]
+                except ValueError as e:
+                    msg = f'cannot float form field value for key {key}: {val}'
+                    raise BadFormatException(msg) from e
+        return ret
+
 
 class TaskHandler(BaseHTTPRequestHandler):
     """Handles single HTTP request."""
@@ -221,6 +235,25 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     # GET handlers
 
+    @staticmethod
+    def _get_item(target_class: Any
+                  ) -> Callable[..., Callable[[TaskHandler],
+                                              dict[str, object]]]:
+        def decorator(f: Callable[..., dict[str, object]]
+                      ) -> Callable[[TaskHandler], dict[str, object]]:
+            def wrapper(self: TaskHandler) -> dict[str, object]:
+                # pylint: disable=protected-access
+                # (because pylint here fails to detect the use of wrapper as a
+                # method to self with respective access privileges)
+                id_ = self._params.get_int_or_none('id')
+                if target_class.can_create_by_id:
+                    item = target_class.by_id_or_create(self.conn, id_)
+                else:
+                    item = target_class.by_id(self.conn, id_)
+                return f(self, item)
+            return wrapper
+        return decorator
+
     def do_GET_(self) -> str:
         """Return redirect target on GET /."""
         return '/day'
@@ -279,7 +312,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'conditions_present': conditions_present,
                 'processes': Process.all(self.conn)}
 
-    def do_GET_todo(self) -> dict[str, object]:
+    @_get_item(Todo)
+    def do_GET_todo(self, todo: Todo) -> dict[str, object]:
         """Show single Todo of ?id=."""
 
         @dataclass
@@ -330,8 +364,6 @@ class TaskHandler(BaseHTTPRequestHandler):
                 ids = ids | collect_adoptables_keys(node.children)
             return ids
 
-        id_ = self._params.get_int('id')
-        todo = Todo.by_id(self.conn, id_)
         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
         process_tree = todo.process.get_steps(self.conn, None)
         steps_todo_to_process: list[TodoStepsNode] = []
@@ -407,10 +439,9 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'sort_by': sort_by,
                 'pattern': pattern}
 
-    def do_GET_condition(self) -> dict[str, object]:
+    @_get_item(Condition)
+    def do_GET_condition(self, c: Condition) -> dict[str, object]:
         """Show Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        c = Condition.by_id_or_create(self.conn, id_)
         ps = Process.all(self.conn)
         return {'condition': c, 'is_new': c.id_ is None,
                 'enabled_processes': [p for p in ps if c in p.conditions],
@@ -418,22 +449,19 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'enabling_processes': [p for p in ps if c in p.enables],
                 'disabling_processes': [p for p in ps if c in p.disables]}
 
-    def do_GET_condition_titles(self) -> dict[str, object]:
+    @_get_item(Condition)
+    def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
         """Show title history of Condition of ?id=."""
-        id_ = self._params.get_int('id')
-        condition = Condition.by_id(self.conn, id_)
-        return {'condition': condition}
+        return {'condition': c}
 
-    def do_GET_condition_descriptions(self) -> dict[str, object]:
+    @_get_item(Condition)
+    def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
         """Show description historys of Condition of ?id=."""
-        id_ = self._params.get_int('id')
-        condition = Condition.by_id(self.conn, id_)
-        return {'condition': condition}
+        return {'condition': c}
 
-    def do_GET_process(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process(self, process: Process) -> dict[str, object]:
         """Show Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        process = Process.by_id_or_create(self.conn, id_)
         title_64 = self._params.get_str('title_b64')
         if title_64:
             title = b64decode(title_64.encode()).decode()
@@ -451,23 +479,20 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'process_candidates': Process.all(self.conn),
                 'condition_candidates': Condition.all(self.conn)}
 
-    def do_GET_process_titles(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process_titles(self, p: Process) -> dict[str, object]:
         """Show title history of Process of ?id=."""
-        id_ = self._params.get_int('id')
-        process = Process.by_id(self.conn, id_)
-        return {'process': process}
+        return {'process': p}
 
-    def do_GET_process_descriptions(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
         """Show description historys of Process of ?id=."""
-        id_ = self._params.get_int('id')
-        process = Process.by_id(self.conn, id_)
-        return {'process': process}
+        return {'process': p}
 
-    def do_GET_process_efforts(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
         """Show default effort history of Process of ?id=."""
-        id_ = self._params.get_int('id')
-        process = Process.by_id(self.conn, id_)
-        return {'process': process}
+        return {'process': p}
 
     def do_GET_processes(self) -> dict[str, object]:
         """Show all Processes."""
@@ -541,13 +566,14 @@ class TaskHandler(BaseHTTPRequestHandler):
         make_type = self._form_data.get_str('make_type')
         old_todos = self._form_data.get_all_int('todo_id')
         new_todos = self._form_data.get_all_int('new_todo')
-        is_done = [t_id in self._form_data.get_all_int('done')
-                   for t_id in old_todos]
         comments = self._form_data.get_all_str('comment')
-        efforts = [float(effort) if effort else None
-                   for effort in self._form_data.get_all_str('effort')]
-        if old_todos and 3*[len(old_todos)] != [len(is_done), len(comments),
-                                                len(efforts)]:
+        efforts = self._form_data.get_all_floats_or_nones('effort')
+        done_todos = self._form_data.get_all_int('done')
+        for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
+            raise BadFormatException('"done" field refers to unknown Todo')
+        is_done = [t_id in done_todos for t_id in old_todos]
+        if not (len(old_todos) == len(is_done) == len(comments)
+                == len(efforts)):
             msg = 'not equal number each of number of todo_id, comments, ' +\
                     'and efforts inputs'
             raise BadFormatException(msg)