home · contact · privacy
Refactor BaseModel sorting from GET handlers into class definitions.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
21 TEMPLATES_DIR = 'templates'
22
23
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Besides the Jinja environment (loading from TEMPLATES_DIR), instances
    carry the database file handle (.db), the extra response headers to send
    with every page (.headers), and the render mode that .render dispatches
    on ('html' by default, 'json' after .set_json_mode).
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Set up HTTPServer from *args/**kwargs, default to HTML rendering."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses.

        Safe to call repeatedly: the Content-Type header is added only once.
        """
        self._render_mode = 'json'
        json_header = ('Content-Type', 'application/json')
        if json_header not in self.headers:
            self.headers += [json_header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string.

        Values are walked recursively: lists, tuples and dicts are rebuilt
        with their members converted, HandledExceptions become their str(),
        objects offering .as_dict are replaced by that attribute, and objects
        offering .as_dict_into_reference (and having a non-None .id_) are
        replaced by whatever that method returns when handed the shared
        "library" dict, which is emitted under the '_library' key.

        The ctx passed in is left unmodified; the JSON is built from a copy.
        """
        library: dict[str, dict[str | int, object]] = {}

        def walk_ctx(node: object) -> Any:
            if hasattr(node, 'as_dict_into_reference'):
                # Objects without an ID fall through to the .as_dict check.
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            if hasattr(node, 'as_dict'):
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        # Build a fresh dict rather than overwriting the caller's values.
        rendered: dict[str, object] = {k: walk_ctx(v) for k, v in ctx.items()}
        rendered['_library'] = library
        return json_dumps(rendered)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In 'html' mode, ctx is fed into the Jinja template named
        '{tmpl_name}.html'; in any other mode tmpl_name is ignored and ctx is
        serialized by .ctx_to_json.
        """
        if 'html' == self._render_mode:
            tmpl_name = f'{tmpl_name}.{self._render_mode}'
            template = self._jinja.get_template(tmpl_name)
            return template.render(ctx)
        return self.__class__.ctx_to_json(ctx)
72
73
class InputsParser:
    """Wrapper for validating and retrieving dict-like HTTP inputs.

    Wraps a mapping of keys to lists of string values, as produced by
    urllib.parse.parse_qs. In strict mode (the default), .get_str raises
    BadFormatException on missing keys instead of returning a default.
    """

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        self.inputs = dict_
        self.strict = strictness

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Retrieve single/first string value of key, or default.

        If key is absent or has no values: raise BadFormatException when
        strict (unless ignore_strict is set), else return default.
        """
        values = self.inputs.get(key)
        if not values:
            if self.strict and not ignore_strict:
                raise BadFormatException(f'no value found for key {key}')
            return default
        return values[0]

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Retrieve dict of (first) strings at key starting with prefix.

        Keys that carry no values at all are left out of the result.
        """
        return {key: values[0] for key, values in self.inputs.items()
                if key.startswith(prefix) and values}

    def get_int(self, key: str) -> int:
        """Retrieve single/first value of key as int, error if empty."""
        val = self.get_int_or_none(key)
        if val is None:
            raise BadFormatException(f'unexpected empty value for: {key}')
        return val

    def get_int_or_none(self, key: str) -> int | None:
        """Retrieve single/first value of key as int, return None if empty.

        A missing key counts as empty, regardless of strictness; a non-empty
        value that is no integer raises BadFormatException.
        """
        val = self.get_str(key, ignore_strict=True)
        if val == '':
            return None
        try:
            return int(val)
        except ValueError as e:
            msg = f'cannot int form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_float(self, key: str) -> float:
        """Retrieve single/first value of key as float.

        Raises BadFormatException if the value is no float (which includes
        the empty string), or per .get_str if key is missing in strict mode.
        """
        val = self.get_str(key)
        try:
            return float(val)
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_all_str(self, key: str) -> list[str]:
        """Retrieve list of string values at key, empty list if key absent."""
        return self.inputs.get(key, [])

    def get_all_int(self, key: str) -> list[int]:
        """Retrieve list of int values at key, skipping empty strings."""
        all_str = self.get_all_str(key)
        try:
            return [int(s) for s in all_str if len(s) > 0]
        except ValueError as e:
            msg = f'cannot int a form field value for key {key} in: {all_str}'
            raise BadFormatException(msg) from e

    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
        """Retrieve list of float value at key, None if empty strings."""
        ret: list[float | None] = []
        for val in self.get_all_str(key):
            if '' == val:
                ret += [None]
            else:
                try:
                    ret += [float(val)]
                except ValueError as e:
                    msg = f'cannot float form field value for key {key}: {val}'
                    raise BadFormatException(msg) from e
        return ret
153
154
155 class TaskHandler(BaseHTTPRequestHandler):
156     """Handles single HTTP request."""
157     # pylint: disable=too-many-public-methods
158     server: TaskServer
159     conn: DatabaseConnection
160     _site: str
161     _form_data: InputsParser
162     _params: InputsParser
163
164     def _send_page(self,
165                    ctx: dict[str, Any],
166                    tmpl_name: str,
167                    code: int = 200
168                    ) -> None:
169         """Send ctx as proper HTTP response."""
170         body = self.server.render(ctx, tmpl_name)
171         self.send_response(code)
172         for header_tuple in self.server.headers:
173             self.send_header(*header_tuple)
174         self.end_headers()
175         self.wfile.write(bytes(body, 'utf-8'))
176
177     @staticmethod
178     def _request_wrapper(http_method: str, not_found_msg: str
179                          ) -> Callable[..., Callable[[TaskHandler], None]]:
180         def decorator(f: Callable[..., str | None]
181                       ) -> Callable[[TaskHandler], None]:
182             def wrapper(self: TaskHandler) -> None:
183                 # pylint: disable=protected-access
184                 # (because pylint here fails to detect the use of wrapper as a
185                 # method to self with respective access privileges)
186                 try:
187                     self.conn = DatabaseConnection(self.server.db)
188                     parsed_url = urlparse(self.path)
189                     self._site = path_split(parsed_url.path)[1]
190                     params = parse_qs(parsed_url.query, strict_parsing=True)
191                     self._params = InputsParser(params, False)
192                     handler_name = f'do_{http_method}_{self._site}'
193                     if hasattr(self, handler_name):
194                         handler = getattr(self, handler_name)
195                         redir_target = f(self, handler)
196                         if redir_target:
197                             self.send_response(302)
198                             self.send_header('Location', redir_target)
199                             self.end_headers()
200                     else:
201                         msg = f'{not_found_msg}: {self._site}'
202                         raise NotFoundException(msg)
203                 except HandledException as error:
204                     for cls in (Day, Todo, Condition, Process, ProcessStep):
205                         assert hasattr(cls, 'empty_cache')
206                         cls.empty_cache()
207                     ctx = {'msg': error}
208                     self._send_page(ctx, 'msg', error.http_code)
209                 finally:
210                     self.conn.close()
211             return wrapper
212         return decorator
213
214     @_request_wrapper('GET', 'Unknown page')
215     def do_GET(self, handler: Callable[[], str | dict[str, object]]
216                ) -> str | None:
217         """Render page with result of handler, or redirect if result is str."""
218         tmpl_name = f'{self._site}'
219         ctx_or_redir_target = handler()
220         if isinstance(ctx_or_redir_target, str):
221             return ctx_or_redir_target
222         self._send_page(ctx_or_redir_target, tmpl_name)
223         return None
224
225     @_request_wrapper('POST', 'Unknown POST target')
226     def do_POST(self, handler: Callable[[], str]) -> str:
227         """Handle POST with handler, prepare redirection to result."""
228         length = int(self.headers['content-length'])
229         postvars = parse_qs(self.rfile.read(length).decode(),
230                             keep_blank_values=True, strict_parsing=True)
231         self._form_data = InputsParser(postvars)
232         redir_target = handler()
233         self.conn.commit()
234         return redir_target
235
236     # GET handlers
237
238     @staticmethod
239     def _get_item(target_class: Any
240                   ) -> Callable[..., Callable[[TaskHandler],
241                                               dict[str, object]]]:
242         def decorator(f: Callable[..., dict[str, object]]
243                       ) -> Callable[[TaskHandler], dict[str, object]]:
244             def wrapper(self: TaskHandler) -> dict[str, object]:
245                 # pylint: disable=protected-access
246                 # (because pylint here fails to detect the use of wrapper as a
247                 # method to self with respective access privileges)
248                 id_ = self._params.get_int_or_none('id')
249                 if target_class.can_create_by_id:
250                     item = target_class.by_id_or_create(self.conn, id_)
251                 else:
252                     item = target_class.by_id(self.conn, id_)
253                 return f(self, item)
254             return wrapper
255         return decorator
256
257     def do_GET_(self) -> str:
258         """Return redirect target on GET /."""
259         return '/day'
260
261     def _do_GET_calendar(self) -> dict[str, object]:
262         """Show Days from ?start= to ?end=.
263
264         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
265         same, the only difference being the HTML template they are rendered to,
266         which .do_GET selects from their method name.
267         """
268         start = self._params.get_str('start')
269         end = self._params.get_str('end')
270         if not end:
271             end = date_in_n_days(366)
272         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
273         days, start, end = ret
274         days = Day.with_filled_gaps(days, start, end)
275         today = date_in_n_days(0)
276         return {'start': start, 'end': end, 'days': days, 'today': today}
277
278     def do_GET_calendar(self) -> dict[str, object]:
279         """Show Days from ?start= to ?end= – normal view."""
280         return self._do_GET_calendar()
281
282     def do_GET_calendar_txt(self) -> dict[str, object]:
283         """Show Days from ?start= to ?end= – minimalist view."""
284         return self._do_GET_calendar()
285
286     def do_GET_day(self) -> dict[str, object]:
287         """Show single Day of ?date=."""
288         date = self._params.get_str('date', date_in_n_days(0))
289         day = Day.by_id_or_create(self.conn, date)
290         make_type = self._params.get_str('make_type')
291         conditions_present = []
292         enablers_for = {}
293         disablers_for = {}
294         for todo in day.todos:
295             for condition in todo.conditions + todo.blockers:
296                 if condition not in conditions_present:
297                     conditions_present += [condition]
298                     enablers_for[condition.id_] = [p for p in
299                                                    Process.all(self.conn)
300                                                    if condition in p.enables]
301                     disablers_for[condition.id_] = [p for p in
302                                                     Process.all(self.conn)
303                                                     if condition in p.disables]
304         seen_todos: set[int] = set()
305         top_nodes = [t.get_step_tree(seen_todos)
306                      for t in day.todos if not t.parents]
307         return {'day': day,
308                 'top_nodes': top_nodes,
309                 'make_type': make_type,
310                 'enablers_for': enablers_for,
311                 'disablers_for': disablers_for,
312                 'conditions_present': conditions_present,
313                 'processes': Process.all(self.conn)}
314
    @_get_item(Todo)
    def do_GET_todo(self, todo: Todo) -> dict[str, object]:
        """Show single Todo of ?id=.

        Builds a tree of TodoStepsNodes that starts from the steps of the
        Todo's Process and is then matched against / extended by the Todo's
        actual child Todos, plus candidate lists for the template: Todos of
        the same date that could be adopted (overall, and per Process ID of
        steps not yet backed by a Todo), all Processes, and all Conditions.
        """

        @dataclass
        class TodoStepsNode:
            """Collect what's useful for Todo steps tree display."""
            # display ID; see note in walk_process_steps on uniqueness
            id_: int
            # Todo filling this node, if any
            todo: Todo | None
            # Process step this node was derived from, if any
            process: Process | None
            children: list[TodoStepsNode]  # pylint: disable=undefined-variable
            # set below on top-level process nodes, and on children of
            # process nodes that got matched with a Todo
            fillable: bool = False

        def walk_process_steps(id_: int,
                               process_step_nodes: list[ProcessStepsNode],
                               steps_nodes: list[TodoStepsNode]) -> None:
            """Append a Todo-less node per process step, recursing into steps.

            NOTE(review): id_ is a plain int, so increments made inside the
            recursive call do not propagate back; a node's first child and
            its next sibling therefore get the same id_ – confirm whether
            the template relies on these IDs being unique.
            """
            for process_step_node in process_step_nodes:
                id_ += 1
                node = TodoStepsNode(id_, None, process_step_node.process, [])
                steps_nodes += [node]
                walk_process_steps(id_, list(process_step_node.steps.values()),
                                   node.children)

        def walk_todo_steps(id_: int, todos: list[Todo],
                            steps_nodes: list[TodoStepsNode]) -> None:
            """Attach todos to nodes of same Process, or append new nodes.

            Every node in steps_nodes whose Process equals that of a todo
            gets that todo assigned (so a todo may end up on several nodes),
            has its children marked fillable, and is recursed into with the
            todo's children. A todo matching no node gets a Process-less
            node of its own.
            """
            for todo in todos:
                matched = False
                for match in [item for item in steps_nodes
                              if item.process
                              and item.process == todo.process]:
                    match.todo = todo
                    matched = True
                    for child in match.children:
                        child.fillable = True
                    walk_todo_steps(id_, todo.children, match.children)
                if not matched:
                    id_ += 1
                    node = TodoStepsNode(id_, todo, None, [])
                    steps_nodes += [node]
                    walk_todo_steps(id_, todo.children, node.children)

        def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
                                    ) -> set[int]:
            """Return Process IDs of all nodes (at any depth) lacking a Todo."""
            ids = set()
            for node in steps_nodes:
                if not node.todo:
                    # nodes are built with a Todo or a Process, so a
                    # Todo-less one is expected to carry a saved Process
                    assert isinstance(node.process, Process)
                    assert isinstance(node.process.id_, int)
                    ids.add(node.process.id_)
                ids = ids | collect_adoptables_keys(node.children)
            return ids

        # the Todos directly below this Todo in its step tree
        todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
        process_tree = todo.process.get_steps(self.conn, None)
        steps_todo_to_process: list[TodoStepsNode] = []
        walk_process_steps(0, list(process_tree.values()),
                           steps_todo_to_process)
        # top-level process steps are always fillable
        for steps_node in steps_todo_to_process:
            steps_node.fillable = True
        # IDs for nodes of unmatched Todos continue after the number of
        # top-level process nodes
        walk_todo_steps(len(steps_todo_to_process), todo_steps,
                        steps_todo_to_process)
        adoptables: dict[int, list[Todo]] = {}
        # all other saved Todos of the same date, re-fetched by ID
        any_adoptables = [Todo.by_id(self.conn, t.id_)
                          for t in Todo.by_date(self.conn, todo.date)
                          if t.id_ is not None
                          and t != todo]
        # per unfilled step's Process ID: the candidates of that Process
        for id_ in collect_adoptables_keys(steps_todo_to_process):
            adoptables[id_] = [t for t in any_adoptables
                               if t.process.id_ == id_]
        return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
                'adoption_candidates_for': adoptables,
                'process_candidates': Process.all(self.conn),
                'todo_candidates': any_adoptables,
                'condition_candidates': Condition.all(self.conn)}
389
390     def do_GET_todos(self) -> dict[str, object]:
391         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
392         sort_by = self._params.get_str('sort_by')
393         start = self._params.get_str('start')
394         end = self._params.get_str('end')
395         process_id = self._params.get_int_or_none('process_id')
396         comment_pattern = self._params.get_str('comment_pattern')
397         todos = []
398         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
399         todos_by_date_range, start, end = ret
400         todos = [t for t in todos_by_date_range
401                  if comment_pattern in t.comment
402                  and ((not process_id) or t.process.id_ == process_id)]
403         sort_by = Todo.sort_by(todos, sort_by)
404         return {'start': start, 'end': end, 'process_id': process_id,
405                 'comment_pattern': comment_pattern, 'todos': todos,
406                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
407
408     def do_GET_conditions(self) -> dict[str, object]:
409         """Show all Conditions."""
410         pattern = self._params.get_str('pattern')
411         sort_by = self._params.get_str('sort_by')
412         conditions = Condition.matching(self.conn, pattern)
413         sort_by = Condition.sort_by(conditions, sort_by)
414         return {'conditions': conditions,
415                 'sort_by': sort_by,
416                 'pattern': pattern}
417
418     @_get_item(Condition)
419     def do_GET_condition(self, c: Condition) -> dict[str, object]:
420         """Show Condition of ?id=."""
421         ps = Process.all(self.conn)
422         return {'condition': c, 'is_new': c.id_ is None,
423                 'enabled_processes': [p for p in ps if c in p.conditions],
424                 'disabled_processes': [p for p in ps if c in p.blockers],
425                 'enabling_processes': [p for p in ps if c in p.enables],
426                 'disabling_processes': [p for p in ps if c in p.disables]}
427
428     @_get_item(Condition)
429     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
430         """Show title history of Condition of ?id=."""
431         return {'condition': c}
432
433     @_get_item(Condition)
434     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
435         """Show description historys of Condition of ?id=."""
436         return {'condition': c}
437
438     @_get_item(Process)
439     def do_GET_process(self, process: Process) -> dict[str, object]:
440         """Show Process of ?id=."""
441         owner_ids = self._params.get_all_int('step_to')
442         owned_ids = self._params.get_all_int('has_step')
443         title_64 = self._params.get_str('title_b64')
444         if title_64:
445             title = b64decode(title_64.encode()).decode()
446             process.title.set(title)
447         owners = process.used_as_step_by(self.conn)
448         for step_id in owner_ids:
449             owners += [Process.by_id(self.conn, step_id)]
450         preset_top_step = None
451         for process_id in owned_ids:
452             preset_top_step = process_id
453         return {'process': process, 'is_new': process.id_ is None,
454                 'preset_top_step': preset_top_step,
455                 'steps': process.get_steps(self.conn), 'owners': owners,
456                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
457                 'process_candidates': Process.all(self.conn),
458                 'condition_candidates': Condition.all(self.conn)}
459
460     @_get_item(Process)
461     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
462         """Show title history of Process of ?id=."""
463         return {'process': p}
464
465     @_get_item(Process)
466     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
467         """Show description historys of Process of ?id=."""
468         return {'process': p}
469
470     @_get_item(Process)
471     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
472         """Show default effort history of Process of ?id=."""
473         return {'process': p}
474
475     def do_GET_processes(self) -> dict[str, object]:
476         """Show all Processes."""
477         pattern = self._params.get_str('pattern')
478         sort_by = self._params.get_str('sort_by')
479         processes = Process.matching(self.conn, pattern)
480         sort_by = Process.sort_by(processes, sort_by)
481         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
482
483     # POST handlers
484
485     @staticmethod
486     def _delete_or_post(target_class: Any, redir_target: str = '/'
487                         ) -> Callable[..., Callable[[TaskHandler], str]]:
488         def decorator(f: Callable[..., str]
489                       ) -> Callable[[TaskHandler], str]:
490             def wrapper(self: TaskHandler) -> str:
491                 # pylint: disable=protected-access
492                 # (because pylint here fails to detect the use of wrapper as a
493                 # method to self with respective access privileges)
494                 id_ = self._params.get_int_or_none('id')
495                 for _ in self._form_data.get_all_str('delete'):
496                     if id_ is None:
497                         msg = 'trying to delete non-saved ' +\
498                                 f'{target_class.__name__}'
499                         raise NotFoundException(msg)
500                     item = target_class.by_id(self.conn, id_)
501                     item.remove(self.conn)
502                     return redir_target
503                 if target_class.can_create_by_id:
504                     item = target_class.by_id_or_create(self.conn, id_)
505                 else:
506                     item = target_class.by_id(self.conn, id_)
507                 return f(self, item)
508             return wrapper
509         return decorator
510
511     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
512         """Update history timestamps for VersionedAttribute."""
513         id_ = self._params.get_int_or_none('id')
514         item = cls.by_id(self.conn, id_)
515         attr = getattr(item, attr_name)
516         for k, v in self._form_data.get_first_strings_starting('at:').items():
517             old = k[3:]
518             if old[19:] != v:
519                 attr.reset_timestamp(old, f'{v}.0')
520         attr.save(self.conn)
521         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
522
523     def do_POST_day(self) -> str:
524         """Update or insert Day of date and Todos mapped to it."""
525         # pylint: disable=too-many-locals
526         date = self._params.get_str('date')
527         day_comment = self._form_data.get_str('day_comment')
528         make_type = self._form_data.get_str('make_type')
529         old_todos = self._form_data.get_all_int('todo_id')
530         new_todos = self._form_data.get_all_int('new_todo')
531         comments = self._form_data.get_all_str('comment')
532         efforts = self._form_data.get_all_floats_or_nones('effort')
533         done_todos = self._form_data.get_all_int('done')
534         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
535             raise BadFormatException('"done" field refers to unknown Todo')
536         is_done = [t_id in done_todos for t_id in old_todos]
537         if not (len(old_todos) == len(is_done) == len(comments)
538                 == len(efforts)):
539             msg = 'not equal number each of number of todo_id, comments, ' +\
540                     'and efforts inputs'
541             raise BadFormatException(msg)
542         day = Day.by_id_or_create(self.conn, date)
543         day.comment = day_comment
544         day.save(self.conn)
545         for process_id in sorted(new_todos):
546             if 'empty' == make_type:
547                 process = Process.by_id(self.conn, process_id)
548                 todo = Todo(None, process, False, date)
549                 todo.save(self.conn)
550             else:
551                 Todo.create_with_children(self.conn, process_id, date)
552         for i, todo_id in enumerate(old_todos):
553             todo = Todo.by_id(self.conn, todo_id)
554             todo.is_done = is_done[i]
555             todo.comment = comments[i]
556             todo.effort = efforts[i]
557             todo.save(self.conn)
558         return f'/day?date={date}&make_type={make_type}'
559
560     @_delete_or_post(Todo, '/')
561     def do_POST_todo(self, todo: Todo) -> str:
562         """Update Todo and its children."""
563         # pylint: disable=too-many-locals
564         adopted_child_ids = self._form_data.get_all_int('adopt')
565         processes_to_make_full = self._form_data.get_all_int('make_full')
566         processes_to_make_empty = self._form_data.get_all_int('make_empty')
567         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
568         effort = self._form_data.get_str('effort', ignore_strict=True)
569         conditions = self._form_data.get_all_int('conditions')
570         disables = self._form_data.get_all_int('disables')
571         blockers = self._form_data.get_all_int('blockers')
572         enables = self._form_data.get_all_int('enables')
573         is_done = len(self._form_data.get_all_str('done')) > 0
574         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
575         comment = self._form_data.get_str('comment', ignore_strict=True)
576         for v in fill_fors.values():
577             if v.startswith('make_empty_'):
578                 processes_to_make_empty += [int(v[11:])]
579             elif v.startswith('make_full_'):
580                 processes_to_make_full += [int(v[10:])]
581             elif v != 'ignore':
582                 adopted_child_ids += [int(v)]
583         to_remove = []
584         for child in todo.children:
585             assert isinstance(child.id_, int)
586             if child.id_ not in adopted_child_ids:
587                 to_remove += [child.id_]
588         for id_ in to_remove:
589             child = Todo.by_id(self.conn, id_)
590             todo.remove_child(child)
591         for child_id in adopted_child_ids:
592             if child_id in [c.id_ for c in todo.children]:
593                 continue
594             child = Todo.by_id(self.conn, child_id)
595             todo.add_child(child)
596         for process_id in processes_to_make_empty:
597             process = Process.by_id(self.conn, process_id)
598             made = Todo(None, process, False, todo.date)
599             made.save(self.conn)
600             todo.add_child(made)
601         for process_id in processes_to_make_full:
602             made = Todo.create_with_children(self.conn, process_id, todo.date)
603             todo.add_child(made)
604         todo.effort = float(effort) if effort else None
605         todo.set_conditions(self.conn, conditions)
606         todo.set_blockers(self.conn, blockers)
607         todo.set_enables(self.conn, enables)
608         todo.set_disables(self.conn, disables)
609         todo.is_done = is_done
610         todo.calendarize = calendarize
611         todo.comment = comment
612         todo.save(self.conn)
613         return f'/todo?id={todo.id_}'
614
615     def do_POST_process_descriptions(self) -> str:
616         """Update history timestamps for Process.description."""
617         return self._change_versioned_timestamps(Process, 'description')
618
619     def do_POST_process_efforts(self) -> str:
620         """Update history timestamps for Process.effort."""
621         return self._change_versioned_timestamps(Process, 'effort')
622
623     def do_POST_process_titles(self) -> str:
624         """Update history timestamps for Process.title."""
625         return self._change_versioned_timestamps(Process, 'title')
626
    @_delete_or_post(Process, '/processes')
    def do_POST_process(self, process: Process) -> str:
        """Update or insert Process of ?id= and fields defined in postvars.

        Sets the versioned attributes, condition relations and calendarize
        flag, saves, then rebuilds the Process's steps, step suppressions
        and owners from the form, and saves again.

        Returns the path to the Process's own page – unless 'new_top_step'
        or 'step_of' carried a non-integer value, which is taken as title of
        a Process yet to be created: then the path leads to an ID-less
        Process page, with that title (base64-encoded) and this Process's ID
        as ?step_to= or ?has_step= parameter respectively.
        """
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-statements
        title = self._form_data.get_str('title')
        description = self._form_data.get_str('description')
        effort = self._form_data.get_float('effort')
        conditions = self._form_data.get_all_int('conditions')
        blockers = self._form_data.get_all_int('blockers')
        enables = self._form_data.get_all_int('enables')
        disables = self._form_data.get_all_int('disables')
        calendarize = self._form_data.get_all_str('calendarize') != []
        suppresses = self._form_data.get_all_int('suppresses')
        step_of = self._form_data.get_all_str('step_of')
        keep_steps = self._form_data.get_all_int('keep_step')
        step_ids = self._form_data.get_all_int('steps')
        new_top_steps = self._form_data.get_all_str('new_top_step')
        # per-step inputs live in fields whose names embed the step's ID
        step_process_id_to = {}
        step_parent_id_to = {}
        new_steps_to = {}
        for step_id in step_ids:
            name = f'new_step_to_{step_id}'
            new_steps_to[step_id] = self._form_data.get_all_int(name)
        for step_id in keep_steps:
            name = f'step_{step_id}_process_id'
            step_process_id_to[step_id] = self._form_data.get_int(name)
            name = f'step_{step_id}_parent_id'
            step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
        process.title.set(title)
        process.description.set(description)
        process.effort.set(effort)
        process.set_conditions(self.conn, conditions)
        process.set_blockers(self.conn, blockers)
        process.set_enables(self.conn, enables)
        process.set_disables(self.conn, disables)
        process.calendarize = calendarize
        # saved before building steps, which need process.id_ to be an int
        process.save(self.conn)
        assert isinstance(process.id_, int)
        new_step_title = None
        steps: list[ProcessStep] = []
        # steps to keep, which must be among those listed in 'steps'
        for step_id in keep_steps:
            if step_id not in step_ids:
                raise BadFormatException('trying to keep unknown step')
            step = ProcessStep(step_id, process.id_,
                               step_process_id_to[step_id],
                               step_parent_id_to[step_id])
            steps += [step]
        # new steps (without ID yet) below the listed steps
        for step_id in step_ids:
            new = [ProcessStep(None, process.id_, step_process_id, step_id)
                   for step_process_id in new_steps_to[step_id]]
            steps += new
        # new parentless steps; a non-integer identifier is remembered as
        # title instead (the last such one wins)
        for step_identifier in new_top_steps:
            try:
                step_process_id = int(step_identifier)
                step = ProcessStep(None, process.id_, step_process_id, None)
                steps += [step]
            except ValueError:
                new_step_title = step_identifier
        process.set_steps(self.conn, steps)
        process.set_step_suppressions(self.conn, suppresses)
        # owners by ID; a non-integer identifier is remembered as title
        # instead (the last such one wins)
        owners_to_set = []
        new_owner_title = None
        for owner_identifier in step_of:
            try:
                owners_to_set += [int(owner_identifier)]
            except ValueError:
                new_owner_title = owner_identifier
        process.set_owners(self.conn, owners_to_set)
        params = f'id={process.id_}'
        # a new step title takes precedence over a new owner title
        if new_step_title:
            title_b64_encoded = b64encode(new_step_title.encode()).decode()
            params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
        elif new_owner_title:
            title_b64_encoded = b64encode(new_owner_title.encode()).decode()
            params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
        process.save(self.conn)
        return f'/process?{params}'
705
706     def do_POST_condition_descriptions(self) -> str:
707         """Update history timestamps for Condition.description."""
708         return self._change_versioned_timestamps(Condition, 'description')
709
710     def do_POST_condition_titles(self) -> str:
711         """Update history timestamps for Condition.title."""
712         return self._change_versioned_timestamps(Condition, 'title')
713
714     @_delete_or_post(Condition, '/conditions')
715     def do_POST_condition(self, condition: Condition) -> str:
716         """Update/insert Condition of ?id= and fields defined in postvars."""
717         is_active = self._form_data.get_str('is_active') == 'True'
718         title = self._form_data.get_str('title')
719         description = self._form_data.get_str('description')
720         condition.is_active = is_active
721         condition.title.set(title)
722         condition.description.set(description)
723         condition.save(self.conn)
724         return f'/condition?id={condition.id_}'