home · contact · privacy
Rename "condition"/"blocker" input names to plurals, like they are everywhere else.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import HandledException, BadFormatException, \
15         NotFoundException
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
20
21 TEMPLATES_DIR = 'templates'
22
23
24 class TaskServer(HTTPServer):
25     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
26
27     def __init__(self, db_file: DatabaseFile,
28                  *args: Any, **kwargs: Any) -> None:
29         super().__init__(*args, **kwargs)
30         self.db = db_file
31         self.headers: list[tuple[str, str]] = []
32         self._render_mode = 'html'
33         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35     def set_json_mode(self) -> None:
36         """Make server send JSON instead of HTML responses."""
37         self._render_mode = 'json'
38         self.headers += [('Content-Type', 'application/json')]
39
40     @staticmethod
41     def ctx_to_json(ctx: dict[str, object]) -> str:
42         """Render ctx into JSON string."""
43         def walk_ctx(node: object) -> Any:
44             if hasattr(node, 'as_dict_into_reference'):
45                 if hasattr(node, 'id_') and node.id_ is not None:
46                     return node.as_dict_into_reference(library)
47             if hasattr(node, 'as_dict'):
48                 return node.as_dict
49             if isinstance(node, (list, tuple)):
50                 return [walk_ctx(x) for x in node]
51             if isinstance(node, dict):
52                 d = {}
53                 for k, v in node.items():
54                     d[k] = walk_ctx(v)
55                 return d
56             if isinstance(node, HandledException):
57                 return str(node)
58             return node
59         library: dict[str, dict[str | int, object]] = {}
60         for k, v in ctx.items():
61             ctx[k] = walk_ctx(v)
62         ctx['_library'] = library
63         return json_dumps(ctx)
64
65     def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
66         """Render ctx according to self._render_mode.."""
67         tmpl_name = f'{tmpl_name}.{self._render_mode}'
68         if 'html' == self._render_mode:
69             template = self._jinja.get_template(tmpl_name)
70             return template.render(ctx)
71         return self.__class__.ctx_to_json(ctx)
72
73
74 class InputsParser:
75     """Wrapper for validating and retrieving dict-like HTTP inputs."""
76
77     def __init__(self, dict_: dict[str, list[str]],
78                  strictness: bool = True) -> None:
79         self.inputs = dict_
80         self.strict = strictness
81
82     def get_str(self, key: str, default: str = '',
83                 ignore_strict: bool = False) -> str:
84         """Retrieve single/first string value of key, or default."""
85         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
86             if self.strict and not ignore_strict:
87                 raise BadFormatException(f'no value found for key {key}')
88             return default
89         return self.inputs[key][0]
90
91     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
92         """Retrieve dict of (first) strings at key starting with prefix."""
93         ret = {}
94         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95             ret[key] = self.inputs[key][0]
96         return ret
97
98     def get_int(self, key: str) -> int:
99         """Retrieve single/first value of key as int, error if empty."""
100         val = self.get_int_or_none(key)
101         if val is None:
102             raise BadFormatException(f'unexpected empty value for: {key}')
103         return val
104
105     def get_int_or_none(self, key: str) -> int | None:
106         """Retrieve single/first value of key as int, return None if empty."""
107         val = self.get_str(key, ignore_strict=True)
108         if val == '':
109             return None
110         try:
111             return int(val)
112         except ValueError as e:
113             msg = f'cannot int form field value for key {key}: {val}'
114             raise BadFormatException(msg) from e
115
116     def get_float(self, key: str) -> float:
117         """Retrieve float value of key from self.postvars."""
118         val = self.get_str(key)
119         try:
120             return float(val)
121         except ValueError as e:
122             msg = f'cannot float form field value for key {key}: {val}'
123             raise BadFormatException(msg) from e
124
125     def get_all_str(self, key: str) -> list[str]:
126         """Retrieve list of string values at key."""
127         if key not in self.inputs.keys():
128             return []
129         return self.inputs[key]
130
131     def get_all_int(self, key: str) -> list[int]:
132         """Retrieve list of int values at key."""
133         all_str = self.get_all_str(key)
134         try:
135             return [int(s) for s in all_str if len(s) > 0]
136         except ValueError as e:
137             msg = f'cannot int a form field value for key {key} in: {all_str}'
138             raise BadFormatException(msg) from e
139
140
141 class TaskHandler(BaseHTTPRequestHandler):
142     """Handles single HTTP request."""
143     # pylint: disable=too-many-public-methods
144     server: TaskServer
145     conn: DatabaseConnection
146     _site: str
147     _form_data: InputsParser
148     _params: InputsParser
149
150     def _send_page(self,
151                    ctx: dict[str, Any],
152                    tmpl_name: str,
153                    code: int = 200
154                    ) -> None:
155         """Send ctx as proper HTTP response."""
156         body = self.server.render(ctx, tmpl_name)
157         self.send_response(code)
158         for header_tuple in self.server.headers:
159             self.send_header(*header_tuple)
160         self.end_headers()
161         self.wfile.write(bytes(body, 'utf-8'))
162
163     @staticmethod
164     def _request_wrapper(http_method: str, not_found_msg: str
165                          ) -> Callable[..., Callable[[TaskHandler], None]]:
166         def decorator(f: Callable[..., str | None]
167                       ) -> Callable[[TaskHandler], None]:
168             def wrapper(self: TaskHandler) -> None:
169                 # pylint: disable=protected-access
170                 # (because pylint here fails to detect the use of wrapper as a
171                 # method to self with respective access privileges)
172                 try:
173                     self.conn = DatabaseConnection(self.server.db)
174                     parsed_url = urlparse(self.path)
175                     self._site = path_split(parsed_url.path)[1]
176                     params = parse_qs(parsed_url.query, strict_parsing=True)
177                     self._params = InputsParser(params, False)
178                     handler_name = f'do_{http_method}_{self._site}'
179                     if hasattr(self, handler_name):
180                         handler = getattr(self, handler_name)
181                         redir_target = f(self, handler)
182                         if redir_target:
183                             self.send_response(302)
184                             self.send_header('Location', redir_target)
185                             self.end_headers()
186                     else:
187                         msg = f'{not_found_msg}: {self._site}'
188                         raise NotFoundException(msg)
189                 except HandledException as error:
190                     for cls in (Day, Todo, Condition, Process, ProcessStep):
191                         assert hasattr(cls, 'empty_cache')
192                         cls.empty_cache()
193                     ctx = {'msg': error}
194                     self._send_page(ctx, 'msg', error.http_code)
195                 finally:
196                     self.conn.close()
197             return wrapper
198         return decorator
199
200     @_request_wrapper('GET', 'Unknown page')
201     def do_GET(self, handler: Callable[[], str | dict[str, object]]
202                ) -> str | None:
203         """Render page with result of handler, or redirect if result is str."""
204         tmpl_name = f'{self._site}'
205         ctx_or_redir_target = handler()
206         if isinstance(ctx_or_redir_target, str):
207             return ctx_or_redir_target
208         self._send_page(ctx_or_redir_target, tmpl_name)
209         return None
210
211     @_request_wrapper('POST', 'Unknown POST target')
212     def do_POST(self, handler: Callable[[], str]) -> str:
213         """Handle POST with handler, prepare redirection to result."""
214         length = int(self.headers['content-length'])
215         postvars = parse_qs(self.rfile.read(length).decode(),
216                             keep_blank_values=True, strict_parsing=True)
217         self._form_data = InputsParser(postvars)
218         redir_target = handler()
219         self.conn.commit()
220         return redir_target
221
222     # GET handlers
223
224     def do_GET_(self) -> str:
225         """Return redirect target on GET /."""
226         return '/day'
227
228     def _do_GET_calendar(self) -> dict[str, object]:
229         """Show Days from ?start= to ?end=.
230
231         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
232         same, the only difference being the HTML template they are rendered to,
233         which .do_GET selects from their method name.
234         """
235         start = self._params.get_str('start')
236         end = self._params.get_str('end')
237         if not end:
238             end = date_in_n_days(366)
239         ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
240         days, start, end = ret
241         days = Day.with_filled_gaps(days, start, end)
242         today = date_in_n_days(0)
243         return {'start': start, 'end': end, 'days': days, 'today': today}
244
245     def do_GET_calendar(self) -> dict[str, object]:
246         """Show Days from ?start= to ?end= – normal view."""
247         return self._do_GET_calendar()
248
249     def do_GET_calendar_txt(self) -> dict[str, object]:
250         """Show Days from ?start= to ?end= – minimalist view."""
251         return self._do_GET_calendar()
252
253     def do_GET_day(self) -> dict[str, object]:
254         """Show single Day of ?date=."""
255         date = self._params.get_str('date', date_in_n_days(0))
256         day = Day.by_id_or_create(self.conn, date)
257         make_type = self._params.get_str('make_type')
258         conditions_present = []
259         enablers_for = {}
260         disablers_for = {}
261         for todo in day.todos:
262             for condition in todo.conditions + todo.blockers:
263                 if condition not in conditions_present:
264                     conditions_present += [condition]
265                     enablers_for[condition.id_] = [p for p in
266                                                    Process.all(self.conn)
267                                                    if condition in p.enables]
268                     disablers_for[condition.id_] = [p for p in
269                                                     Process.all(self.conn)
270                                                     if condition in p.disables]
271         seen_todos: set[int] = set()
272         top_nodes = [t.get_step_tree(seen_todos)
273                      for t in day.todos if not t.parents]
274         return {'day': day,
275                 'top_nodes': top_nodes,
276                 'make_type': make_type,
277                 'enablers_for': enablers_for,
278                 'disablers_for': disablers_for,
279                 'conditions_present': conditions_present,
280                 'processes': Process.all(self.conn)}
281
282     def do_GET_todo(self) -> dict[str, object]:
283         """Show single Todo of ?id=."""
284
285         @dataclass
286         class TodoStepsNode:
287             """Collect what's useful for Todo steps tree display."""
288             id_: int
289             todo: Todo | None
290             process: Process | None
291             children: list[TodoStepsNode]  # pylint: disable=undefined-variable
292             fillable: bool = False
293
294         def walk_process_steps(id_: int,
295                                process_step_nodes: list[ProcessStepsNode],
296                                steps_nodes: list[TodoStepsNode]) -> None:
297             for process_step_node in process_step_nodes:
298                 id_ += 1
299                 node = TodoStepsNode(id_, None, process_step_node.process, [])
300                 steps_nodes += [node]
301                 walk_process_steps(id_, list(process_step_node.steps.values()),
302                                    node.children)
303
304         def walk_todo_steps(id_: int, todos: list[Todo],
305                             steps_nodes: list[TodoStepsNode]) -> None:
306             for todo in todos:
307                 matched = False
308                 for match in [item for item in steps_nodes
309                               if item.process
310                               and item.process == todo.process]:
311                     match.todo = todo
312                     matched = True
313                     for child in match.children:
314                         child.fillable = True
315                     walk_todo_steps(id_, todo.children, match.children)
316                 if not matched:
317                     id_ += 1
318                     node = TodoStepsNode(id_, todo, None, [])
319                     steps_nodes += [node]
320                     walk_todo_steps(id_, todo.children, node.children)
321
322         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
323                                     ) -> set[int]:
324             ids = set()
325             for node in steps_nodes:
326                 if not node.todo:
327                     assert isinstance(node.process, Process)
328                     assert isinstance(node.process.id_, int)
329                     ids.add(node.process.id_)
330                 ids = ids | collect_adoptables_keys(node.children)
331             return ids
332
333         id_ = self._params.get_int('id')
334         todo = Todo.by_id(self.conn, id_)
335         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
336         process_tree = todo.process.get_steps(self.conn, None)
337         steps_todo_to_process: list[TodoStepsNode] = []
338         walk_process_steps(0, list(process_tree.values()),
339                            steps_todo_to_process)
340         for steps_node in steps_todo_to_process:
341             steps_node.fillable = True
342         walk_todo_steps(len(steps_todo_to_process), todo_steps,
343                         steps_todo_to_process)
344         adoptables: dict[int, list[Todo]] = {}
345         any_adoptables = [Todo.by_id(self.conn, t.id_)
346                           for t in Todo.by_date(self.conn, todo.date)
347                           if t.id_ is not None
348                           and t != todo]
349         for id_ in collect_adoptables_keys(steps_todo_to_process):
350             adoptables[id_] = [t for t in any_adoptables
351                                if t.process.id_ == id_]
352         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
353                 'adoption_candidates_for': adoptables,
354                 'process_candidates': Process.all(self.conn),
355                 'todo_candidates': any_adoptables,
356                 'condition_candidates': Condition.all(self.conn)}
357
358     def do_GET_todos(self) -> dict[str, object]:
359         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
360         sort_by = self._params.get_str('sort_by')
361         start = self._params.get_str('start')
362         end = self._params.get_str('end')
363         process_id = self._params.get_int_or_none('process_id')
364         comment_pattern = self._params.get_str('comment_pattern')
365         todos = []
366         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
367         todos_by_date_range, start, end = ret
368         todos = [t for t in todos_by_date_range
369                  if comment_pattern in t.comment
370                  and ((not process_id) or t.process.id_ == process_id)]
371         if sort_by == 'doneness':
372             todos.sort(key=lambda t: t.is_done)
373         elif sort_by == '-doneness':
374             todos.sort(key=lambda t: t.is_done, reverse=True)
375         elif sort_by == 'title':
376             todos.sort(key=lambda t: t.title_then)
377         elif sort_by == '-title':
378             todos.sort(key=lambda t: t.title_then, reverse=True)
379         elif sort_by == 'comment':
380             todos.sort(key=lambda t: t.comment)
381         elif sort_by == '-comment':
382             todos.sort(key=lambda t: t.comment, reverse=True)
383         elif sort_by == '-date':
384             todos.sort(key=lambda t: t.date, reverse=True)
385         else:
386             todos.sort(key=lambda t: t.date)
387             sort_by = 'title'
388         return {'start': start, 'end': end, 'process_id': process_id,
389                 'comment_pattern': comment_pattern, 'todos': todos,
390                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
391
392     def do_GET_conditions(self) -> dict[str, object]:
393         """Show all Conditions."""
394         pattern = self._params.get_str('pattern')
395         conditions = Condition.matching(self.conn, pattern)
396         sort_by = self._params.get_str('sort_by')
397         if sort_by == 'is_active':
398             conditions.sort(key=lambda c: c.is_active)
399         elif sort_by == '-is_active':
400             conditions.sort(key=lambda c: c.is_active, reverse=True)
401         elif sort_by == '-title':
402             conditions.sort(key=lambda c: c.title.newest, reverse=True)
403         else:
404             conditions.sort(key=lambda c: c.title.newest)
405             sort_by = 'title'
406         return {'conditions': conditions,
407                 'sort_by': sort_by,
408                 'pattern': pattern}
409
410     def do_GET_condition(self) -> dict[str, object]:
411         """Show Condition of ?id=."""
412         id_ = self._params.get_int_or_none('id')
413         c = Condition.by_id_or_create(self.conn, id_)
414         ps = Process.all(self.conn)
415         return {'condition': c, 'is_new': c.id_ is None,
416                 'enabled_processes': [p for p in ps if c in p.conditions],
417                 'disabled_processes': [p for p in ps if c in p.blockers],
418                 'enabling_processes': [p for p in ps if c in p.enables],
419                 'disabling_processes': [p for p in ps if c in p.disables]}
420
421     def do_GET_condition_titles(self) -> dict[str, object]:
422         """Show title history of Condition of ?id=."""
423         id_ = self._params.get_int('id')
424         condition = Condition.by_id(self.conn, id_)
425         return {'condition': condition}
426
427     def do_GET_condition_descriptions(self) -> dict[str, object]:
428         """Show description historys of Condition of ?id=."""
429         id_ = self._params.get_int('id')
430         condition = Condition.by_id(self.conn, id_)
431         return {'condition': condition}
432
433     def do_GET_process(self) -> dict[str, object]:
434         """Show Process of ?id=."""
435         id_ = self._params.get_int_or_none('id')
436         process = Process.by_id_or_create(self.conn, id_)
437         title_64 = self._params.get_str('title_b64')
438         if title_64:
439             title = b64decode(title_64.encode()).decode()
440             process.title.set(title)
441         owners = process.used_as_step_by(self.conn)
442         for step_id in self._params.get_all_int('step_to'):
443             owners += [Process.by_id(self.conn, step_id)]
444         preset_top_step = None
445         for process_id in self._params.get_all_int('has_step'):
446             preset_top_step = process_id
447         return {'process': process, 'is_new': process.id_ is None,
448                 'preset_top_step': preset_top_step,
449                 'steps': process.get_steps(self.conn), 'owners': owners,
450                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
451                 'process_candidates': Process.all(self.conn),
452                 'condition_candidates': Condition.all(self.conn)}
453
454     def do_GET_process_titles(self) -> dict[str, object]:
455         """Show title history of Process of ?id=."""
456         id_ = self._params.get_int('id')
457         process = Process.by_id(self.conn, id_)
458         return {'process': process}
459
460     def do_GET_process_descriptions(self) -> dict[str, object]:
461         """Show description historys of Process of ?id=."""
462         id_ = self._params.get_int('id')
463         process = Process.by_id(self.conn, id_)
464         return {'process': process}
465
466     def do_GET_process_efforts(self) -> dict[str, object]:
467         """Show default effort history of Process of ?id=."""
468         id_ = self._params.get_int('id')
469         process = Process.by_id(self.conn, id_)
470         return {'process': process}
471
472     def do_GET_processes(self) -> dict[str, object]:
473         """Show all Processes."""
474         pattern = self._params.get_str('pattern')
475         processes = Process.matching(self.conn, pattern)
476         sort_by = self._params.get_str('sort_by')
477         if sort_by == 'steps':
478             processes.sort(key=lambda p: len(p.explicit_steps))
479         elif sort_by == '-steps':
480             processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
481         elif sort_by == 'owners':
482             processes.sort(key=lambda p: p.n_owners or 0)
483         elif sort_by == '-owners':
484             processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
485         elif sort_by == 'effort':
486             processes.sort(key=lambda p: p.effort.newest)
487         elif sort_by == '-effort':
488             processes.sort(key=lambda p: p.effort.newest, reverse=True)
489         elif sort_by == '-title':
490             processes.sort(key=lambda p: p.title.newest, reverse=True)
491         else:
492             processes.sort(key=lambda p: p.title.newest)
493             sort_by = 'title'
494         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
495
496     # POST handlers
497
498     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
499         """Update history timestamps for VersionedAttribute."""
500         id_ = self._params.get_int_or_none('id')
501         item = cls.by_id(self.conn, id_)
502         attr = getattr(item, attr_name)
503         for k, v in self._form_data.get_first_strings_starting('at:').items():
504             old = k[3:]
505             if old[19:] != v:
506                 attr.reset_timestamp(old, f'{v}.0')
507         attr.save(self.conn)
508         cls_name = cls.__name__.lower()
509         return f'/{cls_name}_{attr_name}s?id={item.id_}'
510
511     def do_POST_day(self) -> str:
512         """Update or insert Day of date and Todos mapped to it."""
513         date = self._params.get_str('date')
514         day = Day.by_id_or_create(self.conn, date)
515         day.comment = self._form_data.get_str('day_comment')
516         day.save(self.conn)
517         make_type = self._form_data.get_str('make_type')
518         for process_id in sorted(self._form_data.get_all_int('new_todo')):
519             if 'empty' == make_type:
520                 process = Process.by_id(self.conn, process_id)
521                 todo = Todo(None, process, False, date)
522                 todo.save(self.conn)
523             else:
524                 Todo.create_with_children(self.conn, process_id, date)
525         done_ids = self._form_data.get_all_int('done')
526         comments = self._form_data.get_all_str('comment')
527         efforts = self._form_data.get_all_str('effort')
528         for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
529             todo = Todo.by_id(self.conn, todo_id)
530             todo.is_done = todo_id in done_ids
531             if len(comments) > 0:
532                 todo.comment = comments[i]
533             if len(efforts) > 0:
534                 todo.effort = float(efforts[i]) if efforts[i] else None
535             todo.save(self.conn)
536         return f'/day?date={date}&make_type={make_type}'
537
538     def do_POST_todo(self) -> str:
539         """Update Todo and its children."""
540         # pylint: disable=too-many-locals
541         # pylint: disable=too-many-branches
542         id_ = self._params.get_int('id')
543         for _ in self._form_data.get_all_str('delete'):
544             todo = Todo .by_id(self.conn, id_)
545             todo.remove(self.conn)
546             return '/'
547         todo = Todo.by_id(self.conn, id_)
548         adopted_child_ids = self._form_data.get_all_int('adopt')
549         processes_to_make_full = self._form_data.get_all_int('make_full')
550         processes_to_make_empty = self._form_data.get_all_int('make_empty')
551         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
552         for v in fill_fors.values():
553             if v.startswith('make_empty_'):
554                 processes_to_make_empty += [int(v[11:])]
555             elif v.startswith('make_full_'):
556                 processes_to_make_full += [int(v[10:])]
557             elif v != 'ignore':
558                 adopted_child_ids += [int(v)]
559         to_remove = []
560         for child in todo.children:
561             assert isinstance(child.id_, int)
562             if child.id_ not in adopted_child_ids:
563                 to_remove += [child.id_]
564         for id_ in to_remove:
565             child = Todo.by_id(self.conn, id_)
566             todo.remove_child(child)
567         for child_id in adopted_child_ids:
568             if child_id in [c.id_ for c in todo.children]:
569                 continue
570             child = Todo.by_id(self.conn, child_id)
571             todo.add_child(child)
572         for process_id in processes_to_make_empty:
573             process = Process.by_id(self.conn, process_id)
574             made = Todo(None, process, False, todo.date)
575             made.save(self.conn)
576             todo.add_child(made)
577         for process_id in processes_to_make_full:
578             made = Todo.create_with_children(self.conn, process_id, todo.date)
579             todo.add_child(made)
580         effort = self._form_data.get_str('effort', ignore_strict=True)
581         todo.effort = float(effort) if effort else None
582         todo.set_conditions(self.conn,
583                             self._form_data.get_all_int('conditions'))
584         todo.set_blockers(self.conn, self._form_data.get_all_int('blockers'))
585         todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
586         todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
587         todo.is_done = len(self._form_data.get_all_str('done')) > 0
588         todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
589         todo.comment = self._form_data.get_str('comment', ignore_strict=True)
590         todo.save(self.conn)
591         return f'/todo?id={todo.id_}'
592
593     def do_POST_process_descriptions(self) -> str:
594         """Update history timestamps for Process.description."""
595         return self._change_versioned_timestamps(Process, 'description')
596
597     def do_POST_process_efforts(self) -> str:
598         """Update history timestamps for Process.effort."""
599         return self._change_versioned_timestamps(Process, 'effort')
600
601     def do_POST_process_titles(self) -> str:
602         """Update history timestamps for Process.title."""
603         return self._change_versioned_timestamps(Process, 'title')
604
605     def do_POST_process(self) -> str:
606         """Update or insert Process of ?id= and fields defined in postvars."""
607         # pylint: disable=too-many-branches
608         id_ = self._params.get_int_or_none('id')
609         for _ in self._form_data.get_all_str('delete'):
610             if id_ is None:
611                 raise NotFoundException('trying to delete non-saved Process')
612             process = Process.by_id(self.conn, id_)
613             process.remove(self.conn)
614             return '/processes'
615         process = Process.by_id_or_create(self.conn, id_)
616         process.title.set(self._form_data.get_str('title'))
617         process.description.set(self._form_data.get_str('description'))
618         process.effort.set(self._form_data.get_float('effort'))
619         process.set_conditions(self.conn,
620                                self._form_data.get_all_int('conditions'))
621         process.set_blockers(self.conn,
622                              self._form_data.get_all_int('blockers'))
623         process.set_enables(self.conn, self._form_data.get_all_int('enables'))
624         process.set_disables(self.conn,
625                              self._form_data.get_all_int('disables'))
626         process.calendarize = self._form_data.get_all_str('calendarize') != []
627         process.save(self.conn)
628         assert isinstance(process.id_, int)
629         steps: list[ProcessStep] = []
630         for step_id in self._form_data.get_all_int('keep_step'):
631             if step_id not in self._form_data.get_all_int('steps'):
632                 raise BadFormatException('trying to keep unknown step')
633         for step_id in self._form_data.get_all_int('steps'):
634             if step_id not in self._form_data.get_all_int('keep_step'):
635                 continue
636             step_process_id = self._form_data.get_int(
637                     f'step_{step_id}_process_id')
638             parent_id = self._form_data.get_int_or_none(
639                     f'step_{step_id}_parent_id')
640             steps += [ProcessStep(step_id, process.id_, step_process_id,
641                                   parent_id)]
642         for step_id in self._form_data.get_all_int('steps'):
643             for step_process_id in self._form_data.get_all_int(
644                     f'new_step_to_{step_id}'):
645                 steps += [ProcessStep(None, process.id_, step_process_id,
646                                       step_id)]
647         new_step_title = None
648         for step_identifier in self._form_data.get_all_str('new_top_step'):
649             try:
650                 step_process_id = int(step_identifier)
651                 steps += [ProcessStep(None, process.id_, step_process_id,
652                                       None)]
653             except ValueError:
654                 new_step_title = step_identifier
655         process.set_steps(self.conn, steps)
656         process.set_step_suppressions(self.conn,
657                                       self._form_data.
658                                       get_all_int('suppresses'))
659         owners_to_set = []
660         new_owner_title = None
661         for owner_identifier in self._form_data.get_all_str('step_of'):
662             try:
663                 owners_to_set += [int(owner_identifier)]
664             except ValueError:
665                 new_owner_title = owner_identifier
666         process.set_owners(self.conn, owners_to_set)
667         params = f'id={process.id_}'
668         if new_step_title:
669             title_b64_encoded = b64encode(new_step_title.encode()).decode()
670             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
671         elif new_owner_title:
672             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
673             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
674         process.save(self.conn)
675         return f'/process?{params}'
676
677     def do_POST_condition_descriptions(self) -> str:
678         """Update history timestamps for Condition.description."""
679         return self._change_versioned_timestamps(Condition, 'description')
680
681     def do_POST_condition_titles(self) -> str:
682         """Update history timestamps for Condition.title."""
683         return self._change_versioned_timestamps(Condition, 'title')
684
685     def do_POST_condition(self) -> str:
686         """Update/insert Condition of ?id= and fields defined in postvars."""
687         id_ = self._params.get_int_or_none('id')
688         for _ in self._form_data.get_all_str('delete'):
689             if id_ is None:
690                 raise NotFoundException('trying to delete non-saved Condition')
691             condition = Condition.by_id_or_create(self.conn, id_)
692             condition.remove(self.conn)
693             return '/conditions'
694         condition = Condition.by_id_or_create(self.conn, id_)
695         condition.is_active = self._form_data.get_str('is_active') == 'True'
696         condition.title.set(self._form_data.get_str('title'))
697         condition.description.set(self._form_data.get_str('description'))
698         condition.save(self.conn)
699         return f'/condition?id={condition.id_}'