1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
22 TEMPLATES_DIR = 'templates'
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment.

    Besides the Jinja environment it carries the database file and the
    render mode, all of which request handlers read via their .server.
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Initialize HTTPServer, then attach DB file and render setup.

        db_file is kept as .db, from which handlers open their
        DatabaseConnection; all further arguments are handed on unchanged
        to HTTPServer.
        """
        super().__init__(*args, **kwargs)
        # Request handlers read self.server.db, so it must be stored here.
        self.db = db_file
        # 'html' renders templates; any other value makes handlers emit JSON.
        self.render_mode = 'html'
        self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
37 """Wrapper for validating and retrieving dict-like HTTP inputs."""
39 def __init__(self, dict_: dict[str, list[str]]) -> None:
def get_all_str(self, key: str) -> list[str]:
    """Retrieve list of string values at key (empty if no key)."""
    if key not in self.inputs:
        return []
    return self.inputs[key]
def get_all_int(self, key: str) -> list[int]:
    """Retrieve list of int values at key.

    Empty strings among the values are skipped. Raises BadFormatException
    if any non-empty value cannot be parsed as int.
    """
    all_str = self.get_all_str(key)
    try:
        return [int(s) for s in all_str if s]
    except ValueError as e:
        msg = f'cannot int a form field value for key {key} in: {all_str}'
        raise BadFormatException(msg) from e
def get_str(self, key: str, default: str | None = None) -> str | None:
    """Retrieve single/first string value of key, or default.

    The default is returned only if there are no values at all for key; an
    empty string as first value is returned as such.
    """
    vals = self.get_all_str(key)
    if vals:
        return vals[0]
    return default
def get_str_or_fail(self, key: str, default: str | None = None) -> str:
    """Retrieve first string value of key, if none: fail or default.

    If there are no values for key, the default is returned if one was
    provided (i.e. is not None); otherwise BadFormatException is raised.
    """
    vals = self.get_all_str(key)
    if vals:
        return vals[0]
    if default is not None:
        return default
    raise BadFormatException(f'no value found for key: {key}')
def get_int_or_none(self, key: str) -> int | None:
    """Retrieve single/first value of key as int, return None if empty.

    A missing key counts as empty, since the lookup defaults to ''. Raises
    BadFormatException if a non-empty value cannot be parsed as int.
    """
    val = self.get_str_or_fail(key, '')
    if val == '':
        return None
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        msg = f'cannot int form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
def get_bool_or_none(self, key: str) -> bool | None:
    """Return value to key if truish; if no value to key, None.

    Truish means the first value is one of 'True', 'true', '1', 'on'; any
    other present value yields False.
    """
    val = self.get_str(key)
    if val is None:
        return None
    return val in {'True', 'true', '1', 'on'}
def get_firsts_of_key_prefixed(self, prefix: str) -> dict[str, str]:
    """Retrieve dict of (first) strings at key starting with prefix.

    Keys in the result are the full input keys, prefix included.
    """
    return {key: vals[0]
            for key, vals in self.inputs.items()
            if key.startswith(prefix)}
def get_float_or_fail(self, key: str) -> float:
    """Retrieve first value of key as float, fail if none.

    Raises BadFormatException if there is no value for key, or if the value
    cannot be parsed as float.
    """
    val = self.get_str_or_fail(key)
    try:
        return float(val)
    except ValueError as e:
        msg = f'cannot float form field value for key {key}: {val}'
        raise BadFormatException(msg) from e
def get_all_floats_or_nones(self, key: str) -> list[float | None]:
    """Retrieve list of float value at key, None if empty strings.

    The result has one entry per input value, in input order. Raises
    BadFormatException if a non-empty value cannot be parsed as float.
    """
    ret: list[float | None] = []
    for val in self.get_all_str(key):
        if val == '':
            ret.append(None)
            continue
        try:
            ret.append(float(val))
        except ValueError as e:
            msg = f'cannot float form field value for key {key}: {val}'
            raise BadFormatException(msg) from e
    return ret
122 class TaskHandler(BaseHTTPRequestHandler):
123 """Handles single HTTP request."""
124 # pylint: disable=too-many-public-methods
126 _conn: DatabaseConnection
129 _params: InputsParser
132 self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
134 """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
136 The differentiation by .server.render_mode serves to allow easily
137 comparable JSON responses for automatic testing.
140 headers: list[tuple[str, str]] = []
141 if 'html' == self.server.render_mode:
142 tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
143 body = tmpl.render(ctx)
145 body = self._ctx_to_json(ctx)
146 headers += [('Content-Type', 'application/json')]
147 self.send_response(code)
148 for header_tuple in headers:
149 self.send_header(*header_tuple)
151 self.wfile.write(bytes(body, 'utf-8'))
153 def _ctx_to_json(self, ctx: dict[str, object]) -> str:
154 """Render ctx into JSON string.
156 Flattens any objects that json.dumps might not want to serialize, and
157 turns occurrences of BaseModel objects into listings of their .id_, to
158 be resolved to a full dict inside a top-level '_library' dictionary,
159 to avoid endless and circular nesting.
162 def flatten(node: object) -> object:
164 def update_library_with(
165 item: BaseModel[int] | BaseModel[str]) -> None:
166 cls_name = item.__class__.__name__
167 if cls_name not in library:
168 library[cls_name] = {}
169 if item.id_ not in library[cls_name]:
170 d, refs = item.as_dict_and_refs
171 id_key = '?' if item.id_ is None else item.id_
172 library[cls_name][id_key] = d
174 update_library_with(ref)
176 if isinstance(node, BaseModel):
177 update_library_with(node)
179 if isinstance(node, DictableNode):
180 d, refs = node.as_dict_and_refs
182 update_library_with(ref)
184 if isinstance(node, (list, tuple)):
185 return [flatten(item) for item in node]
186 if isinstance(node, dict):
188 for k, v in node.items():
191 if isinstance(node, HandledException):
195 library: dict[str, dict[str | int, object]] = {}
196 for k, v in ctx.items():
198 ctx['_library'] = library
199 return json_dumps(ctx)
202 def _request_wrapper(http_method: str, not_found_msg: str
203 ) -> Callable[..., Callable[[TaskHandler], None]]:
204 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
206 Among other things, conditionally cleans all caches, but only on POST
207 requests, as only those are expected to change the states of objects
208 that may be cached, and certainly only those are expected to write any
209 changes to the database. We want to call them as early though as
210 possible here, either exactly after the specific request handler
211 returns successfully, or right after any exception is triggered –
212 otherwise, race conditions become plausible.
214 Note that otherwise any POST attempt, even a failed one, may end in
215 problematic inconsistencies:
217 - if the POST handler experiences an Exception, changes to objects
218 won't get written to the DB, but the changed objects may remain in
219 the cache and affect other objects despite their possibly illegal
222 - even if an object was just saved to the DB, we cannot be sure its
223 current state is completely identical to what we'd get if loading it
224 fresh from the DB (e.g. currently Process.n_owners is only updated
225 when loaded anew via .from_table_row, nor is its state written to
226 the DB by .save; a questionable design choice, but proof that we
227 have no guarantee that objects' .save stores all their states we'd
228 prefer at their most up-to-date.
231 def clear_caches() -> None:
232 for cls in (Day, Todo, Condition, Process, ProcessStep):
233 assert hasattr(cls, 'empty_cache')
236 def decorator(f: Callable[..., str | None]
237 ) -> Callable[[TaskHandler], None]:
238 def wrapper(self: TaskHandler) -> None:
239 # pylint: disable=protected-access
240 # (because pylint here fails to detect the use of wrapper as a
241 # method to self with respective access privileges)
243 self._conn = DatabaseConnection(self.server.db)
244 parsed_url = urlparse(self.path)
245 self._site = path_split(parsed_url.path)[1]
246 params = parse_qs(parsed_url.query,
247 keep_blank_values=True,
249 self._params = InputsParser(params)
250 handler_name = f'do_{http_method}_{self._site}'
251 if hasattr(self, handler_name):
252 handler = getattr(self, handler_name)
253 redir_target = f(self, handler)
254 if 'POST' == http_method:
257 self.send_response(302)
258 self.send_header('Location', redir_target)
261 msg = f'{not_found_msg}: {self._site}'
262 raise NotFoundException(msg)
263 except HandledException as error:
264 if 'POST' == http_method:
267 self._send_page(ctx, 'msg', error.http_code)
@_request_wrapper('GET', 'Unknown page')
def do_GET(self, handler: Callable[[], str | dict[str, object]]
           ) -> str | None:
    """Render page with result of handler, or redirect if result is str.

    A str result is returned as redirect target for the request wrapper to
    act on; a dict result is sent as page, using the template named like
    the requested site, and None is returned.
    """
    tmpl_name = f'{self._site}'
    ctx_or_redir_target = handler()
    if isinstance(ctx_or_redir_target, str):
        return ctx_or_redir_target
    self._send_page(ctx_or_redir_target, tmpl_name)
    return None
284 @_request_wrapper('POST', 'Unknown POST target')
285 def do_POST(self, handler: Callable[[], str]) -> str:
286 """Handle POST with handler, prepare redirection to result."""
287 length = int(self.headers['content-length'])
288 postvars = parse_qs(self.rfile.read(length).decode(),
289 keep_blank_values=True)
290 self._form = InputsParser(postvars)
291 redir_target = handler()
298 def _get_item(target_class: Any
299 ) -> Callable[..., Callable[[TaskHandler],
301 def decorator(f: Callable[..., dict[str, object]]
302 ) -> Callable[[TaskHandler], dict[str, object]]:
303 def wrapper(self: TaskHandler) -> dict[str, object]:
304 # pylint: disable=protected-access
305 # (because pylint here fails to detect the use of wrapper as a
306 # method to self with respective access privileges)
307 id_ = self._params.get_int_or_none('id')
308 if target_class.can_create_by_id:
309 item = target_class.by_id_or_create(self._conn, id_)
311 item = target_class.by_id(self._conn, id_)
316 def do_GET_(self) -> str:
317 """Return redirect target on GET /."""
320 def _do_GET_calendar(self) -> dict[str, object]:
321 """Show Days from ?start= to ?end=.
323 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
324 same, the only difference being the HTML template they are rendered to,
325 which .do_GET selects from their method name.
327 start = self._params.get_str_or_fail('start', '')
328 end = self._params.get_str_or_fail('end', '')
329 end = end if end != '' else date_in_n_days(366)
331 days, start, end = Day.by_date_range_with_limits(self._conn,
333 days = Day.with_filled_gaps(days, start, end)
334 today = date_in_n_days(0)
335 return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_calendar(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – normal view.

    Merely delegates to ._do_GET_calendar for the page context.
    """
    calendar_ctx = self._do_GET_calendar()
    return calendar_ctx
def do_GET_calendar_txt(self) -> dict[str, object]:
    """Show Days from ?start= to ?end= – minimalist view.

    Merely delegates to ._do_GET_calendar for the page context.
    """
    calendar_ctx = self._do_GET_calendar()
    return calendar_ctx
345 def do_GET_day(self) -> dict[str, object]:
346 """Show single Day of ?date=."""
347 date = self._params.get_str_or_fail('date', date_in_n_days(0))
348 make_type = self._params.get_str_or_fail('make_type', 'full')
350 day = Day.by_id_or_create(self._conn, date)
351 conditions_present = []
354 for todo in day.todos:
355 for condition in todo.conditions + todo.blockers:
356 if condition not in conditions_present:
357 conditions_present += [condition]
358 enablers_for[condition.id_] = [p for p in
359 Process.all(self._conn)
360 if condition in p.enables]
361 disablers_for[condition.id_] = [p for p in
362 Process.all(self._conn)
363 if condition in p.disables]
364 seen_todos: set[int] = set()
365 top_nodes = [t.get_step_tree(seen_todos)
366 for t in day.todos if not t.parents]
368 'top_nodes': top_nodes,
369 'make_type': make_type,
370 'enablers_for': enablers_for,
371 'disablers_for': disablers_for,
372 'conditions_present': conditions_present,
373 'processes': Process.all(self._conn)}
376 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
377 """Show single Todo of ?id=."""
379 def walk_process_steps(node_id: int,
380 process_step_nodes: list[ProcessStepsNode],
381 steps_nodes: list[TodoOrProcStepNode]) -> int:
382 for process_step_node in process_step_nodes:
384 proc = Process.by_id(self._conn,
385 process_step_node.step.step_process_id)
386 node = TodoOrProcStepNode(node_id, None, proc, [])
387 steps_nodes += [node]
388 node_id = walk_process_steps(
389 node_id, process_step_node.steps, node.children)
392 def walk_todo_steps(node_id: int, todos: list[Todo],
393 steps_nodes: list[TodoOrProcStepNode]) -> int:
396 for match in [item for item in steps_nodes
398 and item.process == todo.process]:
401 for child in match.children:
402 child.fillable = True
403 node_id = walk_todo_steps(
404 node_id, todo.children, match.children)
407 node = TodoOrProcStepNode(node_id, todo, None, [])
408 steps_nodes += [node]
409 node_id = walk_todo_steps(
410 node_id, todo.children, node.children)
413 def collect_adoptables_keys(
414 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
416 for node in steps_nodes:
418 assert isinstance(node.process, Process)
419 assert isinstance(node.process.id_, int)
420 ids.add(node.process.id_)
421 ids = ids | collect_adoptables_keys(node.children)
424 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
425 process_tree = todo.process.get_steps(self._conn, None)
426 steps_todo_to_process: list[TodoOrProcStepNode] = []
427 last_node_id = walk_process_steps(0, process_tree,
428 steps_todo_to_process)
429 for steps_node in steps_todo_to_process:
430 steps_node.fillable = True
431 walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
432 adoptables: dict[int, list[Todo]] = {}
433 any_adoptables = [Todo.by_id(self._conn, t.id_)
434 for t in Todo.by_date(self._conn, todo.date)
437 for id_ in collect_adoptables_keys(steps_todo_to_process):
438 adoptables[id_] = [t for t in any_adoptables
439 if t.process.id_ == id_]
440 return {'todo': todo,
441 'steps_todo_to_process': steps_todo_to_process,
442 'adoption_candidates_for': adoptables,
443 'process_candidates': sorted(Process.all(self._conn)),
444 'todo_candidates': any_adoptables,
445 'condition_candidates': Condition.all(self._conn)}
def do_GET_todos(self) -> dict[str, object]:
    """Show Todos from ?start= to ?end=, filtered and sorted.

    Filters are ?process_id= (Todos of that Process only, if set) and
    ?comment_pattern= (substring that must occur in the Todo's comment);
    ?sort_by= selects the sorting, defaulting to 'title'.
    """
    params = self._params
    requested_sort = params.get_str_or_fail('sort_by', 'title')
    start = params.get_str_or_fail('start', '')
    end = params.get_str_or_fail('end', '')
    process_id = params.get_int_or_none('process_id')
    comment_pattern = params.get_str_or_fail('comment_pattern', '')
    # The range lookup also hands back the limits it actually applied.
    in_range, start, end = Todo.by_date_range_with_limits(
        self._conn, (start, end))
    todos = []
    for todo in in_range:
        if comment_pattern not in todo.comment:
            continue
        if process_id and todo.process.id_ != process_id:
            continue
        todos.append(todo)
    applied_sort = Todo.sort_by(todos, requested_sort)
    return {'start': start, 'end': end, 'process_id': process_id,
            'comment_pattern': comment_pattern, 'todos': todos,
            'all_processes': Process.all(self._conn),
            'sort_by': applied_sort}
466 def do_GET_conditions(self) -> dict[str, object]:
467 """Show all Conditions."""
468 pattern = self._params.get_str_or_fail('pattern', '')
469 sort_by = self._params.get_str_or_fail('sort_by', 'title')
471 conditions = Condition.matching(self._conn, pattern)
472 sort_by = Condition.sort_by(conditions, sort_by)
473 return {'conditions': conditions,
@_get_item(Condition)
def do_GET_condition(self, c: Condition) -> dict[str, object]:
    """Show Condition of ?id=.

    Besides the Condition itself, lists the Processes that have it among
    their conditions, blockers, enables, and disables, respectively.
    """
    all_processes = Process.all(self._conn)

    def processes_with_c_in(attr_name: str) -> list[Process]:
        return [proc for proc in all_processes
                if c in getattr(proc, attr_name)]

    return {'condition': c,
            'is_new': c.id_ is None,
            'enabled_processes': processes_with_c_in('conditions'),
            'disabled_processes': processes_with_c_in('blockers'),
            'enabling_processes': processes_with_c_in('enables'),
            'disabling_processes': processes_with_c_in('disables')}
@_get_item(Condition)
def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
    """Show title history of Condition of ?id=."""
    return dict(condition=c)
@_get_item(Condition)
def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
    """Show description history of Condition of ?id=."""
    return dict(condition=c)
498 def do_GET_process(self, process: Process) -> dict[str, object]:
499 """Show Process of ?id=."""
500 owner_ids = self._params.get_all_int('step_to')
501 owned_ids = self._params.get_all_int('has_step')
502 title_64 = self._params.get_str('title_b64')
506 title = b64decode(title_64.encode()).decode()
507 except binascii_Exception as exc:
508 msg = 'invalid base64 for ?title_b64='
509 raise BadFormatException(msg) from exc
510 process.title.set(title)
511 preset_top_step = None
512 owners = process.used_as_step_by(self._conn)
513 for step_id in owner_ids:
514 owners += [Process.by_id(self._conn, step_id)]
515 for process_id in owned_ids:
516 Process.by_id(self._conn, process_id) # to ensure ID exists
517 preset_top_step = process_id
518 return {'process': process, 'is_new': process.id_ is None,
519 'preset_top_step': preset_top_step,
520 'steps': process.get_steps(self._conn),
522 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
523 'process_candidates': Process.all(self._conn),
524 'condition_candidates': Condition.all(self._conn)}
def do_GET_process_titles(self, p: Process) -> dict[str, object]:
    """Show title history of Process of ?id=."""
    return dict(process=p)
def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
    """Show description history of Process of ?id=."""
    return dict(process=p)
def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
    """Show default effort history of Process of ?id=."""
    return dict(process=p)
def do_GET_processes(self) -> dict[str, object]:
    """Show all Processes matching ?pattern=, sorted as per ?sort_by=.

    An absent ?pattern= is treated as '', an absent ?sort_by= as 'title'.
    """
    params = self._params
    pattern = params.get_str_or_fail('pattern', '')
    requested_sort = params.get_str_or_fail('sort_by', 'title')
    processes = Process.matching(self._conn, pattern)
    # NOTE(review): Process.sort_by presumably sorts the list in place and
    # returns the sort key it actually applied – confirm against Process.
    applied_sort = Process.sort_by(processes, requested_sort)
    return {'processes': processes,
            'sort_by': applied_sort,
            'pattern': pattern}
553 def _delete_or_post(target_class: Any, redir_target: str = '/'
554 ) -> Callable[..., Callable[[TaskHandler], str]]:
555 def decorator(f: Callable[..., str]
556 ) -> Callable[[TaskHandler], str]:
557 def wrapper(self: TaskHandler) -> str:
558 # pylint: disable=protected-access
559 # (because pylint here fails to detect the use of wrapper as a
560 # method to self with respective access privileges)
561 id_ = self._params.get_int_or_none('id')
562 for _ in self._form.get_all_str('delete'):
564 msg = 'trying to delete non-saved ' +\
565 f'{target_class.__name__}'
566 raise NotFoundException(msg)
567 item = target_class.by_id(self._conn, id_)
568 item.remove(self._conn)
570 if target_class.can_create_by_id:
571 item = target_class.by_id_or_create(self._conn, id_)
573 item = target_class.by_id(self._conn, id_)
578 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
579 """Update history timestamps for VersionedAttribute."""
580 id_ = self._params.get_int_or_none('id')
581 item = cls.by_id(self._conn, id_)
582 attr = getattr(item, attr_name)
583 for k, v in self._form.get_firsts_of_key_prefixed('at:').items():
586 attr.reset_timestamp(old, f'{v}.0')
587 attr.save(self._conn)
588 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
590 def do_POST_day(self) -> str:
591 """Update or insert Day of date and Todos mapped to it."""
592 # pylint: disable=too-many-locals
593 date = self._params.get_str_or_fail('date')
594 day_comment = self._form.get_str_or_fail('day_comment')
595 make_type = self._form.get_str_or_fail('make_type')
596 old_todos = self._form.get_all_int('todo_id')
597 new_todos_by_process = self._form.get_all_int('new_todo')
598 comments = self._form.get_all_str('comment')
599 efforts = self._form.get_all_floats_or_nones('effort')
600 done_todos = self._form.get_all_int('done')
602 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
603 raise BadFormatException('"done" field refers to unknown Todo')
604 is_done = [t_id in done_todos for t_id in old_todos]
605 if not (len(old_todos) == len(is_done) == len(comments)
607 msg = 'not equal number each of number of todo_id, comments, ' +\
609 raise BadFormatException(msg)
610 day = Day.by_id_or_create(self._conn, date)
611 day.comment = day_comment
614 for process_id in sorted(new_todos_by_process):
615 process = Process.by_id(self._conn, process_id)
616 todo = Todo(None, process, False, date)
617 todo.save(self._conn)
619 if 'full' == make_type:
620 for todo in new_todos:
621 todo.ensure_children(self._conn)
622 for i, todo_id in enumerate(old_todos):
623 todo = Todo.by_id(self._conn, todo_id)
624 todo.is_done = is_done[i]
625 todo.comment = comments[i]
626 todo.effort = efforts[i]
627 todo.save(self._conn)
628 return f'/day?date={date}&make_type={make_type}'
630 @_delete_or_post(Todo, '/')
631 def do_POST_todo(self, todo: Todo) -> str:
632 """Update Todo and its children."""
633 # pylint: disable=too-many-locals
634 # pylint: disable=too-many-branches
635 adoptees = self._form.get_all_int('adopt')
636 to_make = {'full': self._form.get_all_int('make_full'),
637 'empty': self._form.get_all_int('make_empty')}
638 step_fillers = self._form.get_all_str('step_filler')
639 to_update: dict[str, Any] = {
640 'comment': self._form.get_str_or_fail('comment', '')}
641 for k in ('is_done', 'calendarize'):
642 v = self._form.get_bool_or_none(k)
645 cond_rels = [self._form.get_all_int(name) for name in
646 ['conditions', 'blockers', 'enables', 'disables']]
647 effort_or_not = self._form.get_str('effort')
648 if effort_or_not is not None:
649 if effort_or_not == '':
650 to_update['effort'] = None
653 to_update['effort'] = float(effort_or_not)
654 except ValueError as e:
655 msg = 'cannot float form field value for key: effort'
656 raise BadFormatException(msg) from e
657 for filler in [f for f in step_fillers if f != 'ignore']:
660 for prefix in [p for p in ['make_empty_', 'make_full_']
661 if filler.startswith(p)]:
662 to_int = filler[len(prefix):]
664 target_id = int(to_int)
665 except ValueError as e:
666 msg = 'bad fill_for target: {filler}'
667 raise BadFormatException(msg) from e
668 if filler.startswith('make_empty_'):
669 to_make['empty'] += [target_id]
670 elif filler.startswith('make_full_'):
671 to_make['full'] += [target_id]
673 adoptees += [target_id]
675 todo.set_condition_relations(self._conn, *cond_rels)
676 for child in [c for c in todo.children if c.id_ not in adoptees]:
677 todo.remove_child(child)
678 for child_id in [id_ for id_ in adoptees
679 if id_ not in [c.id_ for c in todo.children]]:
680 todo.add_child(Todo.by_id(self._conn, child_id))
681 todo.update_attrs(**to_update)
682 for approach, proc_ids in to_make.items():
683 for process_id in proc_ids:
684 process = Process.by_id(self._conn, process_id)
685 made = Todo(None, process, False, todo.date)
686 made.save(self._conn)
687 if 'full' == approach:
688 made.ensure_children(self._conn)
690 # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
691 url = f'/todo?id={todo.id_}'
692 todo.save(self._conn)
def do_POST_process_descriptions(self) -> str:
    """Update history timestamps for Process.description.

    Returns whatever ._change_versioned_timestamps provides as target.
    """
    redir_target = self._change_versioned_timestamps(Process, 'description')
    return redir_target
def do_POST_process_efforts(self) -> str:
    """Update history timestamps for Process.effort.

    Returns whatever ._change_versioned_timestamps provides as target.
    """
    redir_target = self._change_versioned_timestamps(Process, 'effort')
    return redir_target
def do_POST_process_titles(self) -> str:
    """Update history timestamps for Process.title.

    Returns whatever ._change_versioned_timestamps provides as target.
    """
    redir_target = self._change_versioned_timestamps(Process, 'title')
    return redir_target
707 @_delete_or_post(Process, '/processes')
708 def do_POST_process(self, process: Process) -> str:
709 """Update or insert Process of ?id= and fields defined in postvars."""
710 # pylint: disable=too-many-locals
712 def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
713 l_ids, title = [], ''
714 for id_or_title in l_id_or_title:
716 l_ids += [int(id_or_title)]
721 versioned = {'title': self._form.get_str_or_fail('title'),
722 'description': self._form.get_str_or_fail('description'),
723 'effort': self._form.get_float_or_fail('effort')}
724 cond_rels = [self._form.get_all_int(s) for s
725 in ['conditions', 'blockers', 'enables', 'disables']]
726 calendarize = self._form.get_bool_or_none('calendarize')
727 step_of = self._form.get_all_str('step_of')
728 suppressions = self._form.get_all_int('suppresses')
729 kept_steps = self._form.get_all_int('kept_steps')
730 new_top_step_procs = self._form.get_all_str('new_top_step')
732 for step_id in kept_steps:
733 name = f'new_step_to_{step_id}'
734 new_steps_to[step_id] = self._form.get_all_int(name)
735 new_owner_title, owners_to_set = id_or_title(step_of)
736 new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
738 for k, v in versioned.items():
739 getattr(process, k).set(v)
740 if calendarize is not None:
741 process.calendarize = calendarize
742 process.save(self._conn)
743 assert isinstance(process.id_, int)
744 # set relations to Conditions and ProcessSteps / other Processes
745 process.set_condition_relations(self._conn, *cond_rels)
747 for step_id in kept_steps:
748 owned_steps += [ProcessStep.by_id(self._conn, step_id)]
749 owned_steps += [ # new sub-steps
750 ProcessStep(None, process.id_, step_process_id, step_id)
751 for step_process_id in new_steps_to[step_id]]
752 for step_process_id in new_top_step_proc_ids:
753 owned_steps += [ProcessStep(None, process.id_, step_process_id,
755 process.set_step_relations(self._conn, owners_to_set, suppressions,
757 # encode titles for potential newly-to-create Processes up or down
758 params = f'id={process.id_}'
760 title_b64_encoded = b64encode(new_step_title.encode()).decode()
761 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
762 elif new_owner_title:
763 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
764 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
765 process.save(self._conn)
766 return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
    """Update history timestamps for Condition.description.

    Returns whatever ._change_versioned_timestamps provides as target.
    """
    redir_target = self._change_versioned_timestamps(Condition, 'description')
    return redir_target
def do_POST_condition_titles(self) -> str:
    """Update history timestamps for Condition.title.

    Returns whatever ._change_versioned_timestamps provides as target.
    """
    redir_target = self._change_versioned_timestamps(Condition, 'title')
    return redir_target
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
    """Update/insert Condition of ?id= and fields defined in postvars.

    Form fields 'title' and 'description' are mandatory; 'is_active' is
    only applied if present in the form. Returns the path to the saved
    Condition's view as redirect target.
    """
    form = self._form
    # Read all inputs first, so bad input fails before anything is changed.
    new_title = form.get_str_or_fail('title')
    new_description = form.get_str_or_fail('description')
    new_is_active = form.get_bool_or_none('is_active')
    if new_is_active is not None:
        condition.is_active = new_is_active
    condition.title.set(new_title)
    condition.description.set(new_description)
    condition.save(self._conn)
    return f'/condition?id={condition.id_}'