From 051c62713adb93a233713589222efde640fd994d Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Thu, 8 Aug 2024 10:13:42 +0200 Subject: [PATCH 01/16] Minor tests refactoring. --- tests/conditions.py | 26 ++++++++------------------ tests/processes.py | 42 +++++++++++------------------------------- tests/utils.py | 8 ++++++++ 3 files changed, 27 insertions(+), 49 deletions(-) diff --git a/tests/conditions.py b/tests/conditions.py index 3edafd6..6feda94 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -138,33 +138,23 @@ class TestsWithServer(TestCaseWithServer): exp.set('sort_by', 'title') # for clarity (already default) self.check_json_get('/conditions?sort_by=foo&pattern=bar&foo=x', exp) # test non-empty result, automatic (positive) sorting by title + exp.set('pattern', '') post_cond1 = {'is_active': 0, 'title': 'foo', 'description': 'oof'} post_cond2 = {'is_active': 0, 'title': 'bar', 'description': 'rab'} post_cond3 = {'is_active': 1, 'title': 'baz', 'description': 'zab'} for i, post in enumerate([post_cond1, post_cond2, post_cond3]): self.post_exp_cond([exp], i+1, post, '', f'?id={i+1}') - exp.set('pattern', '') - exp.force('conditions', [2, 3, 1]) - self.check_json_get('/conditions', exp) + self.check_filter(exp, 'conditions', 'sort_by', 'title', [2, 3, 1]) # test other sortings - exp.set('sort_by', '-title') - exp.force('conditions', [1, 3, 2]) - self.check_json_get('/conditions?sort_by=-title', exp) - exp.set('sort_by', 'is_active') - exp.force('conditions', [1, 2, 3]) - self.check_json_get('/conditions?sort_by=is_active', exp) - exp.set('sort_by', '-is_active') - exp.force('conditions', [3, 2, 1]) - self.check_json_get('/conditions?sort_by=-is_active', exp) + self.check_filter(exp, 'conditions', 'sort_by', '-title', [1, 3, 2]) + self.check_filter(exp, 'conditions', 'sort_by', 'is_active', [1, 2, 3]) + self.check_filter(exp, 'conditions', 'sort_by', '-is_active', + [3, 2, 1]) # test pattern matching on title exp.set('sort_by', 'title') - exp.set('pattern', 'ba') - exp.force('conditions', [2, 3]) exp.lib_del('Condition', 1) - self.check_json_get('/conditions?pattern=ba', exp) + self.check_filter(exp, 'conditions', 'pattern', 'ba', [2, 3]) # test pattern matching on description - exp.set('pattern', 'of') exp.lib_wipe('Condition') exp.set_cond_from_post(1, post_cond1) - exp.force('conditions', [1]) - self.check_json_get('/conditions?pattern=of', exp) + self.check_filter(exp, 'conditions', 'pattern', 'of', [1]) diff --git a/tests/processes.py b/tests/processes.py index 649aee5..422c283 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -434,7 +434,7 @@ class TestsWithServer(TestCaseWithServer): # test on meaningless non-empty params (incl. entirely un-used key), # that 'sort_by' default to 'title' (even if set to something else, as # long as without handler) and 'pattern' get preserved - exp.set('pattern', 'bar') # preserved despite zero effect! + exp.set('pattern', 'bar') url = '/processes?sort_by=foo&pattern=bar&foo=x' self.check_json_get(url, exp) # test non-empty result, automatic (positive) sorting by title @@ -449,42 +449,22 @@ class TestsWithServer(TestCaseWithServer): exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), exp.procstep_as_dict(2, 3, 1), exp.procstep_as_dict(3, 3, 2)]) - exp.lib_get('Process', '') exp.set('pattern', '') - exp.force('processes', [2, 3, 1]) - self.check_json_get('/processes', exp) + self.check_filter(exp, 'processes', 'sort_by', 'title', [2, 3, 1]) # test other sortings - exp.set('sort_by', '-title') - exp.force('processes', [1, 3, 2]) - self.check_json_get('/processes?sort_by=-title', exp) - exp.set('sort_by', 'effort') - exp.force('processes', [3, 1, 2]) - self.check_json_get('/processes?sort_by=effort', exp) - exp.set('sort_by', '-effort') - exp.force('processes', [2, 1, 3]) - self.check_json_get('/processes?sort_by=-effort', exp) - exp.set('sort_by', 'steps') - exp.force('processes', [1, 2, 3]) - self.check_json_get('/processes?sort_by=steps', exp) - exp.set('sort_by', '-steps') - exp.force('processes', [3, 2, 1]) - self.check_json_get('/processes?sort_by=-steps', exp) - exp.set('sort_by', 'owners') - exp.force('processes', [3, 2, 1]) - self.check_json_get('/processes?sort_by=owners', exp) - exp.set('sort_by', '-owners') - exp.force('processes', [1, 2, 3]) - self.check_json_get('/processes?sort_by=-owners', exp) + self.check_filter(exp, 'processes', 'sort_by', '-title', [1, 3, 2]) + self.check_filter(exp, 'processes', 'sort_by', 'effort', [3, 1, 2]) + self.check_filter(exp, 'processes', 'sort_by', '-effort', [2, 1, 3]) + self.check_filter(exp, 'processes', 'sort_by', 'steps', [1, 2, 3]) + self.check_filter(exp, 'processes', 'sort_by', '-steps', [3, 2, 1]) + self.check_filter(exp, 'processes', 'sort_by', 'owners', [3, 2, 1]) + self.check_filter(exp, 'processes', 'sort_by', '-owners', [1, 2, 3]) # test pattern matching on title - exp.set('pattern', 'ba') exp.set('sort_by', 'title') exp.lib_del('Process', '1') - exp.force('processes', [2, 3]) - self.check_json_get('/processes?pattern=ba', exp) + self.check_filter(exp, 'processes', 'pattern', 'ba', [2, 3]) # test pattern matching on description - exp.set('pattern', 'of') exp.lib_wipe('Process') exp.lib_wipe('ProcessStep') self.post_exp_process([exp], {'description': 'oof', 'effort': 1.0}, 1) - exp.force('processes', [1]) - self.check_json_get('/processes?pattern=of', exp) + self.check_filter(exp, 'processes', 'pattern', 'of', [1]) diff --git a/tests/utils.py b/tests/utils.py index 1a306d1..e04fba6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -911,6 +911,14 @@ class TestCaseWithServer(TestCaseWithDB): exp.set_proc_from_post(id_, payload) return payload + def check_filter(self, exp: Expected, category: str, key: str, + val: str, list_ids: list[int]) -> None: + """Check GET /{category}?{key}={val} sorts to list_ids.""" + # pylint: disable=too-many-arguments + exp.set(key, val) + exp.force(category, list_ids) + self.check_json_get(f'/{category}?{key}={val}', exp) + def check_redirect(self, target: str) -> None: """Check that self.conn answers with a 302 redirect to target.""" response = self.conn.getresponse() -- 2.30.2 From 1af7e2710a2af8c8e03e5e679921399825a4407a Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 9 Aug 2024 16:14:55 +0200 Subject: [PATCH 02/16] Private TaskHandler.conn to ._conn. --- plomtask/http.py | 114 +++++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 57 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index c7897e8..ef9438a 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -123,7 +123,7 @@ class TaskHandler(BaseHTTPRequestHandler): """Handles single HTTP request.""" # pylint: disable=too-many-public-methods server: TaskServer - conn: DatabaseConnection + _conn: DatabaseConnection _site: str _form: InputsParser _params: InputsParser @@ -240,7 +240,7 @@ class TaskHandler(BaseHTTPRequestHandler): # (because pylint here fails to detect the use of wrapper as a # method to self with respective access privileges) try: - self.conn = DatabaseConnection(self.server.db) + self._conn = DatabaseConnection(self.server.db) parsed_url = urlparse(self.path) self._site = path_split(parsed_url.path)[1] params = parse_qs(parsed_url.query, @@ -266,7 +266,7 @@ class TaskHandler(BaseHTTPRequestHandler): ctx = {'msg': error} self._send_page(ctx, 'msg', error.http_code) finally: - self.conn.close() + self._conn.close() return wrapper return decorator @@ -289,7 +289,7 @@ class TaskHandler(BaseHTTPRequestHandler): keep_blank_values=True) self._form = InputsParser(postvars) redir_target = handler() - self.conn.commit() + self._conn.commit() return redir_target # GET handlers @@ -306,9 +306,9 @@ class TaskHandler(BaseHTTPRequestHandler): # method to self with respective access privileges) id_ = self._params.get_int_or_none('id') if target_class.can_create_by_id: - item = target_class.by_id_or_create(self.conn, id_) + item = target_class.by_id_or_create(self._conn, id_) else: - item = target_class.by_id(self.conn, id_) + item = target_class.by_id(self._conn, id_) return f(self, item) return wrapper return decorator @@ -327,7 +327,7 @@ class TaskHandler(BaseHTTPRequestHandler): start = self._params.get_str_or_fail('start', '') end = self._params.get_str_or_fail('end', '') end = end if end != '' else date_in_n_days(366) - days, start, end = Day.by_date_range_with_limits(self.conn, + days, start, end = Day.by_date_range_with_limits(self._conn, (start, end), 'id') days = Day.with_filled_gaps(days, start, end) today = date_in_n_days(0) @@ -344,7 +344,7 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_day(self) -> dict[str, object]: """Show single Day of ?date=.""" date = self._params.get_str_or_fail('date', date_in_n_days(0)) - day = Day.by_id_or_create(self.conn, date) + day = Day.by_id_or_create(self._conn, date) make_type = self._params.get_str_or_fail('make_type', '') conditions_present = [] enablers_for = {} @@ -354,10 +354,10 @@ class TaskHandler(BaseHTTPRequestHandler): if condition not in conditions_present: conditions_present += [condition] enablers_for[condition.id_] = [p for p in - Process.all(self.conn) + Process.all(self._conn) if condition in p.enables] disablers_for[condition.id_] = [p for p in - Process.all(self.conn) + Process.all(self._conn) if condition in p.disables] seen_todos: set[int] = set() top_nodes = [t.get_step_tree(seen_todos) @@ -368,7 +368,7 @@ class TaskHandler(BaseHTTPRequestHandler): 'enablers_for': enablers_for, 'disablers_for': disablers_for, 'conditions_present': conditions_present, - 'processes': Process.all(self.conn)} + 'processes': Process.all(self._conn)} @_get_item(Todo) def do_GET_todo(self, todo: Todo) -> dict[str, object]: @@ -379,7 +379,7 @@ class TaskHandler(BaseHTTPRequestHandler): steps_nodes: list[TodoOrProcStepNode]) -> int: for process_step_node in process_step_nodes: node_id += 1 - proc = Process.by_id(self.conn, + proc = Process.by_id(self._conn, process_step_node.step.step_process_id) node = TodoOrProcStepNode(node_id, None, proc, []) steps_nodes += [node] @@ -420,7 +420,7 @@ class TaskHandler(BaseHTTPRequestHandler): return ids todo_steps = [step.todo for step in todo.get_step_tree(set()).children] - process_tree = todo.process.get_steps(self.conn, None) + process_tree = todo.process.get_steps(self._conn, None) steps_todo_to_process: list[TodoOrProcStepNode] = [] last_node_id = walk_process_steps(0, process_tree, steps_todo_to_process) @@ -428,8 +428,8 @@ class TaskHandler(BaseHTTPRequestHandler): steps_node.fillable = True walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process) adoptables: dict[int, list[Todo]] = {} - any_adoptables = [Todo.by_id(self.conn, t.id_) - for t in Todo.by_date(self.conn, todo.date) + any_adoptables = [Todo.by_id(self._conn, t.id_) + for t in Todo.by_date(self._conn, todo.date) if t.id_ is not None and t != todo] for id_ in collect_adoptables_keys(steps_todo_to_process): @@ -438,9 +438,9 @@ class TaskHandler(BaseHTTPRequestHandler): return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process, 'adoption_candidates_for': adoptables, - 'process_candidates': sorted(Process.all(self.conn)), + 'process_candidates': sorted(Process.all(self._conn)), 'todo_candidates': any_adoptables, - 'condition_candidates': Condition.all(self.conn)} + 'condition_candidates': Condition.all(self._conn)} def do_GET_todos(self) -> dict[str, object]: """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern""" @@ -450,7 +450,7 @@ class TaskHandler(BaseHTTPRequestHandler): process_id = self._params.get_int_or_none('process_id') comment_pattern = self._params.get_str_or_fail('comment_pattern', '') todos = [] - ret = Todo.by_date_range_with_limits(self.conn, (start, end)) + ret = Todo.by_date_range_with_limits(self._conn, (start, end)) todos_by_date_range, start, end = ret todos = [t for t in todos_by_date_range if comment_pattern in t.comment @@ -458,13 +458,13 @@ class TaskHandler(BaseHTTPRequestHandler): sort_by = Todo.sort_by(todos, sort_by) return {'start': start, 'end': end, 'process_id': process_id, 'comment_pattern': comment_pattern, 'todos': todos, - 'all_processes': Process.all(self.conn), 'sort_by': sort_by} + 'all_processes': Process.all(self._conn), 'sort_by': sort_by} def do_GET_conditions(self) -> dict[str, object]: """Show all Conditions.""" pattern = self._params.get_str_or_fail('pattern', '') sort_by = self._params.get_str_or_fail('sort_by', '') - conditions = Condition.matching(self.conn, pattern) + conditions = Condition.matching(self._conn, pattern) sort_by = Condition.sort_by(conditions, sort_by) return {'conditions': conditions, 'sort_by': sort_by, @@ -473,7 +473,7 @@ class TaskHandler(BaseHTTPRequestHandler): @_get_item(Condition) def do_GET_condition(self, c: Condition) -> dict[str, object]: """Show Condition of ?id=.""" - ps = Process.all(self.conn) + ps = Process.all(self._conn) return {'condition': c, 'is_new': c.id_ is None, 'enabled_processes': [p for p in ps if c in p.conditions], 'disabled_processes': [p for p in ps if c in p.blockers], @@ -504,19 +504,19 @@ class TaskHandler(BaseHTTPRequestHandler): raise BadFormatException(msg) from exc process.title.set(title) preset_top_step = None - owners = process.used_as_step_by(self.conn) + owners = process.used_as_step_by(self._conn) for step_id in owner_ids: - owners += [Process.by_id(self.conn, step_id)] + owners += [Process.by_id(self._conn, step_id)] for process_id in owned_ids: - Process.by_id(self.conn, process_id) # to ensure ID exists + Process.by_id(self._conn, process_id) # to ensure ID exists preset_top_step = process_id return {'process': process, 'is_new': process.id_ is None, 'preset_top_step': preset_top_step, - 'steps': process.get_steps(self.conn), + 'steps': process.get_steps(self._conn), 'owners': owners, - 'n_todos': len(Todo.by_process_id(self.conn, process.id_)), - 'process_candidates': Process.all(self.conn), - 'condition_candidates': Condition.all(self.conn)} + 'n_todos': len(Todo.by_process_id(self._conn, process.id_)), + 'process_candidates': Process.all(self._conn), + 'condition_candidates': Condition.all(self._conn)} @_get_item(Process) def do_GET_process_titles(self, p: Process) -> dict[str, object]: @@ -537,7 +537,7 @@ class TaskHandler(BaseHTTPRequestHandler): """Show all Processes.""" pattern = self._params.get_str_or_fail('pattern', '') sort_by = self._params.get_str_or_fail('sort_by', '') - processes = Process.matching(self.conn, pattern) + processes = Process.matching(self._conn, pattern) sort_by = Process.sort_by(processes, sort_by) return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern} @@ -558,13 +558,13 @@ class TaskHandler(BaseHTTPRequestHandler): msg = 'trying to delete non-saved ' +\ f'{target_class.__name__}' raise NotFoundException(msg) - item = target_class.by_id(self.conn, id_) - item.remove(self.conn) + item = target_class.by_id(self._conn, id_) + item.remove(self._conn) return redir_target if target_class.can_create_by_id: - item = target_class.by_id_or_create(self.conn, id_) + item = target_class.by_id_or_create(self._conn, id_) else: - item = target_class.by_id(self.conn, id_) + item = target_class.by_id(self._conn, id_) return f(self, item) return wrapper return decorator @@ -572,13 +572,13 @@ class TaskHandler(BaseHTTPRequestHandler): def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str: """Update history timestamps for VersionedAttribute.""" id_ = self._params.get_int_or_none('id') - item = cls.by_id(self.conn, id_) + item = cls.by_id(self._conn, id_) attr = getattr(item, attr_name) for k, v in self._form.get_firsts_of_key_prefixed('at:').items(): old = k[3:] if old[19:] != v: attr.reset_timestamp(old, f'{v}.0') - attr.save(self.conn) + attr.save(self._conn) return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}' def do_POST_day(self) -> str: @@ -600,24 +600,24 @@ class TaskHandler(BaseHTTPRequestHandler): msg = 'not equal number each of number of todo_id, comments, ' +\ 'and efforts inputs' raise BadFormatException(msg) - day = Day.by_id_or_create(self.conn, date) + day = Day.by_id_or_create(self._conn, date) day.comment = day_comment - day.save(self.conn) + day.save(self._conn) new_todos = [] for process_id in sorted(new_todos_by_process): - process = Process.by_id(self.conn, process_id) + process = Process.by_id(self._conn, process_id) todo = Todo(None, process, False, date) - todo.save(self.conn) + todo.save(self._conn) new_todos += [todo] if 'full' == make_type: for todo in new_todos: - todo.ensure_children(self.conn) + todo.ensure_children(self._conn) for i, todo_id in enumerate(old_todos): - todo = Todo.by_id(self.conn, todo_id) + todo = Todo.by_id(self._conn, todo_id) todo.is_done = is_done[i] todo.comment = comments[i] todo.effort = efforts[i] - todo.save(self.conn) + todo.save(self._conn) return f'/day?date={date}&make_type={make_type}' @_delete_or_post(Todo, '/') @@ -647,7 +647,7 @@ class TaskHandler(BaseHTTPRequestHandler): except ValueError as e: msg = 'cannot float form field value for key: effort' raise BadFormatException(msg) from e - todo.set_condition_relations(self.conn, *cond_rels) + todo.set_condition_relations(self._conn, *cond_rels) for filler in [f for f in step_fillers if f != 'ignore']: target_id: int to_int = filler @@ -670,23 +670,23 @@ class TaskHandler(BaseHTTPRequestHandler): if child.id_ and (child.id_ not in adopted_child_ids): to_remove += [child.id_] for id_ in to_remove: - child = Todo.by_id(self.conn, id_) + child = Todo.by_id(self._conn, id_) todo.remove_child(child) for child_id in adopted_child_ids: if child_id not in [c.id_ for c in todo.children]: - todo.add_child(Todo.by_id(self.conn, child_id)) + todo.add_child(Todo.by_id(self._conn, child_id)) todo.update_attrs(**to_update) for approach, proc_ids in to_make.items(): for process_id in proc_ids: - process = Process.by_id(self.conn, process_id) + process = Process.by_id(self._conn, process_id) made = Todo(None, process, False, todo.date) - made.save(self.conn) + made.save(self._conn) if 'full' == approach: - made.ensure_children(self.conn) + made.ensure_children(self._conn) todo.add_child(made) # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early url = f'/todo?id={todo.id_}' - todo.save(self.conn) + todo.save(self._conn) return url def do_POST_process_descriptions(self) -> str: @@ -721,10 +721,10 @@ class TaskHandler(BaseHTTPRequestHandler): new_steps_to[step_id] = self._form.get_all_int(name) for k, v in versioned.items(): getattr(process, k).set(v) - process.set_condition_relations(self.conn, *cond_rels) + process.set_condition_relations(self._conn, *cond_rels) if calendarize is not None: process.calendarize = calendarize - process.save(self.conn) + process.save(self._conn) assert isinstance(process.id_, int) # set relations to, and if non-existant yet: create, other Processes # pylint: disable=fixme @@ -740,10 +740,10 @@ class TaskHandler(BaseHTTPRequestHandler): owners_to_set += [int(owner_identifier)] except ValueError: new_owner_title = owner_identifier - process.set_owners(self.conn, owners_to_set) + process.set_owners(self._conn, owners_to_set) # 2. owneds (downwards) new_step_title = None - steps: list[ProcessStep] = [ProcessStep.by_id(self.conn, step_id) + steps: list[ProcessStep] = [ProcessStep.by_id(self._conn, step_id) for step_id in kept_steps] for step_id in kept_steps: new_sub_steps = [ @@ -757,8 +757,8 @@ class TaskHandler(BaseHTTPRequestHandler): steps += [step] except ValueError: new_step_title = step_id_or_new_title - process.set_steps(self.conn, steps) - process.set_step_suppressions(self.conn, suppresses) + process.set_steps(self._conn, steps) + process.set_step_suppressions(self._conn, suppresses) # encode titles for potentially newly created Processes up or down params = f'id={process.id_}' if new_step_title: @@ -767,7 +767,7 @@ class TaskHandler(BaseHTTPRequestHandler): elif new_owner_title: title_b64_encoded = b64encode(new_owner_title.encode()).decode() params = f'has_step={process.id_}&title_b64={title_b64_encoded}' - process.save(self.conn) + process.save(self._conn) return f'/process?{params}' def do_POST_condition_descriptions(self) -> str: @@ -788,5 +788,5 @@ class TaskHandler(BaseHTTPRequestHandler): condition.is_active = is_active condition.title.set(title) condition.description.set(description) - condition.save(self.conn) + condition.save(self._conn) return f'/condition?id={condition.id_}' -- 2.30.2 From fa927e56c36522f1148635c28b1133fd370cff80 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 9 Aug 2024 16:51:35 +0200 Subject: [PATCH 03/16] Minor improvements to query parameter handling/defaulting. --- plomtask/http.py | 18 ++++++++++++++---- tests/days.py | 3 ++- tests/utils.py | 10 +++++----- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index ef9438a..c2e1f5d 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -327,6 +327,7 @@ class TaskHandler(BaseHTTPRequestHandler): start = self._params.get_str_or_fail('start', '') end = self._params.get_str_or_fail('end', '') end = end if end != '' else date_in_n_days(366) + # days, start, end = Day.by_date_range_with_limits(self._conn, (start, end), 'id') days = Day.with_filled_gaps(days, start, end) @@ -344,8 +345,9 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_day(self) -> dict[str, object]: """Show single Day of ?date=.""" date = self._params.get_str_or_fail('date', date_in_n_days(0)) + make_type = self._params.get_str_or_fail('make_type', 'full') + # day = Day.by_id_or_create(self._conn, date) - make_type = self._params.get_str_or_fail('make_type', '') conditions_present = [] enablers_for = {} disablers_for = {} @@ -444,11 +446,12 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_todos(self) -> dict[str, object]: """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern""" - sort_by = self._params.get_str_or_fail('sort_by', '') + sort_by = self._params.get_str_or_fail('sort_by', 'title') start = self._params.get_str_or_fail('start', '') end = self._params.get_str_or_fail('end', '') process_id = self._params.get_int_or_none('process_id') comment_pattern = self._params.get_str_or_fail('comment_pattern', '') + # todos = [] ret = Todo.by_date_range_with_limits(self._conn, (start, end)) todos_by_date_range, start, end = ret @@ -463,7 +466,8 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_conditions(self) -> dict[str, object]: """Show all Conditions.""" pattern = self._params.get_str_or_fail('pattern', '') - sort_by = self._params.get_str_or_fail('sort_by', '') + sort_by = self._params.get_str_or_fail('sort_by', 'title') + # conditions = Condition.matching(self._conn, pattern) sort_by = Condition.sort_by(conditions, sort_by) return {'conditions': conditions, @@ -496,6 +500,7 @@ class TaskHandler(BaseHTTPRequestHandler): owner_ids = self._params.get_all_int('step_to') owned_ids = self._params.get_all_int('has_step') title_64 = self._params.get_str('title_b64') + # if title_64: try: title = b64decode(title_64.encode()).decode() @@ -536,7 +541,8 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_processes(self) -> dict[str, object]: """Show all Processes.""" pattern = self._params.get_str_or_fail('pattern', '') - sort_by = self._params.get_str_or_fail('sort_by', '') + sort_by = self._params.get_str_or_fail('sort_by', 'title') + # processes = Process.matching(self._conn, pattern) sort_by = Process.sort_by(processes, sort_by) return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern} @@ -592,6 +598,7 @@ class TaskHandler(BaseHTTPRequestHandler): comments = self._form.get_all_str('comment') efforts = self._form.get_all_floats_or_nones('effort') done_todos = self._form.get_all_int('done') + # for _ in [id_ for id_ in done_todos if id_ not in old_todos]: raise BadFormatException('"done" field refers to unknown Todo') is_done = [t_id in done_todos for t_id in old_todos] @@ -638,6 +645,7 @@ class TaskHandler(BaseHTTPRequestHandler): cond_rels = [self._form.get_all_int(name) for name in ['conditions', 'blockers', 'enables', 'disables']] effort_or_not = self._form.get_str('effort') + # if effort_or_not is not None: if effort_or_not == '': to_update['effort'] = None @@ -719,6 +727,7 @@ class TaskHandler(BaseHTTPRequestHandler): for step_id in kept_steps: name = f'new_step_to_{step_id}' new_steps_to[step_id] = self._form.get_all_int(name) + # for k, v in versioned.items(): getattr(process, k).set(v) process.set_condition_relations(self._conn, *cond_rels) @@ -784,6 +793,7 @@ class TaskHandler(BaseHTTPRequestHandler): title = self._form.get_str_or_fail('title') description = self._form.get_str_or_fail('description') is_active = self._form.get_bool_or_none('is_active') + # if is_active is not None: condition.is_active = is_active condition.title.set(title) diff --git a/tests/days.py b/tests/days.py index 0c6ee72..aac150b 100644 --- a/tests/days.py +++ b/tests/days.py @@ -123,7 +123,7 @@ class ExpectedGetCalendar(Expected): class ExpectedGetDay(Expected): """Builder of expectations for GET /day.""" - _default_dict = {'make_type': ''} + _default_dict = {'make_type': 'full'} _on_empty_make_temp = ('Day', 'day_as_dict') def __init__(self, date: str, *args: Any, **kwargs: Any) -> None: @@ -163,6 +163,7 @@ class TestsWithServer(TestCaseWithServer): def test_basic_GET_day(self) -> None: """Test basic (no Processes/Conditions/Todos) GET /day basics.""" # check illegal date parameters + self.check_get('/day?date=', 400) self.check_get('/day?date=foo', 400) self.check_get('/day?date=2024-02-30', 400) # check undefined day diff --git a/tests/utils.py b/tests/utils.py index e04fba6..71da9fb 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1015,9 +1015,9 @@ class TestCaseWithServer(TestCaseWithDB): try: self.assertEqual(cmp, retrieved) except AssertionError as e: - print('EXPECTED:') - pprint(cmp) - print('RETRIEVED:') - pprint(retrieved) - walk_diffs('', cmp, retrieved) + # print('EXPECTED:') + # pprint(cmp) + # print('RETRIEVED:') + # pprint(retrieved) + # walk_diffs('', cmp, retrieved) raise e -- 2.30.2 From 5a1ad2652cd2984a73eb9c1dc3f1c23f361a2365 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 9 Aug 2024 18:15:04 +0200 Subject: [PATCH 04/16] Clean up do_POST_process code. --- plomtask/http.py | 65 ++++++++++++++++++----------------------- plomtask/processes.py | 67 +++++++++++++++++++++++++++---------------- 2 files changed, 70 insertions(+), 62 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index c2e1f5d..51b35cb 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -645,7 +645,6 @@ class TaskHandler(BaseHTTPRequestHandler): cond_rels = [self._form.get_all_int(name) for name in ['conditions', 'blockers', 'enables', 'disables']] effort_or_not = self._form.get_str('effort') - # if effort_or_not is not None: if effort_or_not == '': to_update['effort'] = None @@ -655,7 +654,6 @@ class TaskHandler(BaseHTTPRequestHandler): except ValueError as e: msg = 'cannot float form field value for key: effort' raise BadFormatException(msg) from e - todo.set_condition_relations(self._conn, *cond_rels) for filler in [f for f in step_fillers if f != 'ignore']: target_id: int to_int = filler @@ -673,6 +671,8 @@ class TaskHandler(BaseHTTPRequestHandler): to_make['full'] += [target_id] else: adopted_child_ids += [target_id] + # + todo.set_condition_relations(self._conn, *cond_rels) to_remove = [] for child in todo.children: if child.id_ and (child.id_ not in adopted_child_ids): @@ -713,6 +713,16 @@ class TaskHandler(BaseHTTPRequestHandler): def do_POST_process(self, process: Process) -> str: """Update or insert Process of ?id= and fields defined in postvars.""" # pylint: disable=too-many-locals + + def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]: + l_ids, title = [], '' + for id_or_title in l_id_or_title: + try: + l_ids += [int(id_or_title)] + except ValueError: + title = id_or_title + return title, l_ids + versioned = {'title': self._form.get_str_or_fail('title'), 'description': self._form.get_str_or_fail('description'), 'effort': self._form.get_float_or_fail('effort')} @@ -720,55 +730,36 @@ class TaskHandler(BaseHTTPRequestHandler): in ['conditions', 'blockers', 'enables', 'disables']] calendarize = self._form.get_bool_or_none('calendarize') step_of = self._form.get_all_str('step_of') - suppresses = self._form.get_all_int('suppresses') + suppressions = self._form.get_all_int('suppresses') kept_steps = self._form.get_all_int('kept_steps') - new_top_steps = self._form.get_all_str('new_top_step') + new_top_step_procs = self._form.get_all_str('new_top_step') new_steps_to = {} for step_id in kept_steps: name = f'new_step_to_{step_id}' new_steps_to[step_id] = self._form.get_all_int(name) + new_owner_title, owners_to_set = id_or_title(step_of) + new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs) # for k, v in versioned.items(): getattr(process, k).set(v) - process.set_condition_relations(self._conn, *cond_rels) if calendarize is not None: process.calendarize = calendarize process.save(self._conn) assert isinstance(process.id_, int) - # set relations to, and if non-existant yet: create, other Processes - # pylint: disable=fixme - # TODO: in what order to set owners, owneds, and possibly step - # suppressions can make the difference between recursion checks - # failing; should probably be handled class-internally to Process - # rather than here! - # 1. owners (upwards) - owners_to_set = [] - new_owner_title = None - for owner_identifier in step_of: - try: - owners_to_set += [int(owner_identifier)] - except ValueError: - new_owner_title = owner_identifier - process.set_owners(self._conn, owners_to_set) - # 2. owneds (downwards) - new_step_title = None - steps: list[ProcessStep] = [ProcessStep.by_id(self._conn, step_id) - for step_id in kept_steps] - for step_id in kept_steps: - new_sub_steps = [ + # set relations to Conditions and ProcessSteps / other Processes + process.set_condition_relations(self._conn, *cond_rels) + owned_steps = [] + for step_id in kept_steps: # collecting sub-steps + owned_steps += [ProcessStep.by_id(self._conn, step_id)] + owned_steps += [ # new sub-steps ProcessStep(None, process.id_, step_process_id, step_id) for step_process_id in new_steps_to[step_id]] - steps += new_sub_steps - for step_id_or_new_title in new_top_steps: - try: - step_process_id = int(step_id_or_new_title) - step = ProcessStep(None, process.id_, step_process_id, None) - steps += [step] - except ValueError: - new_step_title = step_id_or_new_title - process.set_steps(self._conn, steps) - process.set_step_suppressions(self._conn, suppresses) - # encode titles for potentially newly created Processes up or down + for step_process_id in new_top_step_proc_ids: + owned_steps += [ProcessStep(None, process.id_, step_process_id, + None)] + process.set_step_relations(self._conn, owners_to_set, suppressions, + owned_steps) + # encode titles for potential newly-to-create Processes up or down params = f'id={process.id_}' if new_step_title: title_b64_encoded = b64encode(new_step_title.encode()).decode() diff --git a/plomtask/processes.py b/plomtask/processes.py index 56dd282..23eb624 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -129,16 +129,54 @@ class Process(BaseModel[int], ConditionsRelations): walk_steps(step_node) return step_nodes - def set_step_suppressions(self, db_conn: DatabaseConnection, - step_ids: list[int]) -> None: + def set_step_relations(self, + db_conn: DatabaseConnection, + owners: list[int], + suppressions: list[int], + owned_steps: list[ProcessStep] + ) -> None: + """Set step owners, suppressions, and owned steps.""" + self._set_owners(db_conn, owners) + self._set_step_suppressions(db_conn, suppressions) + self.set_steps(db_conn, owned_steps) + + def _set_step_suppressions(self, + db_conn: DatabaseConnection, + step_ids: list[int] + ) -> None: """Set self.suppressed_steps from step_ids.""" assert isinstance(self.id_, int) db_conn.delete_where('process_step_suppressions', 'process', self.id_) self.suppressed_steps = [ProcessStep.by_id(db_conn, s) for s in step_ids] - def set_steps(self, db_conn: DatabaseConnection, - steps: list[ProcessStep]) -> None: + def _set_owners(self, + db_conn: DatabaseConnection, + owner_ids: list[int] + ) -> None: + """Re-set owners to those identified in owner_ids.""" + owners_old = self.used_as_step_by(db_conn) + losers = [o for o in owners_old if o.id_ not in owner_ids] + owners_old_ids = [o.id_ for o in owners_old] + winners = [Process.by_id(db_conn, id_) for id_ in owner_ids + if id_ not in owners_old_ids] + steps_to_remove = [] + for loser in losers: + steps_to_remove += [s for s in loser.explicit_steps + if s.step_process_id == self.id_] + for step in steps_to_remove: + step.remove(db_conn) + for winner in winners: + assert isinstance(winner.id_, int) + assert isinstance(self.id_, int) + new_step = ProcessStep(None, winner.id_, self.id_, None) + new_explicit_steps = winner.explicit_steps + [new_step] + winner.set_steps(db_conn, new_explicit_steps) + + def set_steps(self, + db_conn: DatabaseConnection, + steps: list[ProcessStep] + ) -> None: """Set self.explicit_steps in bulk. Checks against recursion, and turns into top-level steps any of @@ -166,27 +204,6 @@ class Process(BaseModel[int], ConditionsRelations): walk_steps(step) step.save(db_conn) - def set_owners(self, db_conn: DatabaseConnection, - owner_ids: list[int]) -> None: - """Re-set owners to those identified in owner_ids.""" - owners_old = self.used_as_step_by(db_conn) - losers = [o for o in owners_old if o.id_ not in owner_ids] - owners_old_ids = [o.id_ for o in owners_old] - winners = [Process.by_id(db_conn, id_) for id_ in owner_ids - if id_ not in owners_old_ids] - steps_to_remove = [] - for loser in losers: - steps_to_remove += [s for s in loser.explicit_steps - if s.step_process_id == self.id_] - for step in steps_to_remove: - step.remove(db_conn) - for winner in winners: - assert isinstance(winner.id_, int) - assert isinstance(self.id_, int) - new_step = ProcessStep(None, winner.id_, self.id_, None) - new_explicit_steps = winner.explicit_steps + [new_step] - winner.set_steps(db_conn, new_explicit_steps) - def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" super().save(db_conn) -- 2.30.2 From 9c615407aa166e6ce39aedde0ed45890aa87e58b Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 9 Aug 2024 20:04:28 +0200 Subject: [PATCH 05/16] Simplify do_POST_todo code. --- plomtask/http.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index 51b35cb..704ae06 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -632,7 +632,7 @@ class TaskHandler(BaseHTTPRequestHandler): """Update Todo and its children.""" # pylint: disable=too-many-locals # pylint: disable=too-many-branches - adopted_child_ids = self._form.get_all_int('adopt') + adoptees = self._form.get_all_int('adopt') to_make = {'full': self._form.get_all_int('make_full'), 'empty': self._form.get_all_int('make_empty')} step_fillers = self._form.get_all_str('step_filler') @@ -670,19 +670,14 @@ class TaskHandler(BaseHTTPRequestHandler): elif filler.startswith('make_full_'): to_make['full'] += [target_id] else: - adopted_child_ids += [target_id] + adoptees += [target_id] # todo.set_condition_relations(self._conn, *cond_rels) - to_remove = [] - for child in todo.children: - if child.id_ and (child.id_ not in adopted_child_ids): - to_remove += [child.id_] - for id_ in to_remove: - child = Todo.by_id(self._conn, id_) - todo.remove_child(child) - for child_id in adopted_child_ids: - if child_id not in [c.id_ for c in todo.children]: - todo.add_child(Todo.by_id(self._conn, child_id)) + for child in [c for c in todo.children if c.id_ not in adoptees]: + todo.remove_child(child) + for child_id in [id_ for id_ in adoptees + if id_ not in [c.id_ for c in todo.children]]: + todo.add_child(Todo.by_id(self._conn, child_id)) todo.update_attrs(**to_update) for approach, proc_ids in to_make.items(): for process_id in proc_ids: @@ -749,7 +744,7 @@ class TaskHandler(BaseHTTPRequestHandler): # set relations to Conditions and ProcessSteps / other Processes process.set_condition_relations(self._conn, *cond_rels) owned_steps = [] - for step_id in kept_steps: # collecting sub-steps + for step_id in kept_steps: owned_steps += [ProcessStep.by_id(self._conn, step_id)] owned_steps += [ # new sub-steps ProcessStep(None, process.id_, step_process_id, step_id) -- 2.30.2 From 46e509c1467535e2281922718dc60ee89a0a019e Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Sat, 10 Aug 2024 03:12:21 +0200 Subject: [PATCH 06/16] Minor improvements of TaskHandler code. --- plomtask/http.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index 704ae06..cba55d6 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -452,7 +452,6 @@ class TaskHandler(BaseHTTPRequestHandler): process_id = self._params.get_int_or_none('process_id') comment_pattern = self._params.get_str_or_fail('comment_pattern', '') # - todos = [] ret = Todo.by_date_range_with_limits(self._conn, (start, end)) todos_by_date_range, start, end = ret todos = [t for t in todos_by_date_range @@ -500,14 +499,16 @@ class TaskHandler(BaseHTTPRequestHandler): owner_ids = self._params.get_all_int('step_to') owned_ids = self._params.get_all_int('has_step') title_64 = self._params.get_str('title_b64') - # + title_new = None if title_64: try: - title = b64decode(title_64.encode()).decode() + title_new = b64decode(title_64.encode()).decode() except binascii_Exception as exc: msg = 'invalid base64 for ?title_b64=' raise BadFormatException(msg) from exc - process.title.set(title) + # + if title_new: + process.title.set(title_new) preset_top_step = None owners = process.used_as_step_by(self._conn) for step_id in owner_ids: @@ -598,15 +599,15 @@ class TaskHandler(BaseHTTPRequestHandler): comments = self._form.get_all_str('comment') efforts = self._form.get_all_floats_or_nones('effort') done_todos = self._form.get_all_int('done') - # - for _ in [id_ for id_ in done_todos if id_ not in old_todos]: - raise BadFormatException('"done" field refers to unknown Todo') is_done = [t_id in done_todos for t_id in old_todos] if not (len(old_todos) == len(is_done) == len(comments) == len(efforts)): msg = 'not equal number each of number of todo_id, comments, ' +\ 'and efforts inputs' raise BadFormatException(msg) + for _ in [id_ for id_ in done_todos if id_ not in old_todos]: + raise BadFormatException('"done" field refers to unknown Todo') + # day = Day.by_id_or_create(self._conn, date) day.comment = day_comment day.save(self._conn) @@ -674,10 +675,10 @@ class TaskHandler(BaseHTTPRequestHandler): # todo.set_condition_relations(self._conn, *cond_rels) for child in [c for c in todo.children if c.id_ not in adoptees]: - todo.remove_child(child) + todo.remove_child(child) for child_id in [id_ for id_ in adoptees if id_ not in [c.id_ for c in todo.children]]: - todo.add_child(Todo.by_id(self._conn, child_id)) + todo.add_child(Todo.by_id(self._conn, child_id)) todo.update_attrs(**to_update) for approach, proc_ids in to_make.items(): for process_id in proc_ids: -- 2.30.2 From 76af5f86ab547bcad3c1c944a369bf8b216ee5da Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Sat, 10 Aug 2024 03:18:20 +0200 Subject: [PATCH 07/16] In Todo view, allow filling of steps below sub-steps. --- templates/todo.html | 2 -- 1 file changed, 2 deletions(-) diff --git a/templates/todo.html b/templates/todo.html index 20279bb..61d4675 100644 --- a/templates/todo.html +++ b/templates/todo.html @@ -22,7 +22,6 @@ select{ font-size: 0.5em; margin: 0; padding: 0; } {{item.todo.title_then|e}} {% else %} {{item.process.title.newest|e}} -{% if indent == 0 %} · fill: {% endif %} -{% endif %} {% for child in item.children %} -- 2.30.2 From 4c6908e51c5eeaeef66dd66325d367dc42b29b75 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Sat, 10 Aug 2024 05:32:55 +0200 Subject: [PATCH 08/16] Add TaskHandler code to actually make previous commit work. --- plomtask/http.py | 82 ++++++++++++++++++++++++++------------------- templates/todo.html | 10 +++--- tests/misc.py | 36 ++++++++++---------- tests/todos.py | 61 +++++++-------------------------- tests/utils.py | 16 +++++---- 5 files changed, 94 insertions(+), 111 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index cba55d6..e307f14 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -88,11 +88,11 @@ class InputsParser: return None return val in {'True', 'true', '1', 'on'} - def get_firsts_of_key_prefixed(self, prefix: str) -> dict[str, str]: - """Retrieve dict of (first) strings at key starting with prefix.""" + def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]: + """Retrieve dict of strings at keys starting with key_prefix.""" ret = {} - for key in [k for k in self.inputs.keys() if k.startswith(prefix)]: - ret[key] = self.inputs[key][0] + for key in [k for k in self.inputs.keys() if k.startswith(key_prefix)]: + ret[key[len(key_prefix):]] = self.inputs[key] return ret def get_float_or_fail(self, key: str) -> float: @@ -581,10 +581,9 @@ class TaskHandler(BaseHTTPRequestHandler): id_ = self._params.get_int_or_none('id') item = cls.by_id(self._conn, id_) attr = getattr(item, attr_name) - for k, v in self._form.get_firsts_of_key_prefixed('at:').items(): - old = k[3:] - if old[19:] != v: - attr.reset_timestamp(old, f'{v}.0') + for k, vals in self._form.get_all_of_key_prefixed('at:').items(): + if k[19:] != vals[0]: + attr.reset_timestamp(k, f'{vals[0]}.0') attr.save(self._conn) return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}' @@ -633,10 +632,14 @@ class TaskHandler(BaseHTTPRequestHandler): """Update Todo and its children.""" # pylint: disable=too-many-locals # pylint: disable=too-many-branches - adoptees = self._form.get_all_int('adopt') - to_make = {'full': self._form.get_all_int('make_full'), - 'empty': self._form.get_all_int('make_empty')} - step_fillers = self._form.get_all_str('step_filler') + # pylint: disable=too-many-statements + assert todo.id_ is not None + adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')] + to_make = {'full': [(id_, todo.id_) + for id_ in self._form.get_all_int('make_full')], + 'empty': [(id_, todo.id_) + for id_ in self._form.get_all_int('make_empty')]} + step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_') to_update: dict[str, Any] = { 'comment': self._form.get_str_or_fail('comment', '')} for k in ('is_done', 'calendarize'): @@ -655,39 +658,50 @@ class TaskHandler(BaseHTTPRequestHandler): except ValueError as e: msg = 'cannot float form field value for key: effort' raise BadFormatException(msg) from e - for filler in [f for f in step_fillers if f != 'ignore']: - target_id: int - to_int = filler - for prefix in [p for p in ['make_empty_', 'make_full_'] - if filler.startswith(p)]: - to_int = filler[len(prefix):] + for k, fillers in step_fillers_to.items(): try: - target_id = int(to_int) + parent_id = int(k) except ValueError as e: - msg = 'bad fill_for target: {filler}' + msg = f'bad step_filler_to_ key: {k}' raise BadFormatException(msg) from e - if filler.startswith('make_empty_'): - to_make['empty'] += [target_id] - elif filler.startswith('make_full_'): - to_make['full'] += [target_id] - else: - adoptees += [target_id] + for filler in [f for f in fillers if f != 'ignore']: + target_id: int + prefix = 'make_' + to_int = filler[5:] if filler.startswith(prefix) else filler + try: + target_id = int(to_int) + except ValueError as e: + msg = f'bad fill_for target: {filler}' + raise BadFormatException(msg) from e + if filler.startswith(prefix): + to_make['empty'] += [(target_id, parent_id)] + else: + adoptees += [(target_id, parent_id)] # todo.set_condition_relations(self._conn, *cond_rels) - for child in [c for c in todo.children if c.id_ not in adoptees]: - todo.remove_child(child) - for child_id in [id_ for id_ in adoptees - if id_ not in [c.id_ for c in todo.children]]: - todo.add_child(Todo.by_id(self._conn, child_id)) + for parent in [Todo.by_id(self._conn, a[1]) + for a in adoptees] + [todo]: + for child in parent.children: + if child not in [t[0] for t in adoptees + if t[0] == child.id_ and t[1] == parent.id_]: + parent.remove_child(child) + parent.save(self._conn) + for child_id, parent_id in adoptees: + parent = Todo.by_id(self._conn, parent_id) + if child_id not in [c.id_ for c in parent.children]: + parent.add_child(Todo.by_id(self._conn, child_id)) + parent.save(self._conn) todo.update_attrs(**to_update) - for approach, proc_ids in to_make.items(): - for process_id in proc_ids: + for approach, make_data in to_make.items(): + for process_id, parent_id in make_data: + parent = Todo.by_id(self._conn, parent_id) process = Process.by_id(self._conn, process_id) made = Todo(None, process, False, todo.date) made.save(self._conn) if 'full' == approach: made.ensure_children(self._conn) - todo.add_child(made) + parent.add_child(made) + parent.save(self._conn) # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early url = f'/todo?id={todo.id_}' todo.save(self._conn) diff --git a/templates/todo.html b/templates/todo.html index 61d4675..de5dbd2 100644 --- a/templates/todo.html +++ b/templates/todo.html @@ -22,19 +22,21 @@ select{ font-size: 0.5em; margin: 0; padding: 0; } {{item.todo.title_then|e}} {% else %} {{item.process.title.newest|e}} -· fill: - - + {% for adoptable in adoption_candidates_for[item.process.id_] %} {% endfor %} +{% endif %} + {% endif %} {% for child in item.children %} -{{ draw_tree_row(child, item, indent+1) }} +{{ draw_tree_row(child, item.todo, indent+1) }} {% endfor %} {% endmacro %} diff --git a/tests/misc.py b/tests/misc.py index 1efa335..86474c7 100644 --- a/tests/misc.py +++ b/tests/misc.py @@ -36,31 +36,31 @@ class TestsSansServer(TestCase): parser = InputsParser({'foo': ['baz', 'quux']}) self.assertEqual('baz', parser.get_str('foo', 'bar')) - def test_InputsParser_get_firsts_of_key_prefixed(self) -> None: - """Test InputsParser.get_firsts_of_key_prefixed.""" + def test_InputsParser_get_all_of_key_prefixed(self) -> None: + """Test InputsParser.get_all_of_key_prefixed.""" parser = InputsParser({}) self.assertEqual({}, - parser.get_firsts_of_key_prefixed('')) + parser.get_all_of_key_prefixed('')) self.assertEqual({}, - parser.get_firsts_of_key_prefixed('foo')) + parser.get_all_of_key_prefixed('foo')) parser = InputsParser({'foo': ['bar']}) - self.assertEqual({'foo': 'bar'}, - parser.get_firsts_of_key_prefixed('')) - parser = InputsParser({'x': ['y']}) - self.assertEqual({'x': 'y'}, - parser.get_firsts_of_key_prefixed('x')) - parser = InputsParser({'xx': ['y']}) - self.assertEqual({'xx': 'y'}, - parser.get_firsts_of_key_prefixed('x')) + self.assertEqual({'foo': ['bar']}, + parser.get_all_of_key_prefixed('')) + parser = InputsParser({'x': ['y', 'z']}) + self.assertEqual({'': ['y', 'z']}, + parser.get_all_of_key_prefixed('x')) + parser = InputsParser({'xx': ['y', 'Z']}) + self.assertEqual({'x': ['y', 'Z']}, + parser.get_all_of_key_prefixed('x')) parser = InputsParser({'xx': ['y']}) self.assertEqual({}, - parser.get_firsts_of_key_prefixed('xxx')) + parser.get_all_of_key_prefixed('xxx')) parser = InputsParser({'xxx': ['x'], 'xxy': ['y'], 'xyy': ['z']}) - self.assertEqual({'xxx': 'x', 'xxy': 'y'}, - parser.get_firsts_of_key_prefixed('xx')) - parser = InputsParser({'xxx': ['x', 'y', 'z'], 'xxy': ['y', 'z']}) - self.assertEqual({'xxx': 'x', 'xxy': 'y'}, - parser.get_firsts_of_key_prefixed('xx')) + self.assertEqual({'x': ['x'], 'y': ['y']}, + parser.get_all_of_key_prefixed('xx')) + parser = InputsParser({'xxx': ['x', 'y'], 'xxy': ['y', 'z']}) + self.assertEqual({'x': ['x', 'y'], 'y': ['y', 'z']}, + parser.get_all_of_key_prefixed('xx')) def test_InputsParser_get_int_or_none(self) -> None: """Test InputsParser.get_int_or_none.""" diff --git a/tests/todos.py b/tests/todos.py index d84bb70..2ecf3b8 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -282,14 +282,16 @@ class TestsWithServer(TestCaseWithServer): self.check_post({}, '/todo?id=1', 404) # test malformed values on existing Todo self.post_exp_day([], {'new_todo': [1]}) - for name in [ - 'adopt', 'effort', 'make_full', 'make_empty', 'step_filler', - 'conditions', 'disables', 'blockers', 'enables']: + for name in ['adopt', 'effort', 'make_full', 'make_empty', + 'conditions', 'disables', 'blockers', 'enables']: self.check_post({name: 'x'}, '/todo?id=1', 400, '/todo') - for prefix in ['make_empty_', 'make_full_']: + for prefix in ['make_', '']: for suffix in ['', 'x', '1.1']: - self.check_post({'step_filler': f'{prefix}{suffix}'}, - '/todo?id=1', 400, '/todo') + self.check_post({'step_filler_to_1': [f'{prefix}{suffix}']}, + '/todo?id=1', 400, '/todo') + for suffix in ['', 'x', '1.1']: + self.check_post({'step_filler_to_{suffix}': ['1']}, + '/todo?id=1', 400, '/todo') def test_basic_POST_todo(self) -> None: """Test basic POST /todo manipulations.""" @@ -398,14 +400,15 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_day([exp], {'new_todo': [2]}) self.post_exp_day([exp], {'new_todo': [3]}) self.check_json_get('/todo?id=1', exp) - self._post_exp_todo(1, {'step_filler': 5, 'adopt': [4]}, exp) + self._post_exp_todo(1, {'step_filler_to_1': 5, 'adopt': [4]}, exp) + exp.lib_get('Todo', 1)['children'] += [5] step1_proc2 = exp.step_as_dict(1, [], 2, 4, True) step2_proc3 = exp.step_as_dict(2, [], 3, 5, True) exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) self.check_json_get('/todo?id=1', exp) # test 'ignore' values for 'step_filler' are ignored, and intable # 'step_filler' values are interchangeable with those of 'adopt' - todo_post = {'adopt': 5, 'step_filler': ['ignore', 4]} + todo_post = {'adopt': 5, 'step_filler_to_1': ['ignore', 4]} self.check_post(todo_post, '/todo?id=1') self.check_json_get('/todo?id=1', exp) # test cannot adopt into non-top-level elements of chain, instead @@ -423,46 +426,8 @@ class TestsWithServer(TestCaseWithServer): step4_todo6]) self.check_json_get('/todo?id=1', exp) - def test_POST_todo_make_full(self) -> None: - """Test creation and adoption via POST /todo with "make_full".""" - # create chain of Processes - exp = ExpectedGetTodo(1) - self.post_exp_process([exp], {}, 1) - for i in range(1, 4): - self.post_exp_process([exp], {'new_top_step': i}, i+1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), - exp.procstep_as_dict(2, 3, 2), - exp.procstep_as_dict(3, 4, 3)]) - step3_proc1 = exp.step_as_dict(3, [], 1, None, False) - step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, False) - step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True) - exp.set('steps_todo_to_process', [step1_proc3]) - # post (childless) Todo of chain end, then make_full on next in line - self.post_exp_day([exp], {'new_todo': [4]}) - self.check_post({'step_filler': 'make_full_3'}, '/todo?id=1') - exp.set_todo_from_post(4, {'process_id': 1}) - exp.set_todo_from_post(3, {'process_id': 2, 'children': [4]}) - exp.set_todo_from_post(2, {'process_id': 3, 'children': [3]}) - exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]}) - step3_proc1 = exp.step_as_dict(3, [], 1, 4, True) - step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, 3, True) - step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, 2, True) - exp.set('steps_todo_to_process', [step1_proc3]) - self.check_json_get('/todo?id=1', exp) - # make new chain next to expected, find steps_todo_to_process extended, - # expect existing Todo demanded by new chain be adopted into new chain - self.check_post({'make_full': 2, 'adopt': [2]}, '/todo?id=1') - exp.set_todo_from_post(5, {'process_id': 2, 'children': [4]}) - exp.set_todo_from_post(1, {'process_id': 4, 'children': [2, 5]}) - step5_todo4 = exp.step_as_dict(5, [], None, 4) - step4_todo5 = exp.step_as_dict(4, [step5_todo4], None, 5) - exp.set('steps_todo_to_process', [step1_proc3, step4_todo5]) - self.check_json_get('/todo?id=1', exp) - # fail on trying to call make_full on non-existing Process - self.check_post({'make_full': 5}, '/todo?id=1', 404) - def test_POST_todo_make_empty(self) -> None: - """Test creation and adoption via POST /todo with "make_empty".""" + """Test creation via POST /todo "step_filler_to"/"make".""" # create chain of Processes exp = ExpectedGetTodo(1) self.post_exp_process([exp], {}, 1) @@ -478,7 +443,7 @@ class TestsWithServer(TestCaseWithServer): step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True) exp.set('steps_todo_to_process', [step1_proc3]) self.check_json_get('/todo?id=1', exp) - self.check_post({'step_filler': 'make_empty_3'}, '/todo?id=1') + self.check_post({'step_filler_to_1': 'make_3'}, '/todo?id=1') exp.set_todo_from_post(2, {'process_id': 3}) exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]}) step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, True) diff --git a/tests/utils.py b/tests/utils.py index 71da9fb..d1b6eac 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -753,11 +753,13 @@ class Expected: """Set Todo of id_ in library based on POST dict d.""" corrected_kwargs: dict[str, Any] = {'children': []} for k, v in d.items(): - if k in {'adopt', 'step_filler'}: + if k.startswith('step_filler_to_'): + continue + elif 'adopt' == k: new_children = v if isinstance(v, list) else [v] corrected_kwargs['children'] += new_children continue - if k in {'is_done', 'calendarize'}: + elif k in {'is_done', 'calendarize'}: v = v in VALID_TRUES corrected_kwargs[k] = v todo = self.lib_get('Todo', id_) @@ -1015,9 +1017,9 @@ class TestCaseWithServer(TestCaseWithDB): try: self.assertEqual(cmp, retrieved) except AssertionError as e: - # print('EXPECTED:') - # pprint(cmp) - # print('RETRIEVED:') - # pprint(retrieved) - # walk_diffs('', cmp, retrieved) + print('EXPECTED:') + pprint(cmp) + print('RETRIEVED:') + pprint(retrieved) + walk_diffs('', cmp, retrieved) raise e -- 2.30.2 From 79d2b7f2c1e944e20f7beb9739183e4cdd3694dd Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Mon, 12 Aug 2024 11:11:54 +0200 Subject: [PATCH 09/16] Set 'is_new' even when provided not-yet-existing ID, adapt and fix tests. --- plomtask/db.py | 5 ++-- plomtask/http.py | 20 ++++++++++--- tests/conditions.py | 68 +++++++++++++++++++++++---------------------- tests/processes.py | 4 ++- tests/todos.py | 10 +++---- tests/utils.py | 29 +++++++++---------- 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/plomtask/db.py b/plomtask/db.py index ee5f3b9..1fdd3e1 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -376,7 +376,8 @@ class BaseModel(Generic[BaseModelId]): @classmethod def _get_cached(cls: type[BaseModelInstance], - id_: BaseModelId) -> BaseModelInstance | None: + id_: BaseModelId + ) -> BaseModelInstance | None: """Get object of id_ from class's cache, or None if not found.""" cache = cls.get_cache() if id_ in cache: @@ -449,7 +450,7 @@ class BaseModel(Generic[BaseModelId]): def by_id_or_create(cls, db_conn: DatabaseConnection, id_: BaseModelId | None ) -> Self: - """Wrapper around .by_id, creating (not caching/saving) if not find.""" + """Wrapper around .by_id, creating (not caching/saving) if no find.""" if not cls.can_create_by_id: raise HandledException('Class cannot .by_id_or_create.') if id_ is None: diff --git a/plomtask/http.py b/plomtask/http.py index e307f14..4426bba 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -1,5 +1,6 @@ """Web server stuff.""" from __future__ import annotations +from inspect import signature from typing import Any, Callable from base64 import b64encode, b64decode from binascii import Error as binascii_Exception @@ -309,6 +310,9 @@ class TaskHandler(BaseHTTPRequestHandler): item = target_class.by_id_or_create(self._conn, id_) else: item = target_class.by_id(self._conn, id_) + if 'exists' in signature(f).parameters: + exists = id_ is not None and target_class._get_cached(id_) + return f(self, item, exists) return f(self, item) return wrapper return decorator @@ -474,10 +478,14 @@ class TaskHandler(BaseHTTPRequestHandler): 'pattern': pattern} @_get_item(Condition) - def do_GET_condition(self, c: Condition) -> dict[str, object]: + def do_GET_condition(self, + c: Condition, + exists: bool + ) -> dict[str, object]: """Show Condition of ?id=.""" ps = Process.all(self._conn) - return {'condition': c, 'is_new': c.id_ is None, + return {'condition': c, + 'is_new': not exists, 'enabled_processes': [p for p in ps if c in p.conditions], 'disabled_processes': [p for p in ps if c in p.blockers], 'enabling_processes': [p for p in ps if c in p.enables], @@ -494,7 +502,10 @@ class TaskHandler(BaseHTTPRequestHandler): return {'condition': c} @_get_item(Process) - def do_GET_process(self, process: Process) -> dict[str, object]: + def do_GET_process(self, + process: Process, + exists: bool + ) -> dict[str, object]: """Show Process of ?id=.""" owner_ids = self._params.get_all_int('step_to') owned_ids = self._params.get_all_int('has_step') @@ -516,7 +527,8 @@ class TaskHandler(BaseHTTPRequestHandler): for process_id in owned_ids: Process.by_id(self._conn, process_id) # to ensure ID exists preset_top_step = process_id - return {'process': process, 'is_new': process.id_ is None, + return {'process': process, + 'is_new': not exists, 'preset_top_step': preset_top_step, 'steps': process.get_steps(self._conn), 'owners': owners, diff --git a/tests/conditions.py b/tests/conditions.py index 6feda94..a9b28bb 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -51,9 +51,10 @@ class ExpectedGetConditions(Expected): class ExpectedGetCondition(Expected): """Builder of expectations for GET /condition.""" + _default_dict = {'is_new': False} _on_empty_make_temp = ('Condition', 'cond_as_dict') - def __init__(self, id_: int, *args: Any, **kwargs: Any) -> None: + def __init__(self, id_: int | None, *args: Any, **kwargs: Any) -> None: self._fields = {'condition': id_} super().__init__(*args, **kwargs) @@ -67,7 +68,6 @@ class ExpectedGetCondition(Expected): self._fields[c_field] = self.as_ids([ p for p in self.lib_all('Process') if self._fields['condition'] in p[p_field]]) - self._fields['is_new'] = False class TestsWithServer(TestCaseWithServer): @@ -79,51 +79,55 @@ class TestsWithServer(TestCaseWithServer): url = '/condition' self.check_post({}, url, 400) self.check_post({'title': ''}, url, 400) - self.check_post({'title': '', 'is_active': 0}, url, 400) - self.check_post({'description': '', 'is_active': 0}, url, 400) + self.check_post({'description': ''}, url, 400) # check valid POST payload on bad paths valid_payload = {'title': '', 'description': '', 'is_active': 0} - self.check_post(valid_payload, '/condition?id=foo', 400) + self.check_post(valid_payload, f'{url}?id=foo', 400) def test_POST_condition(self) -> None: """Test (valid) POST /condition and its effect on GET /condition[s].""" - exp_single = ExpectedGetCondition(1) - exp_all = ExpectedGetConditions() + url_single, url_all = '/condition?id=1', '/conditions' + exp_single, exp_all = ExpectedGetCondition(1), ExpectedGetConditions() all_exps = [exp_single, exp_all] # test valid POST's effect on single /condition and full /conditions - post = {'title': 'foo', 'description': 'oof', 'is_active': 0} - self.post_exp_cond(all_exps, 1, post, '', '?id=1') - self.check_json_get('/condition?id=1', exp_single) - self.check_json_get('/conditions', exp_all) + self.post_exp_cond(all_exps, {'title': 'foo', 'description': 'oof'}, + post_to_id=False) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) # test (no) effect of invalid POST to existing Condition on /condition - self.check_post({}, '/condition?id=1', 400) - self.check_json_get('/condition?id=1', exp_single) + self.check_post({}, url_single, 400) + self.check_json_get(url_single, exp_single) # test effect of POST changing title and activeness - post = {'title': 'bar', 'description': 'oof', 'is_active': 1} - self.post_exp_cond(all_exps, 1, post, '?id=1', '?id=1') - self.check_json_get('/condition?id=1', exp_single) - self.check_json_get('/conditions', exp_all) + self.post_exp_cond(all_exps, {'title': 'bar', 'description': 'oof', + 'is_active': 1}) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) # test deletion POST's effect, both to return id=1 into empty single, # full /conditions into empty list - self.post_exp_cond(all_exps, 1, {'delete': ''}, '?id=1', 's') - self.check_json_get('/condition?id=1', exp_single) - self.check_json_get('/conditions', exp_all) + self.post_exp_cond(all_exps, {'delete': ''}, redir_to_id=False) + exp_single.set('is_new', True) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) def test_GET_condition(self) -> None: """More GET /condition testing, especially for Process relations.""" # check expected default status codes self.check_get_defaults('/condition') + # check 'is_new' set if id= absent or pointing to not-yet-existing ID + exp = ExpectedGetCondition(None) + exp.set('is_new', True) + self.check_json_get('/condition', exp) + exp = ExpectedGetCondition(1) + exp.set('is_new', True) + self.check_json_get('/condition?id=1', exp) # make Condition and two Processes that among them establish all # possible ConditionsRelations to it, check /condition displays all exp = ExpectedGetCondition(1) - cond_post = {'title': 'foo', 'description': 'oof', 'is_active': 0} - self.post_exp_cond([exp], 1, cond_post, '', '?id=1') - proc1_post = {'title': 'A', 'description': '', 'effort': 1.1, - 'conditions': [1], 'disables': [1]} - proc2_post = {'title': 'B', 'description': '', 'effort': 0.9, - 'enables': [1], 'blockers': [1]} - self.post_exp_process([exp], proc1_post, 1) - self.post_exp_process([exp], proc2_post, 2) + self.post_exp_cond([exp], {'title': 'foo', 'description': 'oof'}, + post_to_id=False) + for i, p in enumerate([('conditions', 'disables'), + ('enables', 'blockers')]): + self.post_exp_process([exp], {k: [1] for k in p}, i+1) self.check_json_get('/condition?id=1', exp) def test_GET_conditions(self) -> None: @@ -131,19 +135,17 @@ class TestsWithServer(TestCaseWithServer): # test empty result on empty DB, default-settings on empty params exp = ExpectedGetConditions() self.check_json_get('/conditions', exp) - # test ignorance of meaningless non-empty params (incl. unknown key), - # that 'sort_by' default to 'title' (even if set to something else, as + # test 'sort_by' default to 'title' (even if set to something else, as # long as without handler) and 'pattern' get preserved exp.set('pattern', 'bar') - exp.set('sort_by', 'title') # for clarity (already default) self.check_json_get('/conditions?sort_by=foo&pattern=bar&foo=x', exp) - # test non-empty result, automatic (positive) sorting by title exp.set('pattern', '') + # test non-empty result, automatic (positive) sorting by title post_cond1 = {'is_active': 0, 'title': 'foo', 'description': 'oof'} post_cond2 = {'is_active': 0, 'title': 'bar', 'description': 'rab'} post_cond3 = {'is_active': 1, 'title': 'baz', 'description': 'zab'} for i, post in enumerate([post_cond1, post_cond2, post_cond3]): - self.post_exp_cond([exp], i+1, post, '', f'?id={i+1}') + self.post_exp_cond([exp], post, i+1, post_to_id=False) self.check_filter(exp, 'conditions', 'sort_by', 'title', [2, 3, 1]) # test other sortings self.check_filter(exp, 'conditions', 'sort_by', '-title', [1, 3, 2]) diff --git a/tests/processes.py b/tests/processes.py index 422c283..24a62bd 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -319,10 +319,11 @@ class TestsWithServer(TestCaseWithServer): # check on un-saved exp = ExpectedGetProcess(1) exp.force('process_candidates', []) + exp.set('is_new', True) self.check_json_get('/process?id=1', exp) # check on minimal payload post + exp = ExpectedGetProcess(1) valid_post = {'title': 'foo', 'description': 'oof', 'effort': 2.3} - exp.unforce('process_candidates') self.post_exp_process([exp], valid_post, 1) self.check_json_get('/process?id=1', exp) # check n_todos field @@ -345,6 +346,7 @@ class TestsWithServer(TestCaseWithServer): exp.set_proc_from_post(3, valid_post) exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 3, 2)]) exp.force('process_candidates', [1, 2, 3]) + exp.set('is_new', True) self.check_json_get('/process?id=4', exp) def test_POST_process_steps(self) -> None: diff --git a/tests/todos.py b/tests/todos.py index 2ecf3b8..f048d46 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -288,10 +288,10 @@ class TestsWithServer(TestCaseWithServer): for prefix in ['make_', '']: for suffix in ['', 'x', '1.1']: self.check_post({'step_filler_to_1': [f'{prefix}{suffix}']}, - '/todo?id=1', 400, '/todo') + '/todo?id=1', 400, '/todo') for suffix in ['', 'x', '1.1']: - self.check_post({'step_filler_to_{suffix}': ['1']}, - '/todo?id=1', 400, '/todo') + self.check_post({'step_filler_to_{suffix}': ['1']}, + '/todo?id=1', 400, '/todo') def test_basic_POST_todo(self) -> None: """Test basic POST /todo manipulations.""" @@ -318,8 +318,8 @@ class TestsWithServer(TestCaseWithServer): # test Condition posts c1_post = {'title': 'foo', 'description': 'oof', 'is_active': 0} c2_post = {'title': 'bar', 'description': 'rab', 'is_active': 1} - self.post_exp_cond([exp], 1, c1_post, '?id=1', '?id=1') - self.post_exp_cond([exp], 2, c2_post, '?id=2', '?id=2') + self.post_exp_cond([exp], c1_post, 1) + self.post_exp_cond([exp], c2_post, 2) self.check_json_get('/todo?id=1', exp) todo_post = {'conditions': [1], 'disables': [1], 'blockers': [2], 'enables': [2]} diff --git a/tests/utils.py b/tests/utils.py index d1b6eac..7945f61 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -569,8 +569,7 @@ class Expected: id_ = self._fields[category.lower()] make_temp = not bool(self.lib_get(category, id_)) if make_temp: - f = getattr(self, dicter) - self.lib_set(category, [f(id_)]) + self.lib_set(category, [getattr(self, dicter)(id_)]) self.recalc() d = {'_library': self._lib} for k, v in self._fields.items(): @@ -587,6 +586,7 @@ class Expected: d[k] = v if make_temp: json = json_dumps(d) + id_ = id_ if id_ is not None else '?' self.lib_del(category, id_) d = json_loads(json) return d @@ -640,7 +640,8 @@ class Expected: """Return dictionary of items by their 'id' fields.""" refs = {} for item in items: - refs[str(item['id'])] = item + id_ = str(item['id']) if item['id'] is not None else '?' + refs[id_] = item return refs @staticmethod @@ -703,7 +704,8 @@ class Expected: return cond = self.lib_get('Condition', id_) if cond: - cond['is_active'] = d['is_active'] + if 'is_active' in d: + cond['is_active'] = d['is_active'] for category in ['title', 'description']: history = cond['_versioned'][category] if len(history) > 0: @@ -713,8 +715,7 @@ class Expected: else: history['0'] = d[category] else: - cond = self.cond_as_dict( - id_, d['is_active'], d['title'], d['description']) + cond = self.cond_as_dict(id_, **d) self.lib_set('Condition', [cond]) @staticmethod @@ -755,11 +756,11 @@ class Expected: for k, v in d.items(): if k.startswith('step_filler_to_'): continue - elif 'adopt' == k: + if 'adopt' == k: new_children = v if isinstance(v, list) else [v] corrected_kwargs['children'] += new_children continue - elif k in {'is_done', 'calendarize'}: + if k in {'is_done', 'calendarize'}: v = v in VALID_TRUES corrected_kwargs[k] = v todo = self.lib_get('Todo', id_) @@ -866,16 +867,16 @@ class TestCaseWithServer(TestCaseWithDB): def post_exp_cond(self, exps: list[Expected], - id_: int, payload: dict[str, object], - path_suffix: str = '', - redir_suffix: str = '' + id_: int = 1, + post_to_id: bool = True, + redir_to_id: bool = True ) -> None: """POST /condition(s), appropriately update Expecteds.""" # pylint: disable=too-many-arguments - path = f'/condition{path_suffix}' - redir = f'/condition{redir_suffix}' - self.check_post(payload, path, redir=redir) + target = f'/condition?id={id_}' if post_to_id else '/condition' + redir = f'/condition?id={id_}' if redir_to_id else '/conditions' + self.check_post(payload, target, redir=redir) for exp in exps: exp.set_cond_from_post(id_, payload) -- 2.30.2 From 14e7f26613b8ac213a1b82370a153f81df7726cf Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Mon, 12 Aug 2024 13:58:27 +0200 Subject: [PATCH 10/16] Harmonize treatment of GET /[item]?id=. --- plomtask/db.py | 9 ++++-- plomtask/http.py | 8 ++++-- tests/conditions.py | 3 +- tests/days.py | 4 +-- tests/misc.py | 3 +- tests/processes.py | 12 ++------ tests/todos.py | 9 ++---- tests/utils.py | 67 +++++++++++++++++++++++++++++---------------- 8 files changed, 65 insertions(+), 50 deletions(-) diff --git a/plomtask/db.py b/plomtask/db.py index 1fdd3e1..f067cd3 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -5,7 +5,8 @@ from os.path import isfile from difflib import Differ from sqlite3 import connect as sql_connect, Cursor, Row from typing import Any, Self, TypeVar, Generic, Callable -from plomtask.exceptions import HandledException, NotFoundException +from plomtask.exceptions import (HandledException, NotFoundException, + BadFormatException) from plomtask.dating import valid_date EXPECTED_DB_VERSION = 5 @@ -246,10 +247,10 @@ class BaseModel(Generic[BaseModelId]): def __init__(self, id_: BaseModelId | None) -> None: if isinstance(id_, int) and id_ < 1: msg = f'illegal {self.__class__.__name__} ID, must be >=1: {id_}' - raise HandledException(msg) + raise BadFormatException(msg) if isinstance(id_, str) and "" == id_: msg = f'illegal {self.__class__.__name__} ID, must be non-empty' - raise HandledException(msg) + raise BadFormatException(msg) self.id_ = id_ def __hash__(self) -> int: @@ -437,6 +438,8 @@ class BaseModel(Generic[BaseModelId]): """ obj = None if id_ is not None: + if isinstance(id_, int) and id_ == 0: + raise BadFormatException('illegal ID of value 0') obj = cls._get_cached(id_) if not obj: for row in db_conn.row_where(cls.table_name, 'id', id_): diff --git a/plomtask/http.py b/plomtask/http.py index 4426bba..e242a36 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -50,7 +50,7 @@ class InputsParser: """Retrieve list of int values at key.""" all_str = self.get_all_str(key) try: - return [int(s) for s in all_str if len(s) > 0] + return [int(s) for s in all_str] except ValueError as e: msg = f'cannot int a form field value for key {key} in: {all_str}' raise BadFormatException(msg) from e @@ -305,7 +305,9 @@ class TaskHandler(BaseHTTPRequestHandler): # pylint: disable=protected-access # (because pylint here fails to detect the use of wrapper as a # method to self with respective access privileges) - id_ = self._params.get_int_or_none('id') + id_ = None + for val in self._params.get_all_int('id'): + id_ = val if target_class.can_create_by_id: item = target_class.by_id_or_create(self._conn, id_) else: @@ -348,7 +350,7 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_day(self) -> dict[str, object]: """Show single Day of ?date=.""" - date = self._params.get_str_or_fail('date', date_in_n_days(0)) + date = self._params.get_str('date', date_in_n_days(0)) make_type = self._params.get_str_or_fail('make_type', 'full') # day = Day.by_id_or_create(self._conn, date) diff --git a/tests/conditions.py b/tests/conditions.py index a9b28bb..58fa18b 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -72,6 +72,7 @@ class ExpectedGetCondition(Expected): class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" + checked_class = Condition def test_fail_POST_condition(self) -> None: """Test malformed/illegal POST /condition requests.""" @@ -152,8 +153,8 @@ class TestsWithServer(TestCaseWithServer): self.check_filter(exp, 'conditions', 'sort_by', 'is_active', [1, 2, 3]) self.check_filter(exp, 'conditions', 'sort_by', '-is_active', [3, 2, 1]) - # test pattern matching on title exp.set('sort_by', 'title') + # test pattern matching on title exp.lib_del('Condition', 1) self.check_filter(exp, 'conditions', 'pattern', 'ba', [2, 3]) # test pattern matching on description diff --git a/tests/days.py b/tests/days.py index aac150b..5edec50 100644 --- a/tests/days.py +++ b/tests/days.py @@ -159,12 +159,12 @@ class ExpectedGetDay(Expected): class TestsWithServer(TestCaseWithServer): """Tests against our HTTP server/handler (and database).""" + checked_class = Day def test_basic_GET_day(self) -> None: """Test basic (no Processes/Conditions/Todos) GET /day basics.""" # check illegal date parameters - self.check_get('/day?date=', 400) - self.check_get('/day?date=foo', 400) + self.check_get_defaults('/day', '2024-01-01', 'date') self.check_get('/day?date=2024-02-30', 400) # check undefined day date = _testing_date_in_n_days(0) diff --git a/tests/misc.py b/tests/misc.py index 86474c7..8159124 100644 --- a/tests/misc.py +++ b/tests/misc.py @@ -147,7 +147,8 @@ class TestsSansServer(TestCase): parser = InputsParser({'foo': []}) self.assertEqual([], parser.get_all_int('foo')) parser = InputsParser({'foo': ['']}) - self.assertEqual([], parser.get_all_int('foo')) + with self.assertRaises(BadFormatException): + parser.get_all_int('foo') parser = InputsParser({'foo': ['0']}) self.assertEqual([0], parser.get_all_int('foo')) parser = InputsParser({'foo': ['0', '17']}) diff --git a/tests/processes.py b/tests/processes.py index 24a62bd..2561fbb 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -284,6 +284,7 @@ class ExpectedGetProcesses(Expected): class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" + checked_class = Process def _post_process(self, id_: int = 1, form_data: dict[str, Any] | None = None @@ -404,19 +405,10 @@ class TestsWithServer(TestCaseWithServer): p = p_min | {'kept_steps': [1, 2, 3], 'new_step_to_2': 5, 'step_of': 6} self.check_post(p, url, 400) - def test_GET(self) -> None: - """Test /process and /processes response codes.""" - self.check_get('/process', 200) - self.check_get('/process?id=', 200) - self.check_get('/process?id=1', 200) - self.check_get_defaults('/process') - self.check_get('/processes', 200) - def test_fail_GET_process(self) -> None: """Test invalid GET /process params.""" # check for invalid IDs - self.check_get('/process?id=foo', 400) - self.check_get('/process?id=0', 500) + self.check_get_defaults('/process') # check we catch invalid base64 self.check_get('/process?title_b64=foo', 400) # check failure on references to unknown processes; we create Process diff --git a/tests/todos.py b/tests/todos.py index f048d46..9f3874d 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -266,6 +266,7 @@ class ExpectedGetTodo(Expected): class TestsWithServer(TestCaseWithServer): """Tests against our HTTP server/handler (and database).""" + checked_class = Todo def _post_exp_todo( self, id_: int, payload: dict[str, Any], exp: Expected) -> None: @@ -278,7 +279,7 @@ class TestsWithServer(TestCaseWithServer): # test we cannot just POST into non-existing Todo self.check_post({}, '/todo', 404) self.check_post({}, '/todo?id=FOO', 400) - self.check_post({}, '/todo?id=0', 404) + self.check_post({}, '/todo?id=0', 400) self.check_post({}, '/todo?id=1', 404) # test malformed values on existing Todo self.post_exp_day([], {'new_todo': [1]}) @@ -463,11 +464,7 @@ class TestsWithServer(TestCaseWithServer): def test_GET_todo(self) -> None: """Test GET /todo response codes.""" # test malformed or illegal parameter values - self.check_get('/todo', 404) - self.check_get('/todo?id=', 404) - self.check_get('/todo?id=foo', 400) - self.check_get('/todo?id=0', 404) - self.check_get('/todo?id=2', 404) + self.check_get_defaults('/todo') # test all existing Processes are shown as available exp = ExpectedGetTodo(1) self.post_exp_process([exp], {}, 1) diff --git a/tests/utils.py b/tests/utils.py index 7945f61..75c7e50 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -35,17 +35,8 @@ class TestCaseAugmented(TestCase): default_init_kwargs: dict[str, Any] = {} @staticmethod - def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]: - def wrapper(self: TestCase) -> None: - if hasattr(self, 'checked_class'): - f(self) - return wrapper - - @classmethod - def _run_on_versioned_attributes(cls, - f: Callable[..., None] + def _run_on_versioned_attributes(f: Callable[..., None] ) -> Callable[..., None]: - @cls._run_if_checked_class def wrapper(self: TestCase) -> None: assert isinstance(self, TestCaseAugmented) for attr_name in self.checked_class.to_save_versioned(): @@ -56,6 +47,23 @@ class TestCaseAugmented(TestCase): f(self, owner, attr_name, attr, default, to_set) return wrapper + @classmethod + def _run_if_sans_db(cls, f: Callable[..., None]) -> Callable[..., None]: + def wrapper(self: TestCaseSansDB) -> None: + if issubclass(cls, TestCaseSansDB): + f(self) + return wrapper + + @classmethod + def _run_if_with_db_but_not_server(cls, + f: Callable[..., None] + ) -> Callable[..., None]: + def wrapper(self: TestCaseWithDB) -> None: + if issubclass(cls, TestCaseWithDB) and\ + not issubclass(cls, TestCaseWithServer): + f(self) + return wrapper + @classmethod def _make_from_defaults(cls, id_: float | str | None) -> Any: return cls.checked_class(id_, **cls.default_init_kwargs) @@ -66,7 +74,7 @@ class TestCaseSansDB(TestCaseAugmented): legal_ids: list[str] | list[int] = [1, 5] illegal_ids: list[str] | list[int] = [0] - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_sans_db def test_id_validation(self) -> None: """Test .id_ validation/setting.""" for id_ in self.illegal_ids: @@ -76,6 +84,7 @@ class TestCaseSansDB(TestCaseAugmented): obj = self._make_from_defaults(id_) self.assertEqual(obj.id_, id_) + @TestCaseAugmented._run_if_sans_db @TestCaseAugmented._run_on_versioned_attributes def test_versioned_set(self, _: Any, @@ -115,6 +124,7 @@ class TestCaseSansDB(TestCaseAugmented): attr.set(to_set[1]) self.assertEqual(timesorted_vals, expected) + @TestCaseAugmented._run_if_sans_db @TestCaseAugmented._run_on_versioned_attributes def test_versioned_newest(self, _: Any, @@ -134,6 +144,7 @@ class TestCaseSansDB(TestCaseAugmented): attr.set(default) self.assertEqual(attr.newest, default) + @TestCaseAugmented._run_if_sans_db @TestCaseAugmented._run_on_versioned_attributes def test_versioned_at(self, _: Any, @@ -277,6 +288,7 @@ class TestCaseWithDB(TestCaseAugmented): self.assertEqual(start, end) self.assertEqual(items, [obj_today]) + @TestCaseAugmented._run_if_with_db_but_not_server @TestCaseAugmented._run_on_versioned_attributes def test_saving_versioned_attributes(self, owner: Any, @@ -318,7 +330,7 @@ class TestCaseWithDB(TestCaseAugmented): attr_vals_saved = retrieve_attr_vals(attr) self.assertEqual(to_set, attr_vals_saved) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_saving_and_caching(self) -> None: """Test effects of .cache() and .save().""" id1 = self.default_ids[0] @@ -353,7 +365,7 @@ class TestCaseWithDB(TestCaseAugmented): with self.assertRaises(HandledException): obj1.save(self.db_conn) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_by_id(self) -> None: """Test .by_id().""" id1, id2, _ = self.default_ids @@ -369,7 +381,7 @@ class TestCaseWithDB(TestCaseAugmented): obj2.save(self.db_conn) self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2)) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_by_id_or_create(self) -> None: """Test .by_id_or_create.""" # check .by_id_or_create fails if wrong class @@ -392,7 +404,7 @@ class TestCaseWithDB(TestCaseAugmented): self.checked_class.by_id(self.db_conn, item.id_) self.assertEqual(self.checked_class(item.id_), item) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_from_table_row(self) -> None: """Test .from_table_row() properly reads in class directly from DB.""" id_ = self.default_ids[0] @@ -416,6 +428,7 @@ class TestCaseWithDB(TestCaseAugmented): self.assertEqual({retrieved.id_: retrieved}, self.checked_class.get_cache()) + @TestCaseAugmented._run_if_with_db_but_not_server @TestCaseAugmented._run_on_versioned_attributes def test_versioned_history_from_row(self, owner: Any, @@ -439,7 +452,7 @@ class TestCaseWithDB(TestCaseAugmented): for timestamp, value in attr.history.items(): self.assertEqual(value, loaded_attr.history[timestamp]) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_all(self) -> None: """Test .all() and its relation to cache and savings.""" id1, id2, id3 = self.default_ids @@ -457,7 +470,7 @@ class TestCaseWithDB(TestCaseAugmented): self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item2, item3])) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" id1 = self.default_ids[0] @@ -469,6 +482,7 @@ class TestCaseWithDB(TestCaseAugmented): retrieved = self.checked_class.by_id(self.db_conn, id1) self.assertEqual(new_attr, getattr(retrieved, attr_name)) + @TestCaseAugmented._run_if_with_db_but_not_server @TestCaseAugmented._run_on_versioned_attributes def test_versioned_singularity(self, owner: Any, @@ -485,7 +499,7 @@ class TestCaseWithDB(TestCaseAugmented): attr_retrieved = getattr(retrieved, attr_name) self.assertEqual(attr.history, attr_retrieved.history) - @TestCaseAugmented._run_if_checked_class + @TestCaseAugmented._run_if_with_db_but_not_server def test_remove(self) -> None: """Test .remove() effects on DB and cache.""" id_ = self.default_ids[0] @@ -947,13 +961,18 @@ class TestCaseWithServer(TestCaseWithDB): else: self.assertEqual(self.conn.getresponse().status, expected_code) - def check_get_defaults(self, path: str) -> None: + def check_get_defaults(self, + path: str, + default_id: str = '1', + id_name: str = 'id' + ) -> None: """Some standard model paths to test.""" - self.check_get(path, 200) - self.check_get(f'{path}?id=', 200) - self.check_get(f'{path}?id=foo', 400) - self.check_get(f'/{path}?id=0', 500) - self.check_get(f'{path}?id=1', 200) + nonexist_status = 200 if self.checked_class.can_create_by_id else 404 + self.check_get(path, nonexist_status) + self.check_get(f'{path}?{id_name}=', 400) + self.check_get(f'{path}?{id_name}=foo', 400) + self.check_get(f'/{path}?{id_name}=0', 400) + self.check_get(f'{path}?{id_name}={default_id}', nonexist_status) def check_json_get(self, path: str, expected: Expected) -> None: """Compare JSON on GET path with expected. -- 2.30.2 From 48c40ed9d42ed8f1d6607b8fd5d31bbb1ac52f29 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Mon, 12 Aug 2024 15:24:39 +0200 Subject: [PATCH 11/16] Fix bug of /day POSTS breaking on empty new_todo fields. --- plomtask/http.py | 6 +++--- tests/days.py | 8 ++++---- tests/misc.py | 5 +++-- tests/utils.py | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index e242a36..bb6e4ed 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -46,11 +46,11 @@ class InputsParser: return [] return self.inputs[key] - def get_all_int(self, key: str) -> list[int]: + def get_all_int(self, key: str, fail_on_empty: bool = False) -> list[int]: """Retrieve list of int values at key.""" all_str = self.get_all_str(key) try: - return [int(s) for s in all_str] + return [int(s) for s in all_str if fail_on_empty or s != ''] except ValueError as e: msg = f'cannot int a form field value for key {key} in: {all_str}' raise BadFormatException(msg) from e @@ -306,7 +306,7 @@ class TaskHandler(BaseHTTPRequestHandler): # (because pylint here fails to detect the use of wrapper as a # method to self with respective access privileges) id_ = None - for val in self._params.get_all_int('id'): + for val in self._params.get_all_int('id', fail_on_empty=True): id_ = val if target_class.can_create_by_id: item = target_class.by_id_or_create(self._conn, id_) diff --git a/tests/days.py b/tests/days.py index 5edec50..f79284c 100644 --- a/tests/days.py +++ b/tests/days.py @@ -240,16 +240,16 @@ class TestsWithServer(TestCaseWithServer): """Test basic (no Processes/Conditions/Todos) POST /day. Check POST requests properly parse 'today', 'tomorrow', 'yesterday', - and actual date strings; - preserve 'make_type' setting in redirect even if nonsensical; - and store 'day_comment'. + and actual date strings; store 'day_comment'; preserve 'make_type' + setting in redirect even if nonsensical; and allow '' as 'new_todo'. """ for name, dist, test_str in [('2024-01-01', None, 'a'), ('today', 0, 'b'), ('yesterday', -1, 'c'), ('tomorrow', +1, 'd')]: date = name if dist is None else _testing_date_in_n_days(dist) - post = {'day_comment': test_str, 'make_type': f'x:{test_str}'} + post = {'day_comment': test_str, 'make_type': f'x:{test_str}', + 'new_todo': ['', '']} post_url = f'/day?date={name}' redir_url = f'{post_url}&make_type={post["make_type"]}' self.check_post(post, post_url, 302, redir_url) diff --git a/tests/misc.py b/tests/misc.py index 8159124..3aa1314 100644 --- a/tests/misc.py +++ b/tests/misc.py @@ -140,15 +140,16 @@ class TestsSansServer(TestCase): parser = InputsParser({'foo': ['bar', 'baz']}) self.assertEqual(['bar', 'baz'], parser.get_all_str('foo')) - def test_InputsParser_strict_get_all_int(self) -> None: + def test_InputsParser_get_all_int(self) -> None: """Test InputsParser.get_all_int.""" parser = InputsParser({}) self.assertEqual([], parser.get_all_int('foo')) parser = InputsParser({'foo': []}) self.assertEqual([], parser.get_all_int('foo')) parser = InputsParser({'foo': ['']}) + parser.get_all_int('foo') with self.assertRaises(BadFormatException): - parser.get_all_int('foo') + parser.get_all_int('foo', fail_on_empty=True) parser = InputsParser({'foo': ['0']}) self.assertEqual([0], parser.get_all_int('foo')) parser = InputsParser({'foo': ['0', '17']}) diff --git a/tests/utils.py b/tests/utils.py index 75c7e50..9d5f88e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -679,7 +679,7 @@ class Expected: for todo in self.lib_all('Todo'): if next_id <= todo['id']: next_id = todo['id'] + 1 - for proc_id in sorted(v): + for proc_id in sorted([id_ for id_ in v if id_]): todo = self.todo_as_dict(next_id, proc_id, date) self.lib_set('Todo', [todo]) next_id += 1 -- 2.30.2 From caf992b4dbeb534d0da59f7a76b82bab30a2d0c1 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Tue, 13 Aug 2024 06:24:05 +0200 Subject: [PATCH 12/16] Extend and refactor tests. --- tests/conditions.py | 9 ++--- tests/days.py | 77 +++++++++++++++++++++----------------- tests/processes.py | 5 +-- tests/todos.py | 90 +++++++++++++++++++++++++-------------------- tests/utils.py | 11 +++++- 5 files changed, 107 insertions(+), 85 deletions(-) diff --git a/tests/conditions.py b/tests/conditions.py index 58fa18b..3b64959 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -77,13 +77,10 @@ class TestsWithServer(TestCaseWithServer): def test_fail_POST_condition(self) -> None: """Test malformed/illegal POST /condition requests.""" # check incomplete POST payloads - url = '/condition' - self.check_post({}, url, 400) - self.check_post({'title': ''}, url, 400) - self.check_post({'description': ''}, url, 400) + valid_payload = {'title': '', 'description': ''} + self.check_minimal_inputs('/condition', valid_payload) # check valid POST payload on bad paths - valid_payload = {'title': '', 'description': '', 'is_active': 0} - self.check_post(valid_payload, f'{url}?id=foo', 400) + self.check_post(valid_payload, '/condition?id=foo', 400) def test_POST_condition(self) -> None: """Test (valid) POST /condition and its effect on GET /condition[s].""" diff --git a/tests/days.py b/tests/days.py index f79284c..c195237 100644 --- a/tests/days.py +++ b/tests/days.py @@ -56,7 +56,9 @@ class TestsWithDB(TestCaseWithDB): def test_Day_with_filled_gaps(self) -> None: """Test .with_filled_gaps.""" - def test(range_indexes: tuple[int, int], indexes_to_provide: list[int] + def expect_within_full_range_as_commented( + range_indexes: tuple[int, int], + indexes_to_provide: list[int] ) -> None: start_i, end_i = range_indexes days_provided = [] @@ -78,32 +80,24 @@ class TestsWithDB(TestCaseWithDB): days_with_comment = [Day(date, comment=date[-1:]) for date in dates] days_sans_comment = [Day(date, comment='') for date in dates] # check provided Days recognizable in (full-range) interval - test((0, 8), [0, 4, 8]) + expect_within_full_range_as_commented((0, 8), [0, 4, 8]) # check limited range, but limiting Days provided - test((2, 6), [2, 5, 6]) + expect_within_full_range_as_commented((2, 6), [2, 5, 6]) # check Days within range but beyond provided Days also filled in - test((1, 7), [2, 5]) + expect_within_full_range_as_commented((1, 7), [2, 5]) # check provided Days beyond range ignored - test((3, 5), [1, 2, 4, 6, 7]) + expect_within_full_range_as_commented((3, 5), [1, 2, 4, 6, 7]) # check inversion of start_date and end_date returns empty list - test((5, 3), [2, 4, 6]) + expect_within_full_range_as_commented((5, 3), [2, 4, 6]) # check empty provision still creates filler elements in interval - test((3, 5), []) + expect_within_full_range_as_commented((3, 5), []) # check single-element selection creating only filler beyond provided - test((1, 1), [2, 4, 6]) + expect_within_full_range_as_commented((1, 1), [2, 4, 6]) # check (un-saved) filler Days don't show up in cache or DB - # dates = [f'2024-02-0{n}' for n in range(1, 6)] day = Day(dates[3]) day.save(self.db_conn) self.checked_class.with_filled_gaps([day], dates[0], dates[-1]) self.check_identity_with_cache_and_db([day]) - # check 'today', 'yesterday', 'tomorrow' are interpreted - yesterday = Day('yesterday') - tomorrow = Day('tomorrow') - today = Day('today') - result = self.checked_class.with_filled_gaps([today], 'yesterday', - 'tomorrow') - self.assertEqual(result, [yesterday, today, tomorrow]) class ExpectedGetCalendar(Expected): @@ -167,10 +161,9 @@ class TestsWithServer(TestCaseWithServer): self.check_get_defaults('/day', '2024-01-01', 'date') self.check_get('/day?date=2024-02-30', 400) # check undefined day - date = _testing_date_in_n_days(0) - exp = ExpectedGetDay(date) + exp = ExpectedGetDay(_testing_date_in_n_days(0)) self.check_json_get('/day', exp) - # check defined day, with and without make_type parameter + # check defined day with make_type parameter date = '2024-01-01' exp = ExpectedGetDay(date) exp.set('make_type', 'bar') @@ -185,22 +178,19 @@ class TestsWithServer(TestCaseWithServer): """Test malformed/illegal POST /day requests.""" # check payloads lacking minimum expecteds url = '/day?date=2024-01-01' - self.check_post({}, url, 400) - self.check_post({'day_comment': ''}, url, 400) - self.check_post({'make_type': ''}, url, 400) + minimal_post = {'make_type': '', 'day_comment': ''} + self.check_minimal_inputs(url, minimal_post) # to next check illegal new_todo values, we need an actual Process self.post_exp_process([], {}, 1) # check illegal new_todo values - post: dict[str, object] - post = {'make_type': '', 'day_comment': '', 'new_todo': ['foo']} - self.check_post(post, url, 400) - post['new_todo'] = [1, 2] # no Process of .id_=2 exists + self.check_post(minimal_post | {'new_todo': ['foo']}, url, 400) + self.check_post(minimal_post | {'new_todo': [1, 2]}, url, 404) # to next check illegal old_todo inputs, we need to first post Todo - post['new_todo'] = [1] - self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=') + self.check_post(minimal_post | {'new_todo': [1]}, url, 302, + '/day?date=2024-01-01&make_type=') # check illegal old_todo inputs (equal list lengths though) - post = {'make_type': '', 'day_comment': '', 'comment': ['foo'], - 'effort': [3.3], 'done': [], 'todo_id': [1]} + post = minimal_post | {'comment': ['foo'], 'effort': [3.3], + 'done': [], 'todo_id': [1]} self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=') post['todo_id'] = [2] # reference to non-existant Process self.check_post(post, url, 404) @@ -268,7 +258,7 @@ class TestsWithServer(TestCaseWithServer): for i, proc_post in enumerate(proc_posts): self.post_exp_process([exp], proc_post, i+1) self.check_json_get(f'/day?date={date}', exp) - # post Todos of either process and check their display + # post Todos of either Process and check their display self.post_exp_day([exp], {'new_todo': [1, 2]}) self.check_json_get(f'/day?date={date}', exp) # test malformed Todo manipulation posts @@ -302,7 +292,8 @@ class TestsWithServer(TestCaseWithServer): # create two Processes, with second one step of first one self.post_exp_process([exp], {}, 2) self.post_exp_process([exp], {'new_top_step': 2}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None)]) + exp.lib_set('ProcessStep', [ + exp.procstep_as_dict(1, owner_id=1, step_process_id=2)]) self.check_json_get(f'/day?date={date}', exp) # post Todo of adopting Process, with make_type=full self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1]}) @@ -325,7 +316,7 @@ class TestsWithServer(TestCaseWithServer): 'children': []}]}] exp.force('top_nodes', top_nodes) self.check_json_get(f'/day?date={date}', exp) - # post another Todo of adopting Process, make_type=empty + # post another Todo of adopting Process, no adopt with make_type=empty self.post_exp_day([exp], {'make_type': 'empty', 'new_todo': [1]}) exp.lib_set('Todo', [exp.todo_as_dict(4, 1)]) top_nodes += [{'todo': 4, @@ -340,7 +331,8 @@ class TestsWithServer(TestCaseWithServer): exp = ExpectedGetDay(date) self.post_exp_process([exp], {}, 2) self.post_exp_process([exp], {'new_top_step': 2}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None)]) + exp.lib_set('ProcessStep', [ + exp.procstep_as_dict(1, owner_id=1, step_process_id=2)]) # make-full-day-post batch of Todos of both Processes in one order …, self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1, 2]}) top_nodes: list[dict[str, Any]] = [{'todo': 1, @@ -364,6 +356,23 @@ class TestsWithServer(TestCaseWithServer): exp.lib_get('Todo', 3)['children'] = [4] self.check_json_get(f'/day?date={date}', exp) + def test_POST_day_todo_deletion_by_negative_effort(self) -> None: + """Test POST /day removal of Todos by setting negative effort.""" + date = '2024-01-01' + exp = ExpectedGetDay(date) + self.post_exp_process([exp], {}, 1) + self.post_exp_day([exp], {'new_todo': [1]}) + # check cannot remove Todo if commented + self.post_exp_day([exp], + {'todo_id': [1], 'comment': ['foo'], 'effort': [-1]}) + self.check_json_get(f'/day?date={date}', exp) + # check *can* remove Todo while getting done + self.post_exp_day([exp], + {'todo_id': [1], 'comment': [''], 'effort': [-1], + 'done': [1]}) + exp.lib_del('Todo', 1) + self.check_json_get(f'/day?date={date}', exp) + def test_GET_day_with_conditions(self) -> None: """Test GET /day displaying Conditions and their relations.""" date = '2024-01-01' diff --git a/tests/processes.py b/tests/processes.py index 2561fbb..0c2c1a9 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -300,10 +300,7 @@ class TestsWithServer(TestCaseWithServer): """Test POST /process and its effect on the database.""" valid_post = {'title': '', 'description': '', 'effort': 1.0} # check payloads lacking minimum expecteds - self.check_post({}, '/process', 400) - self.check_post({'title': '', 'description': ''}, '/process', 400) - self.check_post({'title': '', 'effort': 1}, '/process', 400) - self.check_post({'description': '', 'effort': 1}, '/process', 400) + self.check_minimal_inputs('/process', valid_post) # check payloads of bad data types self.check_post(valid_post | {'effort': ''}, '/process', 400) # check references to non-existant items diff --git a/tests/todos.py b/tests/todos.py index 9f3874d..6118ab7 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -251,14 +251,14 @@ class ExpectedGetTodo(Expected): @staticmethod def step_as_dict(node_id: int, - children: list[dict[str, object]], process: int | None = None, todo: int | None = None, fillable: bool = False, + children: None | list[dict[str, object]] = None ) -> dict[str, object]: """Return JSON of TodoOrProcStepsNode to expect.""" return {'node_id': node_id, - 'children': children, + 'children': children if children is not None else [], 'process': process, 'fillable': fillable, 'todo': todo} @@ -371,7 +371,8 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_day([exp], {'new_todo': [1]}) self.post_exp_day([exp], {'new_todo': [1]}) self._post_exp_todo(1, {'adopt': 2}, exp) - exp.set('steps_todo_to_process', [exp.step_as_dict(1, [], todo=2)]) + exp.set('steps_todo_to_process', [ + exp.step_as_dict(node_id=1, process=None, todo=2)]) self.check_json_get('/todo?id=1', exp) # test Todo un-adopting by just not sending an adopt self._post_exp_todo(1, {}, exp) @@ -393,19 +394,20 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_process([exp], {}, 2) self.post_exp_process([exp], {}, 3) self.post_exp_process([exp], {'new_top_step': [2, 3]}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2), - exp.procstep_as_dict(2, 1, 3)]) - step1_proc2 = exp.step_as_dict(1, [], 2, None, True) - step2_proc3 = exp.step_as_dict(2, [], 3, None, True) - exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(1, owner_id=1, step_process_id=2), + exp.procstep_as_dict(2, owner_id=1, step_process_id=3)]) + slots = [ + exp.step_as_dict(node_id=1, process=2, todo=None, fillable=True), + exp.step_as_dict(node_id=2, process=3, todo=None, fillable=True)] + exp.set('steps_todo_to_process', slots) self.post_exp_day([exp], {'new_todo': [2]}) self.post_exp_day([exp], {'new_todo': [3]}) self.check_json_get('/todo?id=1', exp) self._post_exp_todo(1, {'step_filler_to_1': 5, 'adopt': [4]}, exp) exp.lib_get('Todo', 1)['children'] += [5] - step1_proc2 = exp.step_as_dict(1, [], 2, 4, True) - step2_proc3 = exp.step_as_dict(2, [], 3, 5, True) - exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) + slots[0]['todo'] = 4 + slots[1]['todo'] = 5 self.check_json_get('/todo?id=1', exp) # test 'ignore' values for 'step_filler' are ignored, and intable # 'step_filler' values are interchangeable with those of 'adopt' @@ -416,15 +418,14 @@ class TestsWithServer(TestCaseWithServer): # creating new top-level steps when adopting of respective Process self.post_exp_process([exp], {}, 4) self.post_exp_process([exp], {'new_top_step': 4, 'step_of': [1]}, 3) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 3, 4)]) - step3_proc4 = exp.step_as_dict(3, [], 4, None, True) - step2_proc3 = exp.step_as_dict(2, [step3_proc4], 3, 5, True) - exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(3, owner_id=3, step_process_id=4)]) + slots[1]['children'] = [exp.step_as_dict( + node_id=3, process=4, todo=None, fillable=True)] self.post_exp_day([exp], {'new_todo': [4]}) self._post_exp_todo(1, {'adopt': [4, 5, 6]}, exp) - step4_todo6 = exp.step_as_dict(4, [], None, 6, False) - exp.set('steps_todo_to_process', [step1_proc2, step2_proc3, - step4_todo6]) + slots += [exp.step_as_dict( + node_id=4, process=None, todo=6, fillable=False)] self.check_json_get('/todo?id=1', exp) def test_POST_todo_make_empty(self) -> None: @@ -434,29 +435,33 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_process([exp], {}, 1) for i in range(1, 4): self.post_exp_process([exp], {'new_top_step': i}, i+1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), - exp.procstep_as_dict(2, 3, 2), - exp.procstep_as_dict(3, 4, 3)]) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(1, owner_id=2, step_process_id=1), + exp.procstep_as_dict(2, owner_id=3, step_process_id=2), + exp.procstep_as_dict(3, owner_id=4, step_process_id=3)]) # post (childless) Todo of chain end, then make empty on next in line self.post_exp_day([exp], {'new_todo': [4]}) - step3_proc1 = exp.step_as_dict(3, [], 1) - step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2) - step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True) - exp.set('steps_todo_to_process', [step1_proc3]) + slots = [exp.step_as_dict( + node_id=1, process=3, todo=None, fillable=True, + children=[exp.step_as_dict( + node_id=2, process=2, todo=None, fillable=False, + children=[exp.step_as_dict( + node_id=3, process=1, todo=None, fillable=False)])])] + exp.set('steps_todo_to_process', slots) self.check_json_get('/todo?id=1', exp) self.check_post({'step_filler_to_1': 'make_3'}, '/todo?id=1') exp.set_todo_from_post(2, {'process_id': 3}) exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]}) - step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, True) - step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, 2, True) - exp.set('steps_todo_to_process', [step1_proc3]) + slots[0]['todo'] = 2 + assert isinstance(slots[0]['children'], list) + slots[0]['children'][0]['fillable'] = True self.check_json_get('/todo?id=1', exp) # make new top-level Todo without chain implied by its Process self.check_post({'make_empty': 2, 'adopt': [2]}, '/todo?id=1') exp.set_todo_from_post(3, {'process_id': 2}) exp.set_todo_from_post(1, {'process_id': 4, 'children': [2, 3]}) - step4_todo3 = exp.step_as_dict(4, [], None, 3) - exp.set('steps_todo_to_process', [step1_proc3, step4_todo3]) + slots += [exp.step_as_dict( + node_id=4, process=None, todo=3, fillable=False)] self.check_json_get('/todo?id=1', exp) # fail on trying to call make_empty on non-existing Process self.check_post({'make_full': 5}, '/todo?id=1', 404) @@ -477,20 +482,25 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_process([exp], {'new_top_step': 2}, 1) self.post_exp_process([exp], {'new_top_step': 3, 'step_of': [1]}, 2) self.post_exp_process([exp], {'new_top_step': 4, 'step_of': [2]}, 3) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None), - exp.procstep_as_dict(2, 2, 3, None), - exp.procstep_as_dict(3, 3, 4, None)]) - step3_proc4 = exp.step_as_dict(3, [], 4) - step2_proc3 = exp.step_as_dict(2, [step3_proc4], 3) - step1_proc2 = exp.step_as_dict(1, [step2_proc3], 2, fillable=True) - exp.set('steps_todo_to_process', [step1_proc2]) + exp.lib_set('ProcessStep', [ + exp.procstep_as_dict(1, owner_id=1, step_process_id=2), + exp.procstep_as_dict(2, owner_id=2, step_process_id=3), + exp.procstep_as_dict(3, owner_id=3, step_process_id=4)]) + slots = [exp.step_as_dict( + node_id=1, process=2, todo=None, fillable=True, + children=[exp.step_as_dict( + node_id=2, process=3, todo=None, fillable=False, + children=[exp.step_as_dict( + node_id=3, process=4, todo=None, fillable=False)])])] + exp.set('steps_todo_to_process', slots) self.check_json_get('/todo?id=1', exp) # test display of parallel chains proc_steps_post = {'new_top_step': 4, 'kept_steps': [1, 3]} self.post_exp_process([], proc_steps_post, 1) - step4_proc4 = exp.step_as_dict(4, [], 4, fillable=True) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(4, 1, 4, None)]) - exp.set('steps_todo_to_process', [step1_proc2, step4_proc4]) + exp.lib_set('ProcessStep', [ + exp.procstep_as_dict(4, owner_id=1, step_process_id=4)]) + slots += [exp.step_as_dict( + node_id=4, process=4, todo=None, fillable=True)] self.check_json_get('/todo?id=1', exp) def test_POST_todo_doneness_relations(self) -> None: diff --git a/tests/utils.py b/tests/utils.py index 9d5f88e..e706ec0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -679,7 +679,7 @@ class Expected: for todo in self.lib_all('Todo'): if next_id <= todo['id']: next_id = todo['id'] + 1 - for proc_id in sorted([id_ for id_ in v if id_]): + for proc_id in sorted([id_ for id_ in v if id_]): todo = self.todo_as_dict(next_id, proc_id, date) self.lib_set('Todo', [todo]) next_id += 1 @@ -947,6 +947,15 @@ class TestCaseWithServer(TestCaseWithDB): self.conn.request('GET', target) self.assertEqual(self.conn.getresponse().status, expected_code) + def check_minimal_inputs(self, + url: str, + minimal_inputs: dict[str, Any] + ) -> None: + """Check that url 400's unless all of minimal_inputs provided.""" + for to_hide in minimal_inputs.keys(): + to_post = {k: v for k, v in minimal_inputs.items() if k != to_hide} + self.check_post(to_post, url, 400) + def check_post(self, data: Mapping[str, object], target: str, expected_code: int = 302, redir: str = '') -> None: """Check that POST of data to target yields expected_code.""" -- 2.30.2 From e0681c1e366007ca7a662a855b16dd5e2f553da7 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 16 Aug 2024 03:35:34 +0200 Subject: [PATCH 13/16] For various boolean settings, treat absence of form POST as setting False. --- plomtask/http.py | 27 ++++++++++---------------- tests/conditions.py | 4 ++++ tests/misc.py | 47 ++++++++++++++++----------------------------- tests/processes.py | 5 +++++ tests/todos.py | 43 +++-------------------------------------- tests/utils.py | 16 ++++++++------- 6 files changed, 48 insertions(+), 94 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index bb6e4ed..e224ea0 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -82,12 +82,9 @@ class InputsParser: msg = f'cannot int form field value for key {key}: {val}' raise BadFormatException(msg) from e - def get_bool_or_none(self, key: str) -> bool | None: - """Return value to key if truish; if no value to key, None.""" - val = self.get_str(key) - if val is None: - return None - return val in {'True', 'true', '1', 'on'} + def get_bool(self, key: str) -> bool: + """Return if value to key truish; return False if None/no value.""" + return self.get_str(key) in {'True', 'true', '1', 'on'} def get_all_of_key_prefixed(self, key_prefix: str) -> dict[str, list[str]]: """Retrieve dict of strings at keys starting with key_prefix.""" @@ -655,11 +652,9 @@ class TaskHandler(BaseHTTPRequestHandler): for id_ in self._form.get_all_int('make_empty')]} step_fillers_to = self._form.get_all_of_key_prefixed('step_filler_to_') to_update: dict[str, Any] = { - 'comment': self._form.get_str_or_fail('comment', '')} - for k in ('is_done', 'calendarize'): - v = self._form.get_bool_or_none(k) - if v is not None: - to_update[k] = v + 'comment': self._form.get_str_or_fail('comment', ''), + 'is_done': self._form.get_bool('is_done'), + 'calendarize': self._form.get_bool('calendarize')} cond_rels = [self._form.get_all_int(name) for name in ['conditions', 'blockers', 'enables', 'disables']] effort_or_not = self._form.get_str('effort') @@ -752,7 +747,7 @@ class TaskHandler(BaseHTTPRequestHandler): 'effort': self._form.get_float_or_fail('effort')} cond_rels = [self._form.get_all_int(s) for s in ['conditions', 'blockers', 'enables', 'disables']] - calendarize = self._form.get_bool_or_none('calendarize') + calendarize = self._form.get_bool('calendarize') step_of = self._form.get_all_str('step_of') suppressions = self._form.get_all_int('suppresses') kept_steps = self._form.get_all_int('kept_steps') @@ -766,8 +761,7 @@ class TaskHandler(BaseHTTPRequestHandler): # for k, v in versioned.items(): getattr(process, k).set(v) - if calendarize is not None: - process.calendarize = calendarize + process.calendarize = calendarize process.save(self._conn) assert isinstance(process.id_, int) # set relations to Conditions and ProcessSteps / other Processes @@ -807,10 +801,9 @@ class TaskHandler(BaseHTTPRequestHandler): """Update/insert Condition of ?id= and fields defined in postvars.""" title = self._form.get_str_or_fail('title') description = self._form.get_str_or_fail('description') - is_active = self._form.get_bool_or_none('is_active') + is_active = self._form.get_bool('is_active') + condition.is_active = is_active # - if is_active is not None: - condition.is_active = is_active condition.title.set(title) condition.description.set(description) condition.save(self._conn) diff --git a/tests/conditions.py b/tests/conditions.py index 3b64959..c071e65 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -100,6 +100,10 @@ class TestsWithServer(TestCaseWithServer): 'is_active': 1}) self.check_json_get(url_single, exp_single) self.check_json_get(url_all, exp_all) + # test POST sans 'is_active' setting it negative + self.post_exp_cond(all_exps, {'title': 'bar', 'description': 'oof'}) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) # test deletion POST's effect, both to return id=1 into empty single, # full /conditions into empty list self.post_exp_cond(all_exps, {'delete': ''}, redir_to_id=False) diff --git a/tests/misc.py b/tests/misc.py index 3aa1314..a6df2e5 100644 --- a/tests/misc.py +++ b/tests/misc.py @@ -98,36 +98,23 @@ class TestsSansServer(TestCase): with self.assertRaises(BadFormatException): InputsParser({'foo': []}).get_float_or_fail('foo') - def test_InputsParser_get_bool_or_none(self) -> None: - """Test InputsParser.get_all_str.""" - parser = InputsParser({}) - self.assertEqual(None, parser.get_bool_or_none('foo')) - parser = InputsParser({'val': ['foo']}) - self.assertEqual(None, parser.get_bool_or_none('foo')) - parser = InputsParser({'val': ['True']}) - self.assertEqual(None, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': []}) - self.assertEqual(None, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['None']}) - self.assertEqual(False, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['0']}) - self.assertEqual(False, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['']}) - self.assertEqual(False, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['bar']}) - self.assertEqual(False, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['bar', 'baz']}) - self.assertEqual(False, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['False']}) - self.assertEqual(False, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['true']}) - self.assertEqual(True, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['True']}) - self.assertEqual(True, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['1']}) - self.assertEqual(True, parser.get_bool_or_none('foo')) - parser = InputsParser({'foo': ['on']}) - self.assertEqual(True, parser.get_bool_or_none('foo')) + def test_InputsParser_get_bool(self) -> None: + """Test InputsParser.get_bool.""" + self.assertEqual(0, InputsParser({}).get_bool('foo')) + self.assertEqual(0, InputsParser({'val': ['foo']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'val': ['True']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': []}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': ['None']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': ['0']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': ['']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': ['bar']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': ['bar', + 'baz']}).get_bool('foo')) + self.assertEqual(0, InputsParser({'foo': ['False']}).get_bool('foo')) + self.assertEqual(1, InputsParser({'foo': ['true']}).get_bool('foo')) + self.assertEqual(1, InputsParser({'foo': ['True']}).get_bool('foo')) + self.assertEqual(1, InputsParser({'foo': ['1']}).get_bool('foo')) + self.assertEqual(1, InputsParser({'foo': ['on']}).get_bool('foo')) def test_InputsParser_get_all_str(self) -> None: """Test InputsParser.get_all_str.""" diff --git a/tests/processes.py b/tests/processes.py index 0c2c1a9..0fa352d 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -324,6 +324,11 @@ class TestsWithServer(TestCaseWithServer): valid_post = {'title': 'foo', 'description': 'oof', 'effort': 2.3} self.post_exp_process([exp], valid_post, 1) self.check_json_get('/process?id=1', exp) + # check boolean 'calendarize' + self.post_exp_process([exp], valid_post | {'calendarize': True}, 1) + self.check_json_get('/process?id=1', exp) + self.post_exp_process([exp], valid_post, 1) + self.check_json_get('/process?id=1', exp) # check n_todos field self.post_exp_day([], {'new_todo': ['1']}, '2024-01-01') self.post_exp_day([], {'new_todo': ['1']}, '2024-01-02') diff --git a/tests/todos.py b/tests/todos.py index 6118ab7..ea61a33 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -5,8 +5,7 @@ from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, from plomtask.todos import Todo, TodoNode from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition -from plomtask.exceptions import (NotFoundException, BadFormatException, - HandledException) +from plomtask.exceptions import BadFormatException, HandledException class TestsWithDB(TestCaseWithDB, TestCaseSansDB): @@ -31,26 +30,6 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): self.cond2.save(self.db_conn) self.default_init_kwargs['process'] = self.proc - def test_Todo_init(self) -> None: - """Test creation of Todo and what they default to.""" - process = Process(None) - with self.assertRaises(NotFoundException): - Todo(None, process, False, self.date1) - process.save(self.db_conn) - assert isinstance(self.cond1.id_, int) - assert isinstance(self.cond2.id_, int) - process.set_condition_relations(self.db_conn, - [self.cond1.id_, self.cond2.id_], [], - [self.cond1.id_], [self.cond2.id_]) - todo_no_id = Todo(None, process, False, self.date1) - self.assertEqual(todo_no_id.conditions, [self.cond1, self.cond2]) - self.assertEqual(todo_no_id.enables, [self.cond1]) - self.assertEqual(todo_no_id.disables, [self.cond2]) - todo_yes_id = Todo(5, process, False, self.date1) - self.assertEqual(todo_yes_id.conditions, []) - self.assertEqual(todo_yes_id.enables, []) - self.assertEqual(todo_yes_id.disables, []) - def test_Todo_by_date(self) -> None: """Test findability of Todos by date.""" t1 = Todo(None, self.proc, False, self.date1) @@ -66,21 +45,6 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): """Test .by_date_range_with_limits.""" self.check_by_date_range_with_limits('day') - def test_Todo_on_conditions(self) -> None: - """Test effect of Todos on Conditions.""" - assert isinstance(self.cond1.id_, int) - assert isinstance(self.cond2.id_, int) - todo = Todo(None, self.proc, False, self.date1) - todo.save(self.db_conn) - todo.set_condition_relations(self.db_conn, [], [], - [self.cond1.id_], [self.cond2.id_]) - todo.is_done = True - self.assertEqual(self.cond1.is_active, True) - self.assertEqual(self.cond2.is_active, False) - todo.is_done = False - self.assertEqual(self.cond1.is_active, True) - self.assertEqual(self.cond2.is_active, False) - def test_Todo_children(self) -> None: """Test Todo.children relations.""" todo_1 = Todo(None, self.proc, False, self.date1) @@ -308,9 +272,8 @@ class TestsWithServer(TestCaseWithServer): 'comment': 'foo', 'effort': 2.3} self._post_exp_todo(1, todo_post, exp) self.check_json_get('/todo?id=1', exp) - # test implicitly un-setting (only) comment by empty post - self.check_post({}, '/todo?id=1') - exp.lib_get('Todo', 1)['comment'] = '' + # test implicitly un-setting comment/calendarize/is_done by empty post + self._post_exp_todo(1, {}, exp) self.check_json_get('/todo?id=1', exp) # test effort post can be explicitly unset by "effort":"" post self.check_post({'effort': ''}, '/todo?id=1') diff --git a/tests/utils.py b/tests/utils.py index e706ec0..045194f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -718,8 +718,8 @@ class Expected: return cond = self.lib_get('Condition', id_) if cond: - if 'is_active' in d: - cond['is_active'] = d['is_active'] + cond['is_active'] = 'is_active' in d and\ + d['is_active'] in VALID_TRUES for category in ['title', 'description']: history = cond['_versioned'][category] if len(history) > 0: @@ -766,7 +766,8 @@ class Expected: def set_todo_from_post(self, id_: int, d: dict[str, Any]) -> None: """Set Todo of id_ in library based on POST dict d.""" - corrected_kwargs: dict[str, Any] = {'children': []} + corrected_kwargs: dict[str, Any] = { + 'children': [], 'is_done': 0, 'calendarize': 0, 'comment': ''} for k, v in d.items(): if k.startswith('step_filler_to_'): continue @@ -774,8 +775,8 @@ class Expected: new_children = v if isinstance(v, list) else [v] corrected_kwargs['children'] += new_children continue - if k in {'is_done', 'calendarize'}: - v = v in VALID_TRUES + if k in {'is_done', 'calendarize'} and v in VALID_TRUES: + v = True corrected_kwargs[k] = v todo = self.lib_get('Todo', id_) if todo: @@ -846,12 +847,13 @@ class Expected: d['title'], d['description'], d['effort']) ignore = {'title', 'description', 'effort', 'new_top_step', 'step_of', 'kept_steps'} + proc['calendarize'] = False for k, v in d.items(): if k in ignore\ or k.startswith('step_') or k.startswith('new_step_to'): continue - if k in {'calendarize'}: - v = v in VALID_TRUES + if k in {'calendarize'} and v in VALID_TRUES: + v = True elif k in {'suppressed_steps', 'explicit_steps', 'conditions', 'disables', 'enables', 'blockers'}: if not isinstance(v, list): -- 2.30.2 From 607cd1e87c1ac45783b344f80632c860859a29b8 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Sun, 18 Aug 2024 15:46:13 +0200 Subject: [PATCH 14/16] Minor test improvements/expansions/refactorings. --- tests/conditions.py | 46 +++++++++++++++------------------------------ tests/todos.py | 23 +++++++++-------------- tests/utils.py | 16 +++++++++++++++- 3 files changed, 39 insertions(+), 46 deletions(-) diff --git a/tests/conditions.py b/tests/conditions.py index c071e65..b35dc6e 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -3,9 +3,6 @@ from typing import Any from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, Expected) from plomtask.conditions import Condition -from plomtask.processes import Process -from plomtask.todos import Todo -from plomtask.exceptions import HandledException class TestsSansDB(TestCaseSansDB): @@ -18,26 +15,6 @@ class TestsWithDB(TestCaseWithDB): checked_class = Condition default_init_kwargs = {'is_active': 0} - def test_remove(self) -> None: - """Test .remove() effects on DB and cache.""" - super().test_remove() - proc = Process(None) - proc.save(self.db_conn) - todo = Todo(None, proc, False, '2024-01-01') - todo.save(self.db_conn) - # check condition can only be deleted if not depended upon - for depender in (proc, todo): - c = Condition(None) - c.save(self.db_conn) - assert isinstance(c.id_, int) - depender.set_condition_relations(self.db_conn, [c.id_], [], [], []) - depender.save(self.db_conn) - with self.assertRaises(HandledException): - c.remove(self.db_conn) - depender.set_condition_relations(self.db_conn, [], [], [], []) - depender.save(self.db_conn) - c.remove(self.db_conn) - class ExpectedGetConditions(Expected): """Builder of expectations for GET /conditions.""" @@ -81,6 +58,16 @@ class TestsWithServer(TestCaseWithServer): self.check_minimal_inputs('/condition', valid_payload) # check valid POST payload on bad paths self.check_post(valid_payload, '/condition?id=foo', 400) + # check cannot delete depended-upon Condition + self.post_exp_cond([], {}) + for key in ('conditions', 'blockers', 'enables', 'disables'): + self.post_exp_process([], {key: [1]}, 1) + self.check_post({'delete': ''}, '/condition?id=1', 500) + self.post_exp_process([], {}, 1) + self.post_exp_day([], {'new_todo': '1'}) + for key in ('conditions', 'blockers', 'enables', 'disables'): + self.post_exp_todo([], {key: [1]}, 1) + self.check_post({'delete': ''}, '/condition?id=1', 500) def test_POST_condition(self) -> None: """Test (valid) POST /condition and its effect on GET /condition[s].""" @@ -88,24 +75,22 @@ class TestsWithServer(TestCaseWithServer): exp_single, exp_all = ExpectedGetCondition(1), ExpectedGetConditions() all_exps = [exp_single, exp_all] # test valid POST's effect on single /condition and full /conditions - self.post_exp_cond(all_exps, {'title': 'foo', 'description': 'oof'}, - post_to_id=False) + self.post_exp_cond(all_exps, {}, post_to_id=False) self.check_json_get(url_single, exp_single) self.check_json_get(url_all, exp_all) # test (no) effect of invalid POST to existing Condition on /condition self.check_post({}, url_single, 400) self.check_json_get(url_single, exp_single) - # test effect of POST changing title and activeness + # test effect of POST changing title, description, and activeness self.post_exp_cond(all_exps, {'title': 'bar', 'description': 'oof', 'is_active': 1}) self.check_json_get(url_single, exp_single) - self.check_json_get(url_all, exp_all) # test POST sans 'is_active' setting it negative - self.post_exp_cond(all_exps, {'title': 'bar', 'description': 'oof'}) + self.post_exp_cond(all_exps, {}) self.check_json_get(url_single, exp_single) - self.check_json_get(url_all, exp_all) # test deletion POST's effect, both to return id=1 into empty single, # full /conditions into empty list + self.check_json_get(url_single, exp_single) self.post_exp_cond(all_exps, {'delete': ''}, redir_to_id=False) exp_single.set('is_new', True) self.check_json_get(url_single, exp_single) @@ -125,8 +110,7 @@ class TestsWithServer(TestCaseWithServer): # make Condition and two Processes that among them establish all # possible ConditionsRelations to it, check /condition displays all exp = ExpectedGetCondition(1) - self.post_exp_cond([exp], {'title': 'foo', 'description': 'oof'}, - post_to_id=False) + self.post_exp_cond([exp], {}, post_to_id=False) for i, p in enumerate([('conditions', 'disables'), ('enables', 'blockers')]): self.post_exp_process([exp], {k: [1] for k in p}, i+1) diff --git a/tests/todos.py b/tests/todos.py index ea61a33..834330b 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -232,11 +232,6 @@ class TestsWithServer(TestCaseWithServer): """Tests against our HTTP server/handler (and database).""" checked_class = Todo - def _post_exp_todo( - self, id_: int, payload: dict[str, Any], exp: Expected) -> None: - self.check_post(payload, f'/todo?id={id_}') - exp.set_todo_from_post(id_, payload) - def test_basic_fail_POST_todo(self) -> None: """Test basic malformed/illegal POST /todo requests.""" self.post_exp_process([], {}, 1) @@ -270,10 +265,10 @@ class TestsWithServer(TestCaseWithServer): # test posting doneness, comment, calendarization, effort todo_post = {'is_done': 1, 'calendarize': 1, 'comment': 'foo', 'effort': 2.3} - self._post_exp_todo(1, todo_post, exp) + self.post_exp_todo([exp], todo_post, 1) self.check_json_get('/todo?id=1', exp) # test implicitly un-setting comment/calendarize/is_done by empty post - self._post_exp_todo(1, {}, exp) + self.post_exp_todo([exp], {}, 1) self.check_json_get('/todo?id=1', exp) # test effort post can be explicitly unset by "effort":"" post self.check_post({'effort': ''}, '/todo?id=1') @@ -287,7 +282,7 @@ class TestsWithServer(TestCaseWithServer): self.check_json_get('/todo?id=1', exp) todo_post = {'conditions': [1], 'disables': [1], 'blockers': [2], 'enables': [2]} - self._post_exp_todo(1, todo_post, exp) + self.post_exp_todo([exp], todo_post, 1) self.check_json_get('/todo?id=1', exp) def test_POST_todo_deletion(self) -> None: @@ -333,12 +328,12 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_process([exp], {}, 1) self.post_exp_day([exp], {'new_todo': [1]}) self.post_exp_day([exp], {'new_todo': [1]}) - self._post_exp_todo(1, {'adopt': 2}, exp) + self.post_exp_todo([exp], {'adopt': 2}, 1) exp.set('steps_todo_to_process', [ exp.step_as_dict(node_id=1, process=None, todo=2)]) self.check_json_get('/todo?id=1', exp) # test Todo un-adopting by just not sending an adopt - self._post_exp_todo(1, {}, exp) + self.post_exp_todo([exp], {}, 1) exp.set('steps_todo_to_process', []) self.check_json_get('/todo?id=1', exp) # test fail on trying to adopt non-existing Todo @@ -346,11 +341,11 @@ class TestsWithServer(TestCaseWithServer): # test cannot self-adopt self.check_post({'adopt': 1}, '/todo?id=1', 400) # test cannot do 1-step circular adoption - self._post_exp_todo(2, {'adopt': 1}, exp) + self.post_exp_todo([exp], {'adopt': 1}, 2) self.check_post({'adopt': 2}, '/todo?id=1', 400) # test cannot do 2-step circular adoption self.post_exp_day([exp], {'new_todo': [1]}) - self._post_exp_todo(3, {'adopt': 2}, exp) + self.post_exp_todo([exp], {'adopt': 2}, 3) self.check_post({'adopt': 3}, '/todo?id=1', 400) # test can adopt Todo into ProcessStep chain via its Process (with key # 'step_filler' equivalent to single-element 'adopt' if intable) @@ -367,7 +362,7 @@ class TestsWithServer(TestCaseWithServer): self.post_exp_day([exp], {'new_todo': [2]}) self.post_exp_day([exp], {'new_todo': [3]}) self.check_json_get('/todo?id=1', exp) - self._post_exp_todo(1, {'step_filler_to_1': 5, 'adopt': [4]}, exp) + self.post_exp_todo([exp], {'step_filler_to_1': 5, 'adopt': [4]}, 1) exp.lib_get('Todo', 1)['children'] += [5] slots[0]['todo'] = 4 slots[1]['todo'] = 5 @@ -386,7 +381,7 @@ class TestsWithServer(TestCaseWithServer): slots[1]['children'] = [exp.step_as_dict( node_id=3, process=4, todo=None, fillable=True)] self.post_exp_day([exp], {'new_todo': [4]}) - self._post_exp_todo(1, {'adopt': [4, 5, 6]}, exp) + self.post_exp_todo([exp], {'adopt': [4, 5, 6]}, 1) slots += [exp.step_as_dict( node_id=4, process=None, todo=6, fillable=False)] self.check_json_get('/todo?id=1', exp) diff --git a/tests/utils.py b/tests/utils.py index 045194f..b9db97e 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -713,7 +713,7 @@ class Expected: def set_cond_from_post(self, id_: int, d: dict[str, Any]) -> None: """Set Condition of id_ in library based on POST dict d.""" - if d == {'delete': ''}: + if 'delete' in d: self.lib_del('Condition', id_) return cond = self.lib_get('Condition', id_) @@ -892,6 +892,10 @@ class TestCaseWithServer(TestCaseWithDB): # pylint: disable=too-many-arguments target = f'/condition?id={id_}' if post_to_id else '/condition' redir = f'/condition?id={id_}' if redir_to_id else '/conditions' + if 'title' not in payload: + payload['title'] = 'foo' + if 'description' not in payload: + payload['description'] = 'foo' self.check_post(payload, target, redir=redir) for exp in exps: exp.set_cond_from_post(id_, payload) @@ -930,6 +934,16 @@ class TestCaseWithServer(TestCaseWithDB): exp.set_proc_from_post(id_, payload) return payload + def post_exp_todo(self, + exps: list[Expected], + payload: dict[str, Any], + id_: int, + ) -> None: + """POST /todo, appropriately updated Expecteds.""" + self.check_post(payload, f'/todo?id={id_}') + for exp in exps: + exp.set_todo_from_post(id_, payload) + def check_filter(self, exp: Expected, category: str, key: str, val: str, list_ids: list[int]) -> None: """Check GET /{category}?{key}={val} sorts to list_ids.""" -- 2.30.2 From 37af8a589ecb172aa1118464e13cd9f480da99b7 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Mon, 19 Aug 2024 07:08:58 +0200 Subject: [PATCH 15/16] Some test clean-ups. --- tests/processes.py | 53 ++++++---------- tests/todos.py | 148 ++++----------------------------------------- 2 files changed, 33 insertions(+), 168 deletions(-) diff --git a/tests/processes.py b/tests/processes.py index 0fa352d..2e2f716 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -286,16 +286,6 @@ class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" checked_class = Process - def _post_process(self, id_: int = 1, - form_data: dict[str, Any] | None = None - ) -> dict[str, Any]: - """POST basic Process.""" - if not form_data: - form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} - self.check_post(form_data, f'/process?id={id_}', - redir=f'/process?id={id_}') - return form_data - def test_fail_POST_process(self) -> None: """Test POST /process and its effect on the database.""" valid_post = {'title': '', 'description': '', 'effort': 1.0} @@ -321,13 +311,12 @@ class TestsWithServer(TestCaseWithServer): self.check_json_get('/process?id=1', exp) # check on minimal payload post exp = ExpectedGetProcess(1) - valid_post = {'title': 'foo', 'description': 'oof', 'effort': 2.3} - self.post_exp_process([exp], valid_post, 1) + self.post_exp_process([exp], {}, 1) self.check_json_get('/process?id=1', exp) # check boolean 'calendarize' - self.post_exp_process([exp], valid_post | {'calendarize': True}, 1) + self.post_exp_process([exp], {'calendarize': True}, 1) self.check_json_get('/process?id=1', exp) - self.post_exp_process([exp], valid_post, 1) + self.post_exp_process([exp], {}, 1) self.check_json_get('/process?id=1', exp) # check n_todos field self.post_exp_day([], {'new_todo': ['1']}, '2024-01-01') @@ -337,19 +326,18 @@ class TestsWithServer(TestCaseWithServer): # check cannot delete if Todos to Process self.check_post({'delete': ''}, '/process?id=1', 500) # check cannot delete if some ProcessStep's .step_process_id - self.post_exp_process([exp], valid_post, 2) - self.post_exp_process([exp], valid_post | {'new_top_step': 2}, 3) + self.post_exp_process([exp], {}, 2) + self.post_exp_process([exp], {'new_top_step': 2}, 3) self.check_post({'delete': ''}, '/process?id=2', 500) # check successful deletion - self.post_exp_process([exp], valid_post, 4) + self.post_exp_process([exp], {}, 4) self.check_post({'delete': ''}, '/process?id=4', 302, '/processes') exp = ExpectedGetProcess(4) - exp.set_proc_from_post(1, valid_post) - exp.set_proc_from_post(2, valid_post) - exp.set_proc_from_post(3, valid_post) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 3, 2)]) - exp.force('process_candidates', [1, 2, 3]) exp.set('is_new', True) + self.post_exp_process([exp], {}, 1) + self.post_exp_process([exp], {}, 2) + self.post_exp_process([exp], {}, 3) + exp.force('process_candidates', [1, 2, 3]) self.check_json_get('/process?id=4', exp) def test_POST_process_steps(self) -> None: @@ -434,17 +422,16 @@ class TestsWithServer(TestCaseWithServer): url = '/processes?sort_by=foo&pattern=bar&foo=x' self.check_json_get(url, exp) # test non-empty result, automatic (positive) sorting by title - proc1_post = {'title': 'foo', 'description': 'oof', 'effort': 1.0} - self.post_exp_process([exp], proc1_post, 1) - proc2_post = {'title': 'bar', 'description': 'rab', 'effort': 1.1} - self.post_exp_process([exp], proc2_post | {'new_top_step': [1]}, 2) - proc3_post = {'title': 'baz', 'description': 'zab', 'effort': 0.9} - self.post_exp_process([exp], proc3_post | {'new_top_step': [1]}, 3) - proc3_post = proc3_post | {'new_top_step': [2], 'kept_steps': [2]} - self.post_exp_process([exp], proc3_post, 3) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), - exp.procstep_as_dict(2, 3, 1), - exp.procstep_as_dict(3, 3, 2)]) + for i, t in enumerate([('foo', 'oof', 1.0, []), + ('bar', 'rab', 1.1, [1]), + ('baz', 'zab', 0.9, [1, 2])]): + payload = {'title': t[0], 'description': t[1], 'effort': t[2], + 'new_top_step': t[3]} + self.post_exp_process([exp], payload, i+1) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(1, owner_id=2, step_process_id=1), + exp.procstep_as_dict(2, owner_id=3, step_process_id=1), + exp.procstep_as_dict(3, owner_id=3, step_process_id=2)]) exp.set('pattern', '') self.check_filter(exp, 'processes', 'sort_by', 'title', [2, 3, 1]) # test other sortings diff --git a/tests/todos.py b/tests/todos.py index 834330b..5e849b0 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -2,9 +2,8 @@ from typing import Any from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, Expected) -from plomtask.todos import Todo, TodoNode -from plomtask.processes import Process, ProcessStep -from plomtask.conditions import Condition +from plomtask.todos import Todo +from plomtask.processes import Process from plomtask.exceptions import BadFormatException, HandledException @@ -20,24 +19,19 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): def setUp(self) -> None: super().setUp() - self.date1 = '2024-01-01' - self.date2 = '2024-01-02' self.proc = Process(None) self.proc.save(self.db_conn) - self.cond1 = Condition(None) - self.cond1.save(self.db_conn) - self.cond2 = Condition(None) - self.cond2.save(self.db_conn) self.default_init_kwargs['process'] = self.proc def test_Todo_by_date(self) -> None: """Test findability of Todos by date.""" - t1 = Todo(None, self.proc, False, self.date1) + date1, date2 = '2024-01-01', '2024-01-02' + t1 = Todo(None, self.proc, False, date1) t1.save(self.db_conn) - t2 = Todo(None, self.proc, False, self.date1) + t2 = Todo(None, self.proc, False, date1) t2.save(self.db_conn) - self.assertEqual(Todo.by_date(self.db_conn, self.date1), [t1, t2]) - self.assertEqual(Todo.by_date(self.db_conn, self.date2), []) + self.assertEqual(Todo.by_date(self.db_conn, date1), [t1, t2]) + self.assertEqual(Todo.by_date(self.db_conn, date2), []) with self.assertRaises(BadFormatException): self.assertEqual(Todo.by_date(self.db_conn, 'foo'), []) @@ -47,134 +41,18 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): def test_Todo_children(self) -> None: """Test Todo.children relations.""" - todo_1 = Todo(None, self.proc, False, self.date1) - todo_2 = Todo(None, self.proc, False, self.date1) + date1 = '2024-01-01' + todo_1 = Todo(None, self.proc, False, date1) + todo_2 = Todo(None, self.proc, False, date1) todo_2.save(self.db_conn) + # check un-saved Todo cannot parent with self.assertRaises(HandledException): todo_1.add_child(todo_2) todo_1.save(self.db_conn) - todo_3 = Todo(None, self.proc, False, self.date1) + todo_3 = Todo(None, self.proc, False, date1) + # check un-saved Todo cannot be parented with self.assertRaises(HandledException): todo_1.add_child(todo_3) - todo_3.save(self.db_conn) - todo_1.add_child(todo_3) - todo_1.save(self.db_conn) - assert isinstance(todo_1.id_, int) - todo_retrieved = Todo.by_id(self.db_conn, todo_1.id_) - self.assertEqual(todo_retrieved.children, [todo_3]) - with self.assertRaises(BadFormatException): - todo_3.add_child(todo_1) - - def test_Todo_conditioning(self) -> None: - """Test Todo.doability conditions.""" - assert isinstance(self.cond1.id_, int) - todo_1 = Todo(None, self.proc, False, self.date1) - todo_1.save(self.db_conn) - todo_2 = Todo(None, self.proc, False, self.date1) - todo_2.save(self.db_conn) - todo_2.add_child(todo_1) - with self.assertRaises(BadFormatException): - todo_2.is_done = True - todo_1.is_done = True - todo_2.is_done = True - todo_2.is_done = False - todo_2.set_condition_relations( - self.db_conn, [self.cond1.id_], [], [], []) - with self.assertRaises(BadFormatException): - todo_2.is_done = True - self.cond1.is_active = True - todo_2.is_done = True - - def test_Todo_step_tree(self) -> None: - """Test self-configuration of TodoStepsNode tree for Day view.""" - - def todo_node_as_dict(node: TodoNode) -> dict[str, object]: - return {'todo': node.todo.id_, 'seen': node.seen, - 'children': [todo_node_as_dict(c) for c in node.children]} - - todo_1 = Todo(None, self.proc, False, self.date1) - todo_1.save(self.db_conn) - assert isinstance(todo_1.id_, int) - # test minimum - node_0 = TodoNode(todo_1, False, []) - cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) - cmp_1_dict = todo_node_as_dict(node_0) - self.assertEqual(cmp_0_dict, cmp_1_dict) - # test non_emtpy seen_todo does something - node_0.seen = True - cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree({todo_1.id_})) - cmp_1_dict = todo_node_as_dict(node_0) - self.assertEqual(cmp_0_dict, cmp_1_dict) - # test child shows up - todo_2 = Todo(None, self.proc, False, self.date1) - todo_2.save(self.db_conn) - assert isinstance(todo_2.id_, int) - todo_1.add_child(todo_2) - node_2 = TodoNode(todo_2, False, []) - node_0.children = [node_2] - node_0.seen = False - cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) - cmp_1_dict = todo_node_as_dict(node_0) - self.assertEqual(cmp_0_dict, cmp_1_dict) - # test child shows up with child - todo_3 = Todo(None, self.proc, False, self.date1) - todo_3.save(self.db_conn) - assert isinstance(todo_3.id_, int) - todo_2.add_child(todo_3) - node_3 = TodoNode(todo_3, False, []) - node_2.children = [node_3] - cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) - cmp_1_dict = todo_node_as_dict(node_0) - self.assertEqual(cmp_0_dict, cmp_1_dict) - # test same todo can be child-ed multiple times at different locations - todo_1.add_child(todo_3) - node_4 = TodoNode(todo_3, True, []) - node_0.children += [node_4] - cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) - cmp_1_dict = todo_node_as_dict(node_0) - self.assertEqual(cmp_0_dict, cmp_1_dict) - - def test_Todo_ensure_children(self) -> None: - """Test parenthood guarantees of Todo.ensure_children.""" - assert isinstance(self.proc.id_, int) - proc2 = Process(None) - proc2.save(self.db_conn) - assert isinstance(proc2.id_, int) - proc3 = Process(None) - proc3.save(self.db_conn) - assert isinstance(proc3.id_, int) - proc4 = Process(None) - proc4.save(self.db_conn) - assert isinstance(proc4.id_, int) - # make proc4 step of proc3 - step = ProcessStep(None, proc3.id_, proc4.id_, None) - proc3.set_steps(self.db_conn, [step]) - # give proc2 three steps; 2× proc1, 1× proc3 - step1 = ProcessStep(None, proc2.id_, self.proc.id_, None) - step2 = ProcessStep(None, proc2.id_, self.proc.id_, None) - step3 = ProcessStep(None, proc2.id_, proc3.id_, None) - proc2.set_steps(self.db_conn, [step1, step2, step3]) - # test mere creation does nothing - todo_ignore = Todo(None, proc2, False, self.date1) - todo_ignore.save(self.db_conn) - self.assertEqual(todo_ignore.children, []) - # test create_with_children on step-less does nothing - todo_1 = Todo(None, self.proc, False, self.date1) - todo_1.save(self.db_conn) - todo_1.ensure_children(self.db_conn) - self.assertEqual(todo_1.children, []) - self.assertEqual(len(Todo.all(self.db_conn)), 2) - # test create_with_children adopts and creates, and down tree too - todo_2 = Todo(None, proc2, False, self.date1) - todo_2.save(self.db_conn) - todo_2.ensure_children(self.db_conn) - self.assertEqual(3, len(todo_2.children)) - self.assertEqual(todo_1, todo_2.children[0]) - self.assertEqual(self.proc, todo_2.children[2].process) - self.assertEqual(proc3, todo_2.children[1].process) - todo_3 = todo_2.children[1] - self.assertEqual(len(todo_3.children), 1) - self.assertEqual(todo_3.children[0].process, proc4) class ExpectedGetTodo(Expected): -- 2.30.2 From c5b24e606c1e91c6f99f2574e3eb4e09fd74ba6b Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Wed, 28 Aug 2024 04:40:29 +0200 Subject: [PATCH 16/16] Some work on tests, kinda unfinished. --- plomtask/http.py | 3 +- tests/processes.py | 317 ++++++++++++++++++++------------------------- 2 files changed, 142 insertions(+), 178 deletions(-) diff --git a/plomtask/http.py b/plomtask/http.py index e224ea0..99d3850 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -4,8 +4,7 @@ from inspect import signature from typing import Any, Callable from base64 import b64encode, b64decode from binascii import Error as binascii_Exception -from http.server import BaseHTTPRequestHandler -from http.server import HTTPServer +from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs from json import dumps as json_dumps from os.path import split as path_split diff --git a/tests/processes.py b/tests/processes.py index 2e2f716..42ad22e 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -23,42 +23,6 @@ class TestsWithDB(TestCaseWithDB): """Module tests requiring DB setup.""" checked_class = Process - def three_processes(self) -> tuple[Process, Process, Process]: - """Return three saved processes.""" - p1, p2, p3 = Process(None), Process(None), Process(None) - for p in [p1, p2, p3]: - p.save(self.db_conn) - return p1, p2, p3 - - def p_of_conditions(self) -> tuple[Process, list[Condition], - list[Condition], list[Condition]]: - """Return Process and its three Condition sets.""" - p = Process(None) - c1, c2, c3 = Condition(None), Condition(None), Condition(None) - for c in [c1, c2, c3]: - c.save(self.db_conn) - assert isinstance(c1.id_, int) - assert isinstance(c2.id_, int) - assert isinstance(c3.id_, int) - set_1 = [c1, c2] - set_2 = [c2, c3] - set_3 = [c1, c3] - conds = [c.id_ for c in set_1 if isinstance(c.id_, int)] - enables = [c.id_ for c in set_2 if isinstance(c.id_, int)] - disables = [c.id_ for c in set_3 if isinstance(c.id_, int)] - p.set_condition_relations(self.db_conn, conds, [], enables, disables) - p.save(self.db_conn) - return p, set_1, set_2, set_3 - - def test_Process_conditions_saving(self) -> None: - """Test .save/.save_core.""" - p, set1, set2, set3 = self.p_of_conditions() - assert p.id_ is not None - r = Process.by_id(self.db_conn, p.id_) - self.assertEqual(sorted(r.conditions), sorted(set1)) - self.assertEqual(sorted(r.enables), sorted(set2)) - self.assertEqual(sorted(r.disables), sorted(set3)) - def test_from_table_row(self) -> None: """Test .from_table_row() properly reads in class from DB.""" super().test_from_table_row() @@ -72,120 +36,98 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(sorted(r.enables), sorted(set2)) self.assertEqual(sorted(r.disables), sorted(set3)) - # def test_Process_steps(self) -> None: - # """Test addition, nesting, and non-recursion of ProcessSteps""" - # # pylint: disable=too-many-locals - # # pylint: disable=too-many-statements - # p1, p2, p3 = self.three_processes() - # assert isinstance(p1.id_, int) - # assert isinstance(p2.id_, int) - # assert isinstance(p3.id_, int) - # steps_p1: list[ProcessStep] = [] - # # add step of process p2 as first (top-level) step to p1 - # s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None) - # steps_p1 += [s_p2_to_p1] - # p1.set_steps(self.db_conn, steps_p1) - # p1_dict: dict[int, ProcessStepsNode] = {} - # p1_dict[1] = ProcessStepsNode(p2, None, True, {}) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # add step of process p3 as second (top-level) step to p1 - # s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None) - # steps_p1 += [s_p3_to_p1] - # p1.set_steps(self.db_conn, steps_p1) - # p1_dict[2] = ProcessStepsNode(p3, None, True, {}) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # add step of process p3 as first (top-level) step to p2, - # steps_p2: list[ProcessStep] = [] - # s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None) - # steps_p2 += [s_p3_to_p2] - # p2.set_steps(self.db_conn, steps_p2) - # # expect it as implicit sub-step of p1's second (p3) step - # p2_dict = {3: ProcessStepsNode(p3, None, False, {})} - # p1_dict[1].steps[3] = p2_dict[3] - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # add step of process p2 as explicit sub-step to p1's second sub-step - # s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_) - # steps_p1 += [s_p2_to_p1_first] - # p1.set_steps(self.db_conn, steps_p1) - # seen_3 = ProcessStepsNode(p3, None, False, {}, False) - # p1_dict[1].steps[3].seen = True - # p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True, - # {3: seen_3}) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # add step of process p3 as explicit sub-step to non-existing p1 - # # sub-step (of id=999), expect it to become another p1 top-level step - # s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999) - # steps_p1 += [s_p3_to_p1_999] - # p1.set_steps(self.db_conn, steps_p1) - # p1_dict[5] = ProcessStepsNode(p3, None, True, {}) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # add step of process p3 as explicit sub-step to p1's implicit p3 - # # sub-step, expect it to become another p1 top-level step - # s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_, - # s_p3_to_p2.id_) - # steps_p1 += [s_p3_to_p1_impl_p3] - # p1.set_steps(self.db_conn, steps_p1) - # p1_dict[6] = ProcessStepsNode(p3, None, True, {}) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # self.assertEqual(p1.used_as_step_by(self.db_conn), []) - # self.assertEqual(p2.used_as_step_by(self.db_conn), [p1]) - # self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2]) - # # # add step of process p3 as explicit sub-step to p1's first - # # # sub-step, expect it to eliminate implicit p3 sub-step - # # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_, - # # s_p2_to_p1.id_) - # # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})} - # # p1_dict[2].steps[4].steps[3].seen = False - # # steps_p1 += [s_p3_to_p1_first_explicit] - # # p1.set_steps(self.db_conn, steps_p1) - # # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # ensure implicit steps non-top explicit steps are shown - # s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_) - # steps_p2 += [s_p3_to_p2_first] - # p2.set_steps(self.db_conn, steps_p2) - # p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, - # True) - # p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode( - # p3, 3, False, {}, False) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # # ensure suppressed step nodes are hidden - # assert isinstance(s_p3_to_p2.id_, int) - # p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_]) - # p1_dict[1].steps[3].steps = {} - # p1_dict[1].steps[3].is_suppressed = True - # p1_dict[2].steps[4].steps[3].steps = {} - # p1_dict[2].steps[4].steps[3].is_suppressed = True - # self.assertEqual(p1.get_steps(self.db_conn), p1_dict) - - def test_Process_conditions(self) -> None: - """Test setting Process.conditions/enables/disables.""" - p = Process(None) - p.save(self.db_conn) - targets = ['conditions', 'blockers', 'enables', 'disables'] - for i, target in enumerate(targets): - c1, c2 = Condition(None), Condition(None) - c1.save(self.db_conn) - c2.save(self.db_conn) - assert isinstance(c1.id_, int) - assert isinstance(c2.id_, int) - args: list[list[int]] = [[], [], [], []] - args[i] = [] - p.set_condition_relations(self.db_conn, *args) - self.assertEqual(getattr(p, target), []) - args[i] = [c1.id_] - p.set_condition_relations(self.db_conn, *args) - self.assertEqual(getattr(p, target), [c1]) - args[i] = [c2.id_] - p.set_condition_relations(self.db_conn, *args) - self.assertEqual(getattr(p, target), [c2]) - args[i] = [c1.id_, c2.id_] - p.set_condition_relations(self.db_conn, *args) - self.assertEqual(getattr(p, target), [c1, c2]) + def test_Process_steps(self) -> None: + """Test addition, nesting, and non-recursion of ProcessSteps""" + # pylint: disable=too-many-locals + # pylint: disable=too-many-statements + p1, p2, p3 = self.three_processes() + assert isinstance(p1.id_, int) + assert isinstance(p2.id_, int) + assert isinstance(p3.id_, int) + steps_p1: list[ProcessStep] = [] + # # add step of process p2 as first (top-level) step to p1 + # s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None) + # steps_p1 += [s_p2_to_p1] + # p1.set_steps(self.db_conn, steps_p1) + # p1_dict: dict[int, ProcessStepsNode] = {} + # p1_dict[1] = ProcessStepsNode(p2, None, True, {}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p3 as second (top-level) step to p1 + # s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None) + # steps_p1 += [s_p3_to_p1] + # p1.set_steps(self.db_conn, steps_p1) + # p1_dict[2] = ProcessStepsNode(p3, None, True, {}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p3 as first (top-level) step to p2, + # steps_p2: list[ProcessStep] = [] + # s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None) + # steps_p2 += [s_p3_to_p2] + # p2.set_steps(self.db_conn, steps_p2) + # # expect it as implicit sub-step of p1's second (p3) step + # p2_dict = {3: ProcessStepsNode(p3, None, False, {})} + # p1_dict[1].steps[3] = p2_dict[3] + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p2 as explicit sub-step to p1's second sub-step + # s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_) + # steps_p1 += [s_p2_to_p1_first] + # p1.set_steps(self.db_conn, steps_p1) + # seen_3 = ProcessStepsNode(p3, None, False, {}, False) + # p1_dict[1].steps[3].seen = True + # p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True, + # {3: seen_3}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + + # add step of process p3 as explicit sub-step to non-existing p1 + # sub-step (of id=999), expect it to become another p1 top-level step + s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999) + steps_p1 += [s_p3_to_p1_999] + p1.set_steps(self.db_conn, steps_p1) + p1_dict[5] = ProcessStepsNode(p3, None, True, {}) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # add step of process p3 as explicit sub-step to p1's implicit p3 + # sub-step, expect it to become another p1 top-level step + s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_, + s_p3_to_p2.id_) + steps_p1 += [s_p3_to_p1_impl_p3] + p1.set_steps(self.db_conn, steps_p1) + p1_dict[6] = ProcessStepsNode(p3, None, True, {}) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + self.assertEqual(p1.used_as_step_by(self.db_conn), []) + self.assertEqual(p2.used_as_step_by(self.db_conn), [p1]) + self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2]) + # add step of process p3 as explicit sub-step to p1's first + # sub-step, expect it to eliminate implicit p3 sub-step + s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_, + s_p2_to_p1.id_) + p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})} + p1_dict[2].steps[4].steps[3].seen = False + steps_p1 += [s_p3_to_p1_first_explicit] + p1.set_steps(self.db_conn, steps_p1) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # ensure implicit steps non-top explicit steps are shown + s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_) + steps_p2 += [s_p3_to_p2_first] + p2.set_steps(self.db_conn, steps_p2) + p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, + True) + p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode( + p3, 3, False, {}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # ensure suppressed step nodes are hidden + assert isinstance(s_p3_to_p2.id_, int) + p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_]) + p1_dict[1].steps[3].steps = {} + p1_dict[1].steps[3].is_suppressed = True + p1_dict[2].steps[4].steps[3].steps = {} + p1_dict[2].steps[4].steps[3].is_suppressed = True + self.assertEqual(p1.get_steps(self.db_conn), p1_dict) def test_remove(self) -> None: """Test removal of Processes and ProcessSteps.""" super().test_remove() - p1, p2, p3 = self.three_processes() + p1, p2, p3 = Process(None), Process(None), Process(None) + for p in [p1, p2, p3]: + p.save(self.db_conn) assert isinstance(p1.id_, int) assert isinstance(p2.id_, int) assert isinstance(p3.id_, int) @@ -318,6 +260,13 @@ class TestsWithServer(TestCaseWithServer): self.check_json_get('/process?id=1', exp) self.post_exp_process([exp], {}, 1) self.check_json_get('/process?id=1', exp) + # check conditions posting + for i in range(3): + self.post_exp_cond([exp], {}, i+1) + p = {'conditions': [1, 2], 'disables': [1], + 'blockers': [3], 'enables': [2, 3]} + self.post_exp_process([exp], p, 1) + self.check_json_get('/process?id=1', exp) # check n_todos field self.post_exp_day([], {'new_todo': ['1']}, '2024-01-01') self.post_exp_day([], {'new_todo': ['1']}, '2024-01-02') @@ -334,9 +283,9 @@ class TestsWithServer(TestCaseWithServer): self.check_post({'delete': ''}, '/process?id=4', 302, '/processes') exp = ExpectedGetProcess(4) exp.set('is_new', True) - self.post_exp_process([exp], {}, 1) - self.post_exp_process([exp], {}, 2) - self.post_exp_process([exp], {}, 3) + for i in range(3): + self.post_exp_cond([exp], {}, i+1) + self.post_exp_process([exp], {}, i+1) exp.force('process_candidates', [1, 2, 3]) self.check_json_get('/process?id=4', exp) @@ -346,10 +295,13 @@ class TestsWithServer(TestCaseWithServer): url = '/process?id=1' exp = ExpectedGetProcess(1) self.post_exp_process([exp], {}, 1) - # post first (top-level) step of proc 2 to proc 1 by 'step_of' in 2 + # post first (top-level) step of proc2 to proc1 by 'step_of' in 2 self.post_exp_process([exp], {'step_of': 1}, 2) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)]) - exp.set('steps', [exp.stepnode_as_dict(1, 2)]) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, owner_id=1, step_process_id=2)]) + exp.set('steps', [ + exp.stepnode_as_dict( + step_id=1, + proc_id=2)]) self.check_json_get(url, exp) # post empty/absent steps list to process, expect clean slate, and old # step to completely disappear @@ -357,43 +309,56 @@ class TestsWithServer(TestCaseWithServer): exp.lib_wipe('ProcessStep') exp.set('steps', []) self.check_json_get(url, exp) - # post new step of proc2 to proc1 by 'new_top_step' + # post anew (as only step yet) step of proc2 to proc1 by 'new_top_step' self.post_exp_process([exp], {'new_top_step': 2}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)]) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(1, owner_id=1, step_process_id=2)]) self.post_exp_process([exp], {'kept_steps': [1]}, 1) - exp.set('steps', [exp.stepnode_as_dict(1, 2)]) + step_nodes = [exp.stepnode_as_dict(step_id=1, proc_id=2)] + exp.set('steps', step_nodes) self.check_json_get(url, exp) - # fail on single- and multi-step recursion + # fail on single--step recursion p_min = {'title': '', 'description': '', 'effort': 0} self.check_post(p_min | {'new_top_step': 1}, url, 400) self.check_post(p_min | {'step_of': 1}, url, 400) - self.post_exp_process([exp], {'new_top_step': 1}, 2) - self.check_post(p_min | {'step_of': 2, 'new_top_step': 2}, url, 400) - self.post_exp_process([exp], {}, 3) - self.post_exp_process([exp], {'step_of': 3}, 4) - self.check_post(p_min | {'new_top_step': 3, 'step_of': 4}, url, 400) # post sibling steps + self.post_exp_process([exp], {}, 3) + self.post_exp_process([exp], {'kept_steps': [1], 'new_top_step': 3}, 1) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(2, owner_id=1, step_process_id=3)]) + step_nodes += [exp.stepnode_as_dict(step_id=2, proc_id=3)] + self.check_json_get(url, exp) + # # post implicit sub-step via post to proc2 self.post_exp_process([exp], {}, 4) - self.post_exp_process([exp], {'new_top_step': 4}, 1) - self.post_exp_process([exp], {'kept_steps': [1], 'new_top_step': 4}, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 4), - exp.procstep_as_dict(2, 1, 4)]) - exp.set('steps', [exp.stepnode_as_dict(1, 4), - exp.stepnode_as_dict(2, 4)]) + self.post_exp_process([exp], {'step_of': [1], 'new_top_step': 4}, 2) + exp.lib_set('ProcessStep', + [exp.procstep_as_dict(3, owner_id=2, step_process_id=4)]) + step_nodes[0]['steps'] = [ + exp.stepnode_as_dict(step_id=3, proc_id=4, is_explicit=False)] self.check_json_get(url, exp) - # post sub-step chain + # post explicit sub-step via post to proc1 p = {'kept_steps': [1, 2], 'new_step_to_2': 4} self.post_exp_process([exp], p, 1) - exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 1, 4, 2)]) - exp.set('steps', [exp.stepnode_as_dict(1, 4), - exp.stepnode_as_dict(2, 4, steps=[ - exp.stepnode_as_dict(3, 4)])]) + exp.lib_set('ProcessStep', [exp.procstep_as_dict( + 4, owner_id=1, step_process_id=4, parent_step_id=2)]) + step_nodes[1]['steps'] = [ + exp.stepnode_as_dict(step_id=4, proc_id=4)] self.check_json_get(url, exp) - # fail posting sub-step that would cause recursion + # fail on multi-step recursion via new step(s) + self.post_exp_process([exp], {}, 5) + self.post_exp_process([exp], {'new_top_step': 1}, 5) + exp.lib_set('ProcessStep', [exp.procstep_as_dict( + 5, owner_id=5, step_process_id=1)]) + self.check_post(p_min | {'step_of': 5, 'new_top_step': 5}, url, 400) self.post_exp_process([exp], {}, 6) - self.post_exp_process([exp], {'new_top_step': 6}, 5) - p = p_min | {'kept_steps': [1, 2, 3], 'new_step_to_2': 5, 'step_of': 6} - self.check_post(p, url, 400) + self.post_exp_process([exp], {'new_top_step': 5}, 6) + exp.lib_set('ProcessStep', [exp.procstep_as_dict( + 6, owner_id=6, step_process_id=5)]) + self.check_post(p_min | {'step_of': 5, 'new_top_step': 6}, url, 400) + # fail on multi-step recursion via explicit sub-step + self.check_json_get(url, exp) + p = {'step_of': 5, 'kept_steps': [1, 2, 4], 'new_step_to_2': 6} + self.check_post(p_min | p, url, 400) def test_fail_GET_process(self) -> None: """Test invalid GET /process params.""" -- 2.30.2