1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode, DictableNode
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27 def __init__(self, db_file: DatabaseFile,
28 *args: Any, **kwargs: Any) -> None:
29 super().__init__(*args, **kwargs)
31 self.render_mode = 'html'
32 self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
36 """Wrapper for validating and retrieving dict-like HTTP inputs."""
38 def __init__(self, dict_: dict[str, list[str]],
39 strictness: bool = True) -> None:
41 self.strict = strictness # return None on absence of key, or fail?
def get_str(self, key: str, default: str = '',
            ignore_strict: bool = False) -> str:
    """Retrieve single/first string value of key, or default.

    A key that is absent, or present with an empty value list, counts as
    "not found". In that case NotFoundException is raised if self.strict is
    set and ignore_strict is not; otherwise default is returned.
    """
    values = self.inputs.get(key)
    if values:
        return values[0]
    if self.strict and not ignore_strict:
        raise NotFoundException(f'no value found for key {key}')
    return default
def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
    """Retrieve dict of (first) strings at key starting with prefix.

    Keys are kept in full (prefix included); each maps to the first value
    stored for it in self.inputs.
    """
    return {key: values[0]
            for key, values in self.inputs.items()
            if key.startswith(prefix)}
def get_int(self, key: str) -> int:
    """Retrieve single/first value of key as int, error if empty.

    Delegates parsing to self.get_int_or_none and raises
    BadFormatException if that yields None.
    """
    val = self.get_int_or_none(key)
    if val is None:  # explicit None test: 0 is a legitimate value
        raise BadFormatException(f'unexpected empty value for: {key}')
    return val
66 def get_int_or_none(self, key: str) -> int | None:
67 """Retrieve single/first value of key as int, return None if empty."""
68 val = self.get_str(key, ignore_strict=True)
73 except ValueError as e:
74 msg = f'cannot int form field value for key {key}: {val}'
75 raise BadFormatException(msg) from e
def get_float(self, key: str) -> float:
    """Retrieve single/first value of key as float.

    Raises BadFormatException if the string found cannot be parsed as a
    float; anything raised by self.get_str propagates unchanged.
    """
    val = self.get_str(key)
    try:
        return float(val)
    except ValueError as e:
        msg = f'cannot float form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
86 def get_float_or_none(self, key: str) -> float | None:
87 """Retrieve float value of key from self.postvars, None if empty."""
88 val = self.get_str(key)
93 except ValueError as e:
94 msg = f'cannot float form field value for key {key}: {val}'
95 raise BadFormatException(msg) from e
def get_all_str(self, key: str) -> list[str]:
    """Retrieve list of string values at key, empty list if key is absent."""
    return self.inputs.get(key, [])
def get_all_int(self, key: str) -> list[int]:
    """Retrieve list of int values at key, skipping empty strings.

    Raises BadFormatException if any non-empty string is not a valid int.
    """
    all_str = self.get_all_str(key)
    try:
        return [int(s) for s in all_str if len(s) > 0]
    except ValueError as e:
        msg = f'cannot int a form field value for key {key} in: {all_str}'
        raise BadFormatException(msg) from e
