1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27 def __init__(self, db_file: DatabaseFile,
28 *args: Any, **kwargs: Any) -> None:
29 super().__init__(*args, **kwargs)
31 self.headers: list[tuple[str, str]] = []
32 self._render_mode = 'html'
33 self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
def set_json_mode(self) -> None:
    """Make server send JSON instead of HTML responses.

    Switches the render mode to 'json' and registers the matching
    Content-Type header.  Safe to call more than once: the header is only
    added if it is not registered yet, so it is never sent twice.
    """
    self._render_mode = 'json'
    json_header = ('Content-Type', 'application/json')
    if json_header not in self.headers:
        # Extend the existing list in place rather than rebinding it.
        self.headers.append(json_header)
41 def ctx_to_json(ctx: dict[str, object]) -> str:
42 """Render ctx into JSON string."""
43 def walk_ctx(node: object) -> Any:
44 if hasattr(node, 'as_dict_into_reference'):
45 if hasattr(node, 'id_') and node.id_ is not None:
46 return node.as_dict_into_reference(library)
47 if hasattr(node, 'as_dict'):
49 if isinstance(node, (list, tuple)):
50 return [walk_ctx(x) for x in node]
51 if isinstance(node, dict):
53 for k, v in node.items():
56 if isinstance(node, HandledException):
59 library: dict[str, dict[str | int, object]] = {}
60 for k, v in ctx.items():
62 ctx['_library'] = library
63 return json_dumps(ctx)
def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
    """Render ctx according to self._render_mode.

    In 'html' mode, the Jinja template named '{tmpl_name}.html' is looked up
    and rendered with ctx.  In any other mode, ctx is handed to the class's
    ctx_to_json and tmpl_name goes unused.
    """
    if self._render_mode != 'html':
        return self.__class__.ctx_to_json(ctx)
    full_tmpl_name = f'{tmpl_name}.{self._render_mode}'
    return self._jinja.get_template(full_tmpl_name).render(ctx)
