home · contact · privacy
Fix bug of /day POSTS breaking on empty new_todo fields.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from inspect import signature
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
16                                  NotFoundException)
17 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo, TodoOrProcStepNode
21 from plomtask.misc import DictableNode
22
23 TEMPLATES_DIR = 'templates'
24
25
26 class TaskServer(HTTPServer):
27     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28
29     def __init__(self, db_file: DatabaseFile,
30                  *args: Any, **kwargs: Any) -> None:
31         super().__init__(*args, **kwargs)
32         self.db = db_file
33         self.render_mode = 'html'
34         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35
36
37 class InputsParser:
38     """Wrapper for validating and retrieving dict-like HTTP inputs."""
39
40     def __init__(self, dict_: dict[str, list[str]]) -> None:
41         self.inputs = dict_
42
43     def get_all_str(self, key: str) -> list[str]:
44         """Retrieve list of string values at key (empty if no key)."""
45         if key not in self.inputs.keys():
46             return []
47         return self.inputs[key]
48
49     def get_all_int(self, key: str, fail_on_empty: bool = False) -> list[int]:
50         """Retrieve list of int values at key."""
51         all_str = self.get_all_str(key)
52         try:
53             return [int(s) for s in all_str if fail_on_empty or s != '']
54         except ValueError as e:
55             msg = f'cannot int a form field value for key {key} in: {all_str}'
56             raise BadFormatException(msg) from e
57
58     def get_str(self, key: str, default: str | None = None) -> str | None:
59         """Retrieve single/first string value of key, or default."""
60         vals = self.get_all_str(key)
61         if vals:
62             return vals[0]
63         return default
64
65     def get_str_or_fail(self, key: str, default: str | None = None) -> str:
66         """Retrieve first string value of key, if none: fail or default."""
67         vals = self.get_all_str(key)
68         if not vals:
69             if default is not None:
70                 return default
71             raise BadFormatException(f'no value found for key: {key}')
72         return vals[0]
73
74     def get_int_or_none(self, key: str) -> int | None:
75         """Retrieve single/first value of key as int, return None if empty."""
76         val = self.get_str_or_fail(key, '')
77         if val == '':
78             return None
79         try:
80             return int(val)
81         except (ValueError, TypeError) as e:
82             msg = f'cannot int form field value for key {key}: {val}'
83             raise BadFormatException(msg) from e
84
85     def get_bool_or_none(self, key: str) -> bool | None:
86         """Return value to key if truish; if no value to key, None."""
87         val = self.get_str(key)
88         if val is None:
89             return None
90         return val in {'True', 'true', '1', 'on'}
91
92     def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]:
93         """Retrieve dict of strings at keys starting with key_prefix."""
94         ret = {}
95         for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]:
96             ret[key[len(key_prefix):]] = self.inputs[key]
97         return ret
98
99     def get_float_or_fail(self, key: str) -> float:
100         """Retrieve float value of key from self.postvars, fail if none."""
101         val = self.get_str_or_fail(key)
102         try:
103             return float(val)
104         except ValueError as e:
105             msg = f'cannot float form field value for key {key}: {val}'
106             raise BadFormatException(msg) from e
107
108     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
109         """Retrieve list of float value at key, None if empty strings."""
110         ret: list[float | None] = []
111         for val in self.get_all_str(key):
112             if '' == val:
113                 ret += [None]
114             else:
115                 try:
116                     ret += [float(val)]
117                 except ValueError as e:
118                     msg = f'cannot float form field value for key {key}: {val}'
119                     raise BadFormatException(msg) from e
120         return ret
121
122
123 class TaskHandler(BaseHTTPRequestHandler):
124     """Handles single HTTP request."""
125     # pylint: disable=too-many-public-methods
126     server: TaskServer
127     _conn: DatabaseConnection
128     _site: str
129     _form: InputsParser
130     _params: InputsParser
131
132     def _send_page(
133             self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
134             ) -> None:
135         """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
136
137         The differentiation by .server.render_mode serves to allow easily
138         comparable JSON responses for automatic testing.
139         """
140         body: str
141         headers: list[tuple[str, str]] = []
142         if 'html' == self.server.render_mode:
143             tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
144             body = tmpl.render(ctx)
145         else:
146             body = self._ctx_to_json(ctx)
147             headers += [('Content-Type', 'application/json')]
148         self.send_response(code)
149         for header_tuple in headers:
150             self.send_header(*header_tuple)
151         self.end_headers()
152         self.wfile.write(bytes(body, 'utf-8'))
153
154     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
155         """Render ctx into JSON string.
156
157         Flattens any objects that json.dumps might not want to serialize, and
158         turns occurrences of BaseModel objects into listings of their .id_, to
159         be resolved to a full dict inside a top-level '_library' dictionary,
160         to avoid endless and circular nesting.
161         """
162
163         def flatten(node: object) -> object:
164
165             def update_library_with(
166                     item: BaseModel[int] | BaseModel[str]) -> None:
167                 cls_name = item.__class__.__name__
168                 if cls_name not in library:
169                     library[cls_name] = {}
170                 if item.id_ not in library[cls_name]:
171                     d, refs = item.as_dict_and_refs
172                     id_key = '?' if item.id_ is None else item.id_
173                     library[cls_name][id_key] = d
174                     for ref in refs:
175                         update_library_with(ref)
176
177             if isinstance(node, BaseModel):
178                 update_library_with(node)
179                 return node.id_
180             if isinstance(node, DictableNode):
181                 d, refs = node.as_dict_and_refs
182                 for ref in refs:
183                     update_library_with(ref)
184                 return d
185             if isinstance(node, (list, tuple)):
186                 return [flatten(item) for item in node]
187             if isinstance(node, dict):
188                 d = {}
189                 for k, v in node.items():
190                     d[k] = flatten(v)
191                 return d
192             if isinstance(node, HandledException):
193                 return str(node)
194             return node
195
196         library: dict[str, dict[str | int, object]] = {}
197         for k, v in ctx.items():
198             ctx[k] = flatten(v)
199         ctx['_library'] = library
200         return json_dumps(ctx)
201
202     @staticmethod
203     def _request_wrapper(http_method: str, not_found_msg: str
204                          ) -> Callable[..., Callable[[TaskHandler], None]]:
205         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
206
207         Among other things, conditionally cleans all caches, but only on POST
208         requests, as only those are expected to change the states of objects
209         that may be cached, and certainly only those are expected to write any
210         changes to the database. We want to call them as early though as
211         possible here, either exactly after the specific request handler
212         returns successfully, or right after any exception is triggered –
213         otherwise, race conditions become plausible.
214
215         Note that otherwise any POST attempt, even a failed one, may end in
216         problematic inconsistencies:
217
218         - if the POST handler experiences an Exception, changes to objects
219           won't get written to the DB, but the changed objects may remain in
220           the cache and affect other objects despite their possibly illegal
221           state
222
223         - even if an object was just saved to the DB, we cannot be sure its
224           current state is completely identical to what we'd get if loading it
225           fresh from the DB (e.g. currently Process.n_owners is only updated
226           when loaded anew via .from_table_row, nor is its state written to
227           the DB by .save; a questionable design choice, but proof that we
228           have no guarantee that objects' .save stores all their states we'd
229           prefer at their most up-to-date.
230         """
231
232         def clear_caches() -> None:
233             for cls in (Day, Todo, Condition, Process, ProcessStep):
234                 assert hasattr(cls, 'empty_cache')
235                 cls.empty_cache()
236
237         def decorator(f: Callable[..., str | None]
238                       ) -> Callable[[TaskHandler], None]:
239             def wrapper(self: TaskHandler) -> None:
240                 # pylint: disable=protected-access
241                 # (because pylint here fails to detect the use of wrapper as a
242                 # method to self with respective access privileges)
243                 try:
244                     self._conn = DatabaseConnection(self.server.db)
245                     parsed_url = urlparse(self.path)
246                     self._site = path_split(parsed_url.path)[1]
247                     params = parse_qs(parsed_url.query,
248                                       keep_blank_values=True,
249                                       strict_parsing=True)
250                     self._params = InputsParser(params)
251                     handler_name = f'do_{http_method}_{self._site}'
252                     if hasattr(self, handler_name):
253                         handler = getattr(self, handler_name)
254                         redir_target = f(self, handler)
255                         if 'POST' == http_method:
256                             clear_caches()
257                         if redir_target:
258                             self.send_response(302)
259                             self.send_header('Location', redir_target)
260                             self.end_headers()
261                     else:
262                         msg = f'{not_found_msg}: {self._site}'
263                         raise NotFoundException(msg)
264                 except HandledException as error:
265                     if 'POST' == http_method:
266                         clear_caches()
267                     ctx = {'msg': error}
268                     self._send_page(ctx, 'msg', error.http_code)
269                 finally:
270                     self._conn.close()
271             return wrapper
272         return decorator
273
274     @_request_wrapper('GET', 'Unknown page')
275     def do_GET(self, handler: Callable[[], str | dict[str, object]]
276                ) -> str | None:
277         """Render page with result of handler, or redirect if result is str."""
278         tmpl_name = f'{self._site}'
279         ctx_or_redir_target = handler()
280         if isinstance(ctx_or_redir_target, str):
281             return ctx_or_redir_target
282         self._send_page(ctx_or_redir_target, tmpl_name)
283         return None
284
285     @_request_wrapper('POST', 'Unknown POST target')
286     def do_POST(self, handler: Callable[[], str]) -> str:
287         """Handle POST with handler, prepare redirection to result."""
288         length = int(self.headers['content-length'])
289         postvars = parse_qs(self.rfile.read(length).decode(),
290                             keep_blank_values=True)
291         self._form = InputsParser(postvars)
292         redir_target = handler()
293         self._conn.commit()
294         return redir_target
295
296     # GET handlers
297
298     @staticmethod
299     def _get_item(target_class: Any
300                   ) -> Callable[..., Callable[[TaskHandler],
301                                               dict[str, object]]]:
302         def decorator(f: Callable[..., dict[str, object]]
303                       ) -> Callable[[TaskHandler], dict[str, object]]:
304             def wrapper(self: TaskHandler) -> dict[str, object]:
305                 # pylint: disable=protected-access
306                 # (because pylint here fails to detect the use of wrapper as a
307                 # method to self with respective access privileges)
308                 id_ = None
309                 for val in self._params.get_all_int('id', fail_on_empty=True):
310                     id_ = val
311                 if target_class.can_create_by_id:
312                     item = target_class.by_id_or_create(self._conn, id_)
313                 else:
314                     item = target_class.by_id(self._conn, id_)
315                 if 'exists' in signature(f).parameters:
316                     exists = id_ is not None and target_class._get_cached(id_)
317                     return f(self, item, exists)
318                 return f(self, item)
319             return wrapper
320         return decorator
321
322     def do_GET_(self) -> str:
323         """Return redirect target on GET /."""
324         return '/day'
325
326     def _do_GET_calendar(self) -> dict[str, object]:
327         """Show Days from ?start= to ?end=.
328
329         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
330         same, the only difference being the HTML template they are rendered to,
331         which .do_GET selects from their method name.
332         """
333         start = self._params.get_str_or_fail('start', '')
334         end = self._params.get_str_or_fail('end', '')
335         end = end if end != '' else date_in_n_days(366)
336         #
337         days, start, end = Day.by_date_range_with_limits(self._conn,
338                                                          (start, end), 'id')
339         days = Day.with_filled_gaps(days, start, end)
340         today = date_in_n_days(0)
341         return {'start': start, 'end': end, 'days': days, 'today': today}
342
343     def do_GET_calendar(self) -> dict[str, object]:
344         """Show Days from ?start= to ?end= – normal view."""
345         return self._do_GET_calendar()
346
347     def do_GET_calendar_txt(self) -> dict[str, object]:
348         """Show Days from ?start= to ?end= – minimalist view."""
349         return self._do_GET_calendar()
350
351     def do_GET_day(self) -> dict[str, object]:
352         """Show single Day of ?date=."""
353         date = self._params.get_str('date', date_in_n_days(0))
354         make_type = self._params.get_str_or_fail('make_type', 'full')
355         #
356         day = Day.by_id_or_create(self._conn, date)
357         conditions_present = []
358         enablers_for = {}
359         disablers_for = {}
360         for todo in day.todos:
361             for condition in todo.conditions + todo.blockers:
362                 if condition not in conditions_present:
363                     conditions_present += [condition]
364                     enablers_for[condition.id_] = [p for p in
365                                                    Process.all(self._conn)
366                                                    if condition in p.enables]
367                     disablers_for[condition.id_] = [p for p in
368                                                     Process.all(self._conn)
369                                                     if condition in p.disables]
370         seen_todos: set[int] = set()
371         top_nodes = [t.get_step_tree(seen_todos)
372                      for t in day.todos if not t.parents]
373         return {'day': day,
374                 'top_nodes': top_nodes,
375                 'make_type': make_type,
376                 'enablers_for': enablers_for,
377                 'disablers_for': disablers_for,
378                 'conditions_present': conditions_present,
379                 'processes': Process.all(self._conn)}
380
381     @_get_item(Todo)
382     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
383         """Show single Todo of ?id=."""
384
385         def walk_process_steps(node_id: int,
386                                process_step_nodes: list[ProcessStepsNode],
387                                steps_nodes: list[TodoOrProcStepNode]) -> int:
388             for process_step_node in process_step_nodes:
389                 node_id += 1
390                 proc = Process.by_id(self._conn,
391                                      process_step_node.step.step_process_id)
392                 node = TodoOrProcStepNode(node_id, None, proc, [])
393                 steps_nodes += [node]
394                 node_id = walk_process_steps(
395                         node_id, process_step_node.steps, node.children)
396             return node_id
397
398         def walk_todo_steps(node_id: int, todos: list[Todo],
399                             steps_nodes: list[TodoOrProcStepNode]) -> int:
400             for todo in todos:
401                 matched = False
402                 for match in [item for item in steps_nodes
403                               if item.process
404                               and item.process == todo.process]:
405                     match.todo = todo
406                     matched = True
407                     for child in match.children:
408                         child.fillable = True
409                     node_id = walk_todo_steps(
410                             node_id, todo.children, match.children)
411                 if not matched:
412                     node_id += 1
413                     node = TodoOrProcStepNode(node_id, todo, None, [])
414                     steps_nodes += [node]
415                     node_id = walk_todo_steps(
416                             node_id, todo.children, node.children)
417             return node_id
418
419         def collect_adoptables_keys(
420                 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
421             ids = set()
422             for node in steps_nodes:
423                 if not node.todo:
424                     assert isinstance(node.process, Process)
425                     assert isinstance(node.process.id_, int)
426                     ids.add(node.process.id_)
427                 ids = ids | collect_adoptables_keys(node.children)
428             return ids
429
430         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
431         process_tree = todo.process.get_steps(self._conn, None)
432         steps_todo_to_process: list[TodoOrProcStepNode] = []
433         last_node_id = walk_process_steps(0, process_tree,
434                                           steps_todo_to_process)
435         for steps_node in steps_todo_to_process:
436             steps_node.fillable = True
437         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
438         adoptables: dict[int, list[Todo]] = {}
439         any_adoptables = [Todo.by_id(self._conn, t.id_)
440                           for t in Todo.by_date(self._conn, todo.date)
441                           if t.id_ is not None
442                           and t != todo]
443         for id_ in collect_adoptables_keys(steps_todo_to_process):
444             adoptables[id_] = [t for t in any_adoptables
445                                if t.process.id_ == id_]
446         return {'todo': todo,
447                 'steps_todo_to_process': steps_todo_to_process,
448                 'adoption_candidates_for': adoptables,
449                 'process_candidates': sorted(Process.all(self._conn)),
450                 'todo_candidates': any_adoptables,
451                 'condition_candidates': Condition.all(self._conn)}
452
453     def do_GET_todos(self) -> dict[str, object]:
454         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
455         sort_by = self._params.get_str_or_fail('sort_by', 'title')
456         start = self._params.get_str_or_fail('start', '')
457         end = self._params.get_str_or_fail('end', '')
458         process_id = self._params.get_int_or_none('process_id')
459         comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
460         #
461         ret = Todo.by_date_range_with_limits(self._conn, (start, end))
462         todos_by_date_range, start, end = ret
463         todos = [t for t in todos_by_date_range
464                  if comment_pattern in t.comment
465                  and ((not process_id) or t.process.id_ == process_id)]
466         sort_by = Todo.sort_by(todos, sort_by)
467         return {'start': start, 'end': end, 'process_id': process_id,
468                 'comment_pattern': comment_pattern, 'todos': todos,
469                 'all_processes': Process.all(self._conn), 'sort_by': sort_by}
470
471     def do_GET_conditions(self) -> dict[str, object]:
472         """Show all Conditions."""
473         pattern = self._params.get_str_or_fail('pattern', '')
474         sort_by = self._params.get_str_or_fail('sort_by', 'title')
475         #
476         conditions = Condition.matching(self._conn, pattern)
477         sort_by = Condition.sort_by(conditions, sort_by)
478         return {'conditions': conditions,
479                 'sort_by': sort_by,
480                 'pattern': pattern}
481
482     @_get_item(Condition)
483     def do_GET_condition(self,
484                          c: Condition,
485                          exists: bool
486                          ) -> dict[str, object]:
487         """Show Condition of ?id=."""
488         ps = Process.all(self._conn)
489         return {'condition': c,
490                 'is_new': not exists,
491                 'enabled_processes': [p for p in ps if c in p.conditions],
492                 'disabled_processes': [p for p in ps if c in p.blockers],
493                 'enabling_processes': [p for p in ps if c in p.enables],
494                 'disabling_processes': [p for p in ps if c in p.disables]}
495
496     @_get_item(Condition)
497     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
498         """Show title history of Condition of ?id=."""
499         return {'condition': c}
500
501     @_get_item(Condition)
502     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
503         """Show description historys of Condition of ?id=."""
504         return {'condition': c}
505
506     @_get_item(Process)
507     def do_GET_process(self,
508                        process: Process,
509                        exists: bool
510                        ) -> dict[str, object]:
511         """Show Process of ?id=."""
512         owner_ids = self._params.get_all_int('step_to')
513         owned_ids = self._params.get_all_int('has_step')
514         title_64 = self._params.get_str('title_b64')
515         title_new = None
516         if title_64:
517             try:
518                 title_new = b64decode(title_64.encode()).decode()
519             except binascii_Exception as exc:
520                 msg = 'invalid base64 for ?title_b64='
521                 raise BadFormatException(msg) from exc
522         #
523         if title_new:
524             process.title.set(title_new)
525         preset_top_step = None
526         owners = process.used_as_step_by(self._conn)
527         for step_id in owner_ids:
528             owners += [Process.by_id(self._conn, step_id)]
529         for process_id in owned_ids:
530             Process.by_id(self._conn, process_id)  # to ensure ID exists
531             preset_top_step = process_id
532         return {'process': process,
533                 'is_new': not exists,
534                 'preset_top_step': preset_top_step,
535                 'steps': process.get_steps(self._conn),
536                 'owners': owners,
537                 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
538                 'process_candidates': Process.all(self._conn),
539                 'condition_candidates': Condition.all(self._conn)}
540
541     @_get_item(Process)
542     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
543         """Show title history of Process of ?id=."""
544         return {'process': p}
545
546     @_get_item(Process)
547     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
548         """Show description historys of Process of ?id=."""
549         return {'process': p}
550
551     @_get_item(Process)
552     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
553         """Show default effort history of Process of ?id=."""
554         return {'process': p}
555
556     def do_GET_processes(self) -> dict[str, object]:
557         """Show all Processes."""
558         pattern = self._params.get_str_or_fail('pattern', '')
559         sort_by = self._params.get_str_or_fail('sort_by', 'title')
560         #
561         processes = Process.matching(self._conn, pattern)
562         sort_by = Process.sort_by(processes, sort_by)
563         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
564
565     # POST handlers
566
567     @staticmethod
568     def _delete_or_post(target_class: Any, redir_target: str = '/'
569                         ) -> Callable[..., Callable[[TaskHandler], str]]:
570         def decorator(f: Callable[..., str]
571                       ) -> Callable[[TaskHandler], str]:
572             def wrapper(self: TaskHandler) -> str:
573                 # pylint: disable=protected-access
574                 # (because pylint here fails to detect the use of wrapper as a
575                 # method to self with respective access privileges)
576                 id_ = self._params.get_int_or_none('id')
577                 for _ in self._form.get_all_str('delete'):
578                     if id_ is None:
579                         msg = 'trying to delete non-saved ' +\
580                                 f'{target_class.__name__}'
581                         raise NotFoundException(msg)
582                     item = target_class.by_id(self._conn, id_)
583                     item.remove(self._conn)
584                     return redir_target
585                 if target_class.can_create_by_id:
586                     item = target_class.by_id_or_create(self._conn, id_)
587                 else:
588                     item = target_class.by_id(self._conn, id_)
589                 return f(self, item)
590             return wrapper
591         return decorator
592
593     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
594         """Update history timestamps for VersionedAttribute."""
595         id_ = self._params.get_int_or_none('id')
596         item = cls.by_id(self._conn, id_)
597         attr = getattr(item, attr_name)
598         for k, vals in self._form.get_all_of_key_prefixed('at:').items():
599             if k[19:] != vals[0]:
600                 attr.reset_timestamp(k, f'{vals[0]}.0')
601         attr.save(self._conn)
602         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
603
604     def do_POST_day(self) -> str:
605         """Update or insert Day of date and Todos mapped to it."""
606         # pylint: disable=too-many-locals
607         date = self._params.get_str_or_fail('date')
608         day_comment = self._form.get_str_or_fail('day_comment')
609         make_type = self._form.get_str_or_fail('make_type')
610         old_todos = self._form.get_all_int('todo_id')
611         new_todos_by_process = self._form.get_all_int('new_todo')
612         comments = self._form.get_all_str('comment')
613         efforts = self._form.get_all_floats_or_nones('effort')
614         done_todos = self._form.get_all_int('done')
615         is_done = [t_id in done_todos for t_id in old_todos]
616         if not (len(old_todos) == len(is_done) == len(comments)
617                 == len(efforts)):
618             msg = 'not equal number each of number of todo_id, comments, ' +\
619                     'and efforts inputs'
620             raise BadFormatException(msg)
621         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
622             raise BadFormatException('"done" field refers to unknown Todo')
623         #
624         day = Day.by_id_or_create(self._conn, date)
625         day.comment = day_comment
626         day.save(self._conn)
627         new_todos = []
628         for process_id in sorted(new_todos_by_process):
629             process = Process.by_id(self._conn, process_id)
630             todo = Todo(None, process, False, date)
631             todo.save(self._conn)
632             new_todos += [todo]
633         if 'full' == make_type:
634             for todo in new_todos:
635                 todo.ensure_children(self._conn)
636         for i, todo_id in enumerate(old_todos):
637             todo = Todo.by_id(self._conn, todo_id)
638             todo.is_done = is_done[i]
639             todo.comment = comments[i]
640             todo.effort = efforts[i]
641             todo.save(self._conn)
642         return f'/day?date={date}&make_type={make_type}'
643
644     @_delete_or_post(Todo, '/')
645     def do_POST_todo(self, todo: Todo) -> str:
646         """Update Todo and its children."""
647         # pylint: disable=too-many-locals
648         # pylint: disable=too-many-branches
649         # pylint: disable=too-many-statements
650         assert todo.id_ is not None
651         adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
652         to_make = {'full': [(id_, todo.id_)
653                             for id_ in self._form.get_all_int('make_full')],
654                    'empty': [(id_, todo.id_)
655                              for id_ in self._form.get_all_int('make_empty')]}
656         step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_')
657         to_update: dict[str, Any] = {
658             'comment': self._form.get_str_or_fail('comment', '')}
659         for k in ('is_done', 'calendarize'):
660             v = self._form.get_bool_or_none(k)
661             if v is not None:
662                 to_update[k] = v
663         cond_rels = [self._form.get_all_int(name) for name in
664                      ['conditions', 'blockers', 'enables', 'disables']]
665         effort_or_not = self._form.get_str('effort')
666         if effort_or_not is not None:
667             if effort_or_not == '':
668                 to_update['effort'] = None
669             else:
670                 try:
671                     to_update['effort'] = float(effort_or_not)
672                 except ValueError as e:
673                     msg = 'cannot float form field value for key: effort'
674                     raise BadFormatException(msg) from e
675         for k, fillers in step_fillers_to.items():
676             try:
677                 parent_id = int(k)
678             except ValueError as e:
679                 msg = f'bad step_filler_to_ key: {k}'
680                 raise BadFormatException(msg) from e
681             for filler in [f for f in fillers if f != 'ignore']:
682                 target_id: int
683                 prefix = 'make_'
684                 to_int = filler[5:] if filler.startswith(prefix) else filler
685                 try:
686                     target_id = int(to_int)
687                 except ValueError as e:
688                     msg = f'bad fill_for target: {filler}'
689                     raise BadFormatException(msg) from e
690                 if filler.startswith(prefix):
691                     to_make['empty'] += [(target_id, parent_id)]
692                 else:
693                     adoptees += [(target_id, parent_id)]
694         #
695         todo.set_condition_relations(self._conn, *cond_rels)
696         for parent in [Todo.by_id(self._conn, a[1])
697                        for a in adoptees] + [todo]:
698             for child in parent.children:
699                 if child not in [t[0] for t in adoptees
700                                  if t[0] == child.id_ and t[1] == parent.id_]:
701                     parent.remove_child(child)
702                     parent.save(self._conn)
703         for child_id, parent_id in adoptees:
704             parent = Todo.by_id(self._conn, parent_id)
705             if child_id not in [c.id_ for c in parent.children]:
706                 parent.add_child(Todo.by_id(self._conn, child_id))
707                 parent.save(self._conn)
708         todo.update_attrs(**to_update)
709         for approach, make_data in to_make.items():
710             for process_id, parent_id in make_data:
711                 parent = Todo.by_id(self._conn, parent_id)
712                 process = Process.by_id(self._conn, process_id)
713                 made = Todo(None, process, False, todo.date)
714                 made.save(self._conn)
715                 if 'full' == approach:
716                     made.ensure_children(self._conn)
717                 parent.add_child(made)
718                 parent.save(self._conn)
719         # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
720         url = f'/todo?id={todo.id_}'
721         todo.save(self._conn)
722         return url
723
724     def do_POST_process_descriptions(self) -> str:
725         """Update history timestamps for Process.description."""
726         return self._change_versioned_timestamps(Process, 'description')
727
728     def do_POST_process_efforts(self) -> str:
729         """Update history timestamps for Process.effort."""
730         return self._change_versioned_timestamps(Process, 'effort')
731
732     def do_POST_process_titles(self) -> str:
733         """Update history timestamps for Process.title."""
734         return self._change_versioned_timestamps(Process, 'title')
735
736     @_delete_or_post(Process, '/processes')
737     def do_POST_process(self, process: Process) -> str:
738         """Update or insert Process of ?id= and fields defined in postvars."""
739         # pylint: disable=too-many-locals
740
741         def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
742             l_ids, title = [], ''
743             for id_or_title in l_id_or_title:
744                 try:
745                     l_ids += [int(id_or_title)]
746                 except ValueError:
747                     title = id_or_title
748             return title, l_ids
749
750         versioned = {'title': self._form.get_str_or_fail('title'),
751                      'description': self._form.get_str_or_fail('description'),
752                      'effort': self._form.get_float_or_fail('effort')}
753         cond_rels = [self._form.get_all_int(s) for s
754                      in ['conditions', 'blockers', 'enables', 'disables']]
755         calendarize = self._form.get_bool_or_none('calendarize')
756         step_of = self._form.get_all_str('step_of')
757         suppressions = self._form.get_all_int('suppresses')
758         kept_steps = self._form.get_all_int('kept_steps')
759         new_top_step_procs = self._form.get_all_str('new_top_step')
760         new_steps_to = {}
761         for step_id in kept_steps:
762             name = f'new_step_to_{step_id}'
763             new_steps_to[step_id] = self._form.get_all_int(name)
764         new_owner_title, owners_to_set = id_or_title(step_of)
765         new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
766         #
767         for k, v in versioned.items():
768             getattr(process, k).set(v)
769         if calendarize is not None:
770             process.calendarize = calendarize
771         process.save(self._conn)
772         assert isinstance(process.id_, int)
773         # set relations to Conditions and ProcessSteps / other Processes
774         process.set_condition_relations(self._conn, *cond_rels)
775         owned_steps = []
776         for step_id in kept_steps:
777             owned_steps += [ProcessStep.by_id(self._conn, step_id)]
778             owned_steps += [  # new sub-steps
779                     ProcessStep(None, process.id_, step_process_id, step_id)
780                     for step_process_id in new_steps_to[step_id]]
781         for step_process_id in new_top_step_proc_ids:
782             owned_steps += [ProcessStep(None, process.id_, step_process_id,
783                                         None)]
784         process.set_step_relations(self._conn, owners_to_set, suppressions,
785                                    owned_steps)
786         # encode titles for potential newly-to-create Processes up or down
787         params = f'id={process.id_}'
788         if new_step_title:
789             title_b64_encoded = b64encode(new_step_title.encode()).decode()
790             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
791         elif new_owner_title:
792             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
793             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
794         process.save(self._conn)
795         return f'/process?{params}'
796
797     def do_POST_condition_descriptions(self) -> str:
798         """Update history timestamps for Condition.description."""
799         return self._change_versioned_timestamps(Condition, 'description')
800
801     def do_POST_condition_titles(self) -> str:
802         """Update history timestamps for Condition.title."""
803         return self._change_versioned_timestamps(Condition, 'title')
804
805     @_delete_or_post(Condition, '/conditions')
806     def do_POST_condition(self, condition: Condition) -> str:
807         """Update/insert Condition of ?id= and fields defined in postvars."""
808         title = self._form.get_str_or_fail('title')
809         description = self._form.get_str_or_fail('description')
810         is_active = self._form.get_bool_or_none('is_active')
811         #
812         if is_active is not None:
813             condition.is_active = is_active
814         condition.title.set(title)
815         condition.description.set(description)
816         condition.save(self._conn)
817         return f'/condition?id={condition.id_}'