From 79d2b7f2c1e944e20f7beb9739183e4cdd3694dd Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Mon, 12 Aug 2024 11:11:54 +0200 Subject: [PATCH] Set 'is_new' even when provided not-yet-existing ID, adapt and fix tests. --- plomtask/db.py | 5 ++-- plomtask/http.py | 20 ++++++++++--- tests/conditions.py | 68 +++++++++++++++++++++++---------------------- tests/processes.py | 4 ++- tests/todos.py | 10 +++---- tests/utils.py | 29 +++++++++---------- 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/plomtask/db.py b/plomtask/db.py index ee5f3b9..1fdd3e1 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -376,7 +376,8 @@ class BaseModel(Generic[BaseModelId]): @classmethod def _get_cached(cls: type[BaseModelInstance], - id_: BaseModelId) -> BaseModelInstance | None: + id_: BaseModelId + ) -> BaseModelInstance | None: """Get object of id_ from class's cache, or None if not found.""" cache = cls.get_cache() if id_ in cache: @@ -449,7 +450,7 @@ class BaseModel(Generic[BaseModelId]): def by_id_or_create(cls, db_conn: DatabaseConnection, id_: BaseModelId | None ) -> Self: - """Wrapper around .by_id, creating (not caching/saving) if not find.""" + """Wrapper around .by_id, creating (not caching/saving) if no find.""" if not cls.can_create_by_id: raise HandledException('Class cannot .by_id_or_create.') if id_ is None: diff --git a/plomtask/http.py b/plomtask/http.py index e307f14..4426bba 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -1,5 +1,6 @@ """Web server stuff.""" from __future__ import annotations +from inspect import signature from typing import Any, Callable from base64 import b64encode, b64decode from binascii import Error as binascii_Exception @@ -309,6 +310,9 @@ class TaskHandler(BaseHTTPRequestHandler): item = target_class.by_id_or_create(self._conn, id_) else: item = target_class.by_id(self._conn, id_) + if 'exists' in signature(f).parameters: + exists = id_ is not None and target_class._get_cached(id_) + return f(self, item, exists) return f(self, item) return wrapper return decorator @@ -474,10 +478,14 @@ class TaskHandler(BaseHTTPRequestHandler): 'pattern': pattern} @_get_item(Condition) - def do_GET_condition(self, c: Condition) -> dict[str, object]: + def do_GET_condition(self, + c: Condition, + exists: bool + ) -> dict[str, object]: """Show Condition of ?id=.""" ps = Process.all(self._conn) - return {'condition': c, 'is_new': c.id_ is None, + return {'condition': c, + 'is_new': not exists, 'enabled_processes': [p for p in ps if c in p.conditions], 'disabled_processes': [p for p in ps if c in p.blockers], 'enabling_processes': [p for p in ps if c in p.enables], @@ -494,7 +502,10 @@ class TaskHandler(BaseHTTPRequestHandler): return {'condition': c} @_get_item(Process) - def do_GET_process(self, process: Process) -> dict[str, object]: + def do_GET_process(self, + process: Process, + exists: bool + ) -> dict[str, object]: """Show Process of ?id=.""" owner_ids = self._params.get_all_int('step_to') owned_ids = self._params.get_all_int('has_step') @@ -516,7 +527,8 @@ class TaskHandler(BaseHTTPRequestHandler): for process_id in owned_ids: Process.by_id(self._conn, process_id) # to ensure ID exists preset_top_step = process_id - return {'process': process, 'is_new': process.id_ is None, + return {'process': process, + 'is_new': not exists, 'preset_top_step': preset_top_step, 'steps': process.get_steps(self._conn), 'owners': owners, diff --git a/tests/conditions.py b/tests/conditions.py index 6feda94..a9b28bb 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -51,9 +51,10 @@ class ExpectedGetConditions(Expected): class ExpectedGetCondition(Expected): """Builder of expectations for GET /condition.""" + _default_dict = {'is_new': False} _on_empty_make_temp = ('Condition', 'cond_as_dict') - def __init__(self, id_: int, *args: Any, **kwargs: Any) -> None: + def __init__(self, id_: int | None, *args: Any, **kwargs: Any) -> None: self._fields = {'condition': id_} super().__init__(*args, **kwargs) @@ -67,7 +68,6 @@ class ExpectedGetCondition(Expected): self._fields[c_field] = self.as_ids([ p for p in self.lib_all('Process') if self._fields['condition'] in p[p_field]]) - self._fields['is_new'] = False class TestsWithServer(TestCaseWithServer): @@ -79,51 +79,55 @@ class TestsWithServer(TestCaseWithServer): url = '/condition' self.check_post({}, url, 400) self.check_post({'title': ''}, url, 400) - self.check_post({'title': '', 'is_active': 0}, url, 400) - self.check_post({'description': '', 'is_active': 0}, url, 400) + self.check_post({'description': ''}, url, 400) # check valid POST payload on bad paths valid_payload = {'title': '', 'description': '', 'is_active': 0} - self.check_post(valid_payload, '/condition?id=foo', 400) + self.check_post(valid_payload, f'{url}?id=foo', 400) def test_POST_condition(self) -> None: """Test (valid) POST /condition and its effect on GET /condition[s].""" - exp_single = ExpectedGetCondition(1) - exp_all = ExpectedGetConditions() + url_single, url_all = '/condition?id=1', '/conditions' + exp_single, exp_all = ExpectedGetCondition(1), ExpectedGetConditions() all_exps = [exp_single, exp_all] # test valid POST's effect on single /condition and full /conditions - post = {'title': 'foo', 'description': 'oof', 'is_active': 0} - self.post_exp_cond(all_exps, 1, post, '', '?id=1') - self.check_json_get('/condition?id=1', exp_single) - self.check_json_get('/conditions', exp_all) + self.post_exp_cond(all_exps, {'title': 'foo', 'description': 'oof'}, + post_to_id=False) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) # test (no) effect of invalid POST to existing Condition on /condition - self.check_post({}, '/condition?id=1', 400) - self.check_json_get('/condition?id=1', exp_single) + self.check_post({}, url_single, 400) + self.check_json_get(url_single, exp_single) # test effect of POST changing title and activeness - post = {'title': 'bar', 'description': 'oof', 'is_active': 1} - self.post_exp_cond(all_exps, 1, post, '?id=1', '?id=1') - self.check_json_get('/condition?id=1', exp_single) - self.check_json_get('/conditions', exp_all) + self.post_exp_cond(all_exps, {'title': 'bar', 'description': 'oof', + 'is_active': 1}) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) # test deletion POST's effect, both to return id=1 into empty single, # full /conditions into empty list - self.post_exp_cond(all_exps, 1, {'delete': ''}, '?id=1', 's') - self.check_json_get('/condition?id=1', exp_single) - self.check_json_get('/conditions', exp_all) + self.post_exp_cond(all_exps, {'delete': ''}, redir_to_id=False) + exp_single.set('is_new', True) + self.check_json_get(url_single, exp_single) + self.check_json_get(url_all, exp_all) def test_GET_condition(self) -> None: """More GET /condition testing, especially for Process relations.""" # check expected default status codes self.check_get_defaults('/condition') + # check 'is_new' set if id= absent or pointing to not-yet-existing ID + exp = ExpectedGetCondition(None) + exp.set('is_new', True) + self.check_json_get('/condition', exp) + exp = ExpectedGetCondition(1) + exp.set('is_new', True) + self.check_json_get('/condition?id=1', exp) # make Condition and two Processes that among them establish all # possible ConditionsRelations to it, check /condition displays all exp = ExpectedGetCondition(1) - cond_post = {'title': 'foo', 'description': 'oof', 'is_active': 0} - self.post_exp_cond([exp], 1, cond_post, '', '?id=1') - proc1_post = {'title': 'A', 'description': '', 'effort': 1.1, - 'conditions': [1], 'disables': [1]} - proc2_post = {'title': 'B', 'description': '', 'effort': 0.9, - 'enables': [1], 'blockers': [1]} - self.post_exp_process([exp], proc1_post, 1) - self.post_exp_process([exp], proc2_post, 2) + self.post_exp_cond([exp], {'title': 'foo', 'description': 'oof'}, + post_to_id=False) + for i, p in enumerate([('conditions', 'disables'), + ('enables', 'blockers')]): + self.post_exp_process([exp], {k: [1] for k in p}, i+1) self.check_json_get('/condition?id=1', exp) def test_GET_conditions(self) -> None: @@ -131,19 +135,17 @@ class TestsWithServer(TestCaseWithServer): # test empty result on empty DB, default-settings on empty params exp = ExpectedGetConditions() self.check_json_get('/conditions', exp) - # test ignorance of meaningless non-empty params (incl. unknown key), - # that 'sort_by' default to 'title' (even if set to something else, as + # test 'sort_by' default to 'title' (even if set to something else, as # long as without handler) and 'pattern' get preserved exp.set('pattern', 'bar') - exp.set('sort_by', 'title') # for clarity (already default) self.check_json_get('/conditions?sort_by=foo&pattern=bar&foo=x', exp) - # test non-empty result, automatic (positive) sorting by title exp.set('pattern', '') + # test non-empty result, automatic (positive) sorting by title post_cond1 = {'is_active': 0, 'title': 'foo', 'description': 'oof'} post_cond2 = {'is_active': 0, 'title': 'bar', 'description': 'rab'} post_cond3 = {'is_active': 1, 'title': 'baz', 'description': 'zab'} for i, post in enumerate([post_cond1, post_cond2, post_cond3]): - self.post_exp_cond([exp], i+1, post, '', f'?id={i+1}') + self.post_exp_cond([exp], post, i+1, post_to_id=False) self.check_filter(exp, 'conditions', 'sort_by', 'title', [2, 3, 1]) # test other sortings self.check_filter(exp, 'conditions', 'sort_by', '-title', [1, 3, 2]) diff --git a/tests/processes.py b/tests/processes.py index 422c283..24a62bd 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -319,10 +319,11 @@ class TestsWithServer(TestCaseWithServer): # check on un-saved exp = ExpectedGetProcess(1) exp.force('process_candidates', []) + exp.set('is_new', True) self.check_json_get('/process?id=1', exp) # check on minimal payload post + exp = ExpectedGetProcess(1) valid_post = {'title': 'foo', 'description': 'oof', 'effort': 2.3} - exp.unforce('process_candidates') self.post_exp_process([exp], valid_post, 1) self.check_json_get('/process?id=1', exp) # check n_todos field @@ -345,6 +346,7 @@ class TestsWithServer(TestCaseWithServer): exp.set_proc_from_post(3, valid_post) exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 3, 2)]) exp.force('process_candidates', [1, 2, 3]) + exp.set('is_new', True) self.check_json_get('/process?id=4', exp) def test_POST_process_steps(self) -> None: diff --git a/tests/todos.py b/tests/todos.py index 2ecf3b8..f048d46 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -288,10 +288,10 @@ class TestsWithServer(TestCaseWithServer): for prefix in ['make_', '']: for suffix in ['', 'x', '1.1']: self.check_post({'step_filler_to_1': [f'{prefix}{suffix}']}, - '/todo?id=1', 400, '/todo') + '/todo?id=1', 400, '/todo') for suffix in ['', 'x', '1.1']: - self.check_post({'step_filler_to_{suffix}': ['1']}, - '/todo?id=1', 400, '/todo') + self.check_post({'step_filler_to_{suffix}': ['1']}, + '/todo?id=1', 400, '/todo') def test_basic_POST_todo(self) -> None: """Test basic POST /todo manipulations.""" @@ -318,8 +318,8 @@ class TestsWithServer(TestCaseWithServer): # test Condition posts c1_post = {'title': 'foo', 'description': 'oof', 'is_active': 0} c2_post = {'title': 'bar', 'description': 'rab', 'is_active': 1} - self.post_exp_cond([exp], 1, c1_post, '?id=1', '?id=1') - self.post_exp_cond([exp], 2, c2_post, '?id=2', '?id=2') + self.post_exp_cond([exp], c1_post, 1) + self.post_exp_cond([exp], c2_post, 2) self.check_json_get('/todo?id=1', exp) todo_post = {'conditions': [1], 'disables': [1], 'blockers': [2], 'enables': [2]} diff --git a/tests/utils.py b/tests/utils.py index d1b6eac..7945f61 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -569,8 +569,7 @@ class Expected: id_ = self._fields[category.lower()] make_temp = not bool(self.lib_get(category, id_)) if make_temp: - f = getattr(self, dicter) - self.lib_set(category, [f(id_)]) + self.lib_set(category, [getattr(self, dicter)(id_)]) self.recalc() d = {'_library': self._lib} for k, v in self._fields.items(): @@ -587,6 +586,7 @@ class Expected: d[k] = v if make_temp: json = json_dumps(d) + id_ = id_ if id_ is not None else '?' self.lib_del(category, id_) d = json_loads(json) return d @@ -640,7 +640,8 @@ class Expected: """Return dictionary of items by their 'id' fields.""" refs = {} for item in items: - refs[str(item['id'])] = item + id_ = str(item['id']) if item['id'] is not None else '?' + refs[id_] = item return refs @staticmethod @@ -703,7 +704,8 @@ class Expected: return cond = self.lib_get('Condition', id_) if cond: - cond['is_active'] = d['is_active'] + if 'is_active' in d: + cond['is_active'] = d['is_active'] for category in ['title', 'description']: history = cond['_versioned'][category] if len(history) > 0: @@ -713,8 +715,7 @@ class Expected: else: history['0'] = d[category] else: - cond = self.cond_as_dict( - id_, d['is_active'], d['title'], d['description']) + cond = self.cond_as_dict(id_, **d) self.lib_set('Condition', [cond]) @staticmethod @@ -755,11 +756,11 @@ class Expected: for k, v in d.items(): if k.startswith('step_filler_to_'): continue - elif 'adopt' == k: + if 'adopt' == k: new_children = v if isinstance(v, list) else [v] corrected_kwargs['children'] += new_children continue - elif k in {'is_done', 'calendarize'}: + if k in {'is_done', 'calendarize'}: v = v in VALID_TRUES corrected_kwargs[k] = v todo = self.lib_get('Todo', id_) @@ -866,16 +867,16 @@ class TestCaseWithServer(TestCaseWithDB): def post_exp_cond(self, exps: list[Expected], - id_: int, payload: dict[str, object], - path_suffix: str = '', - redir_suffix: str = '' + id_: int = 1, + post_to_id: bool = True, + redir_to_id: bool = True ) -> None: """POST /condition(s), appropriately update Expecteds.""" # pylint: disable=too-many-arguments - path = f'/condition{path_suffix}' - redir = f'/condition{redir_suffix}' - self.check_post(payload, path, redir=redir) + target = f'/condition?id={id_}' if post_to_id else '/condition' + redir = f'/condition?id={id_}' if redir_to_id else '/conditions' + self.check_post(payload, target, redir=redir) for exp in exps: exp.set_cond_from_post(id_, payload) -- 2.30.2