1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
22 TEMPLATES_DIR = 'templates'
25 class TaskServer(HTTPServer):
26 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28 def __init__(self, db_file: DatabaseFile,
29 *args: Any, **kwargs: Any) -> None:
30 super().__init__(*args, **kwargs)
32 self.headers: list[tuple[str, str]] = []
33 self._render_mode = 'html'
34 self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
def set_json_mode(self) -> None:
    """Make server send JSON instead of HTML responses.

    Switches the render mode to 'json' and registers the JSON Content-Type
    among self.headers, i.e. the headers emitted with every page sent.

    Safe to call more than once: the header is added only while not yet
    present, so repeated calls cannot pile up duplicate Content-Type
    entries in the responses.
    """
    self._render_mode = 'json'
    json_header = ('Content-Type', 'application/json')
    if json_header not in self.headers:
        self.headers += [json_header]
42 def ctx_to_json(ctx: dict[str, object]) -> str:
43 """Render ctx into JSON string."""
44 def walk_ctx(node: object) -> Any:
45 if hasattr(node, 'as_dict_into_reference'):
46 if hasattr(node, 'id_') and node.id_ is not None:
47 return node.as_dict_into_reference(library)
48 if hasattr(node, 'as_dict'):
50 if isinstance(node, (list, tuple)):
51 return [walk_ctx(x) for x in node]
52 if isinstance(node, dict):
54 for k, v in node.items():
57 if isinstance(node, HandledException):
60 library: dict[str, dict[str | int, object]] = {}
61 for k, v in ctx.items():
63 ctx['_library'] = library
64 return json_dumps(ctx)
def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
    """Render ctx according to self._render_mode.

    In 'html' mode, the Jinja template named '{tmpl_name}.html' is loaded
    from self._jinja and ctx rendered through it.  In any other mode,
    tmpl_name plays no role and ctx is serialized by .ctx_to_json.
    """
    if self._render_mode != 'html':
        return self.__class__.ctx_to_json(ctx)
    template_file = f'{tmpl_name}.{self._render_mode}'
    return self._jinja.get_template(template_file).render(ctx)
