home · contact · privacy
Expand POST /todo adoption tests.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode, DictableNode
20
21 TEMPLATES_DIR = 'templates'
22
23
24 class TaskServer(HTTPServer):
25     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
26
27     def __init__(self, db_file: DatabaseFile,
28                  *args: Any, **kwargs: Any) -> None:
29         super().__init__(*args, **kwargs)
30         self.db = db_file
31         self.render_mode = 'html'
32         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
33
34
35 class InputsParser:
36     """Wrapper for validating and retrieving dict-like HTTP inputs."""
37
38     def __init__(self, dict_: dict[str, list[str]],
39                  strictness: bool = True) -> None:
40         self.inputs = dict_
41         self.strict = strictness  # return None on absence of key, or fail?
42
43     def get_str(self, key: str, default: str = '',
44                 ignore_strict: bool = False) -> str:
45         """Retrieve single/first string value of key, or default."""
46         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
47             if self.strict and not ignore_strict:
48                 raise NotFoundException(f'no value found for key {key}')
49             return default
50         return self.inputs[key][0]
51
52     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
53         """Retrieve dict of (first) strings at key starting with prefix."""
54         ret = {}
55         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
56             ret[key] = self.inputs[key][0]
57         return ret
58
59     def get_int(self, key: str) -> int:
60         """Retrieve single/first value of key as int, error if empty."""
61         val = self.get_int_or_none(key)
62         if val is None:
63             raise BadFormatException(f'unexpected empty value for: {key}')
64         return val
65
66     def get_int_or_none(self, key: str) -> int | None:
67         """Retrieve single/first value of key as int, return None if empty."""
68         val = self.get_str(key, ignore_strict=True)
69         if val == '':
70             return None
71         try:
72             return int(val)
73         except ValueError as e:
74             msg = f'cannot int form field value for key {key}: {val}'
75             raise BadFormatException(msg) from e
76
77     def get_float(self, key: str) -> float:
78         """Retrieve float value of key from self.postvars."""
79         val = self.get_str(key)
80         try:
81             return float(val)
82         except ValueError as e:
83             msg = f'cannot float form field value for key {key}: {val}'
84             raise BadFormatException(msg) from e
85
86     def get_float_or_none(self, key: str) -> float | None:
87         """Retrieve float value of key from self.postvars, None if empty."""
88         val = self.get_str(key)
89         if '' == val:
90             return None
91         try:
92             return float(val)
93         except ValueError as e:
94             msg = f'cannot float form field value for key {key}: {val}'
95             raise BadFormatException(msg) from e
96
97     def get_all_str(self, key: str) -> list[str]:
98         """Retrieve list of string values at key."""
99         if key not in self.inputs.keys():
100             return []
101         return self.inputs[key]
102
103     def get_all_int(self, key: str) -> list[int]:
104         """Retrieve list of int values at key."""
105         all_str = self.get_all_str(key)
106         try:
107             return [int(s) for s in all_str if len(s) > 0]
108         except ValueError as e:
109             msg = f'cannot int a form field value for key {key} in: {all_str}'
110             raise BadFormatException(msg) from e
111
112     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
113         """Retrieve list of float value at key, None if empty strings."""
114         ret: list[float | None] = []
115         for val in self.get_all_str(key):
116             if '' == val:
117                 ret += [None]
118             else:
119                 try:
120                     ret += [float(val)]
121                 except ValueError as e:
122                     msg = f'cannot float form field value for key {key}: {val}'
123                     raise BadFormatException(msg) from e
124         return ret
125
126
127 class TaskHandler(BaseHTTPRequestHandler):
128     """Handles single HTTP request."""
129     # pylint: disable=too-many-public-methods
130     server: TaskServer
131     conn: DatabaseConnection
132     _site: str
133     _form_data: InputsParser
134     _params: InputsParser
135
136     def _send_page(
137             self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
138             ) -> None:
139         """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
140
141         The differentiation by .server.render_mode serves to allow easily
142         comparable JSON responses for automatic testing.
143         """
144         body: str
145         headers: list[tuple[str, str]] = []
146         if 'html' == self.server.render_mode:
147             tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
148             body = tmpl.render(ctx)
149         else:
150             body = self._ctx_to_json(ctx)
151             headers += [('Content-Type', 'application/json')]
152         self.send_response(code)
153         for header_tuple in headers:
154             self.send_header(*header_tuple)
155         self.end_headers()
156         self.wfile.write(bytes(body, 'utf-8'))
157
158     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
159         """Render ctx into JSON string.
160
161         Flattens any objects that json.dumps might not want to serialize, and
162         turns occurrences of BaseModel objects into listings of their .id_, to
163         be resolved to a full dict inside a top-level '_library' dictionary,
164         to avoid endless and circular nesting.
165         """
166
167         def flatten(node: object) -> object:
168
169             def update_library_with(
170                     item: BaseModel[int] | BaseModel[str]) -> None:
171                 cls_name = item.__class__.__name__
172                 if cls_name not in library:
173                     library[cls_name] = {}
174                 if item.id_ not in library[cls_name]:
175                     d, refs = item.as_dict_and_refs
176                     id_key = '?' if item.id_ is None else item.id_
177                     library[cls_name][id_key] = d
178                     for ref in refs:
179                         update_library_with(ref)
180
181             if isinstance(node, BaseModel):
182                 update_library_with(node)
183                 return node.id_
184             if isinstance(node, DictableNode):
185                 d, refs = node.as_dict_and_refs
186                 for ref in refs:
187                     update_library_with(ref)
188                 return d
189             if isinstance(node, (list, tuple)):
190                 return [flatten(item) for item in node]
191             if isinstance(node, dict):
192                 d = {}
193                 for k, v in node.items():
194                     d[k] = flatten(v)
195                 return d
196             if isinstance(node, HandledException):
197                 return str(node)
198             return node
199
200         library: dict[str, dict[str | int, object]] = {}
201         for k, v in ctx.items():
202             ctx[k] = flatten(v)
203         ctx['_library'] = library
204         return json_dumps(ctx)
205
206     @staticmethod
207     def _request_wrapper(http_method: str, not_found_msg: str
208                          ) -> Callable[..., Callable[[TaskHandler], None]]:
209         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
210
211         Among other things, conditionally cleans all caches, but only on POST
212         requests, as only those are expected to change the states of objects
213         that may be cached, and certainly only those are expected to write any
214         changes to the database. We want to call them as early though as
215         possible here, either exactly after the specific request handler
216         returns successfully, or right after any exception is triggered –
217         otherwise, race conditions become plausible.
218
219         Note that otherwise any POST attempt, even a failed one, may end in
220         problematic inconsistencies:
221
222         - if the POST handler experiences an Exception, changes to objects
223           won't get written to the DB, but the changed objects may remain in
224           the cache and affect other objects despite their possibly illegal
225           state
226
227         - even if an object was just saved to the DB, we cannot be sure its
228           current state is completely identical to what we'd get if loading it
229           fresh from the DB (e.g. currently Process.n_owners is only updated
230           when loaded anew via .from_table_row, nor is its state written to
231           the DB by .save; a questionable design choice, but proof that we
232           have no guarantee that objects' .save stores all their states we'd
233           prefer at their most up-to-date.
234         """
235
236         def clear_caches() -> None:
237             for cls in (Day, Todo, Condition, Process, ProcessStep):
238                 assert hasattr(cls, 'empty_cache')
239                 cls.empty_cache()
240
241         def decorator(f: Callable[..., str | None]
242                       ) -> Callable[[TaskHandler], None]:
243             def wrapper(self: TaskHandler) -> None:
244                 # pylint: disable=protected-access
245                 # (because pylint here fails to detect the use of wrapper as a
246                 # method to self with respective access privileges)
247                 try:
248                     self.conn = DatabaseConnection(self.server.db)
249                     parsed_url = urlparse(self.path)
250                     self._site = path_split(parsed_url.path)[1]
251                     params = parse_qs(parsed_url.query, strict_parsing=True)
252                     self._params = InputsParser(params, False)
253                     handler_name = f'do_{http_method}_{self._site}'
254                     if hasattr(self, handler_name):
255                         handler = getattr(self, handler_name)
256                         redir_target = f(self, handler)
257                         if 'POST' == http_method:
258                             clear_caches()
259                         if redir_target:
260                             self.send_response(302)
261                             self.send_header('Location', redir_target)
262                             self.end_headers()
263                     else:
264                         msg = f'{not_found_msg}: {self._site}'
265                         raise NotFoundException(msg)
266                 except HandledException as error:
267                     if 'POST' == http_method:
268                         clear_caches()
269                     ctx = {'msg': error}
270                     self._send_page(ctx, 'msg', error.http_code)
271                 finally:
272                     self.conn.close()
273             return wrapper
274         return decorator
275
276     @_request_wrapper('GET', 'Unknown page')
277     def do_GET(self, handler: Callable[[], str | dict[str, object]]
278                ) -> str | None:
279         """Render page with result of handler, or redirect if result is str."""
280         tmpl_name = f'{self._site}'
281         ctx_or_redir_target = handler()
282         if isinstance(ctx_or_redir_target, str):
283             return ctx_or_redir_target
284         self._send_page(ctx_or_redir_target, tmpl_name)
285         return None
286
287     @_request_wrapper('POST', 'Unknown POST target')
288     def do_POST(self, handler: Callable[[], str]) -> str:
289         """Handle POST with handler, prepare redirection to result."""
290         length = int(self.headers['content-length'])
291         postvars = parse_qs(self.rfile.read(length).decode(),
292                             keep_blank_values=True, strict_parsing=True)
293         self._form_data = InputsParser(postvars)
294         redir_target = handler()
295         self.conn.commit()
296         return redir_target
297
298     # GET handlers
299
300     @staticmethod
301     def _get_item(target_class: Any
302                   ) -> Callable[..., Callable[[TaskHandler],
303                                               dict[str, object]]]:
304         def decorator(f: Callable[..., dict[str, object]]
305                       ) -> Callable[[TaskHandler], dict[str, object]]:
306             def wrapper(self: TaskHandler) -> dict[str, object]:
307                 # pylint: disable=protected-access
308                 # (because pylint here fails to detect the use of wrapper as a
309                 # method to self with respective access privileges)
310                 id_ = self._params.get_int_or_none('id')
311                 if target_class.can_create_by_id:
312                     item = target_class.by_id_or_create(self.conn, id_)
313                 else:
314                     item = target_class.by_id(self.conn, id_)
315                 return f(self, item)
316             return wrapper
317         return decorator
318
319     def do_GET_(self) -> str:
320         """Return redirect target on GET /."""
321         return '/day'
322
323     def _do_GET_calendar(self) -> dict[str, object]:
324         """Show Days from ?start= to ?end=.
325
326         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
327         same, the only difference being the HTML template they are rendered to,
328         which .do_GET selects from their method name.
329         """
330         start, end = self._params.get_str('start'), self._params.get_str('end')
331         end = end if end else date_in_n_days(366)
332         days, start, end = Day.by_date_range_with_limits(self.conn,
333                                                          (start, end), 'id')
334         days = Day.with_filled_gaps(days, start, end)
335         today = date_in_n_days(0)
336         return {'start': start, 'end': end, 'days': days, 'today': today}
337
338     def do_GET_calendar(self) -> dict[str, object]:
339         """Show Days from ?start= to ?end= – normal view."""
340         return self._do_GET_calendar()
341
342     def do_GET_calendar_txt(self) -> dict[str, object]:
343         """Show Days from ?start= to ?end= – minimalist view."""
344         return self._do_GET_calendar()
345
346     def do_GET_day(self) -> dict[str, object]:
347         """Show single Day of ?date=."""
348         date = self._params.get_str('date', date_in_n_days(0))
349         day = Day.by_id_or_create(self.conn, date)
350         make_type = self._params.get_str('make_type')
351         conditions_present = []
352         enablers_for = {}
353         disablers_for = {}
354         for todo in day.todos:
355             for condition in todo.conditions + todo.blockers:
356                 if condition not in conditions_present:
357                     conditions_present += [condition]
358                     enablers_for[condition.id_] = [p for p in
359                                                    Process.all(self.conn)
360                                                    if condition in p.enables]
361                     disablers_for[condition.id_] = [p for p in
362                                                     Process.all(self.conn)
363                                                     if condition in p.disables]
364         seen_todos: set[int] = set()
365         top_nodes = [t.get_step_tree(seen_todos)
366                      for t in day.todos if not t.parents]
367         return {'day': day,
368                 'top_nodes': top_nodes,
369                 'make_type': make_type,
370                 'enablers_for': enablers_for,
371                 'disablers_for': disablers_for,
372                 'conditions_present': conditions_present,
373                 'processes': Process.all(self.conn)}
374
375     @_get_item(Todo)
376     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
377         """Show single Todo of ?id=."""
378
379         def walk_process_steps(node_id: int,
380                                process_step_nodes: list[ProcessStepsNode],
381                                steps_nodes: list[TodoOrProcStepNode]) -> int:
382             for process_step_node in process_step_nodes:
383                 node_id += 1
384                 node = TodoOrProcStepNode(node_id, None,
385                                           process_step_node.process, [])
386                 steps_nodes += [node]
387                 node_id = walk_process_steps(
388                         node_id, list(process_step_node.steps.values()),
389                         node.children)
390             return node_id
391
392         def walk_todo_steps(node_id: int, todos: list[Todo],
393                             steps_nodes: list[TodoOrProcStepNode]) -> int:
394             for todo in todos:
395                 matched = False
396                 for match in [item for item in steps_nodes
397                               if item.process
398                               and item.process == todo.process]:
399                     match.todo = todo
400                     matched = True
401                     for child in match.children:
402                         child.fillable = True
403                     node_id = walk_todo_steps(
404                             node_id, todo.children, match.children)
405                 if not matched:
406                     node_id += 1
407                     node = TodoOrProcStepNode(node_id, todo, None, [])
408                     steps_nodes += [node]
409                     node_id = walk_todo_steps(
410                             node_id, todo.children, node.children)
411             return node_id
412
413         def collect_adoptables_keys(
414                 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
415             ids = set()
416             for node in steps_nodes:
417                 if not node.todo:
418                     assert isinstance(node.process, Process)
419                     assert isinstance(node.process.id_, int)
420                     ids.add(node.process.id_)
421                 ids = ids | collect_adoptables_keys(node.children)
422             return ids
423
424         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
425         process_tree = todo.process.get_steps(self.conn, None)
426         steps_todo_to_process: list[TodoOrProcStepNode] = []
427         last_node_id = walk_process_steps(
428                 0, list(process_tree.values()), steps_todo_to_process)
429         for steps_node in steps_todo_to_process:
430             steps_node.fillable = True
431         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
432         adoptables: dict[int, list[Todo]] = {}
433         any_adoptables = [Todo.by_id(self.conn, t.id_)
434                           for t in Todo.by_date(self.conn, todo.date)
435                           if t.id_ is not None
436                           and t != todo]
437         for id_ in collect_adoptables_keys(steps_todo_to_process):
438             adoptables[id_] = [t for t in any_adoptables
439                                if t.process.id_ == id_]
440         return {'todo': todo,
441                 'steps_todo_to_process': steps_todo_to_process,
442                 'adoption_candidates_for': adoptables,
443                 'process_candidates': sorted(Process.all(self.conn)),
444                 'todo_candidates': any_adoptables,
445                 'condition_candidates': Condition.all(self.conn)}
446
447     def do_GET_todos(self) -> dict[str, object]:
448         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
449         sort_by = self._params.get_str('sort_by')
450         start = self._params.get_str('start')
451         end = self._params.get_str('end')
452         process_id = self._params.get_int_or_none('process_id')
453         comment_pattern = self._params.get_str('comment_pattern')
454         todos = []
455         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
456         todos_by_date_range, start, end = ret
457         todos = [t for t in todos_by_date_range
458                  if comment_pattern in t.comment
459                  and ((not process_id) or t.process.id_ == process_id)]
460         sort_by = Todo.sort_by(todos, sort_by)
461         return {'start': start, 'end': end, 'process_id': process_id,
462                 'comment_pattern': comment_pattern, 'todos': todos,
463                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
464
465     def do_GET_conditions(self) -> dict[str, object]:
466         """Show all Conditions."""
467         pattern = self._params.get_str('pattern')
468         sort_by = self._params.get_str('sort_by')
469         conditions = Condition.matching(self.conn, pattern)
470         sort_by = Condition.sort_by(conditions, sort_by)
471         return {'conditions': conditions,
472                 'sort_by': sort_by,
473                 'pattern': pattern}
474
475     @_get_item(Condition)
476     def do_GET_condition(self, c: Condition) -> dict[str, object]:
477         """Show Condition of ?id=."""
478         ps = Process.all(self.conn)
479         return {'condition': c, 'is_new': c.id_ is None,
480                 'enabled_processes': [p for p in ps if c in p.conditions],
481                 'disabled_processes': [p for p in ps if c in p.blockers],
482                 'enabling_processes': [p for p in ps if c in p.enables],
483                 'disabling_processes': [p for p in ps if c in p.disables]}
484
485     @_get_item(Condition)
486     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
487         """Show title history of Condition of ?id=."""
488         return {'condition': c}
489
490     @_get_item(Condition)
491     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
492         """Show description historys of Condition of ?id=."""
493         return {'condition': c}
494
495     @_get_item(Process)
496     def do_GET_process(self, process: Process) -> dict[str, object]:
497         """Show Process of ?id=."""
498         owner_ids = self._params.get_all_int('step_to')
499         owned_ids = self._params.get_all_int('has_step')
500         title_64 = self._params.get_str('title_b64')
501         if title_64:
502             try:
503                 title = b64decode(title_64.encode()).decode()
504             except binascii_Exception as exc:
505                 msg = 'invalid base64 for ?title_b64='
506                 raise BadFormatException(msg) from exc
507             process.title.set(title)
508         preset_top_step = None
509         owners = process.used_as_step_by(self.conn)
510         for step_id in owner_ids:
511             owners += [Process.by_id(self.conn, step_id)]
512         for process_id in owned_ids:
513             Process.by_id(self.conn, process_id)  # to ensure ID exists
514             preset_top_step = process_id
515         return {'process': process, 'is_new': process.id_ is None,
516                 'preset_top_step': preset_top_step,
517                 'steps': process.get_steps(self.conn), 'owners': owners,
518                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
519                 'process_candidates': Process.all(self.conn),
520                 'condition_candidates': Condition.all(self.conn)}
521
522     @_get_item(Process)
523     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
524         """Show title history of Process of ?id=."""
525         return {'process': p}
526
527     @_get_item(Process)
528     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
529         """Show description historys of Process of ?id=."""
530         return {'process': p}
531
532     @_get_item(Process)
533     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
534         """Show default effort history of Process of ?id=."""
535         return {'process': p}
536
537     def do_GET_processes(self) -> dict[str, object]:
538         """Show all Processes."""
539         pattern = self._params.get_str('pattern')
540         sort_by = self._params.get_str('sort_by')
541         processes = Process.matching(self.conn, pattern)
542         sort_by = Process.sort_by(processes, sort_by)
543         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
544
545     # POST handlers
546
547     @staticmethod
548     def _delete_or_post(target_class: Any, redir_target: str = '/'
549                         ) -> Callable[..., Callable[[TaskHandler], str]]:
550         def decorator(f: Callable[..., str]
551                       ) -> Callable[[TaskHandler], str]:
552             def wrapper(self: TaskHandler) -> str:
553                 # pylint: disable=protected-access
554                 # (because pylint here fails to detect the use of wrapper as a
555                 # method to self with respective access privileges)
556                 id_ = self._params.get_int_or_none('id')
557                 for _ in self._form_data.get_all_str('delete'):
558                     if id_ is None:
559                         msg = 'trying to delete non-saved ' +\
560                                 f'{target_class.__name__}'
561                         raise NotFoundException(msg)
562                     item = target_class.by_id(self.conn, id_)
563                     item.remove(self.conn)
564                     return redir_target
565                 if target_class.can_create_by_id:
566                     item = target_class.by_id_or_create(self.conn, id_)
567                 else:
568                     item = target_class.by_id(self.conn, id_)
569                 return f(self, item)
570             return wrapper
571         return decorator
572
573     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
574         """Update history timestamps for VersionedAttribute."""
575         id_ = self._params.get_int_or_none('id')
576         item = cls.by_id(self.conn, id_)
577         attr = getattr(item, attr_name)
578         for k, v in self._form_data.get_first_strings_starting('at:').items():
579             old = k[3:]
580             if old[19:] != v:
581                 attr.reset_timestamp(old, f'{v}.0')
582         attr.save(self.conn)
583         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
584
585     def do_POST_day(self) -> str:
586         """Update or insert Day of date and Todos mapped to it."""
587         # pylint: disable=too-many-locals
588         try:
589             date = self._params.get_str('date')
590             day_comment = self._form_data.get_str('day_comment')
591             make_type = self._form_data.get_str('make_type')
592         except NotFoundException as e:
593             raise BadFormatException from e
594         old_todos = self._form_data.get_all_int('todo_id')
595         new_todos = self._form_data.get_all_int('new_todo')
596         comments = self._form_data.get_all_str('comment')
597         efforts = self._form_data.get_all_floats_or_nones('effort')
598         done_todos = self._form_data.get_all_int('done')
599         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
600             raise BadFormatException('"done" field refers to unknown Todo')
601         is_done = [t_id in done_todos for t_id in old_todos]
602         if not (len(old_todos) == len(is_done) == len(comments)
603                 == len(efforts)):
604             msg = 'not equal number each of number of todo_id, comments, ' +\
605                     'and efforts inputs'
606             raise BadFormatException(msg)
607         day = Day.by_id_or_create(self.conn, date)
608         day.comment = day_comment
609         day.save(self.conn)
610         for process_id in sorted(new_todos):
611             if 'empty' == make_type:
612                 process = Process.by_id(self.conn, process_id)
613                 todo = Todo(None, process, False, date)
614                 todo.save(self.conn)
615             else:
616                 Todo.create_with_children(self.conn, process_id, date)
617         for i, todo_id in enumerate(old_todos):
618             todo = Todo.by_id(self.conn, todo_id)
619             todo.is_done = is_done[i]
620             todo.comment = comments[i]
621             todo.effort = efforts[i]
622             todo.save(self.conn)
623         return f'/day?date={date}&make_type={make_type}'
624
625     @_delete_or_post(Todo, '/')
626     def do_POST_todo(self, todo: Todo) -> str:
627         """Update Todo and its children."""
628         # pylint: disable=too-many-locals
629         # pylint: disable=too-many-branches
630         # pylint: disable=too-many-statements
631         adopted_child_ids = self._form_data.get_all_int('adopt')
632         processes_to_make_full = self._form_data.get_all_int('make_full')
633         processes_to_make_empty = self._form_data.get_all_int('make_empty')
634         step_fillers = self._form_data.get_all_str('step_filler')
635         with_effort_post = True
636         try:
637             effort = self._form_data.get_float_or_none('effort')
638         except NotFoundException:
639             with_effort_post = False
640         conditions = self._form_data.get_all_int('conditions')
641         disables = self._form_data.get_all_int('disables')
642         blockers = self._form_data.get_all_int('blockers')
643         enables = self._form_data.get_all_int('enables')
644         is_done = len(self._form_data.get_all_str('done')) > 0
645         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
646         comment = self._form_data.get_str('comment', ignore_strict=True)
647         for filler in step_fillers:
648             target_id: int
649             to_int = filler
650             for prefix in [p for p in ['make_empty_', 'make_full_']
651                            if filler.startswith(p)]:
652                 to_int = filler[len(prefix):]
653             try:
654                 target_id = int(to_int)
655             except ValueError as e:
656                 msg = 'bad fill_for target: {filler}'
657                 raise BadFormatException(msg) from e
658             if filler.startswith('make_empty_'):
659                 processes_to_make_empty += [target_id]
660             elif filler.startswith('make_full_'):
661                 processes_to_make_full += [target_id]
662             elif filler != 'ignore':
663                 adopted_child_ids += [target_id]
664         to_remove = []
665         for child in todo.children:
666             assert isinstance(child.id_, int)
667             if child.id_ not in adopted_child_ids:
668                 to_remove += [child.id_]
669         for id_ in to_remove:
670             child = Todo.by_id(self.conn, id_)
671             todo.remove_child(child)
672         for child_id in adopted_child_ids:
673             if child_id in [c.id_ for c in todo.children]:
674                 continue
675             child = Todo.by_id(self.conn, child_id)
676             todo.add_child(child)
677         for process_id in processes_to_make_empty:
678             process = Process.by_id(self.conn, process_id)
679             made = Todo(None, process, False, todo.date)
680             made.save(self.conn)
681             todo.add_child(made)
682         for process_id in processes_to_make_full:
683             made = Todo.create_with_children(self.conn, process_id, todo.date)
684             todo.add_child(made)
685         if with_effort_post:
686             todo.effort = effort
687         todo.set_conditions(self.conn, conditions)
688         todo.set_blockers(self.conn, blockers)
689         todo.set_enables(self.conn, enables)
690         todo.set_disables(self.conn, disables)
691         todo.is_done = is_done
692         todo.calendarize = calendarize
693         todo.comment = comment
694         # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
695         url = f'/todo?id={todo.id_}'
696         todo.save(self.conn)
697         return url
698
699     def do_POST_process_descriptions(self) -> str:
700         """Update history timestamps for Process.description."""
701         return self._change_versioned_timestamps(Process, 'description')
702
703     def do_POST_process_efforts(self) -> str:
704         """Update history timestamps for Process.effort."""
705         return self._change_versioned_timestamps(Process, 'effort')
706
707     def do_POST_process_titles(self) -> str:
708         """Update history timestamps for Process.title."""
709         return self._change_versioned_timestamps(Process, 'title')
710
711     @_delete_or_post(Process, '/processes')
712     def do_POST_process(self, process: Process) -> str:
713         """Update or insert Process of ?id= and fields defined in postvars."""
714         # pylint: disable=too-many-locals
715         # pylint: disable=too-many-statements
716         try:
717             title = self._form_data.get_str('title')
718             description = self._form_data.get_str('description')
719             effort = self._form_data.get_float('effort')
720         except NotFoundException as e:
721             raise BadFormatException from e
722         conditions = self._form_data.get_all_int('conditions')
723         blockers = self._form_data.get_all_int('blockers')
724         enables = self._form_data.get_all_int('enables')
725         disables = self._form_data.get_all_int('disables')
726         calendarize = self._form_data.get_all_str('calendarize') != []
727         suppresses = self._form_data.get_all_int('suppresses')
728         step_of = self._form_data.get_all_str('step_of')
729         keep_steps = self._form_data.get_all_int('keep_step')
730         step_ids = self._form_data.get_all_int('steps')
731         new_top_steps = self._form_data.get_all_str('new_top_step')
732         step_process_id_to = {}
733         step_parent_id_to = {}
734         new_steps_to = {}
735         for step_id in step_ids:
736             name = f'new_step_to_{step_id}'
737             new_steps_to[step_id] = self._form_data.get_all_int(name)
738         for step_id in keep_steps:
739             name = f'step_{step_id}_process_id'
740             step_process_id_to[step_id] = self._form_data.get_int(name)
741             name = f'step_{step_id}_parent_id'
742             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
743         process.title.set(title)
744         process.description.set(description)
745         process.effort.set(effort)
746         process.set_conditions(self.conn, conditions)
747         process.set_blockers(self.conn, blockers)
748         process.set_enables(self.conn, enables)
749         process.set_disables(self.conn, disables)
750         process.calendarize = calendarize
751         process.save(self.conn)
752         assert isinstance(process.id_, int)
753         new_step_title = None
754         steps: list[ProcessStep] = []
755         for step_id in keep_steps:
756             if step_id not in step_ids:
757                 raise BadFormatException('trying to keep unknown step')
758             step = ProcessStep(step_id, process.id_,
759                                step_process_id_to[step_id],
760                                step_parent_id_to[step_id])
761             steps += [step]
762         for step_id in step_ids:
763             new = [ProcessStep(None, process.id_, step_process_id, step_id)
764                    for step_process_id in new_steps_to[step_id]]
765             steps += new
766         for step_identifier in new_top_steps:
767             try:
768                 step_process_id = int(step_identifier)
769                 step = ProcessStep(None, process.id_, step_process_id, None)
770                 steps += [step]
771             except ValueError:
772                 new_step_title = step_identifier
773         process.set_steps(self.conn, steps)
774         process.set_step_suppressions(self.conn, suppresses)
775         owners_to_set = []
776         new_owner_title = None
777         for owner_identifier in step_of:
778             try:
779                 owners_to_set += [int(owner_identifier)]
780             except ValueError:
781                 new_owner_title = owner_identifier
782         process.set_owners(self.conn, owners_to_set)
783         params = f'id={process.id_}'
784         if new_step_title:
785             title_b64_encoded = b64encode(new_step_title.encode()).decode()
786             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
787         elif new_owner_title:
788             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
789             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
790         process.save(self.conn)
791         return f'/process?{params}'
792
793     def do_POST_condition_descriptions(self) -> str:
794         """Update history timestamps for Condition.description."""
795         return self._change_versioned_timestamps(Condition, 'description')
796
797     def do_POST_condition_titles(self) -> str:
798         """Update history timestamps for Condition.title."""
799         return self._change_versioned_timestamps(Condition, 'title')
800
801     @_delete_or_post(Condition, '/conditions')
802     def do_POST_condition(self, condition: Condition) -> str:
803         """Update/insert Condition of ?id= and fields defined in postvars."""
804         try:
805             is_active = self._form_data.get_str('is_active') == 'True'
806             title = self._form_data.get_str('title')
807             description = self._form_data.get_str('description')
808         except NotFoundException as e:
809             raise BadFormatException(e) from e
810         condition.is_active = is_active
811         condition.title.set(title)
812         condition.description.set(description)
813         condition.save(self.conn)
814         return f'/condition?id={condition.id_}'