home · contact · privacy
Add TaskHandler code to actually make previous commit work.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.render_mode = 'html'
33         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35
36 class InputsParser:
37     """Wrapper for validating and retrieving dict-like HTTP inputs."""
38
39     def __init__(self, dict_: dict[str, list[str]]) -> None:
40         self.inputs = dict_
41
42     def get_all_str(self, key: str) -> list[str]:
43         """Retrieve list of string values at key (empty if no key)."""
44         if key not in self.inputs.keys():
45             return []
46         return self.inputs[key]
47
48     def get_all_int(self, key: str) -> list[int]:
49         """Retrieve list of int values at key."""
50         all_str = self.get_all_str(key)
51         try:
52             return [int(s) for s in all_str if len(s) > 0]
53         except ValueError as e:
54             msg = f'cannot int a form field value for key {key} in: {all_str}'
55             raise BadFormatException(msg) from e
56
57     def get_str(self, key: str, default: str | None = None) -> str | None:
58         """Retrieve single/first string value of key, or default."""
59         vals = self.get_all_str(key)
60         if vals:
61             return vals[0]
62         return default
63
64     def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65         """Retrieve first string value of key, if none: fail or default."""
66         vals = self.get_all_str(key)
67         if not vals:
68             if default is not None:
69                 return default
70             raise BadFormatException(f'no value found for key: {key}')
71         return vals[0]
72
73     def get_int_or_none(self, key: str) -> int | None:
74         """Retrieve single/first value of key as int, return None if empty."""
75         val = self.get_str_or_fail(key, '')
76         if val == '':
77             return None
78         try:
79             return int(val)
80         except (ValueError, TypeError) as e:
81             msg = f'cannot int form field value for key {key}: {val}'
82             raise BadFormatException(msg) from e
83
84     def get_bool_or_none(self, key: str) -> bool | None:
85         """Return value to key if truish; if no value to key, None."""
86         val = self.get_str(key)
87         if val is None:
88             return None
89         return val in {'True', 'true', '1', 'on'}
90
91     def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]:
92         """Retrieve dict of strings at keys starting with key_prefix."""
93         ret = {}
94         for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]:
95             ret[key[len(key_prefix):]] = self.inputs[key]
96         return ret
97
98     def get_float_or_fail(self, key: str) -> float:
99         """Retrieve float value of key from self.postvars, fail if none."""
100         val = self.get_str_or_fail(key)
101         try:
102             return float(val)
103         except ValueError as e:
104             msg = f'cannot float form field value for key {key}: {val}'
105             raise BadFormatException(msg) from e
106
107     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
108         """Retrieve list of float value at key, None if empty strings."""
109         ret: list[float | None] = []
110         for val in self.get_all_str(key):
111             if '' == val:
112                 ret += [None]
113             else:
114                 try:
115                     ret += [float(val)]
116                 except ValueError as e:
117                     msg = f'cannot float form field value for key {key}: {val}'
118                     raise BadFormatException(msg) from e
119         return ret
120
121
122 class TaskHandler(BaseHTTPRequestHandler):
123     """Handles single HTTP request."""
124     # pylint: disable=too-many-public-methods
125     server: TaskServer
126     _conn: DatabaseConnection
127     _site: str
128     _form: InputsParser
129     _params: InputsParser
130
131     def _send_page(
132             self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
133             ) -> None:
134         """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
135
136         The differentiation by .server.render_mode serves to allow easily
137         comparable JSON responses for automatic testing.
138         """
139         body: str
140         headers: list[tuple[str, str]] = []
141         if 'html' == self.server.render_mode:
142             tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
143             body = tmpl.render(ctx)
144         else:
145             body = self._ctx_to_json(ctx)
146             headers += [('Content-Type', 'application/json')]
147         self.send_response(code)
148         for header_tuple in headers:
149             self.send_header(*header_tuple)
150         self.end_headers()
151         self.wfile.write(bytes(body, 'utf-8'))
152
153     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
154         """Render ctx into JSON string.
155
156         Flattens any objects that json.dumps might not want to serialize, and
157         turns occurrences of BaseModel objects into listings of their .id_, to
158         be resolved to a full dict inside a top-level '_library' dictionary,
159         to avoid endless and circular nesting.
160         """
161
162         def flatten(node: object) -> object:
163
164             def update_library_with(
165                     item: BaseModel[int] | BaseModel[str]) -> None:
166                 cls_name = item.__class__.__name__
167                 if cls_name not in library:
168                     library[cls_name] = {}
169                 if item.id_ not in library[cls_name]:
170                     d, refs = item.as_dict_and_refs
171                     id_key = '?' if item.id_ is None else item.id_
172                     library[cls_name][id_key] = d
173                     for ref in refs:
174                         update_library_with(ref)
175
176             if isinstance(node, BaseModel):
177                 update_library_with(node)
178                 return node.id_
179             if isinstance(node, DictableNode):
180                 d, refs = node.as_dict_and_refs
181                 for ref in refs:
182                     update_library_with(ref)
183                 return d
184             if isinstance(node, (list, tuple)):
185                 return [flatten(item) for item in node]
186             if isinstance(node, dict):
187                 d = {}
188                 for k, v in node.items():
189                     d[k] = flatten(v)
190                 return d
191             if isinstance(node, HandledException):
192                 return str(node)
193             return node
194
195         library: dict[str, dict[str | int, object]] = {}
196         for k, v in ctx.items():
197             ctx[k] = flatten(v)
198         ctx['_library'] = library
199         return json_dumps(ctx)
200
201     @staticmethod
202     def _request_wrapper(http_method: str, not_found_msg: str
203                          ) -> Callable[..., Callable[[TaskHandler], None]]:
204         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
205
206         Among other things, conditionally cleans all caches, but only on POST
207         requests, as only those are expected to change the states of objects
208         that may be cached, and certainly only those are expected to write any
209         changes to the database. We want to call them as early though as
210         possible here, either exactly after the specific request handler
211         returns successfully, or right after any exception is triggered –
212         otherwise, race conditions become plausible.
213
214         Note that otherwise any POST attempt, even a failed one, may end in
215         problematic inconsistencies:
216
217         - if the POST handler experiences an Exception, changes to objects
218           won't get written to the DB, but the changed objects may remain in
219           the cache and affect other objects despite their possibly illegal
220           state
221
222         - even if an object was just saved to the DB, we cannot be sure its
223           current state is completely identical to what we'd get if loading it
224           fresh from the DB (e.g. currently Process.n_owners is only updated
225           when loaded anew via .from_table_row, nor is its state written to
226           the DB by .save; a questionable design choice, but proof that we
227           have no guarantee that objects' .save stores all their states we'd
228           prefer at their most up-to-date.
229         """
230
231         def clear_caches() -> None:
232             for cls in (Day, Todo, Condition, Process, ProcessStep):
233                 assert hasattr(cls, 'empty_cache')
234                 cls.empty_cache()
235
236         def decorator(f: Callable[..., str | None]
237                       ) -> Callable[[TaskHandler], None]:
238             def wrapper(self: TaskHandler) -> None:
239                 # pylint: disable=protected-access
240                 # (because pylint here fails to detect the use of wrapper as a
241                 # method to self with respective access privileges)
242                 try:
243                     self._conn = DatabaseConnection(self.server.db)
244                     parsed_url = urlparse(self.path)
245                     self._site = path_split(parsed_url.path)[1]
246                     params = parse_qs(parsed_url.query,
247                                       keep_blank_values=True,
248                                       strict_parsing=True)
249                     self._params = InputsParser(params)
250                     handler_name = f'do_{http_method}_{self._site}'
251                     if hasattr(self, handler_name):
252                         handler = getattr(self, handler_name)
253                         redir_target = f(self, handler)
254                         if 'POST' == http_method:
255                             clear_caches()
256                         if redir_target:
257                             self.send_response(302)
258                             self.send_header('Location', redir_target)
259                             self.end_headers()
260                     else:
261                         msg = f'{not_found_msg}: {self._site}'
262                         raise NotFoundException(msg)
263                 except HandledException as error:
264                     if 'POST' == http_method:
265                         clear_caches()
266                     ctx = {'msg': error}
267                     self._send_page(ctx, 'msg', error.http_code)
268                 finally:
269                     self._conn.close()
270             return wrapper
271         return decorator
272
273     @_request_wrapper('GET', 'Unknown page')
274     def do_GET(self, handler: Callable[[], str | dict[str, object]]
275                ) -> str | None:
276         """Render page with result of handler, or redirect if result is str."""
277         tmpl_name = f'{self._site}'
278         ctx_or_redir_target = handler()
279         if isinstance(ctx_or_redir_target, str):
280             return ctx_or_redir_target
281         self._send_page(ctx_or_redir_target, tmpl_name)
282         return None
283
284     @_request_wrapper('POST', 'Unknown POST target')
285     def do_POST(self, handler: Callable[[], str]) -> str:
286         """Handle POST with handler, prepare redirection to result."""
287         length = int(self.headers['content-length'])
288         postvars = parse_qs(self.rfile.read(length).decode(),
289                             keep_blank_values=True)
290         self._form = InputsParser(postvars)
291         redir_target = handler()
292         self._conn.commit()
293         return redir_target
294
295     # GET handlers
296
297     @staticmethod
298     def _get_item(target_class: Any
299                   ) -> Callable[..., Callable[[TaskHandler],
300                                               dict[str, object]]]:
301         def decorator(f: Callable[..., dict[str, object]]
302                       ) -> Callable[[TaskHandler], dict[str, object]]:
303             def wrapper(self: TaskHandler) -> dict[str, object]:
304                 # pylint: disable=protected-access
305                 # (because pylint here fails to detect the use of wrapper as a
306                 # method to self with respective access privileges)
307                 id_ = self._params.get_int_or_none('id')
308                 if target_class.can_create_by_id:
309                     item = target_class.by_id_or_create(self._conn, id_)
310                 else:
311                     item = target_class.by_id(self._conn, id_)
312                 return f(self, item)
313             return wrapper
314         return decorator
315
316     def do_GET_(self) -> str:
317         """Return redirect target on GET /."""
318         return '/day'
319
320     def _do_GET_calendar(self) -> dict[str, object]:
321         """Show Days from ?start= to ?end=.
322
323         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
324         same, the only difference being the HTML template they are rendered to,
325         which .do_GET selects from their method name.
326         """
327         start = self._params.get_str_or_fail('start', '')
328         end = self._params.get_str_or_fail('end', '')
329         end = end if end != '' else date_in_n_days(366)
330         #
331         days, start, end = Day.by_date_range_with_limits(self._conn,
332                                                          (start, end), 'id')
333         days = Day.with_filled_gaps(days, start, end)
334         today = date_in_n_days(0)
335         return {'start': start, 'end': end, 'days': days, 'today': today}
336
337     def do_GET_calendar(self) -> dict[str, object]:
338         """Show Days from ?start= to ?end= – normal view."""
339         return self._do_GET_calendar()
340
341     def do_GET_calendar_txt(self) -> dict[str, object]:
342         """Show Days from ?start= to ?end= – minimalist view."""
343         return self._do_GET_calendar()
344
345     def do_GET_day(self) -> dict[str, object]:
346         """Show single Day of ?date=."""
347         date = self._params.get_str_or_fail('date', date_in_n_days(0))
348         make_type = self._params.get_str_or_fail('make_type', 'full')
349         #
350         day = Day.by_id_or_create(self._conn, date)
351         conditions_present = []
352         enablers_for = {}
353         disablers_for = {}
354         for todo in day.todos:
355             for condition in todo.conditions + todo.blockers:
356                 if condition not in conditions_present:
357                     conditions_present += [condition]
358                     enablers_for[condition.id_] = [p for p in
359                                                    Process.all(self._conn)
360                                                    if condition in p.enables]
361                     disablers_for[condition.id_] = [p for p in
362                                                     Process.all(self._conn)
363                                                     if condition in p.disables]
364         seen_todos: set[int] = set()
365         top_nodes = [t.get_step_tree(seen_todos)
366                      for t in day.todos if not t.parents]
367         return {'day': day,
368                 'top_nodes': top_nodes,
369                 'make_type': make_type,
370                 'enablers_for': enablers_for,
371                 'disablers_for': disablers_for,
372                 'conditions_present': conditions_present,
373                 'processes': Process.all(self._conn)}
374
375     @_get_item(Todo)
376     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
377         """Show single Todo of ?id=."""
378
379         def walk_process_steps(node_id: int,
380                                process_step_nodes: list[ProcessStepsNode],
381                                steps_nodes: list[TodoOrProcStepNode]) -> int:
382             for process_step_node in process_step_nodes:
383                 node_id += 1
384                 proc = Process.by_id(self._conn,
385                                      process_step_node.step.step_process_id)
386                 node = TodoOrProcStepNode(node_id, None, proc, [])
387                 steps_nodes += [node]
388                 node_id = walk_process_steps(
389                         node_id, process_step_node.steps, node.children)
390             return node_id
391
392         def walk_todo_steps(node_id: int, todos: list[Todo],
393                             steps_nodes: list[TodoOrProcStepNode]) -> int:
394             for todo in todos:
395                 matched = False
396                 for match in [item for item in steps_nodes
397                               if item.process
398                               and item.process == todo.process]:
399                     match.todo = todo
400                     matched = True
401                     for child in match.children:
402                         child.fillable = True
403                     node_id = walk_todo_steps(
404                             node_id, todo.children, match.children)
405                 if not matched:
406                     node_id += 1
407                     node = TodoOrProcStepNode(node_id, todo, None, [])
408                     steps_nodes += [node]
409                     node_id = walk_todo_steps(
410                             node_id, todo.children, node.children)
411             return node_id
412
413         def collect_adoptables_keys(
414                 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
415             ids = set()
416             for node in steps_nodes:
417                 if not node.todo:
418                     assert isinstance(node.process, Process)
419                     assert isinstance(node.process.id_, int)
420                     ids.add(node.process.id_)
421                 ids = ids | collect_adoptables_keys(node.children)
422             return ids
423
424         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
425         process_tree = todo.process.get_steps(self._conn, None)
426         steps_todo_to_process: list[TodoOrProcStepNode] = []
427         last_node_id = walk_process_steps(0, process_tree,
428                                           steps_todo_to_process)
429         for steps_node in steps_todo_to_process:
430             steps_node.fillable = True
431         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
432         adoptables: dict[int, list[Todo]] = {}
433         any_adoptables = [Todo.by_id(self._conn, t.id_)
434                           for t in Todo.by_date(self._conn, todo.date)
435                           if t.id_ is not None
436                           and t != todo]
437         for id_ in collect_adoptables_keys(steps_todo_to_process):
438             adoptables[id_] = [t for t in any_adoptables
439                                if t.process.id_ == id_]
440         return {'todo': todo,
441                 'steps_todo_to_process': steps_todo_to_process,
442                 'adoption_candidates_for': adoptables,
443                 'process_candidates': sorted(Process.all(self._conn)),
444                 'todo_candidates': any_adoptables,
445                 'condition_candidates': Condition.all(self._conn)}
446
447     def do_GET_todos(self) -> dict[str, object]:
448         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
449         sort_by = self._params.get_str_or_fail('sort_by', 'title')
450         start = self._params.get_str_or_fail('start', '')
451         end = self._params.get_str_or_fail('end', '')
452         process_id = self._params.get_int_or_none('process_id')
453         comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
454         #
455         ret = Todo.by_date_range_with_limits(self._conn, (start, end))
456         todos_by_date_range, start, end = ret
457         todos = [t for t in todos_by_date_range
458                  if comment_pattern in t.comment
459                  and ((not process_id) or t.process.id_ == process_id)]
460         sort_by = Todo.sort_by(todos, sort_by)
461         return {'start': start, 'end': end, 'process_id': process_id,
462                 'comment_pattern': comment_pattern, 'todos': todos,
463                 'all_processes': Process.all(self._conn), 'sort_by': sort_by}
464
465     def do_GET_conditions(self) -> dict[str, object]:
466         """Show all Conditions."""
467         pattern = self._params.get_str_or_fail('pattern', '')
468         sort_by = self._params.get_str_or_fail('sort_by', 'title')
469         #
470         conditions = Condition.matching(self._conn, pattern)
471         sort_by = Condition.sort_by(conditions, sort_by)
472         return {'conditions': conditions,
473                 'sort_by': sort_by,
474                 'pattern': pattern}
475
476     @_get_item(Condition)
477     def do_GET_condition(self, c: Condition) -> dict[str, object]:
478         """Show Condition of ?id=."""
479         ps = Process.all(self._conn)
480         return {'condition': c, 'is_new': c.id_ is None,
481                 'enabled_processes': [p for p in ps if c in p.conditions],
482                 'disabled_processes': [p for p in ps if c in p.blockers],
483                 'enabling_processes': [p for p in ps if c in p.enables],
484                 'disabling_processes': [p for p in ps if c in p.disables]}
485
486     @_get_item(Condition)
487     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
488         """Show title history of Condition of ?id=."""
489         return {'condition': c}
490
491     @_get_item(Condition)
492     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
493         """Show description historys of Condition of ?id=."""
494         return {'condition': c}
495
496     @_get_item(Process)
497     def do_GET_process(self, process: Process) -> dict[str, object]:
498         """Show Process of ?id=."""
499         owner_ids = self._params.get_all_int('step_to')
500         owned_ids = self._params.get_all_int('has_step')
501         title_64 = self._params.get_str('title_b64')
502         title_new = None
503         if title_64:
504             try:
505                 title_new = b64decode(title_64.encode()).decode()
506             except binascii_Exception as exc:
507                 msg = 'invalid base64 for ?title_b64='
508                 raise BadFormatException(msg) from exc
509         #
510         if title_new:
511             process.title.set(title_new)
512         preset_top_step = None
513         owners = process.used_as_step_by(self._conn)
514         for step_id in owner_ids:
515             owners += [Process.by_id(self._conn, step_id)]
516         for process_id in owned_ids:
517             Process.by_id(self._conn, process_id)  # to ensure ID exists
518             preset_top_step = process_id
519         return {'process': process, 'is_new': process.id_ is None,
520                 'preset_top_step': preset_top_step,
521                 'steps': process.get_steps(self._conn),
522                 'owners': owners,
523                 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
524                 'process_candidates': Process.all(self._conn),
525                 'condition_candidates': Condition.all(self._conn)}
526
527     @_get_item(Process)
528     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
529         """Show title history of Process of ?id=."""
530         return {'process': p}
531
532     @_get_item(Process)
533     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
534         """Show description historys of Process of ?id=."""
535         return {'process': p}
536
537     @_get_item(Process)
538     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
539         """Show default effort history of Process of ?id=."""
540         return {'process': p}
541
542     def do_GET_processes(self) -> dict[str, object]:
543         """Show all Processes."""
544         pattern = self._params.get_str_or_fail('pattern', '')
545         sort_by = self._params.get_str_or_fail('sort_by', 'title')
546         #
547         processes = Process.matching(self._conn, pattern)
548         sort_by = Process.sort_by(processes, sort_by)
549         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
550
551     # POST handlers
552
553     @staticmethod
554     def _delete_or_post(target_class: Any, redir_target: str = '/'
555                         ) -> Callable[..., Callable[[TaskHandler], str]]:
556         def decorator(f: Callable[..., str]
557                       ) -> Callable[[TaskHandler], str]:
558             def wrapper(self: TaskHandler) -> str:
559                 # pylint: disable=protected-access
560                 # (because pylint here fails to detect the use of wrapper as a
561                 # method to self with respective access privileges)
562                 id_ = self._params.get_int_or_none('id')
563                 for _ in self._form.get_all_str('delete'):
564                     if id_ is None:
565                         msg = 'trying to delete non-saved ' +\
566                                 f'{target_class.__name__}'
567                         raise NotFoundException(msg)
568                     item = target_class.by_id(self._conn, id_)
569                     item.remove(self._conn)
570                     return redir_target
571                 if target_class.can_create_by_id:
572                     item = target_class.by_id_or_create(self._conn, id_)
573                 else:
574                     item = target_class.by_id(self._conn, id_)
575                 return f(self, item)
576             return wrapper
577         return decorator
578
579     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
580         """Update history timestamps for VersionedAttribute."""
581         id_ = self._params.get_int_or_none('id')
582         item = cls.by_id(self._conn, id_)
583         attr = getattr(item, attr_name)
584         for k, vals in self._form.get_all_of_key_prefixed('at:').items():
585             if k[19:] != vals[0]:
586                 attr.reset_timestamp(k, f'{vals[0]}.0')
587         attr.save(self._conn)
588         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
589
590     def do_POST_day(self) -> str:
591         """Update or insert Day of date and Todos mapped to it."""
592         # pylint: disable=too-many-locals
593         date = self._params.get_str_or_fail('date')
594         day_comment = self._form.get_str_or_fail('day_comment')
595         make_type = self._form.get_str_or_fail('make_type')
596         old_todos = self._form.get_all_int('todo_id')
597         new_todos_by_process = self._form.get_all_int('new_todo')
598         comments = self._form.get_all_str('comment')
599         efforts = self._form.get_all_floats_or_nones('effort')
600         done_todos = self._form.get_all_int('done')
601         is_done = [t_id in done_todos for t_id in old_todos]
602         if not (len(old_todos) == len(is_done) == len(comments)
603                 == len(efforts)):
604             msg = 'not equal number each of number of todo_id, comments, ' +\
605                     'and efforts inputs'
606             raise BadFormatException(msg)
607         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
608             raise BadFormatException('"done" field refers to unknown Todo')
609         #
610         day = Day.by_id_or_create(self._conn, date)
611         day.comment = day_comment
612         day.save(self._conn)
613         new_todos = []
614         for process_id in sorted(new_todos_by_process):
615             process = Process.by_id(self._conn, process_id)
616             todo = Todo(None, process, False, date)
617             todo.save(self._conn)
618             new_todos += [todo]
619         if 'full' == make_type:
620             for todo in new_todos:
621                 todo.ensure_children(self._conn)
622         for i, todo_id in enumerate(old_todos):
623             todo = Todo.by_id(self._conn, todo_id)
624             todo.is_done = is_done[i]
625             todo.comment = comments[i]
626             todo.effort = efforts[i]
627             todo.save(self._conn)
628         return f'/day?date={date}&make_type={make_type}'
629
630     @_delete_or_post(Todo, '/')
631     def do_POST_todo(self, todo: Todo) -> str:
632         """Update Todo and its children."""
633         # pylint: disable=too-many-locals
634         # pylint: disable=too-many-branches
635         # pylint: disable=too-many-statements
636         assert todo.id_ is not None
637         adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
638         to_make = {'full': [(id_, todo.id_)
639                             for id_ in self._form.get_all_int('make_full')],
640                    'empty': [(id_, todo.id_)
641                              for id_ in self._form.get_all_int('make_empty')]}
642         step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_')
643         to_update: dict[str, Any] = {
644             'comment': self._form.get_str_or_fail('comment', '')}
645         for k in ('is_done', 'calendarize'):
646             v = self._form.get_bool_or_none(k)
647             if v is not None:
648                 to_update[k] = v
649         cond_rels = [self._form.get_all_int(name) for name in
650                      ['conditions', 'blockers', 'enables', 'disables']]
651         effort_or_not = self._form.get_str('effort')
652         if effort_or_not is not None:
653             if effort_or_not == '':
654                 to_update['effort'] = None
655             else:
656                 try:
657                     to_update['effort'] = float(effort_or_not)
658                 except ValueError as e:
659                     msg = 'cannot float form field value for key: effort'
660                     raise BadFormatException(msg) from e
661         for k, fillers in step_fillers_to.items():
662             try:
663                 parent_id = int(k)
664             except ValueError as e:
665                 msg = f'bad step_filler_to_ key: {k}'
666                 raise BadFormatException(msg) from e
667             for filler in [f for f in fillers if f != 'ignore']:
668                 target_id: int
669                 prefix = 'make_'
670                 to_int = filler[5:] if filler.startswith(prefix) else filler
671                 try:
672                     target_id = int(to_int)
673                 except ValueError as e:
674                     msg = f'bad fill_for target: {filler}'
675                     raise BadFormatException(msg) from e
676                 if filler.startswith(prefix):
677                     to_make['empty'] += [(target_id, parent_id)]
678                 else:
679                     adoptees += [(target_id, parent_id)]
680         #
681         todo.set_condition_relations(self._conn, *cond_rels)
682         for parent in [Todo.by_id(self._conn, a[1])
683                        for a in adoptees] + [todo]:
684             for child in parent.children:
685                 if child not in [t[0] for t in adoptees
686                                  if t[0] == child.id_ and t[1] == parent.id_]:
687                     parent.remove_child(child)
688                     parent.save(self._conn)
689         for child_id, parent_id in adoptees:
690             parent = Todo.by_id(self._conn, parent_id)
691             if child_id not in [c.id_ for c in parent.children]:
692                 parent.add_child(Todo.by_id(self._conn, child_id))
693                 parent.save(self._conn)
694         todo.update_attrs(**to_update)
695         for approach, make_data in to_make.items():
696             for process_id, parent_id in make_data:
697                 parent = Todo.by_id(self._conn, parent_id)
698                 process = Process.by_id(self._conn, process_id)
699                 made = Todo(None, process, False, todo.date)
700                 made.save(self._conn)
701                 if 'full' == approach:
702                     made.ensure_children(self._conn)
703                 parent.add_child(made)
704                 parent.save(self._conn)
705         # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
706         url = f'/todo?id={todo.id_}'
707         todo.save(self._conn)
708         return url
709
710     def do_POST_process_descriptions(self) -> str:
711         """Update history timestamps for Process.description."""
712         return self._change_versioned_timestamps(Process, 'description')
713
714     def do_POST_process_efforts(self) -> str:
715         """Update history timestamps for Process.effort."""
716         return self._change_versioned_timestamps(Process, 'effort')
717
718     def do_POST_process_titles(self) -> str:
719         """Update history timestamps for Process.title."""
720         return self._change_versioned_timestamps(Process, 'title')
721
722     @_delete_or_post(Process, '/processes')
723     def do_POST_process(self, process: Process) -> str:
724         """Update or insert Process of ?id= and fields defined in postvars."""
725         # pylint: disable=too-many-locals
726
727         def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
728             l_ids, title = [], ''
729             for id_or_title in l_id_or_title:
730                 try:
731                     l_ids += [int(id_or_title)]
732                 except ValueError:
733                     title = id_or_title
734             return title, l_ids
735
736         versioned = {'title': self._form.get_str_or_fail('title'),
737                      'description': self._form.get_str_or_fail('description'),
738                      'effort': self._form.get_float_or_fail('effort')}
739         cond_rels = [self._form.get_all_int(s) for s
740                      in ['conditions', 'blockers', 'enables', 'disables']]
741         calendarize = self._form.get_bool_or_none('calendarize')
742         step_of = self._form.get_all_str('step_of')
743         suppressions = self._form.get_all_int('suppresses')
744         kept_steps = self._form.get_all_int('kept_steps')
745         new_top_step_procs = self._form.get_all_str('new_top_step')
746         new_steps_to = {}
747         for step_id in kept_steps:
748             name = f'new_step_to_{step_id}'
749             new_steps_to[step_id] = self._form.get_all_int(name)
750         new_owner_title, owners_to_set = id_or_title(step_of)
751         new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
752         #
753         for k, v in versioned.items():
754             getattr(process, k).set(v)
755         if calendarize is not None:
756             process.calendarize = calendarize
757         process.save(self._conn)
758         assert isinstance(process.id_, int)
759         # set relations to Conditions and ProcessSteps / other Processes
760         process.set_condition_relations(self._conn, *cond_rels)
761         owned_steps = []
762         for step_id in kept_steps:
763             owned_steps += [ProcessStep.by_id(self._conn, step_id)]
764             owned_steps += [  # new sub-steps
765                     ProcessStep(None, process.id_, step_process_id, step_id)
766                     for step_process_id in new_steps_to[step_id]]
767         for step_process_id in new_top_step_proc_ids:
768             owned_steps += [ProcessStep(None, process.id_, step_process_id,
769                                         None)]
770         process.set_step_relations(self._conn, owners_to_set, suppressions,
771                                    owned_steps)
772         # encode titles for potential newly-to-create Processes up or down
773         params = f'id={process.id_}'
774         if new_step_title:
775             title_b64_encoded = b64encode(new_step_title.encode()).decode()
776             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
777         elif new_owner_title:
778             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
779             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
780         process.save(self._conn)
781         return f'/process?{params}'
782
783     def do_POST_condition_descriptions(self) -> str:
784         """Update history timestamps for Condition.description."""
785         return self._change_versioned_timestamps(Condition, 'description')
786
787     def do_POST_condition_titles(self) -> str:
788         """Update history timestamps for Condition.title."""
789         return self._change_versioned_timestamps(Condition, 'title')
790
791     @_delete_or_post(Condition, '/conditions')
792     def do_POST_condition(self, condition: Condition) -> str:
793         """Update/insert Condition of ?id= and fields defined in postvars."""
794         title = self._form.get_str_or_fail('title')
795         description = self._form.get_str_or_fail('description')
796         is_active = self._form.get_bool_or_none('is_active')
797         #
798         if is_active is not None:
799             condition.is_active = is_active
800         condition.title.set(title)
801         condition.description.set(description)
802         condition.save(self._conn)
803         return f'/condition?id={condition.id_}'