home · contact · privacy
Tidy up (even if pylint disapproves) unwieldy POST handlers code.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
21 TEMPLATES_DIR = 'templates'
22
23
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Besides the Jinja environment it carries the database file, a list of
    extra response headers, and the render mode ('html' or 'json') that
    .render uses to turn a context dict into a response body.
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Init HTTPServer with args/kwargs; store db_file; set up Jinja."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses."""
        self._render_mode = 'json'
        json_header = ('Content-Type', 'application/json')
        # Repeated calls must not pile up duplicate Content-Type headers.
        if json_header not in self.headers:
            self.headers += [json_header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string.

        The passed ctx is left untouched: converted values are collected
        into a new dict, to which the '_library' filled during conversion is
        added before serialization.
        """
        library: dict[str, dict[str | int, object]] = {}

        def walk_ctx(node: object) -> Any:
            # Objects with an ID are handed the shared library; presumably
            # they register themselves there and return a reference to it.
            if hasattr(node, 'as_dict_into_reference'):
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            # NB: .as_dict is read as an attribute, not called.
            if hasattr(node, 'as_dict'):
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        rendered: dict[str, Any] = {k: walk_ctx(v) for k, v in ctx.items()}
        rendered['_library'] = library
        return json_dumps(rendered)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In 'html' mode the Jinja template '{tmpl_name}.html' is rendered
        with ctx; in any other mode ctx is serialized via .ctx_to_json.
        """
        tmpl_name = f'{tmpl_name}.{self._render_mode}'
        if 'html' == self._render_mode:
            template = self._jinja.get_template(tmpl_name)
            return template.render(ctx)
        return self.__class__.ctx_to_json(ctx)
72
73
class InputsParser:
    """Wrapper for validating and retrieving dict-like HTTP inputs.

    Wraps a mapping of keys to lists of string values. If strict, .get_str
    raises on missing values instead of returning its default.
    """

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        """Store inputs dict_ and whether missing values are errors."""
        self.inputs = dict_
        self.strict = strictness

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Retrieve single/first string value of key, or default.

        Raises BadFormatException if no value is found while strict, unless
        ignore_strict is set.
        """
        values = self.inputs.get(key)
        if not values:  # key absent, or present with an empty values list
            if self.strict and not ignore_strict:
                raise BadFormatException(f'no value found for key {key}')
            return default
        return values[0]

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Retrieve dict of (first) strings at key starting with prefix.

        Keys whose list of values is empty have no first string and are
        left out, in line with .get_str treating empty lists as "no value".
        """
        return {key: values[0] for key, values in self.inputs.items()
                if key.startswith(prefix) and values}

    def get_int(self, key: str) -> int:
        """Retrieve single/first value of key as int, error if empty."""
        val = self.get_int_or_none(key)
        if val is None:
            raise BadFormatException(f'unexpected empty value for: {key}')
        return val

    def get_int_or_none(self, key: str) -> int | None:
        """Retrieve single/first value of key as int, return None if empty.

        Raises BadFormatException if the value is non-empty but no int.
        """
        val = self.get_str(key, ignore_strict=True)
        if val == '':
            return None
        try:
            return int(val)
        except ValueError as e:
            msg = f'cannot int form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_float(self, key: str) -> float:
        """Retrieve single/first value of key as float.

        Raises BadFormatException if the value cannot be parsed as float
        (or, by way of .get_str, if it is missing while strict).
        """
        val = self.get_str(key)
        try:
            return float(val)
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_all_str(self, key: str) -> list[str]:
        """Retrieve list of string values at key, empty list if absent."""
        return self.inputs.get(key, [])

    def get_all_int(self, key: str) -> list[int]:
        """Retrieve list of int values at key, skipping empty strings.

        Raises BadFormatException if any non-empty value is no int.
        """
        all_str = self.get_all_str(key)
        try:
            return [int(s) for s in all_str if len(s) > 0]
        except ValueError as e:
            msg = f'cannot int a form field value for key {key} in: {all_str}'
            raise BadFormatException(msg) from e
139
140
141 class TaskHandler(BaseHTTPRequestHandler):
142     """Handles single HTTP request."""
143     # pylint: disable=too-many-public-methods
144     server: TaskServer
145     conn: DatabaseConnection
146     _site: str
147     _form_data: InputsParser
148     _params: InputsParser
149
150     def _send_page(self,
151                    ctx: dict[str, Any],
152                    tmpl_name: str,
153                    code: int = 200
154                    ) -> None:
155         """Send ctx as proper HTTP response."""
156         body = self.server.render(ctx, tmpl_name)
157         self.send_response(code)
158         for header_tuple in self.server.headers:
159             self.send_header(*header_tuple)
160         self.end_headers()
161         self.wfile.write(bytes(body, 'utf-8'))
162
163     @staticmethod
164     def _request_wrapper(http_method: str, not_found_msg: str
165                          ) -> Callable[..., Callable[[TaskHandler], None]]:
166         def decorator(f: Callable[..., str | None]
167                       ) -> Callable[[TaskHandler], None]:
168             def wrapper(self: TaskHandler) -> None:
169                 # pylint: disable=protected-access
170                 # (because pylint here fails to detect the use of wrapper as a
171                 # method to self with respective access privileges)
172                 try:
173                     self.conn = DatabaseConnection(self.server.db)
174                     parsed_url = urlparse(self.path)
175                     self._site = path_split(parsed_url.path)[1]
176                     params = parse_qs(parsed_url.query, strict_parsing=True)
177                     self._params = InputsParser(params, False)
178                     handler_name = f'do_{http_method}_{self._site}'
179                     if hasattr(self, handler_name):
180                         handler = getattr(self, handler_name)
181                         redir_target = f(self, handler)
182                         if redir_target:
183                             self.send_response(302)
184                             self.send_header('Location', redir_target)
185                             self.end_headers()
186                     else:
187                         msg = f'{not_found_msg}: {self._site}'
188                         raise NotFoundException(msg)
189                 except HandledException as error:
190                     for cls in (Day, Todo, Condition, Process, ProcessStep):
191                         assert hasattr(cls, 'empty_cache')
192                         cls.empty_cache()
193                     ctx = {'msg': error}
194                     self._send_page(ctx, 'msg', error.http_code)
195                 finally:
196                     self.conn.close()
197             return wrapper
198         return decorator
199
200     @_request_wrapper('GET', 'Unknown page')
201     def do_GET(self, handler: Callable[[], str | dict[str, object]]
202                ) -> str | None:
203         """Render page with result of handler, or redirect if result is str."""
204         tmpl_name = f'{self._site}'
205         ctx_or_redir_target = handler()
206         if isinstance(ctx_or_redir_target, str):
207             return ctx_or_redir_target
208         self._send_page(ctx_or_redir_target, tmpl_name)
209         return None
210
211     @_request_wrapper('POST', 'Unknown POST target')
212     def do_POST(self, handler: Callable[[], str]) -> str:
213         """Handle POST with handler, prepare redirection to result."""
214         length = int(self.headers['content-length'])
215         postvars = parse_qs(self.rfile.read(length).decode(),
216                             keep_blank_values=True, strict_parsing=True)
217         self._form_data = InputsParser(postvars)
218         redir_target = handler()
219         self.conn.commit()
220         return redir_target
221
222     # GET handlers
223
224     def do_GET_(self) -> str:
225         """Return redirect target on GET /."""
226         return '/day'
227
228     def _do_GET_calendar(self) -> dict[str, object]:
229         """Show Days from ?start= to ?end=.
230
231         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
232         same, the only difference being the HTML template they are rendered to,
233         which .do_GET selects from their method name.
234         """
235         start = self._params.get_str('start')
236         end = self._params.get_str('end')
237         if not end:
238             end = date_in_n_days(366)
239         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
240         days, start, end = ret
241         days = Day.with_filled_gaps(days, start, end)
242         today = date_in_n_days(0)
243         return {'start': start, 'end': end, 'days': days, 'today': today}
244
245     def do_GET_calendar(self) -> dict[str, object]:
246         """Show Days from ?start= to ?end= – normal view."""
247         return self._do_GET_calendar()
248
249     def do_GET_calendar_txt(self) -> dict[str, object]:
250         """Show Days from ?start= to ?end= – minimalist view."""
251         return self._do_GET_calendar()
252
253     def do_GET_day(self) -> dict[str, object]:
254         """Show single Day of ?date=."""
255         date = self._params.get_str('date', date_in_n_days(0))
256         day = Day.by_id_or_create(self.conn, date)
257         make_type = self._params.get_str('make_type')
258         conditions_present = []
259         enablers_for = {}
260         disablers_for = {}
261         for todo in day.todos:
262             for condition in todo.conditions + todo.blockers:
263                 if condition not in conditions_present:
264                     conditions_present += [condition]
265                     enablers_for[condition.id_] = [p for p in
266                                                    Process.all(self.conn)
267                                                    if condition in p.enables]
268                     disablers_for[condition.id_] = [p for p in
269                                                     Process.all(self.conn)
270                                                     if condition in p.disables]
271         seen_todos: set[int] = set()
272         top_nodes = [t.get_step_tree(seen_todos)
273                      for t in day.todos if not t.parents]
274         return {'day': day,
275                 'top_nodes': top_nodes,
276                 'make_type': make_type,
277                 'enablers_for': enablers_for,
278                 'disablers_for': disablers_for,
279                 'conditions_present': conditions_present,
280                 'processes': Process.all(self.conn)}
281
282     def do_GET_todo(self) -> dict[str, object]:
283         """Show single Todo of ?id=."""
284
285         @dataclass
286         class TodoStepsNode:
287             """Collect what's useful for Todo steps tree display."""
288             id_: int
289             todo: Todo | None
290             process: Process | None
291             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
292             fillable: bool = False
293
294         def walk_process_steps(id_: int,
295                                process_step_nodes: list[ProcessStepsNode],
296                                steps_nodes: list[TodoStepsNode]) -> None:
297             for process_step_node in process_step_nodes:
298                 id_ += 1
299                 node = TodoStepsNode(id_, None, process_step_node.process, [])
300                 steps_nodes += [node]
301                 walk_process_steps(id_, list(process_step_node.steps.values()),
302                                    node.children)
303
304         def walk_todo_steps(id_: int, todos: list[Todo],
305                             steps_nodes: list[TodoStepsNode]) -> None:
306             for todo in todos:
307                 matched = False
308                 for match in [item for item in steps_nodes
309                               if item.process
310                               and item.process == todo.process]:
311                     match.todo = todo
312                     matched = True
313                     for child in match.children:
314                         child.fillable = True
315                     walk_todo_steps(id_, todo.children, match.children)
316                 if not matched:
317                     id_ += 1
318                     node = TodoStepsNode(id_, todo, None, [])
319                     steps_nodes += [node]
320                     walk_todo_steps(id_, todo.children, node.children)
321
322         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
323                                     ) -> set[int]:
324             ids = set()
325             for node in steps_nodes:
326                 if not node.todo:
327                     assert isinstance(node.process, Process)
328                     assert isinstance(node.process.id_, int)
329                     ids.add(node.process.id_)
330                 ids = ids | collect_adoptables_keys(node.children)
331             return ids
332
333         id_ = self._params.get_int('id')
334         todo = Todo.by_id(self.conn, id_)
335         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
336         process_tree = todo.process.get_steps(self.conn, None)
337         steps_todo_to_process: list[TodoStepsNode] = []
338         walk_process_steps(0, list(process_tree.values()),
339                            steps_todo_to_process)
340         for steps_node in steps_todo_to_process:
341             steps_node.fillable = True
342         walk_todo_steps(len(steps_todo_to_process), todo_steps,
343                         steps_todo_to_process)
344         adoptables: dict[int, list[Todo]] = {}
345         any_adoptables = [Todo.by_id(self.conn, t.id_)
346                           for t in Todo.by_date(self.conn, todo.date)
347                           if t.id_ is not None
348                           and t != todo]
349         for id_ in collect_adoptables_keys(steps_todo_to_process):
350             adoptables[id_] = [t for t in any_adoptables
351                                if t.process.id_ == id_]
352         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
353                 'adoption_candidates_for': adoptables,
354                 'process_candidates': Process.all(self.conn),
355                 'todo_candidates': any_adoptables,
356                 'condition_candidates': Condition.all(self.conn)}
357
358     def do_GET_todos(self) -> dict[str, object]:
359         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
360         sort_by = self._params.get_str('sort_by')
361         start = self._params.get_str('start')
362         end = self._params.get_str('end')
363         process_id = self._params.get_int_or_none('process_id')
364         comment_pattern = self._params.get_str('comment_pattern')
365         todos = []
366         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
367         todos_by_date_range, start, end = ret
368         todos = [t for t in todos_by_date_range
369                  if comment_pattern in t.comment
370                  and ((not process_id) or t.process.id_ == process_id)]
371         if sort_by == 'doneness':
372             todos.sort(key=lambda t: t.is_done)
373         elif sort_by == '-doneness':
374             todos.sort(key=lambda t: t.is_done, reverse=True)
375         elif sort_by == 'title':
376             todos.sort(key=lambda t: t.title_then)
377         elif sort_by == '-title':
378             todos.sort(key=lambda t: t.title_then, reverse=True)
379         elif sort_by == 'comment':
380             todos.sort(key=lambda t: t.comment)
381         elif sort_by == '-comment':
382             todos.sort(key=lambda t: t.comment, reverse=True)
383         elif sort_by == '-date':
384             todos.sort(key=lambda t: t.date, reverse=True)
385         else:
386             todos.sort(key=lambda t: t.date)
387             sort_by = 'title'
388         return {'start': start, 'end': end, 'process_id': process_id,
389                 'comment_pattern': comment_pattern, 'todos': todos,
390                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
391
392     def do_GET_conditions(self) -> dict[str, object]:
393         """Show all Conditions."""
394         pattern = self._params.get_str('pattern')
395         conditions = Condition.matching(self.conn, pattern)
396         sort_by = self._params.get_str('sort_by')
397         if sort_by == 'is_active':
398             conditions.sort(key=lambda c: c.is_active)
399         elif sort_by == '-is_active':
400             conditions.sort(key=lambda c: c.is_active, reverse=True)
401         elif sort_by == '-title':
402             conditions.sort(key=lambda c: c.title.newest, reverse=True)
403         else:
404             conditions.sort(key=lambda c: c.title.newest)
405             sort_by = 'title'
406         return {'conditions': conditions,
407                 'sort_by': sort_by,
408                 'pattern': pattern}
409
410     def do_GET_condition(self) -> dict[str, object]:
411         """Show Condition of ?id=."""
412         id_ = self._params.get_int_or_none('id')
413         c = Condition.by_id_or_create(self.conn, id_)
414         ps = Process.all(self.conn)
415         return {'condition': c, 'is_new': c.id_ is None,
416                 'enabled_processes': [p for p in ps if c in p.conditions],
417                 'disabled_processes': [p for p in ps if c in p.blockers],
418                 'enabling_processes': [p for p in ps if c in p.enables],
419                 'disabling_processes': [p for p in ps if c in p.disables]}
420
421     def do_GET_condition_titles(self) -> dict[str, object]:
422         """Show title history of Condition of ?id=."""
423         id_ = self._params.get_int('id')
424         condition = Condition.by_id(self.conn, id_)
425         return {'condition': condition}
426
427     def do_GET_condition_descriptions(self) -> dict[str, object]:
428         """Show description historys of Condition of ?id=."""
429         id_ = self._params.get_int('id')
430         condition = Condition.by_id(self.conn, id_)
431         return {'condition': condition}
432
433     def do_GET_process(self) -> dict[str, object]:
434         """Show Process of ?id=."""
435         id_ = self._params.get_int_or_none('id')
436         process = Process.by_id_or_create(self.conn, id_)
437         title_64 = self._params.get_str('title_b64')
438         if title_64:
439             title = b64decode(title_64.encode()).decode()
440             process.title.set(title)
441         owners = process.used_as_step_by(self.conn)
442         for step_id in self._params.get_all_int('step_to'):
443             owners += [Process.by_id(self.conn, step_id)]
444         preset_top_step = None
445         for process_id in self._params.get_all_int('has_step'):
446             preset_top_step = process_id
447         return {'process': process, 'is_new': process.id_ is None,
448                 'preset_top_step': preset_top_step,
449                 'steps': process.get_steps(self.conn), 'owners': owners,
450                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
451                 'process_candidates': Process.all(self.conn),
452                 'condition_candidates': Condition.all(self.conn)}
453
454     def do_GET_process_titles(self) -> dict[str, object]:
455         """Show title history of Process of ?id=."""
456         id_ = self._params.get_int('id')
457         process = Process.by_id(self.conn, id_)
458         return {'process': process}
459
460     def do_GET_process_descriptions(self) -> dict[str, object]:
461         """Show description historys of Process of ?id=."""
462         id_ = self._params.get_int('id')
463         process = Process.by_id(self.conn, id_)
464         return {'process': process}
465
466     def do_GET_process_efforts(self) -> dict[str, object]:
467         """Show default effort history of Process of ?id=."""
468         id_ = self._params.get_int('id')
469         process = Process.by_id(self.conn, id_)
470         return {'process': process}
471
472     def do_GET_processes(self) -> dict[str, object]:
473         """Show all Processes."""
474         pattern = self._params.get_str('pattern')
475         processes = Process.matching(self.conn, pattern)
476         sort_by = self._params.get_str('sort_by')
477         if sort_by == 'steps':
478             processes.sort(key=lambda p: len(p.explicit_steps))
479         elif sort_by == '-steps':
480             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
481         elif sort_by == 'owners':
482             processes.sort(key=lambda p: p.n_owners or 0)
483         elif sort_by == '-owners':
484             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
485         elif sort_by == 'effort':
486             processes.sort(key=lambda p: p.effort.newest)
487         elif sort_by == '-effort':
488             processes.sort(key=lambda p: p.effort.newest, reverse=True)
489         elif sort_by == '-title':
490             processes.sort(key=lambda p: p.title.newest, reverse=True)
491         else:
492             processes.sort(key=lambda p: p.title.newest)
493             sort_by = 'title'
494         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
495
496     # POST handlers
497
498     @staticmethod
499     def _delete_or_post(target_class: Any, redir_target: str = '/'
500                         ) -> Callable[..., Callable[[TaskHandler], str]]:
501         def decorator(f: Callable[..., str]
502                       ) -> Callable[[TaskHandler], str]:
503             def wrapper(self: TaskHandler) -> str:
504                 # pylint: disable=protected-access
505                 # (because pylint here fails to detect the use of wrapper as a
506                 # method to self with respective access privileges)
507                 id_ = self._params.get_int_or_none('id')
508                 for _ in self._form_data.get_all_str('delete'):
509                     if id_ is None:
510                         msg = 'trying to delete non-saved ' +\
511                                 f'{target_class.__name__}'
512                         raise NotFoundException(msg)
513                     item = target_class.by_id(self.conn, id_)
514                     item.remove(self.conn)
515                     return redir_target
516                 if target_class.can_create_by_id:
517                     item = target_class.by_id_or_create(self.conn, id_)
518                 else:
519                     item = target_class.by_id(self.conn, id_)
520                 return f(self, item)
521             return wrapper
522         return decorator
523
524     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
525         """Update history timestamps for VersionedAttribute."""
526         id_ = self._params.get_int_or_none('id')
527         item = cls.by_id(self.conn, id_)
528         attr = getattr(item, attr_name)
529         for k, v in self._form_data.get_first_strings_starting('at:').items():
530             old = k[3:]
531             if old[19:] != v:
532                 attr.reset_timestamp(old, f'{v}.0')
533         attr.save(self.conn)
534         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
535
536     def do_POST_day(self) -> str:
537         """Update or insert Day of date and Todos mapped to it."""
538         # pylint: disable=too-many-locals
539         date = self._params.get_str('date')
540         day_comment = self._form_data.get_str('day_comment')
541         make_type = self._form_data.get_str('make_type')
542         old_todos = self._form_data.get_all_int('todo_id')
543         new_todos = self._form_data.get_all_int('new_todo')
544         is_done = [t_id in self._form_data.get_all_int('done')
545                    for t_id in old_todos]
546         comments = self._form_data.get_all_str('comment')
547         efforts = [float(effort) if effort else None
548                    for effort in self._form_data.get_all_str('effort')]
549         if old_todos and 3*[len(old_todos)] != [len(is_done), len(comments),
550                                                 len(efforts)]:
551             msg = 'not equal number each of number of todo_id, comments, ' +\
552                     'and efforts inputs'
553             raise BadFormatException(msg)
554         day = Day.by_id_or_create(self.conn, date)
555         day.comment = day_comment
556         day.save(self.conn)
557         for process_id in sorted(new_todos):
558             if 'empty' == make_type:
559                 process = Process.by_id(self.conn, process_id)
560                 todo = Todo(None, process, False, date)
561                 todo.save(self.conn)
562             else:
563                 Todo.create_with_children(self.conn, process_id, date)
564         for i, todo_id in enumerate(old_todos):
565             todo = Todo.by_id(self.conn, todo_id)
566             todo.is_done = is_done[i]
567             todo.comment = comments[i]
568             todo.effort = efforts[i]
569             todo.save(self.conn)
570         return f'/day?date={date}&make_type={make_type}'
571
572     @_delete_or_post(Todo, '/')
573     def do_POST_todo(self, todo: Todo) -> str:
574         """Update Todo and its children."""
575         # pylint: disable=too-many-locals
576         adopted_child_ids = self._form_data.get_all_int('adopt')
577         processes_to_make_full = self._form_data.get_all_int('make_full')
578         processes_to_make_empty = self._form_data.get_all_int('make_empty')
579         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
580         effort = self._form_data.get_str('effort', ignore_strict=True)
581         conditions = self._form_data.get_all_int('conditions')
582         disables = self._form_data.get_all_int('disables')
583         blockers = self._form_data.get_all_int('blockers')
584         enables = self._form_data.get_all_int('enables')
585         is_done = len(self._form_data.get_all_str('done')) > 0
586         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
587         comment = self._form_data.get_str('comment', ignore_strict=True)
588         for v in fill_fors.values():
589             if v.startswith('make_empty_'):
590                 processes_to_make_empty += [int(v[11:])]
591             elif v.startswith('make_full_'):
592                 processes_to_make_full += [int(v[10:])]
593             elif v != 'ignore':
594                 adopted_child_ids += [int(v)]
595         to_remove = []
596         for child in todo.children:
597             assert isinstance(child.id_, int)
598             if child.id_ not in adopted_child_ids:
599                 to_remove += [child.id_]
600         for id_ in to_remove:
601             child = Todo.by_id(self.conn, id_)
602             todo.remove_child(child)
603         for child_id in adopted_child_ids:
604             if child_id in [c.id_ for c in todo.children]:
605                 continue
606             child = Todo.by_id(self.conn, child_id)
607             todo.add_child(child)
608         for process_id in processes_to_make_empty:
609             process = Process.by_id(self.conn, process_id)
610             made = Todo(None, process, False, todo.date)
611             made.save(self.conn)
612             todo.add_child(made)
613         for process_id in processes_to_make_full:
614             made = Todo.create_with_children(self.conn, process_id, todo.date)
615             todo.add_child(made)
616         todo.effort = float(effort) if effort else None
617         todo.set_conditions(self.conn, conditions)
618         todo.set_blockers(self.conn, blockers)
619         todo.set_enables(self.conn, enables)
620         todo.set_disables(self.conn, disables)
621         todo.is_done = is_done
622         todo.calendarize = calendarize
623         todo.comment = comment
624         todo.save(self.conn)
625         return f'/todo?id={todo.id_}'
626
627     def do_POST_process_descriptions(self) -> str:
628         """Update history timestamps for Process.description."""
629         return self._change_versioned_timestamps(Process, 'description')
630
631     def do_POST_process_efforts(self) -> str:
632         """Update history timestamps for Process.effort."""
633         return self._change_versioned_timestamps(Process, 'effort')
634
635     def do_POST_process_titles(self) -> str:
636         """Update history timestamps for Process.title."""
637         return self._change_versioned_timestamps(Process, 'title')
638
639     @_delete_or_post(Process, '/processes')
640     def do_POST_process(self, process: Process) -> str:
641         """Update or insert Process of ?id= and fields defined in postvars."""
642         # pylint: disable=too-many-locals
643         # pylint: disable=too-many-statements
644         title = self._form_data.get_str('title')
645         description = self._form_data.get_str('description')
646         effort = self._form_data.get_float('effort')
647         conditions = self._form_data.get_all_int('conditions')
648         blockers = self._form_data.get_all_int('blockers')
649         enables = self._form_data.get_all_int('enables')
650         disables = self._form_data.get_all_int('disables')
651         calendarize = self._form_data.get_all_str('calendarize') != []
652         suppresses = self._form_data.get_all_int('suppresses')
653         step_of = self._form_data.get_all_str('step_of')
654         keep_steps = self._form_data.get_all_int('keep_step')
655         step_ids = self._form_data.get_all_int('steps')
656         new_top_steps = self._form_data.get_all_str('new_top_step')
657         step_process_id_to = {}
658         step_parent_id_to = {}
659         new_steps_to = {}
660         for step_id in step_ids:
661             name = f'new_step_to_{step_id}'
662             new_steps_to[step_id] = self._form_data.get_all_int(name)
663         for step_id in keep_steps:
664             name = f'step_{step_id}_process_id'
665             step_process_id_to[step_id] = self._form_data.get_int(name)
666             name = f'step_{step_id}_parent_id'
667             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
668         process.title.set(title)
669         process.description.set(description)
670         process.effort.set(effort)
671         process.set_conditions(self.conn, conditions)
672         process.set_blockers(self.conn, blockers)
673         process.set_enables(self.conn, enables)
674         process.set_disables(self.conn, disables)
675         process.calendarize = calendarize
676         process.save(self.conn)
677         assert isinstance(process.id_, int)
678         new_step_title = None
679         steps: list[ProcessStep] = []
680         for step_id in keep_steps:
681             if step_id not in step_ids:
682                 raise BadFormatException('trying to keep unknown step')
683             step = ProcessStep(step_id, process.id_,
684                                step_process_id_to[step_id],
685                                step_parent_id_to[step_id])
686             steps += [step]
687         for step_id in step_ids:
688             new = [ProcessStep(None, process.id_, step_process_id, step_id)
689                    for step_process_id in new_steps_to[step_id]]
690             steps += new
691         for step_identifier in new_top_steps:
692             try:
693                 step_process_id = int(step_identifier)
694                 step = ProcessStep(None, process.id_, step_process_id, None)
695                 steps += [step]
696             except ValueError:
697                 new_step_title = step_identifier
698         process.set_steps(self.conn, steps)
699         process.set_step_suppressions(self.conn, suppresses)
700         owners_to_set = []
701         new_owner_title = None
702         for owner_identifier in step_of:
703             try:
704                 owners_to_set += [int(owner_identifier)]
705             except ValueError:
706                 new_owner_title = owner_identifier
707         process.set_owners(self.conn, owners_to_set)
708         params = f'id={process.id_}'
709         if new_step_title:
710             title_b64_encoded = b64encode(new_step_title.encode()).decode()
711             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
712         elif new_owner_title:
713             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
714             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
715         process.save(self.conn)
716         return f'/process?{params}'
717
718     def do_POST_condition_descriptions(self) -> str:
719         """Update history timestamps for Condition.description."""
720         return self._change_versioned_timestamps(Condition, 'description')
721
722     def do_POST_condition_titles(self) -> str:
723         """Update history timestamps for Condition.title."""
724         return self._change_versioned_timestamps(Condition, 'title')
725
726     @_delete_or_post(Condition, '/conditions')
727     def do_POST_condition(self, condition: Condition) -> str:
728         """Update/insert Condition of ?id= and fields defined in postvars."""
729         is_active = self._form_data.get_str('is_active') == 'True'
730         title = self._form_data.get_str('title')
731         description = self._form_data.get_str('description')
732         condition.is_active = is_active
733         condition.title.set(title)
734         condition.description.set(description)
735         condition.save(self.conn)
736         return f'/condition?id={condition.id_}'