home · contact · privacy
Empty cache more often to avoid race conditions.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
# Directory handed as-is to the Jinja file system loader in
# TaskServer.__init__; being a relative path, it is presumably resolved
# against the process's working directory (TODO confirm).
TEMPLATES_DIR = 'templates'
22
23
class TaskServer(HTTPServer):
    """Variant of HTTPServer that carries DB file, headers, and renderer.

    Holds the database file handed to request handlers, a list of extra
    response headers, a Jinja environment loading from TEMPLATES_DIR, and
    a render mode ('html' by default, 'json' after .set_json_mode).
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Set up HTTPServer with *args/**kwargs, store db_file as .db."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        # extra (name, value) header pairs that handlers send on each page
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses.

        Safe to call repeatedly: the JSON Content-Type header is only added
        if not yet present, so responses never carry it twice.
        """
        self._render_mode = 'json'
        header = ('Content-Type', 'application/json')
        if header not in self.headers:
            self.headers += [header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string, leaving ctx itself untouched.

        Values are walked recursively: lists/tuples become lists, dicts are
        rebuilt with walked values, HandledExceptions become their str(),
        objects offering .as_dict_into_reference (and a non-None .id_) are
        replaced by whatever that method returns when handed the shared
        library dict, objects offering .as_dict are replaced by that
        attribute, anything else is passed through as is.

        The result additionally carries the library dict under '_library'.
        """
        # filled as a side effect of as_dict_into_reference calls below
        library: dict[str, dict[str | int, object]] = {}

        def walk_ctx(node: object) -> Any:
            if hasattr(node, 'as_dict_into_reference'):
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            if hasattr(node, 'as_dict'):
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        # Build a fresh dict rather than overwriting the caller's ctx, so
        # the caller keeps its original objects and gets no '_library' key.
        rendered: dict[str, object] = {k: walk_ctx(v)
                                       for k, v in ctx.items()}
        rendered['_library'] = library
        return json_dumps(rendered)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In 'html' mode, renders the Jinja template '{tmpl_name}.html' with
        ctx; in any other mode, tmpl_name is ignored and ctx is serialized
        by .ctx_to_json.
        """
        tmpl_name = f'{tmpl_name}.{self._render_mode}'
        if 'html' == self._render_mode:
            template = self._jinja.get_template(tmpl_name)
            return template.render(ctx)
        return self.__class__.ctx_to_json(ctx)
72
73
class InputsParser:
    """Validating accessor for HTTP inputs mapping keys to string lists."""

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        """Wrap dict_; strictness decides if .get_str fails on absence."""
        self.inputs = dict_
        self.strict = strictness

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Return first string stored at key.

        If key is absent or maps to an empty list: raise
        BadFormatException when in strict mode (unless ignore_strict),
        else return default.
        """
        values = self.inputs.get(key)
        if values:
            return values[0]
        if self.strict and not ignore_strict:
            raise BadFormatException(f'no value found for key {key}')
        return default

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Map each key beginning with prefix to the first of its strings."""
        return {name: values[0]
                for name, values in self.inputs.items()
                if name.startswith(prefix)}

    def get_int(self, key: str) -> int:
        """Return first value at key as int, BadFormatException if empty."""
        number = self.get_int_or_none(key)
        if number is not None:
            return number
        raise BadFormatException(f'unexpected empty value for: {key}')

    def get_int_or_none(self, key: str) -> int | None:
        """Return first value at key as int, or None if absent or ''.

        Ignores strict mode; raises BadFormatException if the value is
        neither empty nor parseable as int.
        """
        raw = self.get_str(key, ignore_strict=True)
        if '' == raw:
            return None
        try:
            number = int(raw)
        except ValueError as err:
            text = f'cannot int form field value for key {key}: {raw}'
            raise BadFormatException(text) from err
        return number

    def get_float(self, key: str) -> float:
        """Return first value at key as float.

        Absence is handled by .get_str (so depends on strict mode); raises
        BadFormatException if the string found cannot be parsed as float.
        """
        raw = self.get_str(key)
        try:
            number = float(raw)
        except ValueError as err:
            text = f'cannot float form field value for key {key}: {raw}'
            raise BadFormatException(text) from err
        return number

    def get_all_str(self, key: str) -> list[str]:
        """Return list of strings stored at key, empty list if absent."""
        return self.inputs.get(key, [])

    def get_all_int(self, key: str) -> list[int]:
        """Return all non-empty strings at key, converted to ints.

        Raises BadFormatException if any non-empty string is not an int.
        """
        strings = self.get_all_str(key)
        numbers: list[int] = []
        try:
            for string in strings:
                if string:
                    numbers.append(int(string))
        except ValueError as err:
            text = f'cannot int a form field value for key {key} in: {strings}'
            raise BadFormatException(text) from err
        return numbers

    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
        """Return values at key as floats, with None for empty strings.

        Raises BadFormatException on the first non-empty string that
        cannot be parsed as float.
        """
        def convert(raw: str) -> float | None:
            if '' == raw:
                return None
            try:
                return float(raw)
            except ValueError as err:
                text = f'cannot float form field value for key {key}: {raw}'
                raise BadFormatException(text) from err

        return [convert(raw) for raw in self.get_all_str(key)]
153
154
155 class TaskHandler(BaseHTTPRequestHandler):
156     """Handles single HTTP request."""
157     # pylint: disable=too-many-public-methods
158     server: TaskServer
159     conn: DatabaseConnection
160     _site: str
161     _form_data: InputsParser
162     _params: InputsParser
163
164     def _send_page(self,
165                    ctx: dict[str, Any],
166                    tmpl_name: str,
167                    code: int = 200
168                    ) -> None:
169         """Send ctx as proper HTTP response."""
170         body = self.server.render(ctx, tmpl_name)
171         self.send_response(code)
172         for header_tuple in self.server.headers:
173             self.send_header(*header_tuple)
174         self.end_headers()
175         self.wfile.write(bytes(body, 'utf-8'))
176
177     @staticmethod
178     def _request_wrapper(http_method: str, not_found_msg: str
179                          ) -> Callable[..., Callable[[TaskHandler], None]]:
180         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
181
182         Among other things, conditionally cleans all caches, but only on POST
183         requests, as only those are expected to change the states of objects
184         that may be cached, and certainly only those are expected to write any
185         changes to the database. We want to call them as early though as
186         possible here, either exactly after the specific request handler
187         returns successfully, or right after any exception is triggered –
188         otherwise, race conditions become plausible.
189
190         Note that any POST attempt, even a failed one, may end in problematic
191         inconsistencies:
192
193         - if the POST handler experiences an Exception, changes to objects
194           won't get written to the DB, but the changed objects may remain in
195           the cache and affect other objects despite their possibly illegal
196           state
197
198         - even if an object was just saved to the DB, we cannot be sure its
199           current state is completely identical to what we'd get if loading it
200           fresh from the DB (e.g. currently Process.n_owners is only updated
201           when loaded anew via .from_table_row, nor is its state written to
202           the DB by .save; a questionable design choice, but proof that we
203           have no guarantee that objects' .save stores all their states we'd
204           prefer at their most up-to-date.
205         """
206
207         def clear_caches() -> None:
208             for cls in (Day, Todo, Condition, Process, ProcessStep):
209                 assert hasattr(cls, 'empty_cache')
210                 cls.empty_cache()
211
212         def decorator(f: Callable[..., str | None]
213                       ) -> Callable[[TaskHandler], None]:
214             def wrapper(self: TaskHandler) -> None:
215                 # pylint: disable=protected-access
216                 # (because pylint here fails to detect the use of wrapper as a
217                 # method to self with respective access privileges)
218                 try:
219                     self.conn = DatabaseConnection(self.server.db)
220                     parsed_url = urlparse(self.path)
221                     self._site = path_split(parsed_url.path)[1]
222                     params = parse_qs(parsed_url.query, strict_parsing=True)
223                     self._params = InputsParser(params, False)
224                     handler_name = f'do_{http_method}_{self._site}'
225                     if hasattr(self, handler_name):
226                         handler = getattr(self, handler_name)
227                         redir_target = f(self, handler)
228                         if 'POST' != http_method:
229                            clear_caches()
230                         if redir_target:
231                             self.send_response(302)
232                             self.send_header('Location', redir_target)
233                             self.end_headers()
234                     else:
235                         msg = f'{not_found_msg}: {self._site}'
236                         raise NotFoundException(msg)
237                 except HandledException as error:
238                     if 'POST' != http_method:
239                        clear_caches()
240                     ctx = {'msg': error}
241                     self._send_page(ctx, 'msg', error.http_code)
242                 finally:
243                     self.conn.close()
244             return wrapper
245         return decorator
246
247     @_request_wrapper('GET', 'Unknown page')
248     def do_GET(self, handler: Callable[[], str | dict[str, object]]
249                ) -> str | None:
250         """Render page with result of handler, or redirect if result is str."""
251         tmpl_name = f'{self._site}'
252         ctx_or_redir_target = handler()
253         if isinstance(ctx_or_redir_target, str):
254             return ctx_or_redir_target
255         self._send_page(ctx_or_redir_target, tmpl_name)
256         return None
257
258     @_request_wrapper('POST', 'Unknown POST target')
259     def do_POST(self, handler: Callable[[], str]) -> str:
260         """Handle POST with handler, prepare redirection to result."""
261         length = int(self.headers['content-length'])
262         postvars = parse_qs(self.rfile.read(length).decode(),
263                             keep_blank_values=True, strict_parsing=True)
264         self._form_data = InputsParser(postvars)
265         redir_target = handler()
266         self.conn.commit()
267         return redir_target
268
269     # GET handlers
270
271     @staticmethod
272     def _get_item(target_class: Any
273                   ) -> Callable[..., Callable[[TaskHandler],
274                                               dict[str, object]]]:
275         def decorator(f: Callable[..., dict[str, object]]
276                       ) -> Callable[[TaskHandler], dict[str, object]]:
277             def wrapper(self: TaskHandler) -> dict[str, object]:
278                 # pylint: disable=protected-access
279                 # (because pylint here fails to detect the use of wrapper as a
280                 # method to self with respective access privileges)
281                 id_ = self._params.get_int_or_none('id')
282                 if target_class.can_create_by_id:
283                     item = target_class.by_id_or_create(self.conn, id_)
284                 else:
285                     item = target_class.by_id(self.conn, id_)
286                 return f(self, item)
287             return wrapper
288         return decorator
289
290     def do_GET_(self) -> str:
291         """Return redirect target on GET /."""
292         return '/day'
293
294     def _do_GET_calendar(self) -> dict[str, object]:
295         """Show Days from ?start= to ?end=.
296
297         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
298         same, the only difference being the HTML template they are rendered to,
299         which .do_GET selects from their method name.
300         """
301         start = self._params.get_str('start')
302         end = self._params.get_str('end')
303         if not end:
304             end = date_in_n_days(366)
305         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
306         days, start, end = ret
307         days = Day.with_filled_gaps(days, start, end)
308         today = date_in_n_days(0)
309         return {'start': start, 'end': end, 'days': days, 'today': today}
310
311     def do_GET_calendar(self) -> dict[str, object]:
312         """Show Days from ?start= to ?end= – normal view."""
313         return self._do_GET_calendar()
314
315     def do_GET_calendar_txt(self) -> dict[str, object]:
316         """Show Days from ?start= to ?end= – minimalist view."""
317         return self._do_GET_calendar()
318
319     def do_GET_day(self) -> dict[str, object]:
320         """Show single Day of ?date=."""
321         date = self._params.get_str('date', date_in_n_days(0))
322         day = Day.by_id_or_create(self.conn, date)
323         make_type = self._params.get_str('make_type')
324         conditions_present = []
325         enablers_for = {}
326         disablers_for = {}
327         for todo in day.todos:
328             for condition in todo.conditions + todo.blockers:
329                 if condition not in conditions_present:
330                     conditions_present += [condition]
331                     enablers_for[condition.id_] = [p for p in
332                                                    Process.all(self.conn)
333                                                    if condition in p.enables]
334                     disablers_for[condition.id_] = [p for p in
335                                                     Process.all(self.conn)
336                                                     if condition in p.disables]
337         seen_todos: set[int] = set()
338         top_nodes = [t.get_step_tree(seen_todos)
339                      for t in day.todos if not t.parents]
340         return {'day': day,
341                 'top_nodes': top_nodes,
342                 'make_type': make_type,
343                 'enablers_for': enablers_for,
344                 'disablers_for': disablers_for,
345                 'conditions_present': conditions_present,
346                 'processes': Process.all(self.conn)}
347
    @_get_item(Todo)
    def do_GET_todo(self, todo: Todo) -> dict[str, object]:
        """Show single Todo of ?id=.

        Builds a tree of TodoStepsNodes that overlays the steps of the
        Todo's Process with the Todo's actual children, and collects for
        each Process step not yet matched by a Todo those other Todos of
        the same date that could be adopted to fill it.
        """

        @dataclass
        class TodoStepsNode:
            """Collect what's useful for Todo steps tree display."""
            id_: int
            todo: Todo | None
            process: Process | None
            children: list[TodoStepsNode]  # pylint: disable=undefined-variable
            fillable: bool = False

        def walk_process_steps(id_: int,
                               process_step_nodes: list[ProcessStepsNode],
                               steps_nodes: list[TodoStepsNode]) -> None:
            # Append to steps_nodes one Todo-less node per process step,
            # recursing into each step's own .steps for the node's children.
            # NOTE(review): id_ is a plain int, so increments made inside the
            # recursive call do not propagate back; node ids are therefore
            # not unique across the whole tree – confirm this is acceptable.
            for process_step_node in process_step_nodes:
                id_ += 1
                node = TodoStepsNode(id_, None, process_step_node.process, [])
                steps_nodes += [node]
                walk_process_steps(id_, list(process_step_node.steps.values()),
                                   node.children)

        def walk_todo_steps(id_: int, todos: list[Todo],
                            steps_nodes: list[TodoStepsNode]) -> None:
            # Attach each Todo to every node on this level whose Process
            # equals the Todo's, marking those nodes' children as fillable;
            # Todos matching no node get a new, Process-less node appended.
            for todo in todos:
                matched = False
                for match in [item for item in steps_nodes
                              if item.process
                              and item.process == todo.process]:
                    match.todo = todo
                    matched = True
                    for child in match.children:
                        child.fillable = True
                    walk_todo_steps(id_, todo.children, match.children)
                if not matched:
                    id_ += 1
                    node = TodoStepsNode(id_, todo, None, [])
                    steps_nodes += [node]
                    walk_todo_steps(id_, todo.children, node.children)

        def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
                                    ) -> set[int]:
            # Gather, over the whole tree, the Process ids of all nodes that
            # have no Todo attached.
            ids = set()
            for node in steps_nodes:
                if not node.todo:
                    assert isinstance(node.process, Process)
                    assert isinstance(node.process.id_, int)
                    ids.add(node.process.id_)
                ids = ids | collect_adoptables_keys(node.children)
            return ids

        # direct child Todos, as found in this Todo's own step tree
        todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
        process_tree = todo.process.get_steps(self.conn, None)
        steps_todo_to_process: list[TodoStepsNode] = []
        walk_process_steps(0, list(process_tree.values()),
                           steps_todo_to_process)
        # top-level process steps can always be filled
        for steps_node in steps_todo_to_process:
            steps_node.fillable = True
        walk_todo_steps(len(steps_todo_to_process), todo_steps,
                        steps_todo_to_process)
        adoptables: dict[int, list[Todo]] = {}
        # all other saved Todos of the same date
        any_adoptables = [Todo.by_id(self.conn, t.id_)
                          for t in Todo.by_date(self.conn, todo.date)
                          if t.id_ is not None
                          and t != todo]
        # per unfilled Process id: the candidate Todos of that Process
        for id_ in collect_adoptables_keys(steps_todo_to_process):
            adoptables[id_] = [t for t in any_adoptables
                               if t.process.id_ == id_]
        return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
                'adoption_candidates_for': adoptables,
                'process_candidates': Process.all(self.conn),
                'todo_candidates': any_adoptables,
                'condition_candidates': Condition.all(self.conn)}
