home · contact · privacy
Simplify do_POST_todo code.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.render_mode = 'html'
33         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35
36 class InputsParser:
37     """Wrapper for validating and retrieving dict-like HTTP inputs."""
38
39     def __init__(self, dict_: dict[str, list[str]]) -> None:
40         self.inputs = dict_
41
42     def get_all_str(self, key: str) -> list[str]:
43         """Retrieve list of string values at key (empty if no key)."""
44         if key not in self.inputs.keys():
45             return []
46         return self.inputs[key]
47
48     def get_all_int(self, key: str) -> list[int]:
49         """Retrieve list of int values at key."""
50         all_str = self.get_all_str(key)
51         try:
52             return [int(s) for s in all_str if len(s) > 0]
53         except ValueError as e:
54             msg = f'cannot int a form field value for key {key} in: {all_str}'
55             raise BadFormatException(msg) from e
56
57     def get_str(self, key: str, default: str | None = None) -> str | None:
58         """Retrieve single/first string value of key, or default."""
59         vals = self.get_all_str(key)
60         if vals:
61             return vals[0]
62         return default
63
64     def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65         """Retrieve first string value of key, if none: fail or default."""
66         vals = self.get_all_str(key)
67         if not vals:
68             if default is not None:
69                 return default
70             raise BadFormatException(f'no value found for key: {key}')
71         return vals[0]
72
73     def get_int_or_none(self, key: str) -> int | None:
74         """Retrieve single/first value of key as int, return None if empty."""
75         val = self.get_str_or_fail(key, '')
76         if val == '':
77             return None
78         try:
79             return int(val)
80         except (ValueError, TypeError) as e:
81             msg = f'cannot int form field value for key {key}: {val}'
82             raise BadFormatException(msg) from e
83
84     def get_bool_or_none(self, key: str) -> bool | None:
85         """Return value to key if truish; if no value to key, None."""
86         val = self.get_str(key)
87         if val is None:
88             return None
89         return val in {'True', 'true', '1', 'on'}
90
91     def get_firsts_of_key_prefixed(self, prefix: str) -> dict[str, str]:
92         """Retrieve dict of (first) strings at key starting with prefix."""
93         ret = {}
94         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95             ret[key] = self.inputs[key][0]
96         return ret
97
98     def get_float_or_fail(self, key: str) -> float:
99         """Retrieve float value of key from self.postvars, fail if none."""
100         val = self.get_str_or_fail(key)
101         try:
102             return float(val)
103         except ValueError as e:
104             msg = f'cannot float form field value for key {key}: {val}'
105             raise BadFormatException(msg) from e
106
107     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
108         """Retrieve list of float value at key, None if empty strings."""
109         ret: list[float | None] = []
110         for val in self.get_all_str(key):
111             if '' == val:
112                 ret += [None]
113             else:
114                 try:
115                     ret += [float(val)]
116                 except ValueError as e:
117                     msg = f'cannot float form field value for key {key}: {val}'
118                     raise BadFormatException(msg) from e
119         return ret
120
121
122 class TaskHandler(BaseHTTPRequestHandler):
123     """Handles single HTTP request."""
124     # pylint: disable=too-many-public-methods
125     server: TaskServer
126     _conn: DatabaseConnection
127     _site: str
128     _form: InputsParser
129     _params: InputsParser
130
131     def _send_page(
132             self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
133             ) -> None:
134         """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
135
136         The differentiation by .server.render_mode serves to allow easily
137         comparable JSON responses for automatic testing.
138         """
139         body: str
140         headers: list[tuple[str, str]] = []
141         if 'html' == self.server.render_mode:
142             tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
143             body = tmpl.render(ctx)
144         else:
145             body = self._ctx_to_json(ctx)
146             headers += [('Content-Type', 'application/json')]
147         self.send_response(code)
148         for header_tuple in headers:
149             self.send_header(*header_tuple)
150         self.end_headers()
151         self.wfile.write(bytes(body, 'utf-8'))
152
153     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
154         """Render ctx into JSON string.
155
156         Flattens any objects that json.dumps might not want to serialize, and
157         turns occurrences of BaseModel objects into listings of their .id_, to
158         be resolved to a full dict inside a top-level '_library' dictionary,
159         to avoid endless and circular nesting.
160         """
161
162         def flatten(node: object) -> object:
163
164             def update_library_with(
165                     item: BaseModel[int] | BaseModel[str]) -> None:
166                 cls_name = item.__class__.__name__
167                 if cls_name not in library:
168                     library[cls_name] = {}
169                 if item.id_ not in library[cls_name]:
170                     d, refs = item.as_dict_and_refs
171                     id_key = '?' if item.id_ is None else item.id_
172                     library[cls_name][id_key] = d
173                     for ref in refs:
174                         update_library_with(ref)
175
176             if isinstance(node, BaseModel):
177                 update_library_with(node)
178                 return node.id_
179             if isinstance(node, DictableNode):
180                 d, refs = node.as_dict_and_refs
181                 for ref in refs:
182                     update_library_with(ref)
183                 return d
184             if isinstance(node, (list, tuple)):
185                 return [flatten(item) for item in node]
186             if isinstance(node, dict):
187                 d = {}
188                 for k, v in node.items():
189                     d[k] = flatten(v)
190                 return d
191             if isinstance(node, HandledException):
192                 return str(node)
193             return node
194
195         library: dict[str, dict[str | int, object]] = {}
196         for k, v in ctx.items():
197             ctx[k] = flatten(v)
198         ctx['_library'] = library
199         return json_dumps(ctx)
200
201     @staticmethod
202     def _request_wrapper(http_method: str, not_found_msg: str
203                          ) -> Callable[..., Callable[[TaskHandler], None]]:
204         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
205
206         Among other things, conditionally cleans all caches, but only on POST
207         requests, as only those are expected to change the states of objects
208         that may be cached, and certainly only those are expected to write any
209         changes to the database. We want to call them as early though as
210         possible here, either exactly after the specific request handler
211         returns successfully, or right after any exception is triggered –
212         otherwise, race conditions become plausible.
213
214         Note that otherwise any POST attempt, even a failed one, may end in
215         problematic inconsistencies:
216
217         - if the POST handler experiences an Exception, changes to objects
218           won't get written to the DB, but the changed objects may remain in
219           the cache and affect other objects despite their possibly illegal
220           state
221
222         - even if an object was just saved to the DB, we cannot be sure its
223           current state is completely identical to what we'd get if loading it
224           fresh from the DB (e.g. currently Process.n_owners is only updated
225           when loaded anew via .from_table_row, nor is its state written to
226           the DB by .save; a questionable design choice, but proof that we
227           have no guarantee that objects' .save stores all their states we'd
228           prefer at their most up-to-date.
229         """
230
231         def clear_caches() -> None:
232             for cls in (Day, Todo, Condition, Process, ProcessStep):
233                 assert hasattr(cls, 'empty_cache')
234                 cls.empty_cache()
235
236         def decorator(f: Callable[..., str | None]
237                       ) -> Callable[[TaskHandler], None]:
238             def wrapper(self: TaskHandler) -> None:
239                 # pylint: disable=protected-access
240                 # (because pylint here fails to detect the use of wrapper as a
241                 # method to self with respective access privileges)
242                 try:
243                     self._conn = DatabaseConnection(self.server.db)
244                     parsed_url = urlparse(self.path)
245                     self._site = path_split(parsed_url.path)[1]
246                     params = parse_qs(parsed_url.query,
247                                       keep_blank_values=True,
248                                       strict_parsing=True)
249                     self._params = InputsParser(params)
250                     handler_name = f'do_{http_method}_{self._site}'
251                     if hasattr(self, handler_name):
252                         handler = getattr(self, handler_name)
253                         redir_target = f(self, handler)
254                         if 'POST' == http_method:
255                             clear_caches()
256                         if redir_target:
257                             self.send_response(302)
258                             self.send_header('Location', redir_target)
259                             self.end_headers()
260                     else:
261                         msg = f'{not_found_msg}: {self._site}'
262                         raise NotFoundException(msg)
263                 except HandledException as error:
264                     if 'POST' == http_method:
265                         clear_caches()
266                     ctx = {'msg': error}
267                     self._send_page(ctx, 'msg', error.http_code)
268                 finally:
269                     self._conn.close()
270             return wrapper
271         return decorator
272
273     @_request_wrapper('GET', 'Unknown page')
274     def do_GET(self, handler: Callable[[], str | dict[str, object]]
275                ) -> str | None:
276         """Render page with result of handler, or redirect if result is str."""
277         tmpl_name = f'{self._site}'
278         ctx_or_redir_target = handler()
279         if isinstance(ctx_or_redir_target, str):
280             return ctx_or_redir_target
281         self._send_page(ctx_or_redir_target, tmpl_name)
282         return None
283
284     @_request_wrapper('POST', 'Unknown POST target')
285     def do_POST(self, handler: Callable[[], str]) -> str:
286         """Handle POST with handler, prepare redirection to result."""
287         length = int(self.headers['content-length'])
288         postvars = parse_qs(self.rfile.read(length).decode(),
289                             keep_blank_values=True)
290         self._form = InputsParser(postvars)
291         redir_target = handler()
292         self._conn.commit()
293         return redir_target
294
295     # GET handlers
296
297     @staticmethod
298     def _get_item(target_class: Any
299                   ) -> Callable[..., Callable[[TaskHandler],
300                                               dict[str, object]]]:
301         def decorator(f: Callable[..., dict[str, object]]
302                       ) -> Callable[[TaskHandler], dict[str, object]]:
303             def wrapper(self: TaskHandler) -> dict[str, object]:
304                 # pylint: disable=protected-access
305                 # (because pylint here fails to detect the use of wrapper as a
306                 # method to self with respective access privileges)
307                 id_ = self._params.get_int_or_none('id')
308                 if target_class.can_create_by_id:
309                     item = target_class.by_id_or_create(self._conn, id_)
310                 else:
311                     item = target_class.by_id(self._conn, id_)
312                 return f(self, item)
313             return wrapper
314         return decorator
315
316     def do_GET_(self) -> str:
317         """Return redirect target on GET /."""
318         return '/day'
319
320     def _do_GET_calendar(self) -> dict[str, object]:
321         """Show Days from ?start= to ?end=.
322
323         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
324         same, the only difference being the HTML template they are rendered to,
325         which .do_GET selects from their method name.
326         """
327         start = self._params.get_str_or_fail('start', '')
328         end = self._params.get_str_or_fail('end', '')
329         end = end if end != '' else date_in_n_days(366)
330         #
331         days, start, end = Day.by_date_range_with_limits(self._conn,
332                                                          (start, end), 'id')
333         days = Day.with_filled_gaps(days, start, end)
334         today = date_in_n_days(0)
335         return {'start': start, 'end': end, 'days': days, 'today': today}
336
337     def do_GET_calendar(self) -> dict[str, object]:
338         """Show Days from ?start= to ?end= – normal view."""
339         return self._do_GET_calendar()
340
341     def do_GET_calendar_txt(self) -> dict[str, object]:
342         """Show Days from ?start= to ?end= – minimalist view."""
343         return self._do_GET_calendar()
344
345     def do_GET_day(self) -> dict[str, object]:
346         """Show single Day of ?date=."""
347         date = self._params.get_str_or_fail('date', date_in_n_days(0))
348         make_type = self._params.get_str_or_fail('make_type', 'full')
349         #
350         day = Day.by_id_or_create(self._conn, date)
351         conditions_present = []
352         enablers_for = {}
353         disablers_for = {}
354         for todo in day.todos:
355             for condition in todo.conditions + todo.blockers:
356                 if condition not in conditions_present:
357                     conditions_present += [condition]
358                     enablers_for[condition.id_] = [p for p in
359                                                    Process.all(self._conn)
360                                                    if condition in p.enables]
361                     disablers_for[condition.id_] = [p for p in
362                                                     Process.all(self._conn)
363                                                     if condition in p.disables]
364         seen_todos: set[int] = set()
365         top_nodes = [t.get_step_tree(seen_todos)
366                      for t in day.todos if not t.parents]
367         return {'day': day,
368                 'top_nodes': top_nodes,
369                 'make_type': make_type,
370                 'enablers_for': enablers_for,
371                 'disablers_for': disablers_for,
372                 'conditions_present': conditions_present,
373                 'processes': Process.all(self._conn)}
374
375     @_get_item(Todo)
376     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
377         """Show single Todo of ?id=."""
378
379         def walk_process_steps(node_id: int,
380                                process_step_nodes: list[ProcessStepsNode],
381                                steps_nodes: list[TodoOrProcStepNode]) -> int:
382             for process_step_node in process_step_nodes:
383                 node_id += 1
384                 proc = Process.by_id(self._conn,
385                                      process_step_node.step.step_process_id)
386                 node = TodoOrProcStepNode(node_id, None, proc, [])
387                 steps_nodes += [node]
388                 node_id = walk_process_steps(
389                         node_id, process_step_node.steps, node.children)
390             return node_id
391
392         def walk_todo_steps(node_id: int, todos: list[Todo],
393                             steps_nodes: list[TodoOrProcStepNode]) -> int:
394             for todo in todos:
395                 matched = False
396                 for match in [item for item in steps_nodes
397                               if item.process
398                               and item.process == todo.process]:
399                     match.todo = todo
400                     matched = True
401                     for child in match.children:
402                         child.fillable = True
403                     node_id = walk_todo_steps(
404                             node_id, todo.children, match.children)
405                 if not matched:
406                     node_id += 1
407                     node = TodoOrProcStepNode(node_id, todo, None, [])
408                     steps_nodes += [node]
409                     node_id = walk_todo_steps(
410                             node_id, todo.children, node.children)
411             return node_id
412
413         def collect_adoptables_keys(
414                 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
415             ids = set()
416             for node in steps_nodes:
417                 if not node.todo:
418                     assert isinstance(node.process, Process)
419                     assert isinstance(node.process.id_, int)
420                     ids.add(node.process.id_)
421                 ids = ids | collect_adoptables_keys(node.children)
422             return ids
423
424         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
425         process_tree = todo.process.get_steps(self._conn, None)
426         steps_todo_to_process: list[TodoOrProcStepNode] = []
427         last_node_id = walk_process_steps(0, process_tree,
428                                           steps_todo_to_process)
429         for steps_node in steps_todo_to_process:
430             steps_node.fillable = True
431         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
432         adoptables: dict[int, list[Todo]] = {}
433         any_adoptables = [Todo.by_id(self._conn, t.id_)
434                           for t in Todo.by_date(self._conn, todo.date)
435                           if t.id_ is not None
436                           and t != todo]
437         for id_ in collect_adoptables_keys(steps_todo_to_process):
438             adoptables[id_] = [t for t in any_adoptables
439                                if t.process.id_ == id_]
440         return {'todo': todo,
441                 'steps_todo_to_process': steps_todo_to_process,
442                 'adoption_candidates_for': adoptables,
443                 'process_candidates': sorted(Process.all(self._conn)),
444                 'todo_candidates': any_adoptables,
445                 'condition_candidates': Condition.all(self._conn)}
446
447     def do_GET_todos(self) -> dict[str, object]:
448         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
449         sort_by = self._params.get_str_or_fail('sort_by', 'title')
450         start = self._params.get_str_or_fail('start', '')
451         end = self._params.get_str_or_fail('end', '')
452         process_id = self._params.get_int_or_none('process_id')
453         comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
454         #
455         todos = []
456         ret = Todo.by_date_range_with_limits(self._conn, (start, end))
457         todos_by_date_range, start, end = ret
458         todos = [t for t in todos_by_date_range
459                  if comment_pattern in t.comment
460                  and ((not process_id) or t.process.id_ == process_id)]
461         sort_by = Todo.sort_by(todos, sort_by)
462         return {'start': start, 'end': end, 'process_id': process_id,
463                 'comment_pattern': comment_pattern, 'todos': todos,
464                 'all_processes': Process.all(self._conn), 'sort_by': sort_by}
465
466     def do_GET_conditions(self) -> dict[str, object]:
467         """Show all Conditions."""
468         pattern = self._params.get_str_or_fail('pattern', '')
469         sort_by = self._params.get_str_or_fail('sort_by', 'title')
470         #
471         conditions = Condition.matching(self._conn, pattern)
472         sort_by = Condition.sort_by(conditions, sort_by)
473         return {'conditions': conditions,
474                 'sort_by': sort_by,
475                 'pattern': pattern}
476
477     @_get_item(Condition)
478     def do_GET_condition(self, c: Condition) -> dict[str, object]:
479         """Show Condition of ?id=."""
480         ps = Process.all(self._conn)
481         return {'condition': c, 'is_new': c.id_ is None,
482                 'enabled_processes': [p for p in ps if c in p.conditions],
483                 'disabled_processes': [p for p in ps if c in p.blockers],
484                 'enabling_processes': [p for p in ps if c in p.enables],
485                 'disabling_processes': [p for p in ps if c in p.disables]}
486
487     @_get_item(Condition)
488     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
489         """Show title history of Condition of ?id=."""
490         return {'condition': c}
491
492     @_get_item(Condition)
493     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
494         """Show description historys of Condition of ?id=."""
495         return {'condition': c}
496
497     @_get_item(Process)
498     def do_GET_process(self, process: Process) -> dict[str, object]:
499         """Show Process of ?id=."""
500         owner_ids = self._params.get_all_int('step_to')
501         owned_ids = self._params.get_all_int('has_step')
502         title_64 = self._params.get_str('title_b64')
503         #
504         if title_64:
505             try:
506                 title = b64decode(title_64.encode()).decode()
507             except binascii_Exception as exc:
508                 msg = 'invalid base64 for ?title_b64='
509                 raise BadFormatException(msg) from exc
510             process.title.set(title)
511         preset_top_step = None
512         owners = process.used_as_step_by(self._conn)
513         for step_id in owner_ids:
514             owners += [Process.by_id(self._conn, step_id)]
515         for process_id in owned_ids:
516             Process.by_id(self._conn, process_id)  # to ensure ID exists
517             preset_top_step = process_id
518         return {'process': process, 'is_new': process.id_ is None,
519                 'preset_top_step': preset_top_step,
520                 'steps': process.get_steps(self._conn),
521                 'owners': owners,
522                 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
523                 'process_candidates': Process.all(self._conn),
524                 'condition_candidates': Condition.all(self._conn)}
525
526     @_get_item(Process)
527     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
528         """Show title history of Process of ?id=."""
529         return {'process': p}
530
531     @_get_item(Process)
532     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
533         """Show description historys of Process of ?id=."""
534         return {'process': p}
535
536     @_get_item(Process)
537     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
538         """Show default effort history of Process of ?id=."""
539         return {'process': p}
540
541     def do_GET_processes(self) -> dict[str, object]:
542         """Show all Processes."""
543         pattern = self._params.get_str_or_fail('pattern', '')
544         sort_by = self._params.get_str_or_fail('sort_by', 'title')
545         #
546         processes = Process.matching(self._conn, pattern)
547         sort_by = Process.sort_by(processes, sort_by)
548         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
549
550     # POST handlers
551
552     @staticmethod
553     def _delete_or_post(target_class: Any, redir_target: str = '/'
554                         ) -> Callable[..., Callable[[TaskHandler], str]]:
555         def decorator(f: Callable[..., str]
556                       ) -> Callable[[TaskHandler], str]:
557             def wrapper(self: TaskHandler) -> str:
558                 # pylint: disable=protected-access
559                 # (because pylint here fails to detect the use of wrapper as a
560                 # method to self with respective access privileges)
561                 id_ = self._params.get_int_or_none('id')
562                 for _ in self._form.get_all_str('delete'):
563                     if id_ is None:
564                         msg = 'trying to delete non-saved ' +\
565                                 f'{target_class.__name__}'
566                         raise NotFoundException(msg)
567                     item = target_class.by_id(self._conn, id_)
568                     item.remove(self._conn)
569                     return redir_target
570                 if target_class.can_create_by_id:
571                     item = target_class.by_id_or_create(self._conn, id_)
572                 else:
573                     item = target_class.by_id(self._conn, id_)
574                 return f(self, item)
575             return wrapper
576         return decorator
577
578     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
579         """Update history timestamps for VersionedAttribute."""
580         id_ = self._params.get_int_or_none('id')
581         item = cls.by_id(self._conn, id_)
582         attr = getattr(item, attr_name)
583         for k, v in self._form.get_firsts_of_key_prefixed('at:').items():
584             old = k[3:]
585             if old[19:] != v:
586                 attr.reset_timestamp(old, f'{v}.0')
587         attr.save(self._conn)
588         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
589
590     def do_POST_day(self) -> str:
591         """Update or insert Day of date and Todos mapped to it."""
592         # pylint: disable=too-many-locals
593         date = self._params.get_str_or_fail('date')
594         day_comment = self._form.get_str_or_fail('day_comment')
595         make_type = self._form.get_str_or_fail('make_type')
596         old_todos = self._form.get_all_int('todo_id')
597         new_todos_by_process = self._form.get_all_int('new_todo')
598         comments = self._form.get_all_str('comment')
599         efforts = self._form.get_all_floats_or_nones('effort')
600         done_todos = self._form.get_all_int('done')
601         #
602         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
603             raise BadFormatException('"done" field refers to unknown Todo')
604         is_done = [t_id in done_todos for t_id in old_todos]
605         if not (len(old_todos) == len(is_done) == len(comments)
606                 == len(efforts)):
607             msg = 'not equal number each of number of todo_id, comments, ' +\
608                     'and efforts inputs'
609             raise BadFormatException(msg)
610         day = Day.by_id_or_create(self._conn, date)
611         day.comment = day_comment
612         day.save(self._conn)
613         new_todos = []
614         for process_id in sorted(new_todos_by_process):
615             process = Process.by_id(self._conn, process_id)
616             todo = Todo(None, process, False, date)
617             todo.save(self._conn)
618             new_todos += [todo]
619         if 'full' == make_type:
620             for todo in new_todos:
621                 todo.ensure_children(self._conn)
622         for i, todo_id in enumerate(old_todos):
623             todo = Todo.by_id(self._conn, todo_id)
624             todo.is_done = is_done[i]
625             todo.comment = comments[i]
626             todo.effort = efforts[i]
627             todo.save(self._conn)
628         return f'/day?date={date}&make_type={make_type}'
629
630     @_delete_or_post(Todo, '/')
631     def do_POST_todo(self, todo: Todo) -> str:
632         """Update Todo and its children."""
633         # pylint: disable=too-many-locals
634         # pylint: disable=too-many-branches
635         adoptees = self._form.get_all_int('adopt')
636         to_make = {'full': self._form.get_all_int('make_full'),
637                    'empty': self._form.get_all_int('make_empty')}
638         step_fillers = self._form.get_all_str('step_filler')
639         to_update: dict[str, Any] = {
640             'comment': self._form.get_str_or_fail('comment', '')}
641         for k in ('is_done', 'calendarize'):
642             v = self._form.get_bool_or_none(k)
643             if v is not None:
644                 to_update[k] = v
645         cond_rels = [self._form.get_all_int(name) for name in
646                      ['conditions', 'blockers', 'enables', 'disables']]
647         effort_or_not = self._form.get_str('effort')
648         if effort_or_not is not None:
649             if effort_or_not == '':
650                 to_update['effort'] = None
651             else:
652                 try:
653                     to_update['effort'] = float(effort_or_not)
654                 except ValueError as e:
655                     msg = 'cannot float form field value for key: effort'
656                     raise BadFormatException(msg) from e
657         for filler in [f for f in step_fillers if f != 'ignore']:
658             target_id: int
659             to_int = filler
660             for prefix in [p for p in ['make_empty_', 'make_full_']
661                            if filler.startswith(p)]:
662                 to_int = filler[len(prefix):]
663             try:
664                 target_id = int(to_int)
665             except ValueError as e:
666                 msg = 'bad fill_for target: {filler}'
667                 raise BadFormatException(msg) from e
668             if filler.startswith('make_empty_'):
669                 to_make['empty'] += [target_id]
670             elif filler.startswith('make_full_'):
671                 to_make['full'] += [target_id]
672             else:
673                 adoptees += [target_id]
674         #
675         todo.set_condition_relations(self._conn, *cond_rels)
676         for child in [c for c in todo.children if c.id_ not in adoptees]:
677              todo.remove_child(child)
678         for child_id in [id_ for id_ in adoptees
679                          if id_ not in [c.id_ for c in todo.children]]:
680              todo.add_child(Todo.by_id(self._conn, child_id))
681         todo.update_attrs(**to_update)
682         for approach, proc_ids in to_make.items():
683             for process_id in proc_ids:
684                 process = Process.by_id(self._conn, process_id)
685                 made = Todo(None, process, False, todo.date)
686                 made.save(self._conn)
687                 if 'full' == approach:
688                     made.ensure_children(self._conn)
689                 todo.add_child(made)
690         # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
691         url = f'/todo?id={todo.id_}'
692         todo.save(self._conn)
693         return url
694
695     def do_POST_process_descriptions(self) -> str:
696         """Update history timestamps for Process.description."""
697         return self._change_versioned_timestamps(Process, 'description')
698
699     def do_POST_process_efforts(self) -> str:
700         """Update history timestamps for Process.effort."""
701         return self._change_versioned_timestamps(Process, 'effort')
702
703     def do_POST_process_titles(self) -> str:
704         """Update history timestamps for Process.title."""
705         return self._change_versioned_timestamps(Process, 'title')
706
707     @_delete_or_post(Process, '/processes')
708     def do_POST_process(self, process: Process) -> str:
709         """Update or insert Process of ?id= and fields defined in postvars."""
710         # pylint: disable=too-many-locals
711
712         def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
713             l_ids, title = [], ''
714             for id_or_title in l_id_or_title:
715                 try:
716                     l_ids += [int(id_or_title)]
717                 except ValueError:
718                     title = id_or_title
719             return title, l_ids
720
721         versioned = {'title': self._form.get_str_or_fail('title'),
722                      'description': self._form.get_str_or_fail('description'),
723                      'effort': self._form.get_float_or_fail('effort')}
724         cond_rels = [self._form.get_all_int(s) for s
725                      in ['conditions', 'blockers', 'enables', 'disables']]
726         calendarize = self._form.get_bool_or_none('calendarize')
727         step_of = self._form.get_all_str('step_of')
728         suppressions = self._form.get_all_int('suppresses')
729         kept_steps = self._form.get_all_int('kept_steps')
730         new_top_step_procs = self._form.get_all_str('new_top_step')
731         new_steps_to = {}
732         for step_id in kept_steps:
733             name = f'new_step_to_{step_id}'
734             new_steps_to[step_id] = self._form.get_all_int(name)
735         new_owner_title, owners_to_set = id_or_title(step_of)
736         new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
737         #
738         for k, v in versioned.items():
739             getattr(process, k).set(v)
740         if calendarize is not None:
741             process.calendarize = calendarize
742         process.save(self._conn)
743         assert isinstance(process.id_, int)
744         # set relations to Conditions and ProcessSteps / other Processes
745         process.set_condition_relations(self._conn, *cond_rels)
746         owned_steps = []
747         for step_id in kept_steps:
748             owned_steps += [ProcessStep.by_id(self._conn, step_id)]
749             owned_steps += [  # new sub-steps
750                     ProcessStep(None, process.id_, step_process_id, step_id)
751                     for step_process_id in new_steps_to[step_id]]
752         for step_process_id in new_top_step_proc_ids:
753             owned_steps += [ProcessStep(None, process.id_, step_process_id,
754                                         None)]
755         process.set_step_relations(self._conn, owners_to_set, suppressions,
756                                    owned_steps)
757         # encode titles for potential newly-to-create Processes up or down
758         params = f'id={process.id_}'
759         if new_step_title:
760             title_b64_encoded = b64encode(new_step_title.encode()).decode()
761             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
762         elif new_owner_title:
763             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
764             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
765         process.save(self._conn)
766         return f'/process?{params}'
767
768     def do_POST_condition_descriptions(self) -> str:
769         """Update history timestamps for Condition.description."""
770         return self._change_versioned_timestamps(Condition, 'description')
771
772     def do_POST_condition_titles(self) -> str:
773         """Update history timestamps for Condition.title."""
774         return self._change_versioned_timestamps(Condition, 'title')
775
776     @_delete_or_post(Condition, '/conditions')
777     def do_POST_condition(self, condition: Condition) -> str:
778         """Update/insert Condition of ?id= and fields defined in postvars."""
779         title = self._form.get_str_or_fail('title')
780         description = self._form.get_str_or_fail('description')
781         is_active = self._form.get_bool_or_none('is_active')
782         #
783         if is_active is not None:
784             condition.is_active = is_active
785         condition.title.set(title)
786         condition.description.set(description)
787         condition.save(self._conn)
788         return f'/condition?id={condition.id_}'