1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
def __init__(self, db_file: DatabaseFile,
             *args: Any, **kwargs: Any) -> None:
    """Set up HTTP server plus what its request handlers read from it.

    db_file is kept as .db, since handlers open their DatabaseConnection
    from self.server.db; all further arguments go to HTTPServer unchanged.
    """
    super().__init__(*args, **kwargs)
    self.db = db_file
    # extra (name, value) header pairs that handlers send with each page
    self.headers: list[tuple[str, str]] = []
    self._render_mode = 'html'
    self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35 def set_json_mode(self) -> None:
36 """Make server send JSON instead of HTML responses."""
37 self._render_mode = 'json'
38 self.headers += [('Content-Type', 'application/json')]
41 def ctx_to_json(ctx: dict[str, object]) -> str:
42 """Render ctx into JSON string."""
43 def walk_ctx(node: object) -> Any:
44 if hasattr(node, 'as_dict_into_reference'):
45 if hasattr(node, 'id_') and node.id_ is not None:
46 return node.as_dict_into_reference(library)
47 if hasattr(node, 'as_dict'):
49 if isinstance(node, (list, tuple)):
50 return [walk_ctx(x) for x in node]
51 if isinstance(node, dict):
53 for k, v in node.items():
56 if isinstance(node, HandledException):
59 library: dict[str, dict[str | int, object]] = {}
60 for k, v in ctx.items():
62 ctx['_library'] = library
63 return json_dumps(ctx)
65 def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
66 """Render ctx according to self._render_mode.."""
67 tmpl_name = f'{tmpl_name}.{self._render_mode}'
68 if 'html' == self._render_mode:
69 template = self._jinja.get_template(tmpl_name)
70 return template.render(ctx)
71 return self.__class__.ctx_to_json(ctx)
75 """Wrapper for validating and retrieving dict-like HTTP inputs."""
77 def __init__(self, dict_: dict[str, list[str]],
78 strictness: bool = True) -> None:
80 self.strict = strictness
82 def get_str(self, key: str, default: str = '',
83 ignore_strict: bool = False) -> str:
84 """Retrieve single/first string value of key, or default."""
85 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
86 if self.strict and not ignore_strict:
87 raise BadFormatException(f'no value found for key {key}')
89 return self.inputs[key][0]
91 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
92 """Retrieve dict of (first) strings at key starting with prefix."""
94 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95 ret[key] = self.inputs[key][0]
98 def get_int(self, key: str) -> int:
99 """Retrieve single/first value of key as int, error if empty."""
100 val = self.get_int_or_none(key)
102 raise BadFormatException(f'unexpected empty value for: {key}')
105 def get_int_or_none(self, key: str) -> int | None:
106 """Retrieve single/first value of key as int, return None if empty."""
107 val = self.get_str(key, ignore_strict=True)
112 except ValueError as e:
113 msg = f'cannot int form field value for key {key}: {val}'
114 raise BadFormatException(msg) from e
116 def get_float(self, key: str) -> float:
117 """Retrieve float value of key from self.postvars."""
118 val = self.get_str(key)
121 except ValueError as e:
122 msg = f'cannot float form field value for key {key}: {val}'
123 raise BadFormatException(msg) from e
125 def get_all_str(self, key: str) -> list[str]:
126 """Retrieve list of string values at key."""
127 if key not in self.inputs.keys():
129 return self.inputs[key]
131 def get_all_int(self, key: str) -> list[int]:
132 """Retrieve list of int values at key."""
133 all_str = self.get_all_str(key)
135 return [int(s) for s in all_str if len(s) > 0]
136 except ValueError as e:
137 msg = f'cannot int a form field value for key {key} in: {all_str}'
138 raise BadFormatException(msg) from e
140 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
141 """Retrieve list of float value at key, None if empty strings."""
142 ret: list[float | None] = []
143 for val in self.get_all_str(key):
149 except ValueError as e:
150 msg = f'cannot float form field value for key {key}: {val}'
151 raise BadFormatException(msg) from e
155 class TaskHandler(BaseHTTPRequestHandler):
156 """Handles single HTTP request."""
157 # pylint: disable=too-many-public-methods
159 conn: DatabaseConnection
161 _form_data: InputsParser
162 _params: InputsParser
169 """Send ctx as proper HTTP response."""
170 body = self.server.render(ctx, tmpl_name)
171 self.send_response(code)
172 for header_tuple in self.server.headers:
173 self.send_header(*header_tuple)
175 self.wfile.write(bytes(body, 'utf-8'))
178 def _request_wrapper(http_method: str, not_found_msg: str
179 ) -> Callable[..., Callable[[TaskHandler], None]]:
180 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
182 Among other things, conditionally cleans all caches, but only on POST
183 requests, as only those are expected to change the states of objects
184 that may be cached, and certainly only those are expected to write any
185 changes to the database. We want to call them as early though as
186 possible here, either exactly after the specific request handler
187 returns successfully, or right after any exception is triggered –
188 otherwise, race conditions become plausible.
190 Note that any POST attempt, even a failed one, may end in problematic
193 - if the POST handler experiences an Exception, changes to objects
194 won't get written to the DB, but the changed objects may remain in
195 the cache and affect other objects despite their possibly illegal
198 - even if an object was just saved to the DB, we cannot be sure its
199 current state is completely identical to what we'd get if loading it
200 fresh from the DB (e.g. currently Process.n_owners is only updated
201 when loaded anew via .from_table_row, nor is its state written to
202 the DB by .save; a questionable design choice, but proof that we
203 have no guarantee that objects' .save stores all their states we'd
204 prefer at their most up-to-date.
207 def clear_caches() -> None:
208 for cls in (Day, Todo, Condition, Process, ProcessStep):
209 assert hasattr(cls, 'empty_cache')
212 def decorator(f: Callable[..., str | None]
213 ) -> Callable[[TaskHandler], None]:
214 def wrapper(self: TaskHandler) -> None:
215 # pylint: disable=protected-access
216 # (because pylint here fails to detect the use of wrapper as a
217 # method to self with respective access privileges)
219 self.conn = DatabaseConnection(self.server.db)
220 parsed_url = urlparse(self.path)
221 self._site = path_split(parsed_url.path)[1]
222 params = parse_qs(parsed_url.query, strict_parsing=True)
223 self._params = InputsParser(params, False)
224 handler_name = f'do_{http_method}_{self._site}'
225 if hasattr(self, handler_name):
226 handler = getattr(self, handler_name)
227 redir_target = f(self, handler)
228 if 'POST' != http_method:
231 self.send_response(302)
232 self.send_header('Location', redir_target)
235 msg = f'{not_found_msg}: {self._site}'
236 raise NotFoundException(msg)
237 except HandledException as error:
238 if 'POST' != http_method:
241 self._send_page(ctx, 'msg', error.http_code)
247 @_request_wrapper('GET', 'Unknown page')
248 def do_GET(self, handler: Callable[[], str | dict[str, object]]
250 """Render page with result of handler, or redirect if result is str."""
251 tmpl_name = f'{self._site}'
252 ctx_or_redir_target = handler()
253 if isinstance(ctx_or_redir_target, str):
254 return ctx_or_redir_target
255 self._send_page(ctx_or_redir_target, tmpl_name)
258 @_request_wrapper('POST', 'Unknown POST target')
259 def do_POST(self, handler: Callable[[], str]) -> str:
260 """Handle POST with handler, prepare redirection to result."""
261 length = int(self.headers['content-length'])
262 postvars = parse_qs(self.rfile.read(length).decode(),
263 keep_blank_values=True, strict_parsing=True)
264 self._form_data = InputsParser(postvars)
265 redir_target = handler()
272 def _get_item(target_class: Any
273 ) -> Callable[..., Callable[[TaskHandler],
275 def decorator(f: Callable[..., dict[str, object]]
276 ) -> Callable[[TaskHandler], dict[str, object]]:
277 def wrapper(self: TaskHandler) -> dict[str, object]:
278 # pylint: disable=protected-access
279 # (because pylint here fails to detect the use of wrapper as a
280 # method to self with respective access privileges)
281 id_ = self._params.get_int_or_none('id')
282 if target_class.can_create_by_id:
283 item = target_class.by_id_or_create(self.conn, id_)
285 item = target_class.by_id(self.conn, id_)
290 def do_GET_(self) -> str:
291 """Return redirect target on GET /."""
294 def _do_GET_calendar(self) -> dict[str, object]:
295 """Show Days from ?start= to ?end=.
297 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
298 same, the only difference being the HTML template they are rendered to,
299 which .do_GET selects from their method name.
301 start = self._params.get_str('start')
302 end = self._params.get_str('end')
304 end = date_in_n_days(366)
305 ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
306 days, start, end = ret
307 days = Day.with_filled_gaps(days, start, end)
308 today = date_in_n_days(0)
309 return {'start': start, 'end': end, 'days': days, 'today': today}
311 def do_GET_calendar(self) -> dict[str, object]:
312 """Show Days from ?start= to ?end= – normal view."""
313 return self._do_GET_calendar()
315 def do_GET_calendar_txt(self) -> dict[str, object]:
316 """Show Days from ?start= to ?end= – minimalist view."""
317 return self._do_GET_calendar()
319 def do_GET_day(self) -> dict[str, object]:
320 """Show single Day of ?date=."""
321 date = self._params.get_str('date', date_in_n_days(0))
322 day = Day.by_id_or_create(self.conn, date)
323 make_type = self._params.get_str('make_type')
324 conditions_present = []
327 for todo in day.todos:
328 for condition in todo.conditions + todo.blockers:
329 if condition not in conditions_present:
330 conditions_present += [condition]
331 enablers_for[condition.id_] = [p for p in
332 Process.all(self.conn)
333 if condition in p.enables]
334 disablers_for[condition.id_] = [p for p in
335 Process.all(self.conn)
336 if condition in p.disables]
337 seen_todos: set[int] = set()
338 top_nodes = [t.get_step_tree(seen_todos)
339 for t in day.todos if not t.parents]
341 'top_nodes': top_nodes,
342 'make_type': make_type,
343 'enablers_for': enablers_for,
344 'disablers_for': disablers_for,
345 'conditions_present': conditions_present,
346 'processes': Process.all(self.conn)}
349 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
350 """Show single Todo of ?id=."""
354 """Collect what's useful for Todo steps tree display."""
357 process: Process | None
358 children: list[TodoStepsNode] # pylint: disable=undefined-variable
359 fillable: bool = False
361 def walk_process_steps(id_: int,
362 process_step_nodes: list[ProcessStepsNode],
363 steps_nodes: list[TodoStepsNode]) -> None:
364 for process_step_node in process_step_nodes:
366 node = TodoStepsNode(id_, None, process_step_node.process, [])
367 steps_nodes += [node]
368 walk_process_steps(id_, list(process_step_node.steps.values()),
371 def walk_todo_steps(id_: int, todos: list[Todo],
372 steps_nodes: list[TodoStepsNode]) -> None:
375 for match in [item for item in steps_nodes
377 and item.process == todo.process]:
380 for child in match.children:
381 child.fillable = True
382 walk_todo_steps(id_, todo.children, match.children)
385 node = TodoStepsNode(id_, todo, None, [])
386 steps_nodes += [node]
387 walk_todo_steps(id_, todo.children, node.children)
389 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
392 for node in steps_nodes:
394 assert isinstance(node.process, Process)
395 assert isinstance(node.process.id_, int)
396 ids.add(node.process.id_)
397 ids = ids | collect_adoptables_keys(node.children)
400 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
401 process_tree = todo.process.get_steps(self.conn, None)
402 steps_todo_to_process: list[TodoStepsNode] = []
403 walk_process_steps(0, list(process_tree.values()),
404 steps_todo_to_process)
405 for steps_node in steps_todo_to_process:
406 steps_node.fillable = True
407 walk_todo_steps(len(steps_todo_to_process), todo_steps,
408 steps_todo_to_process)
409 adoptables: dict[int, list[Todo]] = {}
410 any_adoptables = [Todo.by_id(self.conn, t.id_)
411 for t in Todo.by_date(self.conn, todo.date)
414 for id_ in collect_adoptables_keys(steps_todo_to_process):
415 adoptables[id_] = [t for t in any_adoptables
416 if t.process.id_ == id_]
417 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
418 'adoption_candidates_for': adoptables,
419 'process_candidates': Process.all(self.conn),
420 'todo_candidates': any_adoptables,
421 'condition_candidates': Condition.all(self.conn)}
423 def do_GET_todos(self) -> dict[str, object]:
424 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
425 sort_by = self._params.get_str('sort_by')
426 start = self._params.get_str('start')
427 end = self._params.get_str('end')
428 process_id = self._params.get_int_or_none('process_id')
429 comment_pattern = self._params.get_str('comment_pattern')
431 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
432 todos_by_date_range, start, end = ret
433 todos = [t for t in todos_by_date_range
434 if comment_pattern in t.comment
435 and ((not process_id) or t.process.id_ == process_id)]
436 sort_by = Todo.sort_by(todos, sort_by)
437 return {'start': start, 'end': end, 'process_id': process_id,
438 'comment_pattern': comment_pattern, 'todos': todos,
439 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
441 def do_GET_conditions(self) -> dict[str, object]:
442 """Show all Conditions."""
443 pattern = self._params.get_str('pattern')
444 sort_by = self._params.get_str('sort_by')
445 conditions = Condition.matching(self.conn, pattern)
446 sort_by = Condition.sort_by(conditions, sort_by)
447 return {'conditions': conditions,
@_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=, along with the Processes relating to it.

    Processes are grouped by which of their Condition collections holds
    c: .conditions, .blockers, .enables, or .disables.
    """
    all_processes = Process.all(self.conn)

    def having_c_in(attr_name: str) -> list[Process]:
        return [proc for proc in all_processes
                if c in getattr(proc, attr_name)]

    return {'condition': c,
            'is_new': c.id_ is None,
            'enabled_processes': having_c_in('conditions'),
            'disabled_processes': having_c_in('blockers'),
            'enabling_processes': having_c_in('enables'),
            'disabling_processes': having_c_in('disables')}
@_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Provide Condition of ?id= to the page listing its title history."""
    ctx: dict[str, object] = {'condition': c}
    return ctx
@_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Provide Condition of ?id= to page listing its description history."""
    ctx: dict[str, object] = {'condition': c}
    return ctx
472 def do_GET_process(self, process: Process) -> dict[str, object]:
473 """Show Process of ?id=."""
474 owner_ids = self._params.get_all_int('step_to')
475 owned_ids = self._params.get_all_int('has_step')
476 title_64 = self._params.get_str('title_b64')
478 title = b64decode(title_64.encode()).decode()
479 process.title.set(title)
480 owners = process.used_as_step_by(self.conn)
481 for step_id in owner_ids:
482 owners += [Process.by_id(self.conn, step_id)]
483 preset_top_step = None
484 for process_id in owned_ids:
485 preset_top_step = process_id
486 return {'process': process, 'is_new': process.id_ is None,
487 'preset_top_step': preset_top_step,
488 'steps': process.get_steps(self.conn), 'owners': owners,
489 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
490 'process_candidates': Process.all(self.conn),
491 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Provide Process of ?id= to the page listing its title history."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Provide Process of ?id= to page listing its description history."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Provide Process of ?id= to page listing its default effort history."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_processes(self) -> dict[str, object]:
    """Show Processes matching ?pattern=, arranged as per ?sort_by=.

    Selection is left to Process.matching, ordering to Process.sort_by;
    whatever the latter returns is reported back as 'sort_by'.
    """
    wanted_pattern = self._params.get_str('pattern')
    wanted_sort = self._params.get_str('sort_by')
    matches = Process.matching(self.conn, wanted_pattern)
    applied_sort = Process.sort_by(matches, wanted_sort)
    return {'processes': matches,
            'sort_by': applied_sort,
            'pattern': wanted_pattern}
519 def _delete_or_post(target_class: Any, redir_target: str = '/'
520 ) -> Callable[..., Callable[[TaskHandler], str]]:
521 def decorator(f: Callable[..., str]
522 ) -> Callable[[TaskHandler], str]:
523 def wrapper(self: TaskHandler) -> str:
524 # pylint: disable=protected-access
525 # (because pylint here fails to detect the use of wrapper as a
526 # method to self with respective access privileges)
527 id_ = self._params.get_int_or_none('id')
528 for _ in self._form_data.get_all_str('delete'):
530 msg = 'trying to delete non-saved ' +\
531 f'{target_class.__name__}'
532 raise NotFoundException(msg)
533 item = target_class.by_id(self.conn, id_)
534 item.remove(self.conn)
536 if target_class.can_create_by_id:
537 item = target_class.by_id_or_create(self.conn, id_)
539 item = target_class.by_id(self.conn, id_)
544 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
545 """Update history timestamps for VersionedAttribute."""
546 id_ = self._params.get_int_or_none('id')
547 item = cls.by_id(self.conn, id_)
548 attr = getattr(item, attr_name)
549 for k, v in self._form_data.get_first_strings_starting('at:').items():
552 attr.reset_timestamp(old, f'{v}.0')
554 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
556 def do_POST_day(self) -> str:
557 """Update or insert Day of date and Todos mapped to it."""
558 # pylint: disable=too-many-locals
559 date = self._params.get_str('date')
560 day_comment = self._form_data.get_str('day_comment')
561 make_type = self._form_data.get_str('make_type')
562 old_todos = self._form_data.get_all_int('todo_id')
563 new_todos = self._form_data.get_all_int('new_todo')
564 comments = self._form_data.get_all_str('comment')
565 efforts = self._form_data.get_all_floats_or_nones('effort')
566 done_todos = self._form_data.get_all_int('done')
567 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
568 raise BadFormatException('"done" field refers to unknown Todo')
569 is_done = [t_id in done_todos for t_id in old_todos]
570 if not (len(old_todos) == len(is_done) == len(comments)
572 msg = 'not equal number each of number of todo_id, comments, ' +\
574 raise BadFormatException(msg)
575 day = Day.by_id_or_create(self.conn, date)
576 day.comment = day_comment
578 for process_id in sorted(new_todos):
579 if 'empty' == make_type:
580 process = Process.by_id(self.conn, process_id)
581 todo = Todo(None, process, False, date)
584 Todo.create_with_children(self.conn, process_id, date)
585 for i, todo_id in enumerate(old_todos):
586 todo = Todo.by_id(self.conn, todo_id)
587 todo.is_done = is_done[i]
588 todo.comment = comments[i]
589 todo.effort = efforts[i]
591 return f'/day?date={date}&make_type={make_type}'
593 @_delete_or_post(Todo, '/')
594 def do_POST_todo(self, todo: Todo) -> str:
595 """Update Todo and its children."""
596 # pylint: disable=too-many-locals
597 adopted_child_ids = self._form_data.get_all_int('adopt')
598 processes_to_make_full = self._form_data.get_all_int('make_full')
599 processes_to_make_empty = self._form_data.get_all_int('make_empty')
600 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
601 effort = self._form_data.get_str('effort', ignore_strict=True)
602 conditions = self._form_data.get_all_int('conditions')
603 disables = self._form_data.get_all_int('disables')
604 blockers = self._form_data.get_all_int('blockers')
605 enables = self._form_data.get_all_int('enables')
606 is_done = len(self._form_data.get_all_str('done')) > 0
607 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
608 comment = self._form_data.get_str('comment', ignore_strict=True)
609 for v in fill_fors.values():
610 if v.startswith('make_empty_'):
611 processes_to_make_empty += [int(v[11:])]
612 elif v.startswith('make_full_'):
613 processes_to_make_full += [int(v[10:])]
615 adopted_child_ids += [int(v)]
617 for child in todo.children:
618 assert isinstance(child.id_, int)
619 if child.id_ not in adopted_child_ids:
620 to_remove += [child.id_]
621 for id_ in to_remove:
622 child = Todo.by_id(self.conn, id_)
623 todo.remove_child(child)
624 for child_id in adopted_child_ids:
625 if child_id in [c.id_ for c in todo.children]:
627 child = Todo.by_id(self.conn, child_id)
628 todo.add_child(child)
629 for process_id in processes_to_make_empty:
630 process = Process.by_id(self.conn, process_id)
631 made = Todo(None, process, False, todo.date)
634 for process_id in processes_to_make_full:
635 made = Todo.create_with_children(self.conn, process_id, todo.date)
637 todo.effort = float(effort) if effort else None
638 todo.set_conditions(self.conn, conditions)
639 todo.set_blockers(self.conn, blockers)
640 todo.set_enables(self.conn, enables)
641 todo.set_disables(self.conn, disables)
642 todo.is_done = is_done
643 todo.calendarize = calendarize
644 todo.comment = comment
646 return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Re-stamp Process.description history, return redirect target."""
    target = self._change_versioned_timestamps(Process, 'description')
    return target
def do_POST_process_efforts(self) -> str:
    """Re-stamp Process.effort history, return redirect target."""
    target = self._change_versioned_timestamps(Process, 'effort')
    return target
def do_POST_process_titles(self) -> str:
    """Re-stamp Process.title history, return redirect target."""
    target = self._change_versioned_timestamps(Process, 'title')
    return target
660 @_delete_or_post(Process, '/processes')
661 def do_POST_process(self, process: Process) -> str:
662 """Update or insert Process of ?id= and fields defined in postvars."""
663 # pylint: disable=too-many-locals
664 # pylint: disable=too-many-statements
665 title = self._form_data.get_str('title')
666 description = self._form_data.get_str('description')
667 effort = self._form_data.get_float('effort')
668 conditions = self._form_data.get_all_int('conditions')
669 blockers = self._form_data.get_all_int('blockers')
670 enables = self._form_data.get_all_int('enables')
671 disables = self._form_data.get_all_int('disables')
672 calendarize = self._form_data.get_all_str('calendarize') != []
673 suppresses = self._form_data.get_all_int('suppresses')
674 step_of = self._form_data.get_all_str('step_of')
675 keep_steps = self._form_data.get_all_int('keep_step')
676 step_ids = self._form_data.get_all_int('steps')
677 new_top_steps = self._form_data.get_all_str('new_top_step')
678 step_process_id_to = {}
679 step_parent_id_to = {}
681 for step_id in step_ids:
682 name = f'new_step_to_{step_id}'
683 new_steps_to[step_id] = self._form_data.get_all_int(name)
684 for step_id in keep_steps:
685 name = f'step_{step_id}_process_id'
686 step_process_id_to[step_id] = self._form_data.get_int(name)
687 name = f'step_{step_id}_parent_id'
688 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
689 process.title.set(title)
690 process.description.set(description)
691 process.effort.set(effort)
692 process.set_conditions(self.conn, conditions)
693 process.set_blockers(self.conn, blockers)
694 process.set_enables(self.conn, enables)
695 process.set_disables(self.conn, disables)
696 process.calendarize = calendarize
697 process.save(self.conn)
698 assert isinstance(process.id_, int)
699 new_step_title = None
700 steps: list[ProcessStep] = []
701 for step_id in keep_steps:
702 if step_id not in step_ids:
703 raise BadFormatException('trying to keep unknown step')
704 step = ProcessStep(step_id, process.id_,
705 step_process_id_to[step_id],
706 step_parent_id_to[step_id])
708 for step_id in step_ids:
709 new = [ProcessStep(None, process.id_, step_process_id, step_id)
710 for step_process_id in new_steps_to[step_id]]
712 for step_identifier in new_top_steps:
714 step_process_id = int(step_identifier)
715 step = ProcessStep(None, process.id_, step_process_id, None)
718 new_step_title = step_identifier
719 process.set_steps(self.conn, steps)
720 process.set_step_suppressions(self.conn, suppresses)
722 new_owner_title = None
723 for owner_identifier in step_of:
725 owners_to_set += [int(owner_identifier)]
727 new_owner_title = owner_identifier
728 process.set_owners(self.conn, owners_to_set)
729 params = f'id={process.id_}'
731 title_b64_encoded = b64encode(new_step_title.encode()).decode()
732 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
733 elif new_owner_title:
734 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
735 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
736 process.save(self.conn)
737 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Re-stamp Condition.description history, return redirect target."""
    target = self._change_versioned_timestamps(Condition, 'description')
    return target
def do_POST_condition_titles(self) -> str:
    """Re-stamp Condition.title history, return redirect target."""
    target = self._change_versioned_timestamps(Condition, 'title')
    return target
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Update/insert Condition of ?id= with posted form fields.

    Fields 'is_active', 'title' and 'description' are all read before any
    of them is applied, so a missing one raises before condition changes.
    Returns the path of the saved Condition's page as redirect target.
    """
    form = self._form_data
    active_field = form.get_str('is_active')
    new_title = form.get_str('title')
    new_description = form.get_str('description')
    # only the exact string 'True' activates the Condition
    condition.is_active = active_field == 'True'
    condition.title.set(new_title)
    condition.description.set(new_description)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'