1 """Web server stuff."""
2 from __future__ import annotations
3 from dataclasses import dataclass
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import BaseHTTPRequestHandler
8 from http.server import HTTPServer
9 from urllib.parse import urlparse, parse_qs
10 from json import dumps as json_dumps
11 from os.path import split as path_split
12 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
13 from plomtask.dating import date_in_n_days
14 from plomtask.days import Day
15 from plomtask.exceptions import (HandledException, BadFormatException,
17 from plomtask.db import DatabaseConnection, DatabaseFile
18 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
19 from plomtask.conditions import Condition
20 from plomtask.todos import Todo
22 TEMPLATES_DIR = 'templates'
25 class TaskServer(HTTPServer):
26 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28 def __init__(self, db_file: DatabaseFile,
29 *args: Any, **kwargs: Any) -> None:
30 super().__init__(*args, **kwargs)
32 self.headers: list[tuple[str, str]] = []
33 self._render_mode = 'html'
34 self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
36 def set_json_mode(self) -> None:
37 """Make server send JSON instead of HTML responses."""
38 self._render_mode = 'json'
39 self.headers += [('Content-Type', 'application/json')]
42 def ctx_to_json(ctx: dict[str, object]) -> str:
43 """Render ctx into JSON string."""
44 def walk_ctx(node: object) -> Any:
45 if hasattr(node, 'as_dict_into_reference'):
46 if hasattr(node, 'id_') and node.id_ is not None:
47 return node.as_dict_into_reference(library)
48 if hasattr(node, 'as_dict'):
50 if isinstance(node, (list, tuple)):
51 return [walk_ctx(x) for x in node]
52 if isinstance(node, dict):
54 for k, v in node.items():
57 if isinstance(node, HandledException):
60 library: dict[str, dict[str | int, object]] = {}
61 for k, v in ctx.items():
63 ctx['_library'] = library
64 return json_dumps(ctx)
66 def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
67 """Render ctx according to self._render_mode.."""
68 tmpl_name = f'{tmpl_name}.{self._render_mode}'
69 if 'html' == self._render_mode:
70 template = self._jinja.get_template(tmpl_name)
71 return template.render(ctx)
72 return self.__class__.ctx_to_json(ctx)
76 """Wrapper for validating and retrieving dict-like HTTP inputs."""
78 def __init__(self, dict_: dict[str, list[str]],
79 strictness: bool = True) -> None:
81 self.strict = strictness
83 def get_str(self, key: str, default: str = '',
84 ignore_strict: bool = False) -> str:
85 """Retrieve single/first string value of key, or default."""
86 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
87 if self.strict and not ignore_strict:
88 raise BadFormatException(f'no value found for key {key}')
90 return self.inputs[key][0]
92 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
93 """Retrieve dict of (first) strings at key starting with prefix."""
95 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
96 ret[key] = self.inputs[key][0]
99 def get_int(self, key: str) -> int:
100 """Retrieve single/first value of key as int, error if empty."""
101 val = self.get_int_or_none(key)
103 raise BadFormatException(f'unexpected empty value for: {key}')
106 def get_int_or_none(self, key: str) -> int | None:
107 """Retrieve single/first value of key as int, return None if empty."""
108 val = self.get_str(key, ignore_strict=True)
113 except ValueError as e:
114 msg = f'cannot int form field value for key {key}: {val}'
115 raise BadFormatException(msg) from e
117 def get_float(self, key: str) -> float:
118 """Retrieve float value of key from self.postvars."""
119 val = self.get_str(key)
122 except ValueError as e:
123 msg = f'cannot float form field value for key {key}: {val}'
124 raise BadFormatException(msg) from e
126 def get_all_str(self, key: str) -> list[str]:
127 """Retrieve list of string values at key."""
128 if key not in self.inputs.keys():
130 return self.inputs[key]
132 def get_all_int(self, key: str) -> list[int]:
133 """Retrieve list of int values at key."""
134 all_str = self.get_all_str(key)
136 return [int(s) for s in all_str if len(s) > 0]
137 except ValueError as e:
138 msg = f'cannot int a form field value for key {key} in: {all_str}'
139 raise BadFormatException(msg) from e
141 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
142 """Retrieve list of float value at key, None if empty strings."""
143 ret: list[float | None] = []
144 for val in self.get_all_str(key):
150 except ValueError as e:
151 msg = f'cannot float form field value for key {key}: {val}'
152 raise BadFormatException(msg) from e
156 class TaskHandler(BaseHTTPRequestHandler):
157 """Handles single HTTP request."""
158 # pylint: disable=too-many-public-methods
160 conn: DatabaseConnection
162 _form_data: InputsParser
163 _params: InputsParser
170 """Send ctx as proper HTTP response."""
171 body = self.server.render(ctx, tmpl_name)
172 self.send_response(code)
173 for header_tuple in self.server.headers:
174 self.send_header(*header_tuple)
176 self.wfile.write(bytes(body, 'utf-8'))
179 def _request_wrapper(http_method: str, not_found_msg: str
180 ) -> Callable[..., Callable[[TaskHandler], None]]:
181 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
183 Among other things, conditionally cleans all caches, but only on POST
184 requests, as only those are expected to change the states of objects
185 that may be cached, and certainly only those are expected to write any
186 changes to the database. We want to call them as early though as
187 possible here, either exactly after the specific request handler
188 returns successfully, or right after any exception is triggered –
189 otherwise, race conditions become plausible.
191 Note that otherwise any POST attempt, even a failed one, may end in
192 problematic inconsistencies:
194 - if the POST handler experiences an Exception, changes to objects
195 won't get written to the DB, but the changed objects may remain in
196 the cache and affect other objects despite their possibly illegal
199 - even if an object was just saved to the DB, we cannot be sure its
200 current state is completely identical to what we'd get if loading it
201 fresh from the DB (e.g. currently Process.n_owners is only updated
202 when loaded anew via .from_table_row, nor is its state written to
203 the DB by .save; a questionable design choice, but proof that we
204 have no guarantee that objects' .save stores all their states we'd
205 prefer at their most up-to-date.
208 def clear_caches() -> None:
209 for cls in (Day, Todo, Condition, Process, ProcessStep):
210 assert hasattr(cls, 'empty_cache')
213 def decorator(f: Callable[..., str | None]
214 ) -> Callable[[TaskHandler], None]:
215 def wrapper(self: TaskHandler) -> None:
216 # pylint: disable=protected-access
217 # (because pylint here fails to detect the use of wrapper as a
218 # method to self with respective access privileges)
220 self.conn = DatabaseConnection(self.server.db)
221 parsed_url = urlparse(self.path)
222 self._site = path_split(parsed_url.path)[1]
223 params = parse_qs(parsed_url.query, strict_parsing=True)
224 self._params = InputsParser(params, False)
225 handler_name = f'do_{http_method}_{self._site}'
226 if hasattr(self, handler_name):
227 handler = getattr(self, handler_name)
228 redir_target = f(self, handler)
229 if 'POST' == http_method:
232 self.send_response(302)
233 self.send_header('Location', redir_target)
236 msg = f'{not_found_msg}: {self._site}'
237 raise NotFoundException(msg)
238 except HandledException as error:
239 if 'POST' == http_method:
242 self._send_page(ctx, 'msg', error.http_code)
248 @_request_wrapper('GET', 'Unknown page')
249 def do_GET(self, handler: Callable[[], str | dict[str, object]]
251 """Render page with result of handler, or redirect if result is str."""
252 tmpl_name = f'{self._site}'
253 ctx_or_redir_target = handler()
254 if isinstance(ctx_or_redir_target, str):
255 return ctx_or_redir_target
256 self._send_page(ctx_or_redir_target, tmpl_name)
259 @_request_wrapper('POST', 'Unknown POST target')
260 def do_POST(self, handler: Callable[[], str]) -> str:
261 """Handle POST with handler, prepare redirection to result."""
262 length = int(self.headers['content-length'])
263 postvars = parse_qs(self.rfile.read(length).decode(),
264 keep_blank_values=True, strict_parsing=True)
265 self._form_data = InputsParser(postvars)
266 redir_target = handler()
273 def _get_item(target_class: Any
274 ) -> Callable[..., Callable[[TaskHandler],
276 def decorator(f: Callable[..., dict[str, object]]
277 ) -> Callable[[TaskHandler], dict[str, object]]:
278 def wrapper(self: TaskHandler) -> dict[str, object]:
279 # pylint: disable=protected-access
280 # (because pylint here fails to detect the use of wrapper as a
281 # method to self with respective access privileges)
282 id_ = self._params.get_int_or_none('id')
283 if target_class.can_create_by_id:
284 item = target_class.by_id_or_create(self.conn, id_)
286 item = target_class.by_id(self.conn, id_)
291 def do_GET_(self) -> str:
292 """Return redirect target on GET /."""
295 def _do_GET_calendar(self) -> dict[str, object]:
296 """Show Days from ?start= to ?end=.
298 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
299 same, the only difference being the HTML template they are rendered to,
300 which .do_GET selects from their method name.
302 start = self._params.get_str('start')
303 end = self._params.get_str('end')
305 end = date_in_n_days(366)
306 ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
307 days, start, end = ret
308 days = Day.with_filled_gaps(days, start, end)
309 today = date_in_n_days(0)
310 return {'start': start, 'end': end, 'days': days, 'today': today}
312 def do_GET_calendar(self) -> dict[str, object]:
313 """Show Days from ?start= to ?end= – normal view."""
314 return self._do_GET_calendar()
316 def do_GET_calendar_txt(self) -> dict[str, object]:
317 """Show Days from ?start= to ?end= – minimalist view."""
318 return self._do_GET_calendar()
320 def do_GET_day(self) -> dict[str, object]:
321 """Show single Day of ?date=."""
322 date = self._params.get_str('date', date_in_n_days(0))
323 day = Day.by_id_or_create(self.conn, date)
324 make_type = self._params.get_str('make_type')
325 conditions_present = []
328 for todo in day.todos:
329 for condition in todo.conditions + todo.blockers:
330 if condition not in conditions_present:
331 conditions_present += [condition]
332 enablers_for[condition.id_] = [p for p in
333 Process.all(self.conn)
334 if condition in p.enables]
335 disablers_for[condition.id_] = [p for p in
336 Process.all(self.conn)
337 if condition in p.disables]
338 seen_todos: set[int] = set()
339 top_nodes = [t.get_step_tree(seen_todos)
340 for t in day.todos if not t.parents]
342 'top_nodes': top_nodes,
343 'make_type': make_type,
344 'enablers_for': enablers_for,
345 'disablers_for': disablers_for,
346 'conditions_present': conditions_present,
347 'processes': Process.all(self.conn)}
350 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
351 """Show single Todo of ?id=."""
355 """Collect what's useful for Todo steps tree display."""
358 process: Process | None
359 children: list[TodoStepsNode] # pylint: disable=undefined-variable
360 fillable: bool = False
362 def walk_process_steps(id_: int,
363 process_step_nodes: list[ProcessStepsNode],
364 steps_nodes: list[TodoStepsNode]) -> None:
365 for process_step_node in process_step_nodes:
367 node = TodoStepsNode(id_, None, process_step_node.process, [])
368 steps_nodes += [node]
369 walk_process_steps(id_, list(process_step_node.steps.values()),
372 def walk_todo_steps(id_: int, todos: list[Todo],
373 steps_nodes: list[TodoStepsNode]) -> None:
376 for match in [item for item in steps_nodes
378 and item.process == todo.process]:
381 for child in match.children:
382 child.fillable = True
383 walk_todo_steps(id_, todo.children, match.children)
386 node = TodoStepsNode(id_, todo, None, [])
387 steps_nodes += [node]
388 walk_todo_steps(id_, todo.children, node.children)
390 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
393 for node in steps_nodes:
395 assert isinstance(node.process, Process)
396 assert isinstance(node.process.id_, int)
397 ids.add(node.process.id_)
398 ids = ids | collect_adoptables_keys(node.children)
401 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
402 process_tree = todo.process.get_steps(self.conn, None)
403 steps_todo_to_process: list[TodoStepsNode] = []
404 walk_process_steps(0, list(process_tree.values()),
405 steps_todo_to_process)
406 for steps_node in steps_todo_to_process:
407 steps_node.fillable = True
408 walk_todo_steps(len(steps_todo_to_process), todo_steps,
409 steps_todo_to_process)
410 adoptables: dict[int, list[Todo]] = {}
411 any_adoptables = [Todo.by_id(self.conn, t.id_)
412 for t in Todo.by_date(self.conn, todo.date)
415 for id_ in collect_adoptables_keys(steps_todo_to_process):
416 adoptables[id_] = [t for t in any_adoptables
417 if t.process.id_ == id_]
418 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
419 'adoption_candidates_for': adoptables,
420 'process_candidates': Process.all(self.conn),
421 'todo_candidates': any_adoptables,
422 'condition_candidates': Condition.all(self.conn)}
424 def do_GET_todos(self) -> dict[str, object]:
425 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
426 sort_by = self._params.get_str('sort_by')
427 start = self._params.get_str('start')
428 end = self._params.get_str('end')
429 process_id = self._params.get_int_or_none('process_id')
430 comment_pattern = self._params.get_str('comment_pattern')
432 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
433 todos_by_date_range, start, end = ret
434 todos = [t for t in todos_by_date_range
435 if comment_pattern in t.comment
436 and ((not process_id) or t.process.id_ == process_id)]
437 sort_by = Todo.sort_by(todos, sort_by)
438 return {'start': start, 'end': end, 'process_id': process_id,
439 'comment_pattern': comment_pattern, 'todos': todos,
440 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
442 def do_GET_conditions(self) -> dict[str, object]:
443 """Show all Conditions."""
444 pattern = self._params.get_str('pattern')
445 sort_by = self._params.get_str('sort_by')
446 conditions = Condition.matching(self.conn, pattern)
447 sort_by = Condition.sort_by(conditions, sort_by)
448 return {'conditions': conditions,
452 @_get_item(Condition)
453 def do_GET_condition(self, c: Condition) -> dict[str, object]:
454 """Show Condition of ?id=."""
455 ps = Process.all(self.conn)
456 return {'condition': c, 'is_new': c.id_ is None,
457 'enabled_processes': [p for p in ps if c in p.conditions],
458 'disabled_processes': [p for p in ps if c in p.blockers],
459 'enabling_processes': [p for p in ps if c in p.enables],
460 'disabling_processes': [p for p in ps if c in p.disables]}
462 @_get_item(Condition)
463 def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
464 """Show title history of Condition of ?id=."""
465 return {'condition': c}
467 @_get_item(Condition)
468 def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
469 """Show description historys of Condition of ?id=."""
470 return {'condition': c}
473 def do_GET_process(self, process: Process) -> dict[str, object]:
474 """Show Process of ?id=."""
475 owner_ids = self._params.get_all_int('step_to')
476 owned_ids = self._params.get_all_int('has_step')
477 title_64 = self._params.get_str('title_b64')
480 title = b64decode(title_64.encode()).decode()
481 except binascii_Exception as exc:
482 msg = 'invalid base64 for ?title_b64='
483 raise BadFormatException(msg) from exc
484 process.title.set(title)
485 preset_top_step = None
486 owners = process.used_as_step_by(self.conn)
487 for step_id in owner_ids:
488 owners += [Process.by_id(self.conn, step_id)]
489 for process_id in owned_ids:
490 Process.by_id(self.conn, process_id) # to ensure ID exists
491 preset_top_step = process_id
492 return {'process': process, 'is_new': process.id_ is None,
493 'preset_top_step': preset_top_step,
494 'steps': process.get_steps(self.conn), 'owners': owners,
495 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
496 'process_candidates': Process.all(self.conn),
497 'condition_candidates': Condition.all(self.conn)}
500 def do_GET_process_titles(self, p: Process) -> dict[str, object]:
501 """Show title history of Process of ?id=."""
502 return {'process': p}
505 def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
506 """Show description historys of Process of ?id=."""
507 return {'process': p}
510 def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
511 """Show default effort history of Process of ?id=."""
512 return {'process': p}
514 def do_GET_processes(self) -> dict[str, object]:
515 """Show all Processes."""
516 pattern = self._params.get_str('pattern')
517 sort_by = self._params.get_str('sort_by')
518 processes = Process.matching(self.conn, pattern)
519 sort_by = Process.sort_by(processes, sort_by)
520 return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
525 def _delete_or_post(target_class: Any, redir_target: str = '/'
526 ) -> Callable[..., Callable[[TaskHandler], str]]:
527 def decorator(f: Callable[..., str]
528 ) -> Callable[[TaskHandler], str]:
529 def wrapper(self: TaskHandler) -> str:
530 # pylint: disable=protected-access
531 # (because pylint here fails to detect the use of wrapper as a
532 # method to self with respective access privileges)
533 id_ = self._params.get_int_or_none('id')
534 for _ in self._form_data.get_all_str('delete'):
536 msg = 'trying to delete non-saved ' +\
537 f'{target_class.__name__}'
538 raise NotFoundException(msg)
539 item = target_class.by_id(self.conn, id_)
540 item.remove(self.conn)
542 if target_class.can_create_by_id:
543 item = target_class.by_id_or_create(self.conn, id_)
545 item = target_class.by_id(self.conn, id_)
550 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
551 """Update history timestamps for VersionedAttribute."""
552 id_ = self._params.get_int_or_none('id')
553 item = cls.by_id(self.conn, id_)
554 attr = getattr(item, attr_name)
555 for k, v in self._form_data.get_first_strings_starting('at:').items():
558 attr.reset_timestamp(old, f'{v}.0')
560 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
562 def do_POST_day(self) -> str:
563 """Update or insert Day of date and Todos mapped to it."""
564 # pylint: disable=too-many-locals
565 date = self._params.get_str('date')
566 day_comment = self._form_data.get_str('day_comment')
567 make_type = self._form_data.get_str('make_type')
568 old_todos = self._form_data.get_all_int('todo_id')
569 new_todos = self._form_data.get_all_int('new_todo')
570 comments = self._form_data.get_all_str('comment')
571 efforts = self._form_data.get_all_floats_or_nones('effort')
572 done_todos = self._form_data.get_all_int('done')
573 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
574 raise BadFormatException('"done" field refers to unknown Todo')
575 is_done = [t_id in done_todos for t_id in old_todos]
576 if not (len(old_todos) == len(is_done) == len(comments)
578 msg = 'not equal number each of number of todo_id, comments, ' +\
580 raise BadFormatException(msg)
581 day = Day.by_id_or_create(self.conn, date)
582 day.comment = day_comment
584 for process_id in sorted(new_todos):
585 if 'empty' == make_type:
586 process = Process.by_id(self.conn, process_id)
587 todo = Todo(None, process, False, date)
590 Todo.create_with_children(self.conn, process_id, date)
591 for i, todo_id in enumerate(old_todos):
592 todo = Todo.by_id(self.conn, todo_id)
593 todo.is_done = is_done[i]
594 todo.comment = comments[i]
595 todo.effort = efforts[i]
597 return f'/day?date={date}&make_type={make_type}'
599 @_delete_or_post(Todo, '/')
600 def do_POST_todo(self, todo: Todo) -> str:
601 """Update Todo and its children."""
602 # pylint: disable=too-many-locals
603 adopted_child_ids = self._form_data.get_all_int('adopt')
604 processes_to_make_full = self._form_data.get_all_int('make_full')
605 processes_to_make_empty = self._form_data.get_all_int('make_empty')
606 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
607 effort = self._form_data.get_str('effort', ignore_strict=True)
608 conditions = self._form_data.get_all_int('conditions')
609 disables = self._form_data.get_all_int('disables')
610 blockers = self._form_data.get_all_int('blockers')
611 enables = self._form_data.get_all_int('enables')
612 is_done = len(self._form_data.get_all_str('done')) > 0
613 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
614 comment = self._form_data.get_str('comment', ignore_strict=True)
615 for v in fill_fors.values():
616 if v.startswith('make_empty_'):
617 processes_to_make_empty += [int(v[11:])]
618 elif v.startswith('make_full_'):
619 processes_to_make_full += [int(v[10:])]
621 adopted_child_ids += [int(v)]
623 for child in todo.children:
624 assert isinstance(child.id_, int)
625 if child.id_ not in adopted_child_ids:
626 to_remove += [child.id_]
627 for id_ in to_remove:
628 child = Todo.by_id(self.conn, id_)
629 todo.remove_child(child)
630 for child_id in adopted_child_ids:
631 if child_id in [c.id_ for c in todo.children]:
633 child = Todo.by_id(self.conn, child_id)
634 todo.add_child(child)
635 for process_id in processes_to_make_empty:
636 process = Process.by_id(self.conn, process_id)
637 made = Todo(None, process, False, todo.date)
640 for process_id in processes_to_make_full:
641 made = Todo.create_with_children(self.conn, process_id, todo.date)
643 todo.effort = float(effort) if effort else None
644 todo.set_conditions(self.conn, conditions)
645 todo.set_blockers(self.conn, blockers)
646 todo.set_enables(self.conn, enables)
647 todo.set_disables(self.conn, disables)
648 todo.is_done = is_done
649 todo.calendarize = calendarize
650 todo.comment = comment
652 return f'/todo?id={todo.id_}'
654 def do_POST_process_descriptions(self) -> str:
655 """Update history timestamps for Process.description."""
656 return self._change_versioned_timestamps(Process, 'description')
658 def do_POST_process_efforts(self) -> str:
659 """Update history timestamps for Process.effort."""
660 return self._change_versioned_timestamps(Process, 'effort')
662 def do_POST_process_titles(self) -> str:
663 """Update history timestamps for Process.title."""
664 return self._change_versioned_timestamps(Process, 'title')
666 @_delete_or_post(Process, '/processes')
667 def do_POST_process(self, process: Process) -> str:
668 """Update or insert Process of ?id= and fields defined in postvars."""
669 # pylint: disable=too-many-locals
670 # pylint: disable=too-many-statements
671 title = self._form_data.get_str('title')
672 description = self._form_data.get_str('description')
673 effort = self._form_data.get_float('effort')
674 conditions = self._form_data.get_all_int('conditions')
675 blockers = self._form_data.get_all_int('blockers')
676 enables = self._form_data.get_all_int('enables')
677 disables = self._form_data.get_all_int('disables')
678 calendarize = self._form_data.get_all_str('calendarize') != []
679 suppresses = self._form_data.get_all_int('suppresses')
680 step_of = self._form_data.get_all_str('step_of')
681 keep_steps = self._form_data.get_all_int('keep_step')
682 step_ids = self._form_data.get_all_int('steps')
683 new_top_steps = self._form_data.get_all_str('new_top_step')
684 step_process_id_to = {}
685 step_parent_id_to = {}
687 for step_id in step_ids:
688 name = f'new_step_to_{step_id}'
689 new_steps_to[step_id] = self._form_data.get_all_int(name)
690 for step_id in keep_steps:
691 name = f'step_{step_id}_process_id'
692 step_process_id_to[step_id] = self._form_data.get_int(name)
693 name = f'step_{step_id}_parent_id'
694 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
695 process.title.set(title)
696 process.description.set(description)
697 process.effort.set(effort)
698 process.set_conditions(self.conn, conditions)
699 process.set_blockers(self.conn, blockers)
700 process.set_enables(self.conn, enables)
701 process.set_disables(self.conn, disables)
702 process.calendarize = calendarize
703 process.save(self.conn)
704 assert isinstance(process.id_, int)
705 new_step_title = None
706 steps: list[ProcessStep] = []
707 for step_id in keep_steps:
708 if step_id not in step_ids:
709 raise BadFormatException('trying to keep unknown step')
710 step = ProcessStep(step_id, process.id_,
711 step_process_id_to[step_id],
712 step_parent_id_to[step_id])
714 for step_id in step_ids:
715 new = [ProcessStep(None, process.id_, step_process_id, step_id)
716 for step_process_id in new_steps_to[step_id]]
718 for step_identifier in new_top_steps:
720 step_process_id = int(step_identifier)
721 step = ProcessStep(None, process.id_, step_process_id, None)
724 new_step_title = step_identifier
725 process.set_steps(self.conn, steps)
726 process.set_step_suppressions(self.conn, suppresses)
728 new_owner_title = None
729 for owner_identifier in step_of:
731 owners_to_set += [int(owner_identifier)]
733 new_owner_title = owner_identifier
734 process.set_owners(self.conn, owners_to_set)
735 params = f'id={process.id_}'
737 title_b64_encoded = b64encode(new_step_title.encode()).decode()
738 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
739 elif new_owner_title:
740 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
741 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
742 process.save(self.conn)
743 return f'/process?{params}'
745 def do_POST_condition_descriptions(self) -> str:
746 """Update history timestamps for Condition.description."""
747 return self._change_versioned_timestamps(Condition, 'description')
749 def do_POST_condition_titles(self) -> str:
750 """Update history timestamps for Condition.title."""
751 return self._change_versioned_timestamps(Condition, 'title')
753 @_delete_or_post(Condition, '/conditions')
754 def do_POST_condition(self, condition: Condition) -> str:
755 """Update/insert Condition of ?id= and fields defined in postvars."""
756 is_active = self._form_data.get_str('is_active') == 'True'
757 title = self._form_data.get_str('title')
758 description = self._form_data.get_str('description')
759 condition.is_active = is_active
760 condition.title.set(title)
761 condition.description.set(description)
762 condition.save(self.conn)
763 return f'/condition?id={condition.id_}'