home · contact · privacy
0d58af30d6370477f8247300b49f6ad49eff23c4
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
16                                  NotFoundException)
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
21
# Directory handed to the Jinja FileSystemLoader in TaskServer.__init__.
TEMPLATES_DIR = 'templates'
23
24
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Besides the Jinja environment, it carries the database file (.db), the
    extra response headers to send (.headers), and the render mode that
    decides between HTML template output and JSON output.
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Init HTTPServer with args/kwargs, store db_file, set up Jinja."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses.

        Safe to call more than once: the Content-Type header is added only
        if it is not yet among self.headers.
        """
        self._render_mode = 'json'
        json_header = ('Content-Type', 'application/json')
        if json_header not in self.headers:
            self.headers += [json_header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string, without modifying ctx itself.

        Values are translated recursively: objects offering
        .as_dict_into_reference and a non-None .id_ are replaced by what that
        method returns (it is handed a shared "library" dict to fill), other
        objects offering .as_dict by that attribute, lists and tuples by
        lists, dicts by dicts, HandledExceptions by their string form.
        Anything else is passed to the JSON encoder as is. The library is
        added to the result under the key '_library'.
        """
        library: dict[str, dict[str, object] | dict[int, object]] = {}

        def walk_ctx(node: object) -> Any:
            if hasattr(node, 'as_dict_into_reference'):
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            if hasattr(node, 'as_dict'):
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        # Build a fresh dict rather than overwriting the caller's ctx.
        translated: dict[str, object] = {k: walk_ctx(v)
                                         for k, v in ctx.items()}
        translated['_library'] = library
        return json_dumps(translated)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In HTML mode, ctx is rendered through the Jinja template named
        f'{tmpl_name}.html'; in any other mode tmpl_name is ignored and ctx
        is serialized by .ctx_to_json.
        """
        if 'html' == self._render_mode:
            full_name = f'{tmpl_name}.{self._render_mode}'
            return self._jinja.get_template(full_name).render(ctx)
        return self.__class__.ctx_to_json(ctx)
73
74
class InputsParser:
    """Validating accessor for dict-like HTTP inputs (key to string list)."""

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        self.inputs = dict_
        # If strict, .get_str fails on absent keys instead of using default.
        self.strict = strictness

    @staticmethod
    def _as_float(key: str, val: str) -> float:
        """Return val as float, or fail with BadFormatException naming key."""
        try:
            return float(val)
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Return first string stored at key, or default if there is none.

        If the parser is strict and ignore_strict is not set, absence of a
        value raises NotFoundException instead of yielding default.
        """
        found = self.inputs[key] if key in self.inputs else []
        if found:
            return found[0]
        if self.strict and not ignore_strict:
            raise NotFoundException(f'no value found for key {key}')
        return default

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Return dict of first strings at all keys starting with prefix."""
        return {name: values[0]
                for name, values in self.inputs.items()
                if name.startswith(prefix)}

    def get_int(self, key: str) -> int:
        """Return first value at key as int, fail if absent or empty."""
        number = self.get_int_or_none(key)
        if number is not None:
            return number
        raise BadFormatException(f'unexpected empty value for: {key}')

    def get_int_or_none(self, key: str) -> int | None:
        """Return first value at key as int, or None if absent or empty.

        Never fails on absence of key, whatever the parser's strictness.
        """
        raw = self.get_str(key, ignore_strict=True)
        if raw == '':
            return None
        try:
            return int(raw)
        except ValueError as e:
            msg = f'cannot int form field value for key {key}: {raw}'
            raise BadFormatException(msg) from e

    def get_float(self, key: str) -> float:
        """Return first value at key as float, fail if not parseable."""
        return self._as_float(key, self.get_str(key))

    def get_float_or_none(self, key: str) -> float | None:
        """Return first value at key as float, or None if it is empty.

        Other than .get_int_or_none, this respects the parser's strictness,
        i.e. may raise NotFoundException on absent key.
        """
        raw = self.get_str(key)
        return None if raw == '' else self._as_float(key, raw)

    def get_all_str(self, key: str) -> list[str]:
        """Return list of all strings at key, empty list if key absent."""
        return self.inputs[key] if key in self.inputs else []

    def get_all_int(self, key: str) -> list[int]:
        """Return all non-empty values at key as ints."""
        strings = self.get_all_str(key)
        numbers: list[int] = []
        try:
            for text in strings:
                if text:
                    numbers.append(int(text))
        except ValueError as e:
            msg = f'cannot int a form field value for key {key} in: {strings}'
            raise BadFormatException(msg) from e
        return numbers

    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
        """Return all values at key as floats, with None for empty strings."""
        return [None if text == '' else self._as_float(key, text)
                for text in self.get_all_str(key)]
165
166
167 class TaskHandler(BaseHTTPRequestHandler):
168     """Handles single HTTP request."""
169     # pylint: disable=too-many-public-methods
170     server: TaskServer
171     conn: DatabaseConnection
172     _site: str
173     _form_data: InputsParser
174     _params: InputsParser
175
176     def _send_page(self,
177                    ctx: dict[str, Any],
178                    tmpl_name: str,
179                    code: int = 200
180                    ) -> None:
181         """Send ctx as proper HTTP response."""
182         body = self.server.render(ctx, tmpl_name)
183         self.send_response(code)
184         for header_tuple in self.server.headers:
185             self.send_header(*header_tuple)
186         self.end_headers()
187         self.wfile.write(bytes(body, 'utf-8'))
188
189     @staticmethod
190     def _request_wrapper(http_method: str, not_found_msg: str
191                          ) -> Callable[..., Callable[[TaskHandler], None]]:
192         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
193
194         Among other things, conditionally cleans all caches, but only on POST
195         requests, as only those are expected to change the states of objects
196         that may be cached, and certainly only those are expected to write any
197         changes to the database. We want to call them as early though as
198         possible here, either exactly after the specific request handler
199         returns successfully, or right after any exception is triggered –
200         otherwise, race conditions become plausible.
201
202         Note that otherwise any POST attempt, even a failed one, may end in
203         problematic inconsistencies:
204
205         - if the POST handler experiences an Exception, changes to objects
206           won't get written to the DB, but the changed objects may remain in
207           the cache and affect other objects despite their possibly illegal
208           state
209
210         - even if an object was just saved to the DB, we cannot be sure its
211           current state is completely identical to what we'd get if loading it
212           fresh from the DB (e.g. currently Process.n_owners is only updated
213           when loaded anew via .from_table_row, nor is its state written to
214           the DB by .save; a questionable design choice, but proof that we
215           have no guarantee that objects' .save stores all their states we'd
216           prefer at their most up-to-date.
217         """
218
219         def clear_caches() -> None:
220             for cls in (Day, Todo, Condition, Process, ProcessStep):
221                 assert hasattr(cls, 'empty_cache')
222                 cls.empty_cache()
223
224         def decorator(f: Callable[..., str | None]
225                       ) -> Callable[[TaskHandler], None]:
226             def wrapper(self: TaskHandler) -> None:
227                 # pylint: disable=protected-access
228                 # (because pylint here fails to detect the use of wrapper as a
229                 # method to self with respective access privileges)
230                 try:
231                     self.conn = DatabaseConnection(self.server.db)
232                     parsed_url = urlparse(self.path)
233                     self._site = path_split(parsed_url.path)[1]
234                     params = parse_qs(parsed_url.query, strict_parsing=True)
235                     self._params = InputsParser(params, False)
236                     handler_name = f'do_{http_method}_{self._site}'
237                     if hasattr(self, handler_name):
238                         handler = getattr(self, handler_name)
239                         redir_target = f(self, handler)
240                         if 'POST' == http_method:
241                             clear_caches()
242                         if redir_target:
243                             self.send_response(302)
244                             self.send_header('Location', redir_target)
245                             self.end_headers()
246                     else:
247                         msg = f'{not_found_msg}: {self._site}'
248                         raise NotFoundException(msg)
249                 except HandledException as error:
250                     if 'POST' == http_method:
251                         clear_caches()
252                     ctx = {'msg': error}
253                     self._send_page(ctx, 'msg', error.http_code)
254                 finally:
255                     self.conn.close()
256             return wrapper
257         return decorator
258
259     @_request_wrapper('GET', 'Unknown page')
260     def do_GET(self, handler: Callable[[], str | dict[str, object]]
261                ) -> str | None:
262         """Render page with result of handler, or redirect if result is str."""
263         tmpl_name = f'{self._site}'
264         ctx_or_redir_target = handler()
265         if isinstance(ctx_or_redir_target, str):
266             return ctx_or_redir_target
267         self._send_page(ctx_or_redir_target, tmpl_name)
268         return None
269
270     @_request_wrapper('POST', 'Unknown POST target')
271     def do_POST(self, handler: Callable[[], str]) -> str:
272         """Handle POST with handler, prepare redirection to result."""
273         length = int(self.headers['content-length'])
274         postvars = parse_qs(self.rfile.read(length).decode(),
275                             keep_blank_values=True, strict_parsing=True)
276         self._form_data = InputsParser(postvars)
277         redir_target = handler()
278         self.conn.commit()
279         return redir_target
280
281     # GET handlers
282
283     @staticmethod
284     def _get_item(target_class: Any
285                   ) -> Callable[..., Callable[[TaskHandler],
286                                               dict[str, object]]]:
287         def decorator(f: Callable[..., dict[str, object]]
288                       ) -> Callable[[TaskHandler], dict[str, object]]:
289             def wrapper(self: TaskHandler) -> dict[str, object]:
290                 # pylint: disable=protected-access
291                 # (because pylint here fails to detect the use of wrapper as a
292                 # method to self with respective access privileges)
293                 id_ = self._params.get_int_or_none('id')
294                 if target_class.can_create_by_id:
295                     item = target_class.by_id_or_create(self.conn, id_)
296                 else:
297                     item = target_class.by_id(self.conn, id_)
298                 return f(self, item)
299             return wrapper
300         return decorator
301
    def do_GET_(self) -> str:
        """Return redirect target on GET /.

        Reached when the last path component is empty; as the result is a
        string, .do_GET treats it as redirect target rather than as context.
        """
        return '/day'
