1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
22 TEMPLATES_DIR = 'templates'
25 class TaskServer(HTTPServer):
26 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28 def __init__(self, db_file: DatabaseFile,
29 *args: Any, **kwargs: Any) -> None:
30 super().__init__(*args, **kwargs)
32 self.headers: list[tuple[str, str]] = []
33 self._render_mode = 'html'
34 self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
def set_json_mode(self) -> None:
    """Make server send JSON instead of HTML responses.

    Sets the render mode to 'json' and registers the matching Content-Type
    header in self.headers.  Calling this more than once is harmless: the
    header is only added if it is not already registered, so it is never
    listed twice.
    """
    self._render_mode = 'json'
    json_header = ('Content-Type', 'application/json')
    # Guard against repeated calls piling up duplicate header entries.
    if json_header not in self.headers:
        self.headers += [json_header]
42 def ctx_to_json(ctx: dict[str, object]) -> str:
43 """Render ctx into JSON string."""
44 def walk_ctx(node: object) -> Any:
45 if hasattr(node, 'as_dict_into_reference'):
46 if hasattr(node, 'id_') and node.id_ is not None:
47 return node.as_dict_into_reference(library)
48 if hasattr(node, 'as_dict'):
50 if isinstance(node, (list, tuple)):
51 return [walk_ctx(x) for x in node]
52 if isinstance(node, dict):
54 for k, v in node.items():
57 if isinstance(node, HandledException):
60 library: dict[str, dict[str, object] | dict[int, object]] = {}
61 for k, v in ctx.items():
63 ctx['_library'] = library
64 return json_dumps(ctx)
def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
    """Render ctx according to self._render_mode.

    In 'html' mode, the Jinja template named '<tmpl_name>.html' is fetched
    from self._jinja and rendered with ctx.  In any other mode, ctx is
    handed to the class's ctx_to_json and tmpl_name is not used.
    """
    mode = self._render_mode
    if 'html' != mode:
        return self.__class__.ctx_to_json(ctx)
    return self._jinja.get_template(f'{tmpl_name}.{mode}').render(ctx)
