+ _conn: DatabaseConnection
+ _site: str
+ _form: InputsParser
+ _params: InputsParser
+
+ def _send_page(
+ self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
+ ) -> None:
+ """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
+
+ The differentiation by .server.render_mode serves to allow easily
+ comparable JSON responses for automatic testing.
+ """
+ body: str
+ headers: list[tuple[str, str]] = []
+ if 'html' == self.server.render_mode:
+ tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
+ body = tmpl.render(ctx)
+ else:
+ body = self._ctx_to_json(ctx)
+ headers += [('Content-Type', 'application/json')]
+ self.send_response(code)
+ for header_tuple in headers:
+ self.send_header(*header_tuple)
+ self.end_headers()
+ self.wfile.write(bytes(body, 'utf-8'))
+
+ def _ctx_to_json(self, ctx: dict[str, object]) -> str:
+ """Render ctx into JSON string.
+
+ Flattens any objects that json.dumps might not want to serialize, and
+ turns occurrences of BaseModel objects into listings of their .id_, to
+ be resolved to a full dict inside a top-level '_library' dictionary,
+ to avoid endless and circular nesting.
+ """
+
+ def flatten(node: object) -> object:
+
+ def update_library_with(
+ item: BaseModel[int] | BaseModel[str]) -> None:
+ cls_name = item.__class__.__name__
+ if cls_name not in library:
+ library[cls_name] = {}
+ if item.id_ not in library[cls_name]:
+ d, refs = item.as_dict_and_refs
+ id_key = '?' if item.id_ is None else item.id_
+ library[cls_name][id_key] = d
+ for ref in refs:
+ update_library_with(ref)
+
+ if isinstance(node, BaseModel):
+ update_library_with(node)
+ return node.id_
+ if isinstance(node, DictableNode):
+ d, refs = node.as_dict_and_refs
+ for ref in refs:
+ update_library_with(ref)
+ return d
+ if isinstance(node, (list, tuple)):
+ return [flatten(item) for item in node]
+ if isinstance(node, dict):
+ d = {}
+ for k, v in node.items():
+ d[k] = flatten(v)
+ return d
+ if isinstance(node, HandledException):
+ return str(node)
+ return node
+
+ library: dict[str, dict[str | int, object]] = {}
+ for k, v in ctx.items():
+ ctx[k] = flatten(v)
+ ctx['_library'] = library
+ return json_dumps(ctx)
+
+ @staticmethod
+ def _request_wrapper(http_method: str, not_found_msg: str
+ ) -> Callable[..., Callable[[TaskHandler], None]]:
+ """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
+
+ Among other things, conditionally cleans all caches, but only on POST
+ requests, as only those are expected to change the states of objects
+ that may be cached, and certainly only those are expected to write any
+ changes to the database. We want to call them as early though as
+ possible here, either exactly after the specific request handler
+ returns successfully, or right after any exception is triggered –
+ otherwise, race conditions become plausible.
+
+ Note that otherwise any POST attempt, even a failed one, may end in
+ problematic inconsistencies:
+
+ - if the POST handler experiences an Exception, changes to objects
+ won't get written to the DB, but the changed objects may remain in
+ the cache and affect other objects despite their possibly illegal
+ state
+
+ - even if an object was just saved to the DB, we cannot be sure its
+ current state is completely identical to what we'd get if loading it
+ fresh from the DB (e.g. currently Process.n_owners is only updated
+ when loaded anew via .from_table_row, nor is its state written to
+ the DB by .save; a questionable design choice, but proof that we
+ have no guarantee that objects' .save stores all their states we'd
+ prefer at their most up-to-date.
+ """
+
+ def clear_caches() -> None:
+ for cls in (Day, Todo, Condition, Process, ProcessStep):
+ assert hasattr(cls, 'empty_cache')
+ cls.empty_cache()
+
+ def decorator(f: Callable[..., str | None]
+ ) -> Callable[[TaskHandler], None]:
+ def wrapper(self: TaskHandler) -> None:
+ # pylint: disable=protected-access
+ # (because pylint here fails to detect the use of wrapper as a
+ # method to self with respective access privileges)
+ try:
+ self._conn = DatabaseConnection(self.server.db)
+ parsed_url = urlparse(self.path)
+ self._site = path_split(parsed_url.path)[1]
+ params = parse_qs(parsed_url.query,
+ keep_blank_values=True,
+ strict_parsing=True)
+ self._params = InputsParser(params)
+ handler_name = f'do_{http_method}_{self._site}'
+ if hasattr(self, handler_name):
+ handler = getattr(self, handler_name)
+ redir_target = f(self, handler)
+ if 'POST' == http_method:
+ clear_caches()
+ if redir_target:
+ self.send_response(302)
+ self.send_header('Location', redir_target)
+ self.end_headers()
+ else:
+ msg = f'{not_found_msg}: {self._site}'
+ raise NotFoundException(msg)
+ except HandledException as error:
+ if 'POST' == http_method:
+ clear_caches()
+ ctx = {'msg': error}
+ self._send_page(ctx, 'msg', error.http_code)
+ finally:
+ self._conn.close()
+ return wrapper
+ return decorator
+
+ @_request_wrapper('GET', 'Unknown page')
+ def do_GET(self, handler: Callable[[], str | dict[str, object]]
+ ) -> str | None:
+ """Render page with result of handler, or redirect if result is str."""
+ tmpl_name = f'{self._site}'
+ ctx_or_redir_target = handler()
+ if isinstance(ctx_or_redir_target, str):
+ return ctx_or_redir_target
+ self._send_page(ctx_or_redir_target, tmpl_name)
+ return None
+
+ @_request_wrapper('POST', 'Unknown POST target')
+ def do_POST(self, handler: Callable[[], str]) -> str:
+ """Handle POST with handler, prepare redirection to result."""
+ length = int(self.headers['content-length'])
+ postvars = parse_qs(self.rfile.read(length).decode(),
+ keep_blank_values=True)
+ self._form = InputsParser(postvars)
+ redir_target = handler()
+ self._conn.commit()
+ return redir_target
+
+ # GET handlers
+
+ @staticmethod
+ def _get_item(target_class: Any
+ ) -> Callable[..., Callable[[TaskHandler],
+ dict[str, object]]]:
+ def decorator(f: Callable[..., dict[str, object]]
+ ) -> Callable[[TaskHandler], dict[str, object]]:
+ def wrapper(self: TaskHandler) -> dict[str, object]:
+ # pylint: disable=protected-access
+ # (because pylint here fails to detect the use of wrapper as a
+ # method to self with respective access privileges)
+ id_ = None
+ for val in self._params.get_all_int('id', fail_on_empty=True):
+ id_ = val
+ if target_class.can_create_by_id:
+ item = target_class.by_id_or_create(self._conn, id_)
+ else:
+ item = target_class.by_id(self._conn, id_)
+ if 'exists' in signature(f).parameters:
+ exists = id_ is not None and target_class._get_cached(id_)
+ return f(self, item, exists)
+ return f(self, item)
+ return wrapper
+ return decorator
+
+ def do_GET_(self) -> str:
+ """Return redirect target on GET /."""
+ return '/day'
+
+ def _do_GET_calendar(self) -> dict[str, object]:
+ """Show Days from ?start= to ?end=.
+
+ Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
+ same, the only difference being the HTML template they are rendered to,
+ which .do_GET selects from their method name.
+ """
+ start = self._params.get_str_or_fail('start', '')
+ end = self._params.get_str_or_fail('end', '')
+ end = end if end != '' else date_in_n_days(366)
+ #
+ days, start, end = Day.by_date_range_with_limits(self._conn,
+ (start, end), 'id')
+ days = Day.with_filled_gaps(days, start, end)
+ today = date_in_n_days(0)
+ return {'start': start, 'end': end, 'days': days, 'today': today}