home · contact · privacy
Minor refactoring.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
16                                  NotFoundException)
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
21
22 TEMPLATES_DIR = 'templates'
23
24
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Besides the Jinja environment (loading from TEMPLATES_DIR) it keeps the
    database file handle in .db, extra response headers in .headers, and a
    render mode that is 'html' unless .set_json_mode() was called.
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Initialize HTTPServer with args/kwargs, and remember db_file."""
        super().__init__(*args, **kwargs)
        self.db = db_file
        self.headers: list[tuple[str, str]] = []
        self._render_mode = 'html'
        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))

    def set_json_mode(self) -> None:
        """Make server send JSON instead of HTML responses.

        May be called repeatedly; the Content-Type header is added only once.
        """
        self._render_mode = 'json'
        json_header = ('Content-Type', 'application/json')
        if json_header not in self.headers:
            self.headers += [json_header]

    @staticmethod
    def ctx_to_json(ctx: dict[str, object]) -> str:
        """Render ctx into JSON string, leaving ctx itself unmodified.

        Lists, tuples and dicts are walked recursively. Objects offering
        .as_dict_into_reference and a non-None .id_ are replaced by what that
        method returns when handed the shared library dict; other objects
        offering .as_dict are replaced by that attribute's value; any
        HandledException becomes its string form. The library dict is added
        to the output under the '_library' key.
        """
        library: dict[str, dict[str | int, object]] = {}

        def walk_ctx(node: object) -> Any:
            if hasattr(node, 'as_dict_into_reference'):
                if hasattr(node, 'id_') and node.id_ is not None:
                    return node.as_dict_into_reference(library)
            if hasattr(node, 'as_dict'):
                return node.as_dict
            if isinstance(node, (list, tuple)):
                return [walk_ctx(x) for x in node]
            if isinstance(node, dict):
                return {k: walk_ctx(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                return str(node)
            return node

        # Build a fresh dict rather than overwriting the caller's values.
        rendered = {k: walk_ctx(v) for k, v in ctx.items()}
        rendered['_library'] = library
        return json_dumps(rendered)

    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
        """Render ctx according to self._render_mode.

        In 'html' mode, the template named tmpl_name plus '.html' is rendered
        with ctx; in any other mode, tmpl_name is ignored and ctx is turned
        into a JSON string.
        """
        if 'html' == self._render_mode:
            template = self._jinja.get_template(f'{tmpl_name}.html')
            return template.render(ctx)
        return self.__class__.ctx_to_json(ctx)
73
74
class InputsParser:
    """Wrapper for validating and retrieving dict-like HTTP inputs.

    Wraps a dict that maps keys to lists of string values. In strict mode,
    .get_str raises on keys without value instead of returning a default.
    """

    def __init__(self, dict_: dict[str, list[str]],
                 strictness: bool = True) -> None:
        self.inputs = dict_
        self.strict = strictness

    def get_str(self, key: str, default: str = '',
                ignore_strict: bool = False) -> str:
        """Retrieve single/first string value of key, or default.

        Raises BadFormatException if key is absent or has no values while
        strict mode is on and ignore_strict is not set.
        """
        values = self.inputs.get(key)
        if not values:
            if self.strict and not ignore_strict:
                raise BadFormatException(f'no value found for key {key}')
            return default
        return values[0]

    def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
        """Retrieve dict of (first) strings at key starting with prefix.

        Keys whose value list is empty are left out, in line with .get_str
        treating an empty list like a missing key.
        """
        return {key: values[0] for key, values in self.inputs.items()
                if key.startswith(prefix) and values}

    def get_int(self, key: str) -> int:
        """Retrieve single/first value of key as int, error if empty."""
        val = self.get_int_or_none(key)
        if val is None:
            raise BadFormatException(f'unexpected empty value for: {key}')
        return val

    def get_int_or_none(self, key: str) -> int | None:
        """Retrieve single/first value of key as int, return None if empty.

        Never raises for a missing key, whatever the strictness; raises
        BadFormatException if a non-empty value cannot be parsed as int.
        """
        val = self.get_str(key, ignore_strict=True)
        if val == '':
            return None
        try:
            return int(val)
        except ValueError as e:
            msg = f'cannot int form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_float(self, key: str) -> float:
        """Retrieve single/first value of key as float.

        Raises BadFormatException if the value cannot be parsed as float, or
        (via .get_str, in strict mode) if there is no value at all.
        """
        val = self.get_str(key)
        try:
            return float(val)
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e

    def get_all_str(self, key: str) -> list[str]:
        """Retrieve list of string values at key, empty list if key absent."""
        return self.inputs.get(key, [])

    def get_all_int(self, key: str) -> list[int]:
        """Retrieve list of int values at key, skipping empty strings."""
        all_str = self.get_all_str(key)
        try:
            return [int(s) for s in all_str if s]
        except ValueError as e:
            msg = f'cannot int a form field value for key {key} in: {all_str}'
            raise BadFormatException(msg) from e

    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
        """Retrieve list of float value at key, None if empty strings."""
        ret: list[float | None] = []
        for val in self.get_all_str(key):
            if '' == val:
                ret.append(None)
                continue
            try:
                ret.append(float(val))
            except ValueError as e:
                msg = f'cannot float form field value for key {key}: {val}'
                raise BadFormatException(msg) from e
        return ret
154
155
156 class TaskHandler(BaseHTTPRequestHandler):
157     """Handles single HTTP request."""
158     # pylint: disable=too-many-public-methods
159     server: TaskServer
160     conn: DatabaseConnection
161     _site: str
162     _form_data: InputsParser
163     _params: InputsParser
164
165     def _send_page(self,
166                    ctx: dict[str, Any],
167                    tmpl_name: str,
168                    code: int = 200
169                    ) -> None:
170         """Send ctx as proper HTTP response."""
171         body = self.server.render(ctx, tmpl_name)
172         self.send_response(code)
173         for header_tuple in self.server.headers:
174             self.send_header(*header_tuple)
175         self.end_headers()
176         self.wfile.write(bytes(body, 'utf-8'))
177
178     @staticmethod
179     def _request_wrapper(http_method: str, not_found_msg: str
180                          ) -> Callable[..., Callable[[TaskHandler], None]]:
181         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
182
183         Among other things, conditionally cleans all caches, but only on POST
184         requests, as only those are expected to change the states of objects
185         that may be cached, and certainly only those are expected to write any
186         changes to the database. We want to call them as early though as
187         possible here, either exactly after the specific request handler
188         returns successfully, or right after any exception is triggered –
189         otherwise, race conditions become plausible.
190
191         Note that otherwise any POST attempt, even a failed one, may end in
192         problematic inconsistencies:
193
194         - if the POST handler experiences an Exception, changes to objects
195           won't get written to the DB, but the changed objects may remain in
196           the cache and affect other objects despite their possibly illegal
197           state
198
199         - even if an object was just saved to the DB, we cannot be sure its
200           current state is completely identical to what we'd get if loading it
201           fresh from the DB (e.g. currently Process.n_owners is only updated
202           when loaded anew via .from_table_row, nor is its state written to
203           the DB by .save; a questionable design choice, but proof that we
204           have no guarantee that objects' .save stores all their states we'd
205           prefer at their most up-to-date.
206         """
207
208         def clear_caches() -> None:
209             for cls in (Day, Todo, Condition, Process, ProcessStep):
210                 assert hasattr(cls, 'empty_cache')
211                 cls.empty_cache()
212
213         def decorator(f: Callable[..., str | None]
214                       ) -> Callable[[TaskHandler], None]:
215             def wrapper(self: TaskHandler) -> None:
216                 # pylint: disable=protected-access
217                 # (because pylint here fails to detect the use of wrapper as a
218                 # method to self with respective access privileges)
219                 try:
220                     self.conn = DatabaseConnection(self.server.db)
221                     parsed_url = urlparse(self.path)
222                     self._site = path_split(parsed_url.path)[1]
223                     params = parse_qs(parsed_url.query, strict_parsing=True)
224                     self._params = InputsParser(params, False)
225                     handler_name = f'do_{http_method}_{self._site}'
226                     if hasattr(self, handler_name):
227                         handler = getattr(self, handler_name)
228                         redir_target = f(self, handler)
229                         if 'POST' == http_method:
230                             clear_caches()
231                         if redir_target:
232                             self.send_response(302)
233                             self.send_header('Location', redir_target)
234                             self.end_headers()
235                     else:
236                         msg = f'{not_found_msg}: {self._site}'
237                         raise NotFoundException(msg)
238                 except HandledException as error:
239                     if 'POST' == http_method:
240                         clear_caches()
241                     ctx = {'msg': error}
242                     self._send_page(ctx, 'msg', error.http_code)
243                 finally:
244                     self.conn.close()
245             return wrapper
246         return decorator
247
248     @_request_wrapper('GET', 'Unknown page')
249     def do_GET(self, handler: Callable[[], str | dict[str, object]]
250                ) -> str | None:
251         """Render page with result of handler, or redirect if result is str."""
252         tmpl_name = f'{self._site}'
253         ctx_or_redir_target = handler()
254         if isinstance(ctx_or_redir_target, str):
255             return ctx_or_redir_target
256         self._send_page(ctx_or_redir_target, tmpl_name)
257         return None
258
259     @_request_wrapper('POST', 'Unknown POST target')
260     def do_POST(self, handler: Callable[[], str]) -> str:
261         """Handle POST with handler, prepare redirection to result."""
262         length = int(self.headers['content-length'])
263         postvars = parse_qs(self.rfile.read(length).decode(),
264                             keep_blank_values=True, strict_parsing=True)
265         self._form_data = InputsParser(postvars)
266         redir_target = handler()
267         self.conn.commit()
268         return redir_target
269
270     # GET handlers
271
272     @staticmethod
273     def _get_item(target_class: Any
274                   ) -> Callable[..., Callable[[TaskHandler],
275                                               dict[str, object]]]:
276         def decorator(f: Callable[..., dict[str, object]]
277                       ) -> Callable[[TaskHandler], dict[str, object]]:
278             def wrapper(self: TaskHandler) -> dict[str, object]:
279                 # pylint: disable=protected-access
280                 # (because pylint here fails to detect the use of wrapper as a
281                 # method to self with respective access privileges)
282                 id_ = self._params.get_int_or_none('id')
283                 if target_class.can_create_by_id:
284                     item = target_class.by_id_or_create(self.conn, id_)
285                 else:
286                     item = target_class.by_id(self.conn, id_)
287                 return f(self, item)
288             return wrapper
289         return decorator
290
291     def do_GET_(self) -> str:
292         """Return redirect target on GET /."""
293         return '/day'
294
295     def _do_GET_calendar(self) -> dict[str, object]:
296         """Show Days from ?start= to ?end=.
297
298         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
299         same, the only difference being the HTML template they are rendered to,
300         which .do_GET selects from their method name.
301         """
302         start, end = self._params.get_str('start'), self._params.get_str('end')
303         end = end if end else date_in_n_days(366)
304         days, start, end = Day.by_date_range_with_limits(self.conn,
305                                                          (start, end), 'id')
306         days = Day.with_filled_gaps(days, start, end)
307         today = date_in_n_days(0)
308         return {'start': start, 'end': end, 'days': days, 'today': today}
309
310     def do_GET_calendar(self) -> dict[str, object]:
311         """Show Days from ?start= to ?end= – normal view."""
312         return self._do_GET_calendar()
313
314     def do_GET_calendar_txt(self) -> dict[str, object]:
315         """Show Days from ?start= to ?end= – minimalist view."""
316         return self._do_GET_calendar()
317
318     def do_GET_day(self) -> dict[str, object]:
319         """Show single Day of ?date=."""
320         date = self._params.get_str('date', date_in_n_days(0))
321         day = Day.by_id_or_create(self.conn, date)
322         make_type = self._params.get_str('make_type')
323         conditions_present = []
324         enablers_for = {}
325         disablers_for = {}
326         for todo in day.todos:
327             for condition in todo.conditions + todo.blockers:
328                 if condition not in conditions_present:
329                     conditions_present += [condition]
330                     enablers_for[condition.id_] = [p for p in
331                                                    Process.all(self.conn)
332                                                    if condition in p.enables]
333                     disablers_for[condition.id_] = [p for p in
334                                                     Process.all(self.conn)
335                                                     if condition in p.disables]
336         seen_todos: set[int] = set()
337         top_nodes = [t.get_step_tree(seen_todos)
338                      for t in day.todos if not t.parents]
339         return {'day': day,
340                 'top_nodes': top_nodes,
341                 'make_type': make_type,
342                 'enablers_for': enablers_for,
343                 'disablers_for': disablers_for,
344                 'conditions_present': conditions_present,
345                 'processes': Process.all(self.conn)}
346
347     @_get_item(Todo)
348     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
349         """Show single Todo of ?id=."""
350
351         @dataclass
352         class TodoStepsNode:
353             """Collect what's useful for Todo steps tree display."""
354             id_: int
355             todo: Todo | None
356             process: Process | None
357             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
358             fillable: bool = False
359
360         def walk_process_steps(id_: int,
361                                process_step_nodes: list[ProcessStepsNode],
362                                steps_nodes: list[TodoStepsNode]) -> None:
363             for process_step_node in process_step_nodes:
364                 id_ += 1
365                 node = TodoStepsNode(id_, None, process_step_node.process, [])
366                 steps_nodes += [node]
367                 walk_process_steps(id_, list(process_step_node.steps.values()),
368                                    node.children)
369
370         def walk_todo_steps(id_: int, todos: list[Todo],
371                             steps_nodes: list[TodoStepsNode]) -> None:
372             for todo in todos:
373                 matched = False
374                 for match in [item for item in steps_nodes
375                               if item.process
376                               and item.process == todo.process]:
377                     match.todo = todo
378                     matched = True
379                     for child in match.children:
380                         child.fillable = True
381                     walk_todo_steps(id_, todo.children, match.children)
382                 if not matched:
383                     id_ += 1
384                     node = TodoStepsNode(id_, todo, None, [])
385                     steps_nodes += [node]
386                     walk_todo_steps(id_, todo.children, node.children)
387
388         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
389                                     ) -> set[int]:
390             ids = set()
391             for node in steps_nodes:
392                 if not node.todo:
393                     assert isinstance(node.process, Process)
394                     assert isinstance(node.process.id_, int)
395                     ids.add(node.process.id_)
396                 ids = ids | collect_adoptables_keys(node.children)
397             return ids
398
399         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
400         process_tree = todo.process.get_steps(self.conn, None)
401         steps_todo_to_process: list[TodoStepsNode] = []
402         walk_process_steps(0, list(process_tree.values()),
403                            steps_todo_to_process)
404         for steps_node in steps_todo_to_process:
405             steps_node.fillable = True
406         walk_todo_steps(len(steps_todo_to_process), todo_steps,
407                         steps_todo_to_process)
408         adoptables: dict[int, list[Todo]] = {}
409         any_adoptables = [Todo.by_id(self.conn, t.id_)
410                           for t in Todo.by_date(self.conn, todo.date)
411                           if t.id_ is not None
412                           and t != todo]
413         for id_ in collect_adoptables_keys(steps_todo_to_process):
414             adoptables[id_] = [t for t in any_adoptables
415                                if t.process.id_ == id_]
416         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
417                 'adoption_candidates_for': adoptables,
418                 'process_candidates': Process.all(self.conn),
419                 'todo_candidates': any_adoptables,
420                 'condition_candidates': Condition.all(self.conn)}
421
422     def do_GET_todos(self) -> dict[str, object]:
423         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
424         sort_by = self._params.get_str('sort_by')
425         start = self._params.get_str('start')
426         end = self._params.get_str('end')
427         process_id = self._params.get_int_or_none('process_id')
428         comment_pattern = self._params.get_str('comment_pattern')
429         todos = []
430         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
431         todos_by_date_range, start, end = ret
432         todos = [t for t in todos_by_date_range
433                  if comment_pattern in t.comment
434                  and ((not process_id) or t.process.id_ == process_id)]
435         sort_by = Todo.sort_by(todos, sort_by)
436         return {'start': start, 'end': end, 'process_id': process_id,
437                 'comment_pattern': comment_pattern, 'todos': todos,
438                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
439
440     def do_GET_conditions(self) -> dict[str, object]:
441         """Show all Conditions."""
442         pattern = self._params.get_str('pattern')
443         sort_by = self._params.get_str('sort_by')
444         conditions = Condition.matching(self.conn, pattern)
445         sort_by = Condition.sort_by(conditions, sort_by)
446         return {'conditions': conditions,
447                 'sort_by': sort_by,
448                 'pattern': pattern}
449
450     @_get_item(Condition)
451     def do_GET_condition(self, c: Condition) -> dict[str, object]:
452         """Show Condition of ?id=."""
453         ps = Process.all(self.conn)
454         return {'condition': c, 'is_new': c.id_ is None,
455                 'enabled_processes': [p for p in ps if c in p.conditions],
456                 'disabled_processes': [p for p in ps if c in p.blockers],
457                 'enabling_processes': [p for p in ps if c in p.enables],
458                 'disabling_processes': [p for p in ps if c in p.disables]}
459
460     @_get_item(Condition)
461     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
462         """Show title history of Condition of ?id=."""
463         return {'condition': c}
464
465     @_get_item(Condition)
466     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
467         """Show description historys of Condition of ?id=."""
468         return {'condition': c}
469
470     @_get_item(Process)
471     def do_GET_process(self, process: Process) -> dict[str, object]:
472         """Show Process of ?id=."""
473         owner_ids = self._params.get_all_int('step_to')
474         owned_ids = self._params.get_all_int('has_step')
475         title_64 = self._params.get_str('title_b64')
476         if title_64:
477             try:
478                 title = b64decode(title_64.encode()).decode()
479             except binascii_Exception as exc:
480                 msg = 'invalid base64 for ?title_b64='
481                 raise BadFormatException(msg) from exc
482             process.title.set(title)
483         preset_top_step = None
484         owners = process.used_as_step_by(self.conn)
485         for step_id in owner_ids:
486             owners += [Process.by_id(self.conn, step_id)]
487         for process_id in owned_ids:
488             Process.by_id(self.conn, process_id)  # to ensure ID exists
489             preset_top_step = process_id
490         return {'process': process, 'is_new': process.id_ is None,
491                 'preset_top_step': preset_top_step,
492                 'steps': process.get_steps(self.conn), 'owners': owners,
493                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
494                 'process_candidates': Process.all(self.conn),
495                 'condition_candidates': Condition.all(self.conn)}
496
497     @_get_item(Process)
498     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
499         """Show title history of Process of ?id=."""
500         return {'process': p}
501
502     @_get_item(Process)
503     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
504         """Show description historys of Process of ?id=."""
505         return {'process': p}
506
507     @_get_item(Process)
508     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
509         """Show default effort history of Process of ?id=."""
510         return {'process': p}
511
512     def do_GET_processes(self) -> dict[str, object]:
513         """Show all Processes."""
514         pattern = self._params.get_str('pattern')
515         sort_by = self._params.get_str('sort_by')
516         processes = Process.matching(self.conn, pattern)
517         sort_by = Process.sort_by(processes, sort_by)
518         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
519
520     # POST handlers
521
522     @staticmethod
523     def _delete_or_post(target_class: Any, redir_target: str = '/'
524                         ) -> Callable[..., Callable[[TaskHandler], str]]:
525         def decorator(f: Callable[..., str]
526                       ) -> Callable[[TaskHandler], str]:
527             def wrapper(self: TaskHandler) -> str:
528                 # pylint: disable=protected-access
529                 # (because pylint here fails to detect the use of wrapper as a
530                 # method to self with respective access privileges)
531                 id_ = self._params.get_int_or_none('id')
532                 for _ in self._form_data.get_all_str('delete'):
533                     if id_ is None:
534                         msg = 'trying to delete non-saved ' +\
535                                 f'{target_class.__name__}'
536                         raise NotFoundException(msg)
537                     item = target_class.by_id(self.conn, id_)
538                     item.remove(self.conn)
539                     return redir_target
540                 if target_class.can_create_by_id:
541                     item = target_class.by_id_or_create(self.conn, id_)
542                 else:
543                     item = target_class.by_id(self.conn, id_)
544                 return f(self, item)
545             return wrapper
546         return decorator
547
548     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
549         """Update history timestamps for VersionedAttribute."""
550         id_ = self._params.get_int_or_none('id')
551         item = cls.by_id(self.conn, id_)
552         attr = getattr(item, attr_name)
553         for k, v in self._form_data.get_first_strings_starting('at:').items():
554             old = k[3:]
555             if old[19:] != v:
556                 attr.reset_timestamp(old, f'{v}.0')
557         attr.save(self.conn)
558         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
559
560     def do_POST_day(self) -> str:
561         """Update or insert Day of date and Todos mapped to it."""
562         # pylint: disable=too-many-locals
563         date = self._params.get_str('date')
564         day_comment = self._form_data.get_str('day_comment')
565         make_type = self._form_data.get_str('make_type')
566         old_todos = self._form_data.get_all_int('todo_id')
567         new_todos = self._form_data.get_all_int('new_todo')
568         comments = self._form_data.get_all_str('comment')
569         efforts = self._form_data.get_all_floats_or_nones('effort')
570         done_todos = self._form_data.get_all_int('done')
571         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
572             raise BadFormatException('"done" field refers to unknown Todo')
573         is_done = [t_id in done_todos for t_id in old_todos]
574         if not (len(old_todos) == len(is_done) == len(comments)
575                 == len(efforts)):
576             msg = 'not equal number each of number of todo_id, comments, ' +\
577                     'and efforts inputs'
578             raise BadFormatException(msg)
579         day = Day.by_id_or_create(self.conn, date)
580         day.comment = day_comment
581         day.save(self.conn)
582         for process_id in sorted(new_todos):
583             if 'empty' == make_type:
584                 process = Process.by_id(self.conn, process_id)
585                 todo = Todo(None, process, False, date)
586                 todo.save(self.conn)
587             else:
588                 Todo.create_with_children(self.conn, process_id, date)
589         for i, todo_id in enumerate(old_todos):
590             todo = Todo.by_id(self.conn, todo_id)
591             todo.is_done = is_done[i]
592             todo.comment = comments[i]
593             todo.effort = efforts[i]
594             todo.save(self.conn)
595         return f'/day?date={date}&make_type={make_type}'
596
597     @_delete_or_post(Todo, '/')
598     def do_POST_todo(self, todo: Todo) -> str:
599         """Update Todo and its children."""
600         # pylint: disable=too-many-locals
601         adopted_child_ids = self._form_data.get_all_int('adopt')
602         processes_to_make_full = self._form_data.get_all_int('make_full')
603         processes_to_make_empty = self._form_data.get_all_int('make_empty')
604         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
605         effort = self._form_data.get_str('effort', ignore_strict=True)
606         conditions = self._form_data.get_all_int('conditions')
607         disables = self._form_data.get_all_int('disables')
608         blockers = self._form_data.get_all_int('blockers')
609         enables = self._form_data.get_all_int('enables')
610         is_done = len(self._form_data.get_all_str('done')) > 0
611         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
612         comment = self._form_data.get_str('comment', ignore_strict=True)
613         for v in fill_fors.values():
614             if v.startswith('make_empty_'):
615                 processes_to_make_empty += [int(v[11:])]
616             elif v.startswith('make_full_'):
617                 processes_to_make_full += [int(v[10:])]
618             elif v != 'ignore':
619                 adopted_child_ids += [int(v)]
620         to_remove = []
621         for child in todo.children:
622             assert isinstance(child.id_, int)
623             if child.id_ not in adopted_child_ids:
624                 to_remove += [child.id_]
625         for id_ in to_remove:
626             child = Todo.by_id(self.conn, id_)
627             todo.remove_child(child)
628         for child_id in adopted_child_ids:
629             if child_id in [c.id_ for c in todo.children]:
630                 continue
631             child = Todo.by_id(self.conn, child_id)
632             todo.add_child(child)
633         for process_id in processes_to_make_empty:
634             process = Process.by_id(self.conn, process_id)
635             made = Todo(None, process, False, todo.date)
636             made.save(self.conn)
637             todo.add_child(made)
638         for process_id in processes_to_make_full:
639             made = Todo.create_with_children(self.conn, process_id, todo.date)
640             todo.add_child(made)
641         todo.effort = float(effort) if effort else None
642         todo.set_conditions(self.conn, conditions)
643         todo.set_blockers(self.conn, blockers)
644         todo.set_enables(self.conn, enables)
645         todo.set_disables(self.conn, disables)
646         todo.is_done = is_done
647         todo.calendarize = calendarize
648         todo.comment = comment
649         todo.save(self.conn)
650         return f'/todo?id={todo.id_}'
651
652     def do_POST_process_descriptions(self) -> str:
653         """Update history timestamps for Process.description."""
654         return self._change_versioned_timestamps(Process, 'description')
655
656     def do_POST_process_efforts(self) -> str:
657         """Update history timestamps for Process.effort."""
658         return self._change_versioned_timestamps(Process, 'effort')
659
660     def do_POST_process_titles(self) -> str:
661         """Update history timestamps for Process.title."""
662         return self._change_versioned_timestamps(Process, 'title')
663
664     @_delete_or_post(Process, '/processes')
665     def do_POST_process(self, process: Process) -> str:
666         """Update or insert Process of ?id= and fields defined in postvars."""
667         # pylint: disable=too-many-locals
668         # pylint: disable=too-many-statements
669         title = self._form_data.get_str('title')
670         description = self._form_data.get_str('description')
671         effort = self._form_data.get_float('effort')
672         conditions = self._form_data.get_all_int('conditions')
673         blockers = self._form_data.get_all_int('blockers')
674         enables = self._form_data.get_all_int('enables')
675         disables = self._form_data.get_all_int('disables')
676         calendarize = self._form_data.get_all_str('calendarize') != []
677         suppresses = self._form_data.get_all_int('suppresses')
678         step_of = self._form_data.get_all_str('step_of')
679         keep_steps = self._form_data.get_all_int('keep_step')
680         step_ids = self._form_data.get_all_int('steps')
681         new_top_steps = self._form_data.get_all_str('new_top_step')
682         step_process_id_to = {}
683         step_parent_id_to = {}
684         new_steps_to = {}
685         for step_id in step_ids:
686             name = f'new_step_to_{step_id}'
687             new_steps_to[step_id] = self._form_data.get_all_int(name)
688         for step_id in keep_steps:
689             name = f'step_{step_id}_process_id'
690             step_process_id_to[step_id] = self._form_data.get_int(name)
691             name = f'step_{step_id}_parent_id'
692             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
693         process.title.set(title)
694         process.description.set(description)
695         process.effort.set(effort)
696         process.set_conditions(self.conn, conditions)
697         process.set_blockers(self.conn, blockers)
698         process.set_enables(self.conn, enables)
699         process.set_disables(self.conn, disables)
700         process.calendarize = calendarize
701         process.save(self.conn)
702         assert isinstance(process.id_, int)
703         new_step_title = None
704         steps: list[ProcessStep] = []
705         for step_id in keep_steps:
706             if step_id not in step_ids:
707                 raise BadFormatException('trying to keep unknown step')
708             step = ProcessStep(step_id, process.id_,
709                                step_process_id_to[step_id],
710                                step_parent_id_to[step_id])
711             steps += [step]
712         for step_id in step_ids:
713             new = [ProcessStep(None, process.id_, step_process_id, step_id)
714                    for step_process_id in new_steps_to[step_id]]
715             steps += new
716         for step_identifier in new_top_steps:
717             try:
718                 step_process_id = int(step_identifier)
719                 step = ProcessStep(None, process.id_, step_process_id, None)
720                 steps += [step]
721             except ValueError:
722                 new_step_title = step_identifier
723         process.set_steps(self.conn, steps)
724         process.set_step_suppressions(self.conn, suppresses)
725         owners_to_set = []
726         new_owner_title = None
727         for owner_identifier in step_of:
728             try:
729                 owners_to_set += [int(owner_identifier)]
730             except ValueError:
731                 new_owner_title = owner_identifier
732         process.set_owners(self.conn, owners_to_set)
733         params = f'id={process.id_}'
734         if new_step_title:
735             title_b64_encoded = b64encode(new_step_title.encode()).decode()
736             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
737         elif new_owner_title:
738             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
739             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
740         process.save(self.conn)
741         return f'/process?{params}'
742
743     def do_POST_condition_descriptions(self) -> str:
744         """Update history timestamps for Condition.description."""
745         return self._change_versioned_timestamps(Condition, 'description')
746
747     def do_POST_condition_titles(self) -> str:
748         """Update history timestamps for Condition.title."""
749         return self._change_versioned_timestamps(Condition, 'title')
750
751     @_delete_or_post(Condition, '/conditions')
752     def do_POST_condition(self, condition: Condition) -> str:
753         """Update/insert Condition of ?id= and fields defined in postvars."""
754         is_active = self._form_data.get_str('is_active') == 'True'
755         title = self._form_data.get_str('title')
756         description = self._form_data.get_str('description')
757         condition.is_active = is_active
758         condition.title.set(title)
759         condition.description.set(description)
760         condition.save(self.conn)
761         return f'/condition?id={condition.id_}'