75 """Wrapper for validating and retrieving dict-like HTTP inputs."""
77 def __init__(self, dict_: dict[str, list[str]],
78 strictness: bool = True) -> None:
80 self.strict = strictness
82 def get_str(self, key: str, default: str = '',
83 ignore_strict: bool = False) -> str:
84 """Retrieve single/first string value of key, or default."""
85 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
86 if self.strict and not ignore_strict:
87 raise BadFormatException(f'no value found for key {key}')
89 return self.inputs[key][0]
91 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
92 """Retrieve dict of (first) strings at key starting with prefix."""
94 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95 ret[key] = self.inputs[key][0]
98 def get_int(self, key: str) -> int:
99 """Retrieve single/first value of key as int, error if empty."""
100 val = self.get_int_or_none(key)
102 raise BadFormatException(f'unexpected empty value for: {key}')
105 def get_int_or_none(self, key: str) -> int | None:
106 """Retrieve single/first value of key as int, return None if empty."""
107 val = self.get_str(key, ignore_strict=True)
112 except ValueError as e:
113 msg = f'cannot int form field value for key {key}: {val}'
114 raise BadFormatException(msg) from e
116 def get_float(self, key: str) -> float:
117 """Retrieve float value of key from self.postvars."""
118 val = self.get_str(key)
121 except ValueError as e:
122 msg = f'cannot float form field value for key {key}: {val}'
123 raise BadFormatException(msg) from e
125 def get_all_str(self, key: str) -> list[str]:
126 """Retrieve list of string values at key."""
127 if key not in self.inputs.keys():
129 return self.inputs[key]
131 def get_all_int(self, key: str) -> list[int]:
132 """Retrieve list of int values at key."""
133 all_str = self.get_all_str(key)
135 return [int(s) for s in all_str if len(s) > 0]
136 except ValueError as e:
137 msg = f'cannot int a form field value for key {key} in: {all_str}'
138 raise BadFormatException(msg) from e
140 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
141 """Retrieve list of float value at key, None if empty strings."""
142 ret: list[float | None] = []
143 for val in self.get_all_str(key):
149 except ValueError as e:
150 msg = f'cannot float form field value for key {key}: {val}'
151 raise BadFormatException(msg) from e
155 class TaskHandler(BaseHTTPRequestHandler):
156 """Handles single HTTP request."""
157 # pylint: disable=too-many-public-methods
159 conn: DatabaseConnection
161 _form_data: InputsParser
162 _params: InputsParser
169 """Send ctx as proper HTTP response."""
170 body = self.server.render(ctx, tmpl_name)
171 self.send_response(code)
172 for header_tuple in self.server.headers:
173 self.send_header(*header_tuple)
175 self.wfile.write(bytes(body, 'utf-8'))
178 def _request_wrapper(http_method: str, not_found_msg: str
179 ) -> Callable[..., Callable[[TaskHandler], None]]:
180 def decorator(f: Callable[..., str | None]
181 ) -> Callable[[TaskHandler], None]:
182 def wrapper(self: TaskHandler) -> None:
183 # pylint: disable=protected-access
184 # (because pylint here fails to detect the use of wrapper as a
185 # method to self with respective access privileges)
187 self.conn = DatabaseConnection(self.server.db)
188 parsed_url = urlparse(self.path)
189 self._site = path_split(parsed_url.path)[1]
190 params = parse_qs(parsed_url.query, strict_parsing=True)
191 self._params = InputsParser(params, False)
192 handler_name = f'do_{http_method}_{self._site}'
193 if hasattr(self, handler_name):
194 handler = getattr(self, handler_name)
195 redir_target = f(self, handler)
197 self.send_response(302)
198 self.send_header('Location', redir_target)
201 msg = f'{not_found_msg}: {self._site}'
202 raise NotFoundException(msg)
203 except HandledException as error:
204 for cls in (Day, Todo, Condition, Process, ProcessStep):
205 assert hasattr(cls, 'empty_cache')
208 self._send_page(ctx, 'msg', error.http_code)
214 @_request_wrapper('GET', 'Unknown page')
215 def do_GET(self, handler: Callable[[], str | dict[str, object]]
217 """Render page with result of handler, or redirect if result is str."""
218 tmpl_name = f'{self._site}'
219 ctx_or_redir_target = handler()
220 if isinstance(ctx_or_redir_target, str):
221 return ctx_or_redir_target
222 self._send_page(ctx_or_redir_target, tmpl_name)
225 @_request_wrapper('POST', 'Unknown POST target')
226 def do_POST(self, handler: Callable[[], str]) -> str:
227 """Handle POST with handler, prepare redirection to result."""
228 length = int(self.headers['content-length'])
229 postvars = parse_qs(self.rfile.read(length).decode(),
230 keep_blank_values=True, strict_parsing=True)
231 self._form_data = InputsParser(postvars)
232 redir_target = handler()
239 def _get_item(target_class: Any
240 ) -> Callable[..., Callable[[TaskHandler],
242 def decorator(f: Callable[..., dict[str, object]]
243 ) -> Callable[[TaskHandler], dict[str, object]]:
244 def wrapper(self: TaskHandler) -> dict[str, object]:
245 # pylint: disable=protected-access
246 # (because pylint here fails to detect the use of wrapper as a
247 # method to self with respective access privileges)
248 id_ = self._params.get_int_or_none('id')
249 if target_class.can_create_by_id:
250 item = target_class.by_id_or_create(self.conn, id_)
252 item = target_class.by_id(self.conn, id_)
257 def do_GET_(self) -> str:
258 """Return redirect target on GET /."""
261 def _do_GET_calendar(self) -> dict[str, object]:
262 """Show Days from ?start= to ?end=.
264 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
265 same, the only difference being the HTML template they are rendered to,
266 which .do_GET selects from their method name.
268 start = self._params.get_str('start')
269 end = self._params.get_str('end')
271 end = date_in_n_days(366)
272 ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
273 days, start, end = ret
274 days = Day.with_filled_gaps(days, start, end)
275 today = date_in_n_days(0)
276 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – normal view.

    Returns whatever ._do_GET_calendar returns, unchanged.
    """
    ctx = self._do_GET_calendar()
    return ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – minimalist view.

    Returns whatever ._do_GET_calendar returns, unchanged.
    """
    ctx = self._do_GET_calendar()
    return ctx
286 def do_GET_day(self) -> dict[str, object]:
287 """Show single Day of ?date=."""
288 date = self._params.get_str('date', date_in_n_days(0))
289 day = Day.by_id_or_create(self.conn, date)
290 make_type = self._params.get_str('make_type')
291 conditions_present = []
294 for todo in day.todos:
295 for condition in todo.conditions + todo.blockers:
296 if condition not in conditions_present:
297 conditions_present += [condition]
298 enablers_for[condition.id_] = [p for p in
299 Process.all(self.conn)
300 if condition in p.enables]
301 disablers_for[condition.id_] = [p for p in
302 Process.all(self.conn)
303 if condition in p.disables]
304 seen_todos: set[int] = set()
305 top_nodes = [t.get_step_tree(seen_todos)
306 for t in day.todos if not t.parents]
308 'top_nodes': top_nodes,
309 'make_type': make_type,
310 'enablers_for': enablers_for,
311 'disablers_for': disablers_for,
312 'conditions_present': conditions_present,
313 'processes': Process.all(self.conn)}
316 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
317 """Show single Todo of ?id=."""
321 """Collect what's useful for Todo steps tree display."""
324 process: Process | None
325 children: list[TodoStepsNode] # pylint: disable=undefined-variable
326 fillable: bool = False
328 def walk_process_steps(id_: int,
329 process_step_nodes: list[ProcessStepsNode],
330 steps_nodes: list[TodoStepsNode]) -> None:
331 for process_step_node in process_step_nodes:
333 node = TodoStepsNode(id_, None, process_step_node.process, [])
334 steps_nodes += [node]
335 walk_process_steps(id_, list(process_step_node.steps.values()),
338 def walk_todo_steps(id_: int, todos: list[Todo],
339 steps_nodes: list[TodoStepsNode]) -> None:
342 for match in [item for item in steps_nodes
344 and item.process == todo.process]:
347 for child in match.children:
348 child.fillable = True
349 walk_todo_steps(id_, todo.children, match.children)
352 node = TodoStepsNode(id_, todo, None, [])
353 steps_nodes += [node]
354 walk_todo_steps(id_, todo.children, node.children)
356 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
359 for node in steps_nodes:
361 assert isinstance(node.process, Process)
362 assert isinstance(node.process.id_, int)
363 ids.add(node.process.id_)
364 ids = ids | collect_adoptables_keys(node.children)
367 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
368 process_tree = todo.process.get_steps(self.conn, None)
369 steps_todo_to_process: list[TodoStepsNode] = []
370 walk_process_steps(0, list(process_tree.values()),
371 steps_todo_to_process)
372 for steps_node in steps_todo_to_process:
373 steps_node.fillable = True
374 walk_todo_steps(len(steps_todo_to_process), todo_steps,
375 steps_todo_to_process)
376 adoptables: dict[int, list[Todo]] = {}
377 any_adoptables = [Todo.by_id(self.conn, t.id_)
378 for t in Todo.by_date(self.conn, todo.date)
381 for id_ in collect_adoptables_keys(steps_todo_to_process):
382 adoptables[id_] = [t for t in any_adoptables
383 if t.process.id_ == id_]
384 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
385 'adoption_candidates_for': adoptables,
386 'process_candidates': Process.all(self.conn),
387 'todo_candidates': any_adoptables,
388 'condition_candidates': Condition.all(self.conn)}
390 def do_GET_todos(self) -> dict[str, object]:
391 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
392 sort_by = self._params.get_str('sort_by')
393 start = self._params.get_str('start')
394 end = self._params.get_str('end')
395 process_id = self._params.get_int_or_none('process_id')
396 comment_pattern = self._params.get_str('comment_pattern')
398 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
399 todos_by_date_range, start, end = ret
400 todos = [t for t in todos_by_date_range
401 if comment_pattern in t.comment
402 and ((not process_id) or t.process.id_ == process_id)]
403 sort_by = Todo.sort_by(todos, sort_by)
404 return {'start': start, 'end': end, 'process_id': process_id,
405 'comment_pattern': comment_pattern, 'todos': todos,
406 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
408 def do_GET_conditions(self) -> dict[str, object]:
409 """Show all Conditions."""
410 pattern = self._params.get_str('pattern')
411 sort_by = self._params.get_str('sort_by')
412 conditions = Condition.matching(self.conn, pattern)
413 sort_by = Condition.sort_by(conditions, sort_by)
414 return {'conditions': conditions,
@_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=, plus the Processes that reference it.

    'is_new' is True when c has no id_ yet.  The four process lists hold
    those Processes whose .conditions, .blockers, .enables or .disables
    contain c, respectively.
    """
    all_processes = Process.all(self.conn)

    def referencing_via(attr_name: str) -> list[Process]:
        # Processes whose collection named attr_name contains c.
        return [proc for proc in all_processes
                if c in getattr(proc, attr_name)]

    return {'condition': c,
            'is_new': c.id_ is None,
            'enabled_processes': referencing_via('conditions'),
            'disabled_processes': referencing_via('blockers'),
            'enabling_processes': referencing_via('enables'),
            'disabling_processes': referencing_via('disables')}
@_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Show title history of Condition of ?id=.

    The context holds only the Condition itself, under 'condition'.
    """
    return dict(condition=c)
@_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Show description history of Condition of ?id=.

    The context holds only the Condition itself, under 'condition'.
    """
    return dict(condition=c)
439 def do_GET_process(self, process: Process) -> dict[str, object]:
440 """Show Process of ?id=."""
441 owner_ids = self._params.get_all_int('step_to')
442 owned_ids = self._params.get_all_int('has_step')
443 title_64 = self._params.get_str('title_b64')
445 title = b64decode(title_64.encode()).decode()
446 process.title.set(title)
447 owners = process.used_as_step_by(self.conn)
448 for step_id in owner_ids:
449 owners += [Process.by_id(self.conn, step_id)]
450 preset_top_step = None
451 for process_id in owned_ids:
452 preset_top_step = process_id
453 return {'process': process, 'is_new': process.id_ is None,
454 'preset_top_step': preset_top_step,
455 'steps': process.get_steps(self.conn), 'owners': owners,
456 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
457 'process_candidates': Process.all(self.conn),
458 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Show title history of Process of ?id=.

    The context holds only the Process itself, under 'process'.
    """
    return dict(process=p)
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Show description history of Process of ?id=.

    The context holds only the Process itself, under 'process'.
    """
    return dict(process=p)
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Show default effort history of Process of ?id=.

    The context holds only the Process itself, under 'process'.
    """
    return dict(process=p)
def do_GET_processes(self) -> dict[str, object]:
    """Show Processes matching ?pattern=, handing ?sort_by= to Process.sort_by.

    The 'sort_by' returned in the context is Process.sort_by's result, not
    necessarily the raw query parameter.
    """
    query = self._params
    pattern = query.get_str('pattern')
    requested_sort = query.get_str('sort_by')
    matches = Process.matching(self.conn, pattern)
    # NOTE(review): presumably sorts matches in place and returns the sort
    # key actually applied – confirm against Process.sort_by.
    applied_sort = Process.sort_by(matches, requested_sort)
    return {'processes': matches,
            'sort_by': applied_sort,
            'pattern': pattern}
486 def _delete_or_post(target_class: Any, redir_target: str = '/'
487 ) -> Callable[..., Callable[[TaskHandler], str]]:
488 def decorator(f: Callable[..., str]
489 ) -> Callable[[TaskHandler], str]:
490 def wrapper(self: TaskHandler) -> str:
491 # pylint: disable=protected-access
492 # (because pylint here fails to detect the use of wrapper as a
493 # method to self with respective access privileges)
494 id_ = self._params.get_int_or_none('id')
495 for _ in self._form_data.get_all_str('delete'):
497 msg = 'trying to delete non-saved ' +\
498 f'{target_class.__name__}'
499 raise NotFoundException(msg)
500 item = target_class.by_id(self.conn, id_)
501 item.remove(self.conn)
503 if target_class.can_create_by_id:
504 item = target_class.by_id_or_create(self.conn, id_)
506 item = target_class.by_id(self.conn, id_)
511 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
512 """Update history timestamps for VersionedAttribute."""
513 id_ = self._params.get_int_or_none('id')
514 item = cls.by_id(self.conn, id_)
515 attr = getattr(item, attr_name)
516 for k, v in self._form_data.get_first_strings_starting('at:').items():
519 attr.reset_timestamp(old, f'{v}.0')
521 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
523 def do_POST_day(self) -> str:
524 """Update or insert Day of date and Todos mapped to it."""
525 # pylint: disable=too-many-locals
526 date = self._params.get_str('date')
527 day_comment = self._form_data.get_str('day_comment')
528 make_type = self._form_data.get_str('make_type')
529 old_todos = self._form_data.get_all_int('todo_id')
530 new_todos = self._form_data.get_all_int('new_todo')
531 comments = self._form_data.get_all_str('comment')
532 efforts = self._form_data.get_all_floats_or_nones('effort')
533 done_todos = self._form_data.get_all_int('done')
534 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
535 raise BadFormatException('"done" field refers to unknown Todo')
536 is_done = [t_id in done_todos for t_id in old_todos]
537 if not (len(old_todos) == len(is_done) == len(comments)
539 msg = 'not equal number each of number of todo_id, comments, ' +\
541 raise BadFormatException(msg)
542 day = Day.by_id_or_create(self.conn, date)
543 day.comment = day_comment
545 for process_id in sorted(new_todos):
546 if 'empty' == make_type:
547 process = Process.by_id(self.conn, process_id)
548 todo = Todo(None, process, False, date)
551 Todo.create_with_children(self.conn, process_id, date)
552 for i, todo_id in enumerate(old_todos):
553 todo = Todo.by_id(self.conn, todo_id)
554 todo.is_done = is_done[i]
555 todo.comment = comments[i]
556 todo.effort = efforts[i]
558 return f'/day?date={date}&make_type={make_type}'
560 @_delete_or_post(Todo, '/')
561 def do_POST_todo(self, todo: Todo) -> str:
562 """Update Todo and its children."""
563 # pylint: disable=too-many-locals
564 adopted_child_ids = self._form_data.get_all_int('adopt')
565 processes_to_make_full = self._form_data.get_all_int('make_full')
566 processes_to_make_empty = self._form_data.get_all_int('make_empty')
567 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
568 effort = self._form_data.get_str('effort', ignore_strict=True)
569 conditions = self._form_data.get_all_int('conditions')
570 disables = self._form_data.get_all_int('disables')
571 blockers = self._form_data.get_all_int('blockers')
572 enables = self._form_data.get_all_int('enables')
573 is_done = len(self._form_data.get_all_str('done')) > 0
574 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
575 comment = self._form_data.get_str('comment', ignore_strict=True)
576 for v in fill_fors.values():
577 if v.startswith('make_empty_'):
578 processes_to_make_empty += [int(v[11:])]
579 elif v.startswith('make_full_'):
580 processes_to_make_full += [int(v[10:])]
582 adopted_child_ids += [int(v)]
584 for child in todo.children:
585 assert isinstance(child.id_, int)
586 if child.id_ not in adopted_child_ids:
587 to_remove += [child.id_]
588 for id_ in to_remove:
589 child = Todo.by_id(self.conn, id_)
590 todo.remove_child(child)
591 for child_id in adopted_child_ids:
592 if child_id in [c.id_ for c in todo.children]:
594 child = Todo.by_id(self.conn, child_id)
595 todo.add_child(child)
596 for process_id in processes_to_make_empty:
597 process = Process.by_id(self.conn, process_id)
598 made = Todo(None, process, False, todo.date)
601 for process_id in processes_to_make_full:
602 made = Todo.create_with_children(self.conn, process_id, todo.date)
604 todo.effort = float(effort) if effort else None
605 todo.set_conditions(self.conn, conditions)
606 todo.set_blockers(self.conn, blockers)
607 todo.set_enables(self.conn, enables)
608 todo.set_disables(self.conn, disables)
609 todo.is_done = is_done
610 todo.calendarize = calendarize
611 todo.comment = comment
613 return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Update history timestamps for Process.description.

    Delegates to ._change_versioned_timestamps and returns its result.
    """
    redir_target = self._change_versioned_timestamps(Process, 'description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Update history timestamps for Process.effort.

    Delegates to ._change_versioned_timestamps and returns its result.
    """
    redir_target = self._change_versioned_timestamps(Process, 'effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Update history timestamps for Process.title.

    Delegates to ._change_versioned_timestamps and returns its result.
    """
    redir_target = self._change_versioned_timestamps(Process, 'title')
    return redir_target
627 @_delete_or_post(Process, '/processes')
628 def do_POST_process(self, process: Process) -> str:
629 """Update or insert Process of ?id= and fields defined in postvars."""
630 # pylint: disable=too-many-locals
631 # pylint: disable=too-many-statements
632 title = self._form_data.get_str('title')
633 description = self._form_data.get_str('description')
634 effort = self._form_data.get_float('effort')
635 conditions = self._form_data.get_all_int('conditions')
636 blockers = self._form_data.get_all_int('blockers')
637 enables = self._form_data.get_all_int('enables')
638 disables = self._form_data.get_all_int('disables')
639 calendarize = self._form_data.get_all_str('calendarize') != []
640 suppresses = self._form_data.get_all_int('suppresses')
641 step_of = self._form_data.get_all_str('step_of')
642 keep_steps = self._form_data.get_all_int('keep_step')
643 step_ids = self._form_data.get_all_int('steps')
644 new_top_steps = self._form_data.get_all_str('new_top_step')
645 step_process_id_to = {}
646 step_parent_id_to = {}
648 for step_id in step_ids:
649 name = f'new_step_to_{step_id}'
650 new_steps_to[step_id] = self._form_data.get_all_int(name)
651 for step_id in keep_steps:
652 name = f'step_{step_id}_process_id'
653 step_process_id_to[step_id] = self._form_data.get_int(name)
654 name = f'step_{step_id}_parent_id'
655 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
656 process.title.set(title)
657 process.description.set(description)
658 process.effort.set(effort)
659 process.set_conditions(self.conn, conditions)
660 process.set_blockers(self.conn, blockers)
661 process.set_enables(self.conn, enables)
662 process.set_disables(self.conn, disables)
663 process.calendarize = calendarize
664 process.save(self.conn)
665 assert isinstance(process.id_, int)
666 new_step_title = None
667 steps: list[ProcessStep] = []
668 for step_id in keep_steps:
669 if step_id not in step_ids:
670 raise BadFormatException('trying to keep unknown step')
671 step = ProcessStep(step_id, process.id_,
672 step_process_id_to[step_id],
673 step_parent_id_to[step_id])
675 for step_id in step_ids:
676 new = [ProcessStep(None, process.id_, step_process_id, step_id)
677 for step_process_id in new_steps_to[step_id]]
679 for step_identifier in new_top_steps:
681 step_process_id = int(step_identifier)
682 step = ProcessStep(None, process.id_, step_process_id, None)
685 new_step_title = step_identifier
686 process.set_steps(self.conn, steps)
687 process.set_step_suppressions(self.conn, suppresses)
689 new_owner_title = None
690 for owner_identifier in step_of:
692 owners_to_set += [int(owner_identifier)]
694 new_owner_title = owner_identifier
695 process.set_owners(self.conn, owners_to_set)
696 params = f'id={process.id_}'
698 title_b64_encoded = b64encode(new_step_title.encode()).decode()
699 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
700 elif new_owner_title:
701 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
702 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
703 process.save(self.conn)
704 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Update history timestamps for Condition.description.

    Delegates to ._change_versioned_timestamps and returns its result.
    """
    redir_target = self._change_versioned_timestamps(Condition, 'description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Update history timestamps for Condition.title.

    Delegates to ._change_versioned_timestamps and returns its result.
    """
    redir_target = self._change_versioned_timestamps(Condition, 'title')
    return redir_target
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Update/insert Condition of ?id= from the posted form fields.

    Reads 'is_active', 'title' and 'description' from the form data, applies
    them to condition, saves it, and returns the URL of its view page.
    """
    form = self._form_data
    # Read every field before touching condition, so a failing read leaves
    # it unmodified.
    active_raw, new_title, new_description = (form.get_str('is_active'),
                                              form.get_str('title'),
                                              form.get_str('description'))
    # Only the exact string 'True' activates the Condition.
    condition.is_active = active_raw == 'True'
    condition.title.set(new_title)
    condition.description.set(new_description)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'