1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
22 TEMPLATES_DIR = 'templates'
25 class TaskServer(HTTPServer):
26 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28 def __init__(self, db_file: DatabaseFile,
29 *args: Any, **kwargs: Any) -> None:
30 super().__init__(*args, **kwargs)
32 self.render_mode = 'html'
33 self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
37 """Wrapper for validating and retrieving dict-like HTTP inputs."""
39 def __init__(self, dict_: dict[str, list[str]]) -> None:
42 def get_all_str(self, key: str) -> list[str]:
43 """Retrieve list of string values at key (empty if no key)."""
44 if key not in self.inputs.keys():
46 return self.inputs[key]
48 def get_all_int(self, key: str) -> list[int]:
49 """Retrieve list of int values at key."""
50 all_str = self.get_all_str(key)
52 return [int(s) for s in all_str if len(s) > 0]
53 except ValueError as e:
54 msg = f'cannot int a form field value for key {key} in: {all_str}'
55 raise BadFormatException(msg) from e
57 def get_str(self, key: str, default: str | None = None) -> str | None:
58 """Retrieve single/first string value of key, or default."""
59 vals = self.get_all_str(key)
64 def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65 """Retrieve first string value of key, if none: fail or default."""
66 vals = self.get_all_str(key)
68 if default is not None:
70 raise BadFormatException(f'no value found for key: {key}')
73 def get_int_or_none(self, key: str) -> int | None:
74 """Retrieve single/first value of key as int, return None if empty."""
75 val = self.get_str_or_fail(key, '')
80 except (ValueError, TypeError) as e:
81 msg = f'cannot int form field value for key {key}: {val}'
82 raise BadFormatException(msg) from e
84 def get_bool_or_none(self, key: str) -> bool | None:
85 """Return value to key if truish; if no value to key, None."""
86 val = self.get_str(key)
89 return val in {'True', 'true', '1', 'on'}
91 def get_firsts_of_key_prefixed(self, prefix: str) -> dict[str, str]:
92 """Retrieve dict of (first) strings at key starting with prefix."""
94 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95 ret[key] = self.inputs[key][0]
98 def get_float_or_fail(self, key: str) -> float:
99 """Retrieve float value of key from self.postvars, fail if none."""
100 val = self.get_str_or_fail(key)
103 except ValueError as e:
104 msg = f'cannot float form field value for key {key}: {val}'
105 raise BadFormatException(msg) from e
107 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
108 """Retrieve list of float value at key, None if empty strings."""
109 ret: list[float | None] = []
110 for val in self.get_all_str(key):
116 except ValueError as e:
117 msg = f'cannot float form field value for key {key}: {val}'
118 raise BadFormatException(msg) from e
122 class TaskHandler(BaseHTTPRequestHandler):
123 """Handles single HTTP request."""
124 # pylint: disable=too-many-public-methods
126 conn: DatabaseConnection
129 _params: InputsParser
132 self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
134 """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
136 The differentiation by .server.render_mode serves to allow easily
137 comparable JSON responses for automatic testing.
140 headers: list[tuple[str, str]] = []
141 if 'html' == self.server.render_mode:
142 tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
143 body = tmpl.render(ctx)
145 body = self._ctx_to_json(ctx)
146 headers += [('Content-Type', 'application/json')]
147 self.send_response(code)
148 for header_tuple in headers:
149 self.send_header(*header_tuple)
151 self.wfile.write(bytes(body, 'utf-8'))
153 def _ctx_to_json(self, ctx: dict[str, object]) -> str:
154 """Render ctx into JSON string.
156 Flattens any objects that json.dumps might not want to serialize, and
157 turns occurrences of BaseModel objects into listings of their .id_, to
158 be resolved to a full dict inside a top-level '_library' dictionary,
159 to avoid endless and circular nesting.
162 def flatten(node: object) -> object:
164 def update_library_with(
165 item: BaseModel[int] | BaseModel[str]) -> None:
166 cls_name = item.__class__.__name__
167 if cls_name not in library:
168 library[cls_name] = {}
169 if item.id_ not in library[cls_name]:
170 d, refs = item.as_dict_and_refs
171 id_key = '?' if item.id_ is None else item.id_
172 library[cls_name][id_key] = d
174 update_library_with(ref)
176 if isinstance(node, BaseModel):
177 update_library_with(node)
179 if isinstance(node, DictableNode):
180 d, refs = node.as_dict_and_refs
182 update_library_with(ref)
184 if isinstance(node, (list, tuple)):
185 return [flatten(item) for item in node]
186 if isinstance(node, dict):
188 for k, v in node.items():
191 if isinstance(node, HandledException):
195 library: dict[str, dict[str | int, object]] = {}
196 for k, v in ctx.items():
198 ctx['_library'] = library
199 return json_dumps(ctx)
202 def _request_wrapper(http_method: str, not_found_msg: str
203 ) -> Callable[..., Callable[[TaskHandler], None]]:
204 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
206 Among other things, conditionally cleans all caches, but only on POST
207 requests, as only those are expected to change the states of objects
208 that may be cached, and certainly only those are expected to write any
209 changes to the database. We want to call them as early though as
210 possible here, either exactly after the specific request handler
211 returns successfully, or right after any exception is triggered –
212 otherwise, race conditions become plausible.
214 Note that otherwise any POST attempt, even a failed one, may end in
215 problematic inconsistencies:
217 - if the POST handler experiences an Exception, changes to objects
218 won't get written to the DB, but the changed objects may remain in
219 the cache and affect other objects despite their possibly illegal
222 - even if an object was just saved to the DB, we cannot be sure its
223 current state is completely identical to what we'd get if loading it
224 fresh from the DB (e.g. currently Process.n_owners is only updated
225 when loaded anew via .from_table_row, nor is its state written to
226 the DB by .save; a questionable design choice, but proof that we
227 have no guarantee that objects' .save stores all their states we'd
228 prefer at their most up-to-date.
231 def clear_caches() -> None:
232 for cls in (Day, Todo, Condition, Process, ProcessStep):
233 assert hasattr(cls, 'empty_cache')
236 def decorator(f: Callable[..., str | None]
237 ) -> Callable[[TaskHandler], None]:
238 def wrapper(self: TaskHandler) -> None:
239 # pylint: disable=protected-access
240 # (because pylint here fails to detect the use of wrapper as a
241 # method to self with respective access privileges)
243 self.conn = DatabaseConnection(self.server.db)
244 parsed_url = urlparse(self.path)
245 self._site = path_split(parsed_url.path)[1]
246 params = parse_qs(parsed_url.query,
247 keep_blank_values=True,
249 self._params = InputsParser(params)
250 handler_name = f'do_{http_method}_{self._site}'
251 if hasattr(self, handler_name):
252 handler = getattr(self, handler_name)
253 redir_target = f(self, handler)
254 if 'POST' == http_method:
257 self.send_response(302)
258 self.send_header('Location', redir_target)
261 msg = f'{not_found_msg}: {self._site}'
262 raise NotFoundException(msg)
263 except HandledException as error:
264 if 'POST' == http_method:
267 self._send_page(ctx, 'msg', error.http_code)
273 @_request_wrapper('GET', 'Unknown page')
274 def do_GET(self, handler: Callable[[], str | dict[str, object]]
276 """Render page with result of handler, or redirect if result is str."""
277 tmpl_name = f'{self._site}'
278 ctx_or_redir_target = handler()
279 if isinstance(ctx_or_redir_target, str):
280 return ctx_or_redir_target
281 self._send_page(ctx_or_redir_target, tmpl_name)
284 @_request_wrapper('POST', 'Unknown POST target')
285 def do_POST(self, handler: Callable[[], str]) -> str:
286 """Handle POST with handler, prepare redirection to result."""
287 length = int(self.headers['content-length'])
288 postvars = parse_qs(self.rfile.read(length).decode(),
289 keep_blank_values=True)
290 self._form = InputsParser(postvars)
291 redir_target = handler()
298 def _get_item(target_class: Any
299 ) -> Callable[..., Callable[[TaskHandler],
301 def decorator(f: Callable[..., dict[str, object]]
302 ) -> Callable[[TaskHandler], dict[str, object]]:
303 def wrapper(self: TaskHandler) -> dict[str, object]:
304 # pylint: disable=protected-access
305 # (because pylint here fails to detect the use of wrapper as a
306 # method to self with respective access privileges)
307 id_ = self._params.get_int_or_none('id')
308 if target_class.can_create_by_id:
309 item = target_class.by_id_or_create(self.conn, id_)
311 item = target_class.by_id(self.conn, id_)
316 def do_GET_(self) -> str:
317 """Return redirect target on GET /."""
320 def _do_GET_calendar(self) -> dict[str, object]:
321 """Show Days from ?start= to ?end=.
323 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
324 same, the only difference being the HTML template they are rendered to,
325 which .do_GET selects from their method name.
327 start = self._params.get_str_or_fail('start', '')
328 end = self._params.get_str_or_fail('end', '')
329 end = end if end != '' else date_in_n_days(366)
330 days, start, end = Day.by_date_range_with_limits(self.conn,
332 days = Day.with_filled_gaps(days, start, end)
333 today = date_in_n_days(0)
334 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Provide context for the normal calendar view (?start= to ?end=).

    All the work happens in ._do_GET_calendar; this method exists under its
    own name so that the request site "calendar" resolves to a handler.
    """
    ctx = self._do_GET_calendar()
    return ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Provide context for the minimalist calendar view (?start= to ?end=).

    Yields the very same context as the normal calendar view, since both
    delegate to ._do_GET_calendar.
    """
    ctx = self._do_GET_calendar()
    return ctx
344 def do_GET_day(self) -> dict[str, object]:
345 """Show single Day of ?date=."""
346 date = self._params.get_str_or_fail('date', date_in_n_days(0))
347 day = Day.by_id_or_create(self.conn, date)
348 make_type = self._params.get_str_or_fail('make_type', '')
349 conditions_present = []
352 for todo in day.todos:
353 for condition in todo.conditions + todo.blockers:
354 if condition not in conditions_present:
355 conditions_present += [condition]
356 enablers_for[condition.id_] = [p for p in
357 Process.all(self.conn)
358 if condition in p.enables]
359 disablers_for[condition.id_] = [p for p in
360 Process.all(self.conn)
361 if condition in p.disables]
362 seen_todos: set[int] = set()
363 top_nodes = [t.get_step_tree(seen_todos)
364 for t in day.todos if not t.parents]
366 'top_nodes': top_nodes,
367 'make_type': make_type,
368 'enablers_for': enablers_for,
369 'disablers_for': disablers_for,
370 'conditions_present': conditions_present,
371 'processes': Process.all(self.conn)}
374 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
375 """Show single Todo of ?id=."""
377 def walk_process_steps(node_id: int,
378 process_step_nodes: list[ProcessStepsNode],
379 steps_nodes: list[TodoOrProcStepNode]) -> int:
380 for process_step_node in process_step_nodes:
382 proc = Process.by_id(self.conn,
383 process_step_node.step.step_process_id)
384 node = TodoOrProcStepNode(node_id, None, proc, [])
385 steps_nodes += [node]
386 node_id = walk_process_steps(
387 node_id, process_step_node.steps, node.children)
390 def walk_todo_steps(node_id: int, todos: list[Todo],
391 steps_nodes: list[TodoOrProcStepNode]) -> int:
394 for match in [item for item in steps_nodes
396 and item.process == todo.process]:
399 for child in match.children:
400 child.fillable = True
401 node_id = walk_todo_steps(
402 node_id, todo.children, match.children)
405 node = TodoOrProcStepNode(node_id, todo, None, [])
406 steps_nodes += [node]
407 node_id = walk_todo_steps(
408 node_id, todo.children, node.children)
411 def collect_adoptables_keys(
412 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
414 for node in steps_nodes:
416 assert isinstance(node.process, Process)
417 assert isinstance(node.process.id_, int)
418 ids.add(node.process.id_)
419 ids = ids | collect_adoptables_keys(node.children)
422 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
423 process_tree = todo.process.get_steps(self.conn, None)
424 steps_todo_to_process: list[TodoOrProcStepNode] = []
425 last_node_id = walk_process_steps(0, process_tree,
426 steps_todo_to_process)
427 for steps_node in steps_todo_to_process:
428 steps_node.fillable = True
429 walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
430 adoptables: dict[int, list[Todo]] = {}
431 any_adoptables = [Todo.by_id(self.conn, t.id_)
432 for t in Todo.by_date(self.conn, todo.date)
435 for id_ in collect_adoptables_keys(steps_todo_to_process):
436 adoptables[id_] = [t for t in any_adoptables
437 if t.process.id_ == id_]
438 return {'todo': todo,
439 'steps_todo_to_process': steps_todo_to_process,
440 'adoption_candidates_for': adoptables,
441 'process_candidates': sorted(Process.all(self.conn)),
442 'todo_candidates': any_adoptables,
443 'condition_candidates': Condition.all(self.conn)}
445 def do_GET_todos(self) -> dict[str, object]:
446 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
447 sort_by = self._params.get_str_or_fail('sort_by', '')
448 start = self._params.get_str_or_fail('start', '')
449 end = self._params.get_str_or_fail('end', '')
450 process_id = self._params.get_int_or_none('process_id')
451 comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
453 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
454 todos_by_date_range, start, end = ret
455 todos = [t for t in todos_by_date_range
456 if comment_pattern in t.comment
457 and ((not process_id) or t.process.id_ == process_id)]
458 sort_by = Todo.sort_by(todos, sort_by)
459 return {'start': start, 'end': end, 'process_id': process_id,
460 'comment_pattern': comment_pattern, 'todos': todos,
461 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
463 def do_GET_conditions(self) -> dict[str, object]:
464 """Show all Conditions."""
465 pattern = self._params.get_str_or_fail('pattern', '')
466 sort_by = self._params.get_str_or_fail('sort_by', '')
467 conditions = Condition.matching(self.conn, pattern)
468 sort_by = Condition.sort_by(conditions, sort_by)
469 return {'conditions': conditions,
473 @_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Provide context for viewing the single Condition c.

    Next to the Condition itself and a flag telling whether it has no .id_
    yet, the context lists, per relation kind, all those Processes that
    name the Condition in their respective relation attribute.
    """
    all_processes = Process.all(self.conn)
    ctx: dict[str, object] = {'condition': c, 'is_new': c.id_ is None}
    # context key -> Process attribute that must contain the Condition
    relation_of = (('enabled_processes', 'conditions'),
                   ('disabled_processes', 'blockers'),
                   ('enabling_processes', 'enables'),
                   ('disabling_processes', 'disables'))
    for ctx_key, attr_name in relation_of:
        ctx[ctx_key] = [proc for proc in all_processes
                        if c in getattr(proc, attr_name)]
    return ctx
483 @_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Provide context for the title history view of Condition c.

    The context holds nothing but the Condition itself.
    """
    ctx: dict[str, object] = {}
    ctx['condition'] = c
    return ctx
488 @_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Provide context for the description history view of Condition c.

    The context holds nothing but the Condition itself.
    """
    ctx: dict[str, object] = {}
    ctx['condition'] = c
    return ctx
494 def do_GET_process(self, process: Process) -> dict[str, object]:
495 """Show Process of ?id=."""
496 owner_ids = self._params.get_all_int('step_to')
497 owned_ids = self._params.get_all_int('has_step')
498 title_64 = self._params.get_str('title_b64')
501 title = b64decode(title_64.encode()).decode()
502 except binascii_Exception as exc:
503 msg = 'invalid base64 for ?title_b64='
504 raise BadFormatException(msg) from exc
505 process.title.set(title)
506 preset_top_step = None
507 owners = process.used_as_step_by(self.conn)
508 for step_id in owner_ids:
509 owners += [Process.by_id(self.conn, step_id)]
510 for process_id in owned_ids:
511 Process.by_id(self.conn, process_id) # to ensure ID exists
512 preset_top_step = process_id
513 return {'process': process, 'is_new': process.id_ is None,
514 'preset_top_step': preset_top_step,
515 'steps': process.get_steps(self.conn),
517 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
518 'process_candidates': Process.all(self.conn),
519 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Provide context for the title history view of Process p.

    The context holds nothing but the Process itself.
    """
    ctx: dict[str, object] = {}
    ctx['process'] = p
    return ctx
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Provide context for the description history view of Process p.

    The context holds nothing but the Process itself.
    """
    ctx: dict[str, object] = {}
    ctx['process'] = p
    return ctx
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Provide context for the default effort history view of Process p.

    The context holds nothing but the Process itself.
    """
    ctx: dict[str, object] = {}
    ctx['process'] = p
    return ctx
def do_GET_processes(self) -> dict[str, object]:
    """Provide context listing the Processes matching ?pattern=.

    Both ?pattern= and ?sort_by= fall back to the empty string if absent.
    The 'sort_by' value put into the context is whatever Process.sort_by
    returns for the requested one, not necessarily the requested value.
    """
    params = self._params
    pattern = params.get_str_or_fail('pattern', '')
    requested_sort = params.get_str_or_fail('sort_by', '')
    matches = Process.matching(self.conn, pattern)
    applied_sort = Process.sort_by(matches, requested_sort)
    return {'processes': matches,
            'sort_by': applied_sort,
            'pattern': pattern}
547 def _delete_or_post(target_class: Any, redir_target: str = '/'
548 ) -> Callable[..., Callable[[TaskHandler], str]]:
549 def decorator(f: Callable[..., str]
550 ) -> Callable[[TaskHandler], str]:
551 def wrapper(self: TaskHandler) -> str:
552 # pylint: disable=protected-access
553 # (because pylint here fails to detect the use of wrapper as a
554 # method to self with respective access privileges)
555 id_ = self._params.get_int_or_none('id')
556 for _ in self._form.get_all_str('delete'):
558 msg = 'trying to delete non-saved ' +\
559 f'{target_class.__name__}'
560 raise NotFoundException(msg)
561 item = target_class.by_id(self.conn, id_)
562 item.remove(self.conn)
564 if target_class.can_create_by_id:
565 item = target_class.by_id_or_create(self.conn, id_)
567 item = target_class.by_id(self.conn, id_)
572 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
573 """Update history timestamps for VersionedAttribute."""
574 id_ = self._params.get_int_or_none('id')
575 item = cls.by_id(self.conn, id_)
576 attr = getattr(item, attr_name)
577 for k, v in self._form.get_firsts_of_key_prefixed('at:').items():
580 attr.reset_timestamp(old, f'{v}.0')
582 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
584 def do_POST_day(self) -> str:
585 """Update or insert Day of date and Todos mapped to it."""
586 # pylint: disable=too-many-locals
587 date = self._params.get_str_or_fail('date')
588 day_comment = self._form.get_str_or_fail('day_comment')
589 make_type = self._form.get_str_or_fail('make_type')
590 old_todos = self._form.get_all_int('todo_id')
591 new_todos_by_process = self._form.get_all_int('new_todo')
592 comments = self._form.get_all_str('comment')
593 efforts = self._form.get_all_floats_or_nones('effort')
594 done_todos = self._form.get_all_int('done')
595 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
596 raise BadFormatException('"done" field refers to unknown Todo')
597 is_done = [t_id in done_todos for t_id in old_todos]
598 if not (len(old_todos) == len(is_done) == len(comments)
600 msg = 'not equal number each of number of todo_id, comments, ' +\
602 raise BadFormatException(msg)
603 day = Day.by_id_or_create(self.conn, date)
604 day.comment = day_comment
607 for process_id in sorted(new_todos_by_process):
608 process = Process.by_id(self.conn, process_id)
609 todo = Todo(None, process, False, date)
612 if 'full' == make_type:
613 for todo in new_todos:
614 todo.ensure_children(self.conn)
615 for i, todo_id in enumerate(old_todos):
616 todo = Todo.by_id(self.conn, todo_id)
617 todo.is_done = is_done[i]
618 todo.comment = comments[i]
619 todo.effort = efforts[i]
621 return f'/day?date={date}&make_type={make_type}'
623 @_delete_or_post(Todo, '/')
624 def do_POST_todo(self, todo: Todo) -> str:
625 """Update Todo and its children."""
626 # pylint: disable=too-many-locals
627 # pylint: disable=too-many-branches
628 adopted_child_ids = self._form.get_all_int('adopt')
629 to_make = {'full': self._form.get_all_int('make_full'),
630 'empty': self._form.get_all_int('make_empty')}
631 step_fillers = self._form.get_all_str('step_filler')
632 to_update: dict[str, Any] = {
633 'comment': self._form.get_str_or_fail('comment', '')}
634 for k in ('is_done', 'calendarize'):
635 v = self._form.get_bool_or_none(k)
638 cond_rels = [self._form.get_all_int(name) for name in
639 ['conditions', 'blockers', 'enables', 'disables']]
640 effort_or_not = self._form.get_str('effort')
641 if effort_or_not is not None:
642 if effort_or_not == '':
643 to_update['effort'] = None
646 to_update['effort'] = float(effort_or_not)
647 except ValueError as e:
648 msg = 'cannot float form field value for key: effort'
649 raise BadFormatException(msg) from e
650 todo.set_condition_relations(self.conn, *cond_rels)
651 for filler in [f for f in step_fillers if f != 'ignore']:
654 for prefix in [p for p in ['make_empty_', 'make_full_']
655 if filler.startswith(p)]:
656 to_int = filler[len(prefix):]
658 target_id = int(to_int)
659 except ValueError as e:
660 msg = 'bad fill_for target: {filler}'
661 raise BadFormatException(msg) from e
662 if filler.startswith('make_empty_'):
663 to_make['empty'] += [target_id]
664 elif filler.startswith('make_full_'):
665 to_make['full'] += [target_id]
667 adopted_child_ids += [target_id]
669 for child in todo.children:
670 if child.id_ and (child.id_ not in adopted_child_ids):
671 to_remove += [child.id_]
672 for id_ in to_remove:
673 child = Todo.by_id(self.conn, id_)
674 todo.remove_child(child)
675 for child_id in adopted_child_ids:
676 if child_id not in [c.id_ for c in todo.children]:
677 todo.add_child(Todo.by_id(self.conn, child_id))
678 todo.update_attrs(**to_update)
679 for approach, proc_ids in to_make.items():
680 for process_id in proc_ids:
681 process = Process.by_id(self.conn, process_id)
682 made = Todo(None, process, False, todo.date)
684 if 'full' == approach:
685 made.ensure_children(self.conn)
687 # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
688 url = f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Re-stamp history entries of Process.description from posted form.

    Delegates to ._change_versioned_timestamps and passes on the redirect
    target it returns.
    """
    redir_target = self._change_versioned_timestamps(Process, 'description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Re-stamp history entries of Process.effort from posted form.

    Delegates to ._change_versioned_timestamps and passes on the redirect
    target it returns.
    """
    redir_target = self._change_versioned_timestamps(Process, 'effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Re-stamp history entries of Process.title from posted form.

    Delegates to ._change_versioned_timestamps and passes on the redirect
    target it returns.
    """
    redir_target = self._change_versioned_timestamps(Process, 'title')
    return redir_target
704 @_delete_or_post(Process, '/processes')
705 def do_POST_process(self, process: Process) -> str:
706 """Update or insert Process of ?id= and fields defined in postvars."""
707 # pylint: disable=too-many-locals
708 versioned = {'title': self._form.get_str_or_fail('title'),
709 'description': self._form.get_str_or_fail('description'),
710 'effort': self._form.get_float_or_fail('effort')}
711 cond_rels = [self._form.get_all_int(s) for s
712 in ['conditions', 'blockers', 'enables', 'disables']]
713 calendarize = self._form.get_bool_or_none('calendarize')
714 step_of = self._form.get_all_str('step_of')
715 suppresses = self._form.get_all_int('suppresses')
716 kept_steps = self._form.get_all_int('kept_steps')
717 new_top_steps = self._form.get_all_str('new_top_step')
719 for step_id in kept_steps:
720 name = f'new_step_to_{step_id}'
721 new_steps_to[step_id] = self._form.get_all_int(name)
722 for k, v in versioned.items():
723 getattr(process, k).set(v)
724 process.set_condition_relations(self.conn, *cond_rels)
725 if calendarize is not None:
726 process.calendarize = calendarize
727 process.save(self.conn)
728 assert isinstance(process.id_, int)
729 # set relations to, and if non-existant yet: create, other Processes
730 # pylint: disable=fixme
731 # TODO: in what order to set owners, owneds, and possibly step
732 # suppressions can make the difference between recursion checks
733 # failing; should probably be handled class-internally to Process
735 # 1. owners (upwards)
737 new_owner_title = None
738 for owner_identifier in step_of:
740 owners_to_set += [int(owner_identifier)]
742 new_owner_title = owner_identifier
743 process.set_owners(self.conn, owners_to_set)
744 # 2. owneds (downwards)
745 new_step_title = None
746 steps: list[ProcessStep] = [ProcessStep.by_id(self.conn, step_id)
747 for step_id in kept_steps]
748 for step_id in kept_steps:
750 ProcessStep(None, process.id_, step_process_id, step_id)
751 for step_process_id in new_steps_to[step_id]]
752 steps += new_sub_steps
753 for step_id_or_new_title in new_top_steps:
755 step_process_id = int(step_id_or_new_title)
756 step = ProcessStep(None, process.id_, step_process_id, None)
759 new_step_title = step_id_or_new_title
760 process.set_steps(self.conn, steps)
761 process.set_step_suppressions(self.conn, suppresses)
762 # encode titles for potentially newly created Processes up or down
763 params = f'id={process.id_}'
765 title_b64_encoded = b64encode(new_step_title.encode()).decode()
766 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
767 elif new_owner_title:
768 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
769 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
770 process.save(self.conn)
771 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Re-stamp history entries of Condition.description from posted form.

    Delegates to ._change_versioned_timestamps and passes on the redirect
    target it returns.
    """
    redir_target = self._change_versioned_timestamps(Condition,
                                                     'description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Re-stamp history entries of Condition.title from posted form.

    Delegates to ._change_versioned_timestamps and passes on the redirect
    target it returns.
    """
    redir_target = self._change_versioned_timestamps(Condition, 'title')
    return redir_target
781 @_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Write posted form fields into Condition, save it, name its view.

    Form fields 'title' and 'description' are mandatory (their lookup fails
    if absent); 'is_active' is only applied if the form provides a value.
    All fields are read before the Condition is touched, so a failing
    lookup leaves it unchanged.

    Returns the path to redirect to, i.e. the view of the saved Condition.
    """
    form = self._form
    versioned_values = {attr_name: form.get_str_or_fail(attr_name)
                        for attr_name in ('title', 'description')}
    activeness = form.get_bool_or_none('is_active')
    if activeness is not None:
        condition.is_active = activeness
    for attr_name, value in versioned_values.items():
        getattr(condition, attr_name).set(value)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'