home · contact · privacy
Minor tests refactoring.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.render_mode = 'html'
33         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35
36 class InputsParser:
37     """Wrapper for validating and retrieving dict-like HTTP inputs."""
38
39     def __init__(self, dict_: dict[str, list[str]]) -> None:
40         self.inputs = dict_
41
42     def get_all_str(self, key: str) -> list[str]:
43         """Retrieve list of string values at key (empty if no key)."""
44         if key not in self.inputs.keys():
45             return []
46         return self.inputs[key]
47
48     def get_all_int(self, key: str) -> list[int]:
49         """Retrieve list of int values at key."""
50         all_str = self.get_all_str(key)
51         try:
52             return [int(s) for s in all_str if len(s) > 0]
53         except ValueError as e:
54             msg = f'cannot int a form field value for key {key} in: {all_str}'
55             raise BadFormatException(msg) from e
56
57     def get_str(self, key: str, default: str | None = None) -> str | None:
58         """Retrieve single/first string value of key, or default."""
59         vals = self.get_all_str(key)
60         if vals:
61             return vals[0]
62         return default
63
64     def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65         """Retrieve first string value of key, if none: fail or default."""
66         vals = self.get_all_str(key)
67         if not vals:
68             if default is not None:
69                 return default
70             raise BadFormatException(f'no value found for key: {key}')
71         return vals[0]
72
73     def get_int_or_none(self, key: str) -> int | None:
74         """Retrieve single/first value of key as int, return None if empty."""
75         val = self.get_str_or_fail(key, '')
76         if val == '':
77             return None
78         try:
79             return int(val)
80         except (ValueError, TypeError) as e:
81             msg = f'cannot int form field value for key {key}: {val}'
82             raise BadFormatException(msg) from e
83
84     def get_bool_or_none(self, key: str) -> bool | None:
85         """Return value to key if truish; if no value to key, None."""
86         val = self.get_str(key)
87         if val is None:
88             return None
89         return val in {'True', 'true', '1', 'on'}
90
91     def get_firsts_of_key_prefixed(self, prefix: str) -> dict[str, str]:
92         """Retrieve dict of (first) strings at key starting with prefix."""
93         ret = {}
94         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95             ret[key] = self.inputs[key][0]
96         return ret
97
98     def get_float_or_fail(self, key: str) -> float:
99         """Retrieve float value of key from self.postvars, fail if none."""
100         val = self.get_str_or_fail(key)
101         try:
102             return float(val)
103         except ValueError as e:
104             msg = f'cannot float form field value for key {key}: {val}'
105             raise BadFormatException(msg) from e
106
107     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
108         """Retrieve list of float value at key, None if empty strings."""
109         ret: list[float | None] = []
110         for val in self.get_all_str(key):
111             if '' == val:
112                 ret += [None]
113             else:
114                 try:
115                     ret += [float(val)]
116                 except ValueError as e:
117                     msg = f'cannot float form field value for key {key}: {val}'
118                     raise BadFormatException(msg) from e
119         return ret
120
121
122 class TaskHandler(BaseHTTPRequestHandler):
123     """Handles single HTTP request."""
124     # pylint: disable=too-many-public-methods
125     server: TaskServer
126     conn: DatabaseConnection
127     _site: str
128     _form: InputsParser
129     _params: InputsParser
130
131     def _send_page(
132             self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
133             ) -> None:
134         """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
135
136         The differentiation by .server.render_mode serves to allow easily
137         comparable JSON responses for automatic testing.
138         """
139         body: str
140         headers: list[tuple[str, str]] = []
141         if 'html' == self.server.render_mode:
142             tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
143             body = tmpl.render(ctx)
144         else:
145             body = self._ctx_to_json(ctx)
146             headers += [('Content-Type', 'application/json')]
147         self.send_response(code)
148         for header_tuple in headers:
149             self.send_header(*header_tuple)
150         self.end_headers()
151         self.wfile.write(bytes(body, 'utf-8'))
152
153     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
154         """Render ctx into JSON string.
155
156         Flattens any objects that json.dumps might not want to serialize, and
157         turns occurrences of BaseModel objects into listings of their .id_, to
158         be resolved to a full dict inside a top-level '_library' dictionary,
159         to avoid endless and circular nesting.
160         """
161
162         def flatten(node: object) -> object:
163
164             def update_library_with(
165                     item: BaseModel[int] | BaseModel[str]) -> None:
166                 cls_name = item.__class__.__name__
167                 if cls_name not in library:
168                     library[cls_name] = {}
169                 if item.id_ not in library[cls_name]:
170                     d, refs = item.as_dict_and_refs
171                     id_key = '?' if item.id_ is None else item.id_
172                     library[cls_name][id_key] = d
173                     for ref in refs:
174                         update_library_with(ref)
175
176             if isinstance(node, BaseModel):
177                 update_library_with(node)
178                 return node.id_
179             if isinstance(node, DictableNode):
180                 d, refs = node.as_dict_and_refs
181                 for ref in refs:
182                     update_library_with(ref)
183                 return d
184             if isinstance(node, (list, tuple)):
185                 return [flatten(item) for item in node]
186             if isinstance(node, dict):
187                 d = {}
188                 for k, v in node.items():
189                     d[k] = flatten(v)
190                 return d
191             if isinstance(node, HandledException):
192                 return str(node)
193             return node
194
195         library: dict[str, dict[str | int, object]] = {}
196         for k, v in ctx.items():
197             ctx[k] = flatten(v)
198         ctx['_library'] = library
199         return json_dumps(ctx)
200
201     @staticmethod
202     def _request_wrapper(http_method: str, not_found_msg: str
203                          ) -> Callable[..., Callable[[TaskHandler], None]]:
204         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
205
206         Among other things, conditionally cleans all caches, but only on POST
207         requests, as only those are expected to change the states of objects
208         that may be cached, and certainly only those are expected to write any
209         changes to the database. We want to call them as early though as
210         possible here, either exactly after the specific request handler
211         returns successfully, or right after any exception is triggered –
212         otherwise, race conditions become plausible.
213
214         Note that otherwise any POST attempt, even a failed one, may end in
215         problematic inconsistencies:
216
217         - if the POST handler experiences an Exception, changes to objects
218           won't get written to the DB, but the changed objects may remain in
219           the cache and affect other objects despite their possibly illegal
220           state
221
222         - even if an object was just saved to the DB, we cannot be sure its
223           current state is completely identical to what we'd get if loading it
224           fresh from the DB (e.g. currently Process.n_owners is only updated
225           when loaded anew via .from_table_row, nor is its state written to
226           the DB by .save; a questionable design choice, but proof that we
227           have no guarantee that objects' .save stores all their states we'd
228           prefer at their most up-to-date.
229         """
230
231         def clear_caches() -> None:
232             for cls in (Day, Todo, Condition, Process, ProcessStep):
233                 assert hasattr(cls, 'empty_cache')
234                 cls.empty_cache()
235
236         def decorator(f: Callable[..., str | None]
237                       ) -> Callable[[TaskHandler], None]:
238             def wrapper(self: TaskHandler) -> None:
239                 # pylint: disable=protected-access
240                 # (because pylint here fails to detect the use of wrapper as a
241                 # method to self with respective access privileges)
242                 try:
243                     self.conn = DatabaseConnection(self.server.db)
244                     parsed_url = urlparse(self.path)
245                     self._site = path_split(parsed_url.path)[1]
246                     params = parse_qs(parsed_url.query,
247                                       keep_blank_values=True,
248                                       strict_parsing=True)
249                     self._params = InputsParser(params)
250                     handler_name = f'do_{http_method}_{self._site}'
251                     if hasattr(self, handler_name):
252                         handler = getattr(self, handler_name)
253                         redir_target = f(self, handler)
254                         if 'POST' == http_method:
255                             clear_caches()
256                         if redir_target:
257                             self.send_response(302)
258                             self.send_header('Location', redir_target)
259                             self.end_headers()
260                     else:
261                         msg = f'{not_found_msg}: {self._site}'
262                         raise NotFoundException(msg)
263                 except HandledException as error:
264                     if 'POST' == http_method:
265                         clear_caches()
266                     ctx = {'msg': error}
267                     self._send_page(ctx, 'msg', error.http_code)
268                 finally:
269                     self.conn.close()
270             return wrapper
271         return decorator
272
273     @_request_wrapper('GET', 'Unknown page')
274     def do_GET(self, handler: Callable[[], str | dict[str, object]]
275                ) -> str | None:
276         """Render page with result of handler, or redirect if result is str."""
277         tmpl_name = f'{self._site}'
278         ctx_or_redir_target = handler()
279         if isinstance(ctx_or_redir_target, str):
280             return ctx_or_redir_target
281         self._send_page(ctx_or_redir_target, tmpl_name)
282         return None
283
284     @_request_wrapper('POST', 'Unknown POST target')
285     def do_POST(self, handler: Callable[[], str]) -> str:
286         """Handle POST with handler, prepare redirection to result."""
287         length = int(self.headers['content-length'])
288         postvars = parse_qs(self.rfile.read(length).decode(),
289                             keep_blank_values=True)
290         self._form = InputsParser(postvars)
291         redir_target = handler()
292         self.conn.commit()
293         return redir_target
294
295     # GET handlers
296
297     @staticmethod
298     def _get_item(target_class: Any
299                   ) -> Callable[..., Callable[[TaskHandler],
300                                               dict[str, object]]]:
301         def decorator(f: Callable[..., dict[str, object]]
302                       ) -> Callable[[TaskHandler], dict[str, object]]:
303             def wrapper(self: TaskHandler) -> dict[str, object]:
304                 # pylint: disable=protected-access
305                 # (because pylint here fails to detect the use of wrapper as a
306                 # method to self with respective access privileges)
307                 id_ = self._params.get_int_or_none('id')
308                 if target_class.can_create_by_id:
309                     item = target_class.by_id_or_create(self.conn, id_)
310                 else:
311                     item = target_class.by_id(self.conn, id_)
312                 return f(self, item)
313             return wrapper
314         return decorator
315
316     def do_GET_(self) -> str:
317         """Return redirect target on GET /."""
318         return '/day'
319
320     def _do_GET_calendar(self) -> dict[str, object]:
321         """Show Days from ?start= to ?end=.
322
323         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
324         same, the only difference being the HTML template they are rendered to,
325         which .do_GET selects from their method name.
326         """
327         start = self._params.get_str_or_fail('start', '')
328         end = self._params.get_str_or_fail('end', '')
329         end = end if end != '' else date_in_n_days(366)
330         days, start, end = Day.by_date_range_with_limits(self.conn,
331                                                          (start, end), 'id')
332         days = Day.with_filled_gaps(days, start, end)
333         today = date_in_n_days(0)
334         return {'start': start, 'end': end, 'days': days, 'today': today}
335
336     def do_GET_calendar(self) -> dict[str, object]:
337         """Show Days from ?start= to ?end= – normal view."""
338         return self._do_GET_calendar()
339
340     def do_GET_calendar_txt(self) -> dict[str, object]:
341         """Show Days from ?start= to ?end= – minimalist view."""
342         return self._do_GET_calendar()
343
344     def do_GET_day(self) -> dict[str, object]:
345         """Show single Day of ?date=."""
346         date = self._params.get_str_or_fail('date', date_in_n_days(0))
347         day = Day.by_id_or_create(self.conn, date)
348         make_type = self._params.get_str_or_fail('make_type', '')
349         conditions_present = []
350         enablers_for = {}
351         disablers_for = {}
352         for todo in day.todos:
353             for condition in todo.conditions + todo.blockers:
354                 if condition not in conditions_present:
355                     conditions_present += [condition]
356                     enablers_for[condition.id_] = [p for p in
357                                                    Process.all(self.conn)
358                                                    if condition in p.enables]
359                     disablers_for[condition.id_] = [p for p in
360                                                     Process.all(self.conn)
361                                                     if condition in p.disables]
362         seen_todos: set[int] = set()
363         top_nodes = [t.get_step_tree(seen_todos)
364                      for t in day.todos if not t.parents]
365         return {'day': day,
366                 'top_nodes': top_nodes,
367                 'make_type': make_type,
368                 'enablers_for': enablers_for,
369                 'disablers_for': disablers_for,
370                 'conditions_present': conditions_present,
371                 'processes': Process.all(self.conn)}
372
373     @_get_item(Todo)
374     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
375         """Show single Todo of ?id=."""
376
377         def walk_process_steps(node_id: int,
378                                process_step_nodes: list[ProcessStepsNode],
379                                steps_nodes: list[TodoOrProcStepNode]) -> int:
380             for process_step_node in process_step_nodes:
381                 node_id += 1
382                 proc = Process.by_id(self.conn,
383                                      process_step_node.step.step_process_id)
384                 node = TodoOrProcStepNode(node_id, None, proc, [])
385                 steps_nodes += [node]
386                 node_id = walk_process_steps(
387                         node_id, process_step_node.steps, node.children)
388             return node_id
389
390         def walk_todo_steps(node_id: int, todos: list[Todo],
391                             steps_nodes: list[TodoOrProcStepNode]) -> int:
392             for todo in todos:
393                 matched = False
394                 for match in [item for item in steps_nodes
395                               if item.process
396                               and item.process == todo.process]:
397                     match.todo = todo
398                     matched = True
399                     for child in match.children:
400                         child.fillable = True
401                     node_id = walk_todo_steps(
402                             node_id, todo.children, match.children)
403                 if not matched:
404                     node_id += 1
405                     node = TodoOrProcStepNode(node_id, todo, None, [])
406                     steps_nodes += [node]
407                     node_id = walk_todo_steps(
408                             node_id, todo.children, node.children)
409             return node_id
410
411         def collect_adoptables_keys(
412                 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
413             ids = set()
414             for node in steps_nodes:
415                 if not node.todo:
416                     assert isinstance(node.process, Process)
417                     assert isinstance(node.process.id_, int)
418                     ids.add(node.process.id_)
419                 ids = ids | collect_adoptables_keys(node.children)
420             return ids
421
422         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
423         process_tree = todo.process.get_steps(self.conn, None)
424         steps_todo_to_process: list[TodoOrProcStepNode] = []
425         last_node_id = walk_process_steps(0, process_tree,
426                                           steps_todo_to_process)
427         for steps_node in steps_todo_to_process:
428             steps_node.fillable = True
429         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
430         adoptables: dict[int, list[Todo]] = {}
431         any_adoptables = [Todo.by_id(self.conn, t.id_)
432                           for t in Todo.by_date(self.conn, todo.date)
433                           if t.id_ is not None
434                           and t != todo]
435         for id_ in collect_adoptables_keys(steps_todo_to_process):
436             adoptables[id_] = [t for t in any_adoptables
437                                if t.process.id_ == id_]
438         return {'todo': todo,
439                 'steps_todo_to_process': steps_todo_to_process,
440                 'adoption_candidates_for': adoptables,
441                 'process_candidates': sorted(Process.all(self.conn)),
442                 'todo_candidates': any_adoptables,
443                 'condition_candidates': Condition.all(self.conn)}
444
445     def do_GET_todos(self) -> dict[str, object]:
446         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
447         sort_by = self._params.get_str_or_fail('sort_by', '')
448         start = self._params.get_str_or_fail('start', '')
449         end = self._params.get_str_or_fail('end', '')
450         process_id = self._params.get_int_or_none('process_id')
451         comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
452         todos = []
453         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
454         todos_by_date_range, start, end = ret
455         todos = [t for t in todos_by_date_range
456                  if comment_pattern in t.comment
457                  and ((not process_id) or t.process.id_ == process_id)]
458         sort_by = Todo.sort_by(todos, sort_by)
459         return {'start': start, 'end': end, 'process_id': process_id,
460                 'comment_pattern': comment_pattern, 'todos': todos,
461                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
462
463     def do_GET_conditions(self) -> dict[str, object]:
464         """Show all Conditions."""
465         pattern = self._params.get_str_or_fail('pattern', '')
466         sort_by = self._params.get_str_or_fail('sort_by', '')
467         conditions = Condition.matching(self.conn, pattern)
468         sort_by = Condition.sort_by(conditions, sort_by)
469         return {'conditions': conditions,
470                 'sort_by': sort_by,
471                 'pattern': pattern}
472
473     @_get_item(Condition)
474     def do_GET_condition(self, c: Condition) -> dict[str, object]:
475         """Show Condition of ?id=."""
476         ps = Process.all(self.conn)
477         return {'condition': c, 'is_new': c.id_ is None,
478                 'enabled_processes': [p for p in ps if c in p.conditions],
479                 'disabled_processes': [p for p in ps if c in p.blockers],
480                 'enabling_processes': [p for p in ps if c in p.enables],
481                 'disabling_processes': [p for p in ps if c in p.disables]}
482
483     @_get_item(Condition)
484     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
485         """Show title history of Condition of ?id=."""
486         return {'condition': c}
487
488     @_get_item(Condition)
489     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
490         """Show description historys of Condition of ?id=."""
491         return {'condition': c}
492
493     @_get_item(Process)
494     def do_GET_process(self, process: Process) -> dict[str, object]:
495         """Show Process of ?id=."""
496         owner_ids = self._params.get_all_int('step_to')
497         owned_ids = self._params.get_all_int('has_step')
498         title_64 = self._params.get_str('title_b64')
499         if title_64:
500             try:
501                 title = b64decode(title_64.encode()).decode()
502             except binascii_Exception as exc:
503                 msg = 'invalid base64 for ?title_b64='
504                 raise BadFormatException(msg) from exc
505             process.title.set(title)
506         preset_top_step = None
507         owners = process.used_as_step_by(self.conn)
508         for step_id in owner_ids:
509             owners += [Process.by_id(self.conn, step_id)]
510         for process_id in owned_ids:
511             Process.by_id(self.conn, process_id)  # to ensure ID exists
512             preset_top_step = process_id
513         return {'process': process, 'is_new': process.id_ is None,
514                 'preset_top_step': preset_top_step,
515                 'steps': process.get_steps(self.conn),
516                 'owners': owners,
517                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
518                 'process_candidates': Process.all(self.conn),
519                 'condition_candidates': Condition.all(self.conn)}
520
521     @_get_item(Process)
522     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
523         """Show title history of Process of ?id=."""
524         return {'process': p}
525
526     @_get_item(Process)
527     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
528         """Show description historys of Process of ?id=."""
529         return {'process': p}
530
531     @_get_item(Process)
532     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
533         """Show default effort history of Process of ?id=."""
534         return {'process': p}
535
536     def do_GET_processes(self) -> dict[str, object]:
537         """Show all Processes."""
538         pattern = self._params.get_str_or_fail('pattern', '')
539         sort_by = self._params.get_str_or_fail('sort_by', '')
540         processes = Process.matching(self.conn, pattern)
541         sort_by = Process.sort_by(processes, sort_by)
542         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
543
544     # POST handlers
545
546     @staticmethod
547     def _delete_or_post(target_class: Any, redir_target: str = '/'
548                         ) -> Callable[..., Callable[[TaskHandler], str]]:
549         def decorator(f: Callable[..., str]
550                       ) -> Callable[[TaskHandler], str]:
551             def wrapper(self: TaskHandler) -> str:
552                 # pylint: disable=protected-access
553                 # (because pylint here fails to detect the use of wrapper as a
554                 # method to self with respective access privileges)
555                 id_ = self._params.get_int_or_none('id')
556                 for _ in self._form.get_all_str('delete'):
557                     if id_ is None:
558                         msg = 'trying to delete non-saved ' +\
559                                 f'{target_class.__name__}'
560                         raise NotFoundException(msg)
561                     item = target_class.by_id(self.conn, id_)
562                     item.remove(self.conn)
563                     return redir_target
564                 if target_class.can_create_by_id:
565                     item = target_class.by_id_or_create(self.conn, id_)
566                 else:
567                     item = target_class.by_id(self.conn, id_)
568                 return f(self, item)
569             return wrapper
570         return decorator
571
572     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
573         """Update history timestamps for VersionedAttribute."""
574         id_ = self._params.get_int_or_none('id')
575         item = cls.by_id(self.conn, id_)
576         attr = getattr(item, attr_name)
577         for k, v in self._form.get_firsts_of_key_prefixed('at:').items():
578             old = k[3:]
579             if old[19:] != v:
580                 attr.reset_timestamp(old, f'{v}.0')
581         attr.save(self.conn)
582         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
583
584     def do_POST_day(self) -> str:
585         """Update or insert Day of date and Todos mapped to it."""
586         # pylint: disable=too-many-locals
587         date = self._params.get_str_or_fail('date')
588         day_comment = self._form.get_str_or_fail('day_comment')
589         make_type = self._form.get_str_or_fail('make_type')
590         old_todos = self._form.get_all_int('todo_id')
591         new_todos_by_process = self._form.get_all_int('new_todo')
592         comments = self._form.get_all_str('comment')
593         efforts = self._form.get_all_floats_or_nones('effort')
594         done_todos = self._form.get_all_int('done')
595         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
596             raise BadFormatException('"done" field refers to unknown Todo')
597         is_done = [t_id in done_todos for t_id in old_todos]
598         if not (len(old_todos) == len(is_done) == len(comments)
599                 == len(efforts)):
600             msg = 'not equal number each of number of todo_id, comments, ' +\
601                     'and efforts inputs'
602             raise BadFormatException(msg)
603         day = Day.by_id_or_create(self.conn, date)
604         day.comment = day_comment
605         day.save(self.conn)
606         new_todos = []
607         for process_id in sorted(new_todos_by_process):
608             process = Process.by_id(self.conn, process_id)
609             todo = Todo(None, process, False, date)
610             todo.save(self.conn)
611             new_todos += [todo]
612         if 'full' == make_type:
613             for todo in new_todos:
614                 todo.ensure_children(self.conn)
615         for i, todo_id in enumerate(old_todos):
616             todo = Todo.by_id(self.conn, todo_id)
617             todo.is_done = is_done[i]
618             todo.comment = comments[i]
619             todo.effort = efforts[i]
620             todo.save(self.conn)
621         return f'/day?date={date}&make_type={make_type}'
622
623     @_delete_or_post(Todo, '/')
624     def do_POST_todo(self, todo: Todo) -> str:
625         """Update Todo and its children."""
626         # pylint: disable=too-many-locals
627         # pylint: disable=too-many-branches
628         adopted_child_ids = self._form.get_all_int('adopt')
629         to_make = {'full': self._form.get_all_int('make_full'),
630                    'empty': self._form.get_all_int('make_empty')}
631         step_fillers = self._form.get_all_str('step_filler')
632         to_update: dict[str, Any] = {
633             'comment': self._form.get_str_or_fail('comment', '')}
634         for k in ('is_done', 'calendarize'):
635             v = self._form.get_bool_or_none(k)
636             if v is not None:
637                 to_update[k] = v
638         cond_rels = [self._form.get_all_int(name) for name in
639                      ['conditions', 'blockers', 'enables', 'disables']]
640         effort_or_not = self._form.get_str('effort')
641         if effort_or_not is not None:
642             if effort_or_not == '':
643                 to_update['effort'] = None
644             else:
645                 try:
646                     to_update['effort'] = float(effort_or_not)
647                 except ValueError as e:
648                     msg = 'cannot float form field value for key: effort'
649                     raise BadFormatException(msg) from e
650         todo.set_condition_relations(self.conn, *cond_rels)
651         for filler in [f for f in step_fillers if f != 'ignore']:
652             target_id: int
653             to_int = filler
654             for prefix in [p for p in ['make_empty_', 'make_full_']
655                            if filler.startswith(p)]:
656                 to_int = filler[len(prefix):]
657             try:
658                 target_id = int(to_int)
659             except ValueError as e:
660                 msg = 'bad fill_for target: {filler}'
661                 raise BadFormatException(msg) from e
662             if filler.startswith('make_empty_'):
663                 to_make['empty'] += [target_id]
664             elif filler.startswith('make_full_'):
665                 to_make['full'] += [target_id]
666             else:
667                 adopted_child_ids += [target_id]
668         to_remove = []
669         for child in todo.children:
670             if child.id_ and (child.id_ not in adopted_child_ids):
671                 to_remove += [child.id_]
672         for id_ in to_remove:
673             child = Todo.by_id(self.conn, id_)
674             todo.remove_child(child)
675         for child_id in adopted_child_ids:
676             if child_id not in [c.id_ for c in todo.children]:
677                 todo.add_child(Todo.by_id(self.conn, child_id))
678         todo.update_attrs(**to_update)
679         for approach, proc_ids in to_make.items():
680             for process_id in proc_ids:
681                 process = Process.by_id(self.conn, process_id)
682                 made = Todo(None, process, False, todo.date)
683                 made.save(self.conn)
684                 if 'full' == approach:
685                     made.ensure_children(self.conn)
686                 todo.add_child(made)
687         # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
688         url = f'/todo?id={todo.id_}'
689         todo.save(self.conn)
690         return url
691
692     def do_POST_process_descriptions(self) -> str:
693         """Update history timestamps for Process.description."""
694         return self._change_versioned_timestamps(Process, 'description')
695
696     def do_POST_process_efforts(self) -> str:
697         """Update history timestamps for Process.effort."""
698         return self._change_versioned_timestamps(Process, 'effort')
699
700     def do_POST_process_titles(self) -> str:
701         """Update history timestamps for Process.title."""
702         return self._change_versioned_timestamps(Process, 'title')
703
704     @_delete_or_post(Process, '/processes')
705     def do_POST_process(self, process: Process) -> str:
706         """Update or insert Process of ?id= and fields defined in postvars."""
707         # pylint: disable=too-many-locals
708         versioned = {'title': self._form.get_str_or_fail('title'),
709                      'description': self._form.get_str_or_fail('description'),
710                      'effort': self._form.get_float_or_fail('effort')}
711         cond_rels = [self._form.get_all_int(s) for s
712                      in ['conditions', 'blockers', 'enables', 'disables']]
713         calendarize = self._form.get_bool_or_none('calendarize')
714         step_of = self._form.get_all_str('step_of')
715         suppresses = self._form.get_all_int('suppresses')
716         kept_steps = self._form.get_all_int('kept_steps')
717         new_top_steps = self._form.get_all_str('new_top_step')
718         new_steps_to = {}
719         for step_id in kept_steps:
720             name = f'new_step_to_{step_id}'
721             new_steps_to[step_id] = self._form.get_all_int(name)
722         for k, v in versioned.items():
723             getattr(process, k).set(v)
724         process.set_condition_relations(self.conn, *cond_rels)
725         if calendarize is not None:
726             process.calendarize = calendarize
727         process.save(self.conn)
728         assert isinstance(process.id_, int)
729         # set relations to, and if non-existant yet: create, other Processes
730         # pylint: disable=fixme
731         # TODO: in what order to set owners, owneds, and possibly step
732         # suppressions can make the difference between recursion checks
733         # failing; should probably be handled class-internally to Process
734         # rather than here!
735         # 1. owners (upwards)
736         owners_to_set = []
737         new_owner_title = None
738         for owner_identifier in step_of:
739             try:
740                 owners_to_set += [int(owner_identifier)]
741             except ValueError:
742                 new_owner_title = owner_identifier
743         process.set_owners(self.conn, owners_to_set)
744         # 2. owneds (downwards)
745         new_step_title = None
746         steps: list[ProcessStep] = [ProcessStep.by_id(self.conn, step_id)
747                                     for step_id in kept_steps]
748         for step_id in kept_steps:
749             new_sub_steps = [
750                     ProcessStep(None, process.id_, step_process_id, step_id)
751                     for step_process_id in new_steps_to[step_id]]
752             steps += new_sub_steps
753         for step_id_or_new_title in new_top_steps:
754             try:
755                 step_process_id = int(step_id_or_new_title)
756                 step = ProcessStep(None, process.id_, step_process_id, None)
757                 steps += [step]
758             except ValueError:
759                 new_step_title = step_id_or_new_title
760         process.set_steps(self.conn, steps)
761         process.set_step_suppressions(self.conn, suppresses)
762         # encode titles for potentially newly created Processes up or down
763         params = f'id={process.id_}'
764         if new_step_title:
765             title_b64_encoded = b64encode(new_step_title.encode()).decode()
766             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
767         elif new_owner_title:
768             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
769             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
770         process.save(self.conn)
771         return f'/process?{params}'
772
773     def do_POST_condition_descriptions(self) -> str:
774         """Update history timestamps for Condition.description."""
775         return self._change_versioned_timestamps(Condition, 'description')
776
777     def do_POST_condition_titles(self) -> str:
778         """Update history timestamps for Condition.title."""
779         return self._change_versioned_timestamps(Condition, 'title')
780
781     @_delete_or_post(Condition, '/conditions')
782     def do_POST_condition(self, condition: Condition) -> str:
783         """Update/insert Condition of ?id= and fields defined in postvars."""
784         title = self._form.get_str_or_fail('title')
785         description = self._form.get_str_or_fail('description')
786         is_active = self._form.get_bool_or_none('is_active')
787         if is_active is not None:
788             condition.is_active = is_active
789         condition.title.set(title)
790         condition.description.set(description)
791         condition.save(self.conn)
792         return f'/condition?id={condition.id_}'