home · contact · privacy
28812dffc4bd53e44dbb6aef640ebadd881df8d4
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
21 TEMPLATES_DIR = 'templates'
22
23
24 class TaskServer(HTTPServer):
25     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
26
27     def __init__(self, db_file: DatabaseFile,
28                  *args: Any, **kwargs: Any) -> None:
29         super().__init__(*args, **kwargs)
30         self.db = db_file
31         self.headers: list[tuple[str, str]] = []
32         self._render_mode = 'html'
33         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35     def set_json_mode(self) -> None:
36         """Make server send JSON instead of HTML responses."""
37         self._render_mode = 'json'
38         self.headers += [('Content-Type', 'application/json')]
39
40     @staticmethod
41     def ctx_to_json(ctx: dict[str, object]) -> str:
42         """Render ctx into JSON string."""
43         def walk_ctx(node: object) -> Any:
44             if hasattr(node, 'as_dict_into_reference'):
45                 if hasattr(node, 'id_') and node.id_ is not None:
46                     return node.as_dict_into_reference(library)
47             if hasattr(node, 'as_dict'):
48                 return node.as_dict
49             if isinstance(node, (list, tuple)):
50                 return [walk_ctx(x) for x in node]
51             if isinstance(node, dict):
52                 d = {}
53                 for k, v in node.items():
54                     d[k] = walk_ctx(v)
55                 return d
56             if isinstance(node, HandledException):
57                 return str(node)
58             return node
59         library: dict[str, dict[str | int, object]] = {}
60         for k, v in ctx.items():
61             ctx[k] = walk_ctx(v)
62         ctx['_library'] = library
63         return json_dumps(ctx)
64
65     def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
66         """Render ctx according to self._render_mode.."""
67         tmpl_name = f'{tmpl_name}.{self._render_mode}'
68         if 'html' == self._render_mode:
69             template = self._jinja.get_template(tmpl_name)
70             return template.render(ctx)
71         return self.__class__.ctx_to_json(ctx)
72
73
74 class InputsParser:
75     """Wrapper for validating and retrieving dict-like HTTP inputs."""
76
77     def __init__(self, dict_: dict[str, list[str]],
78                  strictness: bool = True) -> None:
79         self.inputs = dict_
80         self.strict = strictness
81
82     def get_str(self, key: str, default: str = '',
83                 ignore_strict: bool = False) -> str:
84         """Retrieve single/first string value of key, or default."""
85         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
86             if self.strict and not ignore_strict:
87                 raise BadFormatException(f'no value found for key {key}')
88             return default
89         return self.inputs[key][0]
90
91     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
92         """Retrieve dict of (first) strings at key starting with prefix."""
93         ret = {}
94         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95             ret[key] = self.inputs[key][0]
96         return ret
97
98     def get_int(self, key: str) -> int:
99         """Retrieve single/first value of key as int, error if empty."""
100         val = self.get_int_or_none(key)
101         if val is None:
102             raise BadFormatException(f'unexpected empty value for: {key}')
103         return val
104
105     def get_int_or_none(self, key: str) -> int | None:
106         """Retrieve single/first value of key as int, return None if empty."""
107         val = self.get_str(key, ignore_strict=True)
108         if val == '':
109             return None
110         try:
111             return int(val)
112         except ValueError as e:
113             msg = f'cannot int form field value for key {key}: {val}'
114             raise BadFormatException(msg) from e
115
116     def get_float(self, key: str) -> float:
117         """Retrieve float value of key from self.postvars."""
118         val = self.get_str(key)
119         try:
120             return float(val)
121         except ValueError as e:
122             msg = f'cannot float form field value for key {key}: {val}'
123             raise BadFormatException(msg) from e
124
125     def get_all_str(self, key: str) -> list[str]:
126         """Retrieve list of string values at key."""
127         if key not in self.inputs.keys():
128             return []
129         return self.inputs[key]
130
131     def get_all_int(self, key: str) -> list[int]:
132         """Retrieve list of int values at key."""
133         all_str = self.get_all_str(key)
134         try:
135             return [int(s) for s in all_str if len(s) > 0]
136         except ValueError as e:
137             msg = f'cannot int a form field value for key {key} in: {all_str}'
138             raise BadFormatException(msg) from e
139
140
141 class TaskHandler(BaseHTTPRequestHandler):
142     """Handles single HTTP request."""
143     # pylint: disable=too-many-public-methods
144     server: TaskServer
145     conn: DatabaseConnection
146     _site: str
147     _form_data: InputsParser
148     _params: InputsParser
149
150     def _send_page(self,
151                    ctx: dict[str, Any],
152                    tmpl_name: str,
153                    code: int = 200
154                    ) -> None:
155         """Send ctx as proper HTTP response."""
156         body = self.server.render(ctx, tmpl_name)
157         self.send_response(code)
158         for header_tuple in self.server.headers:
159             self.send_header(*header_tuple)
160         self.end_headers()
161         self.wfile.write(bytes(body, 'utf-8'))
162
163     @staticmethod
164     def _request_wrapper(http_method: str, not_found_msg: str
165                          ) -> Callable[..., Callable[[TaskHandler], None]]:
166         def decorator(f: Callable[..., str | None]
167                       ) -> Callable[[TaskHandler], None]:
168             def wrapper(self: TaskHandler) -> None:
169                 # pylint: disable=protected-access
170                 # (because pylint here fails to detect the use of wrapper as a
171                 # method to self with respective access privileges)
172                 try:
173                     self.conn = DatabaseConnection(self.server.db)
174                     parsed_url = urlparse(self.path)
175                     self._site = path_split(parsed_url.path)[1]
176                     params = parse_qs(parsed_url.query, strict_parsing=True)
177                     self._params = InputsParser(params, False)
178                     handler_name = f'do_{http_method}_{self._site}'
179                     if hasattr(self, handler_name):
180                         handler = getattr(self, handler_name)
181                         redir_target = f(self, handler)
182                         if redir_target:
183                             self.send_response(302)
184                             self.send_header('Location', redir_target)
185                             self.end_headers()
186                     else:
187                         msg = f'{not_found_msg}: {self._site}'
188                         raise NotFoundException(msg)
189                 except HandledException as error:
190                     for cls in (Day, Todo, Condition, Process, ProcessStep):
191                         assert hasattr(cls, 'empty_cache')
192                         cls.empty_cache()
193                     ctx = {'msg': error}
194                     self._send_page(ctx, 'msg', error.http_code)
195                 finally:
196                     self.conn.close()
197             return wrapper
198         return decorator
199
200     @_request_wrapper('GET', 'Unknown page')
201     def do_GET(self, handler: Callable[[], str | dict[str, object]]
202                ) -> str | None:
203         """Render page with result of handler, or redirect if result is str."""
204         tmpl_name = f'{self._site}'
205         ctx_or_redir_target = handler()
206         if isinstance(ctx_or_redir_target, str):
207             return ctx_or_redir_target
208         self._send_page(ctx_or_redir_target, tmpl_name)
209         return None
210
211     @_request_wrapper('POST', 'Unknown POST target')
212     def do_POST(self, handler: Callable[[], str]) -> str:
213         """Handle POST with handler, prepare redirection to result."""
214         length = int(self.headers['content-length'])
215         postvars = parse_qs(self.rfile.read(length).decode(),
216                             keep_blank_values=True, strict_parsing=True)
217         self._form_data = InputsParser(postvars)
218         redir_target = handler()
219         self.conn.commit()
220         return redir_target
221
222     # GET handlers
223
224     @staticmethod
225     def _get_item(target_class: Any
226                   ) -> Callable[..., Callable[[TaskHandler],
227                                               dict[str, object]]]:
228         def decorator(f: Callable[..., dict[str, object]]
229                       ) -> Callable[[TaskHandler], dict[str, object]]:
230             def wrapper(self: TaskHandler) -> dict[str, object]:
231                 # pylint: disable=protected-access
232                 # (because pylint here fails to detect the use of wrapper as a
233                 # method to self with respective access privileges)
234                 id_ = self._params.get_int_or_none('id')
235                 if target_class.can_create_by_id:
236                     item = target_class.by_id_or_create(self.conn, id_)
237                 else:
238                     item = target_class.by_id(self.conn, id_)
239                 return f(self, item)
240             return wrapper
241         return decorator
242
243     def do_GET_(self) -> str:
244         """Return redirect target on GET /."""
245         return '/day'
246
247     def _do_GET_calendar(self) -> dict[str, object]:
248         """Show Days from ?start= to ?end=.
249
250         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
251         same, the only difference being the HTML template they are rendered to,
252         which .do_GET selects from their method name.
253         """
254         start = self._params.get_str('start')
255         end = self._params.get_str('end')
256         if not end:
257             end = date_in_n_days(366)
258         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
259         days, start, end = ret
260         days = Day.with_filled_gaps(days, start, end)
261         today = date_in_n_days(0)
262         return {'start': start, 'end': end, 'days': days, 'today': today}
263
264     def do_GET_calendar(self) -> dict[str, object]:
265         """Show Days from ?start= to ?end= – normal view."""
266         return self._do_GET_calendar()
267
268     def do_GET_calendar_txt(self) -> dict[str, object]:
269         """Show Days from ?start= to ?end= – minimalist view."""
270         return self._do_GET_calendar()
271
272     def do_GET_day(self) -> dict[str, object]:
273         """Show single Day of ?date=."""
274         date = self._params.get_str('date', date_in_n_days(0))
275         day = Day.by_id_or_create(self.conn, date)
276         make_type = self._params.get_str('make_type')
277         conditions_present = []
278         enablers_for = {}
279         disablers_for = {}
280         for todo in day.todos:
281             for condition in todo.conditions + todo.blockers:
282                 if condition not in conditions_present:
283                     conditions_present += [condition]
284                     enablers_for[condition.id_] = [p for p in
285                                                    Process.all(self.conn)
286                                                    if condition in p.enables]
287                     disablers_for[condition.id_] = [p for p in
288                                                     Process.all(self.conn)
289                                                     if condition in p.disables]
290         seen_todos: set[int] = set()
291         top_nodes = [t.get_step_tree(seen_todos)
292                      for t in day.todos if not t.parents]
293         return {'day': day,
294                 'top_nodes': top_nodes,
295                 'make_type': make_type,
296                 'enablers_for': enablers_for,
297                 'disablers_for': disablers_for,
298                 'conditions_present': conditions_present,
299                 'processes': Process.all(self.conn)}
300
301     @_get_item(Todo)
302     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
303         """Show single Todo of ?id=."""
304
305         @dataclass
306         class TodoStepsNode:
307             """Collect what's useful for Todo steps tree display."""
308             id_: int
309             todo: Todo | None
310             process: Process | None
311             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
312             fillable: bool = False
313
314         def walk_process_steps(id_: int,
315                                process_step_nodes: list[ProcessStepsNode],
316                                steps_nodes: list[TodoStepsNode]) -> None:
317             for process_step_node in process_step_nodes:
318                 id_ += 1
319                 node = TodoStepsNode(id_, None, process_step_node.process, [])
320                 steps_nodes += [node]
321                 walk_process_steps(id_, list(process_step_node.steps.values()),
322                                    node.children)
323
324         def walk_todo_steps(id_: int, todos: list[Todo],
325                             steps_nodes: list[TodoStepsNode]) -> None:
326             for todo in todos:
327                 matched = False
328                 for match in [item for item in steps_nodes
329                               if item.process
330                               and item.process == todo.process]:
331                     match.todo = todo
332                     matched = True
333                     for child in match.children:
334                         child.fillable = True
335                     walk_todo_steps(id_, todo.children, match.children)
336                 if not matched:
337                     id_ += 1
338                     node = TodoStepsNode(id_, todo, None, [])
339                     steps_nodes += [node]
340                     walk_todo_steps(id_, todo.children, node.children)
341
342         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
343                                     ) -> set[int]:
344             ids = set()
345             for node in steps_nodes:
346                 if not node.todo:
347                     assert isinstance(node.process, Process)
348                     assert isinstance(node.process.id_, int)
349                     ids.add(node.process.id_)
350                 ids = ids | collect_adoptables_keys(node.children)
351             return ids
352
353         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
354         process_tree = todo.process.get_steps(self.conn, None)
355         steps_todo_to_process: list[TodoStepsNode] = []
356         walk_process_steps(0, list(process_tree.values()),
357                            steps_todo_to_process)
358         for steps_node in steps_todo_to_process:
359             steps_node.fillable = True
360         walk_todo_steps(len(steps_todo_to_process), todo_steps,
361                         steps_todo_to_process)
362         adoptables: dict[int, list[Todo]] = {}
363         any_adoptables = [Todo.by_id(self.conn, t.id_)
364                           for t in Todo.by_date(self.conn, todo.date)
365                           if t.id_ is not None
366                           and t != todo]
367         for id_ in collect_adoptables_keys(steps_todo_to_process):
368             adoptables[id_] = [t for t in any_adoptables
369                                if t.process.id_ == id_]
370         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
371                 'adoption_candidates_for': adoptables,
372                 'process_candidates': Process.all(self.conn),
373                 'todo_candidates': any_adoptables,
374                 'condition_candidates': Condition.all(self.conn)}
375
376     def do_GET_todos(self) -> dict[str, object]:
377         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
378         sort_by = self._params.get_str('sort_by')
379         start = self._params.get_str('start')
380         end = self._params.get_str('end')
381         process_id = self._params.get_int_or_none('process_id')
382         comment_pattern = self._params.get_str('comment_pattern')
383         todos = []
384         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
385         todos_by_date_range, start, end = ret
386         todos = [t for t in todos_by_date_range
387                  if comment_pattern in t.comment
388                  and ((not process_id) or t.process.id_ == process_id)]
389         if sort_by == 'doneness':
390             todos.sort(key=lambda t: t.is_done)
391         elif sort_by == '-doneness':
392             todos.sort(key=lambda t: t.is_done, reverse=True)
393         elif sort_by == 'title':
394             todos.sort(key=lambda t: t.title_then)
395         elif sort_by == '-title':
396             todos.sort(key=lambda t: t.title_then, reverse=True)
397         elif sort_by == 'comment':
398             todos.sort(key=lambda t: t.comment)
399         elif sort_by == '-comment':
400             todos.sort(key=lambda t: t.comment, reverse=True)
401         elif sort_by == '-date':
402             todos.sort(key=lambda t: t.date, reverse=True)
403         else:
404             todos.sort(key=lambda t: t.date)
405             sort_by = 'title'
406         return {'start': start, 'end': end, 'process_id': process_id,
407                 'comment_pattern': comment_pattern, 'todos': todos,
408                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
409
410     def do_GET_conditions(self) -> dict[str, object]:
411         """Show all Conditions."""
412         pattern = self._params.get_str('pattern')
413         conditions = Condition.matching(self.conn, pattern)
414         sort_by = self._params.get_str('sort_by')
415         if sort_by == 'is_active':
416             conditions.sort(key=lambda c: c.is_active)
417         elif sort_by == '-is_active':
418             conditions.sort(key=lambda c: c.is_active, reverse=True)
419         elif sort_by == '-title':
420             conditions.sort(key=lambda c: c.title.newest, reverse=True)
421         else:
422             conditions.sort(key=lambda c: c.title.newest)
423             sort_by = 'title'
424         return {'conditions': conditions,
425                 'sort_by': sort_by,
426                 'pattern': pattern}
427
428     @_get_item(Condition)
429     def do_GET_condition(self, c: Condition) -> dict[str, object]:
430         """Show Condition of ?id=."""
431         ps = Process.all(self.conn)
432         return {'condition': c, 'is_new': c.id_ is None,
433                 'enabled_processes': [p for p in ps if c in p.conditions],
434                 'disabled_processes': [p for p in ps if c in p.blockers],
435                 'enabling_processes': [p for p in ps if c in p.enables],
436                 'disabling_processes': [p for p in ps if c in p.disables]}
437
438     @_get_item(Condition)
439     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
440         """Show title history of Condition of ?id=."""
441         return {'condition': c}
442
443     @_get_item(Condition)
444     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
445         """Show description historys of Condition of ?id=."""
446         return {'condition': c}
447
448     @_get_item(Process)
449     def do_GET_process(self, process: Process) -> dict[str, object]:
450         """Show Process of ?id=."""
451         title_64 = self._params.get_str('title_b64')
452         if title_64:
453             title = b64decode(title_64.encode()).decode()
454             process.title.set(title)
455         owners = process.used_as_step_by(self.conn)
456         for step_id in self._params.get_all_int('step_to'):
457             owners += [Process.by_id(self.conn, step_id)]
458         preset_top_step = None
459         for process_id in self._params.get_all_int('has_step'):
460             preset_top_step = process_id
461         return {'process': process, 'is_new': process.id_ is None,
462                 'preset_top_step': preset_top_step,
463                 'steps': process.get_steps(self.conn), 'owners': owners,
464                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
465                 'process_candidates': Process.all(self.conn),
466                 'condition_candidates': Condition.all(self.conn)}
467
468     @_get_item(Process)
469     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
470         """Show title history of Process of ?id=."""
471         return {'process': p}
472
473     @_get_item(Process)
474     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
475         """Show description historys of Process of ?id=."""
476         return {'process': p}
477
478     @_get_item(Process)
479     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
480         """Show default effort history of Process of ?id=."""
481         return {'process': p}
482
483     def do_GET_processes(self) -> dict[str, object]:
484         """Show all Processes."""
485         pattern = self._params.get_str('pattern')
486         processes = Process.matching(self.conn, pattern)
487         sort_by = self._params.get_str('sort_by')
488         if sort_by == 'steps':
489             processes.sort(key=lambda p: len(p.explicit_steps))
490         elif sort_by == '-steps':
491             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
492         elif sort_by == 'owners':
493             processes.sort(key=lambda p: p.n_owners or 0)
494         elif sort_by == '-owners':
495             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
496         elif sort_by == 'effort':
497             processes.sort(key=lambda p: p.effort.newest)
498         elif sort_by == '-effort':
499             processes.sort(key=lambda p: p.effort.newest, reverse=True)
500         elif sort_by == '-title':
501             processes.sort(key=lambda p: p.title.newest, reverse=True)
502         else:
503             processes.sort(key=lambda p: p.title.newest)
504             sort_by = 'title'
505         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
506
507     # POST handlers
508
509     @staticmethod
510     def _delete_or_post(target_class: Any, redir_target: str = '/'
511                         ) -> Callable[..., Callable[[TaskHandler], str]]:
512         def decorator(f: Callable[..., str]
513                       ) -> Callable[[TaskHandler], str]:
514             def wrapper(self: TaskHandler) -> str:
515                 # pylint: disable=protected-access
516                 # (because pylint here fails to detect the use of wrapper as a
517                 # method to self with respective access privileges)
518                 id_ = self._params.get_int_or_none('id')
519                 for _ in self._form_data.get_all_str('delete'):
520                     if id_ is None:
521                         msg = 'trying to delete non-saved ' +\
522                                 f'{target_class.__name__}'
523                         raise NotFoundException(msg)
524                     item = target_class.by_id(self.conn, id_)
525                     item.remove(self.conn)
526                     return redir_target
527                 if target_class.can_create_by_id:
528                     item = target_class.by_id_or_create(self.conn, id_)
529                 else:
530                     item = target_class.by_id(self.conn, id_)
531                 return f(self, item)
532             return wrapper
533         return decorator
534
535     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
536         """Update history timestamps for VersionedAttribute."""
537         id_ = self._params.get_int_or_none('id')
538         item = cls.by_id(self.conn, id_)
539         attr = getattr(item, attr_name)
540         for k, v in self._form_data.get_first_strings_starting('at:').items():
541             old = k[3:]
542             if old[19:] != v:
543                 attr.reset_timestamp(old, f'{v}.0')
544         attr.save(self.conn)
545         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
546
547     def do_POST_day(self) -> str:
548         """Update or insert Day of date and Todos mapped to it."""
549         # pylint: disable=too-many-locals
550         date = self._params.get_str('date')
551         day_comment = self._form_data.get_str('day_comment')
552         make_type = self._form_data.get_str('make_type')
553         old_todos = self._form_data.get_all_int('todo_id')
554         new_todos = self._form_data.get_all_int('new_todo')
555         is_done = [t_id in self._form_data.get_all_int('done')
556                    for t_id in old_todos]
557         comments = self._form_data.get_all_str('comment')
558         efforts = [float(effort) if effort else None
559                    for effort in self._form_data.get_all_str('effort')]
560         if old_todos and 3*[len(old_todos)] != [len(is_done), len(comments),
561                                                 len(efforts)]:
562             msg = 'not equal number each of number of todo_id, comments, ' +\
563                     'and efforts inputs'
564             raise BadFormatException(msg)
565         day = Day.by_id_or_create(self.conn, date)
566         day.comment = day_comment
567         day.save(self.conn)
568         for process_id in sorted(new_todos):
569             if 'empty' == make_type:
570                 process = Process.by_id(self.conn, process_id)
571                 todo = Todo(None, process, False, date)
572                 todo.save(self.conn)
573             else:
574                 Todo.create_with_children(self.conn, process_id, date)
575         for i, todo_id in enumerate(old_todos):
576             todo = Todo.by_id(self.conn, todo_id)
577             todo.is_done = is_done[i]
578             todo.comment = comments[i]
579             todo.effort = efforts[i]
580             todo.save(self.conn)
581         return f'/day?date={date}&make_type={make_type}'
582
583     @_delete_or_post(Todo, '/')
584     def do_POST_todo(self, todo: Todo) -> str:
585         """Update Todo and its children."""
586         # pylint: disable=too-many-locals
587         adopted_child_ids = self._form_data.get_all_int('adopt')
588         processes_to_make_full = self._form_data.get_all_int('make_full')
589         processes_to_make_empty = self._form_data.get_all_int('make_empty')
590         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
591         effort = self._form_data.get_str('effort', ignore_strict=True)
592         conditions = self._form_data.get_all_int('conditions')
593         disables = self._form_data.get_all_int('disables')
594         blockers = self._form_data.get_all_int('blockers')
595         enables = self._form_data.get_all_int('enables')
596         is_done = len(self._form_data.get_all_str('done')) > 0
597         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
598         comment = self._form_data.get_str('comment', ignore_strict=True)
599         for v in fill_fors.values():
600             if v.startswith('make_empty_'):
601                 processes_to_make_empty += [int(v[11:])]
602             elif v.startswith('make_full_'):
603                 processes_to_make_full += [int(v[10:])]
604             elif v != 'ignore':
605                 adopted_child_ids += [int(v)]
606         to_remove = []
607         for child in todo.children:
608             assert isinstance(child.id_, int)
609             if child.id_ not in adopted_child_ids:
610                 to_remove += [child.id_]
611         for id_ in to_remove:
612             child = Todo.by_id(self.conn, id_)
613             todo.remove_child(child)
614         for child_id in adopted_child_ids:
615             if child_id in [c.id_ for c in todo.children]:
616                 continue
617             child = Todo.by_id(self.conn, child_id)
618             todo.add_child(child)
619         for process_id in processes_to_make_empty:
620             process = Process.by_id(self.conn, process_id)
621             made = Todo(None, process, False, todo.date)
622             made.save(self.conn)
623             todo.add_child(made)
624         for process_id in processes_to_make_full:
625             made = Todo.create_with_children(self.conn, process_id, todo.date)
626             todo.add_child(made)
627         todo.effort = float(effort) if effort else None
628         todo.set_conditions(self.conn, conditions)
629         todo.set_blockers(self.conn, blockers)
630         todo.set_enables(self.conn, enables)
631         todo.set_disables(self.conn, disables)
632         todo.is_done = is_done
633         todo.calendarize = calendarize
634         todo.comment = comment
635         todo.save(self.conn)
636         return f'/todo?id={todo.id_}'
637
638     def do_POST_process_descriptions(self) -> str:
639         """Update history timestamps for Process.description."""
640         return self._change_versioned_timestamps(Process, 'description')
641
642     def do_POST_process_efforts(self) -> str:
643         """Update history timestamps for Process.effort."""
644         return self._change_versioned_timestamps(Process, 'effort')
645
646     def do_POST_process_titles(self) -> str:
647         """Update history timestamps for Process.title."""
648         return self._change_versioned_timestamps(Process, 'title')
649
650     @_delete_or_post(Process, '/processes')
651     def do_POST_process(self, process: Process) -> str:
652         """Update or insert Process of ?id= and fields defined in postvars."""
653         # pylint: disable=too-many-locals
654         # pylint: disable=too-many-statements
655         title = self._form_data.get_str('title')
656         description = self._form_data.get_str('description')
657         effort = self._form_data.get_float('effort')
658         conditions = self._form_data.get_all_int('conditions')
659         blockers = self._form_data.get_all_int('blockers')
660         enables = self._form_data.get_all_int('enables')
661         disables = self._form_data.get_all_int('disables')
662         calendarize = self._form_data.get_all_str('calendarize') != []
663         suppresses = self._form_data.get_all_int('suppresses')
664         step_of = self._form_data.get_all_str('step_of')
665         keep_steps = self._form_data.get_all_int('keep_step')
666         step_ids = self._form_data.get_all_int('steps')
667         new_top_steps = self._form_data.get_all_str('new_top_step')
668         step_process_id_to = {}
669         step_parent_id_to = {}
670         new_steps_to = {}
671         for step_id in step_ids:
672             name = f'new_step_to_{step_id}'
673             new_steps_to[step_id] = self._form_data.get_all_int(name)
674         for step_id in keep_steps:
675             name = f'step_{step_id}_process_id'
676             step_process_id_to[step_id] = self._form_data.get_int(name)
677             name = f'step_{step_id}_parent_id'
678             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
679         process.title.set(title)
680         process.description.set(description)
681         process.effort.set(effort)
682         process.set_conditions(self.conn, conditions)
683         process.set_blockers(self.conn, blockers)
684         process.set_enables(self.conn, enables)
685         process.set_disables(self.conn, disables)
686         process.calendarize = calendarize
687         process.save(self.conn)
688         assert isinstance(process.id_, int)
689         new_step_title = None
690         steps: list[ProcessStep] = []
691         for step_id in keep_steps:
692             if step_id not in step_ids:
693                 raise BadFormatException('trying to keep unknown step')
694             step = ProcessStep(step_id, process.id_,
695                                step_process_id_to[step_id],
696                                step_parent_id_to[step_id])
697             steps += [step]
698         for step_id in step_ids:
699             new = [ProcessStep(None, process.id_, step_process_id, step_id)
700                    for step_process_id in new_steps_to[step_id]]
701             steps += new
702         for step_identifier in new_top_steps:
703             try:
704                 step_process_id = int(step_identifier)
705                 step = ProcessStep(None, process.id_, step_process_id, None)
706                 steps += [step]
707             except ValueError:
708                 new_step_title = step_identifier
709         process.set_steps(self.conn, steps)
710         process.set_step_suppressions(self.conn, suppresses)
711         owners_to_set = []
712         new_owner_title = None
713         for owner_identifier in step_of:
714             try:
715                 owners_to_set += [int(owner_identifier)]
716             except ValueError:
717                 new_owner_title = owner_identifier
718         process.set_owners(self.conn, owners_to_set)
719         params = f'id={process.id_}'
720         if new_step_title:
721             title_b64_encoded = b64encode(new_step_title.encode()).decode()
722             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
723         elif new_owner_title:
724             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
725             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
726         process.save(self.conn)
727         return f'/process?{params}'
728
729     def do_POST_condition_descriptions(self) -> str:
730         """Update history timestamps for Condition.description."""
731         return self._change_versioned_timestamps(Condition, 'description')
732
733     def do_POST_condition_titles(self) -> str:
734         """Update history timestamps for Condition.title."""
735         return self._change_versioned_timestamps(Condition, 'title')
736
737     @_delete_or_post(Condition, '/conditions')
738     def do_POST_condition(self, condition: Condition) -> str:
739         """Update/insert Condition of ?id= and fields defined in postvars."""
740         is_active = self._form_data.get_str('is_active') == 'True'
741         title = self._form_data.get_str('title')
742         description = self._form_data.get_str('description')
743         condition.is_active = is_active
744         condition.title.set(title)
745         condition.description.set(description)
746         condition.save(self.conn)
747         return f'/condition?id={condition.id_}'