422
423     def do_GET_todos(self) -> dict[str, object]:
424         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
425         sort_by = self._params.get_str('sort_by')
426         start = self._params.get_str('start')
427         end = self._params.get_str('end')
428         process_id = self._params.get_int_or_none('process_id')
429         comment_pattern = self._params.get_str('comment_pattern')
430         todos = []
431         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
432         todos_by_date_range, start, end = ret
433         todos = [t for t in todos_by_date_range
434                  if comment_pattern in t.comment
435                  and ((not process_id) or t.process.id_ == process_id)]
436         sort_by = Todo.sort_by(todos, sort_by)
437         return {'start': start, 'end': end, 'process_id': process_id,
438                 'comment_pattern': comment_pattern, 'todos': todos,
439                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
440
441     def do_GET_conditions(self) -> dict[str, object]:
442         """Show all Conditions."""
443         pattern = self._params.get_str('pattern')
444         sort_by = self._params.get_str('sort_by')
445         conditions = Condition.matching(self.conn, pattern)
446         sort_by = Condition.sort_by(conditions, sort_by)
447         return {'conditions': conditions,
448                 'sort_by': sort_by,
449                 'pattern': pattern}
450
451     @_get_item(Condition)
452     def do_GET_condition(self, c: Condition) -> dict[str, object]:
453         """Show Condition of ?id=."""
454         ps = Process.all(self.conn)
455         return {'condition': c, 'is_new': c.id_ is None,
456                 'enabled_processes': [p for p in ps if c in p.conditions],
457                 'disabled_processes': [p for p in ps if c in p.blockers],
458                 'enabling_processes': [p for p in ps if c in p.enables],
459                 'disabling_processes': [p for p in ps if c in p.disables]}
460
461     @_get_item(Condition)
462     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
463         """Show title history of Condition of ?id=."""
464         return {'condition': c}
465
466     @_get_item(Condition)
467     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
468         """Show description historys of Condition of ?id=."""
469         return {'condition': c}
470
471     @_get_item(Process)
472     def do_GET_process(self, process: Process) -> dict[str, object]:
473         """Show Process of ?id=."""
474         owner_ids = self._params.get_all_int('step_to')
475         owned_ids = self._params.get_all_int('has_step')
476         title_64 = self._params.get_str('title_b64')
477         if title_64:
478             title = b64decode(title_64.encode()).decode()
479             process.title.set(title)
480         owners = process.used_as_step_by(self.conn)
481         for step_id in owner_ids:
482             owners += [Process.by_id(self.conn, step_id)]
483         preset_top_step = None
484         for process_id in owned_ids:
485             preset_top_step = process_id
486         return {'process': process, 'is_new': process.id_ is None,
487                 'preset_top_step': preset_top_step,
488                 'steps': process.get_steps(self.conn), 'owners': owners,
489                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
490                 'process_candidates': Process.all(self.conn),
491                 'condition_candidates': Condition.all(self.conn)}
492
493     @_get_item(Process)
494     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
495         """Show title history of Process of ?id=."""
496         return {'process': p}
497
498     @_get_item(Process)
499     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
500         """Show description historys of Process of ?id=."""
501         return {'process': p}
502
503     @_get_item(Process)
504     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
505         """Show default effort history of Process of ?id=."""
506         return {'process': p}
507
508     def do_GET_processes(self) -> dict[str, object]:
509         """Show all Processes."""
510         pattern = self._params.get_str('pattern')
511         sort_by = self._params.get_str('sort_by')
512         processes = Process.matching(self.conn, pattern)
513         sort_by = Process.sort_by(processes, sort_by)
514         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
515
516     # POST handlers
517
518     @staticmethod
519     def _delete_or_post(target_class: Any, redir_target: str = '/'
520                         ) -> Callable[..., Callable[[TaskHandler], str]]:
521         def decorator(f: Callable[..., str]
522                       ) -> Callable[[TaskHandler], str]:
523             def wrapper(self: TaskHandler) -> str:
524                 # pylint: disable=protected-access
525                 # (because pylint here fails to detect the use of wrapper as a
526                 # method to self with respective access privileges)
527                 id_ = self._params.get_int_or_none('id')
528                 for _ in self._form_data.get_all_str('delete'):
529                     if id_ is None:
530                         msg = 'trying to delete non-saved ' +\
531                                 f'{target_class.__name__}'
532                         raise NotFoundException(msg)
533                     item = target_class.by_id(self.conn, id_)
534                     item.remove(self.conn)
535                     return redir_target
536                 if target_class.can_create_by_id:
537                     item = target_class.by_id_or_create(self.conn, id_)
538                 else:
539                     item = target_class.by_id(self.conn, id_)
540                 return f(self, item)
541             return wrapper
542         return decorator
543
544     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
545         """Update history timestamps for VersionedAttribute."""
546         id_ = self._params.get_int_or_none('id')
547         item = cls.by_id(self.conn, id_)
548         attr = getattr(item, attr_name)
549         for k, v in self._form_data.get_first_strings_starting('at:').items():
550             old = k[3:]
551             if old[19:] != v:
552                 attr.reset_timestamp(old, f'{v}.0')
553         attr.save(self.conn)
554         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
555
556     def do_POST_day(self) -> str:
557         """Update or insert Day of date and Todos mapped to it."""
558         # pylint: disable=too-many-locals
559         date = self._params.get_str('date')
560         day_comment = self._form_data.get_str('day_comment')
561         make_type = self._form_data.get_str('make_type')
562         old_todos = self._form_data.get_all_int('todo_id')
563         new_todos = self._form_data.get_all_int('new_todo')
564         comments = self._form_data.get_all_str('comment')
565         efforts = self._form_data.get_all_floats_or_nones('effort')
566         done_todos = self._form_data.get_all_int('done')
567         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
568             raise BadFormatException('"done" field refers to unknown Todo')
569         is_done = [t_id in done_todos for t_id in old_todos]
570         if not (len(old_todos) == len(is_done) == len(comments)
571                 == len(efforts)):
572             msg = 'not equal number each of number of todo_id, comments, ' +\
573                     'and efforts inputs'
574             raise BadFormatException(msg)
575         day = Day.by_id_or_create(self.conn, date)
576         day.comment = day_comment
577         day.save(self.conn)
578         for process_id in sorted(new_todos):
579             if 'empty' == make_type:
580                 process = Process.by_id(self.conn, process_id)
581                 todo = Todo(None, process, False, date)
582                 todo.save(self.conn)
583             else:
584                 Todo.create_with_children(self.conn, process_id, date)
585         for i, todo_id in enumerate(old_todos):
586             todo = Todo.by_id(self.conn, todo_id)
587             todo.is_done = is_done[i]
588             todo.comment = comments[i]
589             todo.effort = efforts[i]
590             todo.save(self.conn)
591         return f'/day?date={date}&make_type={make_type}'
592
593     @_delete_or_post(Todo, '/')
594     def do_POST_todo(self, todo: Todo) -> str:
595         """Update Todo and its children."""
596         # pylint: disable=too-many-locals
597         adopted_child_ids = self._form_data.get_all_int('adopt')
598         processes_to_make_full = self._form_data.get_all_int('make_full')
599         processes_to_make_empty = self._form_data.get_all_int('make_empty')
600         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
601         effort = self._form_data.get_str('effort', ignore_strict=True)
602         conditions = self._form_data.get_all_int('conditions')
603         disables = self._form_data.get_all_int('disables')
604         blockers = self._form_data.get_all_int('blockers')
605         enables = self._form_data.get_all_int('enables')
606         is_done = len(self._form_data.get_all_str('done')) > 0
607         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
608         comment = self._form_data.get_str('comment', ignore_strict=True)
609         for v in fill_fors.values():
610             if v.startswith('make_empty_'):
611                 processes_to_make_empty += [int(v[11:])]
612             elif v.startswith('make_full_'):
613                 processes_to_make_full += [int(v[10:])]
614             elif v != 'ignore':
615                 adopted_child_ids += [int(v)]
616         to_remove = []
617         for child in todo.children:
618             assert isinstance(child.id_, int)
619             if child.id_ not in adopted_child_ids:
620                 to_remove += [child.id_]
621         for id_ in to_remove:
622             child = Todo.by_id(self.conn, id_)
623             todo.remove_child(child)
624         for child_id in adopted_child_ids:
625             if child_id in [c.id_ for c in todo.children]:
626                 continue
627             child = Todo.by_id(self.conn, child_id)
628             todo.add_child(child)
629         for process_id in processes_to_make_empty:
630             process = Process.by_id(self.conn, process_id)
631             made = Todo(None, process, False, todo.date)
632             made.save(self.conn)
633             todo.add_child(made)
634         for process_id in processes_to_make_full:
635             made = Todo.create_with_children(self.conn, process_id, todo.date)
636             todo.add_child(made)
637         todo.effort = float(effort) if effort else None
638         todo.set_conditions(self.conn, conditions)
639         todo.set_blockers(self.conn, blockers)
640         todo.set_enables(self.conn, enables)
641         todo.set_disables(self.conn, disables)
642         todo.is_done = is_done
643         todo.calendarize = calendarize
644         todo.comment = comment
645         todo.save(self.conn)
646         return f'/todo?id={todo.id_}'
647
648     def do_POST_process_descriptions(self) -> str:
649         """Update history timestamps for Process.description."""
650         return self._change_versioned_timestamps(Process, 'description')
651
652     def do_POST_process_efforts(self) -> str:
653         """Update history timestamps for Process.effort."""
654         return self._change_versioned_timestamps(Process, 'effort')
655
656     def do_POST_process_titles(self) -> str:
657         """Update history timestamps for Process.title."""
658         return self._change_versioned_timestamps(Process, 'title')
659
660     @_delete_or_post(Process, '/processes')
661     def do_POST_process(self, process: Process) -> str:
662         """Update or insert Process of ?id= and fields defined in postvars."""
663         # pylint: disable=too-many-locals
664         # pylint: disable=too-many-statements
665         title = self._form_data.get_str('title')
666         description = self._form_data.get_str('description')
667         effort = self._form_data.get_float('effort')
668         conditions = self._form_data.get_all_int('conditions')
669         blockers = self._form_data.get_all_int('blockers')
670         enables = self._form_data.get_all_int('enables')
671         disables = self._form_data.get_all_int('disables')
672         calendarize = self._form_data.get_all_str('calendarize') != []
673         suppresses = self._form_data.get_all_int('suppresses')
674         step_of = self._form_data.get_all_str('step_of')
675         keep_steps = self._form_data.get_all_int('keep_step')
676         step_ids = self._form_data.get_all_int('steps')
677         new_top_steps = self._form_data.get_all_str('new_top_step')
678         step_process_id_to = {}
679         step_parent_id_to = {}
680         new_steps_to = {}
681         for step_id in step_ids:
682             name = f'new_step_to_{step_id}'
683             new_steps_to[step_id] = self._form_data.get_all_int(name)
684         for step_id in keep_steps:
685             name = f'step_{step_id}_process_id'
686             step_process_id_to[step_id] = self._form_data.get_int(name)
687             name = f'step_{step_id}_parent_id'
688             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
689         process.title.set(title)
690         process.description.set(description)
691         process.effort.set(effort)
692         process.set_conditions(self.conn, conditions)
693         process.set_blockers(self.conn, blockers)
694         process.set_enables(self.conn, enables)
695         process.set_disables(self.conn, disables)
696         process.calendarize = calendarize
697         process.save(self.conn)
698         assert isinstance(process.id_, int)
699         new_step_title = None
700         steps: list[ProcessStep] = []
701         for step_id in keep_steps:
702             if step_id not in step_ids:
703                 raise BadFormatException('trying to keep unknown step')
704             step = ProcessStep(step_id, process.id_,
705                                step_process_id_to[step_id],
706                                step_parent_id_to[step_id])
707             steps += [step]
708         for step_id in step_ids:
709             new = [ProcessStep(None, process.id_, step_process_id, step_id)
710                    for step_process_id in new_steps_to[step_id]]
711             steps += new
712         for step_identifier in new_top_steps:
713             try:
714                 step_process_id = int(step_identifier)
715                 step = ProcessStep(None, process.id_, step_process_id, None)
716                 steps += [step]
717             except ValueError:
718                 new_step_title = step_identifier
719         process.set_steps(self.conn, steps)
720         process.set_step_suppressions(self.conn, suppresses)
721         owners_to_set = []
722         new_owner_title = None
723         for owner_identifier in step_of:
724             try:
725                 owners_to_set += [int(owner_identifier)]
726             except ValueError:
727                 new_owner_title = owner_identifier
728         process.set_owners(self.conn, owners_to_set)
729         params = f'id={process.id_}'
730         if new_step_title:
731             title_b64_encoded = b64encode(new_step_title.encode()).decode()
732             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
733         elif new_owner_title:
734             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
735             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
736         process.save(self.conn)
737         return f'/process?{params}'
738
739     def do_POST_condition_descriptions(self) -> str:
740         """Update history timestamps for Condition.description."""
741         return self._change_versioned_timestamps(Condition, 'description')
742
743     def do_POST_condition_titles(self) -> str:
744         """Update history timestamps for Condition.title."""
745         return self._change_versioned_timestamps(Condition, 'title')
746
747     @_delete_or_post(Condition, '/conditions')
748     def do_POST_condition(self, condition: Condition) -> str:
749         """Update/insert Condition of ?id= and fields defined in postvars."""
750         is_active = self._form_data.get_str('is_active') == 'True'
751         title = self._form_data.get_str('title')
752         description = self._form_data.get_str('description')
753         condition.is_active = is_active
754         condition.title.set(title)
755         condition.description.set(description)
756         condition.save(self.conn)
757         return f'/condition?id={condition.id_}'