home · contact · privacy
Fix Condition POSTing not properly setting .is_active.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import HandledException, BadFormatException, \
15         NotFoundException
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20 from plomtask.db import BaseModel
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.headers: list[tuple[str, str]] = []
33         self._render_mode = 'html'
34         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35
36     def set_json_mode(self) -> None:
37         """Make server send JSON instead of HTML responses."""
38         self._render_mode = 'json'
39         self.headers += [('Content-Type', 'application/json')]
40
41     @staticmethod
42     def ctx_to_json(ctx: dict[str, object]) -> str:
43         """Render ctx into JSON string."""
44         def walk_ctx(node: object) -> Any:
45             if isinstance(node, BaseModel):
46                 return node.as_dict
47             if isinstance(node, (list, tuple)):
48                 return [walk_ctx(x) for x in node]
49             if isinstance(node, HandledException):
50                 return str(node)
51             return node
52         for k, v in ctx.items():
53             ctx[k] = walk_ctx(v)
54         return json_dumps(ctx)
55
56     def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
57         """Render ctx according to self._render_mode.."""
58         tmpl_name = f'{tmpl_name}.{self._render_mode}'
59         if 'html' == self._render_mode:
60             template = self._jinja.get_template(tmpl_name)
61             return template.render(ctx)
62         return self.__class__.ctx_to_json(ctx)
63
64
65 class InputsParser:
66     """Wrapper for validating and retrieving dict-like HTTP inputs."""
67
68     def __init__(self, dict_: dict[str, list[str]],
69                  strictness: bool = True) -> None:
70         self.inputs = dict_
71         self.strict = strictness
72
73     def get_str(self, key: str, default: str = '',
74                 ignore_strict: bool = False) -> str:
75         """Retrieve single/first string value of key, or default."""
76         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
77             if self.strict and not ignore_strict:
78                 raise BadFormatException(f'no value found for key {key}')
79             return default
80         return self.inputs[key][0]
81
82     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
83         """Retrieve dict of (first) strings at key starting with prefix."""
84         ret = {}
85         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
86             ret[key] = self.inputs[key][0]
87         return ret
88
89     def get_int(self, key: str) -> int:
90         """Retrieve single/first value of key as int, error if empty."""
91         val = self.get_int_or_none(key)
92         if val is None:
93             raise BadFormatException(f'unexpected empty value for: {key}')
94         return val
95
96     def get_int_or_none(self, key: str) -> int | None:
97         """Retrieve single/first value of key as int, return None if empty."""
98         val = self.get_str(key, ignore_strict=True)
99         if val == '':
100             return None
101         try:
102             return int(val)
103         except ValueError as e:
104             msg = f'cannot int form field value for key {key}: {val}'
105             raise BadFormatException(msg) from e
106
107     def get_float(self, key: str) -> float:
108         """Retrieve float value of key from self.postvars."""
109         val = self.get_str(key)
110         try:
111             return float(val)
112         except ValueError as e:
113             msg = f'cannot float form field value for key {key}: {val}'
114             raise BadFormatException(msg) from e
115
116     def get_all_str(self, key: str) -> list[str]:
117         """Retrieve list of string values at key."""
118         if key not in self.inputs.keys():
119             return []
120         return self.inputs[key]
121
122     def get_all_int(self, key: str) -> list[int]:
123         """Retrieve list of int values at key."""
124         all_str = self.get_all_str(key)
125         try:
126             return [int(s) for s in all_str if len(s) > 0]
127         except ValueError as e:
128             msg = f'cannot int a form field value for key {key} in: {all_str}'
129             raise BadFormatException(msg) from e
130
131
132 class TaskHandler(BaseHTTPRequestHandler):
133     """Handles single HTTP request."""
134     # pylint: disable=too-many-public-methods
135     server: TaskServer
136     conn: DatabaseConnection
137     _site: str
138     _form_data: InputsParser
139     _params: InputsParser
140
141     def _send_page(self,
142                    ctx: dict[str, Any],
143                    tmpl_name: str,
144                    code: int = 200
145                    ) -> None:
146         """Send HTML as proper HTTP response."""
147         body = self.server.render(ctx, tmpl_name)
148         self.send_response(code)
149         for header_tuple in self.server.headers:
150             self.send_header(*header_tuple)
151         self.end_headers()
152         self.wfile.write(bytes(body, 'utf-8'))
153
154     @staticmethod
155     def _request_wrapper(http_method: str, not_found_msg: str
156                          ) -> Callable[..., Callable[[TaskHandler], None]]:
157         def decorator(f: Callable[..., str | None]
158                       ) -> Callable[[TaskHandler], None]:
159             def wrapper(self: TaskHandler) -> None:
160                 # pylint: disable=protected-access
161                 # (because pylint here fails to detect the use of wrapper as a
162                 # method to self with respective access privileges)
163                 try:
164                     self.conn = DatabaseConnection(self.server.db)
165                     parsed_url = urlparse(self.path)
166                     self._site = path_split(parsed_url.path)[1]
167                     params = parse_qs(parsed_url.query, strict_parsing=True)
168                     self._params = InputsParser(params, False)
169                     handler_name = f'do_{http_method}_{self._site}'
170                     if hasattr(self, handler_name):
171                         handler = getattr(self, handler_name)
172                         redir_target = f(self, handler)
173                         if redir_target:
174                             self.send_response(302)
175                             self.send_header('Location', redir_target)
176                             self.end_headers()
177                     else:
178                         msg = f'{not_found_msg}: {self._site}'
179                         raise NotFoundException(msg)
180                 except HandledException as error:
181                     for cls in (Day, Todo, Condition, Process, ProcessStep):
182                         assert hasattr(cls, 'empty_cache')
183                         cls.empty_cache()
184                     ctx = {'msg': error}
185                     self._send_page(ctx, 'msg', error.http_code)
186                 finally:
187                     self.conn.close()
188             return wrapper
189         return decorator
190
191     @_request_wrapper('GET', 'Unknown page')
192     def do_GET(self, handler: Callable[[], str | dict[str, object]]
193                ) -> str | None:
194         """Render page with result of handler, or redirect if result is str."""
195         tmpl_name = f'{self._site}'
196         ctx_or_redir_target = handler()
197         if isinstance(ctx_or_redir_target, str):
198             return ctx_or_redir_target
199         self._send_page(ctx_or_redir_target, tmpl_name)
200         return None
201
202     @_request_wrapper('POST', 'Unknown POST target')
203     def do_POST(self, handler: Callable[[], str]) -> str:
204         """Handle POST with handler, prepare redirection to result."""
205         length = int(self.headers['content-length'])
206         postvars = parse_qs(self.rfile.read(length).decode(),
207                             keep_blank_values=True, strict_parsing=True)
208         self._form_data = InputsParser(postvars)
209         redir_target = handler()
210         self.conn.commit()
211         return redir_target
212
213     # GET handlers
214
215     def do_GET_(self) -> str:
216         """Return redirect target on GET /."""
217         return '/day'
218
219     def _do_GET_calendar(self) -> dict[str, object]:
220         """Show Days from ?start= to ?end=.
221
222         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
223         same, the only difference being the HTML template they are rendered to,
224         which .do_GET selects from their method name.
225         """
226         start = self._params.get_str('start')
227         end = self._params.get_str('end')
228         if not end:
229             end = date_in_n_days(366)
230         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
231         days, start, end = ret
232         days = Day.with_filled_gaps(days, start, end)
233         today = date_in_n_days(0)
234         return {'start': start, 'end': end, 'days': days, 'today': today}
235
236     def do_GET_calendar(self) -> dict[str, object]:
237         """Show Days from ?start= to ?end= – normal view."""
238         return self._do_GET_calendar()
239
240     def do_GET_calendar_txt(self) -> dict[str, object]:
241         """Show Days from ?start= to ?end= – minimalist view."""
242         return self._do_GET_calendar()
243
244     def do_GET_day(self) -> dict[str, object]:
245         """Show single Day of ?date=."""
246         date = self._params.get_str('date', date_in_n_days(0))
247         day = Day.by_id(self.conn, date, create=True)
248         make_type = self._params.get_str('make_type')
249         conditions_present = []
250         enablers_for = {}
251         disablers_for = {}
252         for todo in day.todos:
253             for condition in todo.conditions + todo.blockers:
254                 if condition not in conditions_present:
255                     conditions_present += [condition]
256                     enablers_for[condition.id_] = [p for p in
257                                                    Process.all(self.conn)
258                                                    if condition in p.enables]
259                     disablers_for[condition.id_] = [p for p in
260                                                     Process.all(self.conn)
261                                                     if condition in p.disables]
262         seen_todos: set[int] = set()
263         top_nodes = [t.get_step_tree(seen_todos)
264                      for t in day.todos if not t.parents]
265         return {'day': day,
266                 'top_nodes': top_nodes,
267                 'make_type': make_type,
268                 'enablers_for': enablers_for,
269                 'disablers_for': disablers_for,
270                 'conditions_present': conditions_present,
271                 'processes': Process.all(self.conn)}
272
273     def do_GET_todo(self) -> dict[str, object]:
274         """Show single Todo of ?id=."""
275
276         @dataclass
277         class TodoStepsNode:
278             """Collect what's useful for Todo steps tree display."""
279             id_: int
280             todo: Todo | None
281             process: Process | None
282             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
283             fillable: bool = False
284
285         def walk_process_steps(id_: int,
286                                process_step_nodes: list[ProcessStepsNode],
287                                steps_nodes: list[TodoStepsNode]) -> None:
288             for process_step_node in process_step_nodes:
289                 id_ += 1
290                 node = TodoStepsNode(id_, None, process_step_node.process, [])
291                 steps_nodes += [node]
292                 walk_process_steps(id_, list(process_step_node.steps.values()),
293                                    node.children)
294
295         def walk_todo_steps(id_: int, todos: list[Todo],
296                             steps_nodes: list[TodoStepsNode]) -> None:
297             for todo in todos:
298                 matched = False
299                 for match in [item for item in steps_nodes
300                               if item.process
301                               and item.process == todo.process]:
302                     match.todo = todo
303                     matched = True
304                     for child in match.children:
305                         child.fillable = True
306                     walk_todo_steps(id_, todo.children, match.children)
307                 if not matched:
308                     id_ += 1
309                     node = TodoStepsNode(id_, todo, None, [])
310                     steps_nodes += [node]
311                     walk_todo_steps(id_, todo.children, node.children)
312
313         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
314                                     ) -> set[int]:
315             ids = set()
316             for node in steps_nodes:
317                 if not node.todo:
318                     assert isinstance(node.process, Process)
319                     assert isinstance(node.process.id_, int)
320                     ids.add(node.process.id_)
321                 ids = ids | collect_adoptables_keys(node.children)
322             return ids
323
324         id_ = self._params.get_int('id')
325         todo = Todo.by_id(self.conn, id_)
326         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
327         process_tree = todo.process.get_steps(self.conn, None)
328         steps_todo_to_process: list[TodoStepsNode] = []
329         walk_process_steps(0, list(process_tree.values()),
330                            steps_todo_to_process)
331         for steps_node in steps_todo_to_process:
332             steps_node.fillable = True
333         walk_todo_steps(len(steps_todo_to_process), todo_steps,
334                         steps_todo_to_process)
335         adoptables: dict[int, list[Todo]] = {}
336         any_adoptables = [Todo.by_id(self.conn, t.id_)
337                           for t in Todo.by_date(self.conn, todo.date)
338                           if t != todo]
339         for id_ in collect_adoptables_keys(steps_todo_to_process):
340             adoptables[id_] = [t for t in any_adoptables
341                                if t.process.id_ == id_]
342         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
343                 'adoption_candidates_for': adoptables,
344                 'process_candidates': Process.all(self.conn),
345                 'todo_candidates': any_adoptables,
346                 'condition_candidates': Condition.all(self.conn)}
347
348     def do_GET_todos(self) -> dict[str, object]:
349         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
350         sort_by = self._params.get_str('sort_by')
351         start = self._params.get_str('start')
352         end = self._params.get_str('end')
353         process_id = self._params.get_int_or_none('process_id')
354         comment_pattern = self._params.get_str('comment_pattern')
355         todos = []
356         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
357         todos_by_date_range, start, end = ret
358         todos = [t for t in todos_by_date_range
359                  if comment_pattern in t.comment
360                  and ((not process_id) or t.process.id_ == process_id)]
361         if sort_by == 'doneness':
362             todos.sort(key=lambda t: t.is_done)
363         elif sort_by == '-doneness':
364             todos.sort(key=lambda t: t.is_done, reverse=True)
365         elif sort_by == 'title':
366             todos.sort(key=lambda t: t.title_then)
367         elif sort_by == '-title':
368             todos.sort(key=lambda t: t.title_then, reverse=True)
369         elif sort_by == 'comment':
370             todos.sort(key=lambda t: t.comment)
371         elif sort_by == '-comment':
372             todos.sort(key=lambda t: t.comment, reverse=True)
373         elif sort_by == '-date':
374             todos.sort(key=lambda t: t.date, reverse=True)
375         else:
376             todos.sort(key=lambda t: t.date)
377             sort_by = 'title'
378         return {'start': start, 'end': end, 'process_id': process_id,
379                 'comment_pattern': comment_pattern, 'todos': todos,
380                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
381
382     def do_GET_conditions(self) -> dict[str, object]:
383         """Show all Conditions."""
384         pattern = self._params.get_str('pattern')
385         conditions = Condition.matching(self.conn, pattern)
386         sort_by = self._params.get_str('sort_by')
387         if sort_by == 'is_active':
388             conditions.sort(key=lambda c: c.is_active)
389         elif sort_by == '-is_active':
390             conditions.sort(key=lambda c: c.is_active, reverse=True)
391         elif sort_by == '-title':
392             conditions.sort(key=lambda c: c.title.newest, reverse=True)
393         else:
394             conditions.sort(key=lambda c: c.title.newest)
395             sort_by = 'title'
396         return {'conditions': conditions,
397                 'sort_by': sort_by,
398                 'pattern': pattern}
399
400     def do_GET_condition(self) -> dict[str, object]:
401         """Show Condition of ?id=."""
402         id_ = self._params.get_int_or_none('id')
403         c = Condition.by_id(self.conn, id_, create=True)
404         ps = Process.all(self.conn)
405         return {'condition': c, 'is_new': c.id_ is None,
406                 'enabled_processes': [p for p in ps if c in p.conditions],
407                 'disabled_processes': [p for p in ps if c in p.blockers],
408                 'enabling_processes': [p for p in ps if c in p.enables],
409                 'disabling_processes': [p for p in ps if c in p.disables]}
410
411     def do_GET_condition_titles(self) -> dict[str, object]:
412         """Show title history of Condition of ?id=."""
413         id_ = self._params.get_int_or_none('id')
414         condition = Condition.by_id(self.conn, id_)
415         return {'condition': condition}
416
417     def do_GET_condition_descriptions(self) -> dict[str, object]:
418         """Show description historys of Condition of ?id=."""
419         id_ = self._params.get_int_or_none('id')
420         condition = Condition.by_id(self.conn, id_)
421         return {'condition': condition}
422
423     def do_GET_process(self) -> dict[str, object]:
424         """Show Process of ?id=."""
425         id_ = self._params.get_int_or_none('id')
426         process = Process.by_id(self.conn, id_, create=True)
427         title_64 = self._params.get_str('title_b64')
428         if title_64:
429             title = b64decode(title_64.encode()).decode()
430             process.title.set(title)
431         owners = process.used_as_step_by(self.conn)
432         for step_id in self._params.get_all_int('step_to'):
433             owners += [Process.by_id(self.conn, step_id)]
434         preset_top_step = None
435         for process_id in self._params.get_all_int('has_step'):
436             preset_top_step = process_id
437         return {'process': process, 'is_new': process.id_ is None,
438                 'preset_top_step': preset_top_step,
439                 'steps': process.get_steps(self.conn), 'owners': owners,
440                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
441                 'process_candidates': Process.all(self.conn),
442                 'condition_candidates': Condition.all(self.conn)}
443
444     def do_GET_process_titles(self) -> dict[str, object]:
445         """Show title history of Process of ?id=."""
446         id_ = self._params.get_int_or_none('id')
447         process = Process.by_id(self.conn, id_)
448         return {'process': process}
449
450     def do_GET_process_descriptions(self) -> dict[str, object]:
451         """Show description historys of Process of ?id=."""
452         id_ = self._params.get_int_or_none('id')
453         process = Process.by_id(self.conn, id_)
454         return {'process': process}
455
456     def do_GET_process_efforts(self) -> dict[str, object]:
457         """Show default effort history of Process of ?id=."""
458         id_ = self._params.get_int_or_none('id')
459         process = Process.by_id(self.conn, id_)
460         return {'process': process}
461
462     def do_GET_processes(self) -> dict[str, object]:
463         """Show all Processes."""
464         pattern = self._params.get_str('pattern')
465         processes = Process.matching(self.conn, pattern)
466         sort_by = self._params.get_str('sort_by')
467         if sort_by == 'steps':
468             processes.sort(key=lambda p: len(p.explicit_steps))
469         elif sort_by == '-steps':
470             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
471         elif sort_by == 'owners':
472             processes.sort(key=lambda p: p.n_owners or 0)
473         elif sort_by == '-owners':
474             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
475         elif sort_by == 'effort':
476             processes.sort(key=lambda p: p.effort.newest)
477         elif sort_by == '-effort':
478             processes.sort(key=lambda p: p.effort.newest, reverse=True)
479         elif sort_by == '-title':
480             processes.sort(key=lambda p: p.title.newest, reverse=True)
481         else:
482             processes.sort(key=lambda p: p.title.newest)
483             sort_by = 'title'
484         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
485
486     # POST handlers
487
488     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
489         """Update history timestamps for VersionedAttribute."""
490         id_ = self._params.get_int_or_none('id')
491         item = cls.by_id(self.conn, id_)
492         attr = getattr(item, attr_name)
493         for k, v in self._form_data.get_first_strings_starting('at:').items():
494             old = k[3:]
495             if old[19:] != v:
496                 attr.reset_timestamp(old, f'{v}.0')
497         attr.save(self.conn)
498         cls_name = cls.__name__.lower()
499         return f'/{cls_name}_{attr_name}s?id={item.id_}'
500
501     def do_POST_day(self) -> str:
502         """Update or insert Day of date and Todos mapped to it."""
503         date = self._params.get_str('date')
504         day = Day.by_id(self.conn, date, create=True)
505         day.comment = self._form_data.get_str('day_comment')
506         day.save(self.conn)
507         make_type = self._form_data.get_str('make_type')
508         for process_id in sorted(self._form_data.get_all_int('new_todo')):
509             if 'empty' == make_type:
510                 process = Process.by_id(self.conn, process_id)
511                 todo = Todo(None, process, False, date)
512                 todo.save(self.conn)
513             else:
514                 Todo.create_with_children(self.conn, process_id, date)
515         done_ids = self._form_data.get_all_int('done')
516         comments = self._form_data.get_all_str('comment')
517         efforts = self._form_data.get_all_str('effort')
518         for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
519             todo = Todo.by_id(self.conn, todo_id)
520             todo.is_done = todo_id in done_ids
521             if len(comments) > 0:
522                 todo.comment = comments[i]
523             if len(efforts) > 0:
524                 todo.effort = float(efforts[i]) if efforts[i] else None
525             todo.save(self.conn)
526         return f'/day?date={date}&make_type={make_type}'
527
528     def do_POST_todo(self) -> str:
529         """Update Todo and its children."""
530         # pylint: disable=too-many-locals
531         # pylint: disable=too-many-branches
532         id_ = self._params.get_int('id')
533         for _ in self._form_data.get_all_str('delete'):
534             todo = Todo .by_id(self.conn, id_)
535             todo.remove(self.conn)
536             return '/'
537         todo = Todo.by_id(self.conn, id_)
538         adopted_child_ids = self._form_data.get_all_int('adopt')
539         processes_to_make_full = self._form_data.get_all_int('make_full')
540         processes_to_make_empty = self._form_data.get_all_int('make_empty')
541         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
542         for v in fill_fors.values():
543             if v.startswith('make_empty_'):
544                 processes_to_make_empty += [int(v[11:])]
545             elif v.startswith('make_full_'):
546                 processes_to_make_full += [int(v[10:])]
547             elif v != 'ignore':
548                 adopted_child_ids += [int(v)]
549         to_remove = []
550         for child in todo.children:
551             assert isinstance(child.id_, int)
552             if child.id_ not in adopted_child_ids:
553                 to_remove += [child.id_]
554         for id_ in to_remove:
555             child = Todo.by_id(self.conn, id_)
556             todo.remove_child(child)
557         for child_id in adopted_child_ids:
558             if child_id in [c.id_ for c in todo.children]:
559                 continue
560             child = Todo.by_id(self.conn, child_id)
561             todo.add_child(child)
562         for process_id in processes_to_make_empty:
563             process = Process.by_id(self.conn, process_id)
564             made = Todo(None, process, False, todo.date)
565             made.save(self.conn)
566             todo.add_child(made)
567         for process_id in processes_to_make_full:
568             made = Todo.create_with_children(self.conn, process_id, todo.date)
569             todo.add_child(made)
570         effort = self._form_data.get_str('effort', ignore_strict=True)
571         todo.effort = float(effort) if effort else None
572         todo.set_conditions(self.conn,
573                             self._form_data.get_all_int('condition'))
574         todo.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
575         todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
576         todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
577         todo.is_done = len(self._form_data.get_all_str('done')) > 0
578         todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
579         todo.comment = self._form_data.get_str('comment', ignore_strict=True)
580         todo.save(self.conn)
581         return f'/todo?id={todo.id_}'
582
583     def do_POST_process_descriptions(self) -> str:
584         """Update history timestamps for Process.description."""
585         return self._change_versioned_timestamps(Process, 'description')
586
587     def do_POST_process_efforts(self) -> str:
588         """Update history timestamps for Process.effort."""
589         return self._change_versioned_timestamps(Process, 'effort')
590
591     def do_POST_process_titles(self) -> str:
592         """Update history timestamps for Process.title."""
593         return self._change_versioned_timestamps(Process, 'title')
594
595     def do_POST_process(self) -> str:
596         """Update or insert Process of ?id= and fields defined in postvars."""
597         # pylint: disable=too-many-branches
598         id_ = self._params.get_int_or_none('id')
599         for _ in self._form_data.get_all_str('delete'):
600             process = Process.by_id(self.conn, id_)
601             process.remove(self.conn)
602             return '/processes'
603         process = Process.by_id(self.conn, id_, create=True)
604         process.title.set(self._form_data.get_str('title'))
605         process.description.set(self._form_data.get_str('description'))
606         process.effort.set(self._form_data.get_float('effort'))
607         process.set_conditions(self.conn,
608                                self._form_data.get_all_int('condition'))
609         process.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
610         process.set_enables(self.conn, self._form_data.get_all_int('enables'))
611         process.set_disables(self.conn,
612                              self._form_data.get_all_int('disables'))
613         process.calendarize = self._form_data.get_all_str('calendarize') != []
614         process.save(self.conn)
615         assert isinstance(process.id_, int)
616         steps: list[ProcessStep] = []
617         for step_id in self._form_data.get_all_int('keep_step'):
618             if step_id not in self._form_data.get_all_int('steps'):
619                 raise BadFormatException('trying to keep unknown step')
620         for step_id in self._form_data.get_all_int('steps'):
621             if step_id not in self._form_data.get_all_int('keep_step'):
622                 continue
623             step_process_id = self._form_data.get_int(
624                     f'step_{step_id}_process_id')
625             parent_id = self._form_data.get_int_or_none(
626                     f'step_{step_id}_parent_id')
627             steps += [ProcessStep(step_id, process.id_, step_process_id,
628                                   parent_id)]
629         for step_id in self._form_data.get_all_int('steps'):
630             for step_process_id in self._form_data.get_all_int(
631                     f'new_step_to_{step_id}'):
632                 steps += [ProcessStep(None, process.id_, step_process_id,
633                                       step_id)]
634         new_step_title = None
635         for step_identifier in self._form_data.get_all_str('new_top_step'):
636             try:
637                 step_process_id = int(step_identifier)
638                 steps += [ProcessStep(None, process.id_, step_process_id,
639                                       None)]
640             except ValueError:
641                 new_step_title = step_identifier
642         process.set_steps(self.conn, steps)
643         process.set_step_suppressions(self.conn,
644                                       self._form_data.
645                                       get_all_int('suppresses'))
646         owners_to_set = []
647         new_owner_title = None
648         for owner_identifier in self._form_data.get_all_str('step_of'):
649             try:
650                 owners_to_set += [int(owner_identifier)]
651             except ValueError:
652                 new_owner_title = owner_identifier
653         process.set_owners(self.conn, owners_to_set)
654         params = f'id={process.id_}'
655         if new_step_title:
656             title_b64_encoded = b64encode(new_step_title.encode()).decode()
657             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
658         elif new_owner_title:
659             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
660             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
661         process.save(self.conn)
662         return f'/process?{params}'
663
664     def do_POST_condition_descriptions(self) -> str:
665         """Update history timestamps for Condition.description."""
666         return self._change_versioned_timestamps(Condition, 'description')
667
668     def do_POST_condition_titles(self) -> str:
669         """Update history timestamps for Condition.title."""
670         return self._change_versioned_timestamps(Condition, 'title')
671
672     def do_POST_condition(self) -> str:
673         """Update/insert Condition of ?id= and fields defined in postvars."""
674         id_ = self._params.get_int_or_none('id')
675         for _ in self._form_data.get_all_str('delete'):
676             condition = Condition.by_id(self.conn, id_)
677             condition.remove(self.conn)
678             return '/conditions'
679         condition = Condition.by_id(self.conn, id_, create=True)
680         condition.is_active = self._form_data.get_str('is_active') == 'True'
681         condition.title.set(self._form_data.get_str('title'))
682         condition.description.set(self._form_data.get_str('description'))
683         condition.save(self.conn)
684         return f'/condition?id={condition.id_}'