1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
def __init__(self, db_file: DatabaseFile,
             *args: Any, **kwargs: Any) -> None:
    """Set up HTTPServer from args/kwargs, plus DB file and render state.

    db_file is kept as .db, which request handlers read to open their
    DatabaseConnection. The server starts out in 'html' render mode, with
    no extra response headers, and with a Jinja environment loading from
    TEMPLATES_DIR.
    """
    super().__init__(*args, **kwargs)
    # Handlers access this as self.server.db, so it must be bound here.
    self.db = db_file
    self.headers: list[tuple[str, str]] = []
    self._render_mode = 'html'
    self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
def set_json_mode(self) -> None:
    """Make server send JSON instead of HTML responses.

    Switches the render mode to 'json' and registers the JSON Content-Type
    header. Calling this more than once does not register the header again.
    """
    json_header = ('Content-Type', 'application/json')
    self._render_mode = 'json'
    # Guard against duplicates so repeated calls stay harmless.
    if json_header not in self.headers:
        self.headers += [json_header]
41 def ctx_to_json(ctx: dict[str, object]) -> str:
42 """Render ctx into JSON string."""
43 def walk_ctx(node: object) -> Any:
44 if hasattr(node, 'as_dict_into_reference'):
45 if hasattr(node, 'id_') and node.id_ is not None:
46 return node.as_dict_into_reference(library)
47 if hasattr(node, 'as_dict'):
49 if isinstance(node, (list, tuple)):
50 return [walk_ctx(x) for x in node]
51 if isinstance(node, dict):
53 for k, v in node.items():
56 if isinstance(node, HandledException):
59 library: dict[str, dict[str | int, object]] = {}
60 for k, v in ctx.items():
62 ctx['_library'] = library
63 return json_dumps(ctx)
def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
    """Render ctx according to self._render_mode.

    In 'html' mode, ctx is rendered through the Jinja template named
    tmpl_name plus the suffix '.html'. In any other mode tmpl_name is
    ignored and ctx is passed to the class's ctx_to_json.
    """
    if self._render_mode != 'html':
        return self.__class__.ctx_to_json(ctx)
    tmpl_file = f'{tmpl_name}.{self._render_mode}'
    return self._jinja.get_template(tmpl_file).render(ctx)
75 """Wrapper for validating and retrieving dict-like HTTP inputs."""
def __init__(self, dict_: dict[str, list[str]],
             strictness: bool = True) -> None:
    """Wrap dict_ of per-key value lists for validated retrieval.

    dict_ maps each input key to its list of string values. strictness
    decides whether .get_str raises on a missing value instead of
    returning its default.
    """
    # All getters read self.inputs, so it must be bound here.
    self.inputs = dict_
    self.strict = strictness
def get_str(self, key: str, default: str = '',
            ignore_strict: bool = False) -> str:
    """Retrieve single/first string value of key, or default.

    Returns the first value stored at key. If key is absent or its value
    list is empty, returns default – unless the parser is strict and
    ignore_strict is not set, in which case BadFormatException is raised.
    """
    values = self.inputs.get(key)
    if values:
        return values[0]
    if self.strict and not ignore_strict:
        raise BadFormatException(f'no value found for key {key}')
    # Missing or empty entry on a lenient lookup: hand back the default.
    return default
def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
    """Retrieve dict of (first) strings at key starting with prefix.

    Maps every input key that starts with prefix to the first value in
    that key's list. Keys keep their full name, prefix included. Returns
    an empty dict if no key matches.
    """
    return {key: values[0] for key, values in self.inputs.items()
            if key.startswith(prefix)}
def get_int(self, key: str) -> int:
    """Retrieve single/first value of key as int, error if empty.

    Delegates to .get_int_or_none and raises BadFormatException if that
    yields None.
    """
    val = self.get_int_or_none(key)
    # Compare against None explicitly: 0 is a legitimate value.
    if val is None:
        raise BadFormatException(f'unexpected empty value for: {key}')
    return val