305
306     def _do_GET_calendar(self) -> dict[str, object]:
307         """Show Days from ?start= to ?end=.
308
309         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
310         same, the only difference being the HTML template they are rendered to,
311         which .do_GET selects from their method name.
312         """
313         start, end = self._params.get_str('start'), self._params.get_str('end')
314         end = end if end else date_in_n_days(366)
315         days, start, end = Day.by_date_range_with_limits(self.conn,
316                                                          (start, end), 'id')
317         days = Day.with_filled_gaps(days, start, end)
318         today = date_in_n_days(0)
319         return {'start': start, 'end': end, 'days': days, 'today': today}
320
    def do_GET_calendar(self) -> dict[str, object]:
        """Show Days from ?start= to ?end= – normal view.

        Returns the context built by ._do_GET_calendar unchanged.
        """
        return self._do_GET_calendar()
324
    def do_GET_calendar_txt(self) -> dict[str, object]:
        """Show Days from ?start= to ?end= – minimalist view.

        Returns the context built by ._do_GET_calendar unchanged.
        """
        return self._do_GET_calendar()
328
329     def do_GET_day(self) -> dict[str, object]:
330         """Show single Day of ?date=."""
331         date = self._params.get_str('date', date_in_n_days(0))
332         day = Day.by_id_or_create(self.conn, date)
333         make_type = self._params.get_str('make_type')
334         conditions_present = []
335         enablers_for = {}
336         disablers_for = {}
337         for todo in day.todos:
338             for condition in todo.conditions + todo.blockers:
339                 if condition not in conditions_present:
340                     conditions_present += [condition]
341                     enablers_for[condition.id_] = [p for p in
342                                                    Process.all(self.conn)
343                                                    if condition in p.enables]
344                     disablers_for[condition.id_] = [p for p in
345                                                     Process.all(self.conn)
346                                                     if condition in p.disables]
347         seen_todos: set[int] = set()
348         top_nodes = [t.get_step_tree(seen_todos)
349                      for t in day.todos if not t.parents]
350         return {'day': day,
351                 'top_nodes': top_nodes,
352                 'make_type': make_type,
353                 'enablers_for': enablers_for,
354                 'disablers_for': disablers_for,
355                 'conditions_present': conditions_present,
356                 'processes': Process.all(self.conn)}
357
358     @_get_item(Todo)
359     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
360         """Show single Todo of ?id=."""
361
362         @dataclass
363         class TodoStepsNode:
364             """Collect what's useful for Todo steps tree display."""
365             id_: int
366             todo: Todo | None
367             process: Process | None
368             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
369             fillable: bool = False
370
371         def walk_process_steps(id_: int,
372                                process_step_nodes: list[ProcessStepsNode],
373                                steps_nodes: list[TodoStepsNode]) -> None:
374             for process_step_node in process_step_nodes:
375                 id_ += 1
376                 node = TodoStepsNode(id_, None, process_step_node.process, [])
377                 steps_nodes += [node]
378                 walk_process_steps(id_, list(process_step_node.steps.values()),
379                                    node.children)
380
381         def walk_todo_steps(id_: int, todos: list[Todo],
382                             steps_nodes: list[TodoStepsNode]) -> None:
383             for todo in todos:
384                 matched = False
385                 for match in [item for item in steps_nodes
386                               if item.process
387                               and item.process == todo.process]:
388                     match.todo = todo
389                     matched = True
390                     for child in match.children:
391                         child.fillable = True
392                     walk_todo_steps(id_, todo.children, match.children)
393                 if not matched:
394                     id_ += 1
395                     node = TodoStepsNode(id_, todo, None, [])
396                     steps_nodes += [node]
397                     walk_todo_steps(id_, todo.children, node.children)
398
399         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
400                                     ) -> set[int]:
401             ids = set()
402             for node in steps_nodes:
403                 if not node.todo:
404                     assert isinstance(node.process, Process)
405                     assert isinstance(node.process.id_, int)
406                     ids.add(node.process.id_)
407                 ids = ids | collect_adoptables_keys(node.children)
408             return ids
409
410         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
411         process_tree = todo.process.get_steps(self.conn, None)
412         steps_todo_to_process: list[TodoStepsNode] = []
413         walk_process_steps(0, list(process_tree.values()),
414                            steps_todo_to_process)
415         for steps_node in steps_todo_to_process:
416             steps_node.fillable = True
417         walk_todo_steps(len(steps_todo_to_process), todo_steps,
418                         steps_todo_to_process)
419         adoptables: dict[int, list[Todo]] = {}
420         any_adoptables = [Todo.by_id(self.conn, t.id_)
421                           for t in Todo.by_date(self.conn, todo.date)
422                           if t.id_ is not None
423                           and t != todo]
424         for id_ in collect_adoptables_keys(steps_todo_to_process):
425             adoptables[id_] = [t for t in any_adoptables
426                                if t.process.id_ == id_]
427         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
428                 'adoption_candidates_for': adoptables,
429                 'process_candidates': Process.all(self.conn),
430                 'todo_candidates': any_adoptables,
431                 'condition_candidates': Condition.all(self.conn)}
432
433     def do_GET_todos(self) -> dict[str, object]:
434         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
435         sort_by = self._params.get_str('sort_by')
436         start = self._params.get_str('start')
437         end = self._params.get_str('end')
438         process_id = self._params.get_int_or_none('process_id')
439         comment_pattern = self._params.get_str('comment_pattern')
440         todos = []
441         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
442         todos_by_date_range, start, end = ret
443         todos = [t for t in todos_by_date_range
444                  if comment_pattern in t.comment
445                  and ((not process_id) or t.process.id_ == process_id)]
446         sort_by = Todo.sort_by(todos, sort_by)
447         return {'start': start, 'end': end, 'process_id': process_id,
448                 'comment_pattern': comment_pattern, 'todos': todos,
449                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
450
451     def do_GET_conditions(self) -> dict[str, object]:
452         """Show all Conditions."""
453         pattern = self._params.get_str('pattern')
454         sort_by = self._params.get_str('sort_by')
455         conditions = Condition.matching(self.conn, pattern)
456         sort_by = Condition.sort_by(conditions, sort_by)
457         return {'conditions': conditions,
458                 'sort_by': sort_by,
459                 'pattern': pattern}
460
461     @_get_item(Condition)
462     def do_GET_condition(self, c: Condition) -> dict[str, object]:
463         """Show Condition of ?id=."""
464         ps = Process.all(self.conn)
465         return {'condition': c, 'is_new': c.id_ is None,
466                 'enabled_processes': [p for p in ps if c in p.conditions],
467                 'disabled_processes': [p for p in ps if c in p.blockers],
468                 'enabling_processes': [p for p in ps if c in p.enables],
469                 'disabling_processes': [p for p in ps if c in p.disables]}
470
    @_get_item(Condition)
    def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
        """Show title history of Condition of ?id=.

        Only passes the Condition on as context; what is shown of it is up
        to the template .do_GET selects by this method's name.
        """
        return {'condition': c}
475
    @_get_item(Condition)
    def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
        """Show description history of Condition of ?id=.

        Only passes the Condition on as context; what is shown of it is up
        to the template .do_GET selects by this method's name.
        """
        return {'condition': c}
480
481     @_get_item(Process)
482     def do_GET_process(self, process: Process) -> dict[str, object]:
483         """Show Process of ?id=."""
484         owner_ids = self._params.get_all_int('step_to')
485         owned_ids = self._params.get_all_int('has_step')
486         title_64 = self._params.get_str('title_b64')
487         if title_64:
488             try:
489                 title = b64decode(title_64.encode()).decode()
490             except binascii_Exception as exc:
491                 msg = 'invalid base64 for ?title_b64='
492                 raise BadFormatException(msg) from exc
493             process.title.set(title)
494         preset_top_step = None
495         owners = process.used_as_step_by(self.conn)
496         for step_id in owner_ids:
497             owners += [Process.by_id(self.conn, step_id)]
498         for process_id in owned_ids:
499             Process.by_id(self.conn, process_id)  # to ensure ID exists
500             preset_top_step = process_id
501         return {'process': process, 'is_new': process.id_ is None,
502                 'preset_top_step': preset_top_step,
503                 'steps': process.get_steps(self.conn), 'owners': owners,
504                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
505                 'process_candidates': Process.all(self.conn),
506                 'condition_candidates': Condition.all(self.conn)}
507
    @_get_item(Process)
    def do_GET_process_titles(self, p: Process) -> dict[str, object]:
        """Show title history of Process of ?id=.

        Only passes the Process on as context; what is shown of it is up to
        the template .do_GET selects by this method's name.
        """
        return {'process': p}
512
    @_get_item(Process)
    def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
        """Show description history of Process of ?id=.

        Only passes the Process on as context; what is shown of it is up to
        the template .do_GET selects by this method's name.
        """
        return {'process': p}
517
    @_get_item(Process)
    def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
        """Show default effort history of Process of ?id=.

        Only passes the Process on as context; what is shown of it is up to
        the template .do_GET selects by this method's name.
        """
        return {'process': p}
522
523     def do_GET_processes(self) -> dict[str, object]:
524         """Show all Processes."""
525         pattern = self._params.get_str('pattern')
526         sort_by = self._params.get_str('sort_by')
527         processes = Process.matching(self.conn, pattern)
528         sort_by = Process.sort_by(processes, sort_by)
529         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
530
531     # POST handlers
532
533     @staticmethod
534     def _delete_or_post(target_class: Any, redir_target: str = '/'
535                         ) -> Callable[..., Callable[[TaskHandler], str]]:
536         def decorator(f: Callable[..., str]
537                       ) -> Callable[[TaskHandler], str]:
538             def wrapper(self: TaskHandler) -> str:
539                 # pylint: disable=protected-access
540                 # (because pylint here fails to detect the use of wrapper as a
541                 # method to self with respective access privileges)
542                 id_ = self._params.get_int_or_none('id')
543                 for _ in self._form_data.get_all_str('delete'):
544                     if id_ is None:
545                         msg = 'trying to delete non-saved ' +\
546                                 f'{target_class.__name__}'
547                         raise NotFoundException(msg)
548                     item = target_class.by_id(self.conn, id_)
549                     item.remove(self.conn)
550                     return redir_target
551                 if target_class.can_create_by_id:
552                     item = target_class.by_id_or_create(self.conn, id_)
553                 else:
554                     item = target_class.by_id(self.conn, id_)
555                 return f(self, item)
556             return wrapper
557         return decorator
558
559     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
560         """Update history timestamps for VersionedAttribute."""
561         id_ = self._params.get_int_or_none('id')
562         item = cls.by_id(self.conn, id_)
563         attr = getattr(item, attr_name)
564         for k, v in self._form_data.get_first_strings_starting('at:').items():
565             old = k[3:]
566             if old[19:] != v:
567                 attr.reset_timestamp(old, f'{v}.0')
568         attr.save(self.conn)
569         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
570
571     def do_POST_day(self) -> str:
572         """Update or insert Day of date and Todos mapped to it."""
573         # pylint: disable=too-many-locals
574         try:
575             date = self._params.get_str('date')
576             day_comment = self._form_data.get_str('day_comment')
577             make_type = self._form_data.get_str('make_type')
578         except NotFoundException as e:
579             raise BadFormatException(e)
580         old_todos = self._form_data.get_all_int('todo_id')
581         new_todos = self._form_data.get_all_int('new_todo')
582         comments = self._form_data.get_all_str('comment')
583         efforts = self._form_data.get_all_floats_or_nones('effort')
584         done_todos = self._form_data.get_all_int('done')
585         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
586             raise BadFormatException('"done" field refers to unknown Todo')
587         is_done = [t_id in done_todos for t_id in old_todos]
588         if not (len(old_todos) == len(is_done) == len(comments)
589                 == len(efforts)):
590             msg = 'not equal number each of number of todo_id, comments, ' +\
591                     'and efforts inputs'
592             raise BadFormatException(msg)
593         day = Day.by_id_or_create(self.conn, date)
594         day.comment = day_comment
595         day.save(self.conn)
596         for process_id in sorted(new_todos):
597             if 'empty' == make_type:
598                 process = Process.by_id(self.conn, process_id)
599                 todo = Todo(None, process, False, date)
600                 todo.save(self.conn)
601             else:
602                 Todo.create_with_children(self.conn, process_id, date)
603         for i, todo_id in enumerate(old_todos):
604             todo = Todo.by_id(self.conn, todo_id)
605             todo.is_done = is_done[i]
606             todo.comment = comments[i]
607             todo.effort = efforts[i]
608             todo.save(self.conn)
609         return f'/day?date={date}&make_type={make_type}'
610
611     @_delete_or_post(Todo, '/')
612     def do_POST_todo(self, todo: Todo) -> str:
613         """Update Todo and its children."""
614         # pylint: disable=too-many-locals
615         # pylint: disable=too-many-branches
616         # pylint: disable=too-many-branches
617         adopted_child_ids = self._form_data.get_all_int('adopt')
618         processes_to_make_full = self._form_data.get_all_int('make_full')
619         processes_to_make_empty = self._form_data.get_all_int('make_empty')
620         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
621         with_effort_post = True
622         try:
623             effort = self._form_data.get_float_or_none('effort')
624         except NotFoundException:
625             with_effort_post = False
626         conditions = self._form_data.get_all_int('conditions')
627         disables = self._form_data.get_all_int('disables')
628         blockers = self._form_data.get_all_int('blockers')
629         enables = self._form_data.get_all_int('enables')
630         is_done = len(self._form_data.get_all_str('done')) > 0
631         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
632         comment = self._form_data.get_str('comment', ignore_strict=True)
633         for v in fill_fors.values():
634             if v.startswith('make_empty_'):
635                 processes_to_make_empty += [int(v[11:])]
636             elif v.startswith('make_full_'):
637                 processes_to_make_full += [int(v[10:])]
638             elif v != 'ignore':
639                 adopted_child_ids += [int(v)]
640         to_remove = []
641         for child in todo.children:
642             assert isinstance(child.id_, int)
643             if child.id_ not in adopted_child_ids:
644                 to_remove += [child.id_]
645         for id_ in to_remove:
646             child = Todo.by_id(self.conn, id_)
647             todo.remove_child(child)
648         for child_id in adopted_child_ids:
649             if child_id in [c.id_ for c in todo.children]:
650                 continue
651             child = Todo.by_id(self.conn, child_id)
652             todo.add_child(child)
653         for process_id in processes_to_make_empty:
654             process = Process.by_id(self.conn, process_id)
655             made = Todo(None, process, False, todo.date)
656             made.save(self.conn)
657             todo.add_child(made)
658         for process_id in processes_to_make_full:
659             made = Todo.create_with_children(self.conn, process_id, todo.date)
660             todo.add_child(made)
661         if with_effort_post:
662             todo.effort = effort
663         todo.set_conditions(self.conn, conditions)
664         todo.set_blockers(self.conn, blockers)
665         todo.set_enables(self.conn, enables)
666         todo.set_disables(self.conn, disables)
667         todo.is_done = is_done
668         todo.calendarize = calendarize
669         todo.comment = comment
670         todo.save(self.conn)
671         return f'/todo?id={todo.id_}'
672
673     def do_POST_process_descriptions(self) -> str:
674         """Update history timestamps for Process.description."""
675         return self._change_versioned_timestamps(Process, 'description')
676
677     def do_POST_process_efforts(self) -> str:
678         """Update history timestamps for Process.effort."""
679         return self._change_versioned_timestamps(Process, 'effort')
680
681     def do_POST_process_titles(self) -> str:
682         """Update history timestamps for Process.title."""
683         return self._change_versioned_timestamps(Process, 'title')
684
685     @_delete_or_post(Process, '/processes')
686     def do_POST_process(self, process: Process) -> str:
687         """Update or insert Process of ?id= and fields defined in postvars."""
688         # pylint: disable=too-many-locals
689         # pylint: disable=too-many-statements
690         try:
691             title = self._form_data.get_str('title')
692             description = self._form_data.get_str('description')
693             effort = self._form_data.get_float('effort')
694         except NotFoundException as e:
695             raise BadFormatException from e
696         conditions = self._form_data.get_all_int('conditions')
697         blockers = self._form_data.get_all_int('blockers')
698         enables = self._form_data.get_all_int('enables')
699         disables = self._form_data.get_all_int('disables')
700         calendarize = self._form_data.get_all_str('calendarize') != []
701         suppresses = self._form_data.get_all_int('suppresses')
702         step_of = self._form_data.get_all_str('step_of')
703         keep_steps = self._form_data.get_all_int('keep_step')
704         step_ids = self._form_data.get_all_int('steps')
705         new_top_steps = self._form_data.get_all_str('new_top_step')
706         step_process_id_to = {}
707         step_parent_id_to = {}
708         new_steps_to = {}
709         for step_id in step_ids:
710             name = f'new_step_to_{step_id}'
711             new_steps_to[step_id] = self._form_data.get_all_int(name)
712         for step_id in keep_steps:
713             name = f'step_{step_id}_process_id'
714             step_process_id_to[step_id] = self._form_data.get_int(name)
715             name = f'step_{step_id}_parent_id'
716             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
717         process.title.set(title)
718         process.description.set(description)
719         process.effort.set(effort)
720         process.set_conditions(self.conn, conditions)
721         process.set_blockers(self.conn, blockers)
722         process.set_enables(self.conn, enables)
723         process.set_disables(self.conn, disables)
724         process.calendarize = calendarize
725         process.save(self.conn)
726         assert isinstance(process.id_, int)
727         new_step_title = None
728         steps: list[ProcessStep] = []
729         for step_id in keep_steps:
730             if step_id not in step_ids:
731                 raise BadFormatException('trying to keep unknown step')
732             step = ProcessStep(step_id, process.id_,
733                                step_process_id_to[step_id],
734                                step_parent_id_to[step_id])
735             steps += [step]
736         for step_id in step_ids:
737             new = [ProcessStep(None, process.id_, step_process_id, step_id)
738                    for step_process_id in new_steps_to[step_id]]
739             steps += new
740         for step_identifier in new_top_steps:
741             try:
742                 step_process_id = int(step_identifier)
743                 step = ProcessStep(None, process.id_, step_process_id, None)
744                 steps += [step]
745             except ValueError:
746                 new_step_title = step_identifier
747         process.set_steps(self.conn, steps)
748         process.set_step_suppressions(self.conn, suppresses)
749         owners_to_set = []
750         new_owner_title = None
751         for owner_identifier in step_of:
752             try:
753                 owners_to_set += [int(owner_identifier)]
754             except ValueError:
755                 new_owner_title = owner_identifier
756         process.set_owners(self.conn, owners_to_set)
757         params = f'id={process.id_}'
758         if new_step_title:
759             title_b64_encoded = b64encode(new_step_title.encode()).decode()
760             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
761         elif new_owner_title:
762             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
763             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
764         process.save(self.conn)
765         return f'/process?{params}'
766
767     def do_POST_condition_descriptions(self) -> str:
768         """Update history timestamps for Condition.description."""
769         return self._change_versioned_timestamps(Condition, 'description')
770
771     def do_POST_condition_titles(self) -> str:
772         """Update history timestamps for Condition.title."""
773         return self._change_versioned_timestamps(Condition, 'title')
774
775     @_delete_or_post(Condition, '/conditions')
776     def do_POST_condition(self, condition: Condition) -> str:
777         """Update/insert Condition of ?id= and fields defined in postvars."""
778         try:
779             is_active = self._form_data.get_str('is_active') == 'True'
780             title = self._form_data.get_str('title')
781             description = self._form_data.get_str('description')
782         except NotFoundException as e:
783             raise BadFormatException(e) from e
784         condition.is_active = is_active
785         condition.title.set(title)
786         condition.description.set(description)
787         condition.save(self.conn)
788         return f'/condition?id={condition.id_}'