1 """Web server stuff."""
2 from __future__ import annotations
3 from inspect import signature
4 from typing import Any, Callable
5 from base64 import b64encode, b64decode
6 from binascii import Error as binascii_Exception
7 from http.server import HTTPServer, BaseHTTPRequestHandler
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
16 from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoOrProcStepNode
20 from plomtask.misc import DictableNode
22 TEMPLATES_DIR = 'templates'
25 class TaskServer(HTTPServer):
26 """Variant of HTTPServer that knows .jinja as Jinja Environment."""
28 def __init__(self, db_file: DatabaseFile,
29 *args: Any, **kwargs: Any) -> None:
30 super().__init__(*args, **kwargs)
32 self.render_mode = 'html'
33 self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
37 """Wrapper for validating and retrieving dict-like HTTP inputs."""
39 def __init__(self, dict_: dict[str, list[str]]) -> None:
42 def get_all_str(self, key: str) -> list[str]:
43 """Retrieve list of string values at key (empty if no key)."""
44 if key not in self.inputs.keys():
46 return self.inputs[key]
48 def get_all_int(self, key: str, fail_on_empty: bool = False) -> list[int]:
49 """Retrieve list of int values at key."""
50 all_str = self.get_all_str(key)
52 return [int(s) for s in all_str if fail_on_empty or s != '']
53 except ValueError as e:
54 msg = f'cannot int a form field value for key {key} in: {all_str}'
55 raise BadFormatException(msg) from e
57 def get_str(self, key: str, default: str | None = None) -> str | None:
58 """Retrieve single/first string value of key, or default."""
59 vals = self.get_all_str(key)
64 def get_str_or_fail(self, key: str, default: str | None = None) -> str:
65 """Retrieve first string value of key, if none: fail or default."""
66 vals = self.get_all_str(key)
68 if default is not None:
70 raise BadFormatException(f'no value found for key: {key}')
73 def get_int_or_none(self, key: str) -> int | None:
74 """Retrieve single/first value of key as int, return None if empty."""
75 val = self.get_str_or_fail(key, '')
80 except (ValueError, TypeError) as e:
81 msg = f'cannot int form field value for key {key}: {val}'
82 raise BadFormatException(msg) from e
84 def get_bool(self, key: str) -> bool:
85 """Return if value to key truish; return False if None/no value."""
86 return self.get_str(key) in {'True', 'true', '1', 'on'}
88 def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]:
89 """Retrieve dict of strings at keys starting with key_prefix."""
91 for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]:
92 ret[key[len(key_prefix):]] = self.inputs[key]
95 def get_float_or_fail(self, key: str) -> float:
96 """Retrieve float value of key from self.postvars, fail if none."""
97 val = self.get_str_or_fail(key)
100 except ValueError as e:
101 msg = f'cannot float form field value for key {key}: {val}'
102 raise BadFormatException(msg) from e
104 def get_all_floats_or_nones(self, key: str) -> list[float | None]:
105 """Retrieve list of float value at key, None if empty strings."""
106 ret: list[float | None] = []
107 for val in self.get_all_str(key):
113 except ValueError as e:
114 msg = f'cannot float form field value for key {key}: {val}'
115 raise BadFormatException(msg) from e
119 class TaskHandler(BaseHTTPRequestHandler):
120 """Handles single HTTP request."""
121 # pylint: disable=too-many-public-methods
123 _conn: DatabaseConnection
126 _params: InputsParser
129 self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
131 """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
133 The differentiation by .server.render_mode serves to allow easily
134 comparable JSON responses for automatic testing.
137 headers: list[tuple[str, str]] = []
138 if 'html' == self.server.render_mode:
139 tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
140 body = tmpl.render(ctx)
142 body = self._ctx_to_json(ctx)
143 headers += [('Content-Type', 'application/json')]
144 self.send_response(code)
145 for header_tuple in headers:
146 self.send_header(*header_tuple)
148 self.wfile.write(bytes(body, 'utf-8'))
150 def _ctx_to_json(self, ctx: dict[str, object]) -> str:
151 """Render ctx into JSON string.
153 Flattens any objects that json.dumps might not want to serialize, and
154 turns occurrences of BaseModel objects into listings of their .id_, to
155 be resolved to a full dict inside a top-level '_library' dictionary,
156 to avoid endless and circular nesting.
159 def flatten(node: object) -> object:
161 def update_library_with(
162 item: BaseModel[int] | BaseModel[str]) -> None:
163 cls_name = item.__class__.__name__
164 if cls_name not in library:
165 library[cls_name] = {}
166 if item.id_ not in library[cls_name]:
167 d, refs = item.as_dict_and_refs
168 id_key = '?' if item.id_ is None else item.id_
169 library[cls_name][id_key] = d
171 update_library_with(ref)
173 if isinstance(node, BaseModel):
174 update_library_with(node)
176 if isinstance(node, DictableNode):
177 d, refs = node.as_dict_and_refs
179 update_library_with(ref)
181 if isinstance(node, (list, tuple)):
182 return [flatten(item) for item in node]
183 if isinstance(node, dict):
185 for k, v in node.items():
188 if isinstance(node, HandledException):
192 library: dict[str, dict[str | int, object]] = {}
193 for k, v in ctx.items():
195 ctx['_library'] = library
196 return json_dumps(ctx)
199 def _request_wrapper(http_method: str, not_found_msg: str
200 ) -> Callable[..., Callable[[TaskHandler], None]]:
201 """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
203 Among other things, conditionally cleans all caches, but only on POST
204 requests, as only those are expected to change the states of objects
205 that may be cached, and certainly only those are expected to write any
206 changes to the database. We want to call them as early though as
207 possible here, either exactly after the specific request handler
208 returns successfully, or right after any exception is triggered –
209 otherwise, race conditions become plausible.
211 Note that otherwise any POST attempt, even a failed one, may end in
212 problematic inconsistencies:
214 - if the POST handler experiences an Exception, changes to objects
215 won't get written to the DB, but the changed objects may remain in
216 the cache and affect other objects despite their possibly illegal
219 - even if an object was just saved to the DB, we cannot be sure its
220 current state is completely identical to what we'd get if loading it
221 fresh from the DB (e.g. currently Process.n_owners is only updated
222 when loaded anew via .from_table_row, nor is its state written to
223 the DB by .save; a questionable design choice, but proof that we
224 have no guarantee that objects' .save stores all their states we'd
225 prefer at their most up-to-date.
228 def clear_caches() -> None:
229 for cls in (Day, Todo, Condition, Process, ProcessStep):
230 assert hasattr(cls, 'empty_cache')
233 def decorator(f: Callable[..., str | None]
234 ) -> Callable[[TaskHandler], None]:
235 def wrapper(self: TaskHandler) -> None:
236 # pylint: disable=protected-access
237 # (because pylint here fails to detect the use of wrapper as a
238 # method to self with respective access privileges)
240 self._conn = DatabaseConnection(self.server.db)
241 parsed_url = urlparse(self.path)
242 self._site = path_split(parsed_url.path)[1]
243 params = parse_qs(parsed_url.query,
244 keep_blank_values=True,
246 self._params = InputsParser(params)
247 handler_name = f'do_{http_method}_{self._site}'
248 if hasattr(self, handler_name):
249 handler = getattr(self, handler_name)
250 redir_target = f(self, handler)
251 if 'POST' == http_method:
254 self.send_response(302)
255 self.send_header('Location', redir_target)
258 msg = f'{not_found_msg}: {self._site}'
259 raise NotFoundException(msg)
260 except HandledException as error:
261 if 'POST' == http_method:
264 self._send_page(ctx, 'msg', error.http_code)
270 @_request_wrapper('GET', 'Unknown page')
271 def do_GET(self, handler: Callable[[], str | dict[str, object]]
273 """Render page with result of handler, or redirect if result is str."""
274 tmpl_name = f'{self._site}'
275 ctx_or_redir_target = handler()
276 if isinstance(ctx_or_redir_target, str):
277 return ctx_or_redir_target
278 self._send_page(ctx_or_redir_target, tmpl_name)
281 @_request_wrapper('POST', 'Unknown POST target')
282 def do_POST(self, handler: Callable[[], str]) -> str:
283 """Handle POST with handler, prepare redirection to result."""
284 length = int(self.headers['content-length'])
285 postvars = parse_qs(self.rfile.read(length).decode(),
286 keep_blank_values=True)
287 self._form = InputsParser(postvars)
288 redir_target = handler()
295 def _get_item(target_class: Any
296 ) -> Callable[..., Callable[[TaskHandler],
298 def decorator(f: Callable[..., dict[str, object]]
299 ) -> Callable[[TaskHandler], dict[str, object]]:
300 def wrapper(self: TaskHandler) -> dict[str, object]:
301 # pylint: disable=protected-access
302 # (because pylint here fails to detect the use of wrapper as a
303 # method to self with respective access privileges)
305 for val in self._params.get_all_int('id', fail_on_empty=True):
307 if target_class.can_create_by_id:
308 item = target_class.by_id_or_create(self._conn, id_)
310 item = target_class.by_id(self._conn, id_)
311 if 'exists' in signature(f).parameters:
312 exists = id_ is not None and target_class._get_cached(id_)
313 return f(self, item, exists)
318 def do_GET_(self) -> str:
319 """Return redirect target on GET /."""
322 def _do_GET_calendar(self) -> dict[str, object]:
323 """Show Days from ?start= to ?end=.
325 Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
326 same, the only difference being the HTML template they are rendered to,
327 which .do_GET selects from their method name.
329 start = self._params.get_str_or_fail('start', '')
330 end = self._params.get_str_or_fail('end', '')
331 end = end if end != '' else date_in_n_days(366)
333 days, start, end = Day.by_date_range_with_limits(self._conn,
335 days = Day.with_filled_gaps(days, start, end)
336 today = date_in_n_days(0)
337 return {'start': start, 'end': end, 'days': days, 'today': today}
339 def do_GET_calendar(self) -> dict[str, object]:
340 """Show Days from ?start= to ?end= – normal view."""
341 return self._do_GET_calendar()
343 def do_GET_calendar_txt(self) -> dict[str, object]:
344 """Show Days from ?start= to ?end= – minimalist view."""
345 return self._do_GET_calendar()
347 def do_GET_day(self) -> dict[str, object]:
348 """Show single Day of ?date=."""
349 date = self._params.get_str('date', date_in_n_days(0))
350 make_type = self._params.get_str_or_fail('make_type', 'full')
352 day = Day.by_id_or_create(self._conn, date)
353 conditions_present = []
356 for todo in day.todos:
357 for condition in todo.conditions + todo.blockers:
358 if condition not in conditions_present:
359 conditions_present += [condition]
360 enablers_for[condition.id_] = [p for p in
361 Process.all(self._conn)
362 if condition in p.enables]
363 disablers_for[condition.id_] = [p for p in
364 Process.all(self._conn)
365 if condition in p.disables]
366 seen_todos: set[int] = set()
367 top_nodes = [t.get_step_tree(seen_todos)
368 for t in day.todos if not t.parents]
370 'top_nodes': top_nodes,
371 'make_type': make_type,
372 'enablers_for': enablers_for,
373 'disablers_for': disablers_for,
374 'conditions_present': conditions_present,
375 'processes': Process.all(self._conn)}
378 def do_GET_todo(self, todo: Todo) -> dict[str, object]:
379 """Show single Todo of ?id=."""
381 def walk_process_steps(node_id: int,
382 process_step_nodes: list[ProcessStepsNode],
383 steps_nodes: list[TodoOrProcStepNode]) -> int:
384 for process_step_node in process_step_nodes:
386 proc = Process.by_id(self._conn,
387 process_step_node.step.step_process_id)
388 node = TodoOrProcStepNode(node_id, None, proc, [])
389 steps_nodes += [node]
390 node_id = walk_process_steps(
391 node_id, process_step_node.steps, node.children)
394 def walk_todo_steps(node_id: int, todos: list[Todo],
395 steps_nodes: list[TodoOrProcStepNode]) -> int:
398 for match in [item for item in steps_nodes
400 and item.process == todo.process]:
403 for child in match.children:
404 child.fillable = True
405 node_id = walk_todo_steps(
406 node_id, todo.children, match.children)
409 node = TodoOrProcStepNode(node_id, todo, None, [])
410 steps_nodes += [node]
411 node_id = walk_todo_steps(
412 node_id, todo.children, node.children)
415 def collect_adoptables_keys(
416 steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
418 for node in steps_nodes:
420 assert isinstance(node.process, Process)
421 assert isinstance(node.process.id_, int)
422 ids.add(node.process.id_)
423 ids = ids | collect_adoptables_keys(node.children)
426 todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
427 process_tree = todo.process.get_steps(self._conn, None)
428 steps_todo_to_process: list[TodoOrProcStepNode] = []
429 last_node_id = walk_process_steps(0, process_tree,
430 steps_todo_to_process)
431 for steps_node in steps_todo_to_process:
432 steps_node.fillable = True
433 walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
434 adoptables: dict[int, list[Todo]] = {}
435 any_adoptables = [Todo.by_id(self._conn, t.id_)
436 for t in Todo.by_date(self._conn, todo.date)
439 for id_ in collect_adoptables_keys(steps_todo_to_process):
440 adoptables[id_] = [t for t in any_adoptables
441 if t.process.id_ == id_]
442 return {'todo': todo,
443 'steps_todo_to_process': steps_todo_to_process,
444 'adoption_candidates_for': adoptables,
445 'process_candidates': sorted(Process.all(self._conn)),
446 'todo_candidates': any_adoptables,
447 'condition_candidates': Condition.all(self._conn)}
449 def do_GET_todos(self) -> dict[str, object]:
450 """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
451 sort_by = self._params.get_str_or_fail('sort_by', 'title')
452 start = self._params.get_str_or_fail('start', '')
453 end = self._params.get_str_or_fail('end', '')
454 process_id = self._params.get_int_or_none('process_id')
455 comment_pattern = self._params.get_str_or_fail('comment_pattern', '')
457 ret = Todo.by_date_range_with_limits(self._conn, (start, end))
458 todos_by_date_range, start, end = ret
459 todos = [t for t in todos_by_date_range
460 if comment_pattern in t.comment
461 and ((not process_id) or t.process.id_ == process_id)]
462 sort_by = Todo.sort_by(todos, sort_by)
463 return {'start': start, 'end': end, 'process_id': process_id,
464 'comment_pattern': comment_pattern, 'todos': todos,
465 'all_processes': Process.all(self._conn), 'sort_by': sort_by}
467 def do_GET_conditions(self) -> dict[str, object]:
468 """Show all Conditions."""
469 pattern = self._params.get_str_or_fail('pattern', '')
470 sort_by = self._params.get_str_or_fail('sort_by', 'title')
472 conditions = Condition.matching(self._conn, pattern)
473 sort_by = Condition.sort_by(conditions, sort_by)
474 return {'conditions': conditions,
478 @_get_item(Condition)
479 def do_GET_condition(self,
482 ) -> dict[str, object]:
483 """Show Condition of ?id=."""
484 ps = Process.all(self._conn)
485 return {'condition': c,
486 'is_new': not exists,
487 'enabled_processes': [p for p in ps if c in p.conditions],
488 'disabled_processes': [p for p in ps if c in p.blockers],
489 'enabling_processes': [p for p in ps if c in p.enables],
490 'disabling_processes': [p for p in ps if c in p.disables]}
492 @_get_item(Condition)
493 def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
494 """Show title history of Condition of ?id=."""
495 return {'condition': c}
497 @_get_item(Condition)
498 def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
499 """Show description historys of Condition of ?id=."""
500 return {'condition': c}
503 def do_GET_process(self,
506 ) -> dict[str, object]:
507 """Show Process of ?id=."""
508 owner_ids = self._params.get_all_int('step_to')
509 owned_ids = self._params.get_all_int('has_step')
510 title_64 = self._params.get_str('title_b64')
514 title_new = b64decode(title_64.encode()).decode()
515 except binascii_Exception as exc:
516 msg = 'invalid base64 for ?title_b64='
517 raise BadFormatException(msg) from exc
520 process.title.set(title_new)
521 preset_top_step = None
522 owners = process.used_as_step_by(self._conn)
523 for step_id in owner_ids:
524 owners += [Process.by_id(self._conn, step_id)]
525 for process_id in owned_ids:
526 Process.by_id(self._conn, process_id) # to ensure ID exists
527 preset_top_step = process_id
528 return {'process': process,
529 'is_new': not exists,
530 'preset_top_step': preset_top_step,
531 'steps': process.get_steps(self._conn),
533 'n_todos': len(Todo.by_process_id(self._conn, process.id_)),
534 'process_candidates': Process.all(self._conn),
535 'condition_candidates': Condition.all(self._conn)}
538 def do_GET_process_titles(self, p: Process) -> dict[str, object]:
539 """Show title history of Process of ?id=."""
540 return {'process': p}
543 def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
544 """Show description historys of Process of ?id=."""
545 return {'process': p}
548 def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
549 """Show default effort history of Process of ?id=."""
550 return {'process': p}
552 def do_GET_processes(self) -> dict[str, object]:
553 """Show all Processes."""
554 pattern = self._params.get_str_or_fail('pattern', '')
555 sort_by = self._params.get_str_or_fail('sort_by', 'title')
557 processes = Process.matching(self._conn, pattern)
558 sort_by = Process.sort_by(processes, sort_by)
559 return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
564 def _delete_or_post(target_class: Any, redir_target: str = '/'
565 ) -> Callable[..., Callable[[TaskHandler], str]]:
566 def decorator(f: Callable[..., str]
567 ) -> Callable[[TaskHandler], str]:
568 def wrapper(self: TaskHandler) -> str:
569 # pylint: disable=protected-access
570 # (because pylint here fails to detect the use of wrapper as a
571 # method to self with respective access privileges)
572 id_ = self._params.get_int_or_none('id')
573 for _ in self._form.get_all_str('delete'):
575 msg = 'trying to delete non-saved ' +\
576 f'{target_class.__name__}'
577 raise NotFoundException(msg)
578 item = target_class.by_id(self._conn, id_)
579 item.remove(self._conn)
581 if target_class.can_create_by_id:
582 item = target_class.by_id_or_create(self._conn, id_)
584 item = target_class.by_id(self._conn, id_)
589 def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
590 """Update history timestamps for VersionedAttribute."""
591 id_ = self._params.get_int_or_none('id')
592 item = cls.by_id(self._conn, id_)
593 attr = getattr(item, attr_name)
594 for k, vals in self._form.get_all_of_key_prefixed('at:').items():
595 if k[19:] != vals[0]:
596 attr.reset_timestamp(k, f'{vals[0]}.0')
597 attr.save(self._conn)
598 return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
600 def do_POST_day(self) -> str:
601 """Update or insert Day of date and Todos mapped to it."""
602 # pylint: disable=too-many-locals
603 date = self._params.get_str_or_fail('date')
604 day_comment = self._form.get_str_or_fail('day_comment')
605 make_type = self._form.get_str_or_fail('make_type')
606 old_todos = self._form.get_all_int('todo_id')
607 new_todos_by_process = self._form.get_all_int('new_todo')
608 comments = self._form.get_all_str('comment')
609 efforts = self._form.get_all_floats_or_nones('effort')
610 done_todos = self._form.get_all_int('done')
611 is_done = [t_id in done_todos for t_id in old_todos]
612 if not (len(old_todos) == len(is_done) == len(comments)
614 msg = 'not equal number each of number of todo_id, comments, ' +\
616 raise BadFormatException(msg)
617 for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
618 raise BadFormatException('"done" field refers to unknown Todo')
620 day = Day.by_id_or_create(self._conn, date)
621 day.comment = day_comment
624 for process_id in sorted(new_todos_by_process):
625 process = Process.by_id(self._conn, process_id)
626 todo = Todo(None, process, False, date)
627 todo.save(self._conn)
629 if 'full' == make_type:
630 for todo in new_todos:
631 todo.ensure_children(self._conn)
632 for i, todo_id in enumerate(old_todos):
633 todo = Todo.by_id(self._conn, todo_id)
634 todo.is_done = is_done[i]
635 todo.comment = comments[i]
636 todo.effort = efforts[i]
637 todo.save(self._conn)
638 return f'/day?date={date}&make_type={make_type}'
640 @_delete_or_post(Todo, '/')
641 def do_POST_todo(self, todo: Todo) -> str:
642 """Update Todo and its children."""
643 # pylint: disable=too-many-locals
644 # pylint: disable=too-many-branches
645 # pylint: disable=too-many-statements
646 assert todo.id_ is not None
647 adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
648 to_make = {'full': [(id_, todo.id_)
649 for id_ in self._form.get_all_int('make_full')],
650 'empty': [(id_, todo.id_)
651 for id_ in self._form.get_all_int('make_empty')]}
652 step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_')
653 to_update: dict[str, Any] = {
654 'comment': self._form.get_str_or_fail('comment', ''),
655 'is_done': self._form.get_bool('is_done'),
656 'calendarize': self._form.get_bool('calendarize')}
657 cond_rels = [self._form.get_all_int(name) for name in
658 ['conditions', 'blockers', 'enables', 'disables']]
659 effort_or_not = self._form.get_str('effort')
660 if effort_or_not is not None:
661 if effort_or_not == '':
662 to_update['effort'] = None
665 to_update['effort'] = float(effort_or_not)
666 except ValueError as e:
667 msg = 'cannot float form field value for key: effort'
668 raise BadFormatException(msg) from e
669 for k, fillers in step_fillers_to.items():
672 except ValueError as e:
673 msg = f'bad step_filler_to_ key: {k}'
674 raise BadFormatException(msg) from e
675 for filler in [f for f in fillers if f != 'ignore']:
678 to_int = filler[5:] if filler.startswith(prefix) else filler
680 target_id = int(to_int)
681 except ValueError as e:
682 msg = f'bad fill_for target: {filler}'
683 raise BadFormatException(msg) from e
684 if filler.startswith(prefix):
685 to_make['empty'] += [(target_id, parent_id)]
687 adoptees += [(target_id, parent_id)]
689 todo.set_condition_relations(self._conn, *cond_rels)
690 for parent in [Todo.by_id(self._conn, a[1])
691 for a in adoptees] + [todo]:
692 for child in parent.children:
693 if child not in [t[0] for t in adoptees
694 if t[0] == child.id_ and t[1] == parent.id_]:
695 parent.remove_child(child)
696 parent.save(self._conn)
697 for child_id, parent_id in adoptees:
698 parent = Todo.by_id(self._conn, parent_id)
699 if child_id not in [c.id_ for c in parent.children]:
700 parent.add_child(Todo.by_id(self._conn, child_id))
701 parent.save(self._conn)
702 todo.update_attrs(**to_update)
703 for approach, make_data in to_make.items():
704 for process_id, parent_id in make_data:
705 parent = Todo.by_id(self._conn, parent_id)
706 process = Process.by_id(self._conn, process_id)
707 made = Todo(None, process, False, todo.date)
708 made.save(self._conn)
709 if 'full' == approach:
710 made.ensure_children(self._conn)
711 parent.add_child(made)
712 parent.save(self._conn)
713 # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
714 url = f'/todo?id={todo.id_}'
715 todo.save(self._conn)
718 def do_POST_process_descriptions(self) -> str:
719 """Update history timestamps for Process.description."""
720 return self._change_versioned_timestamps(Process, 'description')
722 def do_POST_process_efforts(self) -> str:
723 """Update history timestamps for Process.effort."""
724 return self._change_versioned_timestamps(Process, 'effort')
726 def do_POST_process_titles(self) -> str:
727 """Update history timestamps for Process.title."""
728 return self._change_versioned_timestamps(Process, 'title')
730 @_delete_or_post(Process, '/processes')
731 def do_POST_process(self, process: Process) -> str:
732 """Update or insert Process of ?id= and fields defined in postvars."""
733 # pylint: disable=too-many-locals
735 def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]:
736 l_ids, title = [], ''
737 for id_or_title in l_id_or_title:
739 l_ids += [int(id_or_title)]
744 versioned = {'title': self._form.get_str_or_fail('title'),
745 'description': self._form.get_str_or_fail('description'),
746 'effort': self._form.get_float_or_fail('effort')}
747 cond_rels = [self._form.get_all_int(s) for s
748 in ['conditions', 'blockers', 'enables', 'disables']]
749 calendarize = self._form.get_bool('calendarize')
750 step_of = self._form.get_all_str('step_of')
751 suppressions = self._form.get_all_int('suppresses')
752 kept_steps = self._form.get_all_int('kept_steps')
753 new_top_step_procs = self._form.get_all_str('new_top_step')
755 for step_id in kept_steps:
756 name = f'new_step_to_{step_id}'
757 new_steps_to[step_id] = self._form.get_all_int(name)
758 new_owner_title, owners_to_set = id_or_title(step_of)
759 new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs)
761 for k, v in versioned.items():
762 getattr(process, k).set(v)
763 process.calendarize = calendarize
764 process.save(self._conn)
765 assert isinstance(process.id_, int)
766 # set relations to Conditions and ProcessSteps / other Processes
767 process.set_condition_relations(self._conn, *cond_rels)
769 for step_id in kept_steps:
770 owned_steps += [ProcessStep.by_id(self._conn, step_id)]
771 owned_steps += [ # new sub-steps
772 ProcessStep(None, process.id_, step_process_id, step_id)
773 for step_process_id in new_steps_to[step_id]]
774 for step_process_id in new_top_step_proc_ids:
775 owned_steps += [ProcessStep(None, process.id_, step_process_id,
777 process.set_step_relations(self._conn, owners_to_set, suppressions,
779 # encode titles for potential newly-to-create Processes up or down
780 params = f'id={process.id_}'
782 title_b64_encoded = b64encode(new_step_title.encode()).decode()
783 params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
784 elif new_owner_title:
785 title_b64_encoded = b64encode(new_owner_title.encode()).decode()
786 params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
787 process.save(self._conn)
788 return f'/process?{params}'
790 def do_POST_condition_descriptions(self) -> str:
791 """Update history timestamps for Condition.description."""
792 return self._change_versioned_timestamps(Condition, 'description')
794 def do_POST_condition_titles(self) -> str:
795 """Update history timestamps for Condition.title."""
796 return self._change_versioned_timestamps(Condition, 'title')
798 @_delete_or_post(Condition, '/conditions')
799 def do_POST_condition(self, condition: Condition) -> str:
800 """Update/insert Condition of ?id= and fields defined in postvars."""
801 title = self._form.get_str_or_fail('title')
802 description = self._form.get_str_or_fail('description')
803 is_active = self._form.get_bool('is_active')
804 condition.is_active = is_active
806 condition.title.set(title)
807 condition.description.set(description)
808 condition.save(self._conn)
809 return f'/condition?id={condition.id_}'