home · contact · privacy
In GET /process handler, catch malformed ?title_b64= params.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
16                                  NotFoundException)
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.headers: list[tuple[str, str]] = []
33         self._render_mode = 'html'
34         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35
36     def set_json_mode(self) -> None:
37         """Make server send JSON instead of HTML responses."""
38         self._render_mode = 'json'
39         self.headers += [('Content-Type', 'application/json')]
40
41     @staticmethod
42     def ctx_to_json(ctx: dict[str, object]) -> str:
43         """Render ctx into JSON string."""
44         def walk_ctx(node: object) -> Any:
45             if hasattr(node, 'as_dict_into_reference'):
46                 if hasattr(node, 'id_') and node.id_ is not None:
47                     return node.as_dict_into_reference(library)
48             if hasattr(node, 'as_dict'):
49                 return node.as_dict
50             if isinstance(node, (list, tuple)):
51                 return [walk_ctx(x) for x in node]
52             if isinstance(node, dict):
53                 d = {}
54                 for k, v in node.items():
55                     d[k] = walk_ctx(v)
56                 return d
57             if isinstance(node, HandledException):
58                 return str(node)
59             return node
60         library: dict[str, dict[str | int, object]] = {}
61         for k, v in ctx.items():
62             ctx[k] = walk_ctx(v)
63         ctx['_library'] = library
64         return json_dumps(ctx)
65
66     def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
67         """Render ctx according to self._render_mode.."""
68         tmpl_name = f'{tmpl_name}.{self._render_mode}'
69         if 'html' == self._render_mode:
70             template = self._jinja.get_template(tmpl_name)
71             return template.render(ctx)
72         return self.__class__.ctx_to_json(ctx)
73
74
75 class InputsParser:
76     """Wrapper for validating and retrieving dict-like HTTP inputs."""
77
78     def __init__(self, dict_: dict[str, list[str]],
79                  strictness: bool = True) -> None:
80         self.inputs = dict_
81         self.strict = strictness
82
83     def get_str(self, key: str, default: str = '',
84                 ignore_strict: bool = False) -> str:
85         """Retrieve single/first string value of key, or default."""
86         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
87             if self.strict and not ignore_strict:
88                 raise BadFormatException(f'no value found for key {key}')
89             return default
90         return self.inputs[key][0]
91
92     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
93         """Retrieve dict of (first) strings at key starting with prefix."""
94         ret = {}
95         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
96             ret[key] = self.inputs[key][0]
97         return ret
98
99     def get_int(self, key: str) -> int:
100         """Retrieve single/first value of key as int, error if empty."""
101         val = self.get_int_or_none(key)
102         if val is None:
103             raise BadFormatException(f'unexpected empty value for: {key}')
104         return val
105
106     def get_int_or_none(self, key: str) -> int | None:
107         """Retrieve single/first value of key as int, return None if empty."""
108         val = self.get_str(key, ignore_strict=True)
109         if val == '':
110             return None
111         try:
112             return int(val)
113         except ValueError as e:
114             msg = f'cannot int form field value for key {key}: {val}'
115             raise BadFormatException(msg) from e
116
117     def get_float(self, key: str) -> float:
118         """Retrieve float value of key from self.postvars."""
119         val = self.get_str(key)
120         try:
121             return float(val)
122         except ValueError as e:
123             msg = f'cannot float form field value for key {key}: {val}'
124             raise BadFormatException(msg) from e
125
126     def get_all_str(self, key: str) -> list[str]:
127         """Retrieve list of string values at key."""
128         if key not in self.inputs.keys():
129             return []
130         return self.inputs[key]
131
132     def get_all_int(self, key: str) -> list[int]:
133         """Retrieve list of int values at key."""
134         all_str = self.get_all_str(key)
135         try:
136             return [int(s) for s in all_str if len(s) > 0]
137         except ValueError as e:
138             msg = f'cannot int a form field value for key {key} in: {all_str}'
139             raise BadFormatException(msg) from e
140
141     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
142         """Retrieve list of float value at key, None if empty strings."""
143         ret: list[float | None] = []
144         for val in self.get_all_str(key):
145             if '' == val:
146                 ret += [None]
147             else:
148                 try:
149                     ret += [float(val)]
150                 except ValueError as e:
151                     msg = f'cannot float form field value for key {key}: {val}'
152                     raise BadFormatException(msg) from e
153         return ret
154
155
156 class TaskHandler(BaseHTTPRequestHandler):
157     """Handles single HTTP request."""
158     # pylint: disable=too-many-public-methods
159     server: TaskServer
160     conn: DatabaseConnection
161     _site: str
162     _form_data: InputsParser
163     _params: InputsParser
164
165     def _send_page(self,
166                    ctx: dict[str, Any],
167                    tmpl_name: str,
168                    code: int = 200
169                    ) -> None:
170         """Send ctx as proper HTTP response."""
171         body = self.server.render(ctx, tmpl_name)
172         self.send_response(code)
173         for header_tuple in self.server.headers:
174             self.send_header(*header_tuple)
175         self.end_headers()
176         self.wfile.write(bytes(body, 'utf-8'))
177
178     @staticmethod
179     def _request_wrapper(http_method: str, not_found_msg: str
180                          ) -> Callable[..., Callable[[TaskHandler], None]]:
181         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
182
183         Among other things, conditionally cleans all caches, but only on POST
184         requests, as only those are expected to change the states of objects
185         that may be cached, and certainly only those are expected to write any
186         changes to the database. We want to call them as early though as
187         possible here, either exactly after the specific request handler
188         returns successfully, or right after any exception is triggered –
189         otherwise, race conditions become plausible.
190
191         Note that otherwise any POST attempt, even a failed one, may end in
192         problematic inconsistencies:
193
194         - if the POST handler experiences an Exception, changes to objects
195           won't get written to the DB, but the changed objects may remain in
196           the cache and affect other objects despite their possibly illegal
197           state
198
199         - even if an object was just saved to the DB, we cannot be sure its
200           current state is completely identical to what we'd get if loading it
201           fresh from the DB (e.g. currently Process.n_owners is only updated
202           when loaded anew via .from_table_row, nor is its state written to
203           the DB by .save; a questionable design choice, but proof that we
204           have no guarantee that objects' .save stores all their states we'd
205           prefer at their most up-to-date.
206         """
207
208         def clear_caches() -> None:
209             for cls in (Day, Todo, Condition, Process, ProcessStep):
210                 assert hasattr(cls, 'empty_cache')
211                 cls.empty_cache()
212
213         def decorator(f: Callable[..., str | None]
214                       ) -> Callable[[TaskHandler], None]:
215             def wrapper(self: TaskHandler) -> None:
216                 # pylint: disable=protected-access
217                 # (because pylint here fails to detect the use of wrapper as a
218                 # method to self with respective access privileges)
219                 try:
220                     self.conn = DatabaseConnection(self.server.db)
221                     parsed_url = urlparse(self.path)
222                     self._site = path_split(parsed_url.path)[1]
223                     params = parse_qs(parsed_url.query, strict_parsing=True)
224                     self._params = InputsParser(params, False)
225                     handler_name = f'do_{http_method}_{self._site}'
226                     if hasattr(self, handler_name):
227                         handler = getattr(self, handler_name)
228                         redir_target = f(self, handler)
229                         if 'POST' == http_method:
230                             clear_caches()
231                         if redir_target:
232                             self.send_response(302)
233                             self.send_header('Location', redir_target)
234                             self.end_headers()
235                     else:
236                         msg = f'{not_found_msg}: {self._site}'
237                         raise NotFoundException(msg)
238                 except HandledException as error:
239                     if 'POST' == http_method:
240                         clear_caches()
241                     ctx = {'msg': error}
242                     self._send_page(ctx, 'msg', error.http_code)
243                 finally:
244                     self.conn.close()
245             return wrapper
246         return decorator
247
248     @_request_wrapper('GET', 'Unknown page')
249     def do_GET(self, handler: Callable[[], str | dict[str, object]]
250                ) -> str | None:
251         """Render page with result of handler, or redirect if result is str."""
252         tmpl_name = f'{self._site}'
253         ctx_or_redir_target = handler()
254         if isinstance(ctx_or_redir_target, str):
255             return ctx_or_redir_target
256         self._send_page(ctx_or_redir_target, tmpl_name)
257         return None
258
259     @_request_wrapper('POST', 'Unknown POST target')
260     def do_POST(self, handler: Callable[[], str]) -> str:
261         """Handle POST with handler, prepare redirection to result."""
262         length = int(self.headers['content-length'])
263         postvars = parse_qs(self.rfile.read(length).decode(),
264                             keep_blank_values=True, strict_parsing=True)
265         self._form_data = InputsParser(postvars)
266         redir_target = handler()
267         self.conn.commit()
268         return redir_target
269
270     # GET handlers
271
272     @staticmethod
273     def _get_item(target_class: Any
274                   ) -> Callable[..., Callable[[TaskHandler],
275                                               dict[str, object]]]:
276         def decorator(f: Callable[..., dict[str, object]]
277                       ) -> Callable[[TaskHandler], dict[str, object]]:
278             def wrapper(self: TaskHandler) -> dict[str, object]:
279                 # pylint: disable=protected-access
280                 # (because pylint here fails to detect the use of wrapper as a
281                 # method to self with respective access privileges)
282                 id_ = self._params.get_int_or_none('id')
283                 if target_class.can_create_by_id:
284                     item = target_class.by_id_or_create(self.conn, id_)
285                 else:
286                     item = target_class.by_id(self.conn, id_)
287                 return f(self, item)
288             return wrapper
289         return decorator
290
291     def do_GET_(self) -> str:
292         """Return redirect target on GET /."""
293         return '/day'
294
295     def _do_GET_calendar(self) -> dict[str, object]:
296         """Show Days from ?start= to ?end=.
297
298         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
299         same, the only difference being the HTML template they are rendered to,
300         which .do_GET selects from their method name.
301         """
302         start = self._params.get_str('start')
303         end = self._params.get_str('end')
304         if not end:
305             end = date_in_n_days(366)
306         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
307         days, start, end = ret
308         days = Day.with_filled_gaps(days, start, end)
309         today = date_in_n_days(0)
310         return {'start': start, 'end': end, 'days': days, 'today': today}
311
312     def do_GET_calendar(self) -> dict[str, object]:
313         """Show Days from ?start= to ?end= – normal view."""
314         return self._do_GET_calendar()
315
316     def do_GET_calendar_txt(self) -> dict[str, object]:
317         """Show Days from ?start= to ?end= – minimalist view."""
318         return self._do_GET_calendar()
319
320     def do_GET_day(self) -> dict[str, object]:
321         """Show single Day of ?date=."""
322         date = self._params.get_str('date', date_in_n_days(0))
323         day = Day.by_id_or_create(self.conn, date)
324         make_type = self._params.get_str('make_type')
325         conditions_present = []
326         enablers_for = {}
327         disablers_for = {}
328         for todo in day.todos:
329             for condition in todo.conditions + todo.blockers:
330                 if condition not in conditions_present:
331                     conditions_present += [condition]
332                     enablers_for[condition.id_] = [p for p in
333                                                    Process.all(self.conn)
334                                                    if condition in p.enables]
335                     disablers_for[condition.id_] = [p for p in
336                                                     Process.all(self.conn)
337                                                     if condition in p.disables]
338         seen_todos: set[int] = set()
339         top_nodes = [t.get_step_tree(seen_todos)
340                      for t in day.todos if not t.parents]
341         return {'day': day,
342                 'top_nodes': top_nodes,
343                 'make_type': make_type,
344                 'enablers_for': enablers_for,
345                 'disablers_for': disablers_for,
346                 'conditions_present': conditions_present,
347                 'processes': Process.all(self.conn)}
348
349     @_get_item(Todo)
350     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
351         """Show single Todo of ?id=."""
352
353         @dataclass
354         class TodoStepsNode:
355             """Collect what's useful for Todo steps tree display."""
356             id_: int
357             todo: Todo | None
358             process: Process | None
359             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
360             fillable: bool = False
361
362         def walk_process_steps(id_: int,
363                                process_step_nodes: list[ProcessStepsNode],
364                                steps_nodes: list[TodoStepsNode]) -> None:
365             for process_step_node in process_step_nodes:
366                 id_ += 1
367                 node = TodoStepsNode(id_, None, process_step_node.process, [])
368                 steps_nodes += [node]
369                 walk_process_steps(id_, list(process_step_node.steps.values()),
370                                    node.children)
371
372         def walk_todo_steps(id_: int, todos: list[Todo],
373                             steps_nodes: list[TodoStepsNode]) -> None:
374             for todo in todos:
375                 matched = False
376                 for match in [item for item in steps_nodes
377                               if item.process
378                               and item.process == todo.process]:
379                     match.todo = todo
380                     matched = True
381                     for child in match.children:
382                         child.fillable = True
383                     walk_todo_steps(id_, todo.children, match.children)
384                 if not matched:
385                     id_ += 1
386                     node = TodoStepsNode(id_, todo, None, [])
387                     steps_nodes += [node]
388                     walk_todo_steps(id_, todo.children, node.children)
389
390         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
391                                     ) -> set[int]:
392             ids = set()
393             for node in steps_nodes:
394                 if not node.todo:
395                     assert isinstance(node.process, Process)
396                     assert isinstance(node.process.id_, int)
397                     ids.add(node.process.id_)
398                 ids = ids | collect_adoptables_keys(node.children)
399             return ids
400
401         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
402         process_tree = todo.process.get_steps(self.conn, None)
403         steps_todo_to_process: list[TodoStepsNode] = []
404         walk_process_steps(0, list(process_tree.values()),
405                            steps_todo_to_process)
406         for steps_node in steps_todo_to_process:
407             steps_node.fillable = True
408         walk_todo_steps(len(steps_todo_to_process), todo_steps,
409                         steps_todo_to_process)
410         adoptables: dict[int, list[Todo]] = {}
411         any_adoptables = [Todo.by_id(self.conn, t.id_)
412                           for t in Todo.by_date(self.conn, todo.date)
413                           if t.id_ is not None
414                           and t != todo]
415         for id_ in collect_adoptables_keys(steps_todo_to_process):
416             adoptables[id_] = [t for t in any_adoptables
417                                if t.process.id_ == id_]
418         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
419                 'adoption_candidates_for': adoptables,
420                 'process_candidates': Process.all(self.conn),
421                 'todo_candidates': any_adoptables,
422                 'condition_candidates': Condition.all(self.conn)}
423
424     def do_GET_todos(self) -> dict[str, object]:
425         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
426         sort_by = self._params.get_str('sort_by')
427         start = self._params.get_str('start')
428         end = self._params.get_str('end')
429         process_id = self._params.get_int_or_none('process_id')
430         comment_pattern = self._params.get_str('comment_pattern')
431         todos = []
432         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
433         todos_by_date_range, start, end = ret
434         todos = [t for t in todos_by_date_range
435                  if comment_pattern in t.comment
436                  and ((not process_id) or t.process.id_ == process_id)]
437         sort_by = Todo.sort_by(todos, sort_by)
438         return {'start': start, 'end': end, 'process_id': process_id,
439                 'comment_pattern': comment_pattern, 'todos': todos,
440                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
441
442     def do_GET_conditions(self) -> dict[str, object]:
443         """Show all Conditions."""
444         pattern = self._params.get_str('pattern')
445         sort_by = self._params.get_str('sort_by')
446         conditions = Condition.matching(self.conn, pattern)
447         sort_by = Condition.sort_by(conditions, sort_by)
448         return {'conditions': conditions,
449                 'sort_by': sort_by,
450                 'pattern': pattern}
451
452     @_get_item(Condition)
453     def do_GET_condition(self, c: Condition) -> dict[str, object]:
454         """Show Condition of ?id=."""
455         ps = Process.all(self.conn)
456         return {'condition': c, 'is_new': c.id_ is None,
457                 'enabled_processes': [p for p in ps if c in p.conditions],
458                 'disabled_processes': [p for p in ps if c in p.blockers],
459                 'enabling_processes': [p for p in ps if c in p.enables],
460                 'disabling_processes': [p for p in ps if c in p.disables]}
461
462     @_get_item(Condition)
463     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
464         """Show title history of Condition of ?id=."""
465         return {'condition': c}
466
467     @_get_item(Condition)
468     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
469         """Show description historys of Condition of ?id=."""
470         return {'condition': c}
471
472     @_get_item(Process)
473     def do_GET_process(self, process: Process) -> dict[str, object]:
474         """Show Process of ?id=."""
475         owner_ids = self._params.get_all_int('step_to')
476         owned_ids = self._params.get_all_int('has_step')
477         title_64 = self._params.get_str('title_b64')
478         if title_64:
479             try:
480                 title = b64decode(title_64.encode()).decode()
481             except binascii_Exception as exc:
482                 msg = 'invalid base64 for ?title_b64='
483                 raise BadFormatException(msg) from exc
484             process.title.set(title)
485         preset_top_step = None
486         owners = process.used_as_step_by(self.conn)
487         for step_id in owner_ids:
488             owners += [Process.by_id(self.conn, step_id)]
489         for process_id in owned_ids:
490             Process.by_id(self.conn, process_id)  # to ensure ID exists
491             preset_top_step = process_id
492         return {'process': process, 'is_new': process.id_ is None,
493                 'preset_top_step': preset_top_step,
494                 'steps': process.get_steps(self.conn), 'owners': owners,
495                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
496                 'process_candidates': Process.all(self.conn),
497                 'condition_candidates': Condition.all(self.conn)}
498
499     @_get_item(Process)
500     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
501         """Show title history of Process of ?id=."""
502         return {'process': p}
503
504     @_get_item(Process)
505     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
506         """Show description historys of Process of ?id=."""
507         return {'process': p}
508
509     @_get_item(Process)
510     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
511         """Show default effort history of Process of ?id=."""
512         return {'process': p}
513
514     def do_GET_processes(self) -> dict[str, object]:
515         """Show all Processes."""
516         pattern = self._params.get_str('pattern')
517         sort_by = self._params.get_str('sort_by')
518         processes = Process.matching(self.conn, pattern)
519         sort_by = Process.sort_by(processes, sort_by)
520         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
521
522     # POST handlers
523
524     @staticmethod
525     def _delete_or_post(target_class: Any, redir_target: str = '/'
526                         ) -> Callable[..., Callable[[TaskHandler], str]]:
527         def decorator(f: Callable[..., str]
528                       ) -> Callable[[TaskHandler], str]:
529             def wrapper(self: TaskHandler) -> str:
530                 # pylint: disable=protected-access
531                 # (because pylint here fails to detect the use of wrapper as a
532                 # method to self with respective access privileges)
533                 id_ = self._params.get_int_or_none('id')
534                 for _ in self._form_data.get_all_str('delete'):
535                     if id_ is None:
536                         msg = 'trying to delete non-saved ' +\
537                                 f'{target_class.__name__}'
538                         raise NotFoundException(msg)
539                     item = target_class.by_id(self.conn, id_)
540                     item.remove(self.conn)
541                     return redir_target
542                 if target_class.can_create_by_id:
543                     item = target_class.by_id_or_create(self.conn, id_)
544                 else:
545                     item = target_class.by_id(self.conn, id_)
546                 return f(self, item)
547             return wrapper
548         return decorator
549
550     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
551         """Update history timestamps for VersionedAttribute."""
552         id_ = self._params.get_int_or_none('id')
553         item = cls.by_id(self.conn, id_)
554         attr = getattr(item, attr_name)
555         for k, v in self._form_data.get_first_strings_starting('at:').items():
556             old = k[3:]
557             if old[19:] != v:
558                 attr.reset_timestamp(old, f'{v}.0')
559         attr.save(self.conn)
560         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
561
562     def do_POST_day(self) -> str:
563         """Update or insert Day of date and Todos mapped to it."""
564         # pylint: disable=too-many-locals
565         date = self._params.get_str('date')
566         day_comment = self._form_data.get_str('day_comment')
567         make_type = self._form_data.get_str('make_type')
568         old_todos = self._form_data.get_all_int('todo_id')
569         new_todos = self._form_data.get_all_int('new_todo')
570         comments = self._form_data.get_all_str('comment')
571         efforts = self._form_data.get_all_floats_or_nones('effort')
572         done_todos = self._form_data.get_all_int('done')
573         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
574             raise BadFormatException('"done" field refers to unknown Todo')
575         is_done = [t_id in done_todos for t_id in old_todos]
576         if not (len(old_todos) == len(is_done) == len(comments)
577                 == len(efforts)):
578             msg = 'not equal number each of number of todo_id, comments, ' +\
579                     'and efforts inputs'
580             raise BadFormatException(msg)
581         day = Day.by_id_or_create(self.conn, date)
582         day.comment = day_comment
583         day.save(self.conn)
584         for process_id in sorted(new_todos):
585             if 'empty' == make_type:
586                 process = Process.by_id(self.conn, process_id)
587                 todo = Todo(None, process, False, date)
588                 todo.save(self.conn)
589             else:
590                 Todo.create_with_children(self.conn, process_id, date)
591         for i, todo_id in enumerate(old_todos):
592             todo = Todo.by_id(self.conn, todo_id)
593             todo.is_done = is_done[i]
594             todo.comment = comments[i]
595             todo.effort = efforts[i]
596             todo.save(self.conn)
597         return f'/day?date={date}&make_type={make_type}'
598
599     @_delete_or_post(Todo, '/')
600     def do_POST_todo(self, todo: Todo) -> str:
601         """Update Todo and its children."""
602         # pylint: disable=too-many-locals
603         adopted_child_ids = self._form_data.get_all_int('adopt')
604         processes_to_make_full = self._form_data.get_all_int('make_full')
605         processes_to_make_empty = self._form_data.get_all_int('make_empty')
606         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
607         effort = self._form_data.get_str('effort', ignore_strict=True)
608         conditions = self._form_data.get_all_int('conditions')
609         disables = self._form_data.get_all_int('disables')
610         blockers = self._form_data.get_all_int('blockers')
611         enables = self._form_data.get_all_int('enables')
612         is_done = len(self._form_data.get_all_str('done')) > 0
613         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
614         comment = self._form_data.get_str('comment', ignore_strict=True)
615         for v in fill_fors.values():
616             if v.startswith('make_empty_'):
617                 processes_to_make_empty += [int(v[11:])]
618             elif v.startswith('make_full_'):
619                 processes_to_make_full += [int(v[10:])]
620             elif v != 'ignore':
621                 adopted_child_ids += [int(v)]
622         to_remove = []
623         for child in todo.children:
624             assert isinstance(child.id_, int)
625             if child.id_ not in adopted_child_ids:
626                 to_remove += [child.id_]
627         for id_ in to_remove:
628             child = Todo.by_id(self.conn, id_)
629             todo.remove_child(child)
630         for child_id in adopted_child_ids:
631             if child_id in [c.id_ for c in todo.children]:
632                 continue
633             child = Todo.by_id(self.conn, child_id)
634             todo.add_child(child)
635         for process_id in processes_to_make_empty:
636             process = Process.by_id(self.conn, process_id)
637             made = Todo(None, process, False, todo.date)
638             made.save(self.conn)
639             todo.add_child(made)
640         for process_id in processes_to_make_full:
641             made = Todo.create_with_children(self.conn, process_id, todo.date)
642             todo.add_child(made)
643         todo.effort = float(effort) if effort else None
644         todo.set_conditions(self.conn, conditions)
645         todo.set_blockers(self.conn, blockers)
646         todo.set_enables(self.conn, enables)
647         todo.set_disables(self.conn, disables)
648         todo.is_done = is_done
649         todo.calendarize = calendarize
650         todo.comment = comment
651         todo.save(self.conn)
652         return f'/todo?id={todo.id_}'
653
654     def do_POST_process_descriptions(self) -> str:
655         """Update history timestamps for Process.description."""
656         return self._change_versioned_timestamps(Process, 'description')
657
658     def do_POST_process_efforts(self) -> str:
659         """Update history timestamps for Process.effort."""
660         return self._change_versioned_timestamps(Process, 'effort')
661
662     def do_POST_process_titles(self) -> str:
663         """Update history timestamps for Process.title."""
664         return self._change_versioned_timestamps(Process, 'title')
665
666     @_delete_or_post(Process, '/processes')
667     def do_POST_process(self, process: Process) -> str:
668         """Update or insert Process of ?id= and fields defined in postvars."""
669         # pylint: disable=too-many-locals
670         # pylint: disable=too-many-statements
671         title = self._form_data.get_str('title')
672         description = self._form_data.get_str('description')
673         effort = self._form_data.get_float('effort')
674         conditions = self._form_data.get_all_int('conditions')
675         blockers = self._form_data.get_all_int('blockers')
676         enables = self._form_data.get_all_int('enables')
677         disables = self._form_data.get_all_int('disables')
678         calendarize = self._form_data.get_all_str('calendarize') != []
679         suppresses = self._form_data.get_all_int('suppresses')
680         step_of = self._form_data.get_all_str('step_of')
681         keep_steps = self._form_data.get_all_int('keep_step')
682         step_ids = self._form_data.get_all_int('steps')
683         new_top_steps = self._form_data.get_all_str('new_top_step')
684         step_process_id_to = {}
685         step_parent_id_to = {}
686         new_steps_to = {}
687         for step_id in step_ids:
688             name = f'new_step_to_{step_id}'
689             new_steps_to[step_id] = self._form_data.get_all_int(name)
690         for step_id in keep_steps:
691             name = f'step_{step_id}_process_id'
692             step_process_id_to[step_id] = self._form_data.get_int(name)
693             name = f'step_{step_id}_parent_id'
694             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
695         process.title.set(title)
696         process.description.set(description)
697         process.effort.set(effort)
698         process.set_conditions(self.conn, conditions)
699         process.set_blockers(self.conn, blockers)
700         process.set_enables(self.conn, enables)
701         process.set_disables(self.conn, disables)
702         process.calendarize = calendarize
703         process.save(self.conn)
704         assert isinstance(process.id_, int)
705         new_step_title = None
706         steps: list[ProcessStep] = []
707         for step_id in keep_steps:
708             if step_id not in step_ids:
709                 raise BadFormatException('trying to keep unknown step')
710             step = ProcessStep(step_id, process.id_,
711                                step_process_id_to[step_id],
712                                step_parent_id_to[step_id])
713             steps += [step]
714         for step_id in step_ids:
715             new = [ProcessStep(None, process.id_, step_process_id, step_id)
716                    for step_process_id in new_steps_to[step_id]]
717             steps += new
718         for step_identifier in new_top_steps:
719             try:
720                 step_process_id = int(step_identifier)
721                 step = ProcessStep(None, process.id_, step_process_id, None)
722                 steps += [step]
723             except ValueError:
724                 new_step_title = step_identifier
725         process.set_steps(self.conn, steps)
726         process.set_step_suppressions(self.conn, suppresses)
727         owners_to_set = []
728         new_owner_title = None
729         for owner_identifier in step_of:
730             try:
731                 owners_to_set += [int(owner_identifier)]
732             except ValueError:
733                 new_owner_title = owner_identifier
734         process.set_owners(self.conn, owners_to_set)
735         params = f'id={process.id_}'
736         if new_step_title:
737             title_b64_encoded = b64encode(new_step_title.encode()).decode()
738             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
739         elif new_owner_title:
740             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
741             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
742         process.save(self.conn)
743         return f'/process?{params}'
744
745     def do_POST_condition_descriptions(self) -> str:
746         """Update history timestamps for Condition.description."""
747         return self._change_versioned_timestamps(Condition, 'description')
748
749     def do_POST_condition_titles(self) -> str:
750         """Update history timestamps for Condition.title."""
751         return self._change_versioned_timestamps(Condition, 'title')
752
753     @_delete_or_post(Condition, '/conditions')
754     def do_POST_condition(self, condition: Condition) -> str:
755         """Update/insert Condition of ?id= and fields defined in postvars."""
756         is_active = self._form_data.get_str('is_active') == 'True'
757         title = self._form_data.get_str('title')
758         description = self._form_data.get_str('description')
759         condition.is_active = is_active
760         condition.title.set(title)
761         condition.description.set(description)
762         condition.save(self.conn)
763         return f'/condition?id={condition.id_}'