1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27 def __init__(self, db_file: DatabaseFile,
28 *args: Any, **kwargs: Any) -> None:
29 super().__init__(*args, **kwargs)
31 self.headers: list[tuple[str, str]] = []
32 self._render_mode = 'html'
33 self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35 def set_json_mode(self) -> None:
36 """Make server send JSON instead of HTML responses."""
37 self._render_mode = 'json'
38 self.headers += [('Content-Type', 'application/json')]
41 def ctx_to_json(ctx: dict[str, object]) -> str:
42 """Render ctx into JSON string."""
43 def walk_ctx(node: object) -> Any:
44 if hasattr(node, 'as_dict_into_reference'):
45 if hasattr(node, 'id_') and node.id_ is not None:
46 return node.as_dict_into_reference(library)
47 if hasattr(node, 'as_dict'):
49 if isinstance(node, (list, tuple)):
50 return [walk_ctx(x) for x in node]
51 if isinstance(node, dict):
53 for k, v in node.items():
56 if isinstance(node, HandledException):
59 library: dict[str, dict[str | int, object]] = {}
60 for k, v in ctx.items():
62 ctx['_library'] = library
63 return json_dumps(ctx)
65 def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
66 """Render ctx according to self._render_mode.."""
67 tmpl_name = f'{tmpl_name}.{self._render_mode}'
68 if 'html' == self._render_mode:
69 template = self._jinja.get_template(tmpl_name)
70 return template.render(ctx)
71 return self.__class__.ctx_to_json(ctx)
75 """Wrapper for validating and retrieving dict-like HTTP inputs."""
77 def __init__(self, dict_: dict[str, list[str]],
78 strictness: bool = True) -> None:
80 self.strict = strictness
82 def get_str(self, key: str, default: str = '',
83 ignore_strict: bool = False) -> str:
84 """Retrieve single/first string value of key, or default."""
85 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
86 if self.strict and not ignore_strict:
87 raise BadFormatException(f'no value found for key {key}')
89 return self.inputs[key][0]
91 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
92 """Retrieve dict of (first) strings at key starting with prefix."""
94 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
95 ret[key] = self.inputs[key][0]
98 def get_int(self, key: str) -> int:
99 """Retrieve single/first value of key as int, error if empty."""
100 val = self.get_int_or_none(key)
102 raise BadFormatException(f'unexpected empty value for: {key}')
105 def get_int_or_none(self, key: str) -> int | None:
106 """Retrieve single/first value of key as int, return None if empty."""
107 val = self.get_str(key, ignore_strict=True)
112 except ValueError as e:
113 msg = f'cannot int form field value for key {key}: {val}'
114 raise BadFormatException(msg) from e
116 def get_float(self, key: str) -> float:
117 """Retrieve float value of key from self.postvars."""
118 val = self.get_str(key)
121 except ValueError as e:
122 msg = f'cannot float form field value for key {key}: {val}'
123 raise BadFormatException(msg) from e
125 def get_all_str(self, key: str) -> list[str]:
126 """Retrieve list of string values at key."""
127 if key not in self.inputs.keys():
129 return self.inputs[key]
131 def get_all_int(self, key: str) -> list[int]:
132 """Retrieve list of int values at key."""
133 all_str = self.get_all_str(key)
135 return [int(s) for s in all_str if len(s) > 0]
136 except ValueError as e:
137 msg = f'cannot int a form field value for key {key} in: {all_str}'
138 raise BadFormatException(msg) from e
141 class TaskHandler(BaseHTTPRequestHandler):
142 """Handles single HTTP request."""
143 # pylint: disable=too-many-public-methods
145 conn: DatabaseConnection
147 _form_data: InputsParser
148 _params: InputsParser
155 """Send ctx as proper HTTP response."""
156 body = self.server.render(ctx, tmpl_name)
157 self.send_response(code)
158 for header_tuple in self.server.headers:
159 self.send_header(*header_tuple)
161 self.wfile.write(bytes(body, 'utf-8'))
164 def _request_wrapper(http_method: str, not_found_msg: str
165 ) -> Callable[..., Callable[[TaskHandler], None]]:
166 def decorator(f: Callable[..., str | None]
167 ) -> Callable[[TaskHandler], None]:
168 def wrapper(self: TaskHandler) -> None:
169 # pylint: disable=protected-access
170 # (because pylint here fails to detect the use of wrapper as a
171 # method to self with respective access privileges)
173 self.conn = DatabaseConnection(self.server.db)
174 parsed_url = urlparse(self.path)
175 self._site = path_split(parsed_url.path)[1]
176 params = parse_qs(parsed_url.query, strict_parsing=True)
177 self._params = InputsParser(params, False)
178 handler_name = f'do_{http_method}_{self._site}'
179 if hasattr(self, handler_name):
180 handler = getattr(self, handler_name)
181 redir_target = f(self, handler)
183 self.send_response(302)
184 self.send_header('Location', redir_target)
187 msg = f'{not_found_msg}: {self._site}'
188 raise NotFoundException(msg)
189 except HandledException as error:
190 for cls in (Day, Todo, Condition, Process, ProcessStep):
191 assert hasattr(cls, 'empty_cache')
194 self._send_page(ctx, 'msg', error.http_code)
200 @_request_wrapper('GET', 'Unknown page')
201 def do_GET(self, handler: Callable[[], str | dict[str, object]]
203 """Render page with result of handler, or redirect if result is str."""
204 tmpl_name = f'{self._site}'
205 ctx_or_redir_target = handler()
206 if isinstance(ctx_or_redir_target, str):
207 return ctx_or_redir_target
208 self._send_page(ctx_or_redir_target, tmpl_name)
211 @_request_wrapper('POST', 'Unknown POST target')
212 def do_POST(self, handler: Callable[[], str]) -> str:
213 """Handle POST with handler, prepare redirection to result."""
214 length = int(self.headers['content-length'])
215 postvars = parse_qs(self.rfile.read(length).decode(),
216 keep_blank_values=True, strict_parsing=True)
217 self._form_data = InputsParser(postvars)
218 redir_target = handler()
224 def do_GET_(self) -> str:
225 """Return redirect target on GET /."""
228 def _do_GET_calendar(self) -> dict[str, object]:
229 """Show Days from ?start= to ?end=.
231 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
232 same, the only difference being the HTML template they are rendered to,
233 which .do_GET selects from their method name.
235 start = self._params.get_str('start')
236 end = self._params.get_str('end')
238 end = date_in_n_days(366)
239 ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
240 days, start, end = ret
241 days = Day.with_filled_gaps(days, start, end)
242 today = date_in_n_days(0)
243 return {'start': start, 'end': end, 'days': days, 'today': today}
245 def do_GET_calendar(self) -> dict[str, object]:
246 """Show Days from ?start= to ?end= – normal view."""
247 return self._do_GET_calendar()
249 def do_GET_calendar_txt(self) -> dict[str, object]:
250 """Show Days from ?start= to ?end= – minimalist view."""
251 return self._do_GET_calendar()
253 def do_GET_day(self) -> dict[str, object]:
254 """Show single Day of ?date=."""
255 date = self._params.get_str('date', date_in_n_days(0))
256 day = Day.by_id_or_create(self.conn, date)
257 make_type = self._params.get_str('make_type')
258 conditions_present = []
261 for todo in day.todos:
262 for condition in todo.conditions + todo.blockers:
263 if condition not in conditions_present:
264 conditions_present += [condition]
265 enablers_for[condition.id_] = [p for p in
266 Process.all(self.conn)
267 if condition in p.enables]
268 disablers_for[condition.id_] = [p for p in
269 Process.all(self.conn)
270 if condition in p.disables]
271 seen_todos: set[int] = set()
272 top_nodes = [t.get_step_tree(seen_todos)
273 for t in day.todos if not t.parents]
275 'top_nodes': top_nodes,
276 'make_type': make_type,
277 'enablers_for': enablers_for,
278 'disablers_for': disablers_for,
279 'conditions_present': conditions_present,
280 'processes': Process.all(self.conn)}
282 def do_GET_todo(self) -> dict[str, object]:
283 """Show single Todo of ?id=."""
287 """Collect what's useful for Todo steps tree display."""
290 process: Process | None
291 children: list[TodoStepsNode] # pylint: disable=undefined-variable
292 fillable: bool = False
294 def walk_process_steps(id_: int,
295 process_step_nodes: list[ProcessStepsNode],
296 steps_nodes: list[TodoStepsNode]) -> None:
297 for process_step_node in process_step_nodes:
299 node = TodoStepsNode(id_, None, process_step_node.process, [])
300 steps_nodes += [node]
301 walk_process_steps(id_, list(process_step_node.steps.values()),
304 def walk_todo_steps(id_: int, todos: list[Todo],
305 steps_nodes: list[TodoStepsNode]) -> None:
308 for match in [item for item in steps_nodes
310 and item.process == todo.process]:
313 for child in match.children:
314 child.fillable = True
315 walk_todo_steps(id_, todo.children, match.children)
318 node = TodoStepsNode(id_, todo, None, [])
319 steps_nodes += [node]
320 walk_todo_steps(id_, todo.children, node.children)
322 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
325 for node in steps_nodes:
327 assert isinstance(node.process, Process)
328 assert isinstance(node.process.id_, int)
329 ids.add(node.process.id_)
330 ids = ids | collect_adoptables_keys(node.children)
333 id_ = self._params.get_int('id')
334 todo = Todo.by_id(self.conn, id_)
335 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
336 process_tree = todo.process.get_steps(self.conn, None)
337 steps_todo_to_process: list[TodoStepsNode] = []
338 walk_process_steps(0, list(process_tree.values()),
339 steps_todo_to_process)
340 for steps_node in steps_todo_to_process:
341 steps_node.fillable = True
342 walk_todo_steps(len(steps_todo_to_process), todo_steps,
343 steps_todo_to_process)
344 adoptables: dict[int, list[Todo]] = {}
345 any_adoptables = [Todo.by_id(self.conn, t.id_)
346 for t in Todo.by_date(self.conn, todo.date)
349 for id_ in collect_adoptables_keys(steps_todo_to_process):
350 adoptables[id_] = [t for t in any_adoptables
351 if t.process.id_ == id_]
352 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
353 'adoption_candidates_for': adoptables,
354 'process_candidates': Process.all(self.conn),
355 'todo_candidates': any_adoptables,
356 'condition_candidates': Condition.all(self.conn)}
358 def do_GET_todos(self) -> dict[str, object]:
359 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
360 sort_by = self._params.get_str('sort_by')
361 start = self._params.get_str('start')
362 end = self._params.get_str('end')
363 process_id = self._params.get_int_or_none('process_id')
364 comment_pattern = self._params.get_str('comment_pattern')
366 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
367 todos_by_date_range, start, end = ret
368 todos = [t for t in todos_by_date_range
369 if comment_pattern in t.comment
370 and ((not process_id) or t.process.id_ == process_id)]
371 if sort_by == 'doneness':
372 todos.sort(key=lambda t: t.is_done)
373 elif sort_by == '-doneness':
374 todos.sort(key=lambda t: t.is_done, reverse=True)
375 elif sort_by == 'title':
376 todos.sort(key=lambda t: t.title_then)
377 elif sort_by == '-title':
378 todos.sort(key=lambda t: t.title_then, reverse=True)
379 elif sort_by == 'comment':
380 todos.sort(key=lambda t: t.comment)
381 elif sort_by == '-comment':
382 todos.sort(key=lambda t: t.comment, reverse=True)
383 elif sort_by == '-date':
384 todos.sort(key=lambda t: t.date, reverse=True)
386 todos.sort(key=lambda t: t.date)
388 return {'start': start, 'end': end, 'process_id': process_id,
389 'comment_pattern': comment_pattern, 'todos': todos,
390 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
392 def do_GET_conditions(self) -> dict[str, object]:
393 """Show all Conditions."""
394 pattern = self._params.get_str('pattern')
395 conditions = Condition.matching(self.conn, pattern)
396 sort_by = self._params.get_str('sort_by')
397 if sort_by == 'is_active':
398 conditions.sort(key=lambda c: c.is_active)
399 elif sort_by == '-is_active':
400 conditions.sort(key=lambda c: c.is_active, reverse=True)
401 elif sort_by == '-title':
402 conditions.sort(key=lambda c: c.title.newest, reverse=True)
404 conditions.sort(key=lambda c: c.title.newest)
406 return {'conditions': conditions,
410 def do_GET_condition(self) -> dict[str, object]:
411 """Show Condition of ?id=."""
412 id_ = self._params.get_int_or_none('id')
413 c = Condition.by_id_or_create(self.conn, id_)
414 ps = Process.all(self.conn)
415 return {'condition': c, 'is_new': c.id_ is None,
416 'enabled_processes': [p for p in ps if c in p.conditions],
417 'disabled_processes': [p for p in ps if c in p.blockers],
418 'enabling_processes': [p for p in ps if c in p.enables],
419 'disabling_processes': [p for p in ps if c in p.disables]}
421 def do_GET_condition_titles(self) -> dict[str, object]:
422 """Show title history of Condition of ?id=."""
423 id_ = self._params.get_int('id')
424 condition = Condition.by_id(self.conn, id_)
425 return {'condition': condition}
427 def do_GET_condition_descriptions(self) -> dict[str, object]:
428 """Show description historys of Condition of ?id=."""
429 id_ = self._params.get_int('id')
430 condition = Condition.by_id(self.conn, id_)
431 return {'condition': condition}
433 def do_GET_process(self) -> dict[str, object]:
434 """Show Process of ?id=."""
435 id_ = self._params.get_int_or_none('id')
436 process = Process.by_id_or_create(self.conn, id_)
437 title_64 = self._params.get_str('title_b64')
439 title = b64decode(title_64.encode()).decode()
440 process.title.set(title)
441 owners = process.used_as_step_by(self.conn)
442 for step_id in self._params.get_all_int('step_to'):
443 owners += [Process.by_id(self.conn, step_id)]
444 preset_top_step = None
445 for process_id in self._params.get_all_int('has_step'):
446 preset_top_step = process_id
447 return {'process': process, 'is_new': process.id_ is None,
448 'preset_top_step': preset_top_step,
449 'steps': process.get_steps(self.conn), 'owners': owners,
450 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
451 'process_candidates': Process.all(self.conn),
452 'condition_candidates': Condition.all(self.conn)}
454 def do_GET_process_titles(self) -> dict[str, object]:
455 """Show title history of Process of ?id=."""
456 id_ = self._params.get_int('id')
457 process = Process.by_id(self.conn, id_)
458 return {'process': process}
460 def do_GET_process_descriptions(self) -> dict[str, object]:
461 """Show description historys of Process of ?id=."""
462 id_ = self._params.get_int('id')
463 process = Process.by_id(self.conn, id_)
464 return {'process': process}
466 def do_GET_process_efforts(self) -> dict[str, object]:
467 """Show default effort history of Process of ?id=."""
468 id_ = self._params.get_int('id')
469 process = Process.by_id(self.conn, id_)
470 return {'process': process}
472 def do_GET_processes(self) -> dict[str, object]:
473 """Show all Processes."""
474 pattern = self._params.get_str('pattern')
475 processes = Process.matching(self.conn, pattern)
476 sort_by = self._params.get_str('sort_by')
477 if sort_by == 'steps':
478 processes.sort(key=lambda p: len(p.explicit_steps))
479 elif sort_by == '-steps':
480 processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
481 elif sort_by == 'owners':
482 processes.sort(key=lambda p: p.n_owners or 0)
483 elif sort_by == '-owners':
484 processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
485 elif sort_by == 'effort':
486 processes.sort(key=lambda p: p.effort.newest)
487 elif sort_by == '-effort':
488 processes.sort(key=lambda p: p.effort.newest, reverse=True)
489 elif sort_by == '-title':
490 processes.sort(key=lambda p: p.title.newest, reverse=True)
492 processes.sort(key=lambda p: p.title.newest)
494 return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
499 def _delete_or_post(target_class: Any, redir_target: str = '/'
500 ) -> Callable[..., Callable[[TaskHandler], str]]:
501 def decorator(f: Callable[..., str]
502 ) -> Callable[[TaskHandler], str]:
503 def wrapper(self: TaskHandler) -> str:
504 # pylint: disable=protected-access
505 # (because pylint here fails to detect the use of wrapper as a
506 # method to self with respective access privileges)
507 id_ = self._params.get_int_or_none('id')
508 for _ in self._form_data.get_all_str('delete'):
510 msg = 'trying to delete non-saved ' +\
511 f'{target_class.__name__}'
512 raise NotFoundException(msg)
513 item = target_class.by_id(self.conn, id_)
514 item.remove(self.conn)
516 if target_class.can_create_by_id:
517 item = target_class.by_id_or_create(self.conn, id_)
519 item = target_class.by_id(self.conn, id_)
524 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
525 """Update history timestamps for VersionedAttribute."""
526 id_ = self._params.get_int_or_none('id')
527 item = cls.by_id(self.conn, id_)
528 attr = getattr(item, attr_name)
529 for k, v in self._form_data.get_first_strings_starting('at:').items():
532 attr.reset_timestamp(old, f'{v}.0')
534 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
536 def do_POST_day(self) -> str:
537 """Update or insert Day of date and Todos mapped to it."""
538 date = self._params.get_str('date')
539 day = Day.by_id_or_create(self.conn, date)
540 day.comment = self._form_data.get_str('day_comment')
542 make_type = self._form_data.get_str('make_type')
543 for process_id in sorted(self._form_data.get_all_int('new_todo')):
544 if 'empty' == make_type:
545 process = Process.by_id(self.conn, process_id)
546 todo = Todo(None, process, False, date)
549 Todo.create_with_children(self.conn, process_id, date)
550 done_ids = self._form_data.get_all_int('done')
551 comments = self._form_data.get_all_str('comment')
552 efforts = self._form_data.get_all_str('effort')
553 for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
554 todo = Todo.by_id(self.conn, todo_id)
555 todo.is_done = todo_id in done_ids
556 if len(comments) > 0:
557 todo.comment = comments[i]
559 todo.effort = float(efforts[i]) if efforts[i] else None
561 return f'/day?date={date}&make_type={make_type}'
563 @_delete_or_post(Todo, '/')
564 def do_POST_todo(self, todo: Todo) -> str:
565 """Update Todo and its children."""
566 adopted_child_ids = self._form_data.get_all_int('adopt')
567 processes_to_make_full = self._form_data.get_all_int('make_full')
568 processes_to_make_empty = self._form_data.get_all_int('make_empty')
569 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
570 for v in fill_fors.values():
571 if v.startswith('make_empty_'):
572 processes_to_make_empty += [int(v[11:])]
573 elif v.startswith('make_full_'):
574 processes_to_make_full += [int(v[10:])]
576 adopted_child_ids += [int(v)]
578 for child in todo.children:
579 assert isinstance(child.id_, int)
580 if child.id_ not in adopted_child_ids:
581 to_remove += [child.id_]
582 for id_ in to_remove:
583 child = Todo.by_id(self.conn, id_)
584 todo.remove_child(child)
585 for child_id in adopted_child_ids:
586 if child_id in [c.id_ for c in todo.children]:
588 child = Todo.by_id(self.conn, child_id)
589 todo.add_child(child)
590 for process_id in processes_to_make_empty:
591 process = Process.by_id(self.conn, process_id)
592 made = Todo(None, process, False, todo.date)
595 for process_id in processes_to_make_full:
596 made = Todo.create_with_children(self.conn, process_id, todo.date)
598 effort = self._form_data.get_str('effort', ignore_strict=True)
599 todo.effort = float(effort) if effort else None
600 todo.set_conditions(self.conn,
601 self._form_data.get_all_int('conditions'))
602 todo.set_blockers(self.conn, self._form_data.get_all_int('blockers'))
603 todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
604 todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
605 todo.is_done = len(self._form_data.get_all_str('done')) > 0
606 todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
607 todo.comment = self._form_data.get_str('comment', ignore_strict=True)
609 return f'/todo?id={todo.id_}'
611 def do_POST_process_descriptions(self) -> str:
612 """Update history timestamps for Process.description."""
613 return self._change_versioned_timestamps(Process, 'description')
615 def do_POST_process_efforts(self) -> str:
616 """Update history timestamps for Process.effort."""
617 return self._change_versioned_timestamps(Process, 'effort')
619 def do_POST_process_titles(self) -> str:
620 """Update history timestamps for Process.title."""
621 return self._change_versioned_timestamps(Process, 'title')
623 @_delete_or_post(Process, '/processes')
624 def do_POST_process(self, process: Process) -> str:
625 """Update or insert Process of ?id= and fields defined in postvars."""
626 process.title.set(self._form_data.get_str('title'))
627 process.description.set(self._form_data.get_str('description'))
628 process.effort.set(self._form_data.get_float('effort'))
629 process.set_conditions(self.conn,
630 self._form_data.get_all_int('conditions'))
631 process.set_blockers(self.conn,
632 self._form_data.get_all_int('blockers'))
633 process.set_enables(self.conn, self._form_data.get_all_int('enables'))
634 process.set_disables(self.conn,
635 self._form_data.get_all_int('disables'))
636 process.calendarize = self._form_data.get_all_str('calendarize') != []
637 process.save(self.conn)
638 assert isinstance(process.id_, int)
639 steps: list[ProcessStep] = []
640 for step_id in self._form_data.get_all_int('keep_step'):
641 if step_id not in self._form_data.get_all_int('steps'):
642 raise BadFormatException('trying to keep unknown step')
643 for step_id in self._form_data.get_all_int('steps'):
644 if step_id not in self._form_data.get_all_int('keep_step'):
646 step_process_id = self._form_data.get_int(
647 f'step_{step_id}_process_id')
648 parent_id = self._form_data.get_int_or_none(
649 f'step_{step_id}_parent_id')
650 steps += [ProcessStep(step_id, process.id_, step_process_id,
652 for step_id in self._form_data.get_all_int('steps'):
653 for step_process_id in self._form_data.get_all_int(
654 f'new_step_to_{step_id}'):
655 steps += [ProcessStep(None, process.id_, step_process_id,
657 new_step_title = None
658 for step_identifier in self._form_data.get_all_str('new_top_step'):
660 step_process_id = int(step_identifier)
661 steps += [ProcessStep(None, process.id_, step_process_id,
664 new_step_title = step_identifier
665 process.set_steps(self.conn, steps)
666 process.set_step_suppressions(self.conn,
668 get_all_int('suppresses'))
670 new_owner_title = None
671 for owner_identifier in self._form_data.get_all_str('step_of'):
673 owners_to_set += [int(owner_identifier)]
675 new_owner_title = owner_identifier
676 process.set_owners(self.conn, owners_to_set)
677 params = f'id={process.id_}'
679 title_b64_encoded = b64encode(new_step_title.encode()).decode()
680 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
681 elif new_owner_title:
682 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
683 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
684 process.save(self.conn)
685 return f'/process?{params}'
687 def do_POST_condition_descriptions(self) -> str:
688 """Update history timestamps for Condition.description."""
689 return self._change_versioned_timestamps(Condition, 'description')
691 def do_POST_condition_titles(self) -> str:
692 """Update history timestamps for Condition.title."""
693 return self._change_versioned_timestamps(Condition, 'title')
695 @_delete_or_post(Condition, '/conditions')
696 def do_POST_condition(self, condition: Condition) -> str:
697 """Update/insert Condition of ?id= and fields defined in postvars."""
698 condition.is_active = self._form_data.get_str('is_active') == 'True'
699 condition.title.set(self._form_data.get_str('title'))
700 condition.description.set(self._form_data.get_str('description'))
701 condition.save(self.conn)
702 return f'/condition?id={condition.id_}'