112 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
113 """Retrieve list of float value at key, None if empty strings."""
114 ret: list[float | None] = []
115 for val in self.get_all_str(key):
121 except ValueError as e:
122 msg = f'cannot float form field value for key {key}: {val}'
123 raise BadFormatException(msg) from e
127 class TaskHandler(BaseHTTPRequestHandler):
128 """Handles single HTTP request."""
129 # pylint: disable=too-many-public-methods
131 conn: DatabaseConnection
133 _form_data: InputsParser
134 _params: InputsParser
137 self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
139 """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
141 The differentiation by .server.render_mode serves to allow easily
142 comparable JSON responses for automatic testing.
145 headers: list[tuple[str, str]] = []
146 if 'html' == self.server.render_mode:
147 tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
148 body = tmpl.render(ctx)
150 body = self._ctx_to_json(ctx)
151 headers += [('Content-Type', 'application/json')]
152 self.send_response(code)
153 for header_tuple in headers:
154 self.send_header(*header_tuple)
156 self.wfile.write(bytes(body, 'utf-8'))
158 def _ctx_to_json(self, ctx: dict[str, object]) -> str:
159 """Render ctx into JSON string.
161 Flattens any objects that json.dumps might not want to serialize, and
162 turns occurrences of BaseModel objects into listings of their .id_, to
163 be resolved to a full dict inside a top-level '_library' dictionary,
164 to avoid endless and circular nesting.
167 def flatten(node: object) -> object:
169 def update_library_with(
170 item: BaseModel[int] | BaseModel[str]) -> None:
171 cls_name = item.__class__.__name__
172 if cls_name not in library:
173 library[cls_name] = {}
174 if item.id_ not in library[cls_name]:
175 d, refs = item.as_dict_and_refs
176 id_key = '?' if item.id_ is None else item.id_
177 library[cls_name][id_key] = d
179 update_library_with(ref)
181 if isinstance(node, BaseModel):
182 update_library_with(node)
184 if isinstance(node, DictableNode):
185 d, refs = node.as_dict_and_refs
187 update_library_with(ref)
189 if isinstance(node, (list, tuple)):
190 return [flatten(item) for item in node]
191 if isinstance(node, dict):
193 for k, v in node.items():
196 if isinstance(node, HandledException):
200 library: dict[str, dict[str | int, object]] = {}
201 for k, v in ctx.items():
203 ctx['_library'] = library
204 return json_dumps(ctx)
207 def _request_wrapper(http_method: str, not_found_msg: str
208 ) -> Callable[..., Callable[[TaskHandler], None]]:
209 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
211 Among other things, conditionally cleans all caches, but only on POST
212 requests, as only those are expected to change the states of objects
213 that may be cached, and certainly only those are expected to write any
214 changes to the database. We want to call them as early though as
215 possible here, either exactly after the specific request handler
216 returns successfully, or right after any exception is triggered –
217 otherwise, race conditions become plausible.
219 Note that otherwise any POST attempt, even a failed one, may end in
220 problematic inconsistencies:
222 - if the POST handler experiences an Exception, changes to objects
223 won't get written to the DB, but the changed objects may remain in
224 the cache and affect other objects despite their possibly illegal
227 - even if an object was just saved to the DB, we cannot be sure its
228 current state is completely identical to what we'd get if loading it
229 fresh from the DB (e.g. currently Process.n_owners is only updated
230 when loaded anew via .from_table_row, nor is its state written to
231 the DB by .save; a questionable design choice, but proof that we
232 have no guarantee that objects' .save stores all their states we'd
233 prefer at their most up-to-date.
236 def clear_caches() -> None:
237 for cls in (Day, Todo, Condition, Process, ProcessStep):
238 assert hasattr(cls, 'empty_cache')
241 def decorator(f: Callable[..., str | None]
242 ) -> Callable[[TaskHandler], None]:
243 def wrapper(self: TaskHandler) -> None:
244 # pylint: disable=protected-access
245 # (because pylint here fails to detect the use of wrapper as a
246 # method to self with respective access privileges)
248 self.conn = DatabaseConnection(self.server.db)
249 parsed_url = urlparse(self.path)
250 self._site = path_split(parsed_url.path)[1]
251 params = parse_qs(parsed_url.query, strict_parsing=True)
252 self._params = InputsParser(params, False)
253 handler_name = f'do_{http_method}_{self._site}'
254 if hasattr(self, handler_name):
255 handler = getattr(self, handler_name)
256 redir_target = f(self, handler)
257 if 'POST' == http_method:
260 self.send_response(302)
261 self.send_header('Location', redir_target)
264 msg = f'{not_found_msg}: {self._site}'
265 raise NotFoundException(msg)
266 except HandledException as error:
267 if 'POST' == http_method:
270 self._send_page(ctx, 'msg', error.http_code)
@_request_wrapper('GET', 'Unknown page')
def do_GET(self, handler: Callable[[], str | dict[str, object]]
           ) -> str | None:
    """Render page with result of handler, or redirect if result is str.

    A str result is handed back to the request wrapper as redirect target.
    A dict result is sent as a page, using the template named after
    self._site, and None is returned.
    """
    tmpl_name = f'{self._site}'
    ctx_or_redir_target = handler()
    if isinstance(ctx_or_redir_target, str):
        return ctx_or_redir_target
    self._send_page(ctx_or_redir_target, tmpl_name)
    return None
287 @_request_wrapper('POST', 'Unknown POST target')
288 def do_POST(self, handler: Callable[[], str]) -> str:
289 """Handle POST with handler, prepare redirection to result."""
290 length = int(self.headers['content-length'])
291 postvars = parse_qs(self.rfile.read(length).decode(),
292 keep_blank_values=True, strict_parsing=True)
293 self._form_data = InputsParser(postvars)
294 redir_target = handler()
301 def _get_item(target_class: Any
302 ) -> Callable[..., Callable[[TaskHandler],
304 def decorator(f: Callable[..., dict[str, object]]
305 ) -> Callable[[TaskHandler], dict[str, object]]:
306 def wrapper(self: TaskHandler) -> dict[str, object]:
307 # pylint: disable=protected-access
308 # (because pylint here fails to detect the use of wrapper as a
309 # method to self with respective access privileges)
310 id_ = self._params.get_int_or_none('id')
311 if target_class.can_create_by_id:
312 item = target_class.by_id_or_create(self.conn, id_)
314 item = target_class.by_id(self.conn, id_)
319 def do_GET_(self) -> str:
320 """Return redirect target on GET /."""
323 def _do_GET_calendar(self) -> dict[str, object]:
324 """Show Days from ?start= to ?end=.
326 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
327 same, the only difference being the HTML template they are rendered to,
328 which .do_GET selects from their method name.
330 start, end = self._params.get_str('start'), self._params.get_str('end')
331 end = end if end else date_in_n_days(366)
332 days, start, end = Day.by_date_range_with_limits(self.conn,
334 days = Day.with_filled_gaps(days, start, end)
335 today = date_in_n_days(0)
336 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – normal view."""
    # Context is built by the helper shared with the _txt variant.
    ctx = self._do_GET_calendar()
    return ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – minimalist view."""
    # Same context as the normal calendar view, from the shared helper.
    ctx = self._do_GET_calendar()
    return ctx
346 def do_GET_day(self) -> dict[str, object]:
347 """Show single Day of ?date=."""
348 date = self._params.get_str('date', date_in_n_days(0))
349 day = Day.by_id_or_create(self.conn, date)
350 make_type = self._params.get_str('make_type')
351 conditions_present = []
354 for todo in day.todos:
355 for condition in todo.conditions + todo.blockers:
356 if condition not in conditions_present:
357 conditions_present += [condition]
358 enablers_for[condition.id_] = [p for p in
359 Process.all(self.conn)
360 if condition in p.enables]
361 disablers_for[condition.id_] = [p for p in
362 Process.all(self.conn)
363 if condition in p.disables]
364 seen_todos: set[int] = set()
365 top_nodes = [t.get_step_tree(seen_todos)
366 for t in day.todos if not t.parents]
368 'top_nodes': top_nodes,
369 'make_type': make_type,
370 'enablers_for': enablers_for,
371 'disablers_for': disablers_for,
372 'conditions_present': conditions_present,
373 'processes': Process.all(self.conn)}
376 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
377 """Show single Todo of ?id=."""
379 def walk_process_steps(node_id: int,
380 process_step_nodes: list[ProcessStepsNode],
381 steps_nodes: list[TodoOrProcStepNode]) -> int:
382 for process_step_node in process_step_nodes:
384 node = TodoOrProcStepNode(node_id, None,
385 process_step_node.process, [])
386 steps_nodes += [node]
387 node_id = walk_process_steps(
388 node_id, list(process_step_node.steps.values()),
392 def walk_todo_steps(node_id: int, todos: list[Todo],
393 steps_nodes: list[TodoOrProcStepNode]) -> int:
396 for match in [item for item in steps_nodes
398 and item.process == todo.process]:
401 for child in match.children:
402 child.fillable = True
403 node_id = walk_todo_steps(
404 node_id, todo.children, match.children)
407 node = TodoOrProcStepNode(node_id, todo, None, [])
408 steps_nodes += [node]
409 node_id = walk_todo_steps(
410 node_id, todo.children, node.children)
413 def collect_adoptables_keys(
414 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
416 for node in steps_nodes:
418 assert isinstance(node.process, Process)
419 assert isinstance(node.process.id_, int)
420 ids.add(node.process.id_)
421 ids = ids | collect_adoptables_keys(node.children)
424 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
425 process_tree = todo.process.get_steps(self.conn, None)
426 steps_todo_to_process: list[TodoOrProcStepNode] = []
427 last_node_id = walk_process_steps(
428 0, list(process_tree.values()), steps_todo_to_process)
429 for steps_node in steps_todo_to_process:
430 steps_node.fillable = True
431 walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
432 adoptables: dict[int, list[Todo]] = {}
433 any_adoptables = [Todo.by_id(self.conn, t.id_)
434 for t in Todo.by_date(self.conn, todo.date)
437 for id_ in collect_adoptables_keys(steps_todo_to_process):
438 adoptables[id_] = [t for t in any_adoptables
439 if t.process.id_ == id_]
440 return {'todo': todo,
441 'steps_todo_to_process': steps_todo_to_process,
442 'adoption_candidates_for': adoptables,
443 'process_candidates': sorted(Process.all(self.conn)),
444 'todo_candidates': any_adoptables,
445 'condition_candidates': Condition.all(self.conn)}
447 def do_GET_todos(self) -> dict[str, object]:
448 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
449 sort_by = self._params.get_str('sort_by')
450 start = self._params.get_str('start')
451 end = self._params.get_str('end')
452 process_id = self._params.get_int_or_none('process_id')
453 comment_pattern = self._params.get_str('comment_pattern')
455 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
456 todos_by_date_range, start, end = ret
457 todos = [t for t in todos_by_date_range
458 if comment_pattern in t.comment
459 and ((not process_id) or t.process.id_ == process_id)]
460 sort_by = Todo.sort_by(todos, sort_by)
461 return {'start': start, 'end': end, 'process_id': process_id,
462 'comment_pattern': comment_pattern, 'todos': todos,
463 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
465 def do_GET_conditions(self) -> dict[str, object]:
466 """Show all Conditions."""
467 pattern = self._params.get_str('pattern')
468 sort_by = self._params.get_str('sort_by')
469 conditions = Condition.matching(self.conn, pattern)
470 sort_by = Condition.sort_by(conditions, sort_by)
471 return {'conditions': conditions,
@_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=, plus the Processes that reference it.

    Processes are grouped by which of their Condition collections holds c:
    .conditions, .blockers, .enables, or .disables.
    """
    all_processes = Process.all(self.conn)
    ctx: dict[str, object] = {'condition': c, 'is_new': c.id_ is None}
    # pairs each context key with the Process attribute searched for c
    for ctx_key, attr_name in (('enabled_processes', 'conditions'),
                               ('disabled_processes', 'blockers'),
                               ('enabling_processes', 'enables'),
                               ('disabling_processes', 'disables')):
        ctx[ctx_key] = [proc for proc in all_processes
                        if c in getattr(proc, attr_name)]
    return ctx
@_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Show title history of Condition of ?id=."""
    ctx: dict[str, object] = {'condition': c}
    return ctx
@_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Show description history of Condition of ?id=."""
    ctx: dict[str, object] = {'condition': c}
    return ctx
496 def do_GET_process(self, process: Process) -> dict[str, object]:
497 """Show Process of ?id=."""
498 owner_ids = self._params.get_all_int('step_to')
499 owned_ids = self._params.get_all_int('has_step')
500 title_64 = self._params.get_str('title_b64')
503 title = b64decode(title_64.encode()).decode()
504 except binascii_Exception as exc:
505 msg = 'invalid base64 for ?title_b64='
506 raise BadFormatException(msg) from exc
507 process.title.set(title)
508 preset_top_step = None
509 owners = process.used_as_step_by(self.conn)
510 for step_id in owner_ids:
511 owners += [Process.by_id(self.conn, step_id)]
512 for process_id in owned_ids:
513 Process.by_id(self.conn, process_id) # to ensure ID exists
514 preset_top_step = process_id
515 return {'process': process, 'is_new': process.id_ is None,
516 'preset_top_step': preset_top_step,
517 'steps': process.get_steps(self.conn), 'owners': owners,
518 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
519 'process_candidates': Process.all(self.conn),
520 'condition_candidates': Condition.all(self.conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Show title history of Process of ?id=."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Show description history of Process of ?id=."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Show default effort history of Process of ?id=."""
    ctx: dict[str, object] = {'process': p}
    return ctx
def do_GET_processes(self) -> dict[str, object]:
    """Show Processes matching ?pattern=, with ordering per ?sort_by=."""
    params = self._params
    pattern = params.get_str('pattern')
    requested_sort = params.get_str('sort_by')
    matches = Process.matching(self.conn, pattern)
    # Process.sort_by's return value is what the context exposes as
    # 'sort_by', rather than the raw request parameter.
    applied_sort = Process.sort_by(matches, requested_sort)
    return {'processes': matches, 'sort_by': applied_sort, 'pattern': pattern}
548 def _delete_or_post(target_class: Any, redir_target: str = '/'
549 ) -> Callable[..., Callable[[TaskHandler], str]]:
550 def decorator(f: Callable[..., str]
551 ) -> Callable[[TaskHandler], str]:
552 def wrapper(self: TaskHandler) -> str:
553 # pylint: disable=protected-access
554 # (because pylint here fails to detect the use of wrapper as a
555 # method to self with respective access privileges)
556 id_ = self._params.get_int_or_none('id')
557 for _ in self._form_data.get_all_str('delete'):
559 msg = 'trying to delete non-saved ' +\
560 f'{target_class.__name__}'
561 raise NotFoundException(msg)
562 item = target_class.by_id(self.conn, id_)
563 item.remove(self.conn)
565 if target_class.can_create_by_id:
566 item = target_class.by_id_or_create(self.conn, id_)
568 item = target_class.by_id(self.conn, id_)
573 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
574 """Update history timestamps for VersionedAttribute."""
575 id_ = self._params.get_int_or_none('id')
576 item = cls.by_id(self.conn, id_)
577 attr = getattr(item, attr_name)
578 for k, v in self._form_data.get_first_strings_starting('at:').items():
581 attr.reset_timestamp(old, f'{v}.0')
583 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
585 def do_POST_day(self) -> str:
586 """Update or insert Day of date and Todos mapped to it."""
587 # pylint: disable=too-many-locals
589 date = self._params.get_str('date')
590 day_comment = self._form_data.get_str('day_comment')
591 make_type = self._form_data.get_str('make_type')
592 except NotFoundException as e:
593 raise BadFormatException from e
594 old_todos = self._form_data.get_all_int('todo_id')
595 new_todos = self._form_data.get_all_int('new_todo')
596 comments = self._form_data.get_all_str('comment')
597 efforts = self._form_data.get_all_floats_or_nones('effort')
598 done_todos = self._form_data.get_all_int('done')
599 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
600 raise BadFormatException('"done" field refers to unknown Todo')
601 is_done = [t_id in done_todos for t_id in old_todos]
602 if not (len(old_todos) == len(is_done) == len(comments)
604 msg = 'not equal number each of number of todo_id, comments, ' +\
606 raise BadFormatException(msg)
607 day = Day.by_id_or_create(self.conn, date)
608 day.comment = day_comment
610 for process_id in sorted(new_todos):
611 if 'empty' == make_type:
612 process = Process.by_id(self.conn, process_id)
613 todo = Todo(None, process, False, date)
616 Todo.create_with_children(self.conn, process_id, date)
617 for i, todo_id in enumerate(old_todos):
618 todo = Todo.by_id(self.conn, todo_id)
619 todo.is_done = is_done[i]
620 todo.comment = comments[i]
621 todo.effort = efforts[i]
623 return f'/day?date={date}&make_type={make_type}'
625 @_delete_or_post(Todo, '/')
626 def do_POST_todo(self, todo: Todo) -> str:
627 """Update Todo and its children."""
628 # pylint: disable=too-many-locals
629 # pylint: disable=too-many-branches
630 # pylint: disable=too-many-statements
631 adopted_child_ids = self._form_data.get_all_int('adopt')
632 processes_to_make_full = self._form_data.get_all_int('make_full')
633 processes_to_make_empty = self._form_data.get_all_int('make_empty')
634 step_fillers = self._form_data.get_all_str('step_filler')
635 with_effort_post = True
637 effort = self._form_data.get_float_or_none('effort')
638 except NotFoundException:
639 with_effort_post = False
640 conditions = self._form_data.get_all_int('conditions')
641 disables = self._form_data.get_all_int('disables')
642 blockers = self._form_data.get_all_int('blockers')
643 enables = self._form_data.get_all_int('enables')
644 is_done = len(self._form_data.get_all_str('done')) > 0
645 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
646 comment = self._form_data.get_str('comment', ignore_strict=True)
647 for filler in step_fillers:
650 for prefix in [p for p in ['make_empty_', 'make_full_']
651 if filler.startswith(p)]:
652 to_int = filler[len(prefix):]
654 target_id = int(to_int)
655 except ValueError as e:
656 msg = 'bad fill_for target: {filler}'
657 raise BadFormatException(msg) from e
658 if filler.startswith('make_empty_'):
659 processes_to_make_empty += [target_id]
660 elif filler.startswith('make_full_'):
661 processes_to_make_full += [target_id]
662 elif filler != 'ignore':
663 adopted_child_ids += [target_id]
665 for child in todo.children:
666 assert isinstance(child.id_, int)
667 if child.id_ not in adopted_child_ids:
668 to_remove += [child.id_]
669 for id_ in to_remove:
670 child = Todo.by_id(self.conn, id_)
671 todo.remove_child(child)
672 for child_id in adopted_child_ids:
673 if child_id in [c.id_ for c in todo.children]:
675 child = Todo.by_id(self.conn, child_id)
676 todo.add_child(child)
677 for process_id in processes_to_make_empty:
678 process = Process.by_id(self.conn, process_id)
679 made = Todo(None, process, False, todo.date)
682 for process_id in processes_to_make_full:
683 made = Todo.create_with_children(self.conn, process_id, todo.date)
687 todo.set_conditions(self.conn, conditions)
688 todo.set_blockers(self.conn, blockers)
689 todo.set_enables(self.conn, enables)
690 todo.set_disables(self.conn, disables)
691 todo.is_done = is_done
692 todo.calendarize = calendarize
693 todo.comment = comment
694 # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
695 url = f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
    """Update history timestamps for Process.description."""
    redir_target = self._change_versioned_timestamps(Process, 'description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Update history timestamps for Process.effort."""
    redir_target = self._change_versioned_timestamps(Process, 'effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Update history timestamps for Process.title."""
    redir_target = self._change_versioned_timestamps(Process, 'title')
    return redir_target
711 @_delete_or_post(Process, '/processes')
712 def do_POST_process(self, process: Process) -> str:
713 """Update or insert Process of ?id= and fields defined in postvars."""
714 # pylint: disable=too-many-locals
715 # pylint: disable=too-many-statements
717 title = self._form_data.get_str('title')
718 description = self._form_data.get_str('description')
719 effort = self._form_data.get_float('effort')
720 except NotFoundException as e:
721 raise BadFormatException from e
722 conditions = self._form_data.get_all_int('conditions')
723 blockers = self._form_data.get_all_int('blockers')
724 enables = self._form_data.get_all_int('enables')
725 disables = self._form_data.get_all_int('disables')
726 calendarize = self._form_data.get_all_str('calendarize') != []
727 suppresses = self._form_data.get_all_int('suppresses')
728 step_of = self._form_data.get_all_str('step_of')
729 keep_steps = self._form_data.get_all_int('keep_step')
730 step_ids = self._form_data.get_all_int('steps')
731 new_top_steps = self._form_data.get_all_str('new_top_step')
732 step_process_id_to = {}
733 step_parent_id_to = {}
735 for step_id in step_ids:
736 name = f'new_step_to_{step_id}'
737 new_steps_to[step_id] = self._form_data.get_all_int(name)
738 for step_id in keep_steps:
739 name = f'step_{step_id}_process_id'
740 step_process_id_to[step_id] = self._form_data.get_int(name)
741 name = f'step_{step_id}_parent_id'
742 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
743 process.title.set(title)
744 process.description.set(description)
745 process.effort.set(effort)
746 process.set_conditions(self.conn, conditions)
747 process.set_blockers(self.conn, blockers)
748 process.set_enables(self.conn, enables)
749 process.set_disables(self.conn, disables)
750 process.calendarize = calendarize
751 process.save(self.conn)
752 assert isinstance(process.id_, int)
753 new_step_title = None
754 steps: list[ProcessStep] = []
755 for step_id in keep_steps:
756 if step_id not in step_ids:
757 raise BadFormatException('trying to keep unknown step')
758 step = ProcessStep(step_id, process.id_,
759 step_process_id_to[step_id],
760 step_parent_id_to[step_id])
762 for step_id in step_ids:
763 new = [ProcessStep(None, process.id_, step_process_id, step_id)
764 for step_process_id in new_steps_to[step_id]]
766 for step_identifier in new_top_steps:
768 step_process_id = int(step_identifier)
769 step = ProcessStep(None, process.id_, step_process_id, None)
772 new_step_title = step_identifier
773 process.set_steps(self.conn, steps)
774 process.set_step_suppressions(self.conn, suppresses)
776 new_owner_title = None
777 for owner_identifier in step_of:
779 owners_to_set += [int(owner_identifier)]
781 new_owner_title = owner_identifier
782 process.set_owners(self.conn, owners_to_set)
783 params = f'id={process.id_}'
785 title_b64_encoded = b64encode(new_step_title.encode()).decode()
786 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
787 elif new_owner_title:
788 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
789 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
790 process.save(self.conn)
791 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Update history timestamps for Condition.description."""
    redir_target = self._change_versioned_timestamps(Condition, 'description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Update history timestamps for Condition.title."""
    redir_target = self._change_versioned_timestamps(Condition, 'title')
    return redir_target
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Update/insert Condition of ?id= and fields defined in postvars.

    Reads 'is_active', 'title' and 'description' from the form data. If
    reading any of them raises NotFoundException, it is re-raised as
    BadFormatException. Returns the URL of the saved Condition's page.
    """
    # Read all fields before touching condition, so that a missing field
    # leaves the object unmodified.
    try:
        is_active = self._form_data.get_str('is_active') == 'True'
        title = self._form_data.get_str('title')
        description = self._form_data.get_str('description')
    except NotFoundException as e:
        raise BadFormatException(e) from e
    condition.is_active = is_active
    condition.title.set(title)
    condition.description.set(description)
    condition.save(self.conn)
    return f'/condition?id={condition.id_}'