def get_int_or_none(self, key: str) -> int | None:
    """Retrieve single/first value of key as int, return None if empty.

    The value is looked up with strictness bypassed, so an empty string is
    treated as "no value" and yields None. Raises BadFormatException if a
    non-empty value does not parse as int.
    """
    val = self.get_str(key, ignore_strict=True)
    if val == '':
        return None
    try:
        return int(val)
    except ValueError as e:
        msg = f'cannot int form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
def get_float(self, key: str) -> float:
    """Retrieve single/first value of key from self.inputs as float.

    The value is fetched via .get_str without bypassing strictness. Raises
    BadFormatException if the retrieved string does not parse as float.
    """
    val = self.get_str(key)
    try:
        return float(val)
    except ValueError as e:
        msg = f'cannot float form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
def get_all_str(self, key: str) -> list[str]:
    """Retrieve list of string values at key.

    Returns the list stored at key itself (not a copy), or a new empty
    list if key is absent.
    """
    return self.inputs.get(key, [])
def get_all_int(self, key: str) -> list[int]:
    """Retrieve list of int values at key.

    Empty strings among the values are skipped rather than converted.
    Raises BadFormatException if any non-empty value does not parse as int.
    """
    all_str = self.get_all_str(key)
    try:
        return [int(s) for s in all_str if s]
    except ValueError as e:
        msg = f'cannot int a form field value for key {key} in: {all_str}'
        raise BadFormatException(msg) from e
