1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
def __init__(self, db_file: DatabaseFile,
             *args: Any, **kwargs: Any) -> None:
    """Set up HTTP server together with DB file, headers and templating.

    db_file is stored as self.db; all remaining positional and keyword
    arguments are handed on unchanged to HTTPServer.__init__.
    """
    super().__init__(*args, **kwargs)
    # Request handlers open their DatabaseConnection on self.server.db.
    self.db = db_file
    # Extra headers sent with every rendered page (see set_json_mode).
    self.headers: list[tuple[str, str]] = []
    self._render_mode = 'html'
    self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
def set_json_mode(self) -> None:
    """Make server send JSON instead of HTML responses.

    Switches the render mode to 'json' and registers the matching
    Content-Type header. Calling this more than once is harmless: the
    header is only registered a single time.
    """
    self._render_mode = 'json'
    json_header = ('Content-Type', 'application/json')
    # Avoid sending the same header twice if the mode is set repeatedly.
    if json_header not in self.headers:
        self.headers.append(json_header)
41 def ctx_to_json(ctx: dict[str, object]) -> str:
42 """Render ctx into JSON string."""
43 def walk_ctx(node: object) -> Any:
44 if hasattr(node, 'as_dict_into_reference'):
45 if hasattr(node, 'id_') and node.id_ is not None:
46 return node.as_dict_into_reference(library)
47 if hasattr(node, 'as_dict'):
49 if isinstance(node, (list, tuple)):
50 return [walk_ctx(x) for x in node]
51 if isinstance(node, dict):
53 for k, v in node.items():
56 if isinstance(node, HandledException):
59 library: dict[str, dict[str | int, object]] = {}
60 for k, v in ctx.items():
62 ctx['_library'] = library
63 return json_dumps(ctx)
def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
    """Turn ctx into a response body fitting self._render_mode.

    In 'html' mode the Jinja template named "<tmpl_name>.html" is rendered
    with ctx; in any other mode ctx is serialised via ctx_to_json and
    tmpl_name plays no role.
    """
    if self._render_mode != 'html':
        return self.__class__.ctx_to_json(ctx)
    full_tmpl_name = f'{tmpl_name}.{self._render_mode}'
    return self._jinja.get_template(full_tmpl_name).render(ctx)
75 """Wrapper for validating and retrieving dict-like HTTP inputs."""
def __init__(self, dict_: dict[str, list[str]],
             strictness: bool = True) -> None:
    """Wrap dict_ of key-to-string-lists for validated retrieval.

    strictness decides whether get_str raises on keys without any value
    (True) or quietly falls back to its default (False).
    """
    # All getters of this class read from self.inputs.
    self.inputs = dict_
    self.strict = strictness
def get_str(self, key: str, default: str = '',
            ignore_strict: bool = False) -> str:
    """Retrieve single/first string value of key, or default.

    If key is absent or maps to an empty list, BadFormatException is raised
    when self.strict is set and ignore_strict is not; otherwise default is
    returned.
    """
    values = self.inputs.get(key)
    if not values:
        if self.strict and not ignore_strict:
            raise BadFormatException(f'no value found for key {key}')
        return default
    return values[0]
def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
    """Retrieve dict of (first) strings at key starting with prefix.

    Keys of the result are the full input keys (prefix included); an empty
    dict is returned if no key matches.
    """
    return {key: values[0] for key, values in self.inputs.items()
            if key.startswith(prefix)}
def get_int(self, key: str) -> int:
    """Retrieve single/first value of key as int, error if empty.

    Raises BadFormatException if get_int_or_none finds no value for key.
    """
    val = self.get_int_or_none(key)
    # Compare against None explicitly: 0 is a perfectly valid value.
    if val is None:
        raise BadFormatException(f'unexpected empty value for: {key}')
    return val
def get_int_or_none(self, key: str) -> int | None:
    """Retrieve single/first value of key as int, return None if empty.

    A missing key counts as empty too, independent of self.strict. Raises
    BadFormatException if a non-empty value cannot be parsed as int.
    """
    val = self.get_str(key, ignore_strict=True)
    if val == '':
        return None
    try:
        return int(val)
    except ValueError as e:
        msg = f'cannot int form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
def get_float(self, key: str) -> float:
    """Retrieve single/first value of key from self.inputs as float.

    Raises BadFormatException if the value cannot be parsed as float (and,
    via get_str, if self.strict is set and key has no value).
    """
    val = self.get_str(key)
    try:
        return float(val)
    except ValueError as e:
        msg = f'cannot float form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
def get_all_str(self, key: str) -> list[str]:
    """Retrieve list of string values at key, empty list if key is absent."""
    return self.inputs.get(key, [])
def get_all_int(self, key: str) -> list[int]:
    """Retrieve list of int values at key, skipping empty strings.

    Raises BadFormatException if any non-empty value cannot be parsed as
    int.
    """
    all_str = self.get_all_str(key)
    try:
        return [int(s) for s in all_str if len(s) > 0]
    except ValueError as e:
        msg = f'cannot int a form field value for key {key} in: {all_str}'
        raise BadFormatException(msg) from e
def get_all_floats_or_nones(self, key: str) -> list[float | None]:
    """Retrieve list of float value at key, None if empty strings.

    The result keeps the position of every input value, so it can be
    matched by index against other per-item lists. Raises
    BadFormatException if a non-empty value cannot be parsed as float.
    """
    ret: list[float | None] = []
    for val in self.get_all_str(key):
        if val == '':
            ret.append(None)
            continue
        try:
            ret.append(float(val))
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e
    return ret
155 class TaskHandler(BaseHTTPRequestHandler):
156 """Handles single HTTP request."""
157 # pylint: disable=too-many-public-methods
159 conn: DatabaseConnection
161 _form_data: InputsParser
162 _params: InputsParser
169 """Send ctx as proper HTTP response."""
170 body = self.server.render(ctx, tmpl_name)
171 self.send_response(code)
172 for header_tuple in self.server.headers:
173 self.send_header(*header_tuple)
175 self.wfile.write(bytes(body, 'utf-8'))
178 def _request_wrapper(http_method: str, not_found_msg: str
179 ) -> Callable[..., Callable[[TaskHandler], None]]:
180 def decorator(f: Callable[..., str | None]
181 ) -> Callable[[TaskHandler], None]:
182 def wrapper(self: TaskHandler) -> None:
183 # pylint: disable=protected-access
184 # (because pylint here fails to detect the use of wrapper as a
185 # method to self with respective access privileges)
187 self.conn = DatabaseConnection(self.server.db)
188 parsed_url = urlparse(self.path)
189 self._site = path_split(parsed_url.path)[1]
190 params = parse_qs(parsed_url.query, strict_parsing=True)
191 self._params = InputsParser(params, False)
192 handler_name = f'do_{http_method}_{self._site}'
193 if hasattr(self, handler_name):
194 handler = getattr(self, handler_name)
195 redir_target = f(self, handler)
197 self.send_response(302)
198 self.send_header('Location', redir_target)
201 msg = f'{not_found_msg}: {self._site}'
202 raise NotFoundException(msg)
203 except HandledException as error:
204 for cls in (Day, Todo, Condition, Process, ProcessStep):
205 assert hasattr(cls, 'empty_cache')
208 self._send_page(ctx, 'msg', error.http_code)
@_request_wrapper('GET', 'Unknown page')
def do_GET(self, handler: Callable[[], str | dict[str, object]]
           ) -> str | None:
    """Render page with result of handler, or redirect if result is str.

    A str result is returned as redirect target for the request wrapper to
    act on; a dict result is sent as page, using the requested site name
    as template name, and None is returned.
    """
    ctx_or_redir_target = handler()
    if isinstance(ctx_or_redir_target, str):
        return ctx_or_redir_target
    self._send_page(ctx_or_redir_target, f'{self._site}')
    return None
225 @_request_wrapper('POST', 'Unknown POST target')
226 def do_POST(self, handler: Callable[[], str]) -> str:
227 """Handle POST with handler, prepare redirection to result."""
228 length = int(self.headers['content-length'])
229 postvars = parse_qs(self.rfile.read(length).decode(),
230 keep_blank_values=True, strict_parsing=True)
231 self._form_data = InputsParser(postvars)
232 redir_target = handler()
239 def _get_item(target_class: Any
240 ) -> Callable[..., Callable[[TaskHandler],
242 def decorator(f: Callable[..., dict[str, object]]
243 ) -> Callable[[TaskHandler], dict[str, object]]:
244 def wrapper(self: TaskHandler) -> dict[str, object]:
245 # pylint: disable=protected-access
246 # (because pylint here fails to detect the use of wrapper as a
247 # method to self with respective access privileges)
248 id_ = self._params.get_int_or_none('id')
249 if target_class.can_create_by_id:
250 item = target_class.by_id_or_create(self.conn, id_)
252 item = target_class.by_id(self.conn, id_)
257 def do_GET_(self) -> str:
258 """Return redirect target on GET /."""
def _do_GET_calendar(self) -> dict[str, object]:
    """Show Days from ?start= to ?end=.

    Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
    same, the only difference being the HTML template they are rendered to,
    which .do_GET selects from their method name.
    """
    start = self._params.get_str('start')
    end = self._params.get_str('end')
    if not end:
        # Only without an explicit ?end= fall back to date_in_n_days(366).
        end = date_in_n_days(366)
    ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
    # The lookup also hands back the start/end limits it actually used.
    days, start, end = ret
    days = Day.with_filled_gaps(days, start, end)
    today = date_in_n_days(0)
    return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Collect Days from ?start= to ?end= for the normal calendar view."""
    calendar_ctx = self._do_GET_calendar()
    return calendar_ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Collect Days from ?start= to ?end= for the minimalist view."""
    calendar_ctx = self._do_GET_calendar()
    return calendar_ctx
286 def do_GET_day(self) -> dict[str, object]:
287 """Show single Day of ?date=."""
288 date = self._params.get_str('date', date_in_n_days(0))
289 day = Day.by_id_or_create(self.conn, date)
290 make_type = self._params.get_str('make_type')
291 conditions_present = []
294 for todo in day.todos:
295 for condition in todo.conditions + todo.blockers:
296 if condition not in conditions_present:
297 conditions_present += [condition]
298 enablers_for[condition.id_] = [p for p in
299 Process.all(self.conn)
300 if condition in p.enables]
301 disablers_for[condition.id_] = [p for p in
302 Process.all(self.conn)
303 if condition in p.disables]
304 seen_todos: set[int] = set()
305 top_nodes = [t.get_step_tree(seen_todos)
306 for t in day.todos if not t.parents]
308 'top_nodes': top_nodes,
309 'make_type': make_type,
310 'enablers_for': enablers_for,
311 'disablers_for': disablers_for,
312 'conditions_present': conditions_present,
313 'processes': Process.all(self.conn)}
316 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
317 """Show single Todo of ?id=."""
321 """Collect what's useful for Todo steps tree display."""
324 process: Process | None
325 children: list[TodoStepsNode] # pylint: disable=undefined-variable
326 fillable: bool = False
328 def walk_process_steps(id_: int,
329 process_step_nodes: list[ProcessStepsNode],
330 steps_nodes: list[TodoStepsNode]) -> None:
331 for process_step_node in process_step_nodes:
333 node = TodoStepsNode(id_, None, process_step_node.process, [])
334 steps_nodes += [node]
335 walk_process_steps(id_, list(process_step_node.steps.values()),
338 def walk_todo_steps(id_: int, todos: list[Todo],
339 steps_nodes: list[TodoStepsNode]) -> None:
342 for match in [item for item in steps_nodes
344 and item.process == todo.process]:
347 for child in match.children:
348 child.fillable = True
349 walk_todo_steps(id_, todo.children, match.children)
352 node = TodoStepsNode(id_, todo, None, [])
353 steps_nodes += [node]
354 walk_todo_steps(id_, todo.children, node.children)
356 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
359 for node in steps_nodes:
361 assert isinstance(node.process, Process)
362 assert isinstance(node.process.id_, int)
363 ids.add(node.process.id_)
364 ids = ids | collect_adoptables_keys(node.children)
367 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
368 process_tree = todo.process.get_steps(self.conn, None)
369 steps_todo_to_process: list[TodoStepsNode] = []
370 walk_process_steps(0, list(process_tree.values()),
371 steps_todo_to_process)
372 for steps_node in steps_todo_to_process:
373 steps_node.fillable = True
374 walk_todo_steps(len(steps_todo_to_process), todo_steps,
375 steps_todo_to_process)
376 adoptables: dict[int, list[Todo]] = {}
377 any_adoptables = [Todo.by_id(self.conn, t.id_)
378 for t in Todo.by_date(self.conn, todo.date)
381 for id_ in collect_adoptables_keys(steps_todo_to_process):
382 adoptables[id_] = [t for t in any_adoptables
383 if t.process.id_ == id_]
384 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
385 'adoption_candidates_for': adoptables,
386 'process_candidates': Process.all(self.conn),
387 'todo_candidates': any_adoptables,
388 'condition_candidates': Condition.all(self.conn)}
def do_GET_todos(self) -> dict[str, object]:
    """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern.

    Todos are filtered by comment substring and (if given) process ID,
    then sorted by ?sort_by=: one of doneness, title, comment, date, with
    a leading "-" reversing the order. Anything else sorts by date
    ascending, and the returned 'sort_by' reports the order actually used.
    """
    sort_by = self._params.get_str('sort_by')
    start = self._params.get_str('start')
    end = self._params.get_str('end')
    process_id = self._params.get_int_or_none('process_id')
    comment_pattern = self._params.get_str('comment_pattern')
    ret = Todo.by_date_range_with_limits(self.conn, (start, end))
    # The lookup also hands back the start/end limits it actually used.
    todos_by_date_range, start, end = ret
    todos = [t for t in todos_by_date_range
             if comment_pattern in t.comment
             and ((not process_id) or t.process.id_ == process_id)]
    sort_keys: dict[str, Callable[[Any], Any]] = {
        'doneness': lambda t: t.is_done,
        'title': lambda t: t.title_then,
        'comment': lambda t: t.comment,
        'date': lambda t: t.date}
    reverse = sort_by.startswith('-')
    key_name = sort_by[1:] if reverse else sort_by
    if key_name not in sort_keys:
        # Unknown or empty selection: default order, applied exactly once.
        key_name, reverse, sort_by = 'date', False, 'date'
    todos.sort(key=sort_keys[key_name], reverse=reverse)
    return {'start': start, 'end': end, 'process_id': process_id,
            'comment_pattern': comment_pattern, 'todos': todos,
            'all_processes': Process.all(self.conn), 'sort_by': sort_by}
def do_GET_conditions(self) -> dict[str, object]:
    """Show all Conditions matching ?pattern=, sorted by ?sort_by=.

    Recognised sort keys are is_active and title, with a leading "-"
    reversing the order. Anything else sorts by title ascending, and the
    returned 'sort_by' reports the order actually used.
    """
    pattern = self._params.get_str('pattern')
    conditions = Condition.matching(self.conn, pattern)
    sort_by = self._params.get_str('sort_by')
    sort_keys: dict[str, Callable[[Any], Any]] = {
        'is_active': lambda c: c.is_active,
        'title': lambda c: c.title.newest}
    reverse = sort_by.startswith('-')
    key_name = sort_by[1:] if reverse else sort_by
    if key_name not in sort_keys:
        # Unknown or empty selection: default order, applied exactly once.
        key_name, reverse, sort_by = 'title', False, 'title'
    conditions.sort(key=sort_keys[key_name], reverse=reverse)
    # NOTE(review): 'sort_by' and 'pattern' keys mirror do_GET_processes;
    # confirm against what the conditions template expects.
    return {'conditions': conditions,
            'sort_by': sort_by,
            'pattern': pattern}
@_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=, with the Processes that refer to it.

    Processes are grouped by which of their condition lists contains c:
    .conditions, .blockers, .enables, or .disables.
    """
    all_processes = Process.all(self.conn)

    def processes_with_c_in(attr_name: str) -> list[Process]:
        return [p for p in all_processes if c in getattr(p, attr_name)]

    return {'condition': c,
            'is_new': c.id_ is None,
            'enabled_processes': processes_with_c_in('conditions'),
            'disabled_processes': processes_with_c_in('blockers'),
            'enabling_processes': processes_with_c_in('enables'),
            'disabling_processes': processes_with_c_in('disables')}
@_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Provide Condition of ?id= for display of its title history."""
    ctx: dict[str, object] = {'condition': c}
    return ctx
@_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Provide Condition of ?id= for display of its description history."""
    ctx: dict[str, object] = {'condition': c}
    return ctx
def do_GET_process(self, process: Process) -> dict[str, object]:
    """Show Process of ?id=.

    Optional parameters: ?title_b64= (base64-encoded title to preset on
    the shown Process), ?step_to= (IDs of Processes to additionally list
    as owners), ?has_step= (Process ID to preset as top step; the last one
    given wins).

    Raises BadFormatException if ?title_b64= is not decodable.
    """
    title_64 = self._params.get_str('title_b64')
    if title_64:
        # Only touch the title if one was actually passed along.
        try:
            title = b64decode(title_64.encode()).decode()
        except ValueError as e:
            # Covers both binascii.Error and UnicodeDecodeError.
            msg = f'cannot decode title_b64 value: {title_64}'
            raise BadFormatException(msg) from e
        process.title.set(title)
    owners = process.used_as_step_by(self.conn)
    for step_id in self._params.get_all_int('step_to'):
        owners += [Process.by_id(self.conn, step_id)]
    has_step_ids = self._params.get_all_int('has_step')
    preset_top_step = has_step_ids[-1] if has_step_ids else None
    return {'process': process, 'is_new': process.id_ is None,
            'preset_top_step': preset_top_step,
            'steps': process.get_steps(self.conn), 'owners': owners,
            'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
            'process_candidates': Process.all(self.conn),
            'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Provide Process of ?id= for display of its title history."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Provide Process of ?id= for display of its description history."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Provide Process of ?id= for display of its default effort history."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_processes(self) -> dict[str, object]:
    """Show all Processes matching ?pattern=, sorted by ?sort_by=.

    Recognised sort keys are steps, owners, effort and title, with a
    leading "-" reversing the order. Anything else sorts by title
    ascending, and the returned 'sort_by' reports the order actually used.
    """
    pattern = self._params.get_str('pattern')
    processes = Process.matching(self.conn, pattern)
    sort_by = self._params.get_str('sort_by')
    sort_keys: dict[str, Callable[[Any], Any]] = {
        'steps': lambda p: len(p.explicit_steps),
        'owners': lambda p: p.n_owners or 0,
        'effort': lambda p: p.effort.newest,
        'title': lambda p: p.title.newest}
    reverse = sort_by.startswith('-')
    key_name = sort_by[1:] if reverse else sort_by
    if key_name not in sort_keys:
        # Unknown or empty selection: default order, applied exactly once.
        key_name, reverse, sort_by = 'title', False, 'title'
    processes.sort(key=sort_keys[key_name], reverse=reverse)
    return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
524 def _delete_or_post(target_class: Any, redir_target: str = '/'
525 ) -> Callable[..., Callable[[TaskHandler], str]]:
526 def decorator(f: Callable[..., str]
527 ) -> Callable[[TaskHandler], str]:
528 def wrapper(self: TaskHandler) -> str:
529 # pylint: disable=protected-access
530 # (because pylint here fails to detect the use of wrapper as a
531 # method to self with respective access privileges)
532 id_ = self._params.get_int_or_none('id')
533 for _ in self._form_data.get_all_str('delete'):
535 msg = 'trying to delete non-saved ' +\
536 f'{target_class.__name__}'
537 raise NotFoundException(msg)
538 item = target_class.by_id(self.conn, id_)
539 item.remove(self.conn)
541 if target_class.can_create_by_id:
542 item = target_class.by_id_or_create(self.conn, id_)
544 item = target_class.by_id(self.conn, id_)
549 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
550 """Update history timestamps for VersionedAttribute."""
551 id_ = self._params.get_int_or_none('id')
552 item = cls.by_id(self.conn, id_)
553 attr = getattr(item, attr_name)
554 for k, v in self._form_data.get_first_strings_starting('at:').items():
557 attr.reset_timestamp(old, f'{v}.0')
559 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
561 def do_POST_day(self) -> str:
562 """Update or insert Day of date and Todos mapped to it."""
563 # pylint: disable=too-many-locals
564 date = self._params.get_str('date')
565 day_comment = self._form_data.get_str('day_comment')
566 make_type = self._form_data.get_str('make_type')
567 old_todos = self._form_data.get_all_int('todo_id')
568 new_todos = self._form_data.get_all_int('new_todo')
569 comments = self._form_data.get_all_str('comment')
570 efforts = self._form_data.get_all_floats_or_nones('effort')
571 done_todos = self._form_data.get_all_int('done')
572 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
573 raise BadFormatException('"done" field refers to unknown Todo')
574 is_done = [t_id in done_todos for t_id in old_todos]
575 if not (len(old_todos) == len(is_done) == len(comments)
577 msg = 'not equal number each of number of todo_id, comments, ' +\
579 raise BadFormatException(msg)
580 day = Day.by_id_or_create(self.conn, date)
581 day.comment = day_comment
583 for process_id in sorted(new_todos):
584 if 'empty' == make_type:
585 process = Process.by_id(self.conn, process_id)
586 todo = Todo(None, process, False, date)
589 Todo.create_with_children(self.conn, process_id, date)
590 for i, todo_id in enumerate(old_todos):
591 todo = Todo.by_id(self.conn, todo_id)
592 todo.is_done = is_done[i]
593 todo.comment = comments[i]
594 todo.effort = efforts[i]
596 return f'/day?date={date}&make_type={make_type}'
598 @_delete_or_post(Todo, '/')
599 def do_POST_todo(self, todo: Todo) -> str:
600 """Update Todo and its children."""
601 # pylint: disable=too-many-locals
602 adopted_child_ids = self._form_data.get_all_int('adopt')
603 processes_to_make_full = self._form_data.get_all_int('make_full')
604 processes_to_make_empty = self._form_data.get_all_int('make_empty')
605 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
606 effort = self._form_data.get_str('effort', ignore_strict=True)
607 conditions = self._form_data.get_all_int('conditions')
608 disables = self._form_data.get_all_int('disables')
609 blockers = self._form_data.get_all_int('blockers')
610 enables = self._form_data.get_all_int('enables')
611 is_done = len(self._form_data.get_all_str('done')) > 0
612 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
613 comment = self._form_data.get_str('comment', ignore_strict=True)
614 for v in fill_fors.values():
615 if v.startswith('make_empty_'):
616 processes_to_make_empty += [int(v[11:])]
617 elif v.startswith('make_full_'):
618 processes_to_make_full += [int(v[10:])]
620 adopted_child_ids += [int(v)]
622 for child in todo.children:
623 assert isinstance(child.id_, int)
624 if child.id_ not in adopted_child_ids:
625 to_remove += [child.id_]
626 for id_ in to_remove:
627 child = Todo.by_id(self.conn, id_)
628 todo.remove_child(child)
629 for child_id in adopted_child_ids:
630 if child_id in [c.id_ for c in todo.children]:
632 child = Todo.by_id(self.conn, child_id)
633 todo.add_child(child)
634 for process_id in processes_to_make_empty:
635 process = Process.by_id(self.conn, process_id)
636 made = Todo(None, process, False, todo.date)
639 for process_id in processes_to_make_full:
640 made = Todo.create_with_children(self.conn, process_id, todo.date)
642 todo.effort = float(effort) if effort else None
643 todo.set_conditions(self.conn, conditions)
644 todo.set_blockers(self.conn, blockers)
645 todo.set_enables(self.conn, enables)
646 todo.set_disables(self.conn, disables)
647 todo.is_done = is_done
648 todo.calendarize = calendarize
649 todo.comment = comment
651 return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Apply posted timestamp changes to history of Process.description."""
    redir_target = self._change_versioned_timestamps(
            cls=Process, attr_name='description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Apply posted timestamp changes to history of Process.effort."""
    redir_target = self._change_versioned_timestamps(
            cls=Process, attr_name='effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Apply posted timestamp changes to history of Process.title."""
    redir_target = self._change_versioned_timestamps(
            cls=Process, attr_name='title')
    return redir_target
665 @_delete_or_post(Process, '/processes')
666 def do_POST_process(self, process: Process) -> str:
667 """Update or insert Process of ?id= and fields defined in postvars."""
668 # pylint: disable=too-many-locals
669 # pylint: disable=too-many-statements
670 title = self._form_data.get_str('title')
671 description = self._form_data.get_str('description')
672 effort = self._form_data.get_float('effort')
673 conditions = self._form_data.get_all_int('conditions')
674 blockers = self._form_data.get_all_int('blockers')
675 enables = self._form_data.get_all_int('enables')
676 disables = self._form_data.get_all_int('disables')
677 calendarize = self._form_data.get_all_str('calendarize') != []
678 suppresses = self._form_data.get_all_int('suppresses')
679 step_of = self._form_data.get_all_str('step_of')
680 keep_steps = self._form_data.get_all_int('keep_step')
681 step_ids = self._form_data.get_all_int('steps')
682 new_top_steps = self._form_data.get_all_str('new_top_step')
683 step_process_id_to = {}
684 step_parent_id_to = {}
686 for step_id in step_ids:
687 name = f'new_step_to_{step_id}'
688 new_steps_to[step_id] = self._form_data.get_all_int(name)
689 for step_id in keep_steps:
690 name = f'step_{step_id}_process_id'
691 step_process_id_to[step_id] = self._form_data.get_int(name)
692 name = f'step_{step_id}_parent_id'
693 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
694 process.title.set(title)
695 process.description.set(description)
696 process.effort.set(effort)
697 process.set_conditions(self.conn, conditions)
698 process.set_blockers(self.conn, blockers)
699 process.set_enables(self.conn, enables)
700 process.set_disables(self.conn, disables)
701 process.calendarize = calendarize
702 process.save(self.conn)
703 assert isinstance(process.id_, int)
704 new_step_title = None
705 steps: list[ProcessStep] = []
706 for step_id in keep_steps:
707 if step_id not in step_ids:
708 raise BadFormatException('trying to keep unknown step')
709 step = ProcessStep(step_id, process.id_,
710 step_process_id_to[step_id],
711 step_parent_id_to[step_id])
713 for step_id in step_ids:
714 new = [ProcessStep(None, process.id_, step_process_id, step_id)
715 for step_process_id in new_steps_to[step_id]]
717 for step_identifier in new_top_steps:
719 step_process_id = int(step_identifier)
720 step = ProcessStep(None, process.id_, step_process_id, None)
723 new_step_title = step_identifier
724 process.set_steps(self.conn, steps)
725 process.set_step_suppressions(self.conn, suppresses)
727 new_owner_title = None
728 for owner_identifier in step_of:
730 owners_to_set += [int(owner_identifier)]
732 new_owner_title = owner_identifier
733 process.set_owners(self.conn, owners_to_set)
734 params = f'id={process.id_}'
736 title_b64_encoded = b64encode(new_step_title.encode()).decode()
737 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
738 elif new_owner_title:
739 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
740 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
741 process.save(self.conn)
742 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Apply posted timestamp changes to history of Condition.description."""
    redir_target = self._change_versioned_timestamps(
            cls=Condition, attr_name='description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Apply posted timestamp changes to history of Condition.title."""
    redir_target = self._change_versioned_timestamps(
            cls=Condition, attr_name='title')
    return redir_target
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Store posted is_active, title, description on Condition of ?id=.

    All three form fields are read before the Condition is touched, so a
    missing field leaves it unchanged. Returns the redirect target showing
    the saved Condition.
    """
    form = self._form_data
    new_is_active = form.get_str('is_active') == 'True'
    new_title = form.get_str('title')
    new_description = form.get_str('description')
    condition.is_active = new_is_active
    condition.title.set(new_title)
    condition.description.set(new_description)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'