home · contact · privacy
Minor reorganization of GET handlers code.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
# Directory handed to the Jinja FileSystemLoader in TaskServer.__init__.
TEMPLATES_DIR = 'templates'
22
23
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Also keeps the database file, the extra headers to send with each page,
    and the render mode ('html' by default, 'json' after .set_json_mode).
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Pass args and kwargs on to HTTPServer, store db_file as .db."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses."""
        self._render_mode = 'json'
        json_header = ('Content-Type', 'application/json')
        # Calling this more than once must not pile up duplicate headers.
        if json_header not in self.headers:
            self.headers += [json_header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string, leaving ctx itself untouched.

        The values of ctx are walked recursively: lists and tuples become
        lists, dicts are rebuilt with walked values, HandledExceptions become
        their str() form. Items that offer .as_dict_into_reference and have a
        non-None .id_ are replaced by that method's result, which receives a
        shared library dict; that dict is emitted under the key '_library'.
        """
        library: dict[str, dict[str | int, object]] = {}

        def walk_ctx(node: object) -> Any:
            if hasattr(node, 'as_dict_into_reference'):
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            if hasattr(node, 'as_dict'):
                # NOTE(review): read as attribute, not called - presumably a
                # property on the project's classes; confirm.
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        # Build a fresh dict rather than overwriting the caller's values and
        # injecting '_library' into the dict that was passed in.
        out: dict[str, object] = {k: walk_ctx(v) for k, v in ctx.items()}
        out['_library'] = library
        return json_dumps(out)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In 'html' mode the template '{tmpl_name}.html' is rendered with ctx;
        in any other mode tmpl_name is ignored and ctx is dumped as JSON.
        """
        if 'html' == self._render_mode:
            full_name = f'{tmpl_name}.{self._render_mode}'
            template = self._jinja.get_template(full_name)
            return template.render(ctx)
        return self.__class__.ctx_to_json(ctx)
72
73
class InputsParser:
    """Wrapper for validating and retrieving dict-like HTTP inputs.

    Wraps a dict mapping keys to lists of strings. With strictness set,
    .get_str raises BadFormatException on absent values instead of returning
    a default (unless told to ignore strictness).
    """

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        self.inputs = dict_
        self.strict = strictness

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Retrieve single/first string value of key, or default.

        Raises BadFormatException if there is no value and strictness
        applies (self.strict set and ignore_strict not set).
        """
        values = self.inputs.get(key)
        if not values:  # key absent, or present with an empty list
            if self.strict and not ignore_strict:
                raise BadFormatException(f'no value found for key {key}')
            return default
        return values[0]

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Retrieve dict of (first) strings at key starting with prefix.

        Keys whose value list is empty have no first string and are left
        out, rather than provoking an IndexError.
        """
        return {key: values[0] for key, values in self.inputs.items()
                if key.startswith(prefix) and values}

    def get_int(self, key: str) -> int:
        """Retrieve single/first value of key as int, error if empty."""
        val = self.get_int_or_none(key)
        if val is None:
            raise BadFormatException(f'unexpected empty value for: {key}')
        return val

    def get_int_or_none(self, key: str) -> int | None:
        """Retrieve single/first value of key as int, return None if empty.

        Raises BadFormatException if the value is non-empty but no int.
        """
        val = self.get_str(key, ignore_strict=True)
        if val == '':
            return None
        try:
            return int(val)
        except ValueError as e:
            msg = f'cannot int form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_float(self, key: str) -> float:
        """Retrieve single/first value of key as float.

        Raises BadFormatException if the value cannot be parsed as float, or
        (via .get_str) if it is absent while self.strict is set.
        """
        val = self.get_str(key)
        try:
            return float(val)
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_all_str(self, key: str) -> list[str]:
        """Retrieve list of string values at key, empty list if key absent.

        Returns a copy, so callers extending the result do not change the
        wrapped inputs.
        """
        return list(self.inputs.get(key, []))

    def get_all_int(self, key: str) -> list[int]:
        """Retrieve list of int values at key, skipping empty strings."""
        all_str = self.get_all_str(key)
        try:
            return [int(s) for s in all_str if len(s) > 0]
        except ValueError as e:
            msg = f'cannot int a form field value for key {key} in: {all_str}'
            raise BadFormatException(msg) from e

    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
        """Retrieve list of float value at key, None if empty strings."""
        ret: list[float | None] = []
        for val in self.get_all_str(key):
            if '' == val:
                ret += [None]
            else:
                try:
                    ret += [float(val)]
                except ValueError as e:
                    msg = f'cannot float form field value for key {key}: {val}'
                    raise BadFormatException(msg) from e
        return ret
153
154
class TaskHandler(BaseHTTPRequestHandler):
    """Handles single HTTP request."""
    # pylint: disable=too-many-public-methods
    server: TaskServer
    # Opened for each request by the _request_wrapper decorator.
    conn: DatabaseConnection
    # Last component of the requested URL path; picks do_{METHOD}_{site}.
    _site: str
    # Parsed POST body, set in do_POST.
    _form_data: InputsParser
    # Parsed URL query string, set (non-strict) by _request_wrapper.
    _params: InputsParser
163
164     def _send_page(self,
165                    ctx: dict[str, Any],
166                    tmpl_name: str,
167                    code: int = 200
168                    ) -> None:
169         """Send ctx as proper HTTP response."""
170         body = self.server.render(ctx, tmpl_name)
171         self.send_response(code)
172         for header_tuple in self.server.headers:
173             self.send_header(*header_tuple)
174         self.end_headers()
175         self.wfile.write(bytes(body, 'utf-8'))
176
177     @staticmethod
178     def _request_wrapper(http_method: str, not_found_msg: str
179                          ) -> Callable[..., Callable[[TaskHandler], None]]:
180         def decorator(f: Callable[..., str | None]
181                       ) -> Callable[[TaskHandler], None]:
182             def wrapper(self: TaskHandler) -> None:
183                 # pylint: disable=protected-access
184                 # (because pylint here fails to detect the use of wrapper as a
185                 # method to self with respective access privileges)
186                 try:
187                     self.conn = DatabaseConnection(self.server.db)
188                     parsed_url = urlparse(self.path)
189                     self._site = path_split(parsed_url.path)[1]
190                     params = parse_qs(parsed_url.query, strict_parsing=True)
191                     self._params = InputsParser(params, False)
192                     handler_name = f'do_{http_method}_{self._site}'
193                     if hasattr(self, handler_name):
194                         handler = getattr(self, handler_name)
195                         redir_target = f(self, handler)
196                         if redir_target:
197                             self.send_response(302)
198                             self.send_header('Location', redir_target)
199                             self.end_headers()
200                     else:
201                         msg = f'{not_found_msg}: {self._site}'
202                         raise NotFoundException(msg)
203                 except HandledException as error:
204                     for cls in (Day, Todo, Condition, Process, ProcessStep):
205                         assert hasattr(cls, 'empty_cache')
206                         cls.empty_cache()
207                     ctx = {'msg': error}
208                     self._send_page(ctx, 'msg', error.http_code)
209                 finally:
210                     self.conn.close()
211             return wrapper
212         return decorator
213
214     @_request_wrapper('GET', 'Unknown page')
215     def do_GET(self, handler: Callable[[], str | dict[str, object]]
216                ) -> str | None:
217         """Render page with result of handler, or redirect if result is str."""
218         tmpl_name = f'{self._site}'
219         ctx_or_redir_target = handler()
220         if isinstance(ctx_or_redir_target, str):
221             return ctx_or_redir_target
222         self._send_page(ctx_or_redir_target, tmpl_name)
223         return None
224
225     @_request_wrapper('POST', 'Unknown POST target')
226     def do_POST(self, handler: Callable[[], str]) -> str:
227         """Handle POST with handler, prepare redirection to result."""
228         length = int(self.headers['content-length'])
229         postvars = parse_qs(self.rfile.read(length).decode(),
230                             keep_blank_values=True, strict_parsing=True)
231         self._form_data = InputsParser(postvars)
232         redir_target = handler()
233         self.conn.commit()
234         return redir_target
235
236     # GET handlers
237
238     @staticmethod
239     def _get_item(target_class: Any
240                   ) -> Callable[..., Callable[[TaskHandler],
241                                               dict[str, object]]]:
242         def decorator(f: Callable[..., dict[str, object]]
243                       ) -> Callable[[TaskHandler], dict[str, object]]:
244             def wrapper(self: TaskHandler) -> dict[str, object]:
245                 # pylint: disable=protected-access
246                 # (because pylint here fails to detect the use of wrapper as a
247                 # method to self with respective access privileges)
248                 id_ = self._params.get_int_or_none('id')
249                 if target_class.can_create_by_id:
250                     item = target_class.by_id_or_create(self.conn, id_)
251                 else:
252                     item = target_class.by_id(self.conn, id_)
253                 return f(self, item)
254             return wrapper
255         return decorator
256
257     def do_GET_(self) -> str:
258         """Return redirect target on GET /."""
259         return '/day'
260
261     def _do_GET_calendar(self) -> dict[str, object]:
262         """Show Days from ?start= to ?end=.
263
264         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
265         same, the only difference being the HTML template they are rendered to,
266         which .do_GET selects from their method name.
267         """
268         start = self._params.get_str('start')
269         end = self._params.get_str('end')
270         if not end:
271             end = date_in_n_days(366)
272         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
273         days, start, end = ret
274         days = Day.with_filled_gaps(days, start, end)
275         today = date_in_n_days(0)
276         return {'start': start, 'end': end, 'days': days, 'today': today}
277
278     def do_GET_calendar(self) -> dict[str, object]:
279         """Show Days from ?start= to ?end= – normal view."""
280         return self._do_GET_calendar()
281
282     def do_GET_calendar_txt(self) -> dict[str, object]:
283         """Show Days from ?start= to ?end= – minimalist view."""
284         return self._do_GET_calendar()
285
286     def do_GET_day(self) -> dict[str, object]:
287         """Show single Day of ?date=."""
288         date = self._params.get_str('date', date_in_n_days(0))
289         day = Day.by_id_or_create(self.conn, date)
290         make_type = self._params.get_str('make_type')
291         conditions_present = []
292         enablers_for = {}
293         disablers_for = {}
294         for todo in day.todos:
295             for condition in todo.conditions + todo.blockers:
296                 if condition not in conditions_present:
297                     conditions_present += [condition]
298                     enablers_for[condition.id_] = [p for p in
299                                                    Process.all(self.conn)
300                                                    if condition in p.enables]
301                     disablers_for[condition.id_] = [p for p in
302                                                     Process.all(self.conn)
303                                                     if condition in p.disables]
304         seen_todos: set[int] = set()
305         top_nodes = [t.get_step_tree(seen_todos)
306                      for t in day.todos if not t.parents]
307         return {'day': day,
308                 'top_nodes': top_nodes,
309                 'make_type': make_type,
310                 'enablers_for': enablers_for,
311                 'disablers_for': disablers_for,
312                 'conditions_present': conditions_present,
313                 'processes': Process.all(self.conn)}
314
315     @_get_item(Todo)
316     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
317         """Show single Todo of ?id=."""
318
319         @dataclass
320         class TodoStepsNode:
321             """Collect what's useful for Todo steps tree display."""
322             id_: int
323             todo: Todo | None
324             process: Process | None
325             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
326             fillable: bool = False
327
328         def walk_process_steps(id_: int,
329                                process_step_nodes: list[ProcessStepsNode],
330                                steps_nodes: list[TodoStepsNode]) -> None:
331             for process_step_node in process_step_nodes:
332                 id_ += 1
333                 node = TodoStepsNode(id_, None, process_step_node.process, [])
334                 steps_nodes += [node]
335                 walk_process_steps(id_, list(process_step_node.steps.values()),
336                                    node.children)
337
338         def walk_todo_steps(id_: int, todos: list[Todo],
339                             steps_nodes: list[TodoStepsNode]) -> None:
340             for todo in todos:
341                 matched = False
342                 for match in [item for item in steps_nodes
343                               if item.process
344                               and item.process == todo.process]:
345                     match.todo = todo
346                     matched = True
347                     for child in match.children:
348                         child.fillable = True
349                     walk_todo_steps(id_, todo.children, match.children)
350                 if not matched:
351                     id_ += 1
352                     node = TodoStepsNode(id_, todo, None, [])
353                     steps_nodes += [node]
354                     walk_todo_steps(id_, todo.children, node.children)
355
356         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
357                                     ) -> set[int]:
358             ids = set()
359             for node in steps_nodes:
360                 if not node.todo:
361                     assert isinstance(node.process, Process)
362                     assert isinstance(node.process.id_, int)
363                     ids.add(node.process.id_)
364                 ids = ids | collect_adoptables_keys(node.children)
365             return ids
366
367         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
368         process_tree = todo.process.get_steps(self.conn, None)
369         steps_todo_to_process: list[TodoStepsNode] = []
370         walk_process_steps(0, list(process_tree.values()),
371                            steps_todo_to_process)
372         for steps_node in steps_todo_to_process:
373             steps_node.fillable = True
374         walk_todo_steps(len(steps_todo_to_process), todo_steps,
375                         steps_todo_to_process)
376         adoptables: dict[int, list[Todo]] = {}
377         any_adoptables = [Todo.by_id(self.conn, t.id_)
378                           for t in Todo.by_date(self.conn, todo.date)
379                           if t.id_ is not None
380                           and t != todo]
381         for id_ in collect_adoptables_keys(steps_todo_to_process):
382             adoptables[id_] = [t for t in any_adoptables
383                                if t.process.id_ == id_]
384         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
385                 'adoption_candidates_for': adoptables,
386                 'process_candidates': Process.all(self.conn),
387                 'todo_candidates': any_adoptables,
388                 'condition_candidates': Condition.all(self.conn)}
389
390     def do_GET_todos(self) -> dict[str, object]:
391         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
392         sort_by = self._params.get_str('sort_by')
393         start = self._params.get_str('start')
394         end = self._params.get_str('end')
395         process_id = self._params.get_int_or_none('process_id')
396         comment_pattern = self._params.get_str('comment_pattern')
397         todos = []
398         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
399         todos_by_date_range, start, end = ret
400         todos = [t for t in todos_by_date_range
401                  if comment_pattern in t.comment
402                  and ((not process_id) or t.process.id_ == process_id)]
403         if sort_by == 'doneness':
404             todos.sort(key=lambda t: t.is_done)
405         elif sort_by == '-doneness':
406             todos.sort(key=lambda t: t.is_done, reverse=True)
407         elif sort_by == 'title':
408             todos.sort(key=lambda t: t.title_then)
409         elif sort_by == '-title':
410             todos.sort(key=lambda t: t.title_then, reverse=True)
411         elif sort_by == 'comment':
412             todos.sort(key=lambda t: t.comment)
413         elif sort_by == '-comment':
414             todos.sort(key=lambda t: t.comment, reverse=True)
415         elif sort_by == '-date':
416             todos.sort(key=lambda t: t.date, reverse=True)
417         else:
418             todos.sort(key=lambda t: t.date)
419             sort_by = 'title'
420         return {'start': start, 'end': end, 'process_id': process_id,
421                 'comment_pattern': comment_pattern, 'todos': todos,
422                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
423
424     def do_GET_conditions(self) -> dict[str, object]:
425         """Show all Conditions."""
426         pattern = self._params.get_str('pattern')
427         sort_by = self._params.get_str('sort_by')
428         conditions = Condition.matching(self.conn, pattern)
429         if sort_by == 'is_active':
430             conditions.sort(key=lambda c: c.is_active)
431         elif sort_by == '-is_active':
432             conditions.sort(key=lambda c: c.is_active, reverse=True)
433         elif sort_by == '-title':
434             conditions.sort(key=lambda c: c.title.newest, reverse=True)
435         else:
436             conditions.sort(key=lambda c: c.title.newest)
437             sort_by = 'title'
438         return {'conditions': conditions,
439                 'sort_by': sort_by,
440                 'pattern': pattern}
441
442     @_get_item(Condition)
443     def do_GET_condition(self, c: Condition) -> dict[str, object]:
444         """Show Condition of ?id=."""
445         ps = Process.all(self.conn)
446         return {'condition': c, 'is_new': c.id_ is None,
447                 'enabled_processes': [p for p in ps if c in p.conditions],
448                 'disabled_processes': [p for p in ps if c in p.blockers],
449                 'enabling_processes': [p for p in ps if c in p.enables],
450                 'disabling_processes': [p for p in ps if c in p.disables]}
451
452     @_get_item(Condition)
453     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
454         """Show title history of Condition of ?id=."""
455         return {'condition': c}
456
457     @_get_item(Condition)
458     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
459         """Show description historys of Condition of ?id=."""
460         return {'condition': c}
461
462     @_get_item(Process)
463     def do_GET_process(self, process: Process) -> dict[str, object]:
464         """Show Process of ?id=."""
465         owner_ids = self._params.get_all_int('step_to')
466         owned_ids = self._params.get_all_int('has_step')
467         title_64 = self._params.get_str('title_b64')
468         if title_64:
469             title = b64decode(title_64.encode()).decode()
470             process.title.set(title)
471         owners = process.used_as_step_by(self.conn)
472         for step_id in owner_ids:
473             owners += [Process.by_id(self.conn, step_id)]
474         preset_top_step = None
475         for process_id in owned_ids:
476             preset_top_step = process_id
477         return {'process': process, 'is_new': process.id_ is None,
478                 'preset_top_step': preset_top_step,
479                 'steps': process.get_steps(self.conn), 'owners': owners,
480                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
481                 'process_candidates': Process.all(self.conn),
482                 'condition_candidates': Condition.all(self.conn)}
483
484     @_get_item(Process)
485     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
486         """Show title history of Process of ?id=."""
487         return {'process': p}
488
489     @_get_item(Process)
490     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
491         """Show description historys of Process of ?id=."""
492         return {'process': p}
493
494     @_get_item(Process)
495     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
496         """Show default effort history of Process of ?id=."""
497         return {'process': p}
498
499     def do_GET_processes(self) -> dict[str, object]:
500         """Show all Processes."""
501         pattern = self._params.get_str('pattern')
502         sort_by = self._params.get_str('sort_by')
503         processes = Process.matching(self.conn, pattern)
504         if sort_by == 'steps':
505             processes.sort(key=lambda p: len(p.explicit_steps))
506         elif sort_by == '-steps':
507             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
508         elif sort_by == 'owners':
509             processes.sort(key=lambda p: p.n_owners or 0)
510         elif sort_by == '-owners':
511             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
512         elif sort_by == 'effort':
513             processes.sort(key=lambda p: p.effort.newest)
514         elif sort_by == '-effort':
515             processes.sort(key=lambda p: p.effort.newest, reverse=True)
516         elif sort_by == '-title':
517             processes.sort(key=lambda p: p.title.newest, reverse=True)
518         else:
519             processes.sort(key=lambda p: p.title.newest)
520             sort_by = 'title'
521         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
522
523     # POST handlers
524
525     @staticmethod
526     def _delete_or_post(target_class: Any, redir_target: str = '/'
527                         ) -> Callable[..., Callable[[TaskHandler], str]]:
528         def decorator(f: Callable[..., str]
529                       ) -> Callable[[TaskHandler], str]:
530             def wrapper(self: TaskHandler) -> str:
531                 # pylint: disable=protected-access
532                 # (because pylint here fails to detect the use of wrapper as a
533                 # method to self with respective access privileges)
534                 id_ = self._params.get_int_or_none('id')
535                 for _ in self._form_data.get_all_str('delete'):
536                     if id_ is None:
537                         msg = 'trying to delete non-saved ' +\
538                                 f'{target_class.__name__}'
539                         raise NotFoundException(msg)
540                     item = target_class.by_id(self.conn, id_)
541                     item.remove(self.conn)
542                     return redir_target
543                 if target_class.can_create_by_id:
544                     item = target_class.by_id_or_create(self.conn, id_)
545                 else:
546                     item = target_class.by_id(self.conn, id_)
547                 return f(self, item)
548             return wrapper
549         return decorator
550
551     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
552         """Update history timestamps for VersionedAttribute."""
553         id_ = self._params.get_int_or_none('id')
554         item = cls.by_id(self.conn, id_)
555         attr = getattr(item, attr_name)
556         for k, v in self._form_data.get_first_strings_starting('at:').items():
557             old = k[3:]
558             if old[19:] != v:
559                 attr.reset_timestamp(old, f'{v}.0')
560         attr.save(self.conn)
561         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
562
563     def do_POST_day(self) -> str:
564         """Update or insert Day of date and Todos mapped to it."""
565         # pylint: disable=too-many-locals
566         date = self._params.get_str('date')
567         day_comment = self._form_data.get_str('day_comment')
568         make_type = self._form_data.get_str('make_type')
569         old_todos = self._form_data.get_all_int('todo_id')
570         new_todos = self._form_data.get_all_int('new_todo')
571         comments = self._form_data.get_all_str('comment')
572         efforts = self._form_data.get_all_floats_or_nones('effort')
573         done_todos = self._form_data.get_all_int('done')
574         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
575             raise BadFormatException('"done" field refers to unknown Todo')
576         is_done = [t_id in done_todos for t_id in old_todos]
577         if not (len(old_todos) == len(is_done) == len(comments)
578                 == len(efforts)):
579             msg = 'not equal number each of number of todo_id, comments, ' +\
580                     'and efforts inputs'
581             raise BadFormatException(msg)
582         day = Day.by_id_or_create(self.conn, date)
583         day.comment = day_comment
584         day.save(self.conn)
585         for process_id in sorted(new_todos):
586             if 'empty' == make_type:
587                 process = Process.by_id(self.conn, process_id)
588                 todo = Todo(None, process, False, date)
589                 todo.save(self.conn)
590             else:
591                 Todo.create_with_children(self.conn, process_id, date)
592         for i, todo_id in enumerate(old_todos):
593             todo = Todo.by_id(self.conn, todo_id)
594             todo.is_done = is_done[i]
595             todo.comment = comments[i]
596             todo.effort = efforts[i]
597             todo.save(self.conn)
598         return f'/day?date={date}&make_type={make_type}'
599
600     @_delete_or_post(Todo, '/')
601     def do_POST_todo(self, todo: Todo) -> str:
602         """Update Todo and its children."""
603         # pylint: disable=too-many-locals
604         adopted_child_ids = self._form_data.get_all_int('adopt')
605         processes_to_make_full = self._form_data.get_all_int('make_full')
606         processes_to_make_empty = self._form_data.get_all_int('make_empty')
607         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
608         effort = self._form_data.get_str('effort', ignore_strict=True)
609         conditions = self._form_data.get_all_int('conditions')
610         disables = self._form_data.get_all_int('disables')
611         blockers = self._form_data.get_all_int('blockers')
612         enables = self._form_data.get_all_int('enables')
613         is_done = len(self._form_data.get_all_str('done')) > 0
614         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
615         comment = self._form_data.get_str('comment', ignore_strict=True)
616         for v in fill_fors.values():
617             if v.startswith('make_empty_'):
618                 processes_to_make_empty += [int(v[11:])]
619             elif v.startswith('make_full_'):
620                 processes_to_make_full += [int(v[10:])]
621             elif v != 'ignore':
622                 adopted_child_ids += [int(v)]
623         to_remove = []
624         for child in todo.children:
625             assert isinstance(child.id_, int)
626             if child.id_ not in adopted_child_ids:
627                 to_remove += [child.id_]
628         for id_ in to_remove:
629             child = Todo.by_id(self.conn, id_)
630             todo.remove_child(child)
631         for child_id in adopted_child_ids:
632             if child_id in [c.id_ for c in todo.children]:
633                 continue
634             child = Todo.by_id(self.conn, child_id)
635             todo.add_child(child)
636         for process_id in processes_to_make_empty:
637             process = Process.by_id(self.conn, process_id)
638             made = Todo(None, process, False, todo.date)
639             made.save(self.conn)
640             todo.add_child(made)
641         for process_id in processes_to_make_full:
642             made = Todo.create_with_children(self.conn, process_id, todo.date)
643             todo.add_child(made)
644         todo.effort = float(effort) if effort else None
645         todo.set_conditions(self.conn, conditions)
646         todo.set_blockers(self.conn, blockers)
647         todo.set_enables(self.conn, enables)
648         todo.set_disables(self.conn, disables)
649         todo.is_done = is_done
650         todo.calendarize = calendarize
651         todo.comment = comment
652         todo.save(self.conn)
653         return f'/todo?id={todo.id_}'
654
655     def do_POST_process_descriptions(self) -> str:
656         """Update history timestamps for Process.description."""
657         return self._change_versioned_timestamps(Process, 'description')
658
659     def do_POST_process_efforts(self) -> str:
660         """Update history timestamps for Process.effort."""
661         return self._change_versioned_timestamps(Process, 'effort')
662
663     def do_POST_process_titles(self) -> str:
664         """Update history timestamps for Process.title."""
665         return self._change_versioned_timestamps(Process, 'title')
666
667     @_delete_or_post(Process, '/processes')
668     def do_POST_process(self, process: Process) -> str:
669         """Update or insert Process of ?id= and fields defined in postvars."""
670         # pylint: disable=too-many-locals
671         # pylint: disable=too-many-statements
672         title = self._form_data.get_str('title')
673         description = self._form_data.get_str('description')
674         effort = self._form_data.get_float('effort')
675         conditions = self._form_data.get_all_int('conditions')
676         blockers = self._form_data.get_all_int('blockers')
677         enables = self._form_data.get_all_int('enables')
678         disables = self._form_data.get_all_int('disables')
679         calendarize = self._form_data.get_all_str('calendarize') != []
680         suppresses = self._form_data.get_all_int('suppresses')
681         step_of = self._form_data.get_all_str('step_of')
682         keep_steps = self._form_data.get_all_int('keep_step')
683         step_ids = self._form_data.get_all_int('steps')
684         new_top_steps = self._form_data.get_all_str('new_top_step')
685         step_process_id_to = {}
686         step_parent_id_to = {}
687         new_steps_to = {}
688         for step_id in step_ids:
689             name = f'new_step_to_{step_id}'
690             new_steps_to[step_id] = self._form_data.get_all_int(name)
691         for step_id in keep_steps:
692             name = f'step_{step_id}_process_id'
693             step_process_id_to[step_id] = self._form_data.get_int(name)
694             name = f'step_{step_id}_parent_id'
695             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
696         process.title.set(title)
697         process.description.set(description)
698         process.effort.set(effort)
699         process.set_conditions(self.conn, conditions)
700         process.set_blockers(self.conn, blockers)
701         process.set_enables(self.conn, enables)
702         process.set_disables(self.conn, disables)
703         process.calendarize = calendarize
704         process.save(self.conn)
705         assert isinstance(process.id_, int)
706         new_step_title = None
707         steps: list[ProcessStep] = []
708         for step_id in keep_steps:
709             if step_id not in step_ids:
710                 raise BadFormatException('trying to keep unknown step')
711             step = ProcessStep(step_id, process.id_,
712                                step_process_id_to[step_id],
713                                step_parent_id_to[step_id])
714             steps += [step]
715         for step_id in step_ids:
716             new = [ProcessStep(None, process.id_, step_process_id, step_id)
717                    for step_process_id in new_steps_to[step_id]]
718             steps += new
719         for step_identifier in new_top_steps:
720             try:
721                 step_process_id = int(step_identifier)
722                 step = ProcessStep(None, process.id_, step_process_id, None)
723                 steps += [step]
724             except ValueError:
725                 new_step_title = step_identifier
726         process.set_steps(self.conn, steps)
727         process.set_step_suppressions(self.conn, suppresses)
728         owners_to_set = []
729         new_owner_title = None
730         for owner_identifier in step_of:
731             try:
732                 owners_to_set += [int(owner_identifier)]
733             except ValueError:
734                 new_owner_title = owner_identifier
735         process.set_owners(self.conn, owners_to_set)
736         params = f'id={process.id_}'
737         if new_step_title:
738             title_b64_encoded = b64encode(new_step_title.encode()).decode()
739             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
740         elif new_owner_title:
741             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
742             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
743         process.save(self.conn)
744         return f'/process?{params}'
745
746     def do_POST_condition_descriptions(self) -> str:
747         """Update history timestamps for Condition.description."""
748         return self._change_versioned_timestamps(Condition, 'description')
749
750     def do_POST_condition_titles(self) -> str:
751         """Update history timestamps for Condition.title."""
752         return self._change_versioned_timestamps(Condition, 'title')
753
754     @_delete_or_post(Condition, '/conditions')
755     def do_POST_condition(self, condition: Condition) -> str:
756         """Update/insert Condition of ?id= and fields defined in postvars."""
757         is_active = self._form_data.get_str('is_active') == 'True'
758         title = self._form_data.get_str('title')
759         description = self._form_data.get_str('description')
760         condition.is_active = is_active
761         condition.title.set(title)
762         condition.description.set(description)
763         condition.save(self.conn)
764         return f'/condition?id={condition.id_}'