1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
22 TEMPLATES_DIR = 'templates'
25 class TaskServer(HTTPServer):
26 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28 def __init__(self, db_file: DatabaseFile,
29 *args: Any, **kwargs: Any) -> None:
30 super().__init__(*args, **kwargs)
32 self.render_mode = 'html'
33 self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
37 """Wrapper for validating and retrieving dict-like HTTP inputs."""
39 def __init__(self, dict_: dict[str, list[str]]) -> None:
42 def get_all_str(self, key: str) -> list[str]:
43 """Retrieve list of string values at key (empty if no key)."""
44 if key not in self.inputs.keys():
46 return self.inputs[key]
48 def get_all_int(self, key: str) -> list[int]:
49 """Retrieve list of int values at key."""
50 all_str = self.get_all_str(key)
52 return [int(s) for s in all_str if len(s) > 0]
53 except ValueError as e:
54 msg = f'cannot int a form field value for key {key} in: {all_str}'
55 raise BadFormatException(msg) from e
57 def get_str(self, key: str, default: str | None = None) -> str | None:
58 """Retrieve single/first string value of key, or default."""
59 vals = self.get_all_str(key)
64 def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65 """Retrieve first string value of key, if none: fail or default."""
66 vals = self.get_all_str(key)
68 if default is not None:
70 raise BadFormatException(f'no value found for key: {key}')
73 def get_int_or_none(self, key: str) -> int | None:
74 """Retrieve single/first value of key as int, return None if empty."""
75 val = self.get_str_or_fail(key, '')
80 except (ValueError, TypeError) as e:
81 msg = f'cannot int form field value for key {key}: {val}'
82 raise BadFormatException(msg) from e
84 def get_bool_or_none(self, key: str) -> bool | None:
85 """Return value to key if truish; if no value to key, None."""
86 val = self.get_str(key)
89 return val in {'True', 'true', '1', 'on'}
91 def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]:
92 """Retrieve dict of strings at keys starting with key_prefix."""
94 for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]:
95 ret[key[len(key_prefix):]] = self.inputs[key]
98 def get_float_or_fail(self, key: str) -> float:
99 """Retrieve float value of key from self.postvars, fail if none."""
100 val = self.get_str_or_fail(key)
103 except ValueError as e:
104 msg = f'cannot float form field value for key {key}: {val}'
105 raise BadFormatException(msg) from e
107 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
108 """Retrieve list of float value at key, None if empty strings."""
109 ret: list[float | None] = []
110 for val in self.get_all_str(key):
116 except ValueError as e:
117 msg = f'cannot float form field value for key {key}: {val}'
118 raise BadFormatException(msg) from e
122 class TaskHandler(BaseHTTPRequestHandler):
123 """Handles single HTTP request."""
124 # pylint: disable=too-many-public-methods
126 _conn: DatabaseConnection
129 _params: InputsParser
132 self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
134 """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
136 The differentiation by .server.render_mode serves to allow easily
137 comparable JSON responses for automatic testing.
140 headers: list[tuple[str, str]] = []
141 if 'html' == self.server.render_mode:
142 tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
143 body = tmpl.render(ctx)
145 body = self._ctx_to_json(ctx)
146 headers += [('Content-Type', 'application/json')]
147 self.send_response(code)
148 for header_tuple in headers:
149 self.send_header(*header_tuple)
151 self.wfile.write(bytes(body, 'utf-8'))
153 def _ctx_to_json(self, ctx: dict[str, object]) -> str:
154 """Render ctx into JSON string.
156 Flattens any objects that json.dumps might not want to serialize, and
157 turns occurrences of BaseModel objects into listings of their .id_, to
158 be resolved to a full dict inside a top-level '_library' dictionary,
159 to avoid endless and circular nesting.
162 def flatten(node: object) -> object:
164 def update_library_with(
165 item: BaseModel[int] | BaseModel[str]) -> None:
166 cls_name = item.__class__.__name__
167 if cls_name not in library:
168 library[cls_name] = {}
169 if item.id_ not in library[cls_name]:
170 d, refs = item.as_dict_and_refs
171 id_key = '?' if item.id_ is None else item.id_
172 library[cls_name][id_key] = d
174 update_library_with(ref)
176 if isinstance(node, BaseModel):
177 update_library_with(node)
179 if isinstance(node, DictableNode):
180 d, refs = node.as_dict_and_refs
182 update_library_with(ref)
184 if isinstance(node, (list, tuple)):
185 return [flatten(item) for item in node]
186 if isinstance(node, dict):
188 for k, v in node.items():
191 if isinstance(node, HandledException):
195 library: dict[str, dict[str | int, object]] = {}
196 for k, v in ctx.items():
198 ctx['_library'] = library
199 return json_dumps(ctx)
202 def _request_wrapper(http_method: str, not_found_msg: str
203 ) -> Callable[..., Callable[[TaskHandler], None]]:
204 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
206 Among other things, conditionally cleans all caches, but only on POST
207 requests, as only those are expected to change the states of objects
208 that may be cached, and certainly only those are expected to write any
209 changes to the database. We want to call them as early though as
210 possible here, either exactly after the specific request handler
211 returns successfully, or right after any exception is triggered –
212 otherwise, race conditions become plausible.
214 Note that otherwise any POST attempt, even a failed one, may end in
215 problematic inconsistencies:
217 - if the POST handler experiences an Exception, changes to objects
218 won't get written to the DB, but the changed objects may remain in
219 the cache and affect other objects despite their possibly illegal
222 - even if an object was just saved to the DB, we cannot be sure its
223 current state is completely identical to what we'd get if loading it
224 fresh from the DB (e.g. currently Process.n_owners is only updated
225 when loaded anew via .from_table_row, nor is its state written to
226 the DB by .save; a questionable design choice, but proof that we
227 have no guarantee that objects' .save stores all their states we'd
228 prefer at their most up-to-date.
231 def clear_caches() -> None:
232 for cls in (Day, Todo, Condition, Process, ProcessStep):
233 assert hasattr(cls, 'empty_cache')
236 def decorator(f: Callable[..., str | None]
237 ) -> Callable[[TaskHandler], None]:
238 def wrapper(self: TaskHandler) -> None:
239 # pylint: disable=protected-access
240 # (because pylint here fails to detect the use of wrapper as a
241 # method to self with respective access privileges)
243 self._conn = DatabaseConnection(self.server.db)
244 parsed_url = urlparse(self.path)
245 self._site = path_split(parsed_url.path)[1]
246 params = parse_qs(parsed_url.query,
247 keep_blank_values=True,
249 self._params = InputsParser(params)
250 handler_name = f'do_{http_method}_{self._site}'
251 if hasattr(self, handler_name):
252 handler = getattr(self, handler_name)
253 redir_target = f(self, handler)
254 if 'POST' == http_method:
257 self.send_response(302)
258 self.send_header('Location', redir_target)
261 msg = f'{not_found_msg}: {self._site}'
262 raise NotFoundException(msg)
263 except HandledException as error:
264 if 'POST' == http_method:
267 self._send_page(ctx, 'msg', error.http_code)
273 @_request_wrapper('GET', 'Unknown page')
274 def do_GET(self, handler: Callable[[], str | dict[str, object]]
276 """Render page with result of handler, or redirect if result is str."""
277 tmpl_name = f'{self._site}'
278 ctx_or_redir_target = handler()
279 if isinstance(ctx_or_redir_target, str):
280 return ctx_or_redir_target
281 self._send_page(ctx_or_redir_target, tmpl_name)
284 @_request_wrapper('POST', 'Unknown POST target')
285 def do_POST(self, handler: Callable[[], str]) -> str:
286 """Handle POST with handler, prepare redirection to result."""
287 length = int(self.headers['content-length'])
288 postvars = parse_qs(self.rfile.read(length).decode(),
289 keep_blank_values=True)
290 self._form = InputsParser(postvars)
291 redir_target = handler()
298 def _get_item(target_class: Any
299 ) -> Callable[..., Callable[[TaskHandler],
301 def decorator(f: Callable[..., dict[str, object]]
302 ) -> Callable[[TaskHandler], dict[str, object]]:
303 def wrapper(self: TaskHandler) -> dict[str, object]:
304 # pylint: disable=protected-access
305 # (because pylint here fails to detect the use of wrapper as a
306 # method to self with respective access privileges)
307 id_ = self._params.get_int_or_none('id')
308 if target_class.can_create_by_id:
309 item = target_class.by_id_or_create(self._conn, id_)
311 item = target_class.by_id(self._conn, id_)
316 def do_GET_(self) -> str:
317 """Return redirect target on GET /."""
320 def _do_GET_calendar(self) -> dict[str, object]:
321 """Show Days from ?start= to ?end=.
323 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
324 same, the only difference being the HTML template they are rendered to,
325 which .do_GET selects from their method name.
327 start = self._params.get_str_or_fail('start', '')
328 end = self._params.get_str_or_fail('end', '')
329 end = end if end != '' else date_in_n_days(366)
331 days, start, end = Day.by_date_range_with_limits(self._conn,
333 days = Day.with_filled_gaps(days, start, end)
334 today = date_in_n_days(0)
335 return {'start': start, 'end': end, 'days': days, 'today': today}
337 def do_GET_calendar(self) -> dict[str, object]:
338 """Show Days from ?start= to ?end= – normal view."""
339 return self._do_GET_calendar()
341 def do_GET_calendar_txt(self) -> dict[str, object]:
342 """Show Days from ?start= to ?end= – minimalist view."""
343 return self._do_GET_calendar()
345 def do_GET_day(self) -> dict[str, object]:
346 """Show single Day of ?date=."""
347 date = self._params.get_str_or_fail('date', date_in_n_days(0))
348 make_type = self._params.get_str_or_fail('make_type', 'full')
350 day = Day.by_id_or_create(self._conn, date)
351 conditions_present = []
354 for todo in day.todos:
355 for condition in todo.conditions + todo.blockers:
356 if condition not in conditions_present:
357 conditions_present += [condition]
358 enablers_for[condition.id_] = [p for p in
359 Process.all(self._conn)
360 if condition in p.enables]
361 disablers_for[condition.id_] = [p for p in
362 Process.all(self._conn)
363 if condition in p.disables]
364 seen_todos: set[int] = set()
365 top_nodes = [t.get_step_tree(seen_todos)
366 for t in day.todos if not t.parents]
368 'top_nodes': top_nodes,
369 'make_type': make_type,
370 'enablers_for': enablers_for,
371 'disablers_for': disablers_for,
372 'conditions_present': conditions_present,
373 'processes': Process.all(self._conn)}
376 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
377 """Show single Todo of ?id=."""
379 def walk_process_steps(node_id: int,
380 process_step_nodes: list[ProcessStepsNode],
381 steps_nodes: list[TodoOrProcStepNode]) -> int:
382 for process_step_node in process_step_nodes:
384 proc = Process.by_id(self._conn,
385 process_step_node.step.step_process_id)
386 node = TodoOrProcStepNode(node_id, None, proc, [])
387 steps_nodes += [node]
388 node_id = walk_process_steps(
389 node_id, process_step_node.steps, node.children)
392 def walk_todo_steps(node_id: int, todos: list[Todo],
393 steps_nodes: list[TodoOrProcStepNode]) -> int:
396 for match in [item for item in steps_nodes
398 and item.process == todo.process]:
401 for child in match.children:
402 child.fillable = True
403 node_id = walk_todo_steps(
404 node_id, todo.children, match.children)
407 node = TodoOrProcStepNode(node_id, todo, None, [])
408 steps_nodes += [node]
409 node_id = walk_todo_steps(
410 node_id, todo.children, node.children)
413 def collect_adoptables_keys(
414 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
416 for node in steps_nodes:
418 assert isinstance(node.process, Process)
419 assert isinstance(node.process.id_, int)
420 ids.add(node.process.id_)
421 ids = ids | collect_adoptables_keys(node.children)
424 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
425 process_tree = todo.process.get_steps(self._conn, None)
426 steps_todo_to_process: list[TodoOrProcStepNode] = []
427 last_node_id = walk_process_steps(0, process_tree,
428 steps_todo_to_process)
429 for steps_node in steps_todo_to_process:
430 steps_node.fillable = True
431 walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
432 adoptables: dict[int, list[Todo]] = {}
433 any_adoptables = [Todo.by_id(self._conn, t.id_)
434 for t in Todo.by_date(self._conn, todo.date)
437 for id_ in collect_adoptables_keys(steps_todo_to_process):
438 adoptables[id_] = [t for t in any_adoptables
439 if t.process.id_ == id_]
440 return {'todo': todo,
441 'steps_todo_to_process': steps_todo_to_process,
442 'adoption_candidates_for': adoptables,
443 'process_candidates': sorted(Process.all(self._conn)),
444 'todo_candidates': any_adoptables,
445 'condition_candidates': Condition.all(self._conn)}
447 def do_GET_todos(self) -> dict[str, object]:
448 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
449 sort_by = self._params.get_str_or_fail('sort_by', 'title')
450 start = self._params.get_str_or_fail('start', '')
451 end = self._params.get_str_or_fail('end', '')
452 process_id = self._params.get_int_or_none('process_id')
453 comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
455 ret = Todo.by_date_range_with_limits(self._conn, (start, end))
456 todos_by_date_range, start, end = ret
457 todos = [t for t in todos_by_date_range
458 if comment_pattern in t.comment
459 and ((not process_id) or t.process.id_ == process_id)]
460 sort_by = Todo.sort_by(todos, sort_by)
461 return {'start': start, 'end': end, 'process_id': process_id,
462 'comment_pattern': comment_pattern, 'todos': todos,
463 'all_processes': Process.all(self._conn), 'sort_by': sort_by}
465 def do_GET_conditions(self) -> dict[str, object]:
466 """Show all Conditions."""
467 pattern = self._params.get_str_or_fail('pattern', '')
468 sort_by = self._params.get_str_or_fail('sort_by', 'title')
470 conditions = Condition.matching(self._conn, pattern)
471 sort_by = Condition.sort_by(conditions, sort_by)
472 return {'conditions': conditions,
476 @_get_item(Condition)
477 def do_GET_condition(self, c: Condition) -> dict[str, object]:
478 """Show Condition of ?id=."""
479 ps = Process.all(self._conn)
480 return {'condition': c, 'is_new': c.id_ is None,
481 'enabled_processes': [p for p in ps if c in p.conditions],
482 'disabled_processes': [p for p in ps if c in p.blockers],
483 'enabling_processes': [p for p in ps if c in p.enables],
484 'disabling_processes': [p for p in ps if c in p.disables]}
486 @_get_item(Condition)
487 def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
488 """Show title history of Condition of ?id=."""
489 return {'condition': c}
491 @_get_item(Condition)
492 def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
493 """Show description historys of Condition of ?id=."""
494 return {'condition': c}
497 def do_GET_process(self, process: Process) -> dict[str, object]:
498 """Show Process of ?id=."""
499 owner_ids = self._params.get_all_int('step_to')
500 owned_ids = self._params.get_all_int('has_step')
501 title_64 = self._params.get_str('title_b64')
505 title_new = b64decode(title_64.encode()).decode()
506 except binascii_Exception as exc:
507 msg = 'invalid base64 for ?title_b64='
508 raise BadFormatException(msg) from exc
511 process.title.set(title_new)
512 preset_top_step = None
513 owners = process.used_as_step_by(self._conn)
514 for step_id in owner_ids:
515 owners += [Process.by_id(self._conn, step_id)]
516 for process_id in owned_ids:
517 Process.by_id(self._conn, process_id) # to ensure ID exists
518 preset_top_step = process_id
519 return {'process': process, 'is_new': process.id_ is None,
520 'preset_top_step': preset_top_step,
521 'steps': process.get_steps(self._conn),
523 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
524 'process_candidates': Process.all(self._conn),
525 'condition_candidates': Condition.all(self._conn)}
528 def do_GET_process_titles(self, p: Process) -> dict[str, object]:
529 """Show title history of Process of ?id=."""
530 return {'process': p}
533 def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
534 """Show description historys of Process of ?id=."""
535 return {'process': p}
538 def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
539 """Show default effort history of Process of ?id=."""
540 return {'process': p}
542 def do_GET_processes(self) -> dict[str, object]:
543 """Show all Processes."""
544 pattern = self._params.get_str_or_fail('pattern', '')
545 sort_by = self._params.get_str_or_fail('sort_by', 'title')
547 processes = Process.matching(self._conn, pattern)
548 sort_by = Process.sort_by(processes, sort_by)
549 return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
554 def _delete_or_post(target_class: Any, redir_target: str = '/'
555 ) -> Callable[..., Callable[[TaskHandler], str]]:
556 def decorator(f: Callable[..., str]
557 ) -> Callable[[TaskHandler], str]:
558 def wrapper(self: TaskHandler) -> str:
559 # pylint: disable=protected-access
560 # (because pylint here fails to detect the use of wrapper as a
561 # method to self with respective access privileges)
562 id_ = self._params.get_int_or_none('id')
563 for _ in self._form.get_all_str('delete'):
565 msg = 'trying to delete non-saved ' +\
566 f'{target_class.__name__}'
567 raise NotFoundException(msg)
568 item = target_class.by_id(self._conn, id_)
569 item.remove(self._conn)
571 if target_class.can_create_by_id:
572 item = target_class.by_id_or_create(self._conn, id_)
574 item = target_class.by_id(self._conn, id_)
579 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
580 """Update history timestamps for VersionedAttribute."""
581 id_ = self._params.get_int_or_none('id')
582 item = cls.by_id(self._conn, id_)
583 attr = getattr(item, attr_name)
584 for k, vals in self._form.get_all_of_key_prefixed('at:').items():
585 if k[19:] != vals[0]:
586 attr.reset_timestamp(k, f'{vals[0]}.0')
587 attr.save(self._conn)
588 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
590 def do_POST_day(self) -> str:
591 """Update or insert Day of date and Todos mapped to it."""
592 # pylint: disable=too-many-locals
593 date = self._params.get_str_or_fail('date')
594 day_comment = self._form.get_str_or_fail('day_comment')
595 make_type = self._form.get_str_or_fail('make_type')
596 old_todos = self._form.get_all_int('todo_id')
597 new_todos_by_process = self._form.get_all_int('new_todo')
598 comments = self._form.get_all_str('comment')
599 efforts = self._form.get_all_floats_or_nones('effort')
600 done_todos = self._form.get_all_int('done')
601 is_done = [t_id in done_todos for t_id in old_todos]
602 if not (len(old_todos) == len(is_done) == len(comments)
604 msg = 'not equal number each of number of todo_id, comments, ' +\
606 raise BadFormatException(msg)
607 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
608 raise BadFormatException('"done" field refers to unknown Todo')
610 day = Day.by_id_or_create(self._conn, date)
611 day.comment = day_comment
614 for process_id in sorted(new_todos_by_process):
615 process = Process.by_id(self._conn, process_id)
616 todo = Todo(None, process, False, date)
617 todo.save(self._conn)
619 if 'full' == make_type:
620 for todo in new_todos:
621 todo.ensure_children(self._conn)
622 for i, todo_id in enumerate(old_todos):
623 todo = Todo.by_id(self._conn, todo_id)
624 todo.is_done = is_done[i]
625 todo.comment = comments[i]
626 todo.effort = efforts[i]
627 todo.save(self._conn)
628 return f'/day?date={date}&make_type={make_type}'
630 @_delete_or_post(Todo, '/')
631 def do_POST_todo(self, todo: Todo) -> str:
632 """Update Todo and its children."""
633 # pylint: disable=too-many-locals
634 # pylint: disable=too-many-branches
635 # pylint: disable=too-many-statements
636 assert todo.id_ is not None
637 adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
638 to_make = {'full': [(id_, todo.id_)
639 for id_ in self._form.get_all_int('make_full')],
640 'empty': [(id_, todo.id_)
641 for id_ in self._form.get_all_int('make_empty')]}
642 step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_')
643 to_update: dict[str, Any] = {
644 'comment': self._form.get_str_or_fail('comment', '')}
645 for k in ('is_done', 'calendarize'):
646 v = self._form.get_bool_or_none(k)
649 cond_rels = [self._form.get_all_int(name) for name in
650 ['conditions', 'blockers', 'enables', 'disables']]
651 effort_or_not = self._form.get_str('effort')
652 if effort_or_not is not None:
653 if effort_or_not == '':
654 to_update['effort'] = None
657 to_update['effort'] = float(effort_or_not)
658 except ValueError as e:
659 msg = 'cannot float form field value for key: effort'
660 raise BadFormatException(msg) from e
661 for k, fillers in step_fillers_to.items():
664 except ValueError as e:
665 msg = f'bad step_filler_to_ key: {k}'
666 raise BadFormatException(msg) from e
667 for filler in [f for f in fillers if f != 'ignore']:
670 to_int = filler[5:] if filler.startswith(prefix) else filler
672 target_id = int(to_int)
673 except ValueError as e:
674 msg = f'bad fill_for target: {filler}'
675 raise BadFormatException(msg) from e
676 if filler.startswith(prefix):
677 to_make['empty'] += [(target_id, parent_id)]
679 adoptees += [(target_id, parent_id)]
681 todo.set_condition_relations(self._conn, *cond_rels)
682 for parent in [Todo.by_id(self._conn, a[1])
683 for a in adoptees] + [todo]:
684 for child in parent.children:
685 if child not in [t[0] for t in adoptees
686 if t[0] == child.id_ and t[1] == parent.id_]:
687 parent.remove_child(child)
688 parent.save(self._conn)
689 for child_id, parent_id in adoptees:
690 parent = Todo.by_id(self._conn, parent_id)
691 if child_id not in [c.id_ for c in parent.children]:
692 parent.add_child(Todo.by_id(self._conn, child_id))
693 parent.save(self._conn)
694 todo.update_attrs(**to_update)
695 for approach, make_data in to_make.items():
696 for process_id, parent_id in make_data:
697 parent = Todo.by_id(self._conn, parent_id)
698 process = Process.by_id(self._conn, process_id)
699 made = Todo(None, process, False, todo.date)
700 made.save(self._conn)
701 if 'full' == approach:
702 made.ensure_children(self._conn)
703 parent.add_child(made)
704 parent.save(self._conn)
705 # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
706 url = f'/todo?id={todo.id_}'
707 todo.save(self._conn)
710 def do_POST_process_descriptions(self) -> str:
711 """Update history timestamps for Process.description."""
712 return self._change_versioned_timestamps(Process, 'description')
714 def do_POST_process_efforts(self) -> str:
715 """Update history timestamps for Process.effort."""
716 return self._change_versioned_timestamps(Process, 'effort')
718 def do_POST_process_titles(self) -> str:
719 """Update history timestamps for Process.title."""
720 return self._change_versioned_timestamps(Process, 'title')
722 @_delete_or_post(Process, '/processes')
723 def do_POST_process(self, process: Process) -> str:
724 """Update or insert Process of ?id= and fields defined in postvars."""
725 # pylint: disable=too-many-locals
727 def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
728 l_ids, title = [], ''
729 for id_or_title in l_id_or_title:
731 l_ids += [int(id_or_title)]
736 versioned = {'title': self._form.get_str_or_fail('title'),
737 'description': self._form.get_str_or_fail('description'),
738 'effort': self._form.get_float_or_fail('effort')}
739 cond_rels = [self._form.get_all_int(s) for s
740 in ['conditions', 'blockers', 'enables', 'disables']]
741 calendarize = self._form.get_bool_or_none('calendarize')
742 step_of = self._form.get_all_str('step_of')
743 suppressions = self._form.get_all_int('suppresses')
744 kept_steps = self._form.get_all_int('kept_steps')
745 new_top_step_procs = self._form.get_all_str('new_top_step')
747 for step_id in kept_steps:
748 name = f'new_step_to_{step_id}'
749 new_steps_to[step_id] = self._form.get_all_int(name)
750 new_owner_title, owners_to_set = id_or_title(step_of)
751 new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
753 for k, v in versioned.items():
754 getattr(process, k).set(v)
755 if calendarize is not None:
756 process.calendarize = calendarize
757 process.save(self._conn)
758 assert isinstance(process.id_, int)
759 # set relations to Conditions and ProcessSteps / other Processes
760 process.set_condition_relations(self._conn, *cond_rels)
762 for step_id in kept_steps:
763 owned_steps += [ProcessStep.by_id(self._conn, step_id)]
764 owned_steps += [ # new sub-steps
765 ProcessStep(None, process.id_, step_process_id, step_id)
766 for step_process_id in new_steps_to[step_id]]
767 for step_process_id in new_top_step_proc_ids:
768 owned_steps += [ProcessStep(None, process.id_, step_process_id,
770 process.set_step_relations(self._conn, owners_to_set, suppressions,
772 # encode titles for potential newly-to-create Processes up or down
773 params = f'id={process.id_}'
775 title_b64_encoded = b64encode(new_step_title.encode()).decode()
776 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
777 elif new_owner_title:
778 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
779 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
780 process.save(self._conn)
781 return f'/process?{params}'
783 def do_POST_condition_descriptions(self) -> str:
784 """Update history timestamps for Condition.description."""
785 return self._change_versioned_timestamps(Condition, 'description')
787 def do_POST_condition_titles(self) -> str:
788 """Update history timestamps for Condition.title."""
789 return self._change_versioned_timestamps(Condition, 'title')
791 @_delete_or_post(Condition, '/conditions')
792 def do_POST_condition(self, condition: Condition) -> str:
793 """Update/insert Condition of ?id= and fields defined in postvars."""
794 title = self._form.get_str_or_fail('title')
795 description = self._form.get_str_or_fail('description')
796 is_active = self._form.get_bool_or_none('is_active')
798 if is_active is not None:
799 condition.is_active = is_active
800 condition.title.set(title)
801 condition.description.set(description)
802 condition.save(self._conn)
803 return f'/condition?id={condition.id_}'