+ self._send_msg(error, code=error.http_code)
+ finally:
+ self.conn.close()
+
+ def do_POST_day(self) -> str:
+ """Update or insert Day of date and Todos mapped to it."""
+ date = self.params.get_str('date')
+ day = Day.by_id(self.conn, date, create=True)
+ day.comment = self.form_data.get_str('day_comment')
+ day.save(self.conn)
+ for process_id in sorted(self.form_data.get_all_int('new_todo')):
+ Todo.create_with_children(self.conn, process_id, date)
+ done_ids = self.form_data.get_all_int('done')
+ comments = self.form_data.get_all_str('comment')
+ efforts = self.form_data.get_all_str('effort')
+ for i, todo_id in enumerate(self.form_data.get_all_int('todo_id')):
+ todo = Todo.by_id(self.conn, todo_id)
+ todo.is_done = todo_id in done_ids
+ if len(comments) > 0:
+ todo.comment = comments[i]
+ if len(efforts) > 0:
+ todo.effort = float(efforts[i]) if efforts[i] else None
+ todo.save(self.conn)
+ for condition in todo.enables:
+ condition.save(self.conn)
+ for condition in todo.disables:
+ condition.save(self.conn)
+ return f'/day?date={date}'
+
+ def do_POST_todo(self) -> str:
+ """Update Todo and its children."""
+ id_ = self.params.get_int('id')
+ for _ in self.form_data.get_all_str('delete'):
+ todo = Todo .by_id(self.conn, id_)
+ todo.remove(self.conn)
+ return '/'
+ todo = Todo.by_id(self.conn, id_)
+ adopted_child_ids = self.form_data.get_all_int('adopt')
+ for child in todo.children:
+ if child.id_ not in adopted_child_ids:
+ assert isinstance(child.id_, int)
+ child = Todo.by_id(self.conn, child.id_)
+ todo.remove_child(child)
+ for child_id in adopted_child_ids:
+ if child_id in [c.id_ for c in todo.children]:
+ continue
+ child = Todo.by_id(self.conn, child_id)
+ todo.add_child(child)
+ for process_id in self.form_data.get_all_int('make'):
+ made = Todo.create_with_children(self.conn, process_id, todo.date)
+ todo.add_child(made)
+ effort = self.form_data.get_str('effort', ignore_strict=True)
+ todo.effort = float(effort) if effort else None
+ todo.set_conditions(self.conn, self.form_data.get_all_int('condition'))
+ todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
+ todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
+ todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
+ todo.is_done = len(self.form_data.get_all_str('done')) > 0
+ todo.calendarize = len(self.form_data.get_all_str('calendarize')) > 0
+ todo.comment = self.form_data.get_str('comment', ignore_strict=True)
+ todo.save(self.conn)
+ for condition in todo.enables:
+ condition.save(self.conn)
+ for condition in todo.disables:
+ condition.save(self.conn)
+ return f'/todo?id={todo.id_}'
+
+ def _do_POST_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
+ """Update history timestamps for VersionedAttribute."""
+ id_ = self.params.get_int_or_none('id')
+ item = cls.by_id(self.conn, id_)
+ attr = getattr(item, attr_name)
+ for k, v in self.form_data.get_first_strings_starting('at:').items():
+ old = k[3:]
+ if old[19:] != v:
+ attr.reset_timestamp(old, f'{v}.0')
+ attr.save(self.conn)
+ cls_name = cls.__name__.lower()
+ return f'/{cls_name}_{attr_name}s?id={item.id_}'
+
+ def do_POST_process_descriptions(self) -> str:
+ """Update history timestamps for Process.description."""
+ return self._do_POST_versioned_timestamps(Process, 'description')
+
+ def do_POST_process_efforts(self) -> str:
+ """Update history timestamps for Process.effort."""
+ return self._do_POST_versioned_timestamps(Process, 'effort')
+
+ def do_POST_process_titles(self) -> str:
+ """Update history timestamps for Process.title."""
+ return self._do_POST_versioned_timestamps(Process, 'title')
+
+ def do_POST_process(self) -> str:
+ """Update or insert Process of ?id= and fields defined in postvars."""
+ # pylint: disable=too-many-branches
+ id_ = self.params.get_int_or_none('id')
+ for _ in self.form_data.get_all_str('delete'):
+ process = Process.by_id(self.conn, id_)
+ process.remove(self.conn)
+ return '/processes'
+ process = Process.by_id(self.conn, id_, create=True)
+ process.title.set(self.form_data.get_str('title'))
+ process.description.set(self.form_data.get_str('description'))
+ process.effort.set(self.form_data.get_float('effort'))
+ process.set_conditions(self.conn,
+ self.form_data.get_all_int('condition'))
+ process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
+ process.set_enables(self.conn, self.form_data.get_all_int('enables'))
+ process.set_disables(self.conn, self.form_data.get_all_int('disables'))
+ process.calendarize = self.form_data.get_all_str('calendarize') != []
+ process.save(self.conn)
+ assert isinstance(process.id_, int)
+ steps: list[ProcessStep] = []
+ for step_id in self.form_data.get_all_int('keep_step'):
+ if step_id not in self.form_data.get_all_int('steps'):
+ raise BadFormatException('trying to keep unknown step')
+ for step_id in self.form_data.get_all_int('steps'):
+ if step_id not in self.form_data.get_all_int('keep_step'):
+ continue
+ step_process_id = self.form_data.get_int(
+ f'step_{step_id}_process_id')
+ parent_id = self.form_data.get_int_or_none(
+ f'step_{step_id}_parent_id')
+ steps += [ProcessStep(step_id, process.id_, step_process_id,
+ parent_id)]
+ for step_id in self.form_data.get_all_int('steps'):
+ for step_process_id in self.form_data.get_all_int(
+ f'new_step_to_{step_id}'):
+ steps += [ProcessStep(None, process.id_, step_process_id,
+ step_id)]
+ new_step_title = None
+ for step_identifier in self.form_data.get_all_str('new_top_step'):
+ try:
+ step_process_id = int(step_identifier)
+ steps += [ProcessStep(None, process.id_, step_process_id,
+ None)]
+ except ValueError:
+ new_step_title = step_identifier
+ process.uncache()
+ process.set_steps(self.conn, steps)
+ process.set_step_suppressions(self.conn,
+ self.form_data.get_all_int('suppresses'))
+ process.save(self.conn)
+ owners_to_set = []
+ new_owner_title = None
+ for owner_identifier in self.form_data.get_all_str('step_of'):
+ try:
+ owners_to_set += [int(owner_identifier)]
+ except ValueError:
+ new_owner_title = owner_identifier
+ process.set_owners(self.conn, owners_to_set)
+ params = f'id={process.id_}'
+ if new_step_title:
+ title_b64_encoded = b64encode(new_step_title.encode()).decode()
+ params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
+ elif new_owner_title:
+ title_b64_encoded = b64encode(new_owner_title.encode()).decode()
+ params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
+ return f'/process?{params}'
+
+ def do_POST_condition_descriptions(self) -> str:
+ """Update history timestamps for Condition.description."""
+ return self._do_POST_versioned_timestamps(Condition, 'description')
+
+ def do_POST_condition_titles(self) -> str:
+ """Update history timestamps for Condition.title."""
+ return self._do_POST_versioned_timestamps(Condition, 'title')
+
+ def do_POST_condition(self) -> str:
+ """Update/insert Condition of ?id= and fields defined in postvars."""
+ id_ = self.params.get_int_or_none('id')
+ for _ in self.form_data.get_all_str('delete'):
+ condition = Condition.by_id(self.conn, id_)
+ condition.remove(self.conn)
+ return '/conditions'
+ condition = Condition.by_id(self.conn, id_, create=True)
+ condition.is_active = self.form_data.get_all_str('is_active') != []
+ condition.title.set(self.form_data.get_str('title'))
+ condition.description.set(self.form_data.get_str('description'))
+ condition.save(self.conn)
+ return f'/condition?id={condition.id_}'
+
+ def _init_handling(self) -> None:
+ # pylint: disable=attribute-defined-outside-init
+ self.conn = DatabaseConnection(self.server.db)
+ parsed_url = urlparse(self.path)
+ self.site = path_split(parsed_url.path)[1]
+ params = parse_qs(parsed_url.query, strict_parsing=True)
+ self.params = InputsParser(params, False)