1 """Web server stuff."""
from __future__ import annotations
from inspect import signature
from typing import Any, Callable
from base64 import b64encode, b64decode
from binascii import Error as binascii_Exception
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
from urllib.parse import urlparse, parse_qs, quote
from json import dumps as json_dumps
from os.path import split as path_split
from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
from plomtask.dating import date_in_n_days
from plomtask.days import Day
from plomtask.exceptions import (HandledException, BadFormatException,
                                 NotFoundException)
from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
from plomtask.processes import Process, ProcessStep, ProcessStepsNode
from plomtask.conditions import Condition
from plomtask.todos import Todo, TodoOrProcStepNode
from plomtask.misc import DictableNode
# Directory (relative to the working directory) the Jinja loader reads from.
TEMPLATES_DIR = 'templates'
class TaskServer(HTTPServer):
    """Variant of HTTPServer that knows .jinja as Jinja Environment."""

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Set up the HTTP server plus per-server rendering state.

        db_file is kept as .db for request handlers to open connections on;
        all further arguments are passed through to HTTPServer.
        """
        super().__init__(*args, **kwargs)
        # Request handlers read this as self.server.db.
        self.db = db_file
        # 'html' renders Jinja templates; any other value makes handlers
        # answer with JSON instead.
        self.render_mode = 'html'
        self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
38 """Wrapper for validating and retrieving dict-like HTTP inputs."""
40 def __init__(self, dict_: dict[str, list[str]]) -> None:
43 def get_all_str(self, key: str) -> list[str]:
44 """Retrieve list of string values at key (empty if no key)."""
45 if key not in self.inputs.keys():
47 return self.inputs[key]
49 def get_all_int(self, key: str, fail_on_empty: bool = False) -> list[int]:
50 """Retrieve list of int values at key."""
51 all_str = self.get_all_str(key)
53 return [int(s) for s in all_str if fail_on_empty or s != '']
54 except ValueError as e:
55 msg = f'cannot int a form field value for key {key} in: {all_str}'
56 raise BadFormatException(msg) from e
58 def get_str(self, key: str, default: str | None = None) -> str | None:
59 """Retrieve single/first string value of key, or default."""
60 vals = self.get_all_str(key)
65 def get_str_or_fail(self, key: str, default: str | None = None) -> str:
66 """Retrieve first string value of key, if none: fail or default."""
67 vals = self.get_all_str(key)
69 if default is not None:
71 raise BadFormatException(f'no value found for key: {key}')
74 def get_int_or_none(self, key: str) -> int | None:
75 """Retrieve single/first value of key as int, return None if empty."""
76 val = self.get_str_or_fail(key, '')
81 except (ValueError, TypeError) as e:
82 msg = f'cannot int form field value for key {key}: {val}'
83 raise BadFormatException(msg) from e
85 def get_bool_or_none(self, key: str) -> bool | None:
86 """Return value to key if truish; if no value to key, None."""
87 val = self.get_str(key)
90 return val in {'True', 'true', '1', 'on'}
92 def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]:
93 """Retrieve dict of strings at keys starting with key_prefix."""
95 for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]:
96 ret[key[len(key_prefix):]] = self.inputs[key]
99 def get_float_or_fail(self, key: str) -> float:
100 """Retrieve float value of key from self.postvars, fail if none."""
101 val = self.get_str_or_fail(key)
104 except ValueError as e:
105 msg = f'cannot float form field value for key {key}: {val}'
106 raise BadFormatException(msg) from e
108 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
109 """Retrieve list of float value at key, None if empty strings."""
110 ret: list[float | None] = []
111 for val in self.get_all_str(key):
117 except ValueError as e:
118 msg = f'cannot float form field value for key {key}: {val}'
119 raise BadFormatException(msg) from e
class TaskHandler(BaseHTTPRequestHandler):
    """Handles single HTTP request.

    Requests are dispatched by .do_GET/.do_POST to a method named
    do_{GET|POST}_{last URL path segment}; GET handlers return a template
    context (or a redirect target), POST handlers a redirect target.
    """
    # pylint: disable=too-many-public-methods
    server: TaskServer
    _conn: DatabaseConnection
    _site: str
    _form: InputsParser
    _params: InputsParser

    def _send_page(
            self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
            ) -> None:
        """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.

        The differentiation by .server.render_mode serves to allow easily
        comparable JSON responses for automatic testing.
        """
        body: str
        headers: list[tuple[str, str]] = []
        if 'html' == self.server.render_mode:
            tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
            body = tmpl.render(ctx)
        else:
            body = self._ctx_to_json(ctx)
            headers += [('Content-Type', 'application/json')]
        self.send_response(code)
        for header_tuple in headers:
            self.send_header(*header_tuple)
        self.end_headers()
        self.wfile.write(bytes(body, 'utf-8'))

    def _ctx_to_json(self, ctx: dict[str, object]) -> str:
        """Render ctx into JSON string.

        Flattens any objects that json.dumps might not want to serialize, and
        turns occurrences of BaseModel objects into listings of their .id_, to
        be resolved to a full dict inside a top-level '_library' dictionary,
        to avoid endless and circular nesting.

        Note that ctx is modified in place.
        """

        def flatten(node: object) -> object:

            def update_library_with(
                    item: BaseModel[int] | BaseModel[str]) -> None:
                cls_name = item.__class__.__name__
                if cls_name not in library:
                    library[cls_name] = {}
                # Check under the same key we store under, so that unsaved
                # items (.id_ of None, stored as '?') are recognized as
                # already collected too.
                id_key = '?' if item.id_ is None else item.id_
                if id_key not in library[cls_name]:
                    d, refs = item.as_dict_and_refs
                    library[cls_name][id_key] = d
                    for ref in refs:
                        update_library_with(ref)

            if isinstance(node, BaseModel):
                update_library_with(node)
                return node.id_
            if isinstance(node, DictableNode):
                d, refs = node.as_dict_and_refs
                for ref in refs:
                    update_library_with(ref)
                return d
            if isinstance(node, (list, tuple)):
                return [flatten(item) for item in node]
            if isinstance(node, dict):
                return {k: flatten(v) for k, v in node.items()}
            if isinstance(node, HandledException):
                # NOTE(review): reconstructed line – confirm exceptions are
                # meant to be serialized via str().
                return str(node)
            return node

        library: dict[str, dict[str | int, object]] = {}
        for k, v in ctx.items():
            ctx[k] = flatten(v)
        ctx['_library'] = library
        return json_dumps(ctx)

    @staticmethod
    def _request_wrapper(http_method: str, not_found_msg: str
                         ) -> Callable[..., Callable[[TaskHandler], None]]:
        """Wrapper for do_GET… and do_POST… handlers, to init and clean up.

        Among other things, conditionally cleans all caches, but only on POST
        requests, as only those are expected to change the states of objects
        that may be cached, and certainly only those are expected to write any
        changes to the database. We want to call them as early though as
        possible here, either exactly after the specific request handler
        returns successfully, or right after any exception is triggered –
        otherwise, race conditions become plausible.

        Note that otherwise any POST attempt, even a failed one, may end in
        problematic inconsistencies:

        - if the POST handler experiences an Exception, changes to objects
          won't get written to the DB, but the changed objects may remain in
          the cache and affect other objects despite their possibly illegal
          state

        - even if an object was just saved to the DB, we cannot be sure its
          current state is completely identical to what we'd get if loading it
          fresh from the DB (e.g. currently Process.n_owners is only updated
          when loaded anew via .from_table_row, nor is its state written to
          the DB by .save; a questionable design choice, but proof that we
          have no guarantee that objects' .save stores all their states we'd
          prefer at their most up-to-date.
        """

        def clear_caches() -> None:
            for cls in (Day, Todo, Condition, Process, ProcessStep):
                assert hasattr(cls, 'empty_cache')
                cls.empty_cache()

        def decorator(f: Callable[..., str | None]
                      ) -> Callable[[TaskHandler], None]:
            def wrapper(self: TaskHandler) -> None:
                # pylint: disable=protected-access
                # (because pylint here fails to detect the use of wrapper as a
                # method to self with respective access privileges)
                try:
                    self._conn = DatabaseConnection(self.server.db)
                    parsed_url = urlparse(self.path)
                    self._site = path_split(parsed_url.path)[1]
                    # NOTE(review): last keyword argument reconstructed –
                    # confirm strict_parsing is what was passed here.
                    params = parse_qs(parsed_url.query,
                                      keep_blank_values=True,
                                      strict_parsing=True)
                    self._params = InputsParser(params)
                    handler_name = f'do_{http_method}_{self._site}'
                    if hasattr(self, handler_name):
                        handler = getattr(self, handler_name)
                        redir_target = f(self, handler)
                        if 'POST' == http_method:
                            clear_caches()
                        if redir_target:
                            self.send_response(302)
                            self.send_header('Location', redir_target)
                            self.end_headers()
                    else:
                        msg = f'{not_found_msg}: {self._site}'
                        raise NotFoundException(msg)
                except HandledException as error:
                    if 'POST' == http_method:
                        clear_caches()
                    # NOTE(review): context key reconstructed from the 'msg'
                    # template name – confirm against the template.
                    ctx = {'msg': error}
                    self._send_page(ctx, 'msg', error.http_code)
                finally:
                    self._conn.close()
            return wrapper
        return decorator

    @_request_wrapper('GET', 'Unknown page')
    def do_GET(self, handler: Callable[[], str | dict[str, object]]
               ) -> str | None:
        """Render page with result of handler, or redirect if result is str."""
        tmpl_name = f'{self._site}'
        ctx_or_redir_target = handler()
        if isinstance(ctx_or_redir_target, str):
            return ctx_or_redir_target
        self._send_page(ctx_or_redir_target, tmpl_name)
        return None

    @_request_wrapper('POST', 'Unknown POST target')
    def do_POST(self, handler: Callable[[], str]) -> str:
        """Handle POST with handler, prepare redirection to result.

        Raises BadFormatException if the Content-Length header is missing or
        not an integer.
        """
        try:
            length = int(self.headers['content-length'])
        except (TypeError, ValueError) as e:
            # A missing header yields None (TypeError), a malformed one a
            # ValueError; report both as bad input instead of crashing.
            msg = 'missing or malformed Content-Length header'
            raise BadFormatException(msg) from e
        postvars = parse_qs(self.rfile.read(length).decode(),
                            keep_blank_values=True)
        self._form = InputsParser(postvars)
        redir_target = handler()
        # NOTE(review): reconstructed line – confirm the commit happens here.
        self._conn.commit()
        return redir_target

    # GET handlers

    @staticmethod
    def _get_item(target_class: Any
                  ) -> Callable[..., Callable[[TaskHandler],
                                              dict[str, object]]]:
        """Decorate GET handler to receive the target_class item of ?id=.

        If the handler has an 'exists' parameter, it additionally gets passed
        whether the item was found under a given ID.
        """
        def decorator(f: Callable[..., dict[str, object]]
                      ) -> Callable[[TaskHandler], dict[str, object]]:
            def wrapper(self: TaskHandler) -> dict[str, object]:
                # pylint: disable=protected-access
                # (because pylint here fails to detect the use of wrapper as a
                # method to self with respective access privileges)
                ids = self._params.get_all_int('id', fail_on_empty=True)
                # With several ?id= values, the last one wins.
                id_ = ids[-1] if ids else None
                if target_class.can_create_by_id:
                    item = target_class.by_id_or_create(self._conn, id_)
                else:
                    item = target_class.by_id(self._conn, id_)
                if 'exists' in signature(f).parameters:
                    exists = id_ is not None and target_class._get_cached(id_)
                    return f(self, item, exists)
                return f(self, item)
            return wrapper
        return decorator

    def do_GET_(self) -> str:
        """Return redirect target on GET /."""
        # NOTE(review): target reconstructed – confirm '/day' is intended.
        return '/day'

    def _do_GET_calendar(self) -> dict[str, object]:
        """Show Days from ?start= to ?end=.

        Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
        same, the only difference being the HTML template they are rendered to,
        which .do_GET selects from their method name.
        """
        start = self._params.get_str_or_fail('start', '')
        end = self._params.get_str_or_fail('end', '')
        end = end if end != '' else date_in_n_days(366)
        #
        # NOTE(review): trailing arguments reconstructed – confirm the date
        # column name passed for Day.
        days, start, end = Day.by_date_range_with_limits(self._conn,
                                                         (start, end), 'id')
        days = Day.with_filled_gaps(days, start, end)
        today = date_in_n_days(0)
        return {'start': start, 'end': end, 'days': days, 'today': today}

    def do_GET_calendar(self) -> dict[str, object]:
        """Show Days from ?start= to ?end= – normal view."""
        return self._do_GET_calendar()

    def do_GET_calendar_txt(self) -> dict[str, object]:
        """Show Days from ?start= to ?end= – minimalist view."""
        return self._do_GET_calendar()

    def do_GET_day(self) -> dict[str, object]:
        """Show single Day of ?date=."""
        date = self._params.get_str('date', date_in_n_days(0))
        make_type = self._params.get_str_or_fail('make_type', 'full')
        #
        day = Day.by_id_or_create(self._conn, date)
        # Fetched once, rather than twice per Condition inside the loops.
        processes = Process.all(self._conn)
        conditions_present = []
        enablers_for = {}
        disablers_for = {}
        for todo in day.todos:
            for condition in todo.conditions + todo.blockers:
                if condition not in conditions_present:
                    conditions_present += [condition]
                    enablers_for[condition.id_] = [
                            p for p in processes if condition in p.enables]
                    disablers_for[condition.id_] = [
                            p for p in processes if condition in p.disables]
        seen_todos: set[int] = set()
        top_nodes = [t.get_step_tree(seen_todos)
                     for t in day.todos if not t.parents]
        # NOTE(review): first key reconstructed – confirm 'day' against the
        # template.
        return {'day': day,
                'top_nodes': top_nodes,
                'make_type': make_type,
                'enablers_for': enablers_for,
                'disablers_for': disablers_for,
                'conditions_present': conditions_present,
                'processes': processes}

    @_get_item(Todo)
    def do_GET_todo(self, todo: Todo) -> dict[str, object]:
        """Show single Todo of ?id=."""

        def walk_process_steps(node_id: int,
                               process_step_nodes: list[ProcessStepsNode],
                               steps_nodes: list[TodoOrProcStepNode]) -> int:
            # Mirror the Process' step tree as nodes without Todos; return
            # the last node ID handed out.
            for process_step_node in process_step_nodes:
                node_id += 1
                proc = Process.by_id(self._conn,
                                     process_step_node.step.step_process_id)
                node = TodoOrProcStepNode(node_id, None, proc, [])
                steps_nodes += [node]
                node_id = walk_process_steps(
                        node_id, process_step_node.steps, node.children)
            return node_id

        def walk_todo_steps(node_id: int, todos: list[Todo],
                            steps_nodes: list[TodoOrProcStepNode]) -> int:
            # Attach actual Todos to the step nodes of matching Process,
            # append new nodes for Todos that match none.
            for todo in todos:
                matched = False
                for match in [item for item in steps_nodes
                              if item.process
                              and item.process == todo.process]:
                    match.todo = todo
                    matched = True
                    for child in match.children:
                        child.fillable = True
                    node_id = walk_todo_steps(
                            node_id, todo.children, match.children)
                if not matched:
                    node_id += 1
                    node = TodoOrProcStepNode(node_id, todo, None, [])
                    steps_nodes += [node]
                    node_id = walk_todo_steps(
                            node_id, todo.children, node.children)
            return node_id

        def collect_adoptables_keys(
                steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
            # Process IDs of all nodes that have no Todo attached yet.
            ids = set()
            for node in steps_nodes:
                if not node.todo:
                    assert isinstance(node.process, Process)
                    assert isinstance(node.process.id_, int)
                    ids.add(node.process.id_)
                ids = ids | collect_adoptables_keys(node.children)
            return ids

        todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
        process_tree = todo.process.get_steps(self._conn, None)
        steps_todo_to_process: list[TodoOrProcStepNode] = []
        last_node_id = walk_process_steps(0, process_tree,
                                          steps_todo_to_process)
        for steps_node in steps_todo_to_process:
            steps_node.fillable = True
        walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
        adoptables: dict[int, list[Todo]] = {}
        # NOTE(review): filter condition reconstructed – confirm.
        any_adoptables = [Todo.by_id(self._conn, t.id_)
                          for t in Todo.by_date(self._conn, todo.date)
                          if t.id_ is not None
                          and t != todo]
        for id_ in collect_adoptables_keys(steps_todo_to_process):
            adoptables[id_] = [t for t in any_adoptables
                               if t.process.id_ == id_]
        return {'todo': todo,
                'steps_todo_to_process': steps_todo_to_process,
                'adoption_candidates_for': adoptables,
                'process_candidates': sorted(Process.all(self._conn)),
                'todo_candidates': any_adoptables,
                'condition_candidates': Condition.all(self._conn)}

    def do_GET_todos(self) -> dict[str, object]:
        """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
        sort_by = self._params.get_str_or_fail('sort_by', 'title')
        start = self._params.get_str_or_fail('start', '')
        end = self._params.get_str_or_fail('end', '')
        process_id = self._params.get_int_or_none('process_id')
        comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
        #
        ret = Todo.by_date_range_with_limits(self._conn, (start, end))
        todos_by_date_range, start, end = ret
        todos = [t for t in todos_by_date_range
                 if comment_pattern in t.comment
                 and ((not process_id) or t.process.id_ == process_id)]
        sort_by = Todo.sort_by(todos, sort_by)
        return {'start': start, 'end': end, 'process_id': process_id,
                'comment_pattern': comment_pattern, 'todos': todos,
                'all_processes': Process.all(self._conn), 'sort_by': sort_by}

    def do_GET_conditions(self) -> dict[str, object]:
        """Show all Conditions."""
        pattern = self._params.get_str_or_fail('pattern', '')
        sort_by = self._params.get_str_or_fail('sort_by', 'title')
        #
        conditions = Condition.matching(self._conn, pattern)
        sort_by = Condition.sort_by(conditions, sort_by)
        # NOTE(review): last two keys reconstructed by analogy with
        # .do_GET_processes – confirm.
        return {'conditions': conditions,
                'sort_by': sort_by,
                'pattern': pattern}

    @_get_item(Condition)
    def do_GET_condition(self,
                         c: Condition,
                         exists: bool
                         ) -> dict[str, object]:
        """Show Condition of ?id=."""
        ps = Process.all(self._conn)
        return {'condition': c,
                'is_new': not exists,
                'enabled_processes': [p for p in ps if c in p.conditions],
                'disabled_processes': [p for p in ps if c in p.blockers],
                'enabling_processes': [p for p in ps if c in p.enables],
                'disabling_processes': [p for p in ps if c in p.disables]}

    @_get_item(Condition)
    def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
        """Show title history of Condition of ?id=."""
        return {'condition': c}

    @_get_item(Condition)
    def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
        """Show description historys of Condition of ?id=."""
        return {'condition': c}

    @_get_item(Process)
    def do_GET_process(self,
                       process: Process,
                       exists: bool
                       ) -> dict[str, object]:
        """Show Process of ?id=.

        ?title_b64= may carry a base64-encoded title to pre-set; raises
        BadFormatException if it does not decode to text.
        """
        owner_ids = self._params.get_all_int('step_to')
        owned_ids = self._params.get_all_int('has_step')
        title_64 = self._params.get_str('title_b64')
        title_new = None
        if title_64:
            try:
                title_new = b64decode(title_64.encode()).decode()
            except (binascii_Exception, UnicodeDecodeError) as exc:
                # UnicodeDecodeError: valid base64 that is not UTF-8 text.
                msg = 'invalid base64 for ?title_b64='
                raise BadFormatException(msg) from exc
        #
        if title_new:
            process.title.set(title_new)
        preset_top_step = None
        owners = process.used_as_step_by(self._conn)
        for step_id in owner_ids:
            owners += [Process.by_id(self._conn, step_id)]
        for process_id in owned_ids:
            Process.by_id(self._conn, process_id)  # to ensure ID exists
            preset_top_step = process_id
        # NOTE(review): 'owners' entry reconstructed – confirm key name.
        return {'process': process,
                'is_new': not exists,
                'preset_top_step': preset_top_step,
                'steps': process.get_steps(self._conn),
                'owners': owners,
                'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
                'process_candidates': Process.all(self._conn),
                'condition_candidates': Condition.all(self._conn)}

    @_get_item(Process)
    def do_GET_process_titles(self, p: Process) -> dict[str, object]:
        """Show title history of Process of ?id=."""
        return {'process': p}

    @_get_item(Process)
    def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
        """Show description historys of Process of ?id=."""
        return {'process': p}

    @_get_item(Process)
    def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
        """Show default effort history of Process of ?id=."""
        return {'process': p}

    def do_GET_processes(self) -> dict[str, object]:
        """Show all Processes."""
        pattern = self._params.get_str_or_fail('pattern', '')
        sort_by = self._params.get_str_or_fail('sort_by', 'title')
        #
        processes = Process.matching(self._conn, pattern)
        sort_by = Process.sort_by(processes, sort_by)
        return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}

    # POST handlers

    @staticmethod
    def _delete_or_post(target_class: Any, redir_target: str = '/'
                        ) -> Callable[..., Callable[[TaskHandler], str]]:
        """Decorate POST handler to receive item of ?id=, or delete it.

        If the form has a 'delete' field, the item is removed and redir_target
        returned instead of calling the handler; deleting without an ?id=
        raises NotFoundException.
        """
        def decorator(f: Callable[..., str]
                      ) -> Callable[[TaskHandler], str]:
            def wrapper(self: TaskHandler) -> str:
                # pylint: disable=protected-access
                # (because pylint here fails to detect the use of wrapper as a
                # method to self with respective access privileges)
                id_ = self._params.get_int_or_none('id')
                if self._form.get_all_str('delete'):
                    if id_ is None:
                        msg = 'trying to delete non-saved ' +\
                                f'{target_class.__name__}'
                        raise NotFoundException(msg)
                    item = target_class.by_id(self._conn, id_)
                    item.remove(self._conn)
                    return redir_target
                if target_class.can_create_by_id:
                    item = target_class.by_id_or_create(self._conn, id_)
                else:
                    item = target_class.by_id(self._conn, id_)
                return f(self, item)
            return wrapper
        return decorator

    def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
        """Update history timestamps for VersionedAttribute."""
        id_ = self._params.get_int_or_none('id')
        item = cls.by_id(self._conn, id_)
        attr = getattr(item, attr_name)
        for k, vals in self._form.get_all_of_key_prefixed('at:').items():
            # NOTE(review): compares the tail of the old timestamp key from
            # index 19 on with the whole new value – confirm that k[19:]
            # rather than k[:19] is intended.
            if k[19:] != vals[0]:
                attr.reset_timestamp(k, f'{vals[0]}.0')
        attr.save(self._conn)
        return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'

    def do_POST_day(self) -> str:
        """Update or insert Day of date and Todos mapped to it."""
        # pylint: disable=too-many-locals
        date = self._params.get_str_or_fail('date')
        day_comment = self._form.get_str_or_fail('day_comment')
        make_type = self._form.get_str_or_fail('make_type')
        old_todos = self._form.get_all_int('todo_id')
        new_todos_by_process = self._form.get_all_int('new_todo')
        comments = self._form.get_all_str('comment')
        efforts = self._form.get_all_floats_or_nones('effort')
        done_todos = self._form.get_all_int('done')
        is_done = [t_id in done_todos for t_id in old_todos]
        if not (len(old_todos) == len(is_done) == len(comments)
                == len(efforts)):
            # NOTE(review): second half of message reconstructed – confirm.
            msg = 'not equal number each of number of todo_id, comments, ' +\
                    'and efforts inputs'
            raise BadFormatException(msg)
        if any(id_ not in old_todos for id_ in done_todos):
            raise BadFormatException('"done" field refers to unknown Todo')
        #
        day = Day.by_id_or_create(self._conn, date)
        day.comment = day_comment
        day.save(self._conn)
        new_todos = []
        for process_id in sorted(new_todos_by_process):
            process = Process.by_id(self._conn, process_id)
            todo = Todo(None, process, False, date)
            todo.save(self._conn)
            new_todos += [todo]
        # Children are only ensured after all new Todos have been saved.
        if 'full' == make_type:
            for todo in new_todos:
                todo.ensure_children(self._conn)
        for i, todo_id in enumerate(old_todos):
            todo = Todo.by_id(self._conn, todo_id)
            todo.is_done = is_done[i]
            todo.comment = comments[i]
            todo.effort = efforts[i]
            todo.save(self._conn)
        return f'/day?date={date}&make_type={make_type}'

    @_delete_or_post(Todo, '/')
    def do_POST_todo(self, todo: Todo) -> str:
        """Update Todo and its children."""
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        assert todo.id_ is not None
        adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
        to_make = {'full': [(id_, todo.id_)
                            for id_ in self._form.get_all_int('make_full')],
                   'empty': [(id_, todo.id_)
                             for id_ in self._form.get_all_int('make_empty')]}
        step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_')
        to_update: dict[str, Any] = {
            'comment': self._form.get_str_or_fail('comment', '')}
        for k in ('is_done', 'calendarize'):
            v = self._form.get_bool_or_none(k)
            if v is not None:
                to_update[k] = v
        cond_rels = [self._form.get_all_int(name) for name in
                     ['conditions', 'blockers', 'enables', 'disables']]
        effort_or_not = self._form.get_str('effort')
        if effort_or_not is not None:
            if effort_or_not == '':
                to_update['effort'] = None
            else:
                try:
                    to_update['effort'] = float(effort_or_not)
                except ValueError as e:
                    msg = 'cannot float form field value for key: effort'
                    raise BadFormatException(msg) from e
        for k, fillers in step_fillers_to.items():
            try:
                parent_id = int(k)
            except ValueError as e:
                msg = f'bad step_filler_to_ key: {k}'
                raise BadFormatException(msg) from e
            for filler in [f for f in fillers if f != 'ignore']:
                target_id: int
                # NOTE(review): prefix reconstructed from the original
                # five-character slice and the to_make target – confirm.
                prefix = 'make_'
                to_int = (filler[len(prefix):] if filler.startswith(prefix)
                          else filler)
                try:
                    target_id = int(to_int)
                except ValueError as e:
                    msg = f'bad fill_for target: {filler}'
                    raise BadFormatException(msg) from e
                if filler.startswith(prefix):
                    to_make['empty'] += [(target_id, parent_id)]
                else:
                    adoptees += [(target_id, parent_id)]
        #
        todo.set_condition_relations(self._conn, *cond_rels)
        for parent in [Todo.by_id(self._conn, a[1])
                       for a in adoptees] + [todo]:
            # NOTE(review): this tests a Todo against a list of int IDs, and
            # may remove children while iterating over them – confirm both
            # are intended.
            for child in parent.children:
                if child not in [t[0] for t in adoptees
                                 if t[0] == child.id_ and t[1] == parent.id_]:
                    parent.remove_child(child)
            parent.save(self._conn)
        for child_id, parent_id in adoptees:
            parent = Todo.by_id(self._conn, parent_id)
            if child_id not in [c.id_ for c in parent.children]:
                parent.add_child(Todo.by_id(self._conn, child_id))
                parent.save(self._conn)
        todo.update_attrs(**to_update)
        for approach, make_data in to_make.items():
            for process_id, parent_id in make_data:
                parent = Todo.by_id(self._conn, parent_id)
                process = Process.by_id(self._conn, process_id)
                made = Todo(None, process, False, todo.date)
                made.save(self._conn)
                if 'full' == approach:
                    made.ensure_children(self._conn)
                parent.add_child(made)
                parent.save(self._conn)
        # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
        url = f'/todo?id={todo.id_}'
        todo.save(self._conn)
        return url

    def do_POST_process_descriptions(self) -> str:
        """Update history timestamps for Process.description."""
        return self._change_versioned_timestamps(Process, 'description')

    def do_POST_process_efforts(self) -> str:
        """Update history timestamps for Process.effort."""
        return self._change_versioned_timestamps(Process, 'effort')

    def do_POST_process_titles(self) -> str:
        """Update history timestamps for Process.title."""
        return self._change_versioned_timestamps(Process, 'title')

    @_delete_or_post(Process, '/processes')
    def do_POST_process(self, process: Process) -> str:
        """Update or insert Process of ?id= and fields defined in postvars."""
        # pylint: disable=too-many-locals

        def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
            # Split inputs into int IDs and (the last) non-int entry, which
            # is taken to be the title of a Process yet to be created.
            l_ids, title = [], ''
            for entry in l_id_or_title:
                try:
                    l_ids += [int(entry)]
                except ValueError:
                    title = entry
            return title, l_ids

        versioned = {'title': self._form.get_str_or_fail('title'),
                     'description': self._form.get_str_or_fail('description'),
                     'effort': self._form.get_float_or_fail('effort')}
        cond_rels = [self._form.get_all_int(s) for s
                     in ['conditions', 'blockers', 'enables', 'disables']]
        calendarize = self._form.get_bool_or_none('calendarize')
        step_of = self._form.get_all_str('step_of')
        suppressions = self._form.get_all_int('suppresses')
        kept_steps = self._form.get_all_int('kept_steps')
        new_top_step_procs = self._form.get_all_str('new_top_step')
        new_steps_to = {}
        for step_id in kept_steps:
            name = f'new_step_to_{step_id}'
            new_steps_to[step_id] = self._form.get_all_int(name)
        new_owner_title, owners_to_set = id_or_title(step_of)
        new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
        #
        for k, v in versioned.items():
            getattr(process, k).set(v)
        if calendarize is not None:
            process.calendarize = calendarize
        process.save(self._conn)
        assert isinstance(process.id_, int)
        # set relations to Conditions and ProcessSteps / other Processes
        process.set_condition_relations(self._conn, *cond_rels)
        owned_steps = []
        for step_id in kept_steps:
            owned_steps += [ProcessStep.by_id(self._conn, step_id)]
            owned_steps += [  # new sub-steps
                    ProcessStep(None, process.id_, step_process_id, step_id)
                    for step_process_id in new_steps_to[step_id]]
        for step_process_id in new_top_step_proc_ids:
            owned_steps += [ProcessStep(None, process.id_, step_process_id,
                                        None)]
        process.set_step_relations(self._conn, owners_to_set, suppressions,
                                   owned_steps)
        # encode titles for potential newly-to-create Processes up or down;
        # base64 may contain '+', '/' and '=', of which '+' would be read as
        # a space when the redirect target's query string gets parsed, so
        # percent-quote the encoded title
        params = f'id={process.id_}'
        if new_step_title:
            title_b64_encoded = b64encode(new_step_title.encode()).decode()
            title_quoted = quote(title_b64_encoded, safe='')
            params = f'step_to={process.id_}&title_b64={title_quoted}'
        elif new_owner_title:
            title_b64_encoded = b64encode(new_owner_title.encode()).decode()
            title_quoted = quote(title_b64_encoded, safe='')
            params = f'has_step={process.id_}&title_b64={title_quoted}'
        process.save(self._conn)
        return f'/process?{params}'

    def do_POST_condition_descriptions(self) -> str:
        """Update history timestamps for Condition.description."""
        return self._change_versioned_timestamps(Condition, 'description')

    def do_POST_condition_titles(self) -> str:
        """Update history timestamps for Condition.title."""
        return self._change_versioned_timestamps(Condition, 'title')

    @_delete_or_post(Condition, '/conditions')
    def do_POST_condition(self, condition: Condition) -> str:
        """Update/insert Condition of ?id= and fields defined in postvars."""
        title = self._form.get_str_or_fail('title')
        description = self._form.get_str_or_fail('description')
        is_active = self._form.get_bool_or_none('is_active')
        #
        if is_active is not None:
            condition.is_active = is_active
        condition.title.set(title)
        condition.description.set(description)
        condition.save(self._conn)
        return f'/condition?id={condition.id_}'