76 """Wrapper for validating and retrieving dict-like HTTP inputs."""
78 def __init__(self, dict_: dict[str, list[str]],
79 strictness: bool = True) -> None:
81 self.strict = strictness # return None on absence of key, or fail?
83 def get_str(self, key: str, default: str = '',
84 ignore_strict: bool = False) -> str:
85 """Retrieve single/first string value of key, or default."""
86 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
87 if self.strict and not ignore_strict:
88 raise NotFoundException(f'no value found for key {key}')
90 return self.inputs[key][0]
92 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
93 """Retrieve dict of (first) strings at key starting with prefix."""
95 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
96 ret[key] = self.inputs[key][0]
99 def get_int(self, key: str) -> int:
100 """Retrieve single/first value of key as int, error if empty."""
101 val = self.get_int_or_none(key)
103 raise BadFormatException(f'unexpected empty value for: {key}')
106 def get_int_or_none(self, key: str) -> int | None:
107 """Retrieve single/first value of key as int, return None if empty."""
108 val = self.get_str(key, ignore_strict=True)
113 except ValueError as e:
114 msg = f'cannot int form field value for key {key}: {val}'
115 raise BadFormatException(msg) from e
117 def get_float(self, key: str) -> float:
118 """Retrieve float value of key from self.postvars."""
119 val = self.get_str(key)
122 except ValueError as e:
123 msg = f'cannot float form field value for key {key}: {val}'
124 raise BadFormatException(msg) from e
126 def get_float_or_none(self, key: str) -> float | None:
127 """Retrieve float value of key from self.postvars, None if empty."""
128 val = self.get_str(key)
133 except ValueError as e:
134 msg = f'cannot float form field value for key {key}: {val}'
135 raise BadFormatException(msg) from e
137 def get_all_str(self, key: str) -> list[str]:
138 """Retrieve list of string values at key."""
139 if key not in self.inputs.keys():
141 return self.inputs[key]
143 def get_all_int(self, key: str) -> list[int]:
144 """Retrieve list of int values at key."""
145 all_str = self.get_all_str(key)
147 return [int(s) for s in all_str if len(s) > 0]
148 except ValueError as e:
149 msg = f'cannot int a form field value for key {key} in: {all_str}'
150 raise BadFormatException(msg) from e
152 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
153 """Retrieve list of float value at key, None if empty strings."""
154 ret: list[float | None] = []
155 for val in self.get_all_str(key):
161 except ValueError as e:
162 msg = f'cannot float form field value for key {key}: {val}'
163 raise BadFormatException(msg) from e
167 class TaskHandler(BaseHTTPRequestHandler):
168 """Handles single HTTP request."""
169 # pylint: disable=too-many-public-methods
171 conn: DatabaseConnection
173 _form_data: InputsParser
174 _params: InputsParser
181 """Send ctx as proper HTTP response."""
182 body = self.server.render(ctx, tmpl_name)
183 self.send_response(code)
184 for header_tuple in self.server.headers:
185 self.send_header(*header_tuple)
187 self.wfile.write(bytes(body, 'utf-8'))
190 def _request_wrapper(http_method: str, not_found_msg: str
191 ) -> Callable[..., Callable[[TaskHandler], None]]:
192 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
194 Among other things, conditionally cleans all caches, but only on POST
195 requests, as only those are expected to change the states of objects
196 that may be cached, and certainly only those are expected to write any
197 changes to the database. We want to call them as early though as
198 possible here, either exactly after the specific request handler
199 returns successfully, or right after any exception is triggered –
200 otherwise, race conditions become plausible.
202 Note that otherwise any POST attempt, even a failed one, may end in
203 problematic inconsistencies:
205 - if the POST handler experiences an Exception, changes to objects
206 won't get written to the DB, but the changed objects may remain in
207 the cache and affect other objects despite their possibly illegal
210 - even if an object was just saved to the DB, we cannot be sure its
211 current state is completely identical to what we'd get if loading it
212 fresh from the DB (e.g. currently Process.n_owners is only updated
213 when loaded anew via .from_table_row, nor is its state written to
214 the DB by .save; a questionable design choice, but proof that we
215 have no guarantee that objects' .save stores all their states we'd
216 prefer at their most up-to-date.
219 def clear_caches() -> None:
220 for cls in (Day, Todo, Condition, Process, ProcessStep):
221 assert hasattr(cls, 'empty_cache')
224 def decorator(f: Callable[..., str | None]
225 ) -> Callable[[TaskHandler], None]:
226 def wrapper(self: TaskHandler) -> None:
227 # pylint: disable=protected-access
228 # (because pylint here fails to detect the use of wrapper as a
229 # method to self with respective access privileges)
231 self.conn = DatabaseConnection(self.server.db)
232 parsed_url = urlparse(self.path)
233 self._site = path_split(parsed_url.path)[1]
234 params = parse_qs(parsed_url.query, strict_parsing=True)
235 self._params = InputsParser(params, False)
236 handler_name = f'do_{http_method}_{self._site}'
237 if hasattr(self, handler_name):
238 handler = getattr(self, handler_name)
239 redir_target = f(self, handler)
240 if 'POST' == http_method:
243 self.send_response(302)
244 self.send_header('Location', redir_target)
247 msg = f'{not_found_msg}: {self._site}'
248 raise NotFoundException(msg)
249 except HandledException as error:
250 if 'POST' == http_method:
253 self._send_page(ctx, 'msg', error.http_code)
259 @_request_wrapper('GET', 'Unknown page')
260 def do_GET(self, handler: Callable[[], str | dict[str, object]]
262 """Render page with result of handler, or redirect if result is str."""
263 tmpl_name = f'{self._site}'
264 ctx_or_redir_target = handler()
265 if isinstance(ctx_or_redir_target, str):
266 return ctx_or_redir_target
267 self._send_page(ctx_or_redir_target, tmpl_name)
270 @_request_wrapper('POST', 'Unknown POST target')
271 def do_POST(self, handler: Callable[[], str]) -> str:
272 """Handle POST with handler, prepare redirection to result."""
273 length = int(self.headers['content-length'])
274 postvars = parse_qs(self.rfile.read(length).decode(),
275 keep_blank_values=True, strict_parsing=True)
276 self._form_data = InputsParser(postvars)
277 redir_target = handler()
284 def _get_item(target_class: Any
285 ) -> Callable[..., Callable[[TaskHandler],
287 def decorator(f: Callable[..., dict[str, object]]
288 ) -> Callable[[TaskHandler], dict[str, object]]:
289 def wrapper(self: TaskHandler) -> dict[str, object]:
290 # pylint: disable=protected-access
291 # (because pylint here fails to detect the use of wrapper as a
292 # method to self with respective access privileges)
293 id_ = self._params.get_int_or_none('id')
294 if target_class.can_create_by_id:
295 item = target_class.by_id_or_create(self.conn, id_)
297 item = target_class.by_id(self.conn, id_)
302 def do_GET_(self) -> str:
303 """Return redirect target on GET /."""
306 def _do_GET_calendar(self) -> dict[str, object]:
307 """Show Days from ?start= to ?end=.
309 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
310 same, the only difference being the HTML template they are rendered to,
311 which .do_GET selects from their method name.
313 start, end = self._params.get_str('start'), self._params.get_str('end')
314 end = end if end else date_in_n_days(366)
315 days, start, end = Day.by_date_range_with_limits(self.conn,
317 days = Day.with_filled_gaps(days, start, end)
318 today = date_in_n_days(0)
319 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – normal view.

    Returns the context built by ._do_GET_calendar unchanged.
    """
    ctx = self._do_GET_calendar()
    return ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – minimalist view.

    Returns the context built by ._do_GET_calendar unchanged.
    """
    ctx = self._do_GET_calendar()
    return ctx
329 def do_GET_day(self) -> dict[str, object]:
330 """Show single Day of ?date=."""
331 date = self._params.get_str('date', date_in_n_days(0))
332 day = Day.by_id_or_create(self.conn, date)
333 make_type = self._params.get_str('make_type')
334 conditions_present = []
337 for todo in day.todos:
338 for condition in todo.conditions + todo.blockers:
339 if condition not in conditions_present:
340 conditions_present += [condition]
341 enablers_for[condition.id_] = [p for p in
342 Process.all(self.conn)
343 if condition in p.enables]
344 disablers_for[condition.id_] = [p for p in
345 Process.all(self.conn)
346 if condition in p.disables]
347 seen_todos: set[int] = set()
348 top_nodes = [t.get_step_tree(seen_todos)
349 for t in day.todos if not t.parents]
351 'top_nodes': top_nodes,
352 'make_type': make_type,
353 'enablers_for': enablers_for,
354 'disablers_for': disablers_for,
355 'conditions_present': conditions_present,
356 'processes': Process.all(self.conn)}
359 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
360 """Show single Todo of ?id=."""
364 """Collect what's useful for Todo steps tree display."""
367 process: Process | None
368 children: list[TodoStepsNode] # pylint: disable=undefined-variable
369 fillable: bool = False
371 def walk_process_steps(id_: int,
372 process_step_nodes: list[ProcessStepsNode],
373 steps_nodes: list[TodoStepsNode]) -> None:
374 for process_step_node in process_step_nodes:
376 node = TodoStepsNode(id_, None, process_step_node.process, [])
377 steps_nodes += [node]
378 walk_process_steps(id_, list(process_step_node.steps.values()),
381 def walk_todo_steps(id_: int, todos: list[Todo],
382 steps_nodes: list[TodoStepsNode]) -> None:
385 for match in [item for item in steps_nodes
387 and item.process == todo.process]:
390 for child in match.children:
391 child.fillable = True
392 walk_todo_steps(id_, todo.children, match.children)
395 node = TodoStepsNode(id_, todo, None, [])
396 steps_nodes += [node]
397 walk_todo_steps(id_, todo.children, node.children)
399 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
402 for node in steps_nodes:
404 assert isinstance(node.process, Process)
405 assert isinstance(node.process.id_, int)
406 ids.add(node.process.id_)
407 ids = ids | collect_adoptables_keys(node.children)
410 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
411 process_tree = todo.process.get_steps(self.conn, None)
412 steps_todo_to_process: list[TodoStepsNode] = []
413 walk_process_steps(0, list(process_tree.values()),
414 steps_todo_to_process)
415 for steps_node in steps_todo_to_process:
416 steps_node.fillable = True
417 walk_todo_steps(len(steps_todo_to_process), todo_steps,
418 steps_todo_to_process)
419 adoptables: dict[int, list[Todo]] = {}
420 any_adoptables = [Todo.by_id(self.conn, t.id_)
421 for t in Todo.by_date(self.conn, todo.date)
424 for id_ in collect_adoptables_keys(steps_todo_to_process):
425 adoptables[id_] = [t for t in any_adoptables
426 if t.process.id_ == id_]
427 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
428 'adoption_candidates_for': adoptables,
429 'process_candidates': Process.all(self.conn),
430 'todo_candidates': any_adoptables,
431 'condition_candidates': Condition.all(self.conn)}
def do_GET_todos(self) -> dict[str, object]:
    """Show Todos from ?start= to ?end=, of ?process_id=, ?comment_pattern=.

    Todos of the date range are kept only if their comment contains
    ?comment_pattern= and, when a truthy ?process_id= is given, their
    Process has that ID.  The 'start' and 'end' put into the returned
    context are the values handed back by Todo.by_date_range_with_limits,
    and 'sort_by' is whatever Todo.sort_by returns for the requested key.
    """
    params = self._params
    requested_sort = params.get_str('sort_by')
    start = params.get_str('start')
    end = params.get_str('end')
    process_id = params.get_int_or_none('process_id')
    comment_pattern = params.get_str('comment_pattern')
    in_range, start, end = Todo.by_date_range_with_limits(self.conn,
                                                          (start, end))
    todos = []
    for candidate in in_range:
        if comment_pattern not in candidate.comment:
            continue
        # a falsy process_id (None or 0) means: do not filter by Process
        if process_id and not candidate.process.id_ == process_id:
            continue
        todos += [candidate]
    applied_sort = Todo.sort_by(todos, requested_sort)
    return {'start': start, 'end': end, 'process_id': process_id,
            'comment_pattern': comment_pattern, 'todos': todos,
            'all_processes': Process.all(self.conn),
            'sort_by': applied_sort}
451 def do_GET_conditions(self) -> dict[str, object]:
452 """Show all Conditions."""
453 pattern = self._params.get_str('pattern')
454 sort_by = self._params.get_str('sort_by')
455 conditions = Condition.matching(self.conn, pattern)
456 sort_by = Condition.sort_by(conditions, sort_by)
457 return {'conditions': conditions,
461 @_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=.

    Besides the Condition itself and whether it is still unsaved (its .id_
    is None), the context lists the Processes that hold it in their
    .conditions, .blockers, .enables, and .disables respectively.
    """
    all_processes = Process.all(self.conn)

    def referencing_via(attr_name: str) -> list[Process]:
        # Processes whose attribute of that name contains the Condition
        return [proc for proc in all_processes
                if c in getattr(proc, attr_name)]

    return {'condition': c, 'is_new': c.id_ is None,
            'enabled_processes': referencing_via('conditions'),
            'disabled_processes': referencing_via('blockers'),
            'enabling_processes': referencing_via('enables'),
            'disabling_processes': referencing_via('disables')}
471 @_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Show title history of Condition of ?id=.

    The context consists solely of the Condition, under key 'condition'.
    """
    ctx: dict[str, object] = {'condition': c}
    return ctx
476 @_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Show description history of Condition of ?id=.

    The context consists solely of the Condition, under key 'condition'.
    """
    ctx: dict[str, object] = {'condition': c}
    return ctx
482 def do_GET_process(self, process: Process) -> dict[str, object]:
483 """Show Process of ?id=."""
484 owner_ids = self._params.get_all_int('step_to')
485 owned_ids = self._params.get_all_int('has_step')
486 title_64 = self._params.get_str('title_b64')
489 title = b64decode(title_64.encode()).decode()
490 except binascii_Exception as exc:
491 msg = 'invalid base64 for ?title_b64='
492 raise BadFormatException(msg) from exc
493 process.title.set(title)
494 preset_top_step = None
495 owners = process.used_as_step_by(self.conn)
496 for step_id in owner_ids:
497 owners += [Process.by_id(self.conn, step_id)]
498 for process_id in owned_ids:
499 Process.by_id(self.conn, process_id) # to ensure ID exists
500 preset_top_step = process_id
501 return {'process': process, 'is_new': process.id_ is None,
502 'preset_top_step': preset_top_step,
503 'steps': process.get_steps(self.conn), 'owners': owners,
504 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
505 'process_candidates': Process.all(self.conn),
506 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Show title history of Process of ?id=.

    The context consists solely of the Process, under key 'process'.
    """
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Show description history of Process of ?id=.

    The context consists solely of the Process, under key 'process'.
    """
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Show default effort history of Process of ?id=.

    The context consists solely of the Process, under key 'process'.
    """
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_processes(self) -> dict[str, object]:
    """Show Processes matching ?pattern=, ordered as per ?sort_by=.

    The returned 'sort_by' is whatever Process.sort_by hands back for the
    requested key (presumably the key actually applied – verify against
    Process.sort_by); 'pattern' is echoed back as received.
    """
    params = self._params
    pattern = params.get_str('pattern')
    requested_sort = params.get_str('sort_by')
    processes = Process.matching(self.conn, pattern)
    applied_sort = Process.sort_by(processes, requested_sort)
    return {'processes': processes,
            'sort_by': applied_sort,
            'pattern': pattern}
534 def _delete_or_post(target_class: Any, redir_target: str = '/'
535 ) -> Callable[..., Callable[[TaskHandler], str]]:
536 def decorator(f: Callable[..., str]
537 ) -> Callable[[TaskHandler], str]:
538 def wrapper(self: TaskHandler) -> str:
539 # pylint: disable=protected-access
540 # (because pylint here fails to detect the use of wrapper as a
541 # method to self with respective access privileges)
542 id_ = self._params.get_int_or_none('id')
543 for _ in self._form_data.get_all_str('delete'):
545 msg = 'trying to delete non-saved ' +\
546 f'{target_class.__name__}'
547 raise NotFoundException(msg)
548 item = target_class.by_id(self.conn, id_)
549 item.remove(self.conn)
551 if target_class.can_create_by_id:
552 item = target_class.by_id_or_create(self.conn, id_)
554 item = target_class.by_id(self.conn, id_)
559 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
560 """Update history timestamps for VersionedAttribute."""
561 id_ = self._params.get_int_or_none('id')
562 item = cls.by_id(self.conn, id_)
563 attr = getattr(item, attr_name)
564 for k, v in self._form_data.get_first_strings_starting('at:').items():
567 attr.reset_timestamp(old, f'{v}.0')
569 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
571 def do_POST_day(self) -> str:
572 """Update or insert Day of date and Todos mapped to it."""
573 # pylint: disable=too-many-locals
575 date = self._params.get_str('date')
576 day_comment = self._form_data.get_str('day_comment')
577 make_type = self._form_data.get_str('make_type')
578 except NotFoundException as e:
579 raise BadFormatException from e
580 old_todos = self._form_data.get_all_int('todo_id')
581 new_todos = self._form_data.get_all_int('new_todo')
582 comments = self._form_data.get_all_str('comment')
583 efforts = self._form_data.get_all_floats_or_nones('effort')
584 done_todos = self._form_data.get_all_int('done')
585 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
586 raise BadFormatException('"done" field refers to unknown Todo')
587 is_done = [t_id in done_todos for t_id in old_todos]
588 if not (len(old_todos) == len(is_done) == len(comments)
590 msg = 'not equal number each of number of todo_id, comments, ' +\
592 raise BadFormatException(msg)
593 day = Day.by_id_or_create(self.conn, date)
594 day.comment = day_comment
596 for process_id in sorted(new_todos):
597 if 'empty' == make_type:
598 process = Process.by_id(self.conn, process_id)
599 todo = Todo(None, process, False, date)
602 Todo.create_with_children(self.conn, process_id, date)
603 for i, todo_id in enumerate(old_todos):
604 todo = Todo.by_id(self.conn, todo_id)
605 todo.is_done = is_done[i]
606 todo.comment = comments[i]
607 todo.effort = efforts[i]
609 return f'/day?date={date}&make_type={make_type}'
611 @_delete_or_post(Todo, '/')
612 def do_POST_todo(self, todo: Todo) -> str:
613 """Update Todo and its children."""
614 # pylint: disable=too-many-locals
615 # pylint: disable=too-many-branches
616 # pylint: disable=too-many-statements
617 adopted_child_ids = self._form_data.get_all_int('adopt')
618 processes_to_make_full = self._form_data.get_all_int('make_full')
619 processes_to_make_empty = self._form_data.get_all_int('make_empty')
620 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
621 with_effort_post = True
623 effort = self._form_data.get_float_or_none('effort')
624 except NotFoundException:
625 with_effort_post = False
626 conditions = self._form_data.get_all_int('conditions')
627 disables = self._form_data.get_all_int('disables')
628 blockers = self._form_data.get_all_int('blockers')
629 enables = self._form_data.get_all_int('enables')
630 is_done = len(self._form_data.get_all_str('done')) > 0
631 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
632 comment = self._form_data.get_str('comment', ignore_strict=True)
633 for v in fill_fors.values():
635 for prefix in ['make_empty_', 'make_full_']:
636 if v.startswith(prefix):
638 target_id = int(v[len(prefix):])
639 except ValueError as e:
640 msg = 'bad fill_for target: {v}'
641 raise BadFormatException(msg) from e
643 if v.startswith('make_empty_'):
644 processes_to_make_empty += [target_id]
645 elif v.startswith('make_full_'):
646 processes_to_make_full += [target_id]
648 adopted_child_ids += [int(v)]
650 for child in todo.children:
651 assert isinstance(child.id_, int)
652 if child.id_ not in adopted_child_ids:
653 to_remove += [child.id_]
654 for id_ in to_remove:
655 child = Todo.by_id(self.conn, id_)
656 todo.remove_child(child)
657 for child_id in adopted_child_ids:
658 if child_id in [c.id_ for c in todo.children]:
660 child = Todo.by_id(self.conn, child_id)
661 todo.add_child(child)
662 for process_id in processes_to_make_empty:
663 process = Process.by_id(self.conn, process_id)
664 made = Todo(None, process, False, todo.date)
667 for process_id in processes_to_make_full:
668 made = Todo.create_with_children(self.conn, process_id, todo.date)
672 todo.set_conditions(self.conn, conditions)
673 todo.set_blockers(self.conn, blockers)
674 todo.set_enables(self.conn, enables)
675 todo.set_disables(self.conn, disables)
676 todo.is_done = is_done
677 todo.calendarize = calendarize
678 todo.comment = comment
680 return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Update history timestamps for Process.description.

    Returns the redirect target from ._change_versioned_timestamps.
    """
    redir_target = self._change_versioned_timestamps(
        cls=Process, attr_name='description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Update history timestamps for Process.effort.

    Returns the redirect target from ._change_versioned_timestamps.
    """
    redir_target = self._change_versioned_timestamps(
        cls=Process, attr_name='effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Update history timestamps for Process.title.

    Returns the redirect target from ._change_versioned_timestamps.
    """
    redir_target = self._change_versioned_timestamps(
        cls=Process, attr_name='title')
    return redir_target
694 @_delete_or_post(Process, '/processes')
695 def do_POST_process(self, process: Process) -> str:
696 """Update or insert Process of ?id= and fields defined in postvars."""
697 # pylint: disable=too-many-locals
698 # pylint: disable=too-many-statements
700 title = self._form_data.get_str('title')
701 description = self._form_data.get_str('description')
702 effort = self._form_data.get_float('effort')
703 except NotFoundException as e:
704 raise BadFormatException from e
705 conditions = self._form_data.get_all_int('conditions')
706 blockers = self._form_data.get_all_int('blockers')
707 enables = self._form_data.get_all_int('enables')
708 disables = self._form_data.get_all_int('disables')
709 calendarize = self._form_data.get_all_str('calendarize') != []
710 suppresses = self._form_data.get_all_int('suppresses')
711 step_of = self._form_data.get_all_str('step_of')
712 keep_steps = self._form_data.get_all_int('keep_step')
713 step_ids = self._form_data.get_all_int('steps')
714 new_top_steps = self._form_data.get_all_str('new_top_step')
715 step_process_id_to = {}
716 step_parent_id_to = {}
718 for step_id in step_ids:
719 name = f'new_step_to_{step_id}'
720 new_steps_to[step_id] = self._form_data.get_all_int(name)
721 for step_id in keep_steps:
722 name = f'step_{step_id}_process_id'
723 step_process_id_to[step_id] = self._form_data.get_int(name)
724 name = f'step_{step_id}_parent_id'
725 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
726 process.title.set(title)
727 process.description.set(description)
728 process.effort.set(effort)
729 process.set_conditions(self.conn, conditions)
730 process.set_blockers(self.conn, blockers)
731 process.set_enables(self.conn, enables)
732 process.set_disables(self.conn, disables)
733 process.calendarize = calendarize
734 process.save(self.conn)
735 assert isinstance(process.id_, int)
736 new_step_title = None
737 steps: list[ProcessStep] = []
738 for step_id in keep_steps:
739 if step_id not in step_ids:
740 raise BadFormatException('trying to keep unknown step')
741 step = ProcessStep(step_id, process.id_,
742 step_process_id_to[step_id],
743 step_parent_id_to[step_id])
745 for step_id in step_ids:
746 new = [ProcessStep(None, process.id_, step_process_id, step_id)
747 for step_process_id in new_steps_to[step_id]]
749 for step_identifier in new_top_steps:
751 step_process_id = int(step_identifier)
752 step = ProcessStep(None, process.id_, step_process_id, None)
755 new_step_title = step_identifier
756 process.set_steps(self.conn, steps)
757 process.set_step_suppressions(self.conn, suppresses)
759 new_owner_title = None
760 for owner_identifier in step_of:
762 owners_to_set += [int(owner_identifier)]
764 new_owner_title = owner_identifier
765 process.set_owners(self.conn, owners_to_set)
766 params = f'id={process.id_}'
768 title_b64_encoded = b64encode(new_step_title.encode()).decode()
769 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
770 elif new_owner_title:
771 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
772 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
773 process.save(self.conn)
774 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Update history timestamps for Condition.description.

    Returns the redirect target from ._change_versioned_timestamps.
    """
    redir_target = self._change_versioned_timestamps(
        cls=Condition, attr_name='description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Update history timestamps for Condition.title.

    Returns the redirect target from ._change_versioned_timestamps.
    """
    redir_target = self._change_versioned_timestamps(
        cls=Condition, attr_name='title')
    return redir_target
784 @_delete_or_post(Condition, '/conditions')
785 def do_POST_condition(self, condition: Condition) -> str:
786 """Update/insert Condition of ?id= and fields defined in postvars."""
788 is_active = self._form_data.get_str('is_active') == 'True'
789 title = self._form_data.get_str('title')
790 description = self._form_data.get_str('description')
791 except NotFoundException as e:
792 raise BadFormatException(e) from e
793 condition.is_active = is_active
794 condition.title.set(title)
795 condition.description.set(description)
796 condition.save(self.conn)
797 return f'/condition?id={condition.id_}'