141 class TaskHandler(BaseHTTPRequestHandler):
142 """Handles single HTTP request."""
143 # pylint: disable=too-many-public-methods
145 conn: DatabaseConnection
147 _form_data: InputsParser
148 _params: InputsParser
155 """Send ctx as proper HTTP response."""
156 body = self.server.render(ctx, tmpl_name)
157 self.send_response(code)
158 for header_tuple in self.server.headers:
159 self.send_header(*header_tuple)
161 self.wfile.write(bytes(body, 'utf-8'))
164 def _request_wrapper(http_method: str, not_found_msg: str
165 ) -> Callable[..., Callable[[TaskHandler], None]]:
166 def decorator(f: Callable[..., str | None]
167 ) -> Callable[[TaskHandler], None]:
168 def wrapper(self: TaskHandler) -> None:
169 # pylint: disable=protected-access
170 # (because pylint here fails to detect the use of wrapper as a
171 # method to self with respective access privileges)
173 self.conn = DatabaseConnection(self.server.db)
174 parsed_url = urlparse(self.path)
175 self._site = path_split(parsed_url.path)[1]
176 params = parse_qs(parsed_url.query, strict_parsing=True)
177 self._params = InputsParser(params, False)
178 handler_name = f'do_{http_method}_{self._site}'
179 if hasattr(self, handler_name):
180 handler = getattr(self, handler_name)
181 redir_target = f(self, handler)
183 self.send_response(302)
184 self.send_header('Location', redir_target)
187 msg = f'{not_found_msg}: {self._site}'
188 raise NotFoundException(msg)
189 except HandledException as error:
190 for cls in (Day, Todo, Condition, Process, ProcessStep):
191 assert hasattr(cls, 'empty_cache')
194 self._send_page(ctx, 'msg', error.http_code)
200 @_request_wrapper('GET', 'Unknown page')
201 def do_GET(self, handler: Callable[[], str | dict[str, object]]
203 """Render page with result of handler, or redirect if result is str."""
204 tmpl_name = f'{self._site}'
205 ctx_or_redir_target = handler()
206 if isinstance(ctx_or_redir_target, str):
207 return ctx_or_redir_target
208 self._send_page(ctx_or_redir_target, tmpl_name)
211 @_request_wrapper('POST', 'Unknown POST target')
212 def do_POST(self, handler: Callable[[], str]) -> str:
213 """Handle POST with handler, prepare redirection to result."""
214 length = int(self.headers['content-length'])
215 postvars = parse_qs(self.rfile.read(length).decode(),
216 keep_blank_values=True, strict_parsing=True)
217 self._form_data = InputsParser(postvars)
218 redir_target = handler()
224 def do_GET_(self) -> str:
225 """Return redirect target on GET /."""
228 def _do_GET_calendar(self) -> dict[str, object]:
229 """Show Days from ?start= to ?end=.
231 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
232 same, the only difference being the HTML template they are rendered to,
233 which .do_GET selects from their method name.
235 start = self._params.get_str('start')
236 end = self._params.get_str('end')
238 end = date_in_n_days(366)
239 ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
240 days, start, end = ret
241 days = Day.with_filled_gaps(days, start, end)
242 today = date_in_n_days(0)
243 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Provide calendar context of ?start= to ?end= for the normal view."""
    ctx = self._do_GET_calendar()
    return ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Provide calendar context of ?start= to ?end= for minimalist view."""
    ctx = self._do_GET_calendar()
    return ctx
253 def do_GET_day(self) -> dict[str, object]:
254 """Show single Day of ?date=."""
255 date = self._params.get_str('date', date_in_n_days(0))
256 day = Day.by_id_or_create(self.conn, date)
257 make_type = self._params.get_str('make_type')
258 conditions_present = []
261 for todo in day.todos:
262 for condition in todo.conditions + todo.blockers:
263 if condition not in conditions_present:
264 conditions_present += [condition]
265 enablers_for[condition.id_] = [p for p in
266 Process.all(self.conn)
267 if condition in p.enables]
268 disablers_for[condition.id_] = [p for p in
269 Process.all(self.conn)
270 if condition in p.disables]
271 seen_todos: set[int] = set()
272 top_nodes = [t.get_step_tree(seen_todos)
273 for t in day.todos if not t.parents]
275 'top_nodes': top_nodes,
276 'make_type': make_type,
277 'enablers_for': enablers_for,
278 'disablers_for': disablers_for,
279 'conditions_present': conditions_present,
280 'processes': Process.all(self.conn)}
282 def do_GET_todo(self) -> dict[str, object]:
283 """Show single Todo of ?id=."""
287 """Collect what's useful for Todo steps tree display."""
290 process: Process | None
291 children: list[TodoStepsNode] # pylint: disable=undefined-variable
292 fillable: bool = False
294 def walk_process_steps(id_: int,
295 process_step_nodes: list[ProcessStepsNode],
296 steps_nodes: list[TodoStepsNode]) -> None:
297 for process_step_node in process_step_nodes:
299 node = TodoStepsNode(id_, None, process_step_node.process, [])
300 steps_nodes += [node]
301 walk_process_steps(id_, list(process_step_node.steps.values()),
304 def walk_todo_steps(id_: int, todos: list[Todo],
305 steps_nodes: list[TodoStepsNode]) -> None:
308 for match in [item for item in steps_nodes
310 and item.process == todo.process]:
313 for child in match.children:
314 child.fillable = True
315 walk_todo_steps(id_, todo.children, match.children)
318 node = TodoStepsNode(id_, todo, None, [])
319 steps_nodes += [node]
320 walk_todo_steps(id_, todo.children, node.children)
322 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
325 for node in steps_nodes:
327 assert isinstance(node.process, Process)
328 assert isinstance(node.process.id_, int)
329 ids.add(node.process.id_)
330 ids = ids | collect_adoptables_keys(node.children)
333 id_ = self._params.get_int('id')
334 todo = Todo.by_id(self.conn, id_)
335 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
336 process_tree = todo.process.get_steps(self.conn, None)
337 steps_todo_to_process: list[TodoStepsNode] = []
338 walk_process_steps(0, list(process_tree.values()),
339 steps_todo_to_process)
340 for steps_node in steps_todo_to_process:
341 steps_node.fillable = True
342 walk_todo_steps(len(steps_todo_to_process), todo_steps,
343 steps_todo_to_process)
344 adoptables: dict[int, list[Todo]] = {}
345 any_adoptables = [Todo.by_id(self.conn, t.id_)
346 for t in Todo.by_date(self.conn, todo.date)
349 for id_ in collect_adoptables_keys(steps_todo_to_process):
350 adoptables[id_] = [t for t in any_adoptables
351 if t.process.id_ == id_]
352 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
353 'adoption_candidates_for': adoptables,
354 'process_candidates': Process.all(self.conn),
355 'todo_candidates': any_adoptables,
356 'condition_candidates': Condition.all(self.conn)}
358 def do_GET_todos(self) -> dict[str, object]:
359 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
360 sort_by = self._params.get_str('sort_by')
361 start = self._params.get_str('start')
362 end = self._params.get_str('end')
363 process_id = self._params.get_int_or_none('process_id')
364 comment_pattern = self._params.get_str('comment_pattern')
366 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
367 todos_by_date_range, start, end = ret
368 todos = [t for t in todos_by_date_range
369 if comment_pattern in t.comment
370 and ((not process_id) or t.process.id_ == process_id)]
371 if sort_by == 'doneness':
372 todos.sort(key=lambda t: t.is_done)
373 elif sort_by == '-doneness':
374 todos.sort(key=lambda t: t.is_done, reverse=True)
375 elif sort_by == 'title':
376 todos.sort(key=lambda t: t.title_then)
377 elif sort_by == '-title':
378 todos.sort(key=lambda t: t.title_then, reverse=True)
379 elif sort_by == 'comment':
380 todos.sort(key=lambda t: t.comment)
381 elif sort_by == '-comment':
382 todos.sort(key=lambda t: t.comment, reverse=True)
383 elif sort_by == '-date':
384 todos.sort(key=lambda t: t.date, reverse=True)
386 todos.sort(key=lambda t: t.date)
388 return {'start': start, 'end': end, 'process_id': process_id,
389 'comment_pattern': comment_pattern, 'todos': todos,
390 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
392 def do_GET_conditions(self) -> dict[str, object]:
393 """Show all Conditions."""
394 pattern = self._params.get_str('pattern')
395 conditions = Condition.matching(self.conn, pattern)
396 sort_by = self._params.get_str('sort_by')
397 if sort_by == 'is_active':
398 conditions.sort(key=lambda c: c.is_active)
399 elif sort_by == '-is_active':
400 conditions.sort(key=lambda c: c.is_active, reverse=True)
401 elif sort_by == '-title':
402 conditions.sort(key=lambda c: c.title.newest, reverse=True)
404 conditions.sort(key=lambda c: c.title.newest)
406 return {'conditions': conditions,
def do_GET_condition(self) -> dict[str, object]:
    """Show Condition of ?id=, with the Processes that relate to it.

    The Processes are listed four times over, once per attribute in which
    they may hold the Condition: .conditions, .blockers, .enables,
    .disables.
    """
    condition_id = self._params.get_int_or_none('id')
    condition = Condition.by_id_or_create(self.conn, condition_id)
    processes = Process.all(self.conn)
    ctx: dict[str, object] = {'condition': condition,
                              'is_new': condition.id_ is None}
    # Context key paired with the Process attribute to search.
    groupings = (('enabled_processes', 'conditions'),
                 ('disabled_processes', 'blockers'),
                 ('enabling_processes', 'enables'),
                 ('disabling_processes', 'disables'))
    for ctx_key, attr_name in groupings:
        ctx[ctx_key] = [proc for proc in processes
                        if condition in getattr(proc, attr_name)]
    return ctx
def do_GET_condition_titles(self) -> dict[str, object]:
    """Provide Condition of ?id= for display of its title history."""
    condition_id = self._params.get_int('id')
    return {'condition': Condition.by_id(self.conn, condition_id)}
def do_GET_condition_descriptions(self) -> dict[str, object]:
    """Provide Condition of ?id= for display of its description history."""
    condition_id = self._params.get_int('id')
    return {'condition': Condition.by_id(self.conn, condition_id)}
433 def do_GET_process(self) -> dict[str, object]:
434 """Show Process of ?id=."""
435 id_ = self._params.get_int_or_none('id')
436 process = Process.by_id_or_create(self.conn, id_)
437 title_64 = self._params.get_str('title_b64')
439 title = b64decode(title_64.encode()).decode()
440 process.title.set(title)
441 owners = process.used_as_step_by(self.conn)
442 for step_id in self._params.get_all_int('step_to'):
443 owners += [Process.by_id(self.conn, step_id)]
444 preset_top_step = None
445 for process_id in self._params.get_all_int('has_step'):
446 preset_top_step = process_id
447 return {'process': process, 'is_new': process.id_ is None,
448 'preset_top_step': preset_top_step,
449 'steps': process.get_steps(self.conn), 'owners': owners,
450 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
451 'process_candidates': Process.all(self.conn),
452 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self) -> dict[str, object]:
    """Provide Process of ?id= for display of its title history."""
    process_id = self._params.get_int('id')
    return {'process': Process.by_id(self.conn, process_id)}
def do_GET_process_descriptions(self) -> dict[str, object]:
    """Provide Process of ?id= for display of its description history."""
    process_id = self._params.get_int('id')
    return {'process': Process.by_id(self.conn, process_id)}
def do_GET_process_efforts(self) -> dict[str, object]:
    """Provide Process of ?id= for display of its default effort history."""
    process_id = self._params.get_int('id')
    return {'process': Process.by_id(self.conn, process_id)}
472 def do_GET_processes(self) -> dict[str, object]:
473 """Show all Processes."""
474 pattern = self._params.get_str('pattern')
475 processes = Process.matching(self.conn, pattern)
476 sort_by = self._params.get_str('sort_by')
477 if sort_by == 'steps':
478 processes.sort(key=lambda p: len(p.explicit_steps))
479 elif sort_by == '-steps':
480 processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
481 elif sort_by == 'owners':
482 processes.sort(key=lambda p: p.n_owners or 0)
483 elif sort_by == '-owners':
484 processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
485 elif sort_by == 'effort':
486 processes.sort(key=lambda p: p.effort.newest)
487 elif sort_by == '-effort':
488 processes.sort(key=lambda p: p.effort.newest, reverse=True)
489 elif sort_by == '-title':
490 processes.sort(key=lambda p: p.title.newest, reverse=True)
492 processes.sort(key=lambda p: p.title.newest)
494 return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
499 def _delete_or_post(target_class: Any, redir_target: str = '/'
500 ) -> Callable[..., Callable[[TaskHandler], str]]:
501 def decorator(f: Callable[..., str]
502 ) -> Callable[[TaskHandler], str]:
503 def wrapper(self: TaskHandler) -> str:
504 # pylint: disable=protected-access
505 # (because pylint here fails to detect the use of wrapper as a
506 # method to self with respective access privileges)
507 id_ = self._params.get_int_or_none('id')
508 for _ in self._form_data.get_all_str('delete'):
510 msg = 'trying to delete non-saved ' +\
511 f'{target_class.__name__}'
512 raise NotFoundException(msg)
513 item = target_class.by_id(self.conn, id_)
514 item.remove(self.conn)
516 if target_class.can_create_by_id:
517 item = target_class.by_id_or_create(self.conn, id_)
519 item = target_class.by_id(self.conn, id_)
524 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
525 """Update history timestamps for VersionedAttribute."""
526 id_ = self._params.get_int_or_none('id')
527 item = cls.by_id(self.conn, id_)
528 attr = getattr(item, attr_name)
529 for k, v in self._form_data.get_first_strings_starting('at:').items():
532 attr.reset_timestamp(old, f'{v}.0')
534 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
536 def do_POST_day(self) -> str:
537 """Update or insert Day of date and Todos mapped to it."""
538 # pylint: disable=too-many-locals
539 date = self._params.get_str('date')
540 day_comment = self._form_data.get_str('day_comment')
541 make_type = self._form_data.get_str('make_type')
542 old_todos = self._form_data.get_all_int('todo_id')
543 new_todos = self._form_data.get_all_int('new_todo')
544 is_done = [t_id in self._form_data.get_all_int('done')
545 for t_id in old_todos]
546 comments = self._form_data.get_all_str('comment')
547 efforts = [float(effort) if effort else None
548 for effort in self._form_data.get_all_str('effort')]
549 if old_todos and 3*[len(old_todos)] != [len(is_done), len(comments),
551 msg = 'not equal number each of number of todo_id, comments, ' +\
553 raise BadFormatException(msg)
554 day = Day.by_id_or_create(self.conn, date)
555 day.comment = day_comment
557 for process_id in sorted(new_todos):
558 if 'empty' == make_type:
559 process = Process.by_id(self.conn, process_id)
560 todo = Todo(None, process, False, date)
563 Todo.create_with_children(self.conn, process_id, date)
564 for i, todo_id in enumerate(old_todos):
565 todo = Todo.by_id(self.conn, todo_id)
566 todo.is_done = is_done[i]
567 todo.comment = comments[i]
568 todo.effort = efforts[i]
570 return f'/day?date={date}&make_type={make_type}'
572 @_delete_or_post(Todo, '/')
573 def do_POST_todo(self, todo: Todo) -> str:
574 """Update Todo and its children."""
575 # pylint: disable=too-many-locals
576 adopted_child_ids = self._form_data.get_all_int('adopt')
577 processes_to_make_full = self._form_data.get_all_int('make_full')
578 processes_to_make_empty = self._form_data.get_all_int('make_empty')
579 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
580 effort = self._form_data.get_str('effort', ignore_strict=True)
581 conditions = self._form_data.get_all_int('conditions')
582 disables = self._form_data.get_all_int('disables')
583 blockers = self._form_data.get_all_int('blockers')
584 enables = self._form_data.get_all_int('enables')
585 is_done = len(self._form_data.get_all_str('done')) > 0
586 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
587 comment = self._form_data.get_str('comment', ignore_strict=True)
588 for v in fill_fors.values():
589 if v.startswith('make_empty_'):
590 processes_to_make_empty += [int(v[11:])]
591 elif v.startswith('make_full_'):
592 processes_to_make_full += [int(v[10:])]
594 adopted_child_ids += [int(v)]
596 for child in todo.children:
597 assert isinstance(child.id_, int)
598 if child.id_ not in adopted_child_ids:
599 to_remove += [child.id_]
600 for id_ in to_remove:
601 child = Todo.by_id(self.conn, id_)
602 todo.remove_child(child)
603 for child_id in adopted_child_ids:
604 if child_id in [c.id_ for c in todo.children]:
606 child = Todo.by_id(self.conn, child_id)
607 todo.add_child(child)
608 for process_id in processes_to_make_empty:
609 process = Process.by_id(self.conn, process_id)
610 made = Todo(None, process, False, todo.date)
613 for process_id in processes_to_make_full:
614 made = Todo.create_with_children(self.conn, process_id, todo.date)
616 todo.effort = float(effort) if effort else None
617 todo.set_conditions(self.conn, conditions)
618 todo.set_blockers(self.conn, blockers)
619 todo.set_enables(self.conn, enables)
620 todo.set_disables(self.conn, disables)
621 todo.is_done = is_done
622 todo.calendarize = calendarize
623 todo.comment = comment
625 return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Update history timestamps for Process.description, return redirect."""
    redir_target = self._change_versioned_timestamps(Process, 'description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Update history timestamps for Process.effort, return redirect."""
    redir_target = self._change_versioned_timestamps(Process, 'effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Update history timestamps for Process.title, return redirect."""
    redir_target = self._change_versioned_timestamps(Process, 'title')
    return redir_target
639 @_delete_or_post(Process, '/processes')
640 def do_POST_process(self, process: Process) -> str:
641 """Update or insert Process of ?id= and fields defined in postvars."""
642 # pylint: disable=too-many-locals
643 # pylint: disable=too-many-statements
644 title = self._form_data.get_str('title')
645 description = self._form_data.get_str('description')
646 effort = self._form_data.get_float('effort')
647 conditions = self._form_data.get_all_int('conditions')
648 blockers = self._form_data.get_all_int('blockers')
649 enables = self._form_data.get_all_int('enables')
650 disables = self._form_data.get_all_int('disables')
651 calendarize = self._form_data.get_all_str('calendarize') != []
652 suppresses = self._form_data.get_all_int('suppresses')
653 step_of = self._form_data.get_all_str('step_of')
654 keep_steps = self._form_data.get_all_int('keep_step')
655 step_ids = self._form_data.get_all_int('steps')
656 new_top_steps = self._form_data.get_all_str('new_top_step')
657 step_process_id_to = {}
658 step_parent_id_to = {}
660 for step_id in step_ids:
661 name = f'new_step_to_{step_id}'
662 new_steps_to[step_id] = self._form_data.get_all_int(name)
663 for step_id in keep_steps:
664 name = f'step_{step_id}_process_id'
665 step_process_id_to[step_id] = self._form_data.get_int(name)
666 name = f'step_{step_id}_parent_id'
667 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
668 process.title.set(title)
669 process.description.set(description)
670 process.effort.set(effort)
671 process.set_conditions(self.conn, conditions)
672 process.set_blockers(self.conn, blockers)
673 process.set_enables(self.conn, enables)
674 process.set_disables(self.conn, disables)
675 process.calendarize = calendarize
676 process.save(self.conn)
677 assert isinstance(process.id_, int)
678 new_step_title = None
679 steps: list[ProcessStep] = []
680 for step_id in keep_steps:
681 if step_id not in step_ids:
682 raise BadFormatException('trying to keep unknown step')
683 step = ProcessStep(step_id, process.id_,
684 step_process_id_to[step_id],
685 step_parent_id_to[step_id])
687 for step_id in step_ids:
688 new = [ProcessStep(None, process.id_, step_process_id, step_id)
689 for step_process_id in new_steps_to[step_id]]
691 for step_identifier in new_top_steps:
693 step_process_id = int(step_identifier)
694 step = ProcessStep(None, process.id_, step_process_id, None)
697 new_step_title = step_identifier
698 process.set_steps(self.conn, steps)
699 process.set_step_suppressions(self.conn, suppresses)
701 new_owner_title = None
702 for owner_identifier in step_of:
704 owners_to_set += [int(owner_identifier)]
706 new_owner_title = owner_identifier
707 process.set_owners(self.conn, owners_to_set)
708 params = f'id={process.id_}'
710 title_b64_encoded = b64encode(new_step_title.encode()).decode()
711 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
712 elif new_owner_title:
713 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
714 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
715 process.save(self.conn)
716 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Update history timestamps for Condition.description, return redirect."""
    redir_target = self._change_versioned_timestamps(Condition, 'description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Update history timestamps for Condition.title, return redirect."""
    redir_target = self._change_versioned_timestamps(Condition, 'title')
    return redir_target
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Update/insert Condition of ?id= and fields defined in postvars.

    Reads 'is_active', 'title' and 'description' from the form data, applies
    them to condition, saves it, and returns the path of its view page.
    """
    form = self._form_data
    # Read every field before touching condition, so that a failing lookup
    # leaves it unmodified.
    active_str, new_title, new_description = (
        form.get_str(field)
        for field in ('is_active', 'title', 'description'))
    condition.is_active = active_str == 'True'
    condition.title.set(new_title)
    condition.description.set(new_description)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'