76 """Wrapper for validating and retrieving dict-like HTTP inputs."""
78 def __init__(self, dict_: dict[str, list[str]],
79 strictness: bool = True) -> None:
81 self.strict = strictness
83 def get_str(self, key: str, default: str = '',
84 ignore_strict: bool = False) -> str:
85 """Retrieve single/first string value of key, or default."""
86 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
87 if self.strict and not ignore_strict:
88 raise BadFormatException(f'no value found for key {key}')
90 return self.inputs[key][0]
92 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
93 """Retrieve dict of (first) strings at key starting with prefix."""
95 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
96 ret[key] = self.inputs[key][0]
99 def get_int(self, key: str) -> int:
100 """Retrieve single/first value of key as int, error if empty."""
101 val = self.get_int_or_none(key)
103 raise BadFormatException(f'unexpected empty value for: {key}')
106 def get_int_or_none(self, key: str) -> int | None:
107 """Retrieve single/first value of key as int, return None if empty."""
108 val = self.get_str(key, ignore_strict=True)
113 except ValueError as e:
114 msg = f'cannot int form field value for key {key}: {val}'
115 raise BadFormatException(msg) from e
117 def get_float(self, key: str) -> float:
118 """Retrieve float value of key from self.postvars."""
119 val = self.get_str(key)
122 except ValueError as e:
123 msg = f'cannot float form field value for key {key}: {val}'
124 raise BadFormatException(msg) from e
126 def get_all_str(self, key: str) -> list[str]:
127 """Retrieve list of string values at key."""
128 if key not in self.inputs.keys():
130 return self.inputs[key]
132 def get_all_int(self, key: str) -> list[int]:
133 """Retrieve list of int values at key."""
134 all_str = self.get_all_str(key)
136 return [int(s) for s in all_str if len(s) > 0]
137 except ValueError as e:
138 msg = f'cannot int a form field value for key {key} in: {all_str}'
139 raise BadFormatException(msg) from e
141 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
142 """Retrieve list of float value at key, None if empty strings."""
143 ret: list[float | None] = []
144 for val in self.get_all_str(key):
150 except ValueError as e:
151 msg = f'cannot float form field value for key {key}: {val}'
152 raise BadFormatException(msg) from e
156 class TaskHandler(BaseHTTPRequestHandler):
157 """Handles single HTTP request."""
158 # pylint: disable=too-many-public-methods
160 conn: DatabaseConnection
162 _form_data: InputsParser
163 _params: InputsParser
170 """Send ctx as proper HTTP response."""
171 body = self.server.render(ctx, tmpl_name)
172 self.send_response(code)
173 for header_tuple in self.server.headers:
174 self.send_header(*header_tuple)
176 self.wfile.write(bytes(body, 'utf-8'))
179 def _request_wrapper(http_method: str, not_found_msg: str
180 ) -> Callable[..., Callable[[TaskHandler], None]]:
181 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
183 Among other things, conditionally cleans all caches, but only on POST
184 requests, as only those are expected to change the states of objects
185 that may be cached, and certainly only those are expected to write any
186 changes to the database. We want to call them as early though as
187 possible here, either exactly after the specific request handler
188 returns successfully, or right after any exception is triggered –
189 otherwise, race conditions become plausible.
191 Note that otherwise any POST attempt, even a failed one, may end in
192 problematic inconsistencies:
194 - if the POST handler experiences an Exception, changes to objects
195 won't get written to the DB, but the changed objects may remain in
196 the cache and affect other objects despite their possibly illegal
199 - even if an object was just saved to the DB, we cannot be sure its
200 current state is completely identical to what we'd get if loading it
201 fresh from the DB (e.g. currently Process.n_owners is only updated
202 when loaded anew via .from_table_row, nor is its state written to
203 the DB by .save; a questionable design choice, but proof that we
204 have no guarantee that objects' .save stores all their states we'd
205 prefer at their most up-to-date.
208 def clear_caches() -> None:
209 for cls in (Day, Todo, Condition, Process, ProcessStep):
210 assert hasattr(cls, 'empty_cache')
213 def decorator(f: Callable[..., str | None]
214 ) -> Callable[[TaskHandler], None]:
215 def wrapper(self: TaskHandler) -> None:
216 # pylint: disable=protected-access
217 # (because pylint here fails to detect the use of wrapper as a
218 # method to self with respective access privileges)
220 self.conn = DatabaseConnection(self.server.db)
221 parsed_url = urlparse(self.path)
222 self._site = path_split(parsed_url.path)[1]
223 params = parse_qs(parsed_url.query, strict_parsing=True)
224 self._params = InputsParser(params, False)
225 handler_name = f'do_{http_method}_{self._site}'
226 if hasattr(self, handler_name):
227 handler = getattr(self, handler_name)
228 redir_target = f(self, handler)
229 if 'POST' == http_method:
232 self.send_response(302)
233 self.send_header('Location', redir_target)
236 msg = f'{not_found_msg}: {self._site}'
237 raise NotFoundException(msg)
238 except HandledException as error:
239 if 'POST' == http_method:
242 self._send_page(ctx, 'msg', error.http_code)
248 @_request_wrapper('GET', 'Unknown page')
249 def do_GET(self, handler: Callable[[], str | dict[str, object]]
251 """Render page with result of handler, or redirect if result is str."""
252 tmpl_name = f'{self._site}'
253 ctx_or_redir_target = handler()
254 if isinstance(ctx_or_redir_target, str):
255 return ctx_or_redir_target
256 self._send_page(ctx_or_redir_target, tmpl_name)
259 @_request_wrapper('POST', 'Unknown POST target')
260 def do_POST(self, handler: Callable[[], str]) -> str:
261 """Handle POST with handler, prepare redirection to result."""
262 length = int(self.headers['content-length'])
263 postvars = parse_qs(self.rfile.read(length).decode(),
264 keep_blank_values=True, strict_parsing=True)
265 self._form_data = InputsParser(postvars)
266 redir_target = handler()
273 def _get_item(target_class: Any
274 ) -> Callable[..., Callable[[TaskHandler],
276 def decorator(f: Callable[..., dict[str, object]]
277 ) -> Callable[[TaskHandler], dict[str, object]]:
278 def wrapper(self: TaskHandler) -> dict[str, object]:
279 # pylint: disable=protected-access
280 # (because pylint here fails to detect the use of wrapper as a
281 # method to self with respective access privileges)
282 id_ = self._params.get_int_or_none('id')
283 if target_class.can_create_by_id:
284 item = target_class.by_id_or_create(self.conn, id_)
286 item = target_class.by_id(self.conn, id_)
291 def do_GET_(self) -> str:
292 """Return redirect target on GET /."""
295 def _do_GET_calendar(self) -> dict[str, object]:
296 """Show Days from ?start= to ?end=.
298 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
299 same, the only difference being the HTML template they are rendered to,
300 which .do_GET selects from their method name.
302 start, end = self._params.get_str('start'), self._params.get_str('end')
303 end = end if end else date_in_n_days(366)
304 days, start, end = Day.by_date_range_with_limits(self.conn,
306 days = Day.with_filled_gaps(days, start, end)
307 today = date_in_n_days(0)
308 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – normal view.

    Passes on, unchanged, the context built by ._do_GET_calendar; a
    separate method only so that requests for the 'calendar' site find
    a handler of their own name.
    """
    ctx = self._do_GET_calendar()
    return ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – minimalist view.

    Passes on, unchanged, the context built by ._do_GET_calendar; a
    separate method only so that requests for the 'calendar_txt' site
    find a handler of their own name.
    """
    ctx = self._do_GET_calendar()
    return ctx
318 def do_GET_day(self) -> dict[str, object]:
319 """Show single Day of ?date=."""
320 date = self._params.get_str('date', date_in_n_days(0))
321 day = Day.by_id_or_create(self.conn, date)
322 make_type = self._params.get_str('make_type')
323 conditions_present = []
326 for todo in day.todos:
327 for condition in todo.conditions + todo.blockers:
328 if condition not in conditions_present:
329 conditions_present += [condition]
330 enablers_for[condition.id_] = [p for p in
331 Process.all(self.conn)
332 if condition in p.enables]
333 disablers_for[condition.id_] = [p for p in
334 Process.all(self.conn)
335 if condition in p.disables]
336 seen_todos: set[int] = set()
337 top_nodes = [t.get_step_tree(seen_todos)
338 for t in day.todos if not t.parents]
340 'top_nodes': top_nodes,
341 'make_type': make_type,
342 'enablers_for': enablers_for,
343 'disablers_for': disablers_for,
344 'conditions_present': conditions_present,
345 'processes': Process.all(self.conn)}
348 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
349 """Show single Todo of ?id=."""
353 """Collect what's useful for Todo steps tree display."""
356 process: Process | None
357 children: list[TodoStepsNode] # pylint: disable=undefined-variable
358 fillable: bool = False
360 def walk_process_steps(id_: int,
361 process_step_nodes: list[ProcessStepsNode],
362 steps_nodes: list[TodoStepsNode]) -> None:
363 for process_step_node in process_step_nodes:
365 node = TodoStepsNode(id_, None, process_step_node.process, [])
366 steps_nodes += [node]
367 walk_process_steps(id_, list(process_step_node.steps.values()),
370 def walk_todo_steps(id_: int, todos: list[Todo],
371 steps_nodes: list[TodoStepsNode]) -> None:
374 for match in [item for item in steps_nodes
376 and item.process == todo.process]:
379 for child in match.children:
380 child.fillable = True
381 walk_todo_steps(id_, todo.children, match.children)
384 node = TodoStepsNode(id_, todo, None, [])
385 steps_nodes += [node]
386 walk_todo_steps(id_, todo.children, node.children)
388 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
391 for node in steps_nodes:
393 assert isinstance(node.process, Process)
394 assert isinstance(node.process.id_, int)
395 ids.add(node.process.id_)
396 ids = ids | collect_adoptables_keys(node.children)
399 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
400 process_tree = todo.process.get_steps(self.conn, None)
401 steps_todo_to_process: list[TodoStepsNode] = []
402 walk_process_steps(0, list(process_tree.values()),
403 steps_todo_to_process)
404 for steps_node in steps_todo_to_process:
405 steps_node.fillable = True
406 walk_todo_steps(len(steps_todo_to_process), todo_steps,
407 steps_todo_to_process)
408 adoptables: dict[int, list[Todo]] = {}
409 any_adoptables = [Todo.by_id(self.conn, t.id_)
410 for t in Todo.by_date(self.conn, todo.date)
413 for id_ in collect_adoptables_keys(steps_todo_to_process):
414 adoptables[id_] = [t for t in any_adoptables
415 if t.process.id_ == id_]
416 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
417 'adoption_candidates_for': adoptables,
418 'process_candidates': Process.all(self.conn),
419 'todo_candidates': any_adoptables,
420 'condition_candidates': Condition.all(self.conn)}
422 def do_GET_todos(self) -> dict[str, object]:
423 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
424 sort_by = self._params.get_str('sort_by')
425 start = self._params.get_str('start')
426 end = self._params.get_str('end')
427 process_id = self._params.get_int_or_none('process_id')
428 comment_pattern = self._params.get_str('comment_pattern')
430 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
431 todos_by_date_range, start, end = ret
432 todos = [t for t in todos_by_date_range
433 if comment_pattern in t.comment
434 and ((not process_id) or t.process.id_ == process_id)]
435 sort_by = Todo.sort_by(todos, sort_by)
436 return {'start': start, 'end': end, 'process_id': process_id,
437 'comment_pattern': comment_pattern, 'todos': todos,
438 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
440 def do_GET_conditions(self) -> dict[str, object]:
441 """Show all Conditions."""
442 pattern = self._params.get_str('pattern')
443 sort_by = self._params.get_str('sort_by')
444 conditions = Condition.matching(self.conn, pattern)
445 sort_by = Condition.sort_by(conditions, sort_by)
446 return {'conditions': conditions,
@_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=, with the Processes that refer to it.

    Besides the Condition itself and whether its .id_ is None ('is_new'),
    the context lists, out of all Processes, those holding c in their
    .conditions ('enabled_processes'), .blockers ('disabled_processes'),
    .enables ('enabling_processes'), and .disables
    ('disabling_processes').
    """
    all_processes = Process.all(self.conn)

    def holding_c_in(attr_name: str) -> list[Process]:
        # Processes whose collection named attr_name contains c.
        return [process for process in all_processes
                if c in getattr(process, attr_name)]

    ctx: dict[str, object] = {'condition': c, 'is_new': c.id_ is None}
    ctx['enabled_processes'] = holding_c_in('conditions')
    ctx['disabled_processes'] = holding_c_in('blockers')
    ctx['enabling_processes'] = holding_c_in('enables')
    ctx['disabling_processes'] = holding_c_in('disables')
    return ctx
@_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Show title history of Condition of ?id=.

    The context consists of nothing but the Condition, under key
    'condition'.
    """
    ctx: dict[str, object] = {'condition': c}
    return ctx
@_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Show description history of Condition of ?id=.

    The context consists of nothing but the Condition, under key
    'condition'.
    """
    ctx: dict[str, object] = {'condition': c}
    return ctx
471 def do_GET_process(self, process: Process) -> dict[str, object]:
472 """Show Process of ?id=."""
473 owner_ids = self._params.get_all_int('step_to')
474 owned_ids = self._params.get_all_int('has_step')
475 title_64 = self._params.get_str('title_b64')
478 title = b64decode(title_64.encode()).decode()
479 except binascii_Exception as exc:
480 msg = 'invalid base64 for ?title_b64='
481 raise BadFormatException(msg) from exc
482 process.title.set(title)
483 preset_top_step = None
484 owners = process.used_as_step_by(self.conn)
485 for step_id in owner_ids:
486 owners += [Process.by_id(self.conn, step_id)]
487 for process_id in owned_ids:
488 Process.by_id(self.conn, process_id) # to ensure ID exists
489 preset_top_step = process_id
490 return {'process': process, 'is_new': process.id_ is None,
491 'preset_top_step': preset_top_step,
492 'steps': process.get_steps(self.conn), 'owners': owners,
493 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
494 'process_candidates': Process.all(self.conn),
495 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Show title history of Process of ?id=.

    The context consists of nothing but the given Process p, under key
    'process'.
    """
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Show description history of Process of ?id=.

    The context consists of nothing but the given Process p, under key
    'process'.
    """
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Show default effort history of Process of ?id=.

    The context consists of nothing but the given Process p, under key
    'process'.
    """
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_processes(self) -> dict[str, object]:
    """Show all Processes matching ?pattern=, ordered per ?sort_by=.

    The Processes come from Process.matching; they and the requested
    sort key are then handed to Process.sort_by, whose return value is
    what the context carries as 'sort_by' (presumably the sort key
    actually applied – its definition is not visible here).
    """
    params = self._params
    pattern = params.get_str('pattern')
    requested_sort = params.get_str('sort_by')
    matches = Process.matching(self.conn, pattern)
    applied_sort = Process.sort_by(matches, requested_sort)
    return {'processes': matches,
            'sort_by': applied_sort,
            'pattern': pattern}
523 def _delete_or_post(target_class: Any, redir_target: str = '/'
524 ) -> Callable[..., Callable[[TaskHandler], str]]:
525 def decorator(f: Callable[..., str]
526 ) -> Callable[[TaskHandler], str]:
527 def wrapper(self: TaskHandler) -> str:
528 # pylint: disable=protected-access
529 # (because pylint here fails to detect the use of wrapper as a
530 # method to self with respective access privileges)
531 id_ = self._params.get_int_or_none('id')
532 for _ in self._form_data.get_all_str('delete'):
534 msg = 'trying to delete non-saved ' +\
535 f'{target_class.__name__}'
536 raise NotFoundException(msg)
537 item = target_class.by_id(self.conn, id_)
538 item.remove(self.conn)
540 if target_class.can_create_by_id:
541 item = target_class.by_id_or_create(self.conn, id_)
543 item = target_class.by_id(self.conn, id_)
548 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
549 """Update history timestamps for VersionedAttribute."""
550 id_ = self._params.get_int_or_none('id')
551 item = cls.by_id(self.conn, id_)
552 attr = getattr(item, attr_name)
553 for k, v in self._form_data.get_first_strings_starting('at:').items():
556 attr.reset_timestamp(old, f'{v}.0')
558 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
560 def do_POST_day(self) -> str:
561 """Update or insert Day of date and Todos mapped to it."""
562 # pylint: disable=too-many-locals
563 date = self._params.get_str('date')
564 day_comment = self._form_data.get_str('day_comment')
565 make_type = self._form_data.get_str('make_type')
566 old_todos = self._form_data.get_all_int('todo_id')
567 new_todos = self._form_data.get_all_int('new_todo')
568 comments = self._form_data.get_all_str('comment')
569 efforts = self._form_data.get_all_floats_or_nones('effort')
570 done_todos = self._form_data.get_all_int('done')
571 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
572 raise BadFormatException('"done" field refers to unknown Todo')
573 is_done = [t_id in done_todos for t_id in old_todos]
574 if not (len(old_todos) == len(is_done) == len(comments)
576 msg = 'not equal number each of number of todo_id, comments, ' +\
578 raise BadFormatException(msg)
579 day = Day.by_id_or_create(self.conn, date)
580 day.comment = day_comment
582 for process_id in sorted(new_todos):
583 if 'empty' == make_type:
584 process = Process.by_id(self.conn, process_id)
585 todo = Todo(None, process, False, date)
588 Todo.create_with_children(self.conn, process_id, date)
589 for i, todo_id in enumerate(old_todos):
590 todo = Todo.by_id(self.conn, todo_id)
591 todo.is_done = is_done[i]
592 todo.comment = comments[i]
593 todo.effort = efforts[i]
595 return f'/day?date={date}&make_type={make_type}'
597 @_delete_or_post(Todo, '/')
598 def do_POST_todo(self, todo: Todo) -> str:
599 """Update Todo and its children."""
600 # pylint: disable=too-many-locals
601 adopted_child_ids = self._form_data.get_all_int('adopt')
602 processes_to_make_full = self._form_data.get_all_int('make_full')
603 processes_to_make_empty = self._form_data.get_all_int('make_empty')
604 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
605 effort = self._form_data.get_str('effort', ignore_strict=True)
606 conditions = self._form_data.get_all_int('conditions')
607 disables = self._form_data.get_all_int('disables')
608 blockers = self._form_data.get_all_int('blockers')
609 enables = self._form_data.get_all_int('enables')
610 is_done = len(self._form_data.get_all_str('done')) > 0
611 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
612 comment = self._form_data.get_str('comment', ignore_strict=True)
613 for v in fill_fors.values():
614 if v.startswith('make_empty_'):
615 processes_to_make_empty += [int(v[11:])]
616 elif v.startswith('make_full_'):
617 processes_to_make_full += [int(v[10:])]
619 adopted_child_ids += [int(v)]
621 for child in todo.children:
622 assert isinstance(child.id_, int)
623 if child.id_ not in adopted_child_ids:
624 to_remove += [child.id_]
625 for id_ in to_remove:
626 child = Todo.by_id(self.conn, id_)
627 todo.remove_child(child)
628 for child_id in adopted_child_ids:
629 if child_id in [c.id_ for c in todo.children]:
631 child = Todo.by_id(self.conn, child_id)
632 todo.add_child(child)
633 for process_id in processes_to_make_empty:
634 process = Process.by_id(self.conn, process_id)
635 made = Todo(None, process, False, todo.date)
638 for process_id in processes_to_make_full:
639 made = Todo.create_with_children(self.conn, process_id, todo.date)
641 todo.effort = float(effort) if effort else None
642 todo.set_conditions(self.conn, conditions)
643 todo.set_blockers(self.conn, blockers)
644 todo.set_enables(self.conn, enables)
645 todo.set_disables(self.conn, disables)
646 todo.is_done = is_done
647 todo.calendarize = calendarize
648 todo.comment = comment
650 return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Update history timestamps for Process.description.

    All work is left to ._change_versioned_timestamps; what that returns
    (the path to redirect to afterwards) is passed on unchanged.
    """
    return self._change_versioned_timestamps(cls=Process,
                                             attr_name='description')
def do_POST_process_efforts(self) -> str:
    """Update history timestamps for Process.effort.

    All work is left to ._change_versioned_timestamps; what that returns
    (the path to redirect to afterwards) is passed on unchanged.
    """
    return self._change_versioned_timestamps(cls=Process,
                                             attr_name='effort')
def do_POST_process_titles(self) -> str:
    """Update history timestamps for Process.title.

    All work is left to ._change_versioned_timestamps; what that returns
    (the path to redirect to afterwards) is passed on unchanged.
    """
    return self._change_versioned_timestamps(cls=Process,
                                             attr_name='title')
664 @_delete_or_post(Process, '/processes')
665 def do_POST_process(self, process: Process) -> str:
666 """Update or insert Process of ?id= and fields defined in postvars."""
667 # pylint: disable=too-many-locals
668 # pylint: disable=too-many-statements
669 title = self._form_data.get_str('title')
670 description = self._form_data.get_str('description')
671 effort = self._form_data.get_float('effort')
672 conditions = self._form_data.get_all_int('conditions')
673 blockers = self._form_data.get_all_int('blockers')
674 enables = self._form_data.get_all_int('enables')
675 disables = self._form_data.get_all_int('disables')
676 calendarize = self._form_data.get_all_str('calendarize') != []
677 suppresses = self._form_data.get_all_int('suppresses')
678 step_of = self._form_data.get_all_str('step_of')
679 keep_steps = self._form_data.get_all_int('keep_step')
680 step_ids = self._form_data.get_all_int('steps')
681 new_top_steps = self._form_data.get_all_str('new_top_step')
682 step_process_id_to = {}
683 step_parent_id_to = {}
685 for step_id in step_ids:
686 name = f'new_step_to_{step_id}'
687 new_steps_to[step_id] = self._form_data.get_all_int(name)
688 for step_id in keep_steps:
689 name = f'step_{step_id}_process_id'
690 step_process_id_to[step_id] = self._form_data.get_int(name)
691 name = f'step_{step_id}_parent_id'
692 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
693 process.title.set(title)
694 process.description.set(description)
695 process.effort.set(effort)
696 process.set_conditions(self.conn, conditions)
697 process.set_blockers(self.conn, blockers)
698 process.set_enables(self.conn, enables)
699 process.set_disables(self.conn, disables)
700 process.calendarize = calendarize
701 process.save(self.conn)
702 assert isinstance(process.id_, int)
703 new_step_title = None
704 steps: list[ProcessStep] = []
705 for step_id in keep_steps:
706 if step_id not in step_ids:
707 raise BadFormatException('trying to keep unknown step')
708 step = ProcessStep(step_id, process.id_,
709 step_process_id_to[step_id],
710 step_parent_id_to[step_id])
712 for step_id in step_ids:
713 new = [ProcessStep(None, process.id_, step_process_id, step_id)
714 for step_process_id in new_steps_to[step_id]]
716 for step_identifier in new_top_steps:
718 step_process_id = int(step_identifier)
719 step = ProcessStep(None, process.id_, step_process_id, None)
722 new_step_title = step_identifier
723 process.set_steps(self.conn, steps)
724 process.set_step_suppressions(self.conn, suppresses)
726 new_owner_title = None
727 for owner_identifier in step_of:
729 owners_to_set += [int(owner_identifier)]
731 new_owner_title = owner_identifier
732 process.set_owners(self.conn, owners_to_set)
733 params = f'id={process.id_}'
735 title_b64_encoded = b64encode(new_step_title.encode()).decode()
736 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
737 elif new_owner_title:
738 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
739 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
740 process.save(self.conn)
741 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Update history timestamps for Condition.description.

    All work is left to ._change_versioned_timestamps; what that returns
    (the path to redirect to afterwards) is passed on unchanged.
    """
    return self._change_versioned_timestamps(cls=Condition,
                                             attr_name='description')
def do_POST_condition_titles(self) -> str:
    """Update history timestamps for Condition.title.

    All work is left to ._change_versioned_timestamps; what that returns
    (the path to redirect to afterwards) is passed on unchanged.
    """
    return self._change_versioned_timestamps(cls=Condition,
                                             attr_name='title')
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Update/insert Condition of ?id= from the posted form fields.

    All three fields (is_active, title, description) are fetched before
    anything is assigned, so a fetch that raises leaves condition as it
    was.  is_active is set True only for the exact string 'True'.

    Returns the path to redirect to: the saved Condition's own page.
    """
    fetch = self._form_data.get_str
    is_active_str, title, description = [
        fetch(name) for name in ('is_active', 'title', 'description')]
    condition.is_active = 'True' == is_active_str
    condition.title.set(title)
    condition.description.set(description)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'