home · contact · privacy
Ensure stringy BaseModel IDs are not empty.
[plomtask] / plomtask / http.py
index d61588a9f7627eb7e0ce1acff8fe211d7065ba1c..230ed3f26c5d53398a27d87558a28abeb28ceff5 100644 (file)
@@ -20,16 +20,6 @@ from plomtask.todos import Todo
 TEMPLATES_DIR = 'templates'
 
 
-@dataclass
-class TodoStepsNode:
-    """Collect what's useful for Todo steps tree display."""
-    id_: int
-    todo: Todo | None
-    process: Process | None
-    children: list[TodoStepsNode]
-    fillable: bool = False
-
-
 class TaskServer(HTTPServer):
     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
 
@@ -112,11 +102,11 @@ class TaskHandler(BaseHTTPRequestHandler):
     # pylint: disable=too-many-public-methods
     server: TaskServer
     conn: DatabaseConnection
-    site: str
-    form_data: InputsParser
-    params: InputsParser
+    _site: str
+    _form_data: InputsParser
+    _params: InputsParser
 
-    def send_html(self, html: str, code: int = 200) -> None:
+    def _send_html(self, html: str, code: int = 200) -> None:
         """Send HTML as proper HTTP response."""
         self.send_response(code)
         self.end_headers()
@@ -128,13 +118,16 @@ class TaskHandler(BaseHTTPRequestHandler):
         def decorator(f: Callable[..., str | None]
                       ) -> Callable[[TaskHandler], None]:
             def wrapper(self: TaskHandler) -> None:
+                # pylint: disable=protected-access
+                # (because pylint here fails to detect the use of wrapper as a
+                # method to self with respective access privileges)
                 try:
                     self.conn = DatabaseConnection(self.server.db)
                     parsed_url = urlparse(self.path)
-                    self.site = path_split(parsed_url.path)[1]
+                    self._site = path_split(parsed_url.path)[1]
                     params = parse_qs(parsed_url.query, strict_parsing=True)
-                    self.params = InputsParser(params, False)
-                    handler_name = f'do_{http_method}_{self.site}'
+                    self._params = InputsParser(params, False)
+                    handler_name = f'do_{http_method}_{self._site}'
                     if hasattr(self, handler_name):
                         handler = getattr(self, handler_name)
                         redir_target = f(self, handler)
@@ -143,12 +136,12 @@ class TaskHandler(BaseHTTPRequestHandler):
                             self.send_header('Location', redir_target)
                             self.end_headers()
                     else:
-                        msg = f'{not_found_msg}: {self.site}'
+                        msg = f'{not_found_msg}: {self._site}'
                         raise NotFoundException(msg)
                 except HandledException as error:
                     html = self.server.jinja.\
                             get_template('msg.html').render(msg=error)
-                    self.send_html(html, error.http_code)
+                    self._send_html(html, error.http_code)
                 finally:
                     self.conn.close()
             return wrapper
@@ -158,13 +151,13 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET(self, handler: Callable[[], str | dict[str, object]]
                ) -> str | None:
         """Render page with result of handler, or redirect if result is str."""
-        template = f'{self.site}.html'
+        template = f'{self._site}.html'
         ctx_or_redir = handler()
         if str == type(ctx_or_redir):
             return ctx_or_redir
         assert isinstance(ctx_or_redir, dict)
         html = self.server.jinja.get_template(template).render(**ctx_or_redir)
-        self.send_html(html)
+        self._send_html(html)
         return None
 
     @_request_wrapper('POST', 'Unknown POST target')
@@ -173,29 +166,34 @@ class TaskHandler(BaseHTTPRequestHandler):
         length = int(self.headers['content-length'])
         postvars = parse_qs(self.rfile.read(length).decode(),
                             keep_blank_values=True, strict_parsing=True)
-        self.form_data = InputsParser(postvars)
+        self._form_data = InputsParser(postvars)
         redir_target = handler()
         self.conn.commit()
         return redir_target
 
+    # GET handlers
+
+    def do_GET_(self) -> str:
+        """Return redirect target on GET /."""
+        return '/day'
+
     def _do_GET_calendar(self) -> dict[str, object]:
-        """Show Days from ?start= to ?end=."""
-        start = self.params.get_str('start')
-        end = self.params.get_str('end')
+        """Show Days from ?start= to ?end=.
+
+        Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
+        same, the only difference being the HTML template they are rendered to,
+        which .do_GET selects from their method name.
+        """
+        start = self._params.get_str('start')
+        end = self._params.get_str('end')
         if not end:
             end = date_in_n_days(366)
         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
         days, start, end = ret
         days = Day.with_filled_gaps(days, start, end)
-        for day in days:
-            day.collect_calendarized_todos(self.conn)
         today = date_in_n_days(0)
         return {'start': start, 'end': end, 'days': days, 'today': today}
 
-    def do_GET_(self) -> str:
-        """Return redirect target on GET /."""
-        return '/day'
-
     def do_GET_calendar(self) -> dict[str, object]:
         """Show Days from ?start= to ?end= – normal view."""
         return self._do_GET_calendar()
@@ -206,16 +204,14 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_day(self) -> dict[str, object]:
         """Show single Day of ?date=."""
-        date = self.params.get_str('date', date_in_n_days(0))
-        make_type = self.params.get_str('make_type')
-        todays_todos = Todo.by_date(self.conn, date)
-        total_effort = 0.0
-        for todo in todays_todos:
-            total_effort += todo.performed_effort
+        date = self._params.get_str('date', date_in_n_days(0))
+        day = Day.by_id(self.conn, date, create=True,
+                        init_empty_todo_list=True)
+        make_type = self._params.get_str('make_type')
         conditions_present = []
         enablers_for = {}
         disablers_for = {}
-        for todo in todays_todos:
+        for todo in day.todos:
             for condition in todo.conditions + todo.blockers:
                 if condition not in conditions_present:
                     conditions_present += [condition]
@@ -227,9 +223,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                                                     if condition in p.disables]
         seen_todos: set[int] = set()
         top_nodes = [t.get_step_tree(seen_todos)
-                     for t in todays_todos if not t.parents]
-        return {'day': Day.by_id(self.conn, date, create=True),
-                'total_effort': total_effort,
+                     for t in day.todos if not t.parents]
+        return {'day': day,
                 'top_nodes': top_nodes,
                 'make_type': make_type,
                 'enablers_for': enablers_for,
@@ -240,6 +235,15 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET_todo(self) -> dict[str, object]:
         """Show single Todo of ?id=."""
 
+        @dataclass
+        class TodoStepsNode:
+            """Collect what's useful for Todo steps tree display."""
+            id_: int
+            todo: Todo | None
+            process: Process | None
+            children: list[TodoStepsNode]  # pylint: disable=undefined-variable
+            fillable: bool = False
+
         def walk_process_steps(id_: int,
                                process_step_nodes: list[ProcessStepsNode],
                                steps_nodes: list[TodoStepsNode]) -> None:
@@ -279,7 +283,7 @@ class TaskHandler(BaseHTTPRequestHandler):
                 ids = ids | collect_adoptables_keys(node.children)
             return ids
 
-        id_ = self.params.get_int('id')
+        id_ = self._params.get_int('id')
         todo = Todo.by_id(self.conn, id_)
         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
         process_tree = todo.process.get_steps(self.conn, None)
@@ -305,11 +309,11 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_todos(self) -> dict[str, object]:
         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
-        sort_by = self.params.get_str('sort_by')
-        start = self.params.get_str('start')
-        end = self.params.get_str('end')
-        process_id = self.params.get_int_or_none('process_id')
-        comment_pattern = self.params.get_str('comment_pattern')
+        sort_by = self._params.get_str('sort_by')
+        start = self._params.get_str('start')
+        end = self._params.get_str('end')
+        process_id = self._params.get_int_or_none('process_id')
+        comment_pattern = self._params.get_str('comment_pattern')
         todos = []
         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
         todos_by_date_range, start, end = ret
@@ -338,9 +342,9 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_conditions(self) -> dict[str, object]:
         """Show all Conditions."""
-        pattern = self.params.get_str('pattern')
+        pattern = self._params.get_str('pattern')
         conditions = Condition.matching(self.conn, pattern)
-        sort_by = self.params.get_str('sort_by')
+        sort_by = self._params.get_str('sort_by')
         if sort_by == 'is_active':
             conditions.sort(key=lambda c: c.is_active)
         elif sort_by == '-is_active':
@@ -355,7 +359,7 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_condition(self) -> dict[str, object]:
         """Show Condition of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         c = Condition.by_id(self.conn, id_, create=True)
         ps = Process.all(self.conn)
         return {'condition': c, 'is_new': c.id_ is None,
@@ -366,29 +370,29 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_condition_titles(self) -> dict[str, object]:
         """Show title history of Condition of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         condition = Condition.by_id(self.conn, id_)
         return {'condition': condition}
 
     def do_GET_condition_descriptions(self) -> dict[str, object]:
         """Show description historys of Condition of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         condition = Condition.by_id(self.conn, id_)
         return {'condition': condition}
 
     def do_GET_process(self) -> dict[str, object]:
         """Show Process of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_, create=True)
