home · contact · privacy
Extend and refactor tests.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
16                                  NotFoundException)
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.headers: list[tuple[str, str]] = []
33         self._render_mode = 'html'
34         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35
36     def set_json_mode(self) -> None:
37         """Make server send JSON instead of HTML responses."""
38         self._render_mode = 'json'
39         self.headers += [('Content-Type', 'application/json')]
40
41     @staticmethod
42     def ctx_to_json(ctx: dict[str, object]) -> str:
43         """Render ctx into JSON string."""
44         def walk_ctx(node: object) -> Any:
45             if hasattr(node, 'as_dict_into_reference'):
46                 if hasattr(node, 'id_') and node.id_ is not None:
47                     return node.as_dict_into_reference(library)
48             if hasattr(node, 'as_dict'):
49                 return node.as_dict
50             if isinstance(node, (list, tuple)):
51                 return [walk_ctx(x) for x in node]
52             if isinstance(node, dict):
53                 d = {}
54                 for k, v in node.items():
55                     d[k] = walk_ctx(v)
56                 return d
57             if isinstance(node, HandledException):
58                 return str(node)
59             return node
60         library: dict[str, dict[str, object] | dict[int, object]] = {}
61         for k, v in ctx.items():
62             ctx[k] = walk_ctx(v)
63         ctx['_library'] = library
64         return json_dumps(ctx)
65
66     def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
67         """Render ctx according to self._render_mode.."""
68         tmpl_name = f'{tmpl_name}.{self._render_mode}'
69         if 'html' == self._render_mode:
70             template = self._jinja.get_template(tmpl_name)
71             return template.render(ctx)
72         return self.__class__.ctx_to_json(ctx)
73
74
75 class InputsParser:
76     """Wrapper for validating and retrieving dict-like HTTP inputs."""
77
78     def __init__(self, dict_: dict[str, list[str]],
79                  strictness: bool = True) -> None:
80         self.inputs = dict_
81         self.strict = strictness  # return None on absence of key, or fail?
82
83     def get_str(self, key: str, default: str = '',
84                 ignore_strict: bool = False) -> str:
85         """Retrieve single/first string value of key, or default."""
86         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
87             if self.strict and not ignore_strict:
88                 raise NotFoundException(f'no value found for key {key}')
89             return default
90         return self.inputs[key][0]
91
92     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
93         """Retrieve dict of (first) strings at key starting with prefix."""
94         ret = {}
95         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
96             ret[key] = self.inputs[key][0]
97         return ret
98
99     def get_int(self, key: str) -> int:
100         """Retrieve single/first value of key as int, error if empty."""
101         val = self.get_int_or_none(key)
102         if val is None:
103             raise BadFormatException(f'unexpected empty value for: {key}')
104         return val
105
106     def get_int_or_none(self, key: str) -> int | None:
107         """Retrieve single/first value of key as int, return None if empty."""
108         val = self.get_str(key, ignore_strict=True)
109         if val == '':
110             return None
111         try:
112             return int(val)
113         except ValueError as e:
114             msg = f'cannot int form field value for key {key}: {val}'
115             raise BadFormatException(msg) from e
116
117     def get_float(self, key: str) -> float:
118         """Retrieve float value of key from self.postvars."""
119         val = self.get_str(key)
120         try:
121             return float(val)
122         except ValueError as e:
123             msg = f'cannot float form field value for key {key}: {val}'
124             raise BadFormatException(msg) from e
125
126     def get_float_or_none(self, key: str) -> float | None:
127         """Retrieve float value of key from self.postvars, None if empty."""
128         val = self.get_str(key)
129         if '' == val:
130             return None
131         try:
132             return float(val)
133         except ValueError as e:
134             msg = f'cannot float form field value for key {key}: {val}'
135             raise BadFormatException(msg) from e
136
137     def get_all_str(self, key: str) -> list[str]:
138         """Retrieve list of string values at key."""
139         if key not in self.inputs.keys():
140             return []
141         return self.inputs[key]
142
143     def get_all_int(self, key: str) -> list[int]:
144         """Retrieve list of int values at key."""
145         all_str = self.get_all_str(key)
146         try:
147             return [int(s) for s in all_str if len(s) > 0]
148         except ValueError as e:
149             msg = f'cannot int a form field value for key {key} in: {all_str}'
150             raise BadFormatException(msg) from e
151
152     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
153         """Retrieve list of float value at key, None if empty strings."""
154         ret: list[float | None] = []
155         for val in self.get_all_str(key):
156             if '' == val:
157                 ret += [None]
158             else:
159                 try:
160                     ret += [float(val)]
161                 except ValueError as e:
162                     msg = f'cannot float form field value for key {key}: {val}'
163                     raise BadFormatException(msg) from e
164         return ret
165
166
167 class TaskHandler(BaseHTTPRequestHandler):
168     """Handles single HTTP request."""
169     # pylint: disable=too-many-public-methods
170     server: TaskServer
171     conn: DatabaseConnection
172     _site: str
173     _form_data: InputsParser
174     _params: InputsParser
175
176     def _send_page(self,
177                    ctx: dict[str, Any],
178                    tmpl_name: str,
179                    code: int = 200
180                    ) -> None:
181         """Send ctx as proper HTTP response."""
182         body = self.server.render(ctx, tmpl_name)
183         self.send_response(code)
184         for header_tuple in self.server.headers:
185             self.send_header(*header_tuple)
186         self.end_headers()
187         self.wfile.write(bytes(body, 'utf-8'))
188
189     @staticmethod
190     def _request_wrapper(http_method: str, not_found_msg: str
191                          ) -> Callable[..., Callable[[TaskHandler], None]]:
192         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
193
194         Among other things, conditionally cleans all caches, but only on POST
195         requests, as only those are expected to change the states of objects
196         that may be cached, and certainly only those are expected to write any
197         changes to the database. We want to call them as early though as
198         possible here, either exactly after the specific request handler
199         returns successfully, or right after any exception is triggered –
200         otherwise, race conditions become plausible.
201
202         Note that otherwise any POST attempt, even a failed one, may end in
203         problematic inconsistencies:
204
205         - if the POST handler experiences an Exception, changes to objects
206           won't get written to the DB, but the changed objects may remain in
207           the cache and affect other objects despite their possibly illegal
208           state
209
210         - even if an object was just saved to the DB, we cannot be sure its
211           current state is completely identical to what we'd get if loading it
212           fresh from the DB (e.g. currently Process.n_owners is only updated
213           when loaded anew via .from_table_row, nor is its state written to
214           the DB by .save; a questionable design choice, but proof that we
215           have no guarantee that objects' .save stores all their states we'd
216           prefer at their most up-to-date.
217         """
218
219         def clear_caches() -> None:
220             for cls in (Day, Todo, Condition, Process, ProcessStep):
221                 assert hasattr(cls, 'empty_cache')
222                 cls.empty_cache()
223
224         def decorator(f: Callable[..., str | None]
225                       ) -> Callable[[TaskHandler], None]:
226             def wrapper(self: TaskHandler) -> None:
227                 # pylint: disable=protected-access
228                 # (because pylint here fails to detect the use of wrapper as a
229                 # method to self with respective access privileges)
230                 try:
231                     self.conn = DatabaseConnection(self.server.db)
232                     parsed_url = urlparse(self.path)
233                     self._site = path_split(parsed_url.path)[1]
234                     params = parse_qs(parsed_url.query, strict_parsing=True)
235                     self._params = InputsParser(params, False)
236                     handler_name = f'do_{http_method}_{self._site}'
237                     if hasattr(self, handler_name):
238                         handler = getattr(self, handler_name)
239                         redir_target = f(self, handler)
240                         if 'POST' == http_method:
241                             clear_caches()
242                         if redir_target:
243                             self.send_response(302)
244                             self.send_header('Location', redir_target)
245                             self.end_headers()
246                     else:
247                         msg = f'{not_found_msg}: {self._site}'
248                         raise NotFoundException(msg)
249                 except HandledException as error:
250                     if 'POST' == http_method:
251                         clear_caches()
252                     ctx = {'msg': error}
253                     self._send_page(ctx, 'msg', error.http_code)
254                 finally:
255                     self.conn.close()
256             return wrapper
257         return decorator
258
259     @_request_wrapper('GET', 'Unknown page')
260     def do_GET(self, handler: Callable[[], str | dict[str, object]]
261                ) -> str | None:
262         """Render page with result of handler, or redirect if result is str."""
263         tmpl_name = f'{self._site}'
264         ctx_or_redir_target = handler()
265         if isinstance(ctx_or_redir_target, str):
266             return ctx_or_redir_target
267         self._send_page(ctx_or_redir_target, tmpl_name)
268         return None
269
270     @_request_wrapper('POST', 'Unknown POST target')
271     def do_POST(self, handler: Callable[[], str]) -> str:
272         """Handle POST with handler, prepare redirection to result."""
273         length = int(self.headers['content-length'])
274         postvars = parse_qs(self.rfile.read(length).decode(),
275                             keep_blank_values=True, strict_parsing=True)
276         self._form_data = InputsParser(postvars)
277         redir_target = handler()
278         self.conn.commit()
279         return redir_target
280
281     # GET handlers
282
283     @staticmethod
284     def _get_item(target_class: Any
285                   ) -> Callable[..., Callable[[TaskHandler],
286                                               dict[str, object]]]:
287         def decorator(f: Callable[..., dict[str, object]]
288                       ) -> Callable[[TaskHandler], dict[str, object]]:
289             def wrapper(self: TaskHandler) -> dict[str, object]:
290                 # pylint: disable=protected-access
291                 # (because pylint here fails to detect the use of wrapper as a
292                 # method to self with respective access privileges)
293                 id_ = self._params.get_int_or_none('id')
294                 if target_class.can_create_by_id:
295                     item = target_class.by_id_or_create(self.conn, id_)
296                 else:
297                     item = target_class.by_id(self.conn, id_)
298                 return f(self, item)
299             return wrapper
300         return decorator
301
302     def do_GET_(self) -> str:
303         """Return redirect target on GET /."""
304         return '/day'
305
306     def _do_GET_calendar(self) -> dict[str, object]:
307         """Show Days from ?start= to ?end=.
308
309         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
310         same, the only difference being the HTML template they are rendered to,
311         which .do_GET selects from their method name.
312         """
313         start, end = self._params.get_str('start'), self._params.get_str('end')
314         end = end if end else date_in_n_days(366)
315         days, start, end = Day.by_date_range_with_limits(self.conn,
316                                                          (start, end), 'id')
317         days = Day.with_filled_gaps(days, start, end)
318         today = date_in_n_days(0)
319         return {'start': start, 'end': end, 'days': days, 'today': today}
320
321     def do_GET_calendar(self) -> dict[str, object]:
322         """Show Days from ?start= to ?end= – normal view."""
323         return self._do_GET_calendar()
324
325     def do_GET_calendar_txt(self) -> dict[str, object]:
326         """Show Days from ?start= to ?end= – minimalist view."""
327         return self._do_GET_calendar()
328
329     def do_GET_day(self) -> dict[str, object]:
330         """Show single Day of ?date=."""
331         date = self._params.get_str('date', date_in_n_days(0))
332         day = Day.by_id_or_create(self.conn, date)
333         make_type = self._params.get_str('make_type')
334         conditions_present = []
335         enablers_for = {}
336         disablers_for = {}
337         for todo in day.todos:
338             for condition in todo.conditions + todo.blockers:
339                 if condition not in conditions_present:
340                     conditions_present += [condition]
341                     enablers_for[condition.id_] = [p for p in
342                                                    Process.all(self.conn)
343                                                    if condition in p.enables]
344                     disablers_for[condition.id_] = [p for p in
345                                                     Process.all(self.conn)
346                                                     if condition in p.disables]
347         seen_todos: set[int] = set()
348         top_nodes = [t.get_step_tree(seen_todos)
349                      for t in day.todos if not t.parents]
350         return {'day': day,
351                 'top_nodes': top_nodes,
352                 'make_type': make_type,
353                 'enablers_for': enablers_for,
354                 'disablers_for': disablers_for,
355                 'conditions_present': conditions_present,
356                 'processes': Process.all(self.conn)}
357
358     @_get_item(Todo)
359     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
360         """Show single Todo of ?id=."""
361
362         @dataclass
363         class TodoStepsNode:
364             """Collect what's useful for Todo steps tree display."""
365             id_: int
366             todo: Todo | None
367             process: Process | None
368             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
369             fillable: bool = False
370
371         def walk_process_steps(id_: int,
372                                process_step_nodes: list[ProcessStepsNode],
373                                steps_nodes: list[TodoStepsNode]) -> None:
374             for process_step_node in process_step_nodes:
375                 id_ += 1
376                 node = TodoStepsNode(id_, None, process_step_node.process, [])
377                 steps_nodes += [node]
378                 walk_process_steps(id_, list(process_step_node.steps.values()),
379                                    node.children)
380
381         def walk_todo_steps(id_: int, todos: list[Todo],
382                             steps_nodes: list[TodoStepsNode]) -> None:
383             for todo in todos:
384                 matched = False
385                 for match in [item for item in steps_nodes
386                               if item.process
387                               and item.process == todo.process]:
388                     match.todo = todo
389                     matched = True
390                     for child in match.children:
391                         child.fillable = True
392                     walk_todo_steps(id_, todo.children, match.children)
393                 if not matched:
394                     id_ += 1
395                     node = TodoStepsNode(id_, todo, None, [])
396                     steps_nodes += [node]
397                     walk_todo_steps(id_, todo.children, node.children)
398
399         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
400                                     ) -> set[int]:
401             ids = set()
402             for node in steps_nodes:
403                 if not node.todo:
404                     assert isinstance(node.process, Process)
405                     assert isinstance(node.process.id_, int)
406                     ids.add(node.process.id_)
407                 ids = ids | collect_adoptables_keys(node.children)
408             return ids
409
410         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
411         process_tree = todo.process.get_steps(self.conn, None)
412         steps_todo_to_process: list[TodoStepsNode] = []
413         walk_process_steps(0, list(process_tree.values()),
414                            steps_todo_to_process)
415         for steps_node in steps_todo_to_process:
416             steps_node.fillable = True
417         walk_todo_steps(len(steps_todo_to_process), todo_steps,
418                         steps_todo_to_process)
419         adoptables: dict[int, list[Todo]] = {}
420         any_adoptables = [Todo.by_id(self.conn, t.id_)
421                           for t in Todo.by_date(self.conn, todo.date)
422                           if t.id_ is not None
423                           and t != todo]
424         for id_ in collect_adoptables_keys(steps_todo_to_process):
425             adoptables[id_] = [t for t in any_adoptables
426                                if t.process.id_ == id_]
427         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
428                 'adoption_candidates_for': adoptables,
429                 'process_candidates': Process.all(self.conn),
430                 'todo_candidates': any_adoptables,
431                 'condition_candidates': Condition.all(self.conn)}
432
433     def do_GET_todos(self) -> dict[str, object]:
434         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
435         sort_by = self._params.get_str('sort_by')
436         start = self._params.get_str('start')
437         end = self._params.get_str('end')
438         process_id = self._params.get_int_or_none('process_id')
439         comment_pattern = self._params.get_str('comment_pattern')
440         todos = []
441         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
442         todos_by_date_range, start, end = ret
443         todos = [t for t in todos_by_date_range
444                  if comment_pattern in t.comment
445                  and ((not process_id) or t.process.id_ == process_id)]
446         sort_by = Todo.sort_by(todos, sort_by)
447         return {'start': start, 'end': end, 'process_id': process_id,
448                 'comment_pattern': comment_pattern, 'todos': todos,
449                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
450
451     def do_GET_conditions(self) -> dict[str, object]:
452         """Show all Conditions."""
453         pattern = self._params.get_str('pattern')
454         sort_by = self._params.get_str('sort_by')
455         conditions = Condition.matching(self.conn, pattern)
456         sort_by = Condition.sort_by(conditions, sort_by)
457         return {'conditions': conditions,
458                 'sort_by': sort_by,
459                 'pattern': pattern}
460
461     @_get_item(Condition)
462     def do_GET_condition(self, c: Condition) -> dict[str, object]:
463         """Show Condition of ?id=."""
464         ps = Process.all(self.conn)
465         return {'condition': c, 'is_new': c.id_ is None,
466                 'enabled_processes': [p for p in ps if c in p.conditions],
467                 'disabled_processes': [p for p in ps if c in p.blockers],
468                 'enabling_processes': [p for p in ps if c in p.enables],
469                 'disabling_processes': [p for p in ps if c in p.disables]}
470
471     @_get_item(Condition)
472     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
473         """Show title history of Condition of ?id=."""
474         return {'condition': c}
475
476     @_get_item(Condition)
477     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
478         """Show description historys of Condition of ?id=."""
479         return {'condition': c}
480
481     @_get_item(Process)
482     def do_GET_process(self, process: Process) -> dict[str, object]:
483         """Show Process of ?id=."""
484         owner_ids = self._params.get_all_int('step_to')
485         owned_ids = self._params.get_all_int('has_step')
486         title_64 = self._params.get_str('title_b64')
487         if title_64:
488             try:
489                 title = b64decode(title_64.encode()).decode()
490             except binascii_Exception as exc:
491                 msg = 'invalid base64 for ?title_b64='
492                 raise BadFormatException(msg) from exc
493             process.title.set(title)
494         preset_top_step = None
495         owners = process.used_as_step_by(self.conn)
496         for step_id in owner_ids:
497             owners += [Process.by_id(self.conn, step_id)]
498         for process_id in owned_ids:
499             Process.by_id(self.conn, process_id)  # to ensure ID exists
500             preset_top_step = process_id
501         return {'process': process, 'is_new': process.id_ is None,
502                 'preset_top_step': preset_top_step,
503                 'steps': process.get_steps(self.conn), 'owners': owners,
504                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
505                 'process_candidates': Process.all(self.conn),
506                 'condition_candidates': Condition.all(self.conn)}
507
508     @_get_item(Process)
509     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
510         """Show title history of Process of ?id=."""
511         return {'process': p}
512
513     @_get_item(Process)
514     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
515         """Show description historys of Process of ?id=."""
516         return {'process': p}
517
518     @_get_item(Process)
519     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
520         """Show default effort history of Process of ?id=."""
521         return {'process': p}
522
523     def do_GET_processes(self) -> dict[str, object]:
524         """Show all Processes."""
525         pattern = self._params.get_str('pattern')
526         sort_by = self._params.get_str('sort_by')
527         processes = Process.matching(self.conn, pattern)
528         sort_by = Process.sort_by(processes, sort_by)
529         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
530
531     # POST handlers
532
533     @staticmethod
534     def _delete_or_post(target_class: Any, redir_target: str = '/'
535                         ) -> Callable[..., Callable[[TaskHandler], str]]:
536         def decorator(f: Callable[..., str]
537                       ) -> Callable[[TaskHandler], str]:
538             def wrapper(self: TaskHandler) -> str:
539                 # pylint: disable=protected-access
540                 # (because pylint here fails to detect the use of wrapper as a
541                 # method to self with respective access privileges)
542                 id_ = self._params.get_int_or_none('id')
543                 for _ in self._form_data.get_all_str('delete'):
544                     if id_ is None:
545                         msg = 'trying to delete non-saved ' +\
546                                 f'{target_class.__name__}'
547                         raise NotFoundException(msg)
548                     item = target_class.by_id(self.conn, id_)
549                     item.remove(self.conn)
550                     return redir_target
551                 if target_class.can_create_by_id:
552                     item = target_class.by_id_or_create(self.conn, id_)
553                 else:
554                     item = target_class.by_id(self.conn, id_)
555                 return f(self, item)
556             return wrapper
557         return decorator
558
559     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
560         """Update history timestamps for VersionedAttribute."""
561         id_ = self._params.get_int_or_none('id')
562         item = cls.by_id(self.conn, id_)
563         attr = getattr(item, attr_name)
564         for k, v in self._form_data.get_first_strings_starting('at:').items():
565             old = k[3:]
566             if old[19:] != v:
567                 attr.reset_timestamp(old, f'{v}.0')
568         attr.save(self.conn)
569         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
570
571     def do_POST_day(self) -> str:
572         """Update or insert Day of date and Todos mapped to it."""
573         # pylint: disable=too-many-locals
574         try:
575             date = self._params.get_str('date')
576             day_comment = self._form_data.get_str('day_comment')
577             make_type = self._form_data.get_str('make_type')
578         except NotFoundException as e:
579             raise BadFormatException from e
580         old_todos = self._form_data.get_all_int('todo_id')
581         new_todos = self._form_data.get_all_int('new_todo')
582         comments = self._form_data.get_all_str('comment')
583         efforts = self._form_data.get_all_floats_or_nones('effort')
584         done_todos = self._form_data.get_all_int('done')
585         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
586             raise BadFormatException('"done" field refers to unknown Todo')
587         is_done = [t_id in done_todos for t_id in old_todos]
588         if not (len(old_todos) == len(is_done) == len(comments)
589                 == len(efforts)):
590             msg = 'not equal number each of number of todo_id, comments, ' +\
591                     'and efforts inputs'
592             raise BadFormatException(msg)
593         day = Day.by_id_or_create(self.conn, date)
594         day.comment = day_comment
595         day.save(self.conn)
596         for process_id in sorted(new_todos):
597             if 'empty' == make_type:
598                 process = Process.by_id(self.conn, process_id)
599                 todo = Todo(None, process, False, date)
600                 todo.save(self.conn)
601             else:
602                 Todo.create_with_children(self.conn, process_id, date)
603         for i, todo_id in enumerate(old_todos):
604             todo = Todo.by_id(self.conn, todo_id)
605             todo.is_done = is_done[i]
606             todo.comment = comments[i]
607             todo.effort = efforts[i]
608             todo.save(self.conn)
609         return f'/day?date={date}&make_type={make_type}'
610
611     @_delete_or_post(Todo, '/')
612     def do_POST_todo(self, todo: Todo) -> str:
613         """Update Todo and its children."""
614         # pylint: disable=too-many-locals
615         # pylint: disable=too-many-branches
616         # pylint: disable=too-many-statements
617         adopted_child_ids = self._form_data.get_all_int('adopt')
618         processes_to_make_full = self._form_data.get_all_int('make_full')
619         processes_to_make_empty = self._form_data.get_all_int('make_empty')
620         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
621         with_effort_post = True
622         try:
623             effort = self._form_data.get_float_or_none('effort')
624         except NotFoundException:
625             with_effort_post = False
626         conditions = self._form_data.get_all_int('conditions')
627         disables = self._form_data.get_all_int('disables')
628         blockers = self._form_data.get_all_int('blockers')
629         enables = self._form_data.get_all_int('enables')
630         is_done = len(self._form_data.get_all_str('done')) > 0
631         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
632         comment = self._form_data.get_str('comment', ignore_strict=True)
633         for v in fill_fors.values():
634             target_id: int
635             for prefix in ['make_empty_', 'make_full_']:
636                 if v.startswith(prefix):
637                     try:
638                         target_id = int(v[len(prefix):])
639                     except ValueError as e:
640                         msg = 'bad fill_for target: {v}'
641                         raise BadFormatException(msg) from e
642                     continue
643             if v.startswith('make_empty_'):
644                 processes_to_make_empty += [target_id]
645             elif v.startswith('make_full_'):
646                 processes_to_make_full += [target_id]
647             elif v != 'ignore':
648                 adopted_child_ids += [int(v)]
649         to_remove = []
650         for child in todo.children:
651             assert isinstance(child.id_, int)
652             if child.id_ not in adopted_child_ids:
653                 to_remove += [child.id_]
654         for id_ in to_remove:
655             child = Todo.by_id(self.conn, id_)
656             todo.remove_child(child)
657         for child_id in adopted_child_ids:
658             if child_id in [c.id_ for c in todo.children]:
659                 continue
660             child = Todo.by_id(self.conn, child_id)
661             todo.add_child(child)
662         for process_id in processes_to_make_empty:
663             process = Process.by_id(self.conn, process_id)
664             made = Todo(None, process, False, todo.date)
665             made.save(self.conn)
666             todo.add_child(made)
667         for process_id in processes_to_make_full:
668             made = Todo.create_with_children(self.conn, process_id, todo.date)
669             todo.add_child(made)
670         if with_effort_post:
671             todo.effort = effort
672         todo.set_conditions(self.conn, conditions)
673         todo.set_blockers(self.conn, blockers)
674         todo.set_enables(self.conn, enables)
675         todo.set_disables(self.conn, disables)
676         todo.is_done = is_done
677         todo.calendarize = calendarize
678         todo.comment = comment
679         todo.save(self.conn)
680         return f'/todo?id={todo.id_}'
681
682     def do_POST_process_descriptions(self) -> str:
683         """Update history timestamps for Process.description."""
684         return self._change_versioned_timestamps(Process, 'description')
685
686     def do_POST_process_efforts(self) -> str:
687         """Update history timestamps for Process.effort."""
688         return self._change_versioned_timestamps(Process, 'effort')
689
690     def do_POST_process_titles(self) -> str:
691         """Update history timestamps for Process.title."""
692         return self._change_versioned_timestamps(Process, 'title')
693
694     @_delete_or_post(Process, '/processes')
695     def do_POST_process(self, process: Process) -> str:
696         """Update or insert Process of ?id= and fields defined in postvars."""
697         # pylint: disable=too-many-locals
698         # pylint: disable=too-many-statements
699         try:
700             title = self._form_data.get_str('title')
701             description = self._form_data.get_str('description')
702             effort = self._form_data.get_float('effort')
703         except NotFoundException as e:
704             raise BadFormatException from e
705         conditions = self._form_data.get_all_int('conditions')
706         blockers = self._form_data.get_all_int('blockers')
707         enables = self._form_data.get_all_int('enables')
708         disables = self._form_data.get_all_int('disables')
709         calendarize = self._form_data.get_all_str('calendarize') != []
710         suppresses = self._form_data.get_all_int('suppresses')
711         step_of = self._form_data.get_all_str('step_of')
712         keep_steps = self._form_data.get_all_int('keep_step')
713         step_ids = self._form_data.get_all_int('steps')
714         new_top_steps = self._form_data.get_all_str('new_top_step')
715         step_process_id_to = {}
716         step_parent_id_to = {}
717         new_steps_to = {}
718         for step_id in step_ids:
719             name = f'new_step_to_{step_id}'
720             new_steps_to[step_id] = self._form_data.get_all_int(name)
721         for step_id in keep_steps:
722             name = f'step_{step_id}_process_id'
723             step_process_id_to[step_id] = self._form_data.get_int(name)
724             name = f'step_{step_id}_parent_id'
725             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
726         process.title.set(title)
727         process.description.set(description)
728         process.effort.set(effort)
729         process.set_conditions(self.conn, conditions)
730         process.set_blockers(self.conn, blockers)
731         process.set_enables(self.conn, enables)
732         process.set_disables(self.conn, disables)
733         process.calendarize = calendarize
734         process.save(self.conn)
735         assert isinstance(process.id_, int)
736         new_step_title = None
737         steps: list[ProcessStep] = []
738         for step_id in keep_steps:
739             if step_id not in step_ids:
740                 raise BadFormatException('trying to keep unknown step')
741             step = ProcessStep(step_id, process.id_,
742                                step_process_id_to[step_id],
743                                step_parent_id_to[step_id])
744             steps += [step]
745         for step_id in step_ids:
746             new = [ProcessStep(None, process.id_, step_process_id, step_id)
747                    for step_process_id in new_steps_to[step_id]]
748             steps += new
749         for step_identifier in new_top_steps:
750             try:
751                 step_process_id = int(step_identifier)
752                 step = ProcessStep(None, process.id_, step_process_id, None)
753                 steps += [step]
754             except ValueError:
755                 new_step_title = step_identifier
756         process.set_steps(self.conn, steps)
757         process.set_step_suppressions(self.conn, suppresses)
758         owners_to_set = []
759         new_owner_title = None
760         for owner_identifier in step_of:
761             try:
762                 owners_to_set += [int(owner_identifier)]
763             except ValueError:
764                 new_owner_title = owner_identifier
765         process.set_owners(self.conn, owners_to_set)
766         params = f'id={process.id_}'
767         if new_step_title:
768             title_b64_encoded = b64encode(new_step_title.encode()).decode()
769             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
770         elif new_owner_title:
771             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
772             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
773         process.save(self.conn)
774         return f'/process?{params}'
775
776     def do_POST_condition_descriptions(self) -> str:
777         """Update history timestamps for Condition.description."""
778         return self._change_versioned_timestamps(Condition, 'description')
779
780     def do_POST_condition_titles(self) -> str:
781         """Update history timestamps for Condition.title."""
782         return self._change_versioned_timestamps(Condition, 'title')
783
784     @_delete_or_post(Condition, '/conditions')
785     def do_POST_condition(self, condition: Condition) -> str:
786         """Update/insert Condition of ?id= and fields defined in postvars."""
787         try:
788             is_active = self._form_data.get_str('is_active') == 'True'
789             title = self._form_data.get_str('title')
790             description = self._form_data.get_str('description')
791         except NotFoundException as e:
792             raise BadFormatException(e) from e
793         condition.is_active = is_active
794         condition.title.set(title)
795         condition.description.set(description)
796         condition.save(self.conn)
797         return f'/condition?id={condition.id_}'