def do_POST_process(self, process: Process) -> str:
"""Update or insert Process of ?id= and fields defined in postvars."""
# pylint: disable=too-many-locals
- # pylint: disable=too-many-statements
try:
- title = self._form_data.get_str('title')
- description = self._form_data.get_str('description')
- effort = self._form_data.get_float('effort')
+ versioned = {'title': self._form_data.get_str('title'),
+ 'description': self._form_data.get_str('description'),
+ 'effort': self._form_data.get_float('effort')}
except NotFoundException as e:
raise BadFormatException from e
- conditions = self._form_data.get_all_int('conditions')
- blockers = self._form_data.get_all_int('blockers')
- enables = self._form_data.get_all_int('enables')
- disables = self._form_data.get_all_int('disables')
+ cond_rels = [self._form_data.get_all_int(s) for s
+ in ['conditions', 'blockers', 'enables', 'disables']]
calendarize = self._form_data.get_bool('calendarize')
- suppresses = self._form_data.get_all_int('suppresses')
step_of = self._form_data.get_all_str('step_of')
- keep_steps = self._form_data.get_all_int('keep_step')
- step_ids = self._form_data.get_all_int('steps')
+ suppresses = self._form_data.get_all_int('suppresses')
+ step_ids = self._form_data.get_all_int('kept_steps')
new_top_steps = self._form_data.get_all_str('new_top_step')
- step_process_id_to = {}
- step_parent_id_to = {}
new_steps_to = {}
for step_id in step_ids:
name = f'new_step_to_{step_id}'
new_steps_to[step_id] = self._form_data.get_all_int(name)
- for step_id in keep_steps:
- name = f'step_{step_id}_process_id'
- step_process_id_to[step_id] = self._form_data.get_int(name)
- name = f'step_{step_id}_parent_id'
- step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
- process.title.set(title)
- process.description.set(description)
- process.effort.set(effort)
- process.set_condition_relations(
- self.conn, conditions, blockers, enables, disables)
+ for k, v in versioned.items():
+ getattr(process, k).set(v)
+ process.set_condition_relations(self.conn, *cond_rels)
process.calendarize = calendarize
process.save(self.conn)
assert isinstance(process.id_, int)
+ # set relations to, and if non-existant yet: create, other Processes
+ # 1. owners (upwards)
+ owners_to_set = []
+ new_owner_title = None
+ for owner_identifier in step_of:
+ try:
+ owners_to_set += [int(owner_identifier)]
+ except ValueError:
+ new_owner_title = owner_identifier
+ process.set_owners(self.conn, owners_to_set)
+ # 2. owneds (downwards)
new_step_title = None
- steps: list[ProcessStep] = []
- for step_id in keep_steps:
- if step_id not in step_ids:
- raise BadFormatException('trying to keep unknown step')
- step = ProcessStep(step_id, process.id_,
- step_process_id_to[step_id],
- step_parent_id_to[step_id])
- steps += [step]
+ steps: list[ProcessStep] = [ProcessStep.by_id(self.conn, step_id)
+ for step_id in step_ids]
for step_id in step_ids:
new = [ProcessStep(None, process.id_, step_process_id, step_id)
for step_process_id in new_steps_to[step_id]]
new_step_title = step_identifier
process.set_steps(self.conn, steps)
process.set_step_suppressions(self.conn, suppresses)
- owners_to_set = []
- new_owner_title = None
- for owner_identifier in step_of:
- try:
- owners_to_set += [int(owner_identifier)]
- except ValueError:
- new_owner_title = owner_identifier
- process.set_owners(self.conn, owners_to_set)
+ # encode titles for potentially newly created Processes up or down
params = f'id={process.id_}'
if new_step_title:
title_b64_encoded = b64encode(new_step_title.encode()).decode()
def test_POST_process_steps(self) -> None:
"""Test behavior of ProcessStep posting."""
# pylint: disable=too-many-statements
+ url = '/process?id=1'
exp = ExpectedGetProcess(1)
self.post_exp_process([exp], {}, 1)
- # post first (top-level) step of proc 2 to proc 1 by 'new_top_step'
- self.post_exp_process([exp], {}, 2)
- self.post_exp_process([exp], {'new_top_step': 2}, 1)
+ # post first (top-level) step of proc 2 to proc 1 by 'step_of' in 2
+ self.post_exp_process([exp], {'step_of': 1}, 2)
exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)])
exp.set('steps', [exp.stepnode_as_dict(1)])
- self.check_json_get('/process?id=1', exp)
+ self.check_json_get(url, exp)
# post empty/absent steps list to process, expect clean slate, and old
# step to completely disappear
self.post_exp_process([exp], {}, 1)
exp.lib_wipe('ProcessStep')
exp.set('steps', [])
- self.check_json_get('/process?id=1', exp)
- # post new step of proc2 to proc1 by 'keep_step', 'steps', and its deps
- post = {'step_1_process_id': 2, 'step_1_parent_id': '', 'steps': [1]}
- self.post_exp_process([exp], post | {'keep_step': [1]}, 1)
+ self.check_json_get(url, exp)
+ # post new step of proc2 to proc1 by 'new_top_step'
+ self.post_exp_process([exp], {'new_top_step': 2}, 1)
exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)])
+ self.post_exp_process([exp], {'kept_steps': [1]}, 1)
exp.set('steps', [exp.stepnode_as_dict(1)])
- self.check_json_get('/process?id=1', exp)
- # fail with invalid or missing 'steps' dependencies
- p_min = {'title': '', 'description': '', 'effort': 0}
- p = p_min | {'step_1_process_id': 2, 'steps': [1], 'keep_step': [1]}
- self.check_post(p | {'step_1_parent_id': 'foo'}, '/process?id=1', 400)
- p = p_min | {'step_1_parent_id': '', 'steps': [1], 'keep_step': [1]}
- self.check_post(p | {'step_1_process_id': 'foo'}, '/process?id=1', 400)
- self.check_post(p, '/process?id=1', 400)
- # treat valid steps data without 'keep_step' like posting emptiness
- p = {'step_1_process_id': 2, 'step_1_parent_id': '', 'steps': [1]}
- self.post_exp_process([exp], post, 1)
- exp.lib_wipe('ProcessStep')
- exp.set('steps', [])
- self.check_json_get('/process?id=1', exp)
- # fail when refering in 'keep_step' to step not in 'steps'
- p = p_min | p
- self.check_post(p | {'keep_step': [2]}, '/process?id=1', 400)
- # expect failure independently of what parent/proc_id keys exist
- p['step_2_process_id'], p['step_2_parent_id'] = 3, None
- self.check_post(p | {'keep_step': [2]}, '/process?id=1', 400)
+ self.check_json_get(url, exp)
# fail on single- and multi-step recursion
- self.check_post(p | {'new_top_step': 1}, '/process?id=1', 400)
+ p_min = {'title': '', 'description': '', 'effort': 0}
+ self.check_post(p_min | {'new_top_step': 1}, url, 400)
+ self.check_post(p_min | {'step_of': 1}, url, 400)
self.post_exp_process([exp], {'new_top_step': 1}, 2)
- self.check_post(p | {'new_top_step': 2}, '/process?id=1', 400)
- # post sibling steps
+ self.check_post(p_min | {'step_of': 2, 'new_top_step': 2}, url, 400)
self.post_exp_process([exp], {}, 3)
- p = {'step_1_process_id': 3, 'steps': [1], 'keep_step': [1]}
- self.post_exp_process([exp], p | {'new_top_step': 3}, 1)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 3),
- exp.procstep_as_dict(2, 1, 3)])
+ self.post_exp_process([exp], {'step_of': 3}, 4)
+ self.check_post(p_min | {'new_top_step': 3, 'step_of': 4}, url, 400)
+ # post sibling steps
+ self.post_exp_process([exp], {}, 4)
+ self.post_exp_process([exp], {'new_top_step': 4}, 1)
+ self.post_exp_process([exp], {'kept_steps': [1], 'new_top_step': 4}, 1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 4),
+ exp.procstep_as_dict(2, 1, 4)])
exp.set('steps', [exp.stepnode_as_dict(1), exp.stepnode_as_dict(2)])
- self.check_json_get('/process?id=1', exp)
+ self.check_json_get(url, exp)
# post sub-step chain
- p = {'step_1_process_id': 3, 'step_2_process_id': 3,
- 'steps': [1, 2], 'keep_step': [1, 2]}
- self.post_exp_process([exp], p | {'new_step_to_2': 3}, 1)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 1, 3, 2)])
+ p = {'kept_steps': [1, 2], 'new_step_to_2': 4}
+ self.post_exp_process([exp], p, 1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 1, 4, 2)])
exp.set('steps', [exp.stepnode_as_dict(1),
exp.stepnode_as_dict(2, steps=[
exp.stepnode_as_dict(3)])])
- self.check_json_get('/process?id=1', exp)
- p['steps'], p['keep_step'] = [1, 2, 3], [1, 2, 3]
- p['step_3_process_id'] = 3
- # fail post sub-step that would cause recursion
- self.post_exp_process([exp], {'new_top_step': 1}, 2)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(4, 2, 1)])
- self.check_post(p_min | p | {'new_step_to_2': 2}, '/process?id=1', 400)
+ self.check_json_get(url, exp)
+ # fail posting sub-step that would cause recursion
+ self.post_exp_process([exp], {}, 6)
+ self.post_exp_process([exp], {'new_top_step': 6}, 5)
+ p = p_min | {'kept_steps': [1, 2, 3], 'new_step_to_2': 5, 'step_of': 6}
+ self.check_post(p, url, 400)
def test_GET(self) -> None:
"""Test /process and /processes response codes."""
self.post_exp_process([exp], proc2_post | {'new_top_step': [1]}, 2)
proc3_post = {'title': 'baz', 'description': 'zab', 'effort': 0.9}
self.post_exp_process([exp], proc3_post | {'new_top_step': [1]}, 3)
- proc3_post = proc3_post | {'new_top_step': [2], 'keep_step': [2],
- 'steps': [2], 'step_2_process_id': 1}
+ proc3_post = proc3_post | {'new_top_step': [2], 'kept_steps': [2]}
self.post_exp_process([exp], proc3_post, 3)
exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1),
exp.procstep_as_dict(2, 3, 1),