home · contact · privacy
27066036b2c300e23f41f7d348e5090905c9f2f8
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
# Directory handed to the Jinja FileSystemLoader that TaskServer sets up.
TEMPLATES_DIR = 'templates'
22
23
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Additionally keeps the database file, a list of extra response headers,
    and the render mode ('html' via Jinja templates, or 'json').
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Pass args, kwargs to HTTPServer; set up for HTML rendering."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        # Extra headers sent along with every rendered page.
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses.

        May be called repeatedly; the Content-Type header is added only once.
        """
        self._render_mode = 'json'
        json_header = ('Content-Type', 'application/json')
        if json_header not in self.headers:
            self.headers += [json_header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string.

        Values are walked recursively: objects offering
        .as_dict_into_reference (and having a non-None .id_) are replaced by
        whatever that method returns while filling the shared library;
        objects with .as_dict by that attribute; HandledExceptions by their
        str(). The library is added to the output under key '_library'.

        The ctx passed in is left unmodified.
        """
        library: dict[str, dict[str | int, object]] = {}

        def walk_ctx(node: object) -> Any:
            if hasattr(node, 'as_dict_into_reference'):
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            if hasattr(node, 'as_dict'):
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        # Build a fresh dict rather than overwriting the caller's ctx.
        rendered: dict[str, Any] = {k: walk_ctx(v) for k, v in ctx.items()}
        rendered['_library'] = library
        return json_dumps(rendered)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In 'html' mode, the template '{tmpl_name}.html' is rendered with
        ctx; in any other mode, ctx is serialized by .ctx_to_json and
        tmpl_name is ignored.
        """
        if 'html' == self._render_mode:
            template = self._jinja.get_template(
                    f'{tmpl_name}.{self._render_mode}')
            return template.render(ctx)
        return self.__class__.ctx_to_json(ctx)
72
73
class InputsParser:
    """Wrapper for validating and retrieving dict-like HTTP inputs.

    Wraps a dict mapping keys to lists of string values. In strict mode,
    .get_str raises BadFormatException on keys without value (unless told
    to ignore strictness) instead of returning a default.
    """

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        self.inputs = dict_
        self.strict = strictness

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Retrieve single/first string value of key, or default."""
        values = self.inputs.get(key)
        if not values:  # key missing, or mapped to empty list
            if self.strict and not ignore_strict:
                raise BadFormatException(f'no value found for key {key}')
            return default
        return values[0]

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Retrieve dict of (first) strings at key starting with prefix.

        Keys mapped to an empty list of values are skipped.
        """
        return {key: values[0] for key, values in self.inputs.items()
                if key.startswith(prefix) and values}

    def get_int(self, key: str) -> int:
        """Retrieve single/first value of key as int, error if empty."""
        val = self.get_int_or_none(key)
        if val is None:
            raise BadFormatException(f'unexpected empty value for: {key}')
        return val

    def get_int_or_none(self, key: str) -> int | None:
        """Retrieve single/first value of key as int, return None if empty."""
        val = self.get_str(key, ignore_strict=True)
        if val == '':
            return None
        try:
            return int(val)
        except ValueError as e:
            msg = f'cannot int form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_float(self, key: str) -> float:
        """Retrieve single/first value of key as float, error if not one."""
        val = self.get_str(key)
        try:
            return float(val)
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_all_str(self, key: str) -> list[str]:
        """Retrieve list of string values at key.

        Returns a copy, so that callers extending the result do not change
        the stored inputs.
        """
        return list(self.inputs.get(key, []))

    def get_all_int(self, key: str) -> list[int]:
        """Retrieve list of int values at key, skipping empty strings."""
        all_str = self.get_all_str(key)
        try:
            return [int(s) for s in all_str if s]
        except ValueError as e:
            msg = f'cannot int a form field value for key {key} in: {all_str}'
            raise BadFormatException(msg) from e

    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
        """Retrieve list of float value at key, None if empty strings."""
        ret: list[float | None] = []
        for val in self.get_all_str(key):
            if '' == val:
                ret += [None]
                continue
            try:
                ret += [float(val)]
            except ValueError as e:
                msg = f'cannot float form field value for key {key}: {val}'
                raise BadFormatException(msg) from e
        return ret
153
154
class TaskHandler(BaseHTTPRequestHandler):
    """Handles single HTTP request."""
    # pylint: disable=too-many-public-methods
    server: TaskServer
    # Opened per request in ._request_wrapper, closed there when done.
    conn: DatabaseConnection
    # Last component of request URL path; selects the do_{METHOD}_{site}
    # handler method and, for GET, the template name.
    _site: str
    # Parsed POST body (strict InputsParser), set in .do_POST.
    _form_data: InputsParser
    # Parsed URL query string (non-strict InputsParser).
    _params: InputsParser
163
164     def _send_page(self,
165                    ctx: dict[str, Any],
166                    tmpl_name: str,
167                    code: int = 200
168                    ) -> None:
169         """Send ctx as proper HTTP response."""
170         body = self.server.render(ctx, tmpl_name)
171         self.send_response(code)
172         for header_tuple in self.server.headers:
173             self.send_header(*header_tuple)
174         self.end_headers()
175         self.wfile.write(bytes(body, 'utf-8'))
176
177     @staticmethod
178     def _request_wrapper(http_method: str, not_found_msg: str
179                          ) -> Callable[..., Callable[[TaskHandler], None]]:
180         def decorator(f: Callable[..., str | None]
181                       ) -> Callable[[TaskHandler], None]:
182             def wrapper(self: TaskHandler) -> None:
183                 # pylint: disable=protected-access
184                 # (because pylint here fails to detect the use of wrapper as a
185                 # method to self with respective access privileges)
186                 try:
187                     self.conn = DatabaseConnection(self.server.db)
188                     parsed_url = urlparse(self.path)
189                     self._site = path_split(parsed_url.path)[1]
190                     params = parse_qs(parsed_url.query, strict_parsing=True)
191                     self._params = InputsParser(params, False)
192                     handler_name = f'do_{http_method}_{self._site}'
193                     if hasattr(self, handler_name):
194                         handler = getattr(self, handler_name)
195                         redir_target = f(self, handler)
196                         if redir_target:
197                             self.send_response(302)
198                             self.send_header('Location', redir_target)
199                             self.end_headers()
200                     else:
201                         msg = f'{not_found_msg}: {self._site}'
202                         raise NotFoundException(msg)
203                 except HandledException as error:
204                     for cls in (Day, Todo, Condition, Process, ProcessStep):
205                         assert hasattr(cls, 'empty_cache')
206                         cls.empty_cache()
207                     ctx = {'msg': error}
208                     self._send_page(ctx, 'msg', error.http_code)
209                 finally:
210                     self.conn.close()
211             return wrapper
212         return decorator
213
214     @_request_wrapper('GET', 'Unknown page')
215     def do_GET(self, handler: Callable[[], str | dict[str, object]]
216                ) -> str | None:
217         """Render page with result of handler, or redirect if result is str."""
218         tmpl_name = f'{self._site}'
219         ctx_or_redir_target = handler()
220         if isinstance(ctx_or_redir_target, str):
221             return ctx_or_redir_target
222         self._send_page(ctx_or_redir_target, tmpl_name)
223         return None
224
225     @_request_wrapper('POST', 'Unknown POST target')
226     def do_POST(self, handler: Callable[[], str]) -> str:
227         """Handle POST with handler, prepare redirection to result."""
228         length = int(self.headers['content-length'])
229         postvars = parse_qs(self.rfile.read(length).decode(),
230                             keep_blank_values=True, strict_parsing=True)
231         self._form_data = InputsParser(postvars)
232         redir_target = handler()
233         self.conn.commit()
234         return redir_target
235
236     # GET handlers
237
238     @staticmethod
239     def _get_item(target_class: Any
240                   ) -> Callable[..., Callable[[TaskHandler],
241                                               dict[str, object]]]:
242         def decorator(f: Callable[..., dict[str, object]]
243                       ) -> Callable[[TaskHandler], dict[str, object]]:
244             def wrapper(self: TaskHandler) -> dict[str, object]:
245                 # pylint: disable=protected-access
246                 # (because pylint here fails to detect the use of wrapper as a
247                 # method to self with respective access privileges)
248                 id_ = self._params.get_int_or_none('id')
249                 if target_class.can_create_by_id:
250                     item = target_class.by_id_or_create(self.conn, id_)
251                 else:
252                     item = target_class.by_id(self.conn, id_)
253                 return f(self, item)
254             return wrapper
255         return decorator
256
257     def do_GET_(self) -> str:
258         """Return redirect target on GET /."""
259         return '/day'
260
261     def _do_GET_calendar(self) -> dict[str, object]:
262         """Show Days from ?start= to ?end=.
263
264         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
265         same, the only difference being the HTML template they are rendered to,
266         which .do_GET selects from their method name.
267         """
268         start = self._params.get_str('start')
269         end = self._params.get_str('end')
270         if not end:
271             end = date_in_n_days(366)
272         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
273         days, start, end = ret
274         days = Day.with_filled_gaps(days, start, end)
275         today = date_in_n_days(0)
276         return {'start': start, 'end': end, 'days': days, 'today': today}
277
278     def do_GET_calendar(self) -> dict[str, object]:
279         """Show Days from ?start= to ?end= – normal view."""
280         return self._do_GET_calendar()
281
282     def do_GET_calendar_txt(self) -> dict[str, object]:
283         """Show Days from ?start= to ?end= – minimalist view."""
284         return self._do_GET_calendar()
285
286     def do_GET_day(self) -> dict[str, object]:
287         """Show single Day of ?date=."""
288         date = self._params.get_str('date', date_in_n_days(0))
289         day = Day.by_id_or_create(self.conn, date)
290         make_type = self._params.get_str('make_type')
291         conditions_present = []
292         enablers_for = {}
293         disablers_for = {}
294         for todo in day.todos:
295             for condition in todo.conditions + todo.blockers:
296                 if condition not in conditions_present:
297                     conditions_present += [condition]
298                     enablers_for[condition.id_] = [p for p in
299                                                    Process.all(self.conn)
300                                                    if condition in p.enables]
301                     disablers_for[condition.id_] = [p for p in
302                                                     Process.all(self.conn)
303                                                     if condition in p.disables]
304         seen_todos: set[int] = set()
305         top_nodes = [t.get_step_tree(seen_todos)
306                      for t in day.todos if not t.parents]
307         return {'day': day,
308                 'top_nodes': top_nodes,
309                 'make_type': make_type,
310                 'enablers_for': enablers_for,
311                 'disablers_for': disablers_for,
312                 'conditions_present': conditions_present,
313                 'processes': Process.all(self.conn)}
314
315     @_get_item(Todo)
316     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
317         """Show single Todo of ?id=."""
318
319         @dataclass
320         class TodoStepsNode:
321             """Collect what's useful for Todo steps tree display."""
322             id_: int
323             todo: Todo | None
324             process: Process | None
325             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
326             fillable: bool = False
327
328         def walk_process_steps(id_: int,
329                                process_step_nodes: list[ProcessStepsNode],
330                                steps_nodes: list[TodoStepsNode]) -> None:
331             for process_step_node in process_step_nodes:
332                 id_ += 1
333                 node = TodoStepsNode(id_, None, process_step_node.process, [])
334                 steps_nodes += [node]
335                 walk_process_steps(id_, list(process_step_node.steps.values()),
336                                    node.children)
337
338         def walk_todo_steps(id_: int, todos: list[Todo],
339                             steps_nodes: list[TodoStepsNode]) -> None:
340             for todo in todos:
341                 matched = False
342                 for match in [item for item in steps_nodes
343                               if item.process
344                               and item.process == todo.process]:
345                     match.todo = todo
346                     matched = True
347                     for child in match.children:
348                         child.fillable = True
349                     walk_todo_steps(id_, todo.children, match.children)
350                 if not matched:
351                     id_ += 1
352                     node = TodoStepsNode(id_, todo, None, [])
353                     steps_nodes += [node]
354                     walk_todo_steps(id_, todo.children, node.children)
355
356         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
357                                     ) -> set[int]:
358             ids = set()
359             for node in steps_nodes:
360                 if not node.todo:
361                     assert isinstance(node.process, Process)
362                     assert isinstance(node.process.id_, int)
363                     ids.add(node.process.id_)
364                 ids = ids | collect_adoptables_keys(node.children)
365             return ids
366
367         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
368         process_tree = todo.process.get_steps(self.conn, None)
369         steps_todo_to_process: list[TodoStepsNode] = []
370         walk_process_steps(0, list(process_tree.values()),
371                            steps_todo_to_process)
372         for steps_node in steps_todo_to_process:
373             steps_node.fillable = True
374         walk_todo_steps(len(steps_todo_to_process), todo_steps,
375                         steps_todo_to_process)
376         adoptables: dict[int, list[Todo]] = {}
377         any_adoptables = [Todo.by_id(self.conn, t.id_)
378                           for t in Todo.by_date(self.conn, todo.date)
379                           if t.id_ is not None
380                           and t != todo]
381         for id_ in collect_adoptables_keys(steps_todo_to_process):
382             adoptables[id_] = [t for t in any_adoptables
383                                if t.process.id_ == id_]
384         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
385                 'adoption_candidates_for': adoptables,
386                 'process_candidates': Process.all(self.conn),
387                 'todo_candidates': any_adoptables,
388                 'condition_candidates': Condition.all(self.conn)}
389
390     def do_GET_todos(self) -> dict[str, object]:
391         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
392         sort_by = self._params.get_str('sort_by')
393         start = self._params.get_str('start')
394         end = self._params.get_str('end')
395         process_id = self._params.get_int_or_none('process_id')
396         comment_pattern = self._params.get_str('comment_pattern')
397         todos = []
398         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
399         todos_by_date_range, start, end = ret
400         todos = [t for t in todos_by_date_range
401                  if comment_pattern in t.comment
402                  and ((not process_id) or t.process.id_ == process_id)]
403         if sort_by == 'doneness':
404             todos.sort(key=lambda t: t.is_done)
405         elif sort_by == '-doneness':
406             todos.sort(key=lambda t: t.is_done, reverse=True)
407         elif sort_by == 'title':
408             todos.sort(key=lambda t: t.title_then)
409         elif sort_by == '-title':
410             todos.sort(key=lambda t: t.title_then, reverse=True)
411         elif sort_by == 'comment':
412             todos.sort(key=lambda t: t.comment)
413         elif sort_by == '-comment':
414             todos.sort(key=lambda t: t.comment, reverse=True)
415         elif sort_by == '-date':
416             todos.sort(key=lambda t: t.date, reverse=True)
417         else:
418             todos.sort(key=lambda t: t.date)
419             sort_by = 'title'
420         return {'start': start, 'end': end, 'process_id': process_id,
421                 'comment_pattern': comment_pattern, 'todos': todos,
422                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
423
424     def do_GET_conditions(self) -> dict[str, object]:
425         """Show all Conditions."""
426         pattern = self._params.get_str('pattern')
427         conditions = Condition.matching(self.conn, pattern)
428         sort_by = self._params.get_str('sort_by')
429         if sort_by == 'is_active':
430             conditions.sort(key=lambda c: c.is_active)
431         elif sort_by == '-is_active':
432             conditions.sort(key=lambda c: c.is_active, reverse=True)
433         elif sort_by == '-title':
434             conditions.sort(key=lambda c: c.title.newest, reverse=True)
435         else:
436             conditions.sort(key=lambda c: c.title.newest)
437             sort_by = 'title'
438         return {'conditions': conditions,
439                 'sort_by': sort_by,
440                 'pattern': pattern}
441
442     @_get_item(Condition)
443     def do_GET_condition(self, c: Condition) -> dict[str, object]:
444         """Show Condition of ?id=."""
445         ps = Process.all(self.conn)
446         return {'condition': c, 'is_new': c.id_ is None,
447                 'enabled_processes': [p for p in ps if c in p.conditions],
448                 'disabled_processes': [p for p in ps if c in p.blockers],
449                 'enabling_processes': [p for p in ps if c in p.enables],
450                 'disabling_processes': [p for p in ps if c in p.disables]}
451
452     @_get_item(Condition)
453     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
454         """Show title history of Condition of ?id=."""
455         return {'condition': c}
456
457     @_get_item(Condition)
458     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
459         """Show description historys of Condition of ?id=."""
460         return {'condition': c}
461
462     @_get_item(Process)
463     def do_GET_process(self, process: Process) -> dict[str, object]:
464         """Show Process of ?id=."""
465         title_64 = self._params.get_str('title_b64')
466         if title_64:
467             title = b64decode(title_64.encode()).decode()
468             process.title.set(title)
469         owners = process.used_as_step_by(self.conn)
470         for step_id in self._params.get_all_int('step_to'):
471             owners += [Process.by_id(self.conn, step_id)]
472         preset_top_step = None
473         for process_id in self._params.get_all_int('has_step'):
474             preset_top_step = process_id
475         return {'process': process, 'is_new': process.id_ is None,
476                 'preset_top_step': preset_top_step,
477                 'steps': process.get_steps(self.conn), 'owners': owners,
478                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
479                 'process_candidates': Process.all(self.conn),
480                 'condition_candidates': Condition.all(self.conn)}
481
482     @_get_item(Process)
483     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
484         """Show title history of Process of ?id=."""
485         return {'process': p}
486
487     @_get_item(Process)
488     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
489         """Show description historys of Process of ?id=."""
490         return {'process': p}
491
492     @_get_item(Process)
493     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
494         """Show default effort history of Process of ?id=."""
495         return {'process': p}
496
497     def do_GET_processes(self) -> dict[str, object]:
498         """Show all Processes."""
499         pattern = self._params.get_str('pattern')
500         processes = Process.matching(self.conn, pattern)
501         sort_by = self._params.get_str('sort_by')
502         if sort_by == 'steps':
503             processes.sort(key=lambda p: len(p.explicit_steps))
504         elif sort_by == '-steps':
505             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
506         elif sort_by == 'owners':
507             processes.sort(key=lambda p: p.n_owners or 0)
508         elif sort_by == '-owners':
509             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
510         elif sort_by == 'effort':
511             processes.sort(key=lambda p: p.effort.newest)
512         elif sort_by == '-effort':
513             processes.sort(key=lambda p: p.effort.newest, reverse=True)
514         elif sort_by == '-title':
515             processes.sort(key=lambda p: p.title.newest, reverse=True)
516         else:
517             processes.sort(key=lambda p: p.title.newest)
518             sort_by = 'title'
519         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
520
521     # POST handlers
522
523     @staticmethod
524     def _delete_or_post(target_class: Any, redir_target: str = '/'
525                         ) -> Callable[..., Callable[[TaskHandler], str]]:
526         def decorator(f: Callable[..., str]
527                       ) -> Callable[[TaskHandler], str]:
528             def wrapper(self: TaskHandler) -> str:
529                 # pylint: disable=protected-access
530                 # (because pylint here fails to detect the use of wrapper as a
531                 # method to self with respective access privileges)
532                 id_ = self._params.get_int_or_none('id')
533                 for _ in self._form_data.get_all_str('delete'):
534                     if id_ is None:
535                         msg = 'trying to delete non-saved ' +\
536                                 f'{target_class.__name__}'
537                         raise NotFoundException(msg)
538                     item = target_class.by_id(self.conn, id_)
539                     item.remove(self.conn)
540                     return redir_target
541                 if target_class.can_create_by_id:
542                     item = target_class.by_id_or_create(self.conn, id_)
543                 else:
544                     item = target_class.by_id(self.conn, id_)
545                 return f(self, item)
546             return wrapper
547         return decorator
548
549     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
550         """Update history timestamps for VersionedAttribute."""
551         id_ = self._params.get_int_or_none('id')
552         item = cls.by_id(self.conn, id_)
553         attr = getattr(item, attr_name)
554         for k, v in self._form_data.get_first_strings_starting('at:').items():
555             old = k[3:]
556             if old[19:] != v:
557                 attr.reset_timestamp(old, f'{v}.0')
558         attr.save(self.conn)
559         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
560
561     def do_POST_day(self) -> str:
562         """Update or insert Day of date and Todos mapped to it."""
563         # pylint: disable=too-many-locals
564         date = self._params.get_str('date')
565         day_comment = self._form_data.get_str('day_comment')
566         make_type = self._form_data.get_str('make_type')
567         old_todos = self._form_data.get_all_int('todo_id')
568         new_todos = self._form_data.get_all_int('new_todo')
569         comments = self._form_data.get_all_str('comment')
570         efforts = self._form_data.get_all_floats_or_nones('effort')
571         done_todos = self._form_data.get_all_int('done')
572         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
573             raise BadFormatException('"done" field refers to unknown Todo')
574         is_done = [t_id in done_todos for t_id in old_todos]
575         if not (len(old_todos) == len(is_done) == len(comments)
576                 == len(efforts)):
577             msg = 'not equal number each of number of todo_id, comments, ' +\
578                     'and efforts inputs'
579             raise BadFormatException(msg)
580         day = Day.by_id_or_create(self.conn, date)
581         day.comment = day_comment
582         day.save(self.conn)
583         for process_id in sorted(new_todos):
584             if 'empty' == make_type:
585                 process = Process.by_id(self.conn, process_id)
586                 todo = Todo(None, process, False, date)
587                 todo.save(self.conn)
588             else:
589                 Todo.create_with_children(self.conn, process_id, date)
590         for i, todo_id in enumerate(old_todos):
591             todo = Todo.by_id(self.conn, todo_id)
592             todo.is_done = is_done[i]
593             todo.comment = comments[i]
594             todo.effort = efforts[i]
595             todo.save(self.conn)
596         return f'/day?date={date}&make_type={make_type}'
597
598     @_delete_or_post(Todo, '/')
599     def do_POST_todo(self, todo: Todo) -> str:
600         """Update Todo and its children."""
601         # pylint: disable=too-many-locals
602         adopted_child_ids = self._form_data.get_all_int('adopt')
603         processes_to_make_full = self._form_data.get_all_int('make_full')
604         processes_to_make_empty = self._form_data.get_all_int('make_empty')
605         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
606         effort = self._form_data.get_str('effort', ignore_strict=True)
607         conditions = self._form_data.get_all_int('conditions')
608         disables = self._form_data.get_all_int('disables')
609         blockers = self._form_data.get_all_int('blockers')
610         enables = self._form_data.get_all_int('enables')
611         is_done = len(self._form_data.get_all_str('done')) > 0
612         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
613         comment = self._form_data.get_str('comment', ignore_strict=True)
614         for v in fill_fors.values():
615             if v.startswith('make_empty_'):
616                 processes_to_make_empty += [int(v[11:])]
617             elif v.startswith('make_full_'):
618                 processes_to_make_full += [int(v[10:])]
619             elif v != 'ignore':
620                 adopted_child_ids += [int(v)]
621         to_remove = []
622         for child in todo.children:
623             assert isinstance(child.id_, int)
624             if child.id_ not in adopted_child_ids:
625                 to_remove += [child.id_]
626         for id_ in to_remove:
627             child = Todo.by_id(self.conn, id_)
628             todo.remove_child(child)
629         for child_id in adopted_child_ids:
630             if child_id in [c.id_ for c in todo.children]:
631                 continue
632             child = Todo.by_id(self.conn, child_id)
633             todo.add_child(child)
634         for process_id in processes_to_make_empty:
635             process = Process.by_id(self.conn, process_id)
636             made = Todo(None, process, False, todo.date)
637             made.save(self.conn)
638             todo.add_child(made)
639         for process_id in processes_to_make_full:
640             made = Todo.create_with_children(self.conn, process_id, todo.date)
641             todo.add_child(made)
642         todo.effort = float(effort) if effort else None
643         todo.set_conditions(self.conn, conditions)
644         todo.set_blockers(self.conn, blockers)
645         todo.set_enables(self.conn, enables)
646         todo.set_disables(self.conn, disables)
647         todo.is_done = is_done
648         todo.calendarize = calendarize
649         todo.comment = comment
650         todo.save(self.conn)
651         return f'/todo?id={todo.id_}'
652
653     def do_POST_process_descriptions(self) -> str:
654         """Update history timestamps for Process.description."""
655         return self._change_versioned_timestamps(Process, 'description')
656
657     def do_POST_process_efforts(self) -> str:
658         """Update history timestamps for Process.effort."""
659         return self._change_versioned_timestamps(Process, 'effort')
660
661     def do_POST_process_titles(self) -> str:
662         """Update history timestamps for Process.title."""
663         return self._change_versioned_timestamps(Process, 'title')
664
665     @_delete_or_post(Process, '/processes')
666     def do_POST_process(self, process: Process) -> str:
667         """Update or insert Process of ?id= and fields defined in postvars."""
668         # pylint: disable=too-many-locals
669         # pylint: disable=too-many-statements
670         title = self._form_data.get_str('title')
671         description = self._form_data.get_str('description')
672         effort = self._form_data.get_float('effort')
673         conditions = self._form_data.get_all_int('conditions')
674         blockers = self._form_data.get_all_int('blockers')
675         enables = self._form_data.get_all_int('enables')
676         disables = self._form_data.get_all_int('disables')
677         calendarize = self._form_data.get_all_str('calendarize') != []
678         suppresses = self._form_data.get_all_int('suppresses')
679         step_of = self._form_data.get_all_str('step_of')
680         keep_steps = self._form_data.get_all_int('keep_step')
681         step_ids = self._form_data.get_all_int('steps')
682         new_top_steps = self._form_data.get_all_str('new_top_step')
683         step_process_id_to = {}
684         step_parent_id_to = {}
685         new_steps_to = {}
686         for step_id in step_ids:
687             name = f'new_step_to_{step_id}'
688             new_steps_to[step_id] = self._form_data.get_all_int(name)
689         for step_id in keep_steps:
690             name = f'step_{step_id}_process_id'
691             step_process_id_to[step_id] = self._form_data.get_int(name)
692             name = f'step_{step_id}_parent_id'
693             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
694         process.title.set(title)
695         process.description.set(description)
696         process.effort.set(effort)
697         process.set_conditions(self.conn, conditions)
698         process.set_blockers(self.conn, blockers)
699         process.set_enables(self.conn, enables)
700         process.set_disables(self.conn, disables)
701         process.calendarize = calendarize
702         process.save(self.conn)
703         assert isinstance(process.id_, int)
704         new_step_title = None
705         steps: list[ProcessStep] = []
706         for step_id in keep_steps:
707             if step_id not in step_ids:
708                 raise BadFormatException('trying to keep unknown step')
709             step = ProcessStep(step_id, process.id_,
710                                step_process_id_to[step_id],
711                                step_parent_id_to[step_id])
712             steps += [step]
713         for step_id in step_ids:
714             new = [ProcessStep(None, process.id_, step_process_id, step_id)
715                    for step_process_id in new_steps_to[step_id]]
716             steps += new
717         for step_identifier in new_top_steps:
718             try:
719                 step_process_id = int(step_identifier)
720                 step = ProcessStep(None, process.id_, step_process_id, None)
721                 steps += [step]
722             except ValueError:
723                 new_step_title = step_identifier
724         process.set_steps(self.conn, steps)
725         process.set_step_suppressions(self.conn, suppresses)
726         owners_to_set = []
727         new_owner_title = None
728         for owner_identifier in step_of:
729             try:
730                 owners_to_set += [int(owner_identifier)]
731             except ValueError:
732                 new_owner_title = owner_identifier
733         process.set_owners(self.conn, owners_to_set)
734         params = f'id={process.id_}'
735         if new_step_title:
736             title_b64_encoded = b64encode(new_step_title.encode()).decode()
737             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
738         elif new_owner_title:
739             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
740             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
741         process.save(self.conn)
742         return f'/process?{params}'
743
744     def do_POST_condition_descriptions(self) -> str:
745         """Update history timestamps for Condition.description."""
746         return self._change_versioned_timestamps(Condition, 'description')
747
748     def do_POST_condition_titles(self) -> str:
749         """Update history timestamps for Condition.title."""
750         return self._change_versioned_timestamps(Condition, 'title')
751
752     @_delete_or_post(Condition, '/conditions')
753     def do_POST_condition(self, condition: Condition) -> str:
754         """Update/insert Condition of ?id= and fields defined in postvars."""
755         is_active = self._form_data.get_str('is_active') == 'True'
756         title = self._form_data.get_str('title')
757         description = self._form_data.get_str('description')
758         condition.is_active = is_active
759         condition.title.set(title)
760         condition.description.set(description)
761         condition.save(self.conn)
762         return f'/condition?id={condition.id_}'