-        title_64 = self.params.get_str('title_b64')
+        title_64 = self._params.get_str('title_b64')
         if title_64:
             title = b64decode(title_64.encode()).decode()
             process.title.set(title)
         owners = process.used_as_step_by(self.conn)
-        for step_id in self.params.get_all_int('step_to'):
+        for step_id in self._params.get_all_int('step_to'):
             owners += [Process.by_id(self.conn, step_id)]
         preset_top_step = None
-        for process_id in self.params.get_all_int('has_step'):
+        for process_id in self._params.get_all_int('has_step'):
             preset_top_step = process_id
         return {'process': process, 'is_new': process.id_ is None,
                 'preset_top_step': preset_top_step,
@@ -399,27 +403,27 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_GET_process_titles(self) -> dict[str, object]:
         """Show title history of Process of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_)
         return {'process': process}
 
     def do_GET_process_descriptions(self) -> dict[str, object]:
         """Show description historys of Process of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_)
         return {'process': process}
 
     def do_GET_process_efforts(self) -> dict[str, object]:
         """Show default effort history of Process of ?id=."""
-        id_ = self.params.get_int_or_none('id')
+        id_ = self._params.get_int_or_none('id')
         process = Process.by_id(self.conn, id_)
         return {'process': process}
 
     def do_GET_processes(self) -> dict[str, object]:
         """Show all Processes."""
-        pattern = self.params.get_str('pattern')
+        pattern = self._params.get_str('pattern')
         processes = Process.matching(self.conn, pattern)
-        sort_by = self.params.get_str('sort_by')
+        sort_by = self._params.get_str('sort_by')
         if sort_by == 'steps':
             processes.sort(key=lambda p: len(p.explicit_steps))
         elif sort_by == '-steps':
@@ -438,24 +442,39 @@ class TaskHandler(BaseHTTPRequestHandler):
             processes.sort(key=lambda p: p.title.newest)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
+    # POST handlers
+
+    def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
+        """Update history timestamps for VersionedAttribute."""
+        id_ = self._params.get_int_or_none('id')
+        item = cls.by_id(self.conn, id_)
+        attr = getattr(item, attr_name)
+        for k, v in self._form_data.get_first_strings_starting('at:').items():
+            old = k[3:]
+            if old[19:] != v:
+                attr.reset_timestamp(old, f'{v}.0')
+        attr.save(self.conn)
+        cls_name = cls.__name__.lower()
+        return f'/{cls_name}_{attr_name}s?id={item.id_}'
+
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
-        date = self.params.get_str('date')
+        date = self._params.get_str('date')
         day = Day.by_id(self.conn, date, create=True)
-        day.comment = self.form_data.get_str('day_comment')
+        day.comment = self._form_data.get_str('day_comment')
         day.save(self.conn)
-        make_type = self.form_data.get_str('make_type')
-        for process_id in sorted(self.form_data.get_all_int('new_todo')):
+        make_type = self._form_data.get_str('make_type')
+        for process_id in sorted(self._form_data.get_all_int('new_todo')):
             if 'empty' == make_type:
                 process = Process.by_id(self.conn, process_id)
                 todo = Todo(None, process, False, date)
                 todo.save(self.conn)
             else:
                 Todo.create_with_children(self.conn, process_id, date)
-        done_ids = self.form_data.get_all_int('done')
-        comments = self.form_data.get_all_str('comment')
-        efforts = self.form_data.get_all_str('effort')
-        for i, todo_id in enumerate(self.form_data.get_all_int('todo_id')):
+        done_ids = self._form_data.get_all_int('done')
+        comments = self._form_data.get_all_str('comment')
+        efforts = self._form_data.get_all_str('effort')
+        for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
             todo = Todo.by_id(self.conn, todo_id)
             todo.is_done = todo_id in done_ids
             if len(comments) > 0:
@@ -473,16 +492,16 @@ class TaskHandler(BaseHTTPRequestHandler):
         """Update Todo and its children."""
         # pylint: disable=too-many-locals
         # pylint: disable=too-many-branches
-        id_ = self.params.get_int('id')
-        for _ in self.form_data.get_all_str('delete'):
+        id_ = self._params.get_int('id')
+        for _ in self._form_data.get_all_str('delete'):
             todo = Todo .by_id(self.conn, id_)
             todo.remove(self.conn)
             return '/'
         todo = Todo.by_id(self.conn, id_)
-        adopted_child_ids = self.form_data.get_all_int('adopt')
-        processes_to_make_full = self.form_data.get_all_int('make_full')
-        processes_to_make_empty = self.form_data.get_all_int('make_empty')
-        fill_fors = self.form_data.get_first_strings_starting('fill_for_')
+        adopted_child_ids = self._form_data.get_all_int('adopt')
+        processes_to_make_full = self._form_data.get_all_int('make_full')
+        processes_to_make_empty = self._form_data.get_all_int('make_empty')
+        fill_fors = self._form_data.get_first_strings_starting('fill_for_')
         for v in fill_fors.values():
             if v.startswith('make_empty_'):
                 processes_to_make_empty += [int(v[11:])]
@@ -511,16 +530,16 @@ class TaskHandler(BaseHTTPRequestHandler):
         for process_id in processes_to_make_full:
             made = Todo.create_with_children(self.conn, process_id, todo.date)
             todo.add_child(made)
-        effort = self.form_data.get_str('effort', ignore_strict=True)
+        effort = self._form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
         todo.set_conditions(self.conn,
-                            self.form_data.get_all_int('condition'))
-        todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
-        todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
-        todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
-        todo.is_done = len(self.form_data.get_all_str('done')) > 0
-        todo.calendarize = len(self.form_data.get_all_str('calendarize')) > 0
-        todo.comment = self.form_data.get_str('comment', ignore_strict=True)
+                            self._form_data.get_all_int('condition'))
+        todo.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
+        todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
+        todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
+        todo.is_done = len(self._form_data.get_all_str('done')) > 0
+        todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
+        todo.comment = self._form_data.get_str('comment', ignore_strict=True)
         todo.save(self.conn)
         for condition in todo.enables:
             condition.save(self.conn)
@@ -528,72 +547,59 @@ class TaskHandler(BaseHTTPRequestHandler):
             condition.save(self.conn)
         return f'/todo?id={todo.id_}'
 
-    def _do_POST_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
-        """Update history timestamps for VersionedAttribute."""
-        id_ = self.params.get_int_or_none('id')
-        item = cls.by_id(self.conn, id_)
-        attr = getattr(item, attr_name)
-        for k, v in self.form_data.get_first_strings_starting('at:').items():
-            old = k[3:]
-            if old[19:] != v:
-                attr.reset_timestamp(old, f'{v}.0')
-        attr.save(self.conn)
-        cls_name = cls.__name__.lower()
-        return f'/{cls_name}_{attr_name}s?id={item.id_}'
-
     def do_POST_process_descriptions(self) -> str:
         """Update history timestamps for Process.description."""
-        return self._do_POST_versioned_timestamps(Process, 'description')
+        return self._change_versioned_timestamps(Process, 'description')
 
     def do_POST_process_efforts(self) -> str:
         """Update history timestamps for Process.effort."""
-        return self._do_POST_versioned_timestamps(Process, 'effort')
+        return self._change_versioned_timestamps(Process, 'effort')
 
     def do_POST_process_titles(self) -> str:
         """Update history timestamps for Process.title."""
-        return self._do_POST_versioned_timestamps(Process, 'title')
+        return self._change_versioned_timestamps(Process, 'title')
 
     def do_POST_process(self) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
         # pylint: disable=too-many-branches
-        id_ = self.params.get_int_or_none('id')
-        for _ in self.form_data.get_all_str('delete'):
+        id_ = self._params.get_int_or_none('id')
+        for _ in self._form_data.get_all_str('delete'):
             process = Process.by_id(self.conn, id_)
             process.remove(self.conn)
             return '/processes'
         process = Process.by_id(self.conn, id_, create=True)
-        process.title.set(self.form_data.get_str('title'))
-        process.description.set(self.form_data.get_str('description'))
-        process.effort.set(self.form_data.get_float('effort'))
+        process.title.set(self._form_data.get_str('title'))
+        process.description.set(self._form_data.get_str('description'))
+        process.effort.set(self._form_data.get_float('effort'))
         process.set_conditions(self.conn,
-                               self.form_data.get_all_int('condition'))
-        process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
-        process.set_enables(self.conn, self.form_data.get_all_int('enables'))
+                               self._form_data.get_all_int('condition'))
+        process.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
+        process.set_enables(self.conn, self._form_data.get_all_int('enables'))
         process.set_disables(self.conn,
-                             self.form_data.get_all_int('disables'))
-        process.calendarize = self.form_data.get_all_str('calendarize') != []
+                             self._form_data.get_all_int('disables'))
+        process.calendarize = self._form_data.get_all_str('calendarize') != []
         process.save(self.conn)
         assert isinstance(process.id_, int)
         steps: list[ProcessStep] = []
-        for step_id in self.form_data.get_all_int('keep_step'):
-            if step_id not in self.form_data.get_all_int('steps'):
+        for step_id in self._form_data.get_all_int('keep_step'):
+            if step_id not in self._form_data.get_all_int('steps'):
                 raise BadFormatException('trying to keep unknown step')
-        for step_id in self.form_data.get_all_int('steps'):
-            if step_id not in self.form_data.get_all_int('keep_step'):
+        for step_id in self._form_data.get_all_int('steps'):
+            if step_id not in self._form_data.get_all_int('keep_step'):
                 continue
-            step_process_id = self.form_data.get_int(
+            step_process_id = self._form_data.get_int(
                     f'step_{step_id}_process_id')
-            parent_id = self.form_data.get_int_or_none(
+            parent_id = self._form_data.get_int_or_none(
                     f'step_{step_id}_parent_id')
             steps += [ProcessStep(step_id, process.id_, step_process_id,
                                   parent_id)]
-        for step_id in self.form_data.get_all_int('steps'):
-            for step_process_id in self.form_data.get_all_int(
+        for step_id in self._form_data.get_all_int('steps'):
+            for step_process_id in self._form_data.get_all_int(
                     f'new_step_to_{step_id}'):
                 steps += [ProcessStep(None, process.id_, step_process_id,
                                       step_id)]
         new_step_title = None
-        for step_identifier in self.form_data.get_all_str('new_top_step'):
+        for step_identifier in self._form_data.get_all_str('new_top_step'):
             try:
                 step_process_id = int(step_identifier)
                 steps += [ProcessStep(None, process.id_, step_process_id,
@@ -603,12 +609,12 @@ class TaskHandler(BaseHTTPRequestHandler):
         process.uncache()
         process.set_steps(self.conn, steps)
         process.set_step_suppressions(self.conn,
-                                      self.form_data.
+                                      self._form_data.
                                       get_all_int('suppresses'))
         process.save(self.conn)
         owners_to_set = []
         new_owner_title = None
-        for owner_identifier in self.form_data.get_all_str('step_of'):
+        for owner_identifier in self._form_data.get_all_str('step_of'):
             try:
                 owners_to_set += [int(owner_identifier)]
             except ValueError:
@@ -625,22 +631,22 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     def do_POST_condition_descriptions(self) -> str:
         """Update history timestamps for Condition.description."""
-        return self._do_POST_versioned_timestamps(Condition, 'description')
+        return self._change_versioned_timestamps(Condition, 'description')
 
     def do_POST_condition_titles(self) -> str:
         """Update history timestamps for Condition.title."""
-        return self._do_POST_versioned_timestamps(Condition, 'title')
+        return self._change_versioned_timestamps(Condition, 'title')
 
     def do_POST_condition(self) -> str:
         """Update/insert Condition of ?id= and fields defined in postvars."""
-        id_ = self.params.get_int_or_none('id')
-        for _ in self.form_data.get_all_str('delete'):
+        id_ = self._params.get_int_or_none('id')
+        for _ in self._form_data.get_all_str('delete'):
             condition = Condition.by_id(self.conn, id_)
             condition.remove(self.conn)
             return '/conditions'
         condition = Condition.by_id(self.conn, id_, create=True)
-        condition.is_active = self.form_data.get_all_str('is_active') != []
-        condition.title.set(self.form_data.get_str('title'))
-        condition.description.set(self.form_data.get_str('description'))
+        condition.is_active = self._form_data.get_all_str('is_active') != []
+        condition.title.set(self._form_data.get_str('title'))
+        condition.description.set(self._form_data.get_str('description'))
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'