home · contact · privacy
Refactor saving and caching tests, treatment of None IDs.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import HandledException, BadFormatException, \
15         NotFoundException
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20 from plomtask.db import BaseModel
21
22 TEMPLATES_DIR = 'templates'
23
24
25 class TaskServer(HTTPServer):
26     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27
28     def __init__(self, db_file: DatabaseFile,
29                  *args: Any, **kwargs: Any) -> None:
30         super().__init__(*args, **kwargs)
31         self.db = db_file
32         self.headers: list[tuple[str, str]] = []
33         self._render_mode = 'html'
34         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35
36     def set_json_mode(self) -> None:
37         """Make server send JSON instead of HTML responses."""
38         self._render_mode = 'json'
39         self.headers += [('Content-Type', 'application/json')]
40
41     @staticmethod
42     def ctx_to_json(ctx: dict[str, object]) -> str:
43         """Render ctx into JSON string."""
44         def walk_ctx(node: object) -> Any:
45             if isinstance(node, BaseModel):
46                 return node.as_dict
47             if isinstance(node, (list, tuple)):
48                 return [walk_ctx(x) for x in node]
49             if isinstance(node, HandledException):
50                 return str(node)
51             return node
52         for k, v in ctx.items():
53             ctx[k] = walk_ctx(v)
54         return json_dumps(ctx)
55
56     def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
57         """Render ctx according to self._render_mode.."""
58         tmpl_name = f'{tmpl_name}.{self._render_mode}'
59         if 'html' == self._render_mode:
60             template = self._jinja.get_template(tmpl_name)
61             return template.render(ctx)
62         return self.__class__.ctx_to_json(ctx)
63
64
65 class InputsParser:
66     """Wrapper for validating and retrieving dict-like HTTP inputs."""
67
68     def __init__(self, dict_: dict[str, list[str]],
69                  strictness: bool = True) -> None:
70         self.inputs = dict_
71         self.strict = strictness
72
73     def get_str(self, key: str, default: str = '',
74                 ignore_strict: bool = False) -> str:
75         """Retrieve single/first string value of key, or default."""
76         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
77             if self.strict and not ignore_strict:
78                 raise BadFormatException(f'no value found for key {key}')
79             return default
80         return self.inputs[key][0]
81
82     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
83         """Retrieve dict of (first) strings at key starting with prefix."""
84         ret = {}
85         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
86             ret[key] = self.inputs[key][0]
87         return ret
88
89     def get_int(self, key: str) -> int:
90         """Retrieve single/first value of key as int, error if empty."""
91         val = self.get_int_or_none(key)
92         if val is None:
93             raise BadFormatException(f'unexpected empty value for: {key}')
94         return val
95
96     def get_int_or_none(self, key: str) -> int | None:
97         """Retrieve single/first value of key as int, return None if empty."""
98         val = self.get_str(key, ignore_strict=True)
99         if val == '':
100             return None
101         try:
102             return int(val)
103         except ValueError as e:
104             msg = f'cannot int form field value for key {key}: {val}'
105             raise BadFormatException(msg) from e
106
107     def get_float(self, key: str) -> float:
108         """Retrieve float value of key from self.postvars."""
109         val = self.get_str(key)
110         try:
111             return float(val)
112         except ValueError as e:
113             msg = f'cannot float form field value for key {key}: {val}'
114             raise BadFormatException(msg) from e
115
116     def get_all_str(self, key: str) -> list[str]:
117         """Retrieve list of string values at key."""
118         if key not in self.inputs.keys():
119             return []
120         return self.inputs[key]
121
122     def get_all_int(self, key: str) -> list[int]:
123         """Retrieve list of int values at key."""
124         all_str = self.get_all_str(key)
125         try:
126             return [int(s) for s in all_str if len(s) > 0]
127         except ValueError as e:
128             msg = f'cannot int a form field value for key {key} in: {all_str}'
129             raise BadFormatException(msg) from e
130
131
132 class TaskHandler(BaseHTTPRequestHandler):
133     """Handles single HTTP request."""
134     # pylint: disable=too-many-public-methods
135     server: TaskServer
136     conn: DatabaseConnection
137     _site: str
138     _form_data: InputsParser
139     _params: InputsParser
140
141     def _send_page(self,
142                    ctx: dict[str, Any],
143                    tmpl_name: str,
144                    code: int = 200
145                    ) -> None:
146         """Send HTML as proper HTTP response."""
147         body = self.server.render(ctx, tmpl_name)
148         self.send_response(code)
149         for header_tuple in self.server.headers:
150             self.send_header(*header_tuple)
151         self.end_headers()
152         self.wfile.write(bytes(body, 'utf-8'))
153
154     @staticmethod
155     def _request_wrapper(http_method: str, not_found_msg: str
156                          ) -> Callable[..., Callable[[TaskHandler], None]]:
157         def decorator(f: Callable[..., str | None]
158                       ) -> Callable[[TaskHandler], None]:
159             def wrapper(self: TaskHandler) -> None:
160                 # pylint: disable=protected-access
161                 # (because pylint here fails to detect the use of wrapper as a
162                 # method to self with respective access privileges)
163                 try:
164                     self.conn = DatabaseConnection(self.server.db)
165                     parsed_url = urlparse(self.path)
166                     self._site = path_split(parsed_url.path)[1]
167                     params = parse_qs(parsed_url.query, strict_parsing=True)
168                     self._params = InputsParser(params, False)
169                     handler_name = f'do_{http_method}_{self._site}'
170                     if hasattr(self, handler_name):
171                         handler = getattr(self, handler_name)
172                         redir_target = f(self, handler)
173                         if redir_target:
174                             self.send_response(302)
175                             self.send_header('Location', redir_target)
176                             self.end_headers()
177                     else:
178                         msg = f'{not_found_msg}: {self._site}'
179                         raise NotFoundException(msg)
180                 except HandledException as error:
181                     for cls in (Day, Todo, Condition, Process, ProcessStep):
182                         assert hasattr(cls, 'empty_cache')
183                         cls.empty_cache()
184                     ctx = {'msg': error}
185                     self._send_page(ctx, 'msg', error.http_code)
186                 finally:
187                     self.conn.close()
188             return wrapper
189         return decorator
190
191     @_request_wrapper('GET', 'Unknown page')
192     def do_GET(self, handler: Callable[[], str | dict[str, object]]
193                ) -> str | None:
194         """Render page with result of handler, or redirect if result is str."""
195         tmpl_name = f'{self._site}'
196         ctx_or_redir_target = handler()
197         if isinstance(ctx_or_redir_target, str):
198             return ctx_or_redir_target
199         self._send_page(ctx_or_redir_target, tmpl_name)
200         return None
201
202     @_request_wrapper('POST', 'Unknown POST target')
203     def do_POST(self, handler: Callable[[], str]) -> str:
204         """Handle POST with handler, prepare redirection to result."""
205         length = int(self.headers['content-length'])
206         postvars = parse_qs(self.rfile.read(length).decode(),
207                             keep_blank_values=True, strict_parsing=True)
208         self._form_data = InputsParser(postvars)
209         redir_target = handler()
210         self.conn.commit()
211         return redir_target
212
213     # GET handlers
214
215     def do_GET_(self) -> str:
216         """Return redirect target on GET /."""
217         return '/day'
218
219     def _do_GET_calendar(self) -> dict[str, object]:
220         """Show Days from ?start= to ?end=.
221
222         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
223         same, the only difference being the HTML template they are rendered to,
224         which .do_GET selects from their method name.
225         """
226         start = self._params.get_str('start')
227         end = self._params.get_str('end')
228         if not end:
229             end = date_in_n_days(366)
230         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
231         days, start, end = ret
232         days = Day.with_filled_gaps(days, start, end)
233         today = date_in_n_days(0)
234         return {'start': start, 'end': end, 'days': days, 'today': today}
235
236     def do_GET_calendar(self) -> dict[str, object]:
237         """Show Days from ?start= to ?end= – normal view."""
238         return self._do_GET_calendar()
239
240     def do_GET_calendar_txt(self) -> dict[str, object]:
241         """Show Days from ?start= to ?end= – minimalist view."""
242         return self._do_GET_calendar()
243
244     def do_GET_day(self) -> dict[str, object]:
245         """Show single Day of ?date=."""
246         date = self._params.get_str('date', date_in_n_days(0))
247         day = Day.by_id_or_create(self.conn, date)
248         make_type = self._params.get_str('make_type')
249         conditions_present = []
250         enablers_for = {}
251         disablers_for = {}
252         for todo in day.todos:
253             for condition in todo.conditions + todo.blockers:
254                 if condition not in conditions_present:
255                     conditions_present += [condition]
256                     enablers_for[condition.id_] = [p for p in
257                                                    Process.all(self.conn)
258                                                    if condition in p.enables]
259                     disablers_for[condition.id_] = [p for p in
260                                                     Process.all(self.conn)
261                                                     if condition in p.disables]
262         seen_todos: set[int] = set()
263         top_nodes = [t.get_step_tree(seen_todos)
264                      for t in day.todos if not t.parents]
265         return {'day': day,
266                 'top_nodes': top_nodes,
267                 'make_type': make_type,
268                 'enablers_for': enablers_for,
269                 'disablers_for': disablers_for,
270                 'conditions_present': conditions_present,
271                 'processes': Process.all(self.conn)}
272
273     def do_GET_todo(self) -> dict[str, object]:
274         """Show single Todo of ?id=."""
275
276         @dataclass
277         class TodoStepsNode:
278             """Collect what's useful for Todo steps tree display."""
279             id_: int
280             todo: Todo | None
281             process: Process | None
282             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
283             fillable: bool = False
284
285         def walk_process_steps(id_: int,
286                                process_step_nodes: list[ProcessStepsNode],
287                                steps_nodes: list[TodoStepsNode]) -> None:
288             for process_step_node in process_step_nodes:
289                 id_ += 1
290                 node = TodoStepsNode(id_, None, process_step_node.process, [])
291                 steps_nodes += [node]
292                 walk_process_steps(id_, list(process_step_node.steps.values()),
293                                    node.children)
294
295         def walk_todo_steps(id_: int, todos: list[Todo],
296                             steps_nodes: list[TodoStepsNode]) -> None:
297             for todo in todos:
298                 matched = False
299                 for match in [item for item in steps_nodes
300                               if item.process
301                               and item.process == todo.process]:
302                     match.todo = todo
303                     matched = True
304                     for child in match.children:
305                         child.fillable = True
306                     walk_todo_steps(id_, todo.children, match.children)
307                 if not matched:
308                     id_ += 1
309                     node = TodoStepsNode(id_, todo, None, [])
310                     steps_nodes += [node]
311                     walk_todo_steps(id_, todo.children, node.children)
312
313         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
314                                     ) -> set[int]:
315             ids = set()
316             for node in steps_nodes:
317                 if not node.todo:
318                     assert isinstance(node.process, Process)
319                     assert isinstance(node.process.id_, int)
320                     ids.add(node.process.id_)
321                 ids = ids | collect_adoptables_keys(node.children)
322             return ids
323
324         id_ = self._params.get_int('id')
325         todo = Todo.by_id(self.conn, id_)
326         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
327         process_tree = todo.process.get_steps(self.conn, None)
328         steps_todo_to_process: list[TodoStepsNode] = []
329         walk_process_steps(0, list(process_tree.values()),
330                            steps_todo_to_process)
331         for steps_node in steps_todo_to_process:
332             steps_node.fillable = True
333         walk_todo_steps(len(steps_todo_to_process), todo_steps,
334                         steps_todo_to_process)
335         adoptables: dict[int, list[Todo]] = {}
336         any_adoptables = [Todo.by_id(self.conn, t.id_)
337                           for t in Todo.by_date(self.conn, todo.date)
338                           if t.id_ is not None
339                           and t != todo]
340         for id_ in collect_adoptables_keys(steps_todo_to_process):
341             adoptables[id_] = [t for t in any_adoptables
342                                if t.process.id_ == id_]
343         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
344                 'adoption_candidates_for': adoptables,
345                 'process_candidates': Process.all(self.conn),
346                 'todo_candidates': any_adoptables,
347                 'condition_candidates': Condition.all(self.conn)}
348
349     def do_GET_todos(self) -> dict[str, object]:
350         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
351         sort_by = self._params.get_str('sort_by')
352         start = self._params.get_str('start')
353         end = self._params.get_str('end')
354         process_id = self._params.get_int_or_none('process_id')
355         comment_pattern = self._params.get_str('comment_pattern')
356         todos = []
357         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
358         todos_by_date_range, start, end = ret
359         todos = [t for t in todos_by_date_range
360                  if comment_pattern in t.comment
361                  and ((not process_id) or t.process.id_ == process_id)]
362         if sort_by == 'doneness':
363             todos.sort(key=lambda t: t.is_done)
364         elif sort_by == '-doneness':
365             todos.sort(key=lambda t: t.is_done, reverse=True)
366         elif sort_by == 'title':
367             todos.sort(key=lambda t: t.title_then)
368         elif sort_by == '-title':
369             todos.sort(key=lambda t: t.title_then, reverse=True)
370         elif sort_by == 'comment':
371             todos.sort(key=lambda t: t.comment)
372         elif sort_by == '-comment':
373             todos.sort(key=lambda t: t.comment, reverse=True)
374         elif sort_by == '-date':
375             todos.sort(key=lambda t: t.date, reverse=True)
376         else:
377             todos.sort(key=lambda t: t.date)
378             sort_by = 'title'
379         return {'start': start, 'end': end, 'process_id': process_id,
380                 'comment_pattern': comment_pattern, 'todos': todos,
381                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
382
383     def do_GET_conditions(self) -> dict[str, object]:
384         """Show all Conditions."""
385         pattern = self._params.get_str('pattern')
386         conditions = Condition.matching(self.conn, pattern)
387         sort_by = self._params.get_str('sort_by')
388         if sort_by == 'is_active':
389             conditions.sort(key=lambda c: c.is_active)
390         elif sort_by == '-is_active':
391             conditions.sort(key=lambda c: c.is_active, reverse=True)
392         elif sort_by == '-title':
393             conditions.sort(key=lambda c: c.title.newest, reverse=True)
394         else:
395             conditions.sort(key=lambda c: c.title.newest)
396             sort_by = 'title'
397         return {'conditions': conditions,
398                 'sort_by': sort_by,
399                 'pattern': pattern}
400
401     def do_GET_condition(self) -> dict[str, object]:
402         """Show Condition of ?id=."""
403         id_ = self._params.get_int_or_none('id')
404         c = Condition.by_id_or_create(self.conn, id_)
405         ps = Process.all(self.conn)
406         return {'condition': c, 'is_new': c.id_ is None,
407                 'enabled_processes': [p for p in ps if c in p.conditions],
408                 'disabled_processes': [p for p in ps if c in p.blockers],
409                 'enabling_processes': [p for p in ps if c in p.enables],
410                 'disabling_processes': [p for p in ps if c in p.disables]}
411
412     def do_GET_condition_titles(self) -> dict[str, object]:
413         """Show title history of Condition of ?id=."""
414         id_ = self._params.get_int('id')
415         condition = Condition.by_id(self.conn, id_)
416         return {'condition': condition}
417
418     def do_GET_condition_descriptions(self) -> dict[str, object]:
419         """Show description historys of Condition of ?id=."""
420         id_ = self._params.get_int('id')
421         condition = Condition.by_id(self.conn, id_)
422         return {'condition': condition}
423
424     def do_GET_process(self) -> dict[str, object]:
425         """Show Process of ?id=."""
426         id_ = self._params.get_int_or_none('id')
427         process = Process.by_id_or_create(self.conn, id_)
428         title_64 = self._params.get_str('title_b64')
429         if title_64:
430             title = b64decode(title_64.encode()).decode()
431             process.title.set(title)
432         owners = process.used_as_step_by(self.conn)
433         for step_id in self._params.get_all_int('step_to'):
434             owners += [Process.by_id(self.conn, step_id)]
435         preset_top_step = None
436         for process_id in self._params.get_all_int('has_step'):
437             preset_top_step = process_id
438         return {'process': process, 'is_new': process.id_ is None,
439                 'preset_top_step': preset_top_step,
440                 'steps': process.get_steps(self.conn), 'owners': owners,
441                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
442                 'process_candidates': Process.all(self.conn),
443                 'condition_candidates': Condition.all(self.conn)}
444
445     def do_GET_process_titles(self) -> dict[str, object]:
446         """Show title history of Process of ?id=."""
447         id_ = self._params.get_int('id')
448         process = Process.by_id(self.conn, id_)
449         return {'process': process}
450
451     def do_GET_process_descriptions(self) -> dict[str, object]:
452         """Show description historys of Process of ?id=."""
453         id_ = self._params.get_int('id')
454         process = Process.by_id(self.conn, id_)
455         return {'process': process}
456
457     def do_GET_process_efforts(self) -> dict[str, object]:
458         """Show default effort history of Process of ?id=."""
459         id_ = self._params.get_int('id')
460         process = Process.by_id(self.conn, id_)
461         return {'process': process}
462
463     def do_GET_processes(self) -> dict[str, object]:
464         """Show all Processes."""
465         pattern = self._params.get_str('pattern')
466         processes = Process.matching(self.conn, pattern)
467         sort_by = self._params.get_str('sort_by')
468         if sort_by == 'steps':
469             processes.sort(key=lambda p: len(p.explicit_steps))
470         elif sort_by == '-steps':
471             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
472         elif sort_by == 'owners':
473             processes.sort(key=lambda p: p.n_owners or 0)
474         elif sort_by == '-owners':
475             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
476         elif sort_by == 'effort':
477             processes.sort(key=lambda p: p.effort.newest)
478         elif sort_by == '-effort':
479             processes.sort(key=lambda p: p.effort.newest, reverse=True)
480         elif sort_by == '-title':
481             processes.sort(key=lambda p: p.title.newest, reverse=True)
482         else:
483             processes.sort(key=lambda p: p.title.newest)
484             sort_by = 'title'
485         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
486
487     # POST handlers
488
489     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
490         """Update history timestamps for VersionedAttribute."""
491         id_ = self._params.get_int_or_none('id')
492         item = cls.by_id(self.conn, id_)
493         attr = getattr(item, attr_name)
494         for k, v in self._form_data.get_first_strings_starting('at:').items():
495             old = k[3:]
496             if old[19:] != v:
497                 attr.reset_timestamp(old, f'{v}.0')
498         attr.save(self.conn)
499         cls_name = cls.__name__.lower()
500         return f'/{cls_name}_{attr_name}s?id={item.id_}'
501
502     def do_POST_day(self) -> str:
503         """Update or insert Day of date and Todos mapped to it."""
504         date = self._params.get_str('date')
505         day = Day.by_id_or_create(self.conn, date)
506         day.comment = self._form_data.get_str('day_comment')
507         day.save(self.conn)
508         make_type = self._form_data.get_str('make_type')
509         for process_id in sorted(self._form_data.get_all_int('new_todo')):
510             if 'empty' == make_type:
511                 process = Process.by_id(self.conn, process_id)
512                 todo = Todo(None, process, False, date)
513                 todo.save(self.conn)
514             else:
515                 Todo.create_with_children(self.conn, process_id, date)
516         done_ids = self._form_data.get_all_int('done')
517         comments = self._form_data.get_all_str('comment')
518         efforts = self._form_data.get_all_str('effort')
519         for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
520             todo = Todo.by_id(self.conn, todo_id)
521             todo.is_done = todo_id in done_ids
522             if len(comments) > 0:
523                 todo.comment = comments[i]
524             if len(efforts) > 0:
525                 todo.effort = float(efforts[i]) if efforts[i] else None
526             todo.save(self.conn)
527         return f'/day?date={date}&make_type={make_type}'
528
529     def do_POST_todo(self) -> str:
530         """Update Todo and its children."""
531         # pylint: disable=too-many-locals
532         # pylint: disable=too-many-branches
533         id_ = self._params.get_int('id')
534         for _ in self._form_data.get_all_str('delete'):
535             todo = Todo .by_id(self.conn, id_)
536             todo.remove(self.conn)
537             return '/'
538         todo = Todo.by_id(self.conn, id_)
539         adopted_child_ids = self._form_data.get_all_int('adopt')
540         processes_to_make_full = self._form_data.get_all_int('make_full')
541         processes_to_make_empty = self._form_data.get_all_int('make_empty')
542         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
543         for v in fill_fors.values():
544             if v.startswith('make_empty_'):
545                 processes_to_make_empty += [int(v[11:])]
546             elif v.startswith('make_full_'):
547                 processes_to_make_full += [int(v[10:])]
548             elif v != 'ignore':
549                 adopted_child_ids += [int(v)]
550         to_remove = []
551         for child in todo.children:
552             assert isinstance(child.id_, int)
553             if child.id_ not in adopted_child_ids:
554                 to_remove += [child.id_]
555         for id_ in to_remove:
556             child = Todo.by_id(self.conn, id_)
557             todo.remove_child(child)
558         for child_id in adopted_child_ids:
559             if child_id in [c.id_ for c in todo.children]:
560                 continue
561             child = Todo.by_id(self.conn, child_id)
562             todo.add_child(child)
563         for process_id in processes_to_make_empty:
564             process = Process.by_id(self.conn, process_id)
565             made = Todo(None, process, False, todo.date)
566             made.save(self.conn)
567             todo.add_child(made)
568         for process_id in processes_to_make_full:
569             made = Todo.create_with_children(self.conn, process_id, todo.date)
570             todo.add_child(made)
571         effort = self._form_data.get_str('effort', ignore_strict=True)
572         todo.effort = float(effort) if effort else None
573         todo.set_conditions(self.conn,
574                             self._form_data.get_all_int('condition'))
575         todo.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
576         todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
577         todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
578         todo.is_done = len(self._form_data.get_all_str('done')) > 0
579         todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
580         todo.comment = self._form_data.get_str('comment', ignore_strict=True)
581         todo.save(self.conn)
582         return f'/todo?id={todo.id_}'
583
584     def do_POST_process_descriptions(self) -> str:
585         """Update history timestamps for Process.description."""
586         return self._change_versioned_timestamps(Process, 'description')
587
588     def do_POST_process_efforts(self) -> str:
589         """Update history timestamps for Process.effort."""
590         return self._change_versioned_timestamps(Process, 'effort')
591
592     def do_POST_process_titles(self) -> str:
593         """Update history timestamps for Process.title."""
594         return self._change_versioned_timestamps(Process, 'title')
595
596     def do_POST_process(self) -> str:
597         """Update or insert Process of ?id= and fields defined in postvars."""
598         # pylint: disable=too-many-branches
599         id_ = self._params.get_int_or_none('id')
600         for _ in self._form_data.get_all_str('delete'):
601             if id_ is None:
602                 raise NotFoundException('trying to delete non-saved Process')
603             process = Process.by_id(self.conn, id_)
604             process.remove(self.conn)
605             return '/processes'
606         process = Process.by_id_or_create(self.conn, id_)
607         process.title.set(self._form_data.get_str('title'))
608         process.description.set(self._form_data.get_str('description'))
609         process.effort.set(self._form_data.get_float('effort'))
610         process.set_conditions(self.conn,
611                                self._form_data.get_all_int('condition'))
612         process.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
613         process.set_enables(self.conn, self._form_data.get_all_int('enables'))
614         process.set_disables(self.conn,
615                              self._form_data.get_all_int('disables'))
616         process.calendarize = self._form_data.get_all_str('calendarize') != []
617         process.save(self.conn)
618         assert isinstance(process.id_, int)
619         steps: list[ProcessStep] = []
620         for step_id in self._form_data.get_all_int('keep_step'):
621             if step_id not in self._form_data.get_all_int('steps'):
622                 raise BadFormatException('trying to keep unknown step')
623         for step_id in self._form_data.get_all_int('steps'):
624             if step_id not in self._form_data.get_all_int('keep_step'):
625                 continue
626             step_process_id = self._form_data.get_int(
627                     f'step_{step_id}_process_id')
628             parent_id = self._form_data.get_int_or_none(
629                     f'step_{step_id}_parent_id')
630             steps += [ProcessStep(step_id, process.id_, step_process_id,
631                                   parent_id)]
632         for step_id in self._form_data.get_all_int('steps'):
633             for step_process_id in self._form_data.get_all_int(
634                     f'new_step_to_{step_id}'):
635                 steps += [ProcessStep(None, process.id_, step_process_id,
636                                       step_id)]
637         new_step_title = None
638         for step_identifier in self._form_data.get_all_str('new_top_step'):
639             try:
640                 step_process_id = int(step_identifier)
641                 steps += [ProcessStep(None, process.id_, step_process_id,
642                                       None)]
643             except ValueError:
644                 new_step_title = step_identifier
645         process.set_steps(self.conn, steps)
646         process.set_step_suppressions(self.conn,
647                                       self._form_data.
648                                       get_all_int('suppresses'))
649         owners_to_set = []
650         new_owner_title = None
651         for owner_identifier in self._form_data.get_all_str('step_of'):
652             try:
653                 owners_to_set += [int(owner_identifier)]
654             except ValueError:
655                 new_owner_title = owner_identifier
656         process.set_owners(self.conn, owners_to_set)
657         params = f'id={process.id_}'
658         if new_step_title:
659             title_b64_encoded = b64encode(new_step_title.encode()).decode()
660             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
661         elif new_owner_title:
662             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
663             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
664         process.save(self.conn)
665         return f'/process?{params}'
666
667     def do_POST_condition_descriptions(self) -> str:
668         """Update history timestamps for Condition.description."""
669         return self._change_versioned_timestamps(Condition, 'description')
670
671     def do_POST_condition_titles(self) -> str:
672         """Update history timestamps for Condition.title."""
673         return self._change_versioned_timestamps(Condition, 'title')
674
675     def do_POST_condition(self) -> str:
676         """Update/insert Condition of ?id= and fields defined in postvars."""
677         id_ = self._params.get_int_or_none('id')
678         for _ in self._form_data.get_all_str('delete'):
679             if id_ is None:
680                 raise NotFoundException('trying to delete non-saved Condition')
681             condition = Condition.by_id_or_create(self.conn, id_)
682             condition.remove(self.conn)
683             return '/conditions'
684         condition = Condition.by_id_or_create(self.conn, id_)
685         condition.is_active = self._form_data.get_str('is_active') == 'True'
686         condition.title.set(self._form_data.get_str('title'))
687         condition.description.set(self._form_data.get_str('description'))
688         condition.save(self.conn)
689         return f'/condition?id={condition.id_}'