From: Christian Heller Date: Mon, 5 Aug 2024 05:33:05 +0000 (+0200) Subject: Simplify POST /process form field structure. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/%7B%7Bdb.prefix%7D%7D/%7B%7Bprefix%7D%7D/add_structured?a=commitdiff_plain;h=f024840834fb695220ee216a5cc7eb03ba982b33;p=taskplom Simplify POST /process form field structure. --- diff --git a/plomtask/http.py b/plomtask/http.py index 475f87f..fa04579 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -711,51 +711,43 @@ class TaskHandler(BaseHTTPRequestHandler): def do_POST_process(self, process: Process) -> str: """Update or insert Process of ?id= and fields defined in postvars.""" # pylint: disable=too-many-locals - # pylint: disable=too-many-statements try: - title = self._form_data.get_str('title') - description = self._form_data.get_str('description') - effort = self._form_data.get_float('effort') + versioned = {'title': self._form_data.get_str('title'), + 'description': self._form_data.get_str('description'), + 'effort': self._form_data.get_float('effort')} except NotFoundException as e: raise BadFormatException from e - conditions = self._form_data.get_all_int('conditions') - blockers = self._form_data.get_all_int('blockers') - enables = self._form_data.get_all_int('enables') - disables = self._form_data.get_all_int('disables') + cond_rels = [self._form_data.get_all_int(s) for s + in ['conditions', 'blockers', 'enables', 'disables']] calendarize = self._form_data.get_bool('calendarize') - suppresses = self._form_data.get_all_int('suppresses') step_of = self._form_data.get_all_str('step_of') - keep_steps = self._form_data.get_all_int('keep_step') - step_ids = self._form_data.get_all_int('steps') + suppresses = self._form_data.get_all_int('suppresses') + step_ids = self._form_data.get_all_int('kept_steps') new_top_steps = self._form_data.get_all_str('new_top_step') - step_process_id_to = {} - step_parent_id_to = {} new_steps_to = {} for step_id in step_ids: name = f'new_step_to_{step_id}' new_steps_to[step_id] = self._form_data.get_all_int(name) - for step_id in keep_steps: - name = f'step_{step_id}_process_id' - step_process_id_to[step_id] = self._form_data.get_int(name) - name = f'step_{step_id}_parent_id' - step_parent_id_to[step_id] = self._form_data.get_int_or_none(name) - process.title.set(title) - process.description.set(description) - process.effort.set(effort) - process.set_condition_relations( - self.conn, conditions, blockers, enables, disables) + for k, v in versioned.items(): + getattr(process, k).set(v) + process.set_condition_relations(self.conn, *cond_rels) process.calendarize = calendarize process.save(self.conn) assert isinstance(process.id_, int) + # set relations to, and if non-existant yet: create, other Processes + # 1. owners (upwards) + owners_to_set = [] + new_owner_title = None + for owner_identifier in step_of: + try: + owners_to_set += [int(owner_identifier)] + except ValueError: + new_owner_title = owner_identifier + process.set_owners(self.conn, owners_to_set) + # 2. owneds (downwards) new_step_title = None - steps: list[ProcessStep] = [] - for step_id in keep_steps: - if step_id not in step_ids: - raise BadFormatException('trying to keep unknown step') - step = ProcessStep(step_id, process.id_, - step_process_id_to[step_id], - step_parent_id_to[step_id]) - steps += [step] + steps: list[ProcessStep] = [ProcessStep.by_id(self.conn, step_id) + for step_id in step_ids] for step_id in step_ids: new = [ProcessStep(None, process.id_, step_process_id, step_id) for step_process_id in new_steps_to[step_id]] @@ -769,14 +761,7 @@ class TaskHandler(BaseHTTPRequestHandler): new_step_title = step_identifier process.set_steps(self.conn, steps) process.set_step_suppressions(self.conn, suppresses) - owners_to_set = [] - new_owner_title = None - for owner_identifier in step_of: - try: - owners_to_set += [int(owner_identifier)] - except ValueError: - new_owner_title = owner_identifier - process.set_owners(self.conn, owners_to_set) + # encode titles for potentially newly created Processes up or down params = f'id={process.id_}' if new_step_title: title_b64_encoded = b64encode(new_step_title.encode()).decode() diff --git a/templates/process.html b/templates/process.html index 7bb503e..a4029dc 100644 --- a/templates/process.html +++ b/templates/process.html @@ -17,14 +17,12 @@ details[open] > summary::after { -{% macro step_with_steps(step_id, step_node, indent) %} +{% macro step_with_steps(step_node, indent) %} - + {% if step_node.is_explicit %} - - - + {% endif %} @@ -60,8 +58,8 @@ details[open] > summary::after { {% endif %} {% if step_node.is_explicit or not step_node.seen %} -{% for substep_id, substep in step_node.steps.items() %} -{{ step_with_steps(substep_id, substep, indent+1) }} +{% for substep in step_node.steps %} +{{ step_with_steps(substep, indent+1) }} {% endfor %} {% endif %} {% endmacro %} @@ -116,8 +114,8 @@ edit process of ID {{process.id_}} {% if steps %} -{% for step_id, step_node in steps.items() %} -{{ step_with_steps(step_id, step_node, 0) }} +{% for step_node in steps %} +{{ step_with_steps(step_node, 0) }} {% endfor %}
{% endif %} diff --git a/tests/processes.py b/tests/processes.py index 45497a6..9affa84 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -346,72 +346,56 @@ class TestsWithServer(TestCaseWithServer): def test_POST_process_steps(self) -> None: """Test behavior of ProcessStep posting.""" # pylint: disable=too-many-statements + url = '/process?id=1' exp = ExpectedGetProcess(1) self.post_exp_process([exp], {}, 1) - # post first (top-level) step of proc 2 to proc 1 by 'new_top_step' - self.post_exp_process([exp], {}, 2) - self.post_exp_process([exp], {'new_top_step': 2}, 1) + # post first (top-level) step of proc 2 to proc 1 by 'step_of' in 2 + self.post_exp_process([exp], {'step_of': 1}, 2) exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)]) exp.set('steps', [exp.stepnode_as_dict(1)]) - self.check_json_get('/process?id=1', exp) + self.check_json_get(url, exp) # post empty/absent steps list to process, expect clean slate, and old # step to completely disappear self.post_exp_process([exp], {}, 1) exp.lib_wipe('ProcessStep') exp.set('steps', []) - self.check_json_get('/process?id=1', exp) - # post new step of proc2 to proc1 by 'keep_step', 'steps', and its deps - post = {'step_1_process_id': 2, 'step_1_parent_id': '', 'steps': [1]} - self.post_exp_process([exp], post | {'keep_step': [1]}, 1) + self.check_json_get(url, exp) + # post new step of proc2 to proc1 by 'new_top_step' + self.post_exp_process([exp], {'new_top_step': 2}, 1) exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)]) + self.post_exp_process([exp], {'kept_steps': [1]}, 1) exp.set('steps', [exp.stepnode_as_dict(1)]) - self.check_json_get('/process?id=1', exp) - # fail with invalid or missing 'steps' dependencies - p_min = {'title': '', 'description': '', 'effort': 0} - p = p_min | {'step_1_process_id': 2, 'steps': [1], 'keep_step': [1]} - self.check_post(p | {'step_1_parent_id': 'foo'}, '/process?id=1', 400) - p = p_min | {'step_1_parent_id': '', 'steps': [1], 'keep_step': [1]} - self.check_post(p | {'step_1_process_id': 'foo'}, '/process?id=1', 400) - self.check_post(p, '/process?id=1', 400) - # treat valid steps data without 'keep_step' like posting emptiness - p = {'step_1_process_id': 2, 'step_1_parent_id': '', 'steps': [1]} - self.post_exp_process([exp], post, 1) - exp.lib_wipe('ProcessStep') - exp.set('steps', []) - self.check_json_get('/process?id=1', exp) - # fail when refering in 'keep_step' to step not in 'steps' - p = p_min | p - self.check_post(p | {'keep_step': [2]}, '/process?id=1', 400) - # expect failure independently of what parent/proc_id keys exist - p['step_2_process_id'], p['step_2_parent_id'] = 3, None - self.check_post(p | {'keep_step': [2]}, '/process?id=1', 400) + self.check_json_get(url, exp) # fail on single- and multi-step recursion - self.check_post(p | {'new_top_step': 1}, '/process?id=1', 400) + p_min = {'title': '', 'description': '', 'effort': 0} + self.check_post(p_min | {'new_top_step': 1}, url, 400) + self.check_post(p_min | {'step_of': 1}, url, 400) self.post_exp_process([exp], {'new_top_step': 1}, 2) - self.check_post(p | {'new_top_step': 2}, '/process?id=1', 400) - # post sibling steps + self.check_post(p_min | {'step_of': 2, 'new_top_step': 2}, url, 400) self.post_exp_process([exp], {}, 3) - p = {'step_1_process_id': 3, 'steps': [1], 'keep_step': [1]} - self.post_exp_process([exp], p | {'new_top_step': 3}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 3), - exp.procstep_as_dict(2, 1, 3)]) + self.post_exp_process([exp], {'step_of': 3}, 4) + self.check_post(p_min | {'new_top_step': 3, 'step_of': 4}, url, 400) + # post sibling steps + self.post_exp_process([exp], {}, 4) + self.post_exp_process([exp], {'new_top_step': 4}, 1) + self.post_exp_process([exp], {'kept_steps': [1], 'new_top_step': 4}, 1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 4), + exp.procstep_as_dict(2, 1, 4)]) exp.set('steps', [exp.stepnode_as_dict(1), exp.stepnode_as_dict(2)]) - self.check_json_get('/process?id=1', exp) + self.check_json_get(url, exp) # post sub-step chain - p = {'step_1_process_id': 3, 'step_2_process_id': 3, - 'steps': [1, 2], 'keep_step': [1, 2]} - self.post_exp_process([exp], p | {'new_step_to_2': 3}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 1, 3, 2)]) + p = {'kept_steps': [1, 2], 'new_step_to_2': 4} + self.post_exp_process([exp], p, 1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 1, 4, 2)]) exp.set('steps', [exp.stepnode_as_dict(1), exp.stepnode_as_dict(2, steps=[ exp.stepnode_as_dict(3)])]) - self.check_json_get('/process?id=1', exp) - p['steps'], p['keep_step'] = [1, 2, 3], [1, 2, 3] - p['step_3_process_id'] = 3 - # fail post sub-step that would cause recursion - self.post_exp_process([exp], {'new_top_step': 1}, 2) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(4, 2, 1)]) - self.check_post(p_min | p | {'new_step_to_2': 2}, '/process?id=1', 400) + self.check_json_get(url, exp) + # fail posting sub-step that would cause recursion + self.post_exp_process([exp], {}, 6) + self.post_exp_process([exp], {'new_top_step': 6}, 5) + p = p_min | {'kept_steps': [1, 2, 3], 'new_step_to_2': 5, 'step_of': 6} + self.check_post(p, url, 400) def test_GET(self) -> None: """Test /process and /processes response codes.""" @@ -455,8 +439,7 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_process([exp], proc2_post | {'new_top_step': [1]}, 2) proc3_post = {'title': 'baz', 'description': 'zab', 'effort': 0.9} self.post_exp_process([exp], proc3_post | {'new_top_step': [1]}, 3) - proc3_post = proc3_post | {'new_top_step': [2], 'keep_step': [2], - 'steps': [2], 'step_2_process_id': 1} + proc3_post = proc3_post | {'new_top_step': [2], 'kept_steps': [2]} self.post_exp_process([exp], proc3_post, 3) exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), exp.procstep_as_dict(2, 3, 1), diff --git a/tests/todos.py b/tests/todos.py index 41adca9..752e289 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -523,8 +523,7 @@ class TestsWithServer(TestCaseWithServer): exp.set('steps_todo_to_process', [step1_proc2]) self.check_json_get('/todo?id=1', exp) # test display of parallel chains - proc_steps_post = {'new_top_step': 4, 'keep_step': [1], - 'step_1_process_id': 2, 'steps': [1, 4]} + proc_steps_post = {'new_top_step': 4, 'kept_steps': [1, 3]} self.post_exp_process([], proc_steps_post, 1) step4_proc4 = exp.step_as_dict(4, [], 4, fillable=True) exp.lib_set('ProcessStep', [exp.procstep_as_dict(4, 1, 4, None)]) diff --git a/tests/utils.py b/tests/utils.py index 3d86163..df3feea 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -826,7 +826,7 @@ class Expected: proc = self.proc_as_dict(id_, d['title'], d['description'], d['effort']) ignore = {'title', 'description', 'effort', 'new_top_step', 'step_of', - 'keep_step', 'steps'} + 'kept_steps'} for k, v in d.items(): if k in ignore\ or k.startswith('step_') or k.startswith('new_step_to'):