from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
from urllib.parse import urlparse, parse_qs
+from json import dumps as json_dumps
from os.path import split as path_split
from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
from plomtask.dating import date_in_n_days
*args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.db = db_file
- self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
+ self.headers: list[tuple[str, str]] = []
+ self._render_mode = 'html'
+ self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
+
+ def set_json_mode(self) -> None:
+ """Make server send JSON instead of HTML responses."""
+ self._render_mode = 'json'
+ self.headers += [('Content-Type', 'application/json')]
+
+ @staticmethod
+ def ctx_to_json(ctx: dict[str, object]) -> str:
+ """Render ctx into JSON string."""
+ def walk_ctx(node: object) -> Any:
+ if hasattr(node, 'as_dict_into_reference'):
+ if hasattr(node, 'id_') and node.id_ is not None:
+ return node.as_dict_into_reference(library)
+ if hasattr(node, 'as_dict'):
+ return node.as_dict
+ if isinstance(node, (list, tuple)):
+ return [walk_ctx(x) for x in node]
+ if isinstance(node, HandledException):
+ return str(node)
+ return node
+ library: dict[str, dict[str | int, object]] = {}
+ for k, v in ctx.items():
+ ctx[k] = walk_ctx(v)
+ ctx['_library'] = library
+ return json_dumps(ctx)
+
+ def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
+ """Render ctx according to self._render_mode.."""
+ tmpl_name = f'{tmpl_name}.{self._render_mode}'
+ if 'html' == self._render_mode:
+ template = self._jinja.get_template(tmpl_name)
+ return template.render(ctx)
+ return self.__class__.ctx_to_json(ctx)
class InputsParser:
_form_data: InputsParser
_params: InputsParser
- def _send_html(self, html: str, code: int = 200) -> None:
- """Send HTML as proper HTTP response."""
+ def _send_page(self,
+ ctx: dict[str, Any],
+ tmpl_name: str,
+ code: int = 200
+ ) -> None:
+ """Send ctx as proper HTTP response."""
+ body = self.server.render(ctx, tmpl_name)
self.send_response(code)
+ for header_tuple in self.server.headers:
+ self.send_header(*header_tuple)
self.end_headers()
- self.wfile.write(bytes(html, 'utf-8'))
+ self.wfile.write(bytes(body, 'utf-8'))
@staticmethod
def _request_wrapper(http_method: str, not_found_msg: str
for cls in (Day, Todo, Condition, Process, ProcessStep):
assert hasattr(cls, 'empty_cache')
cls.empty_cache()
- tmpl = self.server.jinja.get_template('msg.html')
- html = tmpl.render(msg=error)
- self._send_html(html, error.http_code)
+ ctx = {'msg': error}
+ self._send_page(ctx, 'msg', error.http_code)
finally:
self.conn.close()
return wrapper
def do_GET(self, handler: Callable[[], str | dict[str, object]]
) -> str | None:
"""Render page with result of handler, or redirect if result is str."""
- template = f'{self._site}.html'
- ctx_or_redir = handler()
- if str == type(ctx_or_redir):
- return ctx_or_redir
- assert isinstance(ctx_or_redir, dict)
- html = self.server.jinja.get_template(template).render(**ctx_or_redir)
- self._send_html(html)
+ tmpl_name = f'{self._site}'
+ ctx_or_redir_target = handler()
+ if isinstance(ctx_or_redir_target, str):
+ return ctx_or_redir_target
+ self._send_page(ctx_or_redir_target, tmpl_name)
return None
@_request_wrapper('POST', 'Unknown POST target')
def do_GET_day(self) -> dict[str, object]:
"""Show single Day of ?date=."""
date = self._params.get_str('date', date_in_n_days(0))
- day = Day.by_id(self.conn, date, create=True)
+ day = Day.by_id_or_create(self.conn, date)
make_type = self._params.get_str('make_type')
conditions_present = []
enablers_for = {}
adoptables: dict[int, list[Todo]] = {}
any_adoptables = [Todo.by_id(self.conn, t.id_)
for t in Todo.by_date(self.conn, todo.date)
- if t != todo]
+ if t.id_ is not None
+ and t != todo]
for id_ in collect_adoptables_keys(steps_todo_to_process):
adoptables[id_] = [t for t in any_adoptables
if t.process.id_ == id_]
todos.sort(key=lambda t: t.date, reverse=True)
else:
todos.sort(key=lambda t: t.date)
+ sort_by = 'title'
return {'start': start, 'end': end, 'process_id': process_id,
'comment_pattern': comment_pattern, 'todos': todos,
'all_processes': Process.all(self.conn), 'sort_by': sort_by}
conditions.sort(key=lambda c: c.title.newest, reverse=True)
else:
conditions.sort(key=lambda c: c.title.newest)
+ sort_by = 'title'
return {'conditions': conditions,
'sort_by': sort_by,
'pattern': pattern}
def do_GET_condition(self) -> dict[str, object]:
"""Show Condition of ?id=."""
id_ = self._params.get_int_or_none('id')
- c = Condition.by_id(self.conn, id_, create=True)
+ c = Condition.by_id_or_create(self.conn, id_)
ps = Process.all(self.conn)
return {'condition': c, 'is_new': c.id_ is None,
'enabled_processes': [p for p in ps if c in p.conditions],
def do_GET_condition_titles(self) -> dict[str, object]:
"""Show title history of Condition of ?id=."""
- id_ = self._params.get_int_or_none('id')
+ id_ = self._params.get_int('id')
condition = Condition.by_id(self.conn, id_)
return {'condition': condition}
def do_GET_condition_descriptions(self) -> dict[str, object]:
"""Show description historys of Condition of ?id=."""
- id_ = self._params.get_int_or_none('id')
+ id_ = self._params.get_int('id')
condition = Condition.by_id(self.conn, id_)
return {'condition': condition}
def do_GET_process(self) -> dict[str, object]:
"""Show Process of ?id=."""
id_ = self._params.get_int_or_none('id')
- process = Process.by_id(self.conn, id_, create=True)
+ process = Process.by_id_or_create(self.conn, id_)
title_64 = self._params.get_str('title_b64')
if title_64:
title = b64decode(title_64.encode()).decode()
def do_GET_process_titles(self) -> dict[str, object]:
"""Show title history of Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
+ id_ = self._params.get_int('id')
process = Process.by_id(self.conn, id_)
return {'process': process}
def do_GET_process_descriptions(self) -> dict[str, object]:
"""Show description historys of Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
+ id_ = self._params.get_int('id')
process = Process.by_id(self.conn, id_)
return {'process': process}
def do_GET_process_efforts(self) -> dict[str, object]:
"""Show default effort history of Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
+ id_ = self._params.get_int('id')
process = Process.by_id(self.conn, id_)
return {'process': process}
processes.sort(key=lambda p: p.title.newest, reverse=True)
else:
processes.sort(key=lambda p: p.title.newest)
+ sort_by = 'title'
return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
# POST handlers
def do_POST_day(self) -> str:
"""Update or insert Day of date and Todos mapped to it."""
date = self._params.get_str('date')
- day = Day.by_id(self.conn, date, create=True)
+ day = Day.by_id_or_create(self.conn, date)
day.comment = self._form_data.get_str('day_comment')
day.save(self.conn)
make_type = self._form_data.get_str('make_type')
# pylint: disable=too-many-branches
id_ = self._params.get_int_or_none('id')
for _ in self._form_data.get_all_str('delete'):
+ if id_ is None:
+ raise NotFoundException('trying to delete non-saved Process')
process = Process.by_id(self.conn, id_)
process.remove(self.conn)
return '/processes'
- process = Process.by_id(self.conn, id_, create=True)
+ process = Process.by_id_or_create(self.conn, id_)
process.title.set(self._form_data.get_str('title'))
process.description.set(self._form_data.get_str('description'))
process.effort.set(self._form_data.get_float('effort'))
"""Update/insert Condition of ?id= and fields defined in postvars."""
id_ = self._params.get_int_or_none('id')
for _ in self._form_data.get_all_str('delete'):
- condition = Condition.by_id(self.conn, id_)
+ if id_ is None:
+ raise NotFoundException('trying to delete non-saved Condition')
+ condition = Condition.by_id_or_create(self.conn, id_)
condition.remove(self.conn)
return '/conditions'
- condition = Condition.by_id(self.conn, id_, create=True)
- condition.is_active = self._form_data.get_all_str('is_active') != []
+ condition = Condition.by_id_or_create(self.conn, id_)
+ condition.is_active = self._form_data.get_str('is_active') == 'True'
condition.title.set(self._form_data.get_str('title'))
condition.description.set(self._form_data.get_str('description'))
condition.save(self.conn)