1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile, CtxReferences
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoStepsNode
21 TEMPLATES_DIR = 'templates'
24 class TaskServer(HTTPServer):
25 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
27 def __init__(self, db_file: DatabaseFile,
28 *args: Any, **kwargs: Any) -> None:
29 super().__init__(*args, **kwargs)
31 self.headers: list[tuple[str, str]] = []
32 self._render_mode = 'html'
33 self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
35 def set_json_mode(self) -> None:
36 """Make server send JSON instead of HTML responses."""
37 self._render_mode = 'json'
38 self.headers += [('Content-Type', 'application/json')]
41 def ctx_to_json(ctx: dict[str, object], conn: DatabaseConnection) -> str:
42 """Render ctx into JSON string."""
44 def walk_ctx(node: object, references: CtxReferences) -> Any:
45 if hasattr(node, 'into_reference'):
46 if hasattr(node, 'id_') and node.id_ is not None:
47 library_growing[0] = True
48 return node.into_reference(references)
49 if hasattr(node, 'as_dict'):
51 if '_references' in d:
52 own_refs = d['_references']
53 if own_refs.update(references):
54 library_growing[0] = True
57 if isinstance(node, (list, tuple)):
58 return [walk_ctx(x, references) for x in node]
59 if isinstance(node, dict):
61 for k, v in node.items():
62 d[k] = walk_ctx(v, references)
64 if isinstance(node, HandledException):
69 for cls in [Day, Process, ProcessStep, Condition, Todo]:
70 models[cls.__name__] = cls
71 library: dict[str, dict[str | int, object]] = {}
72 references = CtxReferences({})
73 library_growing = [True]
74 while library_growing[0]:
75 library_growing[0] = False
76 for k, v in ctx.items():
77 ctx[k] = walk_ctx(v, references)
78 for cls_name, ids in references.d.items():
79 if cls_name not in library:
80 library[cls_name] = {}
82 cls = models[cls_name]
83 assert hasattr(cls, 'can_create_by_id')
84 if cls.can_create_by_id:
85 assert hasattr(cls, 'by_id_or_create')
86 d = cls.by_id_or_create(conn, id_).as_dict
88 assert hasattr(cls, 'by_id')
89 d = cls.by_id(conn, id_).as_dict
91 library[cls_name][id_] = d
92 references.d[cls_name] = []
93 ctx['_library'] = library
94 return json_dumps(ctx)
97 ctx: dict[str, object],
99 conn: DatabaseConnection
101 """Render ctx according to self._render_mode.."""
102 tmpl_name = f'{tmpl_name}.{self._render_mode}'
103 if 'html' == self._render_mode:
104 template = self._jinja.get_template(tmpl_name)
105 return template.render(ctx)
106 return self.__class__.ctx_to_json(ctx, conn)
110 """Wrapper for validating and retrieving dict-like HTTP inputs."""
112 def __init__(self, dict_: dict[str, list[str]],
113 strictness: bool = True) -> None:
115 self.strict = strictness # return None on absence of key, or fail?
117 def get_str(self, key: str, default: str = '',
118 ignore_strict: bool = False) -> str:
119 """Retrieve single/first string value of key, or default."""
120 if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
121 if self.strict and not ignore_strict:
122 raise NotFoundException(f'no value found for key {key}')
124 return self.inputs[key][0]
126 def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
127 """Retrieve dict of (first) strings at key starting with prefix."""
129 for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
130 ret[key] = self.inputs[key][0]
133 def get_int(self, key: str) -> int:
134 """Retrieve single/first value of key as int, error if empty."""
135 val = self.get_int_or_none(key)
137 raise BadFormatException(f'unexpected empty value for: {key}')
140 def get_int_or_none(self, key: str) -> int | None:
141 """Retrieve single/first value of key as int, return None if empty."""
142 val = self.get_str(key, ignore_strict=True)
147 except ValueError as e:
148 msg = f'cannot int form field value for key {key}: {val}'
149 raise BadFormatException(msg) from e
151 def get_float(self, key: str) -> float:
152 """Retrieve float value of key from self.postvars."""
153 val = self.get_str(key)
156 except ValueError as e:
157 msg = f'cannot float form field value for key {key}: {val}'
158 raise BadFormatException(msg) from e
160 def get_float_or_none(self, key: str) -> float | None:
161 """Retrieve float value of key from self.postvars, None if empty."""
162 val = self.get_str(key)
167 except ValueError as e:
168 msg = f'cannot float form field value for key {key}: {val}'
169 raise BadFormatException(msg) from e
171 def get_all_str(self, key: str) -> list[str]:
172 """Retrieve list of string values at key."""
173 if key not in self.inputs.keys():
175 return self.inputs[key]
177 def get_all_int(self, key: str) -> list[int]:
178 """Retrieve list of int values at key."""
179 all_str = self.get_all_str(key)
181 return [int(s) for s in all_str if len(s) > 0]
182 except ValueError as e:
183 msg = f'cannot int a form field value for key {key} in: {all_str}'
184 raise BadFormatException(msg) from e
186 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
187 """Retrieve list of float value at key, None if empty strings."""
188 ret: list[float | None] = []
189 for val in self.get_all_str(key):
195 except ValueError as e:
196 msg = f'cannot float form field value for key {key}: {val}'
197 raise BadFormatException(msg) from e
201 class TaskHandler(BaseHTTPRequestHandler):
202 """Handles single HTTP request."""
203 # pylint: disable=too-many-public-methods
205 conn: DatabaseConnection
207 _form_data: InputsParser
208 _params: InputsParser
215 """Send ctx as proper HTTP response."""
216 body = self.server.render(ctx, tmpl_name, self.conn)
217 self.send_response(code)
218 for header_tuple in self.server.headers:
219 self.send_header(*header_tuple)
221 self.wfile.write(bytes(body, 'utf-8'))
224 def _request_wrapper(http_method: str, not_found_msg: str
225 ) -> Callable[..., Callable[[TaskHandler], None]]:
226 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
228 Among other things, conditionally cleans all caches, but only on POST
229 requests, as only those are expected to change the states of objects
230 that may be cached, and certainly only those are expected to write any
231 changes to the database. We want to call them as early though as
232 possible here, either exactly after the specific request handler
233 returns successfully, or right after any exception is triggered –
234 otherwise, race conditions become plausible.
236 Note that otherwise any POST attempt, even a failed one, may end in
237 problematic inconsistencies:
239 - if the POST handler experiences an Exception, changes to objects
240 won't get written to the DB, but the changed objects may remain in
241 the cache and affect other objects despite their possibly illegal
244 - even if an object was just saved to the DB, we cannot be sure its
245 current state is completely identical to what we'd get if loading it
246 fresh from the DB (e.g. currently Process.n_owners is only updated
247 when loaded anew via .from_table_row, nor is its state written to
248 the DB by .save; a questionable design choice, but proof that we
249 have no guarantee that objects' .save stores all their states we'd
250 prefer at their most up-to-date.
253 def clear_caches() -> None:
254 for cls in (Day, Todo, Condition, Process, ProcessStep):
255 assert hasattr(cls, 'empty_cache')
258 def decorator(f: Callable[..., str | None]
259 ) -> Callable[[TaskHandler], None]:
260 def wrapper(self: TaskHandler) -> None:
261 # pylint: disable=protected-access
262 # (because pylint here fails to detect the use of wrapper as a
263 # method to self with respective access privileges)
265 self.conn = DatabaseConnection(self.server.db)
266 parsed_url = urlparse(self.path)
267 self._site = path_split(parsed_url.path)[1]
268 params = parse_qs(parsed_url.query, strict_parsing=True)
269 self._params = InputsParser(params, False)
270 handler_name = f'do_{http_method}_{self._site}'
271 if hasattr(self, handler_name):
272 handler = getattr(self, handler_name)
273 redir_target = f(self, handler)
274 if 'POST' == http_method:
277 self.send_response(302)
278 self.send_header('Location', redir_target)
281 msg = f'{not_found_msg}: {self._site}'
282 raise NotFoundException(msg)
283 except HandledException as error:
284 if 'POST' == http_method:
287 self._send_page(ctx, 'msg', error.http_code)
293 @_request_wrapper('GET', 'Unknown page')
294 def do_GET(self, handler: Callable[[], str | dict[str, object]]
296 """Render page with result of handler, or redirect if result is str."""
297 tmpl_name = f'{self._site}'
298 ctx_or_redir_target = handler()
299 if isinstance(ctx_or_redir_target, str):
300 return ctx_or_redir_target
301 self._send_page(ctx_or_redir_target, tmpl_name)
304 @_request_wrapper('POST', 'Unknown POST target')
305 def do_POST(self, handler: Callable[[], str]) -> str:
306 """Handle POST with handler, prepare redirection to result."""
307 length = int(self.headers['content-length'])
308 postvars = parse_qs(self.rfile.read(length).decode(),
309 keep_blank_values=True, strict_parsing=True)
310 self._form_data = InputsParser(postvars)
311 redir_target = handler()
318 def _get_item(target_class: Any
319 ) -> Callable[..., Callable[[TaskHandler],
321 def decorator(f: Callable[..., dict[str, object]]
322 ) -> Callable[[TaskHandler], dict[str, object]]:
323 def wrapper(self: TaskHandler) -> dict[str, object]:
324 # pylint: disable=protected-access
325 # (because pylint here fails to detect the use of wrapper as a
326 # method to self with respective access privileges)
327 id_ = self._params.get_int_or_none('id')
328 if target_class.can_create_by_id:
329 item = target_class.by_id_or_create(self.conn, id_)
331 item = target_class.by_id(self.conn, id_)
336 def do_GET_(self) -> str:
337 """Return redirect target on GET /."""
340 def _do_GET_calendar(self) -> dict[str, object]:
341 """Show Days from ?start= to ?end=.
343 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
344 same, the only difference being the HTML template they are rendered to,
345 which .do_GET selects from their method name.
347 start, end = self._params.get_str('start'), self._params.get_str('end')
348 end = end if end else date_in_n_days(366)
349 days, start, end = Day.by_date_range_with_limits(self.conn,
351 days = Day.with_filled_gaps(days, start, end)
352 today = date_in_n_days(0)
353 return {'start': start, 'end': end, 'days': days, 'today': today}
355 def do_GET_calendar(self) -> dict[str, object]:
356 """Show Days from ?start= to ?end= – normal view."""
357 return self._do_GET_calendar()
359 def do_GET_calendar_txt(self) -> dict[str, object]:
360 """Show Days from ?start= to ?end= – minimalist view."""
361 return self._do_GET_calendar()
363 def do_GET_day(self) -> dict[str, object]:
364 """Show single Day of ?date=."""
365 date = self._params.get_str('date', date_in_n_days(0))
366 day = Day.by_id_or_create(self.conn, date)
367 make_type = self._params.get_str('make_type')
368 conditions_present = []
371 for todo in day.todos:
372 for condition in todo.conditions + todo.blockers:
373 if condition not in conditions_present:
374 conditions_present += [condition]
375 enablers_for[condition.id_] = [p for p in
376 Process.all(self.conn)
377 if condition in p.enables]
378 disablers_for[condition.id_] = [p for p in
379 Process.all(self.conn)
380 if condition in p.disables]
381 seen_todos: set[int] = set()
382 top_nodes = [t.get_step_tree(seen_todos)
383 for t in day.todos if not t.parents]
385 'top_nodes': top_nodes,
386 'make_type': make_type,
387 'enablers_for': enablers_for,
388 'disablers_for': disablers_for,
389 'conditions_present': conditions_present,
390 'processes': Process.all(self.conn)}
393 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
394 """Show single Todo of ?id=."""
396 def walk_process_steps(id_: int,
397 process_step_nodes: list[ProcessStepsNode],
398 steps_nodes: list[TodoStepsNode]) -> None:
399 for process_step_node in process_step_nodes:
401 node = TodoStepsNode(id_, None, process_step_node.process, [])
402 steps_nodes += [node]
403 walk_process_steps(id_, list(process_step_node.steps.values()),
406 def walk_todo_steps(id_: int, todos: list[Todo],
407 steps_nodes: list[TodoStepsNode]) -> None:
410 for match in [item for item in steps_nodes
412 and item.process == todo.process]:
415 for child in match.children:
416 child.fillable = True
417 walk_todo_steps(id_, todo.children, match.children)
420 node = TodoStepsNode(id_, todo, None, [])
421 steps_nodes += [node]
422 walk_todo_steps(id_, todo.children, node.children)
424 def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
427 for node in steps_nodes:
429 assert isinstance(node.process, Process)
430 assert isinstance(node.process.id_, int)
431 ids.add(node.process.id_)
432 ids = ids | collect_adoptables_keys(node.children)
435 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
436 process_tree = todo.process.get_steps(self.conn, None)
437 steps_todo_to_process: list[TodoStepsNode] = []
438 walk_process_steps(0, list(process_tree.values()),
439 steps_todo_to_process)
440 for steps_node in steps_todo_to_process:
441 steps_node.fillable = True
442 walk_todo_steps(len(steps_todo_to_process), todo_steps,
443 steps_todo_to_process)
444 adoptables: dict[int, list[Todo]] = {}
445 any_adoptables = [Todo.by_id(self.conn, t.id_)
446 for t in Todo.by_date(self.conn, todo.date)
449 for id_ in collect_adoptables_keys(steps_todo_to_process):
450 adoptables[id_] = [t for t in any_adoptables
451 if t.process.id_ == id_]
452 return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
453 'adoption_candidates_for': adoptables,
454 'process_candidates': Process.all(self.conn),
455 'todo_candidates': any_adoptables,
456 'condition_candidates': Condition.all(self.conn)}
458 def do_GET_todos(self) -> dict[str, object]:
459 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
460 sort_by = self._params.get_str('sort_by')
461 start = self._params.get_str('start')
462 end = self._params.get_str('end')
463 process_id = self._params.get_int_or_none('process_id')
464 comment_pattern = self._params.get_str('comment_pattern')
466 ret = Todo.by_date_range_with_limits(self.conn, (start, end))
467 todos_by_date_range, start, end = ret
468 todos = [t for t in todos_by_date_range
469 if comment_pattern in t.comment
470 and ((not process_id) or t.process.id_ == process_id)]
471 sort_by = Todo.sort_by(todos, sort_by)
472 return {'start': start, 'end': end, 'process_id': process_id,
473 'comment_pattern': comment_pattern, 'todos': todos,
474 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
476 def do_GET_conditions(self) -> dict[str, object]:
477 """Show all Conditions."""
478 pattern = self._params.get_str('pattern')
479 sort_by = self._params.get_str('sort_by')
480 conditions = Condition.matching(self.conn, pattern)
481 sort_by = Condition.sort_by(conditions, sort_by)
482 return {'conditions': conditions,
486 @_get_item(Condition)
487 def do_GET_condition(self, c: Condition) -> dict[str, object]:
488 """Show Condition of ?id=."""
489 ps = Process.all(self.conn)
490 return {'condition': c, 'is_new': c.id_ is None,
491 'enabled_processes': [p for p in ps if c in p.conditions],
492 'disabled_processes': [p for p in ps if c in p.blockers],
493 'enabling_processes': [p for p in ps if c in p.enables],
494 'disabling_processes': [p for p in ps if c in p.disables]}
496 @_get_item(Condition)
497 def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
498 """Show title history of Condition of ?id=."""
499 return {'condition': c}
501 @_get_item(Condition)
502 def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
503 """Show description historys of Condition of ?id=."""
504 return {'condition': c}
507 def do_GET_process(self, process: Process) -> dict[str, object]:
508 """Show Process of ?id=."""
509 owner_ids = self._params.get_all_int('step_to')
510 owned_ids = self._params.get_all_int('has_step')
511 title_64 = self._params.get_str('title_b64')
514 title = b64decode(title_64.encode()).decode()
515 except binascii_Exception as exc:
516 msg = 'invalid base64 for ?title_b64='
517 raise BadFormatException(msg) from exc
518 process.title.set(title)
519 preset_top_step = None
520 owners = process.used_as_step_by(self.conn)
521 for step_id in owner_ids:
522 owners += [Process.by_id(self.conn, step_id)]
523 for process_id in owned_ids:
524 Process.by_id(self.conn, process_id) # to ensure ID exists
525 preset_top_step = process_id
526 return {'process': process, 'is_new': process.id_ is None,
527 'preset_top_step': preset_top_step,
528 'steps': process.get_steps(self.conn), 'owners': owners,
529 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
530 'process_candidates': Process.all(self.conn),
531 'condition_candidates': Condition.all(self.conn)}
534 def do_GET_process_titles(self, p: Process) -> dict[str, object]:
535 """Show title history of Process of ?id=."""
536 return {'process': p}
539 def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
540 """Show description historys of Process of ?id=."""
541 return {'process': p}
544 def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
545 """Show default effort history of Process of ?id=."""
546 return {'process': p}
548 def do_GET_processes(self) -> dict[str, object]:
549 """Show all Processes."""
550 pattern = self._params.get_str('pattern')
551 sort_by = self._params.get_str('sort_by')
552 processes = Process.matching(self.conn, pattern)
553 sort_by = Process.sort_by(processes, sort_by)
554 return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
559 def _delete_or_post(target_class: Any, redir_target: str = '/'
560 ) -> Callable[..., Callable[[TaskHandler], str]]:
561 def decorator(f: Callable[..., str]
562 ) -> Callable[[TaskHandler], str]:
563 def wrapper(self: TaskHandler) -> str:
564 # pylint: disable=protected-access
565 # (because pylint here fails to detect the use of wrapper as a
566 # method to self with respective access privileges)
567 id_ = self._params.get_int_or_none('id')
568 for _ in self._form_data.get_all_str('delete'):
570 msg = 'trying to delete non-saved ' +\
571 f'{target_class.__name__}'
572 raise NotFoundException(msg)
573 item = target_class.by_id(self.conn, id_)
574 item.remove(self.conn)
576 if target_class.can_create_by_id:
577 item = target_class.by_id_or_create(self.conn, id_)
579 item = target_class.by_id(self.conn, id_)
584 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
585 """Update history timestamps for VersionedAttribute."""
586 id_ = self._params.get_int_or_none('id')
587 item = cls.by_id(self.conn, id_)
588 attr = getattr(item, attr_name)
589 for k, v in self._form_data.get_first_strings_starting('at:').items():
592 attr.reset_timestamp(old, f'{v}.0')
594 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
596 def do_POST_day(self) -> str:
597 """Update or insert Day of date and Todos mapped to it."""
598 # pylint: disable=too-many-locals
600 date = self._params.get_str('date')
601 day_comment = self._form_data.get_str('day_comment')
602 make_type = self._form_data.get_str('make_type')
603 except NotFoundException as e:
604 raise BadFormatException from e
605 old_todos = self._form_data.get_all_int('todo_id')
606 new_todos = self._form_data.get_all_int('new_todo')
607 comments = self._form_data.get_all_str('comment')
608 efforts = self._form_data.get_all_floats_or_nones('effort')
609 done_todos = self._form_data.get_all_int('done')
610 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
611 raise BadFormatException('"done" field refers to unknown Todo')
612 is_done = [t_id in done_todos for t_id in old_todos]
613 if not (len(old_todos) == len(is_done) == len(comments)
615 msg = 'not equal number each of number of todo_id, comments, ' +\
617 raise BadFormatException(msg)
618 day = Day.by_id_or_create(self.conn, date)
619 day.comment = day_comment
621 for process_id in sorted(new_todos):
622 if 'empty' == make_type:
623 process = Process.by_id(self.conn, process_id)
624 todo = Todo(None, process, False, date)
627 Todo.create_with_children(self.conn, process_id, date)
628 for i, todo_id in enumerate(old_todos):
629 todo = Todo.by_id(self.conn, todo_id)
630 todo.is_done = is_done[i]
631 todo.comment = comments[i]
632 todo.effort = efforts[i]
634 return f'/day?date={date}&make_type={make_type}'
636 @_delete_or_post(Todo, '/')
637 def do_POST_todo(self, todo: Todo) -> str:
638 """Update Todo and its children."""
639 # pylint: disable=too-many-locals
640 # pylint: disable=too-many-branches
641 # pylint: disable=too-many-statements
642 adopted_child_ids = self._form_data.get_all_int('adopt')
643 processes_to_make_full = self._form_data.get_all_int('make_full')
644 processes_to_make_empty = self._form_data.get_all_int('make_empty')
645 fill_fors = self._form_data.get_first_strings_starting('fill_for_')
646 with_effort_post = True
648 effort = self._form_data.get_float_or_none('effort')
649 except NotFoundException:
650 with_effort_post = False
651 conditions = self._form_data.get_all_int('conditions')
652 disables = self._form_data.get_all_int('disables')
653 blockers = self._form_data.get_all_int('blockers')
654 enables = self._form_data.get_all_int('enables')
655 is_done = len(self._form_data.get_all_str('done')) > 0
656 calendarize = len(self._form_data.get_all_str('calendarize')) > 0
657 comment = self._form_data.get_str('comment', ignore_strict=True)
658 for v in fill_fors.values():
660 for prefix in ['make_empty_', 'make_full_']:
661 if v.startswith(prefix):
663 target_id = int(v[len(prefix):])
664 except ValueError as e:
665 msg = 'bad fill_for target: {v}'
666 raise BadFormatException(msg) from e
668 if v.startswith('make_empty_'):
669 processes_to_make_empty += [target_id]
670 elif v.startswith('make_full_'):
671 processes_to_make_full += [target_id]
673 adopted_child_ids += [int(v)]
675 for child in todo.children:
676 assert isinstance(child.id_, int)
677 if child.id_ not in adopted_child_ids:
678 to_remove += [child.id_]
679 for id_ in to_remove:
680 child = Todo.by_id(self.conn, id_)
681 todo.remove_child(child)
682 for child_id in adopted_child_ids:
683 if child_id in [c.id_ for c in todo.children]:
685 child = Todo.by_id(self.conn, child_id)
686 todo.add_child(child)
687 for process_id in processes_to_make_empty:
688 process = Process.by_id(self.conn, process_id)
689 made = Todo(None, process, False, todo.date)
692 for process_id in processes_to_make_full:
693 made = Todo.create_with_children(self.conn, process_id, todo.date)
697 todo.set_conditions(self.conn, conditions)
698 todo.set_blockers(self.conn, blockers)
699 todo.set_enables(self.conn, enables)
700 todo.set_disables(self.conn, disables)
701 todo.is_done = is_done
702 todo.calendarize = calendarize
703 todo.comment = comment
705 return f'/todo?id={todo.id_}'
707 def do_POST_process_descriptions(self) -> str:
708 """Update history timestamps for Process.description."""
709 return self._change_versioned_timestamps(Process, 'description')
711 def do_POST_process_efforts(self) -> str:
712 """Update history timestamps for Process.effort."""
713 return self._change_versioned_timestamps(Process, 'effort')
715 def do_POST_process_titles(self) -> str:
716 """Update history timestamps for Process.title."""
717 return self._change_versioned_timestamps(Process, 'title')
719 @_delete_or_post(Process, '/processes')
720 def do_POST_process(self, process: Process) -> str:
721 """Update or insert Process of ?id= and fields defined in postvars."""
722 # pylint: disable=too-many-locals
723 # pylint: disable=too-many-statements
725 title = self._form_data.get_str('title')
726 description = self._form_data.get_str('description')
727 effort = self._form_data.get_float('effort')
728 except NotFoundException as e:
729 raise BadFormatException from e
730 conditions = self._form_data.get_all_int('conditions')
731 blockers = self._form_data.get_all_int('blockers')
732 enables = self._form_data.get_all_int('enables')
733 disables = self._form_data.get_all_int('disables')
734 calendarize = self._form_data.get_all_str('calendarize') != []
735 suppresses = self._form_data.get_all_int('suppresses')
736 step_of = self._form_data.get_all_str('step_of')
737 keep_steps = self._form_data.get_all_int('keep_step')
738 step_ids = self._form_data.get_all_int('steps')
739 new_top_steps = self._form_data.get_all_str('new_top_step')
740 step_process_id_to = {}
741 step_parent_id_to = {}
743 for step_id in step_ids:
744 name = f'new_step_to_{step_id}'
745 new_steps_to[step_id] = self._form_data.get_all_int(name)
746 for step_id in keep_steps:
747 name = f'step_{step_id}_process_id'
748 step_process_id_to[step_id] = self._form_data.get_int(name)
749 name = f'step_{step_id}_parent_id'
750 step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
751 process.title.set(title)
752 process.description.set(description)
753 process.effort.set(effort)
754 process.set_conditions(self.conn, conditions)
755 process.set_blockers(self.conn, blockers)
756 process.set_enables(self.conn, enables)
757 process.set_disables(self.conn, disables)
758 process.calendarize = calendarize
759 process.save(self.conn)
760 assert isinstance(process.id_, int)
761 new_step_title = None
762 steps: list[ProcessStep] = []
763 for step_id in keep_steps:
764 if step_id not in step_ids:
765 raise BadFormatException('trying to keep unknown step')
766 step = ProcessStep(step_id, process.id_,
767 step_process_id_to[step_id],
768 step_parent_id_to[step_id])
770 for step_id in step_ids:
771 new = [ProcessStep(None, process.id_, step_process_id, step_id)
772 for step_process_id in new_steps_to[step_id]]
774 for step_identifier in new_top_steps:
776 step_process_id = int(step_identifier)
777 step = ProcessStep(None, process.id_, step_process_id, None)
780 new_step_title = step_identifier
781 process.set_steps(self.conn, steps)
782 process.set_step_suppressions(self.conn, suppresses)
784 new_owner_title = None
785 for owner_identifier in step_of:
787 owners_to_set += [int(owner_identifier)]
789 new_owner_title = owner_identifier
790 process.set_owners(self.conn, owners_to_set)
791 params = f'id={process.id_}'
793 title_b64_encoded = b64encode(new_step_title.encode()).decode()
794 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
795 elif new_owner_title:
796 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
797 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
798 process.save(self.conn)
799 return f'/process?{params}'
801 def do_POST_condition_descriptions(self) -> str:
802 """Update history timestamps for Condition.description."""
803 return self._change_versioned_timestamps(Condition, 'description')
805 def do_POST_condition_titles(self) -> str:
806 """Update history timestamps for Condition.title."""
807 return self._change_versioned_timestamps(Condition, 'title')
809 @_delete_or_post(Condition, '/conditions')
810 def do_POST_condition(self, condition: Condition) -> str:
811 """Update/insert Condition of ?id= and fields defined in postvars."""
813 is_active = self._form_data.get_str('is_active') == 'True'
814 title = self._form_data.get_str('title')
815 description = self._form_data.get_str('description')
816 except NotFoundException as e:
817 raise BadFormatException(e) from e
818 condition.is_active = is_active
819 condition.title.set(title)
820 condition.description.set(description)
821 condition.save(self.conn)
822 return f'/condition?id={condition.id_}'