home · contact · privacy
Some work on tests, kinda unfinished.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from inspect import signature
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import HTTPServer, BaseHTTPRequestHandler
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.render_mode = 'html'
33         self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35
36 class InputsParser:
37     """Wrapper for validating and retrieving dict-like HTTP inputs."""
38
39     def __init__(self, dict_: dict[str, list[str]]) -> None:
40         self.inputs = dict_
41
42     def get_all_str(self, key: str) -> list[str]:
43         """Retrieve list of string values at key (empty if no key)."""
44         if key not in self.inputs.keys():
45             return []
46         return self.inputs[key]
47
48     def get_all_int(self, key: str, fail_on_empty: bool = False) -> list[int]:
49         """Retrieve list of int values at key."""
50         all_str = self.get_all_str(key)
51         try:
52             return [int(s) for s in all_str if fail_on_empty or s != '']
53         except ValueError as e:
54             msg = f'cannot int a form field value for key {key} in: {all_str}'
55             raise BadFormatException(msg) from e
56
57     def get_str(self, key: str, default: str | None = None) -> str | None:
58         """Retrieve single/first string value of key, or default."""
59         vals = self.get_all_str(key)
60         if vals:
61             return vals[0]
62         return default
63
64     def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65         """Retrieve first string value of key, if none: fail or default."""
66         vals = self.get_all_str(key)
67         if not vals:
68             if default is not None:
69                 return default
70             raise BadFormatException(f'no value found for key: {key}')
71         return vals[0]
72
73     def get_int_or_none(self, key: str) -> int | None:
74         """Retrieve single/first value of key as int, return None if empty."""
75         val = self.get_str_or_fail(key, '')
76         if val == '':
77             return None
78         try:
79             return int(val)
80         except (ValueError, TypeError) as e:
81             msg = f'cannot int form field value for key {key}: {val}'
82             raise BadFormatException(msg) from e
83
84     def get_bool(self, key: str) -> bool:
85         """Return if value to key truish; return False if None/no value."""
86         return self.get_str(key) in {'True', 'true', '1', 'on'}
87
88     def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]:
89         """Retrieve dict of strings at keys starting with key_prefix."""
90         ret = {}
91         for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]:
92             ret[key[len(key_prefix):]] = self.inputs[key]
93         return ret
94
95     def get_float_or_fail(self, key: str) -> float:
96         """Retrieve float value of key from self.postvars, fail if none."""
97         val = self.get_str_or_fail(key)
98         try:
99             return float(val)
100         except ValueError as e:
101             msg = f'cannot float form field value for key {key}: {val}'
102             raise BadFormatException(msg) from e
103
104     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
105         """Retrieve list of float value at key, None if empty strings."""
106         ret: list[float | None] = []
107         for val in self.get_all_str(key):
108             if '' == val:
109                 ret += [None]
110             else:
111                 try:
112                     ret += [float(val)]
113                 except ValueError as e:
114                     msg = f'cannot float form field value for key {key}: {val}'
115                     raise BadFormatException(msg) from e
116         return ret
117
118
119 class TaskHandler(BaseHTTPRequestHandler):
120     """Handles single HTTP request."""
121     # pylint: disable=too-many-public-methods
122     server: TaskServer
123     _conn: DatabaseConnection
124     _site: str
125     _form: InputsParser
126     _params: InputsParser
127
128     def _send_page(
129             self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
130             ) -> None:
131         """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
132
133         The differentiation by .server.render_mode serves to allow easily
134         comparable JSON responses for automatic testing.
135         """
136         body: str
137         headers: list[tuple[str, str]] = []
138         if 'html' == self.server.render_mode:
139             tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
140             body = tmpl.render(ctx)
141         else:
142             body = self._ctx_to_json(ctx)
143             headers += [('Content-Type', 'application/json')]
144         self.send_response(code)
145         for header_tuple in headers:
146             self.send_header(*header_tuple)
147         self.end_headers()
148         self.wfile.write(bytes(body, 'utf-8'))
149
150     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
151         """Render ctx into JSON string.
152
153         Flattens any objects that json.dumps might not want to serialize, and
154         turns occurrences of BaseModel objects into listings of their .id_, to
155         be resolved to a full dict inside a top-level '_library' dictionary,
156         to avoid endless and circular nesting.
157         """
158
159         def flatten(node: object) -> object:
160
161             def update_library_with(
162                     item: BaseModel[int] | BaseModel[str]) -> None:
163                 cls_name = item.__class__.__name__
164                 if cls_name not in library:
165                     library[cls_name] = {}
166                 if item.id_ not in library[cls_name]:
167                     d, refs = item.as_dict_and_refs
168                     id_key = '?' if item.id_ is None else item.id_
169                     library[cls_name][id_key] = d
170                     for ref in refs:
171                         update_library_with(ref)
172
173             if isinstance(node, BaseModel):
174                 update_library_with(node)
175                 return node.id_
176             if isinstance(node, DictableNode):
177                 d, refs = node.as_dict_and_refs
178                 for ref in refs:
179                     update_library_with(ref)
180                 return d
181             if isinstance(node, (list, tuple)):
182                 return [flatten(item) for item in node]
183             if isinstance(node, dict):
184                 d = {}
185                 for k, v in node.items():
186                     d[k] = flatten(v)
187                 return d
188             if isinstance(node, HandledException):
189                 return str(node)
190             return node
191
192         library: dict[str, dict[str | int, object]] = {}
193         for k, v in ctx.items():
194             ctx[k] = flatten(v)
195         ctx['_library'] = library
196         return json_dumps(ctx)
197
198     @staticmethod
199     def _request_wrapper(http_method: str, not_found_msg: str
200                          ) -> Callable[..., Callable[[TaskHandler], None]]:
201         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
202
203         Among other things, conditionally cleans all caches, but only on POST
204         requests, as only those are expected to change the states of objects
205         that may be cached, and certainly only those are expected to write any
206         changes to the database. We want to call them as early though as
207         possible here, either exactly after the specific request handler
208         returns successfully, or right after any exception is triggered –
209         otherwise, race conditions become plausible.
210
211         Note that otherwise any POST attempt, even a failed one, may end in
212         problematic inconsistencies:
213
214         - if the POST handler experiences an Exception, changes to objects
215           won't get written to the DB, but the changed objects may remain in
216           the cache and affect other objects despite their possibly illegal
217           state
218
219         - even if an object was just saved to the DB, we cannot be sure its
220           current state is completely identical to what we'd get if loading it
221           fresh from the DB (e.g. currently Process.n_owners is only updated
222           when loaded anew via .from_table_row, nor is its state written to
223           the DB by .save; a questionable design choice, but proof that we
224           have no guarantee that objects' .save stores all their states we'd
225           prefer at their most up-to-date.
226         """
227
228         def clear_caches() -> None:
229             for cls in (Day, Todo, Condition, Process, ProcessStep):
230                 assert hasattr(cls, 'empty_cache')
231                 cls.empty_cache()
232
233         def decorator(f: Callable[..., str | None]
234                       ) -> Callable[[TaskHandler], None]:
235             def wrapper(self: TaskHandler) -> None:
236                 # pylint: disable=protected-access
237                 # (because pylint here fails to detect the use of wrapper as a
238                 # method to self with respective access privileges)
239                 try:
240                     self._conn = DatabaseConnection(self.server.db)
241                     parsed_url = urlparse(self.path)
242                     self._site = path_split(parsed_url.path)[1]
243                     params = parse_qs(parsed_url.query,
244                                       keep_blank_values=True,
245                                       strict_parsing=True)
246                     self._params = InputsParser(params)
247                     handler_name = f'do_{http_method}_{self._site}'
248                     if hasattr(self, handler_name):
249                         handler = getattr(self, handler_name)
250                         redir_target = f(self, handler)
251                         if 'POST' == http_method:
252                             clear_caches()
253                         if redir_target:
254                             self.send_response(302)
255                             self.send_header('Location', redir_target)
256                             self.end_headers()
257                     else:
258                         msg = f'{not_found_msg}: {self._site}'
259                         raise NotFoundException(msg)
260                 except HandledException as error:
261                     if 'POST' == http_method:
262                         clear_caches()
263                     ctx = {'msg': error}
264                     self._send_page(ctx, 'msg', error.http_code)
265                 finally:
266                     self._conn.close()
267             return wrapper
268         return decorator
269
270     @_request_wrapper('GET', 'Unknown page')
271     def do_GET(self, handler: Callable[[], str | dict[str, object]]
272                ) -> str | None:
273         """Render page with result of handler, or redirect if result is str."""
274         tmpl_name = f'{self._site}'
275         ctx_or_redir_target = handler()
276         if isinstance(ctx_or_redir_target, str):
277             return ctx_or_redir_target
278         self._send_page(ctx_or_redir_target, tmpl_name)
279         return None
280
281     @_request_wrapper('POST', 'Unknown POST target')
282     def do_POST(self, handler: Callable[[], str]) -> str:
283         """Handle POST with handler, prepare redirection to result."""
284         length = int(self.headers['content-length'])
285         postvars = parse_qs(self.rfile.read(length).decode(),
286                             keep_blank_values=True)
287         self._form = InputsParser(postvars)
288         redir_target = handler()
289         self._conn.commit()
290         return redir_target
291
292     # GET handlers
293
294     @staticmethod
295     def _get_item(target_class: Any
296                   ) -> Callable[..., Callable[[TaskHandler],
297                                               dict[str, object]]]:
298         def decorator(f: Callable[..., dict[str, object]]
299                       ) -> Callable[[TaskHandler], dict[str, object]]:
300             def wrapper(self: TaskHandler) -> dict[str, object]:
301                 # pylint: disable=protected-access
302                 # (because pylint here fails to detect the use of wrapper as a
303                 # method to self with respective access privileges)
304                 id_ = None
305                 for val in self._params.get_all_int('id', fail_on_empty=True):
306                     id_ = val
307                 if target_class.can_create_by_id:
308                     item = target_class.by_id_or_create(self._conn, id_)
309                 else:
310                     item = target_class.by_id(self._conn, id_)
311                 if 'exists' in signature(f).parameters:
312                     exists = id_ is not None and target_class._get_cached(id_)
313                     return f(self, item, exists)
314                 return f(self, item)
315             return wrapper
316         return decorator
317
318     def do_GET_(self) -> str:
319         """Return redirect target on GET /."""
320         return '/day'
321
322     def _do_GET_calendar(self) -> dict[str, object]:
323         """Show Days from ?start= to ?end=.
324
325         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
326         same, the only difference being the HTML template they are rendered to,
327         which .do_GET selects from their method name.
328         """
329         start = self._params.get_str_or_fail('start', '')
330         end = self._params.get_str_or_fail('end', '')
331         end = end if end != '' else date_in_n_days(366)
332         #
333         days, start, end = Day.by_date_range_with_limits(self._conn,
334                                                          (start, end), 'id')
335         days = Day.with_filled_gaps(days, start, end)
336         today = date_in_n_days(0)
337         return {'start': start, 'end': end, 'days': days, 'today': today}
338
339     def do_GET_calendar(self) -> dict[str, object]:
340         """Show Days from ?start= to ?end= – normal view."""
341         return self._do_GET_calendar()
342
343     def do_GET_calendar_txt(self) -> dict[str, object]:
344         """Show Days from ?start= to ?end= – minimalist view."""
345         return self._do_GET_calendar()
346
347     def do_GET_day(self) -> dict[str, object]:
348         """Show single Day of ?date=."""
349         date = self._params.get_str('date', date_in_n_days(0))
350         make_type = self._params.get_str_or_fail('make_type', 'full')
351         #
352         day = Day.by_id_or_create(self._conn, date)
353         conditions_present = []
354         enablers_for = {}
355         disablers_for = {}
356         for todo in day.todos:
357             for condition in todo.conditions + todo.blockers:
358                 if condition not in conditions_present:
359                     conditions_present += [condition]
360                     enablers_for[condition.id_] = [p for p in
361                                                    Process.all(self._conn)
362                                                    if condition in p.enables]
363                     disablers_for[condition.id_] = [p for p in
364                                                     Process.all(self._conn)
365                                                     if condition in p.disables]
366         seen_todos: set[int] = set()
367         top_nodes = [t.get_step_tree(seen_todos)
368                      for t in day.todos if not t.parents]
369         return {'day': day,
370                 'top_nodes': top_nodes,
371                 'make_type': make_type,
372                 'enablers_for': enablers_for,
373                 'disablers_for': disablers_for,
374                 'conditions_present': conditions_present,
375                 'processes': Process.all(self._conn)}
376
377     @_get_item(Todo)
378     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
379         """Show single Todo of ?id=."""
380
381         def walk_process_steps(node_id: int,
382                                process_step_nodes: list[ProcessStepsNode],
383                                steps_nodes: list[TodoOrProcStepNode]) -> int:
384             for process_step_node in process_step_nodes:
385                 node_id += 1
386                 proc = Process.by_id(self._conn,
387                                      process_step_node.step.step_process_id)
388                 node = TodoOrProcStepNode(node_id, None, proc, [])
389                 steps_nodes += [node]
390                 node_id = walk_process_steps(
391                         node_id, process_step_node.steps, node.children)
392             return node_id
393
394         def walk_todo_steps(node_id: int, todos: list[Todo],
395                             steps_nodes: list[TodoOrProcStepNode]) -> int:
396             for todo in todos:
397                 matched = False
398                 for match in [item for item in steps_nodes
399                               if item.process
400                               and item.process == todo.process]:
401                     match.todo = todo
402                     matched = True
403                     for child in match.children:
404                         child.fillable = True
405                     node_id = walk_todo_steps(
406                             node_id, todo.children, match.children)
407                 if not matched:
408                     node_id += 1
409                     node = TodoOrProcStepNode(node_id, todo, None, [])
410                     steps_nodes += [node]
411                     node_id = walk_todo_steps(
412                             node_id, todo.children, node.children)
413             return node_id
414
415         def collect_adoptables_keys(
416                 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
417             ids = set()
418             for node in steps_nodes:
419                 if not node.todo:
420                     assert isinstance(node.process, Process)
421                     assert isinstance(node.process.id_, int)
422                     ids.add(node.process.id_)
423                 ids = ids | collect_adoptables_keys(node.children)
424             return ids
425
426         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
427         process_tree = todo.process.get_steps(self._conn, None)
428         steps_todo_to_process: list[TodoOrProcStepNode] = []
429         last_node_id = walk_process_steps(0, process_tree,
430                                           steps_todo_to_process)
431         for steps_node in steps_todo_to_process:
432             steps_node.fillable = True
433         walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
434         adoptables: dict[int, list[Todo]] = {}
435         any_adoptables = [Todo.by_id(self._conn, t.id_)
436                           for t in Todo.by_date(self._conn, todo.date)
437                           if t.id_ is not None
438                           and t != todo]
439         for id_ in collect_adoptables_keys(steps_todo_to_process):
440             adoptables[id_] = [t for t in any_adoptables
441                                if t.process.id_ == id_]
442         return {'todo': todo,
443                 'steps_todo_to_process': steps_todo_to_process,
444                 'adoption_candidates_for': adoptables,
445                 'process_candidates': sorted(Process.all(self._conn)),
446                 'todo_candidates': any_adoptables,
447                 'condition_candidates': Condition.all(self._conn)}
448
449     def do_GET_todos(self) -> dict[str, object]:
450         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
451         sort_by = self._params.get_str_or_fail('sort_by', 'title')
452         start = self._params.get_str_or_fail('start', '')
453         end = self._params.get_str_or_fail('end', '')
454         process_id = self._params.get_int_or_none('process_id')
455         comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
456         #
457         ret = Todo.by_date_range_with_limits(self._conn, (start, end))
458         todos_by_date_range, start, end = ret
459         todos = [t for t in todos_by_date_range
460                  if comment_pattern in t.comment
461                  and ((not process_id) or t.process.id_ == process_id)]
462         sort_by = Todo.sort_by(todos, sort_by)
463         return {'start': start, 'end': end, 'process_id': process_id,
464                 'comment_pattern': comment_pattern, 'todos': todos,
465                 'all_processes': Process.all(self._conn), 'sort_by': sort_by}
466
467     def do_GET_conditions(self) -> dict[str, object]:
468         """Show all Conditions."""
469         pattern = self._params.get_str_or_fail('pattern', '')
470         sort_by = self._params.get_str_or_fail('sort_by', 'title')
471         #
472         conditions = Condition.matching(self._conn, pattern)
473         sort_by = Condition.sort_by(conditions, sort_by)
474         return {'conditions': conditions,
475                 'sort_by': sort_by,
476                 'pattern': pattern}
477
478     @_get_item(Condition)
479     def do_GET_condition(self,
480                          c: Condition,
481                          exists: bool
482                          ) -> dict[str, object]:
483         """Show Condition of ?id=."""
484         ps = Process.all(self._conn)
485         return {'condition': c,
486                 'is_new': not exists,
487                 'enabled_processes': [p for p in ps if c in p.conditions],
488                 'disabled_processes': [p for p in ps if c in p.blockers],
489                 'enabling_processes': [p for p in ps if c in p.enables],
490                 'disabling_processes': [p for p in ps if c in p.disables]}
491
492     @_get_item(Condition)
493     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
494         """Show title history of Condition of ?id=."""
495         return {'condition': c}
496
497     @_get_item(Condition)
498     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
499         """Show description historys of Condition of ?id=."""
500         return {'condition': c}
501
502     @_get_item(Process)
503     def do_GET_process(self,
504                        process: Process,
505                        exists: bool
506                        ) -> dict[str, object]:
507         """Show Process of ?id=."""
508         owner_ids = self._params.get_all_int('step_to')
509         owned_ids = self._params.get_all_int('has_step')
510         title_64 = self._params.get_str('title_b64')
511         title_new = None
512         if title_64:
513             try:
514                 title_new = b64decode(title_64.encode()).decode()
515             except binascii_Exception as exc:
516                 msg = 'invalid base64 for ?title_b64='
517                 raise BadFormatException(msg) from exc
518         #
519         if title_new:
520             process.title.set(title_new)
521         preset_top_step = None
522         owners = process.used_as_step_by(self._conn)
523         for step_id in owner_ids:
524             owners += [Process.by_id(self._conn, step_id)]
525         for process_id in owned_ids:
526             Process.by_id(self._conn, process_id)  # to ensure ID exists
527             preset_top_step = process_id
528         return {'process': process,
529                 'is_new': not exists,
530                 'preset_top_step': preset_top_step,
531                 'steps': process.get_steps(self._conn),
532                 'owners': owners,
533                 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
534                 'process_candidates': Process.all(self._conn),
535                 'condition_candidates': Condition.all(self._conn)}
536
537     @_get_item(Process)
538     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
539         """Show title history of Process of ?id=."""
540         return {'process': p}
541
542     @_get_item(Process)
543     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
544         """Show description historys of Process of ?id=."""
545         return {'process': p}
546
547     @_get_item(Process)
548     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
549         """Show default effort history of Process of ?id=."""
550         return {'process': p}
551
552     def do_GET_processes(self) -> dict[str, object]:
553         """Show all Processes."""
554         pattern = self._params.get_str_or_fail('pattern', '')
555         sort_by = self._params.get_str_or_fail('sort_by', 'title')
556         #
557         processes = Process.matching(self._conn, pattern)
558         sort_by = Process.sort_by(processes, sort_by)
559         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
560
561     # POST handlers
562
563     @staticmethod
564     def _delete_or_post(target_class: Any, redir_target: str = '/'
565                         ) -> Callable[..., Callable[[TaskHandler], str]]:
566         def decorator(f: Callable[..., str]
567                       ) -> Callable[[TaskHandler], str]:
568             def wrapper(self: TaskHandler) -> str:
569                 # pylint: disable=protected-access
570                 # (because pylint here fails to detect the use of wrapper as a
571                 # method to self with respective access privileges)
572                 id_ = self._params.get_int_or_none('id')
573                 for _ in self._form.get_all_str('delete'):
574                     if id_ is None:
575                         msg = 'trying to delete non-saved ' +\
576                                 f'{target_class.__name__}'
577                         raise NotFoundException(msg)
578                     item = target_class.by_id(self._conn, id_)
579                     item.remove(self._conn)
580                     return redir_target
581                 if target_class.can_create_by_id:
582                     item = target_class.by_id_or_create(self._conn, id_)
583                 else:
584                     item = target_class.by_id(self._conn, id_)
585                 return f(self, item)
586             return wrapper
587         return decorator
588
589     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
590         """Update history timestamps for VersionedAttribute."""
591         id_ = self._params.get_int_or_none('id')
592         item = cls.by_id(self._conn, id_)
593         attr = getattr(item, attr_name)
594         for k, vals in self._form.get_all_of_key_prefixed('at:').items():
595             if k[19:] != vals[0]:
596                 attr.reset_timestamp(k, f'{vals[0]}.0')
597         attr.save(self._conn)
598         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
599
600     def do_POST_day(self) -> str:
601         """Update or insert Day of date and Todos mapped to it."""
602         # pylint: disable=too-many-locals
603         date = self._params.get_str_or_fail('date')
604         day_comment = self._form.get_str_or_fail('day_comment')
605         make_type = self._form.get_str_or_fail('make_type')
606         old_todos = self._form.get_all_int('todo_id')
607         new_todos_by_process = self._form.get_all_int('new_todo')
608         comments = self._form.get_all_str('comment')
609         efforts = self._form.get_all_floats_or_nones('effort')
610         done_todos = self._form.get_all_int('done')
611         is_done = [t_id in done_todos for t_id in old_todos]
612         if not (len(old_todos) == len(is_done) == len(comments)
613                 == len(efforts)):
614             msg = 'not equal number each of number of todo_id, comments, ' +\
615                     'and efforts inputs'
616             raise BadFormatException(msg)
617         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
618             raise BadFormatException('"done" field refers to unknown Todo')
619         #
620         day = Day.by_id_or_create(self._conn, date)
621         day.comment = day_comment
622         day.save(self._conn)
623         new_todos = []
624         for process_id in sorted(new_todos_by_process):
625             process = Process.by_id(self._conn, process_id)
626             todo = Todo(None, process, False, date)
627             todo.save(self._conn)
628             new_todos += [todo]
629         if 'full' == make_type:
630             for todo in new_todos:
631                 todo.ensure_children(self._conn)
632         for i, todo_id in enumerate(old_todos):
633             todo = Todo.by_id(self._conn, todo_id)
634             todo.is_done = is_done[i]
635             todo.comment = comments[i]
636             todo.effort = efforts[i]
637             todo.save(self._conn)
638         return f'/day?date={date}&make_type={make_type}'
639
640     @_delete_or_post(Todo, '/')
641     def do_POST_todo(self, todo: Todo) -> str:
642         """Update Todo and its children."""
643         # pylint: disable=too-many-locals
644         # pylint: disable=too-many-branches
645         # pylint: disable=too-many-statements
646         assert todo.id_ is not None
647         adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
648         to_make = {'full': [(id_, todo.id_)
649                             for id_ in self._form.get_all_int('make_full')],
650                    'empty': [(id_, todo.id_)
651                              for id_ in self._form.get_all_int('make_empty')]}
652         step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_')
653         to_update: dict[str, Any] = {
654             'comment': self._form.get_str_or_fail('comment', ''),
655             'is_done': self._form.get_bool('is_done'),
656             'calendarize': self._form.get_bool('calendarize')}
657         cond_rels = [self._form.get_all_int(name) for name in
658                      ['conditions', 'blockers', 'enables', 'disables']]
659         effort_or_not = self._form.get_str('effort')
660         if effort_or_not is not None:
661             if effort_or_not == '':
662                 to_update['effort'] = None
663             else:
664                 try:
665                     to_update['effort'] = float(effort_or_not)
666                 except ValueError as e:
667                     msg = 'cannot float form field value for key: effort'
668                     raise BadFormatException(msg) from e
669         for k, fillers in step_fillers_to.items():
670             try:
671                 parent_id = int(k)
672             except ValueError as e:
673                 msg = f'bad step_filler_to_ key: {k}'
674                 raise BadFormatException(msg) from e
675             for filler in [f for f in fillers if f != 'ignore']:
676                 target_id: int
677                 prefix = 'make_'
678                 to_int = filler[5:] if filler.startswith(prefix) else filler
679                 try:
680                     target_id = int(to_int)
681                 except ValueError as e:
682                     msg = f'bad fill_for target: {filler}'
683                     raise BadFormatException(msg) from e
684                 if filler.startswith(prefix):
685                     to_make['empty'] += [(target_id, parent_id)]
686                 else:
687                     adoptees += [(target_id, parent_id)]
688         #
689         todo.set_condition_relations(self._conn, *cond_rels)
690         for parent in [Todo.by_id(self._conn, a[1])
691                        for a in adoptees] + [todo]:
692             for child in parent.children:
693                 if child not in [t[0] for t in adoptees
694                                  if t[0] == child.id_ and t[1] == parent.id_]:
695                     parent.remove_child(child)
696                     parent.save(self._conn)
697         for child_id, parent_id in adoptees:
698             parent = Todo.by_id(self._conn, parent_id)
699             if child_id not in [c.id_ for c in parent.children]:
700                 parent.add_child(Todo.by_id(self._conn, child_id))
701                 parent.save(self._conn)
702         todo.update_attrs(**to_update)
703         for approach, make_data in to_make.items():
704             for process_id, parent_id in make_data:
705                 parent = Todo.by_id(self._conn, parent_id)
706                 process = Process.by_id(self._conn, process_id)
707                 made = Todo(None, process, False, todo.date)
708                 made.save(self._conn)
709                 if 'full' == approach:
710                     made.ensure_children(self._conn)
711                 parent.add_child(made)
712                 parent.save(self._conn)
713         # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
714         url = f'/todo?id={todo.id_}'
715         todo.save(self._conn)
716         return url
717
718     def do_POST_process_descriptions(self) -> str:
719         """Update history timestamps for Process.description."""
720         return self._change_versioned_timestamps(Process, 'description')
721
722     def do_POST_process_efforts(self) -> str:
723         """Update history timestamps for Process.effort."""
724         return self._change_versioned_timestamps(Process, 'effort')
725
726     def do_POST_process_titles(self) -> str:
727         """Update history timestamps for Process.title."""
728         return self._change_versioned_timestamps(Process, 'title')
729
730     @_delete_or_post(Process, '/processes')
731     def do_POST_process(self, process: Process) -> str:
732         """Update or insert Process of ?id= and fields defined in postvars."""
733         # pylint: disable=too-many-locals
734
735         def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
736             l_ids, title = [], ''
737             for id_or_title in l_id_or_title:
738                 try:
739                     l_ids += [int(id_or_title)]
740                 except ValueError:
741                     title = id_or_title
742             return title, l_ids
743
744         versioned = {'title': self._form.get_str_or_fail('title'),
745                      'description': self._form.get_str_or_fail('description'),
746                      'effort': self._form.get_float_or_fail('effort')}
747         cond_rels = [self._form.get_all_int(s) for s
748                      in ['conditions', 'blockers', 'enables', 'disables']]
749         calendarize = self._form.get_bool('calendarize')
750         step_of = self._form.get_all_str('step_of')
751         suppressions = self._form.get_all_int('suppresses')
752         kept_steps = self._form.get_all_int('kept_steps')
753         new_top_step_procs = self._form.get_all_str('new_top_step')
754         new_steps_to = {}
755         for step_id in kept_steps:
756             name = f'new_step_to_{step_id}'
757             new_steps_to[step_id] = self._form.get_all_int(name)
758         new_owner_title, owners_to_set = id_or_title(step_of)
759         new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
760         #
761         for k, v in versioned.items():
762             getattr(process, k).set(v)
763         process.calendarize = calendarize
764         process.save(self._conn)
765         assert isinstance(process.id_, int)
766         # set relations to Conditions and ProcessSteps / other Processes
767         process.set_condition_relations(self._conn, *cond_rels)
768         owned_steps = []
769         for step_id in kept_steps:
770             owned_steps += [ProcessStep.by_id(self._conn, step_id)]
771             owned_steps += [  # new sub-steps
772                     ProcessStep(None, process.id_, step_process_id, step_id)
773                     for step_process_id in new_steps_to[step_id]]
774         for step_process_id in new_top_step_proc_ids:
775             owned_steps += [ProcessStep(None, process.id_, step_process_id,
776                                         None)]
777         process.set_step_relations(self._conn, owners_to_set, suppressions,
778                                    owned_steps)
779         # encode titles for potential newly-to-create Processes up or down
780         params = f'id={process.id_}'
781         if new_step_title:
782             title_b64_encoded = b64encode(new_step_title.encode()).decode()
783             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
784         elif new_owner_title:
785             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
786             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
787         process.save(self._conn)
788         return f'/process?{params}'
789
790     def do_POST_condition_descriptions(self) -> str:
791         """Update history timestamps for Condition.description."""
792         return self._change_versioned_timestamps(Condition, 'description')
793
794     def do_POST_condition_titles(self) -> str:
795         """Update history timestamps for Condition.title."""
796         return self._change_versioned_timestamps(Condition, 'title')
797
798     @_delete_or_post(Condition, '/conditions')
799     def do_POST_condition(self, condition: Condition) -> str:
800         """Update/insert Condition of ?id= and fields defined in postvars."""
801         title = self._form.get_str_or_fail('title')
802         description = self._form.get_str_or_fail('description')
803         is_active = self._form.get_bool('is_active')
804         condition.is_active = is_active
805         #
806         condition.title.set(title)
807         condition.description.set(description)
808         condition.save(self._conn)
809         return f'/condition?id={condition.id_}'