From: Christian Heller Date: Fri, 2 Aug 2024 09:36:05 +0000 (+0200) Subject: Re-organize testing. X-Git-Url: https://plomlompom.com/repos/%7B%7Bprefix%7D%7D/static/%7B%7Bdb.prefix%7D%7D/new_day?a=commitdiff_plain;h=b8d00b7031874cec4298f25e7077c48c8d7ea7cf;p=plomtask Re-organize testing. --- diff --git a/tests/conditions.py b/tests/conditions.py index 6cf06b8..333267f 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -1,5 +1,7 @@ """Test Conditions module.""" -from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB +from typing import Any +from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, + Expected) from plomtask.conditions import Condition from plomtask.processes import Process from plomtask.todos import Todo @@ -27,6 +29,7 @@ class TestsWithDB(TestCaseWithDB): for depender in (proc, todo): c = Condition(None) c.save(self.db_conn) + assert isinstance(c.id_, int) depender.set_condition_relations(self.db_conn, [c.id_], [], [], []) depender.save(self.db_conn) with self.assertRaises(HandledException): @@ -36,31 +39,40 @@ class TestsWithDB(TestCaseWithDB): c.remove(self.db_conn) +class ExpectedGetConditions(Expected): + """Builder of expectations for GET /conditions.""" + _default_dict = {'sort_by': 'title', 'pattern': ''} + + def recalc(self) -> None: + """Update internal dictionary by subclass-specific rules.""" + super().recalc() + self._fields['conditions'] = self.as_ids(self.lib_all('Condition')) + + +class ExpectedGetCondition(Expected): + """Builder of expectations for GET /condition.""" + _on_empty_make_temp = ('Condition', 'cond_as_dict') + + def __init__(self, id_: int, *args: Any, **kwargs: Any) -> None: + self._fields = {'condition': id_} + super().__init__(*args, **kwargs) + + def recalc(self) -> None: + """Update internal dictionary by subclass-specific rules.""" + super().recalc() + for p_field, c_field in [('conditions', 'enabled_processes'), + ('disables', 'disabling_processes'), + ('blockers', 'disabled_processes'), + ('enables', 'enabling_processes')]: + self._fields[c_field] = self.as_ids([ + p for p in self.lib_all('Process') + if self._fields['condition'] in p[p_field]]) + self._fields['is_new'] = False + + class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" - @classmethod - def GET_condition_dict(cls, cond: dict[str, object]) -> dict[str, object]: - """Return JSON of GET /condition to expect.""" - return {'is_new': False, - 'enabled_processes': [], - 'disabled_processes': [], - 'enabling_processes': [], - 'disabling_processes': [], - 'condition': cond['id'], - '_library': {'Condition': cls.as_refs([cond])}} - - @classmethod - def GET_conditions_dict(cls, conds: list[dict[str, object]] - ) -> dict[str, object]: - """Return JSON of GET /conditions to expect.""" - library = {'Condition': cls.as_refs(conds)} if conds else {} - d: dict[str, object] = {'conditions': cls.as_id_list(conds), - 'sort_by': 'title', - 'pattern': '', - '_library': library} - return d - def test_fail_POST_condition(self) -> None: """Test malformed/illegal POST /condition requests.""" # check incomplete POST payloads @@ -76,111 +88,84 @@ class TestsWithServer(TestCaseWithServer): def test_POST_condition(self) -> None: """Test (valid) POST /condition and its effect on GET /condition[s].""" - # test valid POST's effect on … + exp_single = ExpectedGetCondition(1) + exp_all = ExpectedGetConditions() + all_exps = [exp_single, exp_all] + # test valid POST's effect on single /condition and full /conditions post = {'title': 'foo', 'description': 'oof', 'is_active': False} - self.check_post(post, '/condition', redir='/condition?id=1') - # … single /condition - expected_cond = self.cond_as_dict(titles=['foo'], descriptions=['oof']) - assert isinstance(expected_cond['_versioned'], dict) - expected_single = self.GET_condition_dict(expected_cond) - self.check_json_get('/condition?id=1', expected_single) - # … full /conditions - expected_all = self.GET_conditions_dict([expected_cond]) - self.check_json_get('/conditions', expected_all) + self.post_exp_cond(all_exps, 1, post, '', '?id=1') + self.check_json_get('/condition?id=1', exp_single) + self.check_json_get('/conditions', exp_all) # test (no) effect of invalid POST to existing Condition on /condition self.check_post({}, '/condition?id=1', 400) - self.check_json_get('/condition?id=1', expected_single) + self.check_json_get('/condition?id=1', exp_single) # test effect of POST changing title and activeness post = {'title': 'bar', 'description': 'oof', 'is_active': True} - self.check_post(post, '/condition?id=1') - expected_cond['_versioned']['title'][1] = 'bar' - expected_cond['is_active'] = True - self.check_json_get('/condition?id=1', expected_single) - # test deletion POST's effect, both to return id=1 into empty single, … - self.check_post({'delete': ''}, '/condition?id=1', redir='/conditions') - expected_cond = self.cond_as_dict() - assert isinstance(expected_single['_library'], dict) - expected_single['_library']['Condition'] = self.as_refs( - [expected_cond]) - self.check_json_get('/condition?id=1', expected_single) - # … and full /conditions into empty list - expected_all['conditions'] = [] - expected_all['_library'] = {} - self.check_json_get('/conditions', expected_all) + self.post_exp_cond(all_exps, 1, post, '?id=1', '?id=1') + self.check_json_get('/condition?id=1', exp_single) + self.check_json_get('/conditions', exp_all) + # test deletion POST's effect, both to return id=1 into empty single, + # full /conditions into empty list + self.post_exp_cond(all_exps, 1, {'delete': ''}, '?id=1', 's') + self.check_json_get('/condition?id=1', exp_single) + self.check_json_get('/conditions', exp_all) def test_GET_condition(self) -> None: """More GET /condition testing, especially for Process relations.""" # check expected default status codes self.check_get_defaults('/condition') # make Condition and two Processes that among them establish all - # possible ConditionsRelations to it, … + # possible ConditionsRelations to it, check /condition displays all + exp = ExpectedGetCondition(1) cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False} - self.check_post(cond_post, '/condition', redir='/condition?id=1') - proc1_post = {'title': 'A', 'description': '', 'effort': 1.0, + self.post_exp_cond([exp], 1, cond_post, '', '?id=1') + proc1_post = {'title': 'A', 'description': '', 'effort': 1.1, 'conditions': [1], 'disables': [1]} - proc2_post = {'title': 'B', 'description': '', 'effort': 1.0, + proc2_post = {'title': 'B', 'description': '', 'effort': 0.9, 'enables': [1], 'blockers': [1]} - self.post_process(1, proc1_post) - self.post_process(2, proc2_post) - # … then check /condition displays all these properly. - cond_expected = self.cond_as_dict(titles=['foo'], descriptions=['oof']) - assert isinstance(cond_expected['id'], int) - proc1 = self.proc_as_dict(conditions=[cond_expected['id']], - disables=[cond_expected['id']]) - proc2 = self.proc_as_dict(2, 'B', - blockers=[cond_expected['id']], - enables=[cond_expected['id']]) - display_expected = self.GET_condition_dict(cond_expected) - assert isinstance(display_expected['_library'], dict) - display_expected['enabled_processes'] = self.as_id_list([proc1]) - display_expected['disabled_processes'] = self.as_id_list([proc2]) - display_expected['enabling_processes'] = self.as_id_list([proc2]) - display_expected['disabling_processes'] = self.as_id_list([proc1]) - display_expected['_library']['Process'] = self.as_refs([proc1, proc2]) - self.check_json_get('/condition?id=1', display_expected) + self.post_exp_process([exp], proc1_post, 1) + self.post_exp_process([exp], proc2_post, 2) + self.check_json_get('/condition?id=1', exp) def test_GET_conditions(self) -> None: """Test GET /conditions.""" # test empty result on empty DB, default-settings on empty params - expected = self.GET_conditions_dict([]) - self.check_json_get('/conditions', expected) + exp = ExpectedGetConditions() + self.check_json_get('/conditions', exp) # test ignorance of meaningless non-empty params (incl. unknown key), # that 'sort_by' default to 'title' (even if set to something else, as # long as without handler) and 'pattern' get preserved - expected['pattern'] = 'bar' # preserved despite zero effect! - expected['sort_by'] = 'title' # for clarity (actually already set) - url = '/conditions?sort_by=foo&pattern=bar&foo=x' - self.check_json_get(url, expected) + exp.set('pattern', 'bar') # preserved despite zero effect! + exp.set('sort_by', 'title') # for clarity (already default) + self.check_json_get('/conditions?sort_by=foo&pattern=bar&foo=x', exp) # test non-empty result, automatic (positive) sorting by title post_cond1 = {'is_active': False, 'title': 'foo', 'description': 'oof'} post_cond2 = {'is_active': False, 'title': 'bar', 'description': 'rab'} post_cond3 = {'is_active': True, 'title': 'baz', 'description': 'zab'} - self.check_post(post_cond1, '/condition', redir='/condition?id=1') - self.check_post(post_cond2, '/condition', redir='/condition?id=2') - self.check_post(post_cond3, '/condition', redir='/condition?id=3') - cond1 = self.cond_as_dict(1, False, ['foo'], ['oof']) - cond2 = self.cond_as_dict(2, False, ['bar'], ['rab']) - cond3 = self.cond_as_dict(3, True, ['baz'], ['zab']) - expected = self.GET_conditions_dict([cond2, cond3, cond1]) - self.check_json_get('/conditions', expected) + for i, post in enumerate([post_cond1, post_cond2, post_cond3]): + self.post_exp_cond([exp], i+1, post, '', f'?id={i+1}') + exp.set('pattern', '') + exp.force('conditions', [2, 3, 1]) + self.check_json_get('/conditions', exp) # test other sortings - expected['sort_by'] = '-title' - assert isinstance(expected['conditions'], list) - expected['conditions'].reverse() - self.check_json_get('/conditions?sort_by=-title', expected) - expected['sort_by'] = 'is_active' - expected['conditions'] = self.as_id_list([cond1, cond2, cond3]) - self.check_json_get('/conditions?sort_by=is_active', expected) - expected['sort_by'] = '-is_active' - expected['conditions'].reverse() - self.check_json_get('/conditions?sort_by=-is_active', expected) + exp.set('sort_by', '-title') + exp.force('conditions', [1, 3, 2]) + self.check_json_get('/conditions?sort_by=-title', exp) + exp.set('sort_by', 'is_active') + exp.force('conditions', [1, 2, 3]) + self.check_json_get('/conditions?sort_by=is_active', exp) + exp.set('sort_by', '-is_active') + exp.force('conditions', [3, 2, 1]) + self.check_json_get('/conditions?sort_by=-is_active', exp) # test pattern matching on title - expected = self.GET_conditions_dict([cond2, cond3]) - expected['pattern'] = 'ba' - self.check_json_get('/conditions?pattern=ba', expected) + exp.set('sort_by', 'title') + exp.set('pattern', 'ba') + exp.force('conditions', [2, 3]) + exp.lib_del('Condition', 1) + self.check_json_get('/conditions?pattern=ba', exp) # test pattern matching on description - assert isinstance(expected['_library'], dict) - expected['pattern'] = 'of' - expected['conditions'] = self.as_id_list([cond1]) - expected['_library']['Condition'] = self.as_refs([cond1]) - self.check_json_get('/conditions?pattern=of', expected) + exp.set('pattern', 'of') + exp.lib_wipe('Condition') + exp.set_cond_from_post(1, post_cond1) + exp.force('conditions', [1]) + self.check_json_get('/conditions?pattern=of', exp) diff --git a/tests/days.py b/tests/days.py index c36a9ef..78c5552 100644 --- a/tests/days.py +++ b/tests/days.py @@ -1,7 +1,8 @@ """Test Days module.""" from datetime import datetime, timedelta -from typing import Callable -from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer +from typing import Any +from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, + Expected) from plomtask.dating import date_in_n_days as tested_date_in_n_days from plomtask.days import Day @@ -10,6 +11,17 @@ from plomtask.days import Day TESTING_DATE_FORMAT = '%Y-%m-%d' +def _testing_date_in_n_days(n: int) -> str: + """Return in TEST_DATE_FORMAT date from today + n days. + + As with TESTING_DATE_FORMAT, we assume this equal the original's code + at plomtask.dating.date_in_n_days, but want to state our expectations + explicitly to rule out importing issues from the original. + """ + date = datetime.now() + timedelta(days=n) + return date.strftime(TESTING_DATE_FORMAT) + + class TestsSansDB(TestCaseSansDB): """Days module tests not requiring DB setup.""" checked_class = Day @@ -94,91 +106,59 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(result, [yesterday, today, tomorrow]) -class TestsWithServer(TestCaseWithServer): - """Tests against our HTTP server/handler (and database).""" +class ExpectedGetCalendar(Expected): + """Builder of expectations for GET /calendar.""" - @staticmethod - def _testing_date_in_n_days(n: int) -> str: - """Return in TEST_DATE_FORMAT date from today + n days. + def __init__(self, start: int, end: int, *args: Any, **kwargs: Any + ) -> None: + self._fields = {'start': _testing_date_in_n_days(start), + 'end': _testing_date_in_n_days(end), + 'today': _testing_date_in_n_days(0)} + self._fields['days'] = [_testing_date_in_n_days(i) + for i in range(start, end+1)] + super().__init__(*args, **kwargs) + for date in self._fields['days']: + self.lib_set('Day', [self.day_as_dict(date)]) - As with TESTING_DATE_FORMAT, we assume this equal the original's code - at plomtask.dating.date_in_n_days, but want to state our expectations - explicitly to rule out importing issues from the original. - """ - date = datetime.now() + timedelta(days=n) - return date.strftime(TESTING_DATE_FORMAT) - - @staticmethod - def _day_as_dict(date: str) -> dict[str, object]: - return {'id': date, 'comment': '', 'todos': []} - - @staticmethod - def _todo_node_as_dict(todo_id: int) -> dict[str, object]: - """Return JSON of TodoNode to expect.""" - return {'children': [], 'seen': False, 'todo': todo_id} - - @staticmethod - def _post_args_return_expectation( - args: list[object], - names_of_simples: list[str], - names_of_versioneds: list[str], - f_as_dict: Callable[..., dict[str, object]], - f_to_post: Callable[..., None | dict[str, object]] - ) -> dict[str, object]: - """Create expected=f_as_dict(*args), post as names_* with f_to_post.""" - expected = f_as_dict(*args) - assert isinstance(expected['_versioned'], dict) - to_post = {} - for name in names_of_simples: - to_post[name] = expected[name] - for name in names_of_versioneds: - to_post[name] = expected['_versioned'][name][0] - f_to_post(expected['id'], to_post) - return expected - - def _post_day(self, params: str = '', - form_data: None | dict[str, object] = None, - redir_to: str = '', - status: int = 302, - ) -> None: - """POST /day?{params} with form_data.""" - if not form_data: - form_data = {'day_comment': '', 'make_type': ''} - target = f'/day?{params}' - if not redir_to: - redir_to = f'{target}&make_type={form_data["make_type"]}' - self.check_post(form_data, target, status, redir_to) - - @classmethod - def GET_day_dict(cls, date: str) -> dict[str, object]: - """Return JSON of GET /day to expect.""" - day = cls._day_as_dict(date) - d: dict[str, object] = {'day': date, - 'top_nodes': [], - 'make_type': '', - 'enablers_for': {}, - 'disablers_for': {}, - 'conditions_present': [], - 'processes': [], - '_library': {'Day': cls.as_refs([day])}} - return d - - @classmethod - def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]: - """Return JSON of GET /calendar to expect. - - NB: the date string list to key 'days' implies/expects a continuous (= - gaps filled) alphabetical order of dates by virtue of range(start, - end+1) and date_in_n_days. - """ - today_date = cls._testing_date_in_n_days(0) - start_date = cls._testing_date_in_n_days(start) - end_date = cls._testing_date_in_n_days(end) - dates = [cls._testing_date_in_n_days(i) for i in range(start, end+1)] - days = [cls._day_as_dict(d) for d in dates] - library = {'Day': cls.as_refs(days)} if len(days) > 0 else {} - return {'today': today_date, 'start': start_date, 'end': end_date, - 'days': dates, '_library': library} + +class ExpectedGetDay(Expected): + """Builder of expectations for GET /day.""" + _default_dict = {'make_type': ''} + _on_empty_make_temp = ('Day', 'day_as_dict') + + def __init__(self, date: str, *args: Any, **kwargs: Any) -> None: + self._fields = {'day': date} + super().__init__(*args, **kwargs) + + def recalc(self) -> None: + super().recalc() + todos = [t for t in self.lib_all('Todo') + if t['date'] == self._fields['day']] + self.lib_get('Day', self._fields['day'])['todos'] = self.as_ids(todos) + self._fields['top_nodes'] = [ + {'children': [], 'seen': False, 'todo': todo['id']} + for todo in todos] + for todo in todos: + proc = self.lib_get('Process', todo['process_id']) + for title in ['conditions', 'enables', 'blockers', 'disables']: + todo[title] = proc[title] + conds_present = set() + for todo in todos: + for title in ['conditions', 'enables', 'blockers', 'disables']: + for cond_id in todo[title]: + conds_present.add(cond_id) + self._fields['conditions_present'] = list(conds_present) + for prefix in ['en', 'dis']: + blers = {} + for cond_id in conds_present: + blers[str(cond_id)] = self.as_ids( + [t for t in todos if cond_id in t[f'{prefix}ables']]) + self._fields[f'{prefix}ablers_for'] = blers + self._fields['processes'] = self.as_ids(self.lib_all('Process')) + + +class TestsWithServer(TestCaseWithServer): + """Tests against our HTTP server/handler (and database).""" def test_basic_GET_day(self) -> None: """Test basic (no Processes/Conditions/Todos) GET /day basics.""" @@ -186,19 +166,19 @@ class TestsWithServer(TestCaseWithServer): self.check_get('/day?date=foo', 400) self.check_get('/day?date=2024-02-30', 400) # check undefined day - date = self._testing_date_in_n_days(0) - expected = self.GET_day_dict(date) - self.check_json_get('/day', expected) + date = _testing_date_in_n_days(0) + exp = ExpectedGetDay(date) + self.check_json_get('/day', exp) # check defined day, with and without make_type parameter date = '2024-01-01' - expected = self.GET_day_dict(date) - expected['make_type'] = 'bar' - self.check_json_get(f'/day?date={date}&make_type=bar', expected) + exp = ExpectedGetDay(date) + exp.set('make_type', 'bar') + self.check_json_get(f'/day?date={date}&make_type=bar', exp) # check parsing of 'yesterday', 'today', 'tomorrow' for name, dist in [('yesterday', -1), ('today', 0), ('tomorrow', +1)]: - date = self._testing_date_in_n_days(dist) - expected = self.GET_day_dict(date) - self.check_json_get(f'/day?date={name}', expected) + date = _testing_date_in_n_days(dist) + exp = ExpectedGetDay(date) + self.check_json_get(f'/day?date={name}', exp) def test_fail_POST_day(self) -> None: """Test malformed/illegal POST /day requests.""" @@ -208,7 +188,7 @@ class TestsWithServer(TestCaseWithServer): self.check_post({'day_comment': ''}, url, 400) self.check_post({'make_type': ''}, url, 400) # to next check illegal new_todo values, we need an actual Process - self.post_process(1) + self.post_exp_process([], {}, 1) # check illegal new_todo values post: dict[str, object] post = {'make_type': '', 'day_comment': '', 'new_todo': ['foo']} @@ -267,98 +247,143 @@ class TestsWithServer(TestCaseWithServer): ('today', 0, 'b'), ('yesterday', -1, 'c'), ('tomorrow', +1, 'd')]: - date = name if dist is None else self._testing_date_in_n_days(dist) + date = name if dist is None else _testing_date_in_n_days(dist) post = {'day_comment': test_str, 'make_type': f'x:{test_str}'} post_url = f'/day?date={name}' redir_url = f'{post_url}&make_type={post["make_type"]}' self.check_post(post, post_url, 302, redir_url) - expected = self.GET_day_dict(date) - assert isinstance(expected['_library'], dict) - expected['_library']['Day'][date]['comment'] = test_str - self.check_json_get(post_url, expected) + exp = ExpectedGetDay(date) + exp.set_day_from_post(date, post) + self.check_json_get(post_url, exp) def test_GET_day_with_processes_and_todos(self) -> None: """Test GET /day displaying Processes and Todos (no trees).""" date = '2024-01-01' + exp = ExpectedGetDay(date) # check Processes get displayed in ['processes'] and ['_library'], # even without any Todos referencing them - procs_data = [[1, 'foo', 'oof', 1.1], # id, title, desc, effort - [2, 'bar', 'rab', 0.9]] - procs_expected = [] - for p_data in procs_data: - procs_expected += [self._post_args_return_expectation( - p_data, [], ['title', 'description', 'effort'], - self.proc_as_dict, self.post_process)] - expected = self.GET_day_dict(date) - assert isinstance(expected['_library'], dict) - expected['processes'] = self.as_id_list(procs_expected) - expected['_library']['Process'] = self.as_refs(procs_expected) - self.check_json_get(f'/day?date={date}', expected) + proc_posts = [{'title': 'foo', 'description': 'oof', 'effort': 1.1}, + {'title': 'bar', 'description': 'rab', 'effort': 0.9}] + for i, proc_post in enumerate(proc_posts): + self.post_exp_process([exp], proc_post, i+1) + self.check_json_get(f'/day?date={date}', exp) # post Todos of either process and check their display - post_day: dict[str, object] - post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]} - todos = [self.todo_as_dict(1, 1, date), self.todo_as_dict(2, 2, date)] - expected['_library']['Todo'] = self.as_refs(todos) - expected['_library']['Day'][date]['todos'] = self.as_id_list(todos) - nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)] - expected['top_nodes'] = nodes - self._post_day(f'date={date}', post_day) - self.check_json_get(f'/day?date={date}', expected) + self.post_exp_day([exp], {'new_todo': [1, 2]}) + self.check_json_get(f'/day?date={date}', exp) + # test malformed Todo manipulation posts + post_day = {'day_comment': '', 'make_type': '', 'comment': [''], + 'new_todo': [], 'done': [1], 'effort': [2.3]} + self.check_post(post_day, f'/day?date={date}', 400) # no todo_id + post_day['todo_id'] = [2] # not identifying Todo refered by done + self.check_post(post_day, f'/day?date={date}', 400) + post_day['todo_id'] = [1, 2] # imply range beyond that of effort etc. + self.check_post(post_day, f'/day?date={date}', 400) + post_day['comment'] = ['FOO', ''] + self.check_post(post_day, f'/day?date={date}', 400) + post_day['effort'] = [2.3, ''] + post_day['comment'] = [''] + self.check_post(post_day, f'/day?date={date}', 400) # add a comment to one Todo and set the other's doneness and effort - post_day = {'day_comment': '', 'make_type': '', 'new_todo': [], - 'todo_id': [1, 2], 'done': [2], 'comment': ['FOO', ''], - 'effort': [2.3, '']} - expected['_library']['Todo']['1']['comment'] = 'FOO' - expected['_library']['Todo']['1']['effort'] = 2.3 - expected['_library']['Todo']['2']['is_done'] = True - self._post_day(f'date={date}', post_day) - self.check_json_get(f'/day?date={date}', expected) + post_day['comment'] = ['FOO', ''] + self.post_exp_day([exp], post_day) + self.check_json_get(f'/day?date={date}', exp) + # invert effort and comment between both Todos + # (cannot invert doneness, /day only collects positive setting) + post_day['comment'] = ['', 'FOO'] + post_day['effort'] = ['', 2.3] + self.post_exp_day([exp], post_day) + self.check_json_get(f'/day?date={date}', exp) + + def test_POST_day_todo_make_types(self) -> None: + """Test behavior of POST /todo on 'make_type'='full' and 'empty'.""" + date = '2024-01-01' + exp = ExpectedGetDay(date) + # create two Processes, with second one step of first one + self.post_exp_process([exp], {}, 2) + self.post_exp_process([exp], {'new_top_step': 2}, 1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None)]) + self.check_json_get(f'/day?date={date}', exp) + # post Todo of adopting Process, with make_type=full + self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1]}) + exp.lib_get('Todo', 1)['children'] = [2] + exp.lib_set('Todo', [exp.todo_as_dict(2, 2)]) + top_nodes = [{'todo': 1, + 'seen': False, + 'children': [{'todo': 2, + 'seen': False, + 'children': []}]}] + exp.force('top_nodes', top_nodes) + self.check_json_get(f'/day?date={date}', exp) + # post another Todo of adopting Process, expect to adopt existing + self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1]}) + exp.lib_set('Todo', [exp.todo_as_dict(3, 1, children=[2])]) + top_nodes += [{'todo': 3, + 'seen': False, + 'children': [{'todo': 2, + 'seen': True, + 'children': []}]}] + exp.force('top_nodes', top_nodes) + self.check_json_get(f'/day?date={date}', exp) + # post another Todo of adopting Process, make_type=empty + self.post_exp_day([exp], {'make_type': 'empty', 'new_todo': [1]}) + exp.lib_set('Todo', [exp.todo_as_dict(4, 1)]) + top_nodes += [{'todo': 4, + 'seen': False, + 'children': []}] + exp.force('top_nodes', top_nodes) + self.check_json_get(f'/day?date={date}', exp) + + def test_POST_day_new_todo_order_commutative(self) -> None: + """Check that order of 'new_todo' values in POST /day don't matter.""" + date = '2024-01-01' + exp = ExpectedGetDay(date) + self.post_exp_process([exp], {}, 2) + self.post_exp_process([exp], {'new_top_step': 2}, 1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None)]) + # make-full-day-post batch of Todos of both Processes in one order …, + self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1, 2]}) + top_nodes: list[dict[str, Any]] = [{'todo': 1, + 'seen': False, + 'children': [{'todo': 2, + 'seen': False, + 'children': []}]}] + exp.force('top_nodes', top_nodes) + exp.lib_get('Todo', 1)['children'] = [2] + self.check_json_get(f'/day?date={date}', exp) + # … and then in the other, expecting same node tree / relations + exp.lib_del('Day', date) + date = '2024-01-02' + exp.set('day', date) + day_post = {'make_type': 'full', 'new_todo': [2, 1]} + self.post_exp_day([exp], day_post, date) + exp.lib_del('Todo', 1) + exp.lib_del('Todo', 2) + top_nodes[0]['todo'] = 3 # was: 1 + top_nodes[0]['children'][0]['todo'] = 4 # was: 2 + exp.lib_get('Todo', 3)['children'] = [4] + self.check_json_get(f'/day?date={date}', exp) def test_GET_day_with_conditions(self) -> None: """Test GET /day displaying Conditions and their relations.""" date = '2024-01-01' - # add Process with Conditions and their Todos, check display - conds_data = [[1, False, ['A'], ['a']], # id, is_active, title, desc - [2, True, ['B'], ['b']]] - conds_expected = [] - for c_data in conds_data: - conds_expected += [self._post_args_return_expectation( - c_data, ['is_active'], ['title', 'description'], - self.cond_as_dict, - lambda x, y: self.check_post(y, f'/condition?id={x}'))] - procs_data = [ # id, title, desc, effort, - # conditions, disables, blockers, enables - [1, 'foo', 'oof', 1.1, [1], [1], [2], [2]], - [2, 'bar', 'rab', 0.9, [2], [2], [1], [1]]] - procs_expected = [] - for p_data in procs_data: - procs_expected += [self._post_args_return_expectation( - p_data, - ['conditions', 'disables', 'blockers', 'enables'], - ['title', 'description', 'effort'], - self.proc_as_dict, self.post_process)] - expected = self.GET_day_dict(date) - assert isinstance(expected['_library'], dict) - expected['processes'] = self.as_id_list(procs_expected) - expected['_library']['Process'] = self.as_refs(procs_expected) - expected['_library']['Condition'] = self.as_refs(conds_expected) - self._post_day(f'date={date}') - self.check_json_get(f'/day?date={date}', expected) - # add Todos in relation to Conditions, check consequences - post_day: dict[str, object] - post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]} - todos = [ # id, process_id, date, conds, disables, blockers, enables - self.todo_as_dict(1, 1, date, [1], [1], [2], [2]), - self.todo_as_dict(2, 2, date, [2], [2], [1], [1])] - expected['_library']['Todo'] = self.as_refs(todos) - expected['_library']['Day'][date]['todos'] = self.as_id_list(todos) - nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)] - expected['top_nodes'] = nodes - expected['disablers_for'] = {'1': [1], '2': [2]} - expected['enablers_for'] = {'1': [2], '2': [1]} - expected['conditions_present'] = self.as_id_list(conds_expected) - self._post_day(f'date={date}', post_day) - self.check_json_get(f'/day?date={date}', expected) + exp = ExpectedGetDay(date) + # check non-referenced Conditions not shown + cond_posts = [{'is_active': False, 'title': 'A', 'description': 'a'}, + {'is_active': True, 'title': 'B', 'description': 'b'}] + for i, cond_post in enumerate(cond_posts): + self.check_post(cond_post, f'/condition?id={i+1}') + self.check_json_get(f'/day?date={date}', exp) + # add Processes with Conditions, check Conditions now shown + for i, (c1, c2) in enumerate([(1, 2), (2, 1)]): + post = {'conditions': [c1], 'disables': [c1], + 'blockers': [c2], 'enables': [c2]} + self.post_exp_process([exp], post, i+1) + for i, cond_post in enumerate(cond_posts): + exp.set_cond_from_post(i+1, cond_post) + self.check_json_get(f'/day?date={date}', exp) + # add Todos in relation to Conditions, check consequence relations + self.post_exp_day([exp], {'new_todo': [1, 2]}) + self.check_json_get(f'/day?date={date}', exp) def test_GET_calendar(self) -> None: """Test GET /calendar responses based on various inputs, DB states.""" @@ -366,23 +391,20 @@ class TestsWithServer(TestCaseWithServer): self.check_get('/calendar?start=foo', 400) self.check_get('/calendar?end=foo', 400) # check default range for expected selection/order without saved days - expected = self.GET_calendar_dict(-1, 366) - self.check_json_get('/calendar', expected) - self.check_json_get('/calendar?start=&end=', expected) + exp = ExpectedGetCalendar(-1, 366) + self.check_json_get('/calendar', exp) + self.check_json_get('/calendar?start=&end=', exp) # check with named days as delimiters - expected = self.GET_calendar_dict(-1, +1) - self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected) + exp = ExpectedGetCalendar(-1, +1) + self.check_json_get('/calendar?start=yesterday&end=tomorrow', exp) # check zero-element range - expected = self.GET_calendar_dict(+1, 0) - self.check_json_get('/calendar?start=tomorrow&end=today', expected) + exp = ExpectedGetCalendar(+1, 0) + self.check_json_get('/calendar?start=tomorrow&end=today', exp) # check saved day shows up in results, proven by its comment - post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''} - date = self._testing_date_in_n_days(-2) - self._post_day(f'date={date}', post_day) - start_date = self._testing_date_in_n_days(-5) - end_date = self._testing_date_in_n_days(+5) + start_date = _testing_date_in_n_days(-5) + date = _testing_date_in_n_days(-2) + end_date = _testing_date_in_n_days(+5) + exp = ExpectedGetCalendar(-5, +5) + self.post_exp_day([exp], {'day_comment': 'foo'}, date) url = f'/calendar?start={start_date}&end={end_date}' - expected = self.GET_calendar_dict(-5, +5) - assert isinstance(expected['_library'], dict) - expected['_library']['Day'][date]['comment'] = post_day['day_comment'] - self.check_json_get(url, expected) + self.check_json_get(url, exp) diff --git a/tests/processes.py b/tests/processes.py index eb94745..8ce3312 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -1,7 +1,8 @@ """Test Processes module.""" from typing import Any -from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB -from plomtask.processes import Process, ProcessStep, ProcessStepsNode +from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, + Expected) +from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition from plomtask.exceptions import HandledException, NotFoundException from plomtask.todos import Todo @@ -72,88 +73,90 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(sorted(r.enables), sorted(set2)) self.assertEqual(sorted(r.disables), sorted(set3)) - def test_Process_steps(self) -> None: - """Test addition, nesting, and non-recursion of ProcessSteps""" - # pylint: disable=too-many-locals - # pylint: disable=too-many-statements - p1, p2, p3 = self.three_processes() - assert isinstance(p1.id_, int) - assert isinstance(p2.id_, int) - assert isinstance(p3.id_, int) - steps_p1: list[ProcessStep] = [] - # add step of process p2 as first (top-level) step to p1 - s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None) - steps_p1 += [s_p2_to_p1] - p1.set_steps(self.db_conn, steps_p1) - p1_dict: dict[int, ProcessStepsNode] = {} - p1_dict[1] = ProcessStepsNode(p2, None, True, {}) - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # add step of process p3 as second (top-level) step to p1 - s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None) - steps_p1 += [s_p3_to_p1] - p1.set_steps(self.db_conn, steps_p1) - p1_dict[2] = ProcessStepsNode(p3, None, True, {}) - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # add step of process p3 as first (top-level) step to p2, - steps_p2: list[ProcessStep] = [] - s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None) - steps_p2 += [s_p3_to_p2] - p2.set_steps(self.db_conn, steps_p2) - # expect it as implicit sub-step of p1's second (p3) step - p2_dict = {3: ProcessStepsNode(p3, None, False, {})} - p1_dict[1].steps[3] = p2_dict[3] - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # add step of process p2 as explicit sub-step to p1's second sub-step - s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_) - steps_p1 += [s_p2_to_p1_first] - p1.set_steps(self.db_conn, steps_p1) - seen_3 = ProcessStepsNode(p3, None, False, {}, False) - p1_dict[1].steps[3].seen = True - p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True, - {3: seen_3}) - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # add step of process p3 as explicit sub-step to non-existing p1 - # sub-step (of id=999), expect it to become another p1 top-level step - s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999) - steps_p1 += [s_p3_to_p1_999] - p1.set_steps(self.db_conn, steps_p1) - p1_dict[5] = ProcessStepsNode(p3, None, True, {}) - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # add step of process p3 as explicit sub-step to p1's implicit p3 - # sub-step, expect it to become another p1 top-level step - s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_, s_p3_to_p2.id_) - steps_p1 += [s_p3_to_p1_impl_p3] - p1.set_steps(self.db_conn, steps_p1) - p1_dict[6] = ProcessStepsNode(p3, None, True, {}) - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - self.assertEqual(p1.used_as_step_by(self.db_conn), []) - self.assertEqual(p2.used_as_step_by(self.db_conn), [p1]) - self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2]) - # # add step of process p3 as explicit sub-step to p1's first sub-step, - # # expect it to eliminate implicit p3 sub-step - # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_, - # s_p2_to_p1.id_) - # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})} - # p1_dict[2].steps[4].steps[3].seen = False - # steps_p1 += [s_p3_to_p1_first_explicit] - # p1.set_steps(self.db_conn, steps_p1) - # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # ensure implicit steps non-top explicit steps are shown - s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_) - steps_p2 += [s_p3_to_p2_first] - p2.set_steps(self.db_conn, steps_p2) - p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, True) - p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, - {}, False) - self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # ensure suppressed step nodes are hidden - assert isinstance(s_p3_to_p2.id_, int) - p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_]) - p1_dict[1].steps[3].steps = {} - p1_dict[1].steps[3].is_suppressed = True - p1_dict[2].steps[4].steps[3].steps = {} - p1_dict[2].steps[4].steps[3].is_suppressed = True - self.assertEqual(p1.get_steps(self.db_conn), p1_dict) + # def test_Process_steps(self) -> None: + # """Test addition, nesting, and non-recursion of ProcessSteps""" + # # pylint: disable=too-many-locals + # # pylint: disable=too-many-statements + # p1, p2, p3 = self.three_processes() + # assert isinstance(p1.id_, int) + # assert isinstance(p2.id_, int) + # assert isinstance(p3.id_, int) + # steps_p1: list[ProcessStep] = [] + # # add step of process p2 as first (top-level) step to p1 + # s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None) + # steps_p1 += [s_p2_to_p1] + # p1.set_steps(self.db_conn, steps_p1) + # p1_dict: dict[int, ProcessStepsNode] = {} + # p1_dict[1] = ProcessStepsNode(p2, None, True, {}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p3 as second (top-level) step to p1 + # s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None) + # steps_p1 += [s_p3_to_p1] + # p1.set_steps(self.db_conn, steps_p1) + # p1_dict[2] = ProcessStepsNode(p3, None, True, {}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p3 as first (top-level) step to p2, + # steps_p2: list[ProcessStep] = [] + # s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None) + # steps_p2 += [s_p3_to_p2] + # p2.set_steps(self.db_conn, steps_p2) + # # expect it as implicit sub-step of p1's second (p3) step + # p2_dict = {3: ProcessStepsNode(p3, None, False, {})} + # p1_dict[1].steps[3] = p2_dict[3] + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p2 as explicit sub-step to p1's second sub-step + # s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_) + # steps_p1 += [s_p2_to_p1_first] + # p1.set_steps(self.db_conn, steps_p1) + # seen_3 = ProcessStepsNode(p3, None, False, {}, False) + # p1_dict[1].steps[3].seen = True + # p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True, + # {3: seen_3}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p3 as explicit sub-step to non-existing p1 + # # sub-step (of id=999), expect it to become another p1 top-level step + # s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999) + # steps_p1 += [s_p3_to_p1_999] + # p1.set_steps(self.db_conn, steps_p1) + # p1_dict[5] = ProcessStepsNode(p3, None, True, {}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # add step of process p3 as explicit sub-step to p1's implicit p3 + # # sub-step, expect it to become another p1 top-level step + # s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_, + # s_p3_to_p2.id_) + # steps_p1 += [s_p3_to_p1_impl_p3] + # p1.set_steps(self.db_conn, steps_p1) + # p1_dict[6] = ProcessStepsNode(p3, None, True, {}) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # self.assertEqual(p1.used_as_step_by(self.db_conn), []) + # self.assertEqual(p2.used_as_step_by(self.db_conn), [p1]) + # self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2]) + # # # add step of process p3 as explicit sub-step to p1's first + # # # sub-step, expect it to eliminate implicit p3 sub-step + # # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_, + # # s_p2_to_p1.id_) + # # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})} + # # p1_dict[2].steps[4].steps[3].seen = False + # # steps_p1 += [s_p3_to_p1_first_explicit] + # # p1.set_steps(self.db_conn, steps_p1) + # # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # ensure implicit steps non-top explicit steps are shown + # s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_) + # steps_p2 += [s_p3_to_p2_first] + # p2.set_steps(self.db_conn, steps_p2) + # p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, + # True) + # p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode( + # p3, 3, False, {}, False) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # # ensure suppressed step nodes are hidden + # assert isinstance(s_p3_to_p2.id_, int) + # p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_]) + # p1_dict[1].steps[3].steps = {} + # p1_dict[1].steps[3].is_suppressed = True + # p1_dict[2].steps[4].steps[3].steps = {} + # p1_dict[2].steps[4].steps[3].is_suppressed = True + # self.assertEqual(p1.get_steps(self.db_conn), p1_dict) def test_Process_conditions(self) -> None: """Test setting Process.conditions/enables/disables.""" @@ -236,13 +239,33 @@ class TestsWithDBForProcessStep(TestCaseWithDB): self.check_identity_with_cache_and_db([]) +class ExpectedGetProcesses(Expected): + """Builder of expectations for GET /processes.""" + _default_dict = {'sort_by': 'title', 'pattern': ''} + + def recalc(self) -> None: + """Update internal dictionary by subclass-specific rules.""" + super().recalc() + self._fields['processes'] = self.as_ids(self.lib_all('Process')) + + class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" + def _post_process(self, id_: int = 1, + form_data: dict[str, Any] | None = None + ) -> dict[str, Any]: + """POST basic Process.""" + if not form_data: + form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} + self.check_post(form_data, f'/process?id={id_}', + redir=f'/process?id={id_}') + return form_data + def test_do_POST_process(self) -> None: """Test POST /process and its effect on the database.""" self.assertEqual(0, len(Process.all(self.db_conn))) - form_data = self.post_process() + form_data = self._post_process() self.assertEqual(1, len(Process.all(self.db_conn))) self.check_post(form_data, '/process?id=FOO', 400) self.check_post(form_data | {'effort': 'foo'}, '/process?id=', 400) @@ -253,14 +276,14 @@ class TestsWithServer(TestCaseWithServer): '/process?id=', 400) self.assertEqual(1, len(Process.all(self.db_conn))) form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0} - self.post_process(2, form_data | {'conditions': []}) + self._post_process(2, form_data | {'conditions': []}) self.check_post(form_data | {'conditions': [1]}, '/process?id=', 404) self.check_post({'title': 'foo', 'description': 'foo', 'is_active': False}, '/condition', 302, '/condition?id=1') - self.post_process(3, form_data | {'conditions': [1]}) - self.post_process(4, form_data | {'disables': [1]}) - self.post_process(5, form_data | {'enables': [1]}) + self._post_process(3, form_data | {'conditions': [1]}) + self._post_process(4, form_data | {'disables': [1]}) + self._post_process(5, form_data | {'enables': [1]}) form_data['delete'] = '' self.check_post(form_data, '/process?id=', 404) self.check_post(form_data, '/process?id=6', 404) @@ -269,12 +292,12 @@ class TestsWithServer(TestCaseWithServer): def test_do_POST_process_steps(self) -> None: """Test behavior of ProcessStep posting.""" # pylint: disable=too-many-statements - form_data_1 = self.post_process(1) - self.post_process(2) - self.post_process(3) + form_data_1 = self._post_process(1) + self._post_process(2) + self._post_process(3) # post first (top-level) step of process 2 to process 1 form_data_1['new_top_step'] = [2] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(len(retrieved_process.explicit_steps), 1) retrieved_step = retrieved_process.explicit_steps[0] @@ -285,7 +308,7 @@ class TestsWithServer(TestCaseWithServer): # post empty steps list to process, expect clean slate, and old step to # completely disappear form_data_1['new_top_step'] = [] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(retrieved_process.explicit_steps, []) assert retrieved_step_id is not None @@ -293,7 +316,7 @@ class TestsWithServer(TestCaseWithServer): ProcessStep.by_id(self.db_conn, retrieved_step_id) # post new first (top_level) step of process 3 to process 1 form_data_1['new_top_step'] = [3] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) retrieved_step = retrieved_process.explicit_steps[0] self.assertEqual(retrieved_step.step_process_id, 3) @@ -302,7 +325,7 @@ class TestsWithServer(TestCaseWithServer): # post to process steps list without keeps, expect clean slate form_data_1['new_top_step'] = [] form_data_1['steps'] = [retrieved_step.id_] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(retrieved_process.explicit_steps, []) # post to process empty steps list but keep, expect 400 @@ -315,7 +338,7 @@ class TestsWithServer(TestCaseWithServer): self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1') # post to process steps list with keep and process ID, expect 200 form_data_1[f'step_{retrieved_step_id}_process_id'] = [2] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(len(retrieved_process.explicit_steps), 1) retrieved_step = retrieved_process.explicit_steps[0] @@ -336,7 +359,7 @@ class TestsWithServer(TestCaseWithServer): form_data_1['new_top_step'] = [3] form_data_1['steps'] = [retrieved_step.id_] form_data_1['keep_step'] = [retrieved_step.id_] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(len(retrieved_process.explicit_steps), 2) retrieved_step_0 = retrieved_process.explicit_steps[1] @@ -368,7 +391,7 @@ class TestsWithServer(TestCaseWithServer): # post sub-step to step form_data_1[f'step_{retrieved_step_0.id_}_process_id'] = [3] form_data_1[f'new_step_to_{retrieved_step_0.id_}'] = [3] - self.post_process(1, form_data_1) + self._post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(len(retrieved_process.explicit_steps), 3) retrieved_step_0 = retrieved_process.explicit_steps[1] @@ -403,97 +426,71 @@ class TestsWithServer(TestCaseWithServer): # of ID=1 here so we know the 404 comes from step_to=2 etc. (that tie # the Process displayed by /process to others), not from not finding # the main Process itself - self.post_process(1) + self.post_exp_process([], {}, 1) self.check_get('/process?id=1&step_to=2', 404) self.check_get('/process?id=1&has_step=2', 404) - @classmethod - def GET_processes_dict(cls, procs: list[dict[str, object]] - ) -> dict[str, object]: - """Return JSON of GET /processes to expect.""" - library = {'Process': cls.as_refs(procs)} if procs else {} - d: dict[str, object] = {'processes': cls.as_id_list(procs), - 'sort_by': 'title', - 'pattern': '', - '_library': library} - return d - def test_GET_processes(self) -> None: """Test GET /processes.""" # pylint: disable=too-many-statements # test empty result on empty DB, default-settings on empty params - expected = self.GET_processes_dict([]) - self.check_json_get('/processes', expected) + exp = ExpectedGetProcesses() + self.check_json_get('/processes', exp) # test on meaningless non-empty params (incl. entirely un-used key), # that 'sort_by' default to 'title' (even if set to something else, as # long as without handler) and 'pattern' get preserved - expected['pattern'] = 'bar' # preserved despite zero effect! + exp.set('pattern', 'bar') # preserved despite zero effect! url = '/processes?sort_by=foo&pattern=bar&foo=x' - self.check_json_get(url, expected) + self.check_json_get(url, exp) # test non-empty result, automatic (positive) sorting by title - post1: dict[str, Any] - post2: dict[str, Any] - post3: dict[str, Any] - post1 = {'title': 'foo', 'description': 'oof', 'effort': 1.0} - post2 = {'title': 'bar', 'description': 'rab', 'effort': 1.1} - post2['new_top_step'] = 1 - post3 = {'title': 'baz', 'description': 'zab', 'effort': 0.9} - post3['new_top_step'] = 1 - self.post_process(1, post1) - self.post_process(2, post2) - self.post_process(3, post3) - post3['new_top_step'] = 2 - post3['keep_step'] = 2 - post3['steps'] = [2] - post3['step_2_process_id'] = 1 - self.post_process(3, post3) - proc1 = self.proc_as_dict(1, post1['title'], - post1['description'], post1['effort']) - proc2 = self.proc_as_dict(2, post2['title'], - post2['description'], post2['effort']) - proc3 = self.proc_as_dict(3, post3['title'], - post3['description'], post3['effort']) - proc2['explicit_steps'] = [1] - proc3['explicit_steps'] = [2, 3] - step1 = self.procstep_as_dict(1, 2, 1) - step2 = self.procstep_as_dict(2, 3, 1) - step3 = self.procstep_as_dict(3, 3, 2) - expected = self.GET_processes_dict([proc2, proc3, proc1]) - assert isinstance(expected['_library'], dict) - expected['_library']['ProcessStep'] = self.as_refs([step1, step2, - step3]) - self.check_json_get('/processes', expected) + proc1_post = {'title': 'foo', 'description': 'oof', 'effort': 1.0} + self.post_exp_process([exp], proc1_post, 1) + proc2_post = {'title': 'bar', 'description': 'rab', 'effort': 1.1} + self.post_exp_process([exp], proc2_post | {'new_top_step': [1]}, 2) + proc3_post = {'title': 'baz', 'description': 'zab', 'effort': 0.9} + self.post_exp_process([exp], proc3_post | {'new_top_step': [1]}, 3) + proc3_post = proc3_post | {'new_top_step': [2], 'keep_step': [2], + 'steps': [2], 'step_2_process_id': 1} + self.post_exp_process([exp], proc3_post, 3) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), + exp.procstep_as_dict(2, 3, 1), + exp.procstep_as_dict(3, 3, 2)]) + exp.lib_get('Process', '') + exp.set('pattern', '') + exp.force('processes', [2, 3, 1]) + self.check_json_get('/processes', exp) # test other sortings - expected['sort_by'] = '-title' - expected['processes'] = self.as_id_list([proc1, proc3, proc2]) - self.check_json_get('/processes?sort_by=-title', expected) - expected['sort_by'] = 'effort' - expected['processes'] = self.as_id_list([proc3, proc1, proc2]) - self.check_json_get('/processes?sort_by=effort', expected) - expected['sort_by'] = '-effort' - expected['processes'] = self.as_id_list([proc2, proc1, proc3]) - self.check_json_get('/processes?sort_by=-effort', expected) - expected['sort_by'] = 'steps' - expected['processes'] = self.as_id_list([proc1, proc2, proc3]) - self.check_json_get('/processes?sort_by=steps', expected) - expected['sort_by'] = '-steps' - expected['processes'] = self.as_id_list([proc3, proc2, proc1]) - self.check_json_get('/processes?sort_by=-steps', expected) - expected['sort_by'] = 'owners' - expected['processes'] = self.as_id_list([proc3, proc2, proc1]) - self.check_json_get('/processes?sort_by=owners', expected) - expected['sort_by'] = '-owners' - expected['processes'] = self.as_id_list([proc1, proc2, proc3]) - self.check_json_get('/processes?sort_by=-owners', expected) + exp.set('sort_by', '-title') + exp.force('processes', [1, 3, 2]) + self.check_json_get('/processes?sort_by=-title', exp) + exp.set('sort_by', 'effort') + exp.force('processes', [3, 1, 2]) + self.check_json_get('/processes?sort_by=effort', exp) + exp.set('sort_by', '-effort') + exp.force('processes', [2, 1, 3]) + self.check_json_get('/processes?sort_by=-effort', exp) + exp.set('sort_by', 'steps') + exp.force('processes', [1, 2, 3]) + self.check_json_get('/processes?sort_by=steps', exp) + exp.set('sort_by', '-steps') + exp.force('processes', [3, 2, 1]) + self.check_json_get('/processes?sort_by=-steps', exp) + exp.set('sort_by', 'owners') + exp.force('processes', [3, 2, 1]) + self.check_json_get('/processes?sort_by=owners', exp) + exp.set('sort_by', '-owners') + exp.force('processes', [1, 2, 3]) + self.check_json_get('/processes?sort_by=-owners', exp) # test pattern matching on title - expected = self.GET_processes_dict([proc2, proc3]) - assert isinstance(expected['_library'], dict) - expected['pattern'] = 'ba' - expected['_library']['ProcessStep'] = self.as_refs([step1, step2, - step3]) - self.check_json_get('/processes?pattern=ba', expected) + exp.set('pattern', 'ba') + exp.set('sort_by', 'title') + exp.lib_del('Process', '1') + exp.force('processes', [2, 3]) + self.check_json_get('/processes?pattern=ba', exp) # test pattern matching on description - expected['processes'] = self.as_id_list([proc1]) - expected['_library'] = {'Process': self.as_refs([proc1])} - expected['pattern'] = 'of' - self.check_json_get('/processes?pattern=of', expected) + exp.set('pattern', 'of') + exp.lib_wipe('Process') + exp.lib_wipe('ProcessStep') + self.post_exp_process([exp], {'description': 'oof', 'effort': 1.0}, 1) + exp.force('processes', [1]) + self.check_json_get('/processes?pattern=of', exp) diff --git a/tests/todos.py b/tests/todos.py index 25fa05b..41adca9 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -1,6 +1,7 @@ """Test Todos module.""" from typing import Any -from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer +from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer, + Expected) from plomtask.todos import Todo, TodoNode from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition @@ -169,8 +170,8 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): cmp_1_dict = todo_node_as_dict(node_0) self.assertEqual(cmp_0_dict, cmp_1_dict) - def test_Todo_create_with_children(self) -> None: - """Test parenthood guarantees of Todo.create_with_children.""" + def test_Todo_ensure_children(self) -> None: + """Test parenthood guarantees of Todo.ensure_children.""" assert isinstance(self.proc.id_, int) proc2 = Process(None) proc2.save(self.db_conn) @@ -194,12 +195,15 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): todo_ignore.save(self.db_conn) self.assertEqual(todo_ignore.children, []) # test create_with_children on step-less does nothing - todo_1 = Todo.create_with_children(self.db_conn, self.proc.id_, - self.date1) + todo_1 = Todo(None, self.proc, False, self.date1) + todo_1.save(self.db_conn) + todo_1.ensure_children(self.db_conn) self.assertEqual(todo_1.children, []) self.assertEqual(len(Todo.all(self.db_conn)), 2) # test create_with_children adopts and creates, and down tree too - todo_2 = Todo.create_with_children(self.db_conn, proc2.id_, self.date1) + todo_2 = Todo(None, proc2, False, self.date1) + todo_2.save(self.db_conn) + todo_2.ensure_children(self.db_conn) self.assertEqual(3, len(todo_2.children)) self.assertEqual(todo_1, todo_2.children[0]) self.assertEqual(self.proc, todo_2.children[2].process) @@ -209,68 +213,75 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): self.assertEqual(todo_3.children[0].process, proc4) -class TestsWithServer(TestCaseWithServer): - """Tests against our HTTP server/handler (and database).""" - - def setUp(self) -> None: - super().setUp() - self._proc1_form_data: Any = self.post_process(1) - self._date = '2024-01-01' - - @classmethod - def GET_todo_dict(cls, - target_id: int, - todos: list[dict[str, object]], - processes: list[dict[str, object]], - process_steps: list[dict[str, object]] | None = None, - conditions: list[dict[str, object]] | None = None - ) -> dict[str, object]: - """Return JSON of GET /todo to expect.""" - # pylint: disable=too-many-arguments - library = {'Todo': cls.as_refs(todos), - 'Process': cls.as_refs(processes)} - if process_steps: - library['ProcessStep'] = cls.as_refs(process_steps) - conditions = conditions if conditions else [] - if conditions: - library['Condition'] = cls.as_refs(conditions) - return {'todo': target_id, - 'steps_todo_to_process': [], - 'adoption_candidates_for': {}, - 'process_candidates': [p['id'] for p in processes], - 'todo_candidates': [t['id'] for t in todos - if t['id'] != target_id], - 'condition_candidates': [c['id'] for c in conditions], - '_library': library} +class ExpectedGetTodo(Expected): + """Builder of expectations for GET /todo.""" + + def __init__(self, + todo_id: int, + *args: Any, **kwargs: Any) -> None: + self._fields = {'todo': todo_id, + 'steps_todo_to_process': []} + super().__init__(*args, **kwargs) + + def recalc(self) -> None: + """Update internal dictionary by subclass-specific rules.""" + + def walk_steps(step: dict[str, Any]) -> None: + if not step['todo']: + proc_id = step['process'] + cands = self.as_ids( + [t for t in todos if proc_id == t['process_id'] + and t['id'] in self._fields['todo_candidates']]) + self._fields['adoption_candidates_for'][str(proc_id)] = cands + for child in step['children']: + walk_steps(child) + + super().recalc() + self.lib_wipe('Day') + todos = self.lib_all('Todo') + procs = self.lib_all('Process') + conds = self.lib_all('Condition') + self._fields['todo_candidates'] = self.as_ids( + [t for t in todos if t['id'] != self._fields['todo']]) + self._fields['process_candidates'] = self.as_ids(procs) + self._fields['condition_candidates'] = self.as_ids(conds) + self._fields['adoption_candidates_for'] = {} + for step in self._fields['steps_todo_to_process']: + walk_steps(step) @staticmethod - def _step_as_dict(node_id: int, - children: list[dict[str, object]], - process: int | None = None, - todo: int | None = None, - fillable: bool = False, - ) -> dict[str, object]: + def step_as_dict(node_id: int, + children: list[dict[str, object]], + process: int | None = None, + todo: int | None = None, + fillable: bool = False, + ) -> dict[str, object]: + """Return JSON of TodoOrProcStepsNode to expect.""" return {'node_id': node_id, 'children': children, 'process': process, 'fillable': fillable, 'todo': todo} - def _make_todo_via_day_post(self, proc_id: int) -> None: - payload = {'day_comment': '', - 'new_todo': proc_id, - 'make_type': 'empty'} - self.check_post(payload, f'/day?date={self._date}&make_type=empty') + +class TestsWithServer(TestCaseWithServer): + """Tests against our HTTP server/handler (and database).""" + + def _post_exp_todo( + self, id_: int, payload: dict[str, Any], exp: Expected) -> None: + self.check_post(payload, f'/todo?id={id_}') + exp.set_todo_from_post(id_, payload) def test_basic_fail_POST_todo(self) -> None: """Test basic malformed/illegal POST /todo requests.""" + self.post_exp_process([], {}, 1) # test we cannot just POST into non-existing Todo self.check_post({}, '/todo', 404) self.check_post({}, '/todo?id=FOO', 400) self.check_post({}, '/todo?id=0', 404) self.check_post({}, '/todo?id=1', 404) # test malformed values on existing Todo - self._make_todo_via_day_post(1) + self.post_exp_day([], {'new_todo': [1]}) for name in [ 'adopt', 'effort', 'make_full', 'make_empty', 'step_filler', 'conditions', 'disables', 'blockers', 'enables']: @@ -282,262 +293,209 @@ class TestsWithServer(TestCaseWithServer): def test_basic_POST_todo(self) -> None: """Test basic POST /todo manipulations.""" - self._make_todo_via_day_post(1) + exp = ExpectedGetTodo(1) + self.post_exp_process([exp], {}, 1) + self.post_exp_day([exp], {'new_todo': [1]}) # test posting naked entity at first changes nothing - todo_dict = self.todo_as_dict(1, 1) - proc_dict = self.proc_as_dict(**self._proc1_form_data) - expected = self.GET_todo_dict(1, [todo_dict], [proc_dict]) - self.check_json_get('/todo?id=1', expected) + self.check_json_get('/todo?id=1', exp) self.check_post({}, '/todo?id=1') - self.check_json_get('/todo?id=1', expected) + self.check_json_get('/todo?id=1', exp) # test posting doneness, comment, calendarization, effort - todo_post = {'done': '', 'calendarize': '', 'comment': 'foo', - 'effort': 2.3} - todo_dict = self.todo_as_dict(1, 1, is_done=True, calendarize=True, - comment='foo', effort=2.3) - expected = self.GET_todo_dict(1, [todo_dict], [proc_dict]) - self.check_post(todo_post, '/todo?id=1') - self.check_json_get('/todo?id=1', expected) + todo_post = {'done': '', 'calendarize': '', + 'comment': 'foo', 'effort': 2.3} + self._post_exp_todo(1, todo_post, exp) + self.check_json_get('/todo?id=1', exp) # test implicitly un-setting all of those except effort by empty post self.check_post({}, '/todo?id=1') - todo_dict = self.todo_as_dict(1, 1, effort=2.3) - expected = self.GET_todo_dict(1, [todo_dict], [proc_dict]) - self.check_json_get('/todo?id=1', expected) - # test empty effort post can be explicitly unset by "" post + exp.lib_set('Todo', [exp.todo_as_dict(effort=2.3)]) + self.check_json_get('/todo?id=1', exp) + # test effort post can be explicitly unset by "effort":"" post self.check_post({'effort': ''}, '/todo?id=1') - todo_dict['effort'] = None - self.check_json_get('/todo?id=1', expected) + exp.lib_set('Todo', [exp.todo_as_dict(effort=None)]) + self.check_json_get('/todo?id=1', exp) # test Condition posts c1_post = {'title': 'foo', 'description': 'oof', 'is_active': False} c2_post = {'title': 'bar', 'description': 'rab', 'is_active': True} - self.check_post(c1_post, '/condition', redir='/condition?id=1') - self.check_post(c2_post, '/condition', redir='/condition?id=2') - c1_dict = self.cond_as_dict(1, False, ['foo'], ['oof']) - c2_dict = self.cond_as_dict(2, True, ['bar'], ['rab']) - conditions = [c1_dict, c2_dict] + self.post_exp_cond([exp], 1, c1_post, '?id=1', '?id=1') + self.post_exp_cond([exp], 2, c2_post, '?id=2', '?id=2') todo_post = {'conditions': [1], 'disables': [1], 'blockers': [2], 'enables': [2]} - for k, v in todo_post.items(): - todo_dict[k] = v - self.check_post(todo_post, '/todo?id=1') - expected = self.GET_todo_dict(1, [todo_dict], [proc_dict], - conditions=conditions) - self.check_json_get('/todo?id=1', expected) + self._post_exp_todo(1, todo_post, exp) + self.check_json_get('/todo?id=1', exp) def test_POST_todo_deletion(self) -> None: """Test deletions via POST /todo.""" - self._make_todo_via_day_post(1) - todo_dict = self.todo_as_dict(1, process_id=1) - proc_dict = self.proc_as_dict(**self._proc1_form_data) - expected = self.GET_todo_dict(1, [todo_dict], [proc_dict]) + exp = ExpectedGetTodo(1) + self.post_exp_process([exp], {}, 1) # test failure of deletion on non-existing Todo self.check_post({'delete': ''}, '/todo?id=2', 404, '/') # test deletion of existing Todo + self.post_exp_day([exp], {'new_todo': [1]}) self.check_post({'delete': ''}, '/todo?id=1', 302, '/') self.check_get('/todo?id=1', 404) + exp.lib_del('Todo', 1) # test deletion of adopted Todo - self._make_todo_via_day_post(1) - self._make_todo_via_day_post(1) + self.post_exp_day([exp], {'new_todo': [1]}) + self.post_exp_day([exp], {'new_todo': [1]}) self.check_post({'adopt': 2}, '/todo?id=1') self.check_post({'delete': ''}, '/todo?id=2', 302, '/') - self.check_json_get('/todo?id=1', expected) + exp.lib_del('Todo', 2) + self.check_get('/todo?id=2', 404) + self.check_json_get('/todo?id=1', exp) # test deletion of adopting Todo - self._make_todo_via_day_post(1) + self.post_exp_day([exp], {'new_todo': [1]}) self.check_post({'adopt': 2}, '/todo?id=1') self.check_post({'delete': ''}, '/todo?id=1', 302, '/') - todo_dict['id'] = 2 - expected = self.GET_todo_dict(2, [todo_dict], [proc_dict]) - self.check_json_get('/todo?id=2', expected) + exp.set('todo', 2) + exp.lib_del('Todo', 1) + self.check_json_get('/todo?id=2', exp) # test cannot delete Todo with comment or effort self.check_post({'comment': 'foo'}, '/todo?id=2') self.check_post({'delete': ''}, '/todo?id=2', 500, '/') self.check_post({'effort': 5}, '/todo?id=2') self.check_post({'delete': ''}, '/todo?id=2', 500, '/') - # test deletion via effort < 0, but only once deletable + # test deletion via effort < 0, but only if deletable self.check_post({'effort': -1, 'comment': 'foo'}, '/todo?id=2') - todo_dict['comment'] = 'foo' - todo_dict['effort'] = -1 - self.check_json_get('/todo?id=2', expected) self.check_post({}, '/todo?id=2') self.check_get('/todo?id=2', 404) def test_POST_todo_adoption(self) -> None: """Test adoption via POST /todo with "adopt".""" - # pylint: disable=too-many-locals - # pylint: disable=too-many-statements # post two Todos to Day, have first adopt second - self._make_todo_via_day_post(1) - self._make_todo_via_day_post(1) - proc1_dict = self.proc_as_dict(**self._proc1_form_data) - todo1_dict = self.todo_as_dict(1, process_id=1, children=[2]) - todo2_dict = self.todo_as_dict(2, process_id=1, parents=[1]) - todos = [todo1_dict, todo2_dict] - expected = self.GET_todo_dict(1, todos, [proc1_dict]) - expected['steps_todo_to_process'] = [self._step_as_dict(1, [], todo=2)] - self.check_post({'adopt': 2}, '/todo?id=1') - self.check_json_get('/todo?id=1', expected) + exp = ExpectedGetTodo(1) + self.post_exp_process([exp], {}, 1) + self.post_exp_day([exp], {'new_todo': [1]}) + self.post_exp_day([exp], {'new_todo': [1]}) + self._post_exp_todo(1, {'adopt': 2}, exp) + exp.set('steps_todo_to_process', [exp.step_as_dict(1, [], todo=2)]) + self.check_json_get('/todo?id=1', exp) # test Todo un-adopting by just not sending an adopt - self.check_post({}, '/todo?id=1') - todo1_dict['children'] = [] - todo2_dict['parents'] = [] - expected['steps_todo_to_process'] = [] - self.check_json_get('/todo?id=1', expected) + self._post_exp_todo(1, {}, exp) + exp.set('steps_todo_to_process', []) + self.check_json_get('/todo?id=1', exp) # test fail on trying to adopt non-existing Todo self.check_post({'adopt': 3}, '/todo?id=1', 404) # test cannot self-adopt self.check_post({'adopt': 1}, '/todo?id=1', 400) # test cannot do 1-step circular adoption - self.check_post({'adopt': 1}, '/todo?id=2') - todo1_dict['parents'] = [2] - todo2_dict['children'] = [1] + self._post_exp_todo(2, {'adopt': 1}, exp) self.check_post({'adopt': 2}, '/todo?id=1', 400) # test cannot do 2-step circular adoption - self._make_todo_via_day_post(1) - self.check_post({'adopt': 2}, '/todo?id=3') - todo3_dict = self.todo_as_dict(3, process_id=1, children=[2]) - todo2_dict['parents'] = [3] - todos += [todo3_dict] + self.post_exp_day([exp], {'new_todo': [1]}) + self._post_exp_todo(3, {'adopt': 2}, exp) self.check_post({'adopt': 3}, '/todo?id=1', 400) # test can adopt Todo into ProcessStep chain via its Process (with key # 'step_filler' equivalent to single-element 'adopt' if intable) - proc_post = {'title': 'A', 'description': '', 'effort': 1.0} - self.post_process(3, proc_post) - self.post_process(2, proc_post) - self.post_process(1, self._proc1_form_data | {'new_top_step': [2, 3]}) - self._make_todo_via_day_post(2) - self._make_todo_via_day_post(3) - self.check_post({'step_filler': 5, 'adopt': [4]}, '/todo?id=1') - proc3_dict = self.proc_as_dict(3) - proc2_dict = self.proc_as_dict(2) - proc1_dict['explicit_steps'] = [1, 2] - procs = [proc1_dict, proc2_dict, proc3_dict] - procsteps = [self.procstep_as_dict(1, 1, 2), - self.procstep_as_dict(2, 1, 3)] - todo1_dict['children'] = [4, 5] - todo4_dict = self.todo_as_dict(4, process_id=2, parents=[1]) - todo5_dict = self.todo_as_dict(5, process_id=3, parents=[1]) - todos += [todo4_dict, todo5_dict] - expected = self.GET_todo_dict(1, todos, procs, procsteps) - step_proc2 = self._step_as_dict(1, [], 2, 4, True) - step_proc3 = self._step_as_dict(2, [], 3, 5, True) - expected['steps_todo_to_process'] = [step_proc2, step_proc3] - self.check_json_get('/todo?id=1', expected) + self.post_exp_process([exp], {}, 2) + self.post_exp_process([exp], {}, 3) + self.post_exp_process([exp], {'new_top_step': [2, 3]}, 1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2), + exp.procstep_as_dict(2, 1, 3)]) + step1_proc2 = exp.step_as_dict(1, [], 2, None, True) + step2_proc3 = exp.step_as_dict(2, [], 3, None, True) + exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) + self.post_exp_day([exp], {'new_todo': [2]}) + self.post_exp_day([exp], {'new_todo': [3]}) + self.check_json_get('/todo?id=1', exp) + self._post_exp_todo(1, {'step_filler': 5, 'adopt': [4]}, exp) + step1_proc2 = exp.step_as_dict(1, [], 2, 4, True) + step2_proc3 = exp.step_as_dict(2, [], 3, 5, True) + exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) + self.check_json_get('/todo?id=1', exp) # test 'ignore' values for 'step_filler' are ignored, and intable # 'step_filler' values are interchangeable with those of 'adopt' todo_post = {'adopt': 5, 'step_filler': ['ignore', 4]} self.check_post(todo_post, '/todo?id=1') - self.check_json_get('/todo?id=1', expected) - # test cannot adopt into non-top-level elements of chain - self.post_process(4, proc_post) - self.post_process(3, proc_post | {'new_top_step': 4, 'step_of': [1]}) - proc4_dict = self.proc_as_dict(4) - proc3_dict['explicit_steps'] = [3] - procs += [proc4_dict] - procsteps += [self.procstep_as_dict(3, 3, 4)] - step_proc4 = self._step_as_dict(3, [], 4, None, True) - step_proc3['children'] = [step_proc4] - self._make_todo_via_day_post(4) - self.check_post({'adopt': [4, 5, 6]}, '/todo?id=1') - todo6_dict = self.todo_as_dict(6, process_id=4, parents=[1]) - todo1_dict['children'] = [4, 5, 6] - todos += [todo6_dict] - expected = self.GET_todo_dict(1, todos, procs, procsteps) - step2_proc4 = self._step_as_dict(4, [], None, 6, False) - expected['steps_todo_to_process'] = [step_proc2, step_proc3, - step2_proc4] - expected['adoption_candidates_for'] = {'4': [6]} - self.check_json_get('/todo?id=1', expected) + self.check_json_get('/todo?id=1', exp) + # test cannot adopt into non-top-level elements of chain, instead + # creating new top-level steps when adopting of respective Process + self.post_exp_process([exp], {}, 4) + self.post_exp_process([exp], {'new_top_step': 4, 'step_of': [1]}, 3) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 3, 4)]) + step3_proc4 = exp.step_as_dict(3, [], 4, None, True) + step2_proc3 = exp.step_as_dict(2, [step3_proc4], 3, 5, True) + exp.set('steps_todo_to_process', [step1_proc2, step2_proc3]) + self.post_exp_day([exp], {'new_todo': [4]}) + self._post_exp_todo(1, {'adopt': [4, 5, 6]}, exp) + step4_todo6 = exp.step_as_dict(4, [], None, 6, False) + exp.set('steps_todo_to_process', [step1_proc2, step2_proc3, + step4_todo6]) + self.check_json_get('/todo?id=1', exp) def test_POST_todo_make_full(self) -> None: """Test creation and adoption via POST /todo with "make_full".""" - # pylint: disable=too-many-locals # create chain of Processes - proc_post = {'title': 'A', 'description': '', 'effort': 1.0} - self.post_process(2, proc_post | {'new_top_step': 1}) - self.post_process(3, proc_post | {'new_top_step': 2}) - self.post_process(4, proc_post | {'new_top_step': 3}) - proc1_dict = self.proc_as_dict(**self._proc1_form_data) - proc2_dict = self.proc_as_dict(2, explicit_steps=[1]) - proc3_dict = self.proc_as_dict(3, explicit_steps=[2]) - proc4_dict = self.proc_as_dict(4, explicit_steps=[3]) - procs = [proc1_dict, proc2_dict, proc3_dict, proc4_dict] - procsteps = [self.procstep_as_dict(1, 2, 1), - self.procstep_as_dict(2, 3, 2), - self.procstep_as_dict(3, 4, 3)] + exp = ExpectedGetTodo(1) + self.post_exp_process([exp], {}, 1) + for i in range(1, 4): + self.post_exp_process([exp], {'new_top_step': i}, i+1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), + exp.procstep_as_dict(2, 3, 2), + exp.procstep_as_dict(3, 4, 3)]) + step3_proc1 = exp.step_as_dict(3, [], 1, None, False) + step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, False) + step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True) + exp.set('steps_todo_to_process', [step1_proc3]) # post (childless) Todo of chain end, then make_full on next in line - self._make_todo_via_day_post(4) - todo1_dict = self.todo_as_dict(1, 4, children=[2]) - todo2_dict = self.todo_as_dict(2, 3, children=[3], parents=[1]) - todo3_dict = self.todo_as_dict(3, 2, parents=[2], children=[4]) - todo4_dict = self.todo_as_dict(4, 1, parents=[3]) - todos = [todo1_dict, todo2_dict, todo3_dict, todo4_dict] - expected = self.GET_todo_dict(1, todos, procs, procsteps) - step_proc1 = self._step_as_dict(3, [], 1, 4, True) - step_proc2 = self._step_as_dict(2, [step_proc1], 2, 3, True) - step_proc3 = self._step_as_dict(1, [step_proc2], 3, 2, True) - expected['steps_todo_to_process'] = [step_proc3] + self.post_exp_day([exp], {'new_todo': [4]}) self.check_post({'step_filler': 'make_full_3'}, '/todo?id=1') - self.check_json_get('/todo?id=1', expected) + exp.set_todo_from_post(4, {'process_id': 1}) + exp.set_todo_from_post(3, {'process_id': 2, 'children': [4]}) + exp.set_todo_from_post(2, {'process_id': 3, 'children': [3]}) + exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]}) + step3_proc1 = exp.step_as_dict(3, [], 1, 4, True) + step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, 3, True) + step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, 2, True) + exp.set('steps_todo_to_process', [step1_proc3]) + self.check_json_get('/todo?id=1', exp) # make new chain next to expected, find steps_todo_to_process extended, # expect existing Todo demanded by new chain be adopted into new chain self.check_post({'make_full': 2, 'adopt': [2]}, '/todo?id=1') - todo5_dict = self.todo_as_dict(5, 2, parents=[1], children=[4]) - todo1_dict['children'] = [2, 5] - todo4_dict['parents'] = [3, 5] - todos += [todo5_dict] - step2_proc1 = self._step_as_dict(5, [], None, 4) - step2_proc2 = self._step_as_dict(4, [step2_proc1], None, 5) - expected = self.GET_todo_dict(1, todos, procs, procsteps) - expected['steps_todo_to_process'] = [step_proc3, step2_proc2] - self.check_json_get('/todo?id=1', expected) + exp.set_todo_from_post(5, {'process_id': 2, 'children': [4]}) + exp.set_todo_from_post(1, {'process_id': 4, 'children': [2, 5]}) + step5_todo4 = exp.step_as_dict(5, [], None, 4) + step4_todo5 = exp.step_as_dict(4, [step5_todo4], None, 5) + exp.set('steps_todo_to_process', [step1_proc3, step4_todo5]) + self.check_json_get('/todo?id=1', exp) # fail on trying to call make_full on non-existing Process self.check_post({'make_full': 5}, '/todo?id=1', 404) def test_POST_todo_make_empty(self) -> None: """Test creation and adoption via POST /todo with "make_empty".""" - # pylint: disable=too-many-locals # create chain of Processes - proc_post = {'title': 'A', 'description': '', 'effort': 1.0} - self.post_process(2, proc_post | {'new_top_step': 1}) - self.post_process(3, proc_post | {'new_top_step': 2}) - self.post_process(4, proc_post | {'new_top_step': 3}) - proc1_dict = self.proc_as_dict(**self._proc1_form_data) - proc2_dict = self.proc_as_dict(2, explicit_steps=[1]) - proc3_dict = self.proc_as_dict(3, explicit_steps=[2]) - proc4_dict = self.proc_as_dict(4, explicit_steps=[3]) - procs = [proc1_dict, proc2_dict, proc3_dict, proc4_dict] - procsteps = [self.procstep_as_dict(1, 2, 1), - self.procstep_as_dict(2, 3, 2), - self.procstep_as_dict(3, 4, 3)] + exp = ExpectedGetTodo(1) + self.post_exp_process([exp], {}, 1) + for i in range(1, 4): + self.post_exp_process([exp], {'new_top_step': i}, i+1) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1), + exp.procstep_as_dict(2, 3, 2), + exp.procstep_as_dict(3, 4, 3)]) # post (childless) Todo of chain end, then make empty on next in line - self._make_todo_via_day_post(4) - todo1_dict = self.todo_as_dict(1, 4, children=[2]) - todo2_dict = self.todo_as_dict(2, 3, parents=[1]) - todos = [todo1_dict, todo2_dict] - expected = self.GET_todo_dict(1, todos, procs, procsteps) - step_proc1 = self._step_as_dict(3, [], 1, None) - step_proc2 = self._step_as_dict(2, [step_proc1], 2, None, True) - step_proc3 = self._step_as_dict(1, [step_proc2], 3, 2, True) - expected['steps_todo_to_process'] = [step_proc3] - expected['adoption_candidates_for'] = {'1': [], '2': []} + self.post_exp_day([exp], {'new_todo': [4]}) + step3_proc1 = exp.step_as_dict(3, [], 1) + step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2) + step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True) + exp.set('steps_todo_to_process', [step1_proc3]) + self.check_json_get('/todo?id=1', exp) self.check_post({'step_filler': 'make_empty_3'}, '/todo?id=1') - self.check_json_get('/todo?id=1', expected) + exp.set_todo_from_post(2, {'process_id': 3}) + exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]}) + step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, True) + step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, 2, True) + exp.set('steps_todo_to_process', [step1_proc3]) + self.check_json_get('/todo?id=1', exp) # make new top-level Todo without chain implied by its Process self.check_post({'make_empty': 2, 'adopt': [2]}, '/todo?id=1') - todo3_dict = self.todo_as_dict(3, 2, parents=[1], children=[]) - todo1_dict['children'] = [2, 3] - todos += [todo3_dict] - step2_proc2 = self._step_as_dict(4, [], None, 3) - expected = self.GET_todo_dict(1, todos, procs, procsteps) - expected['steps_todo_to_process'] = [step_proc3, step2_proc2] - expected['adoption_candidates_for'] = {'1': [], '2': [3]} - self.check_json_get('/todo?id=1', expected) + exp.set_todo_from_post(3, {'process_id': 2}) + exp.set_todo_from_post(1, {'process_id': 4, 'children': [2, 3]}) + step4_todo3 = exp.step_as_dict(4, [], None, 3) + exp.set('steps_todo_to_process', [step1_proc3, step4_todo3]) + self.check_json_get('/todo?id=1', exp) # fail on trying to call make_empty on non-existing Process self.check_post({'make_full': 5}, '/todo?id=1', 404) - def test_do_GET_todo(self) -> None: + def test_GET_todo(self) -> None: """Test GET /todo response codes.""" - self._make_todo_via_day_post(1) # test malformed or illegal parameter values self.check_get('/todo', 404) self.check_get('/todo?id=', 404) @@ -545,53 +503,40 @@ class TestsWithServer(TestCaseWithServer): self.check_get('/todo?id=0', 404) self.check_get('/todo?id=2', 404) # test all existing Processes are shown as available - proc_post = {'title': 'A', 'description': '', 'effort': 1.0} - self.post_process(2, proc_post) - todo1_dict = self.todo_as_dict(1, process_id=1) - proc1_dict = self.proc_as_dict(1, **self._proc1_form_data) - proc2_dict = self.proc_as_dict(2) - procs = [proc1_dict, proc2_dict] - expected = self.GET_todo_dict(1, [todo1_dict], procs) - self.check_json_get('/todo?id=1', expected) + exp = ExpectedGetTodo(1) + self.post_exp_process([exp], {}, 1) + self.post_exp_day([exp], {'new_todo': [1]}) + self.post_exp_process([exp], {}, 2) + self.check_json_get('/todo?id=1', exp) # test chain of Processes shown as potential step nodes - self.post_process(2, proc_post) - self.post_process(3, proc_post) - self.post_process(4, proc_post) - self.post_process(1, self._proc1_form_data | {'new_top_step': 2}) - self.post_process(2, proc_post | {'new_top_step': 3, 'step_of': [1]}) - self.post_process(3, proc_post | {'new_top_step': 4, 'step_of': [2]}) - proc1_dict['explicit_steps'] = [1] - proc2_dict['explicit_steps'] = [2] - proc3_dict = self.proc_as_dict(3, explicit_steps=[3]) - proc4_dict = self.proc_as_dict(4) - procs += [proc3_dict, proc4_dict] - procsteps = [self.procstep_as_dict(1, 1, 2, None), - self.procstep_as_dict(2, 2, 3, None), - self.procstep_as_dict(3, 3, 4, None)] - expected = self.GET_todo_dict(1, [todo1_dict], procs, procsteps) - step_proc4 = self._step_as_dict(3, [], 4) - step_proc3 = self._step_as_dict(2, [step_proc4], 3) - step_proc2 = self._step_as_dict(1, [step_proc3], 2, fillable=True) - expected['steps_todo_to_process'] = [step_proc2] - expected['adoption_candidates_for'] = {'2': [], '3': [], '4': []} - self.check_json_get('/todo?id=1', expected) + self.post_exp_process([exp], {}, 3) + self.post_exp_process([exp], {}, 4) + self.post_exp_process([exp], {'new_top_step': 2}, 1) + self.post_exp_process([exp], {'new_top_step': 3, 'step_of': [1]}, 2) + self.post_exp_process([exp], {'new_top_step': 4, 'step_of': [2]}, 3) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None), + exp.procstep_as_dict(2, 2, 3, None), + exp.procstep_as_dict(3, 3, 4, None)]) + step3_proc4 = exp.step_as_dict(3, [], 4) + step2_proc3 = exp.step_as_dict(2, [step3_proc4], 3) + step1_proc2 = exp.step_as_dict(1, [step2_proc3], 2, fillable=True) + exp.set('steps_todo_to_process', [step1_proc2]) + self.check_json_get('/todo?id=1', exp) # test display of parallel chains proc_steps_post = {'new_top_step': 4, 'keep_step': [1], 'step_1_process_id': 2, 'steps': [1, 4]} - self.post_process(1, self._proc1_form_data | proc_steps_post) - proc1_dict['explicit_steps'] = [1, 4] - step2_proc4 = self._step_as_dict(4, [], 4, fillable=True) - procsteps += [self.procstep_as_dict(4, 1, 4, None)] - expected = self.GET_todo_dict(1, [todo1_dict], procs, procsteps) - expected['steps_todo_to_process'] = [step_proc2, step2_proc4] - expected['adoption_candidates_for'] = {'2': [], '3': [], '4': []} - self.check_json_get('/todo?id=1', expected) - - def test_do_POST_doneness_relations(self) -> None: + self.post_exp_process([], proc_steps_post, 1) + step4_proc4 = exp.step_as_dict(4, [], 4, fillable=True) + exp.lib_set('ProcessStep', [exp.procstep_as_dict(4, 1, 4, None)]) + exp.set('steps_todo_to_process', [step1_proc2, step4_proc4]) + self.check_json_get('/todo?id=1', exp) + + def test_POST_todo_doneness_relations(self) -> None: """Test Todo.is_done Condition, adoption relations for /todo POSTs.""" + self.post_exp_process([], {}, 1) # test Todo with adoptee can only be set done if adoptee is done too - self._make_todo_via_day_post(1) - self._make_todo_via_day_post(1) + self.post_exp_day([], {'new_todo': [1]}) + self.post_exp_day([], {'new_todo': [1]}) self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 400) self.check_post({'done': ''}, '/todo?id=2') self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 302) @@ -622,84 +567,3 @@ class TestsWithServer(TestCaseWithServer): self.check_post({'disables': [1]}, '/todo?id=1') self.check_post({'disables': [1], 'done': ''}, '/todo?id=1') self.check_post({'blockers': [1]}, '/todo?id=2') - - def test_do_POST_day_todo_adoption(self) -> None: - """Test Todos posted to Day view may adopt existing Todos.""" - form_data = self.post_process( - 2, self._proc1_form_data | {'new_top_step': 1}) - form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'} - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - form_data['new_todo'] = 2 - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0] - todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1] - self.assertEqual(todo1.children, []) - self.assertEqual(todo1.parents, [todo2]) - self.assertEqual(todo2.children, [todo1]) - self.assertEqual(todo2.parents, []) - - def test_do_POST_day_todo_multiple_inner_adoption(self) -> None: - """Test multiple Todos can be posted to Day view w. inner adoption.""" - - def key_order_func(t: Todo) -> int: - assert isinstance(t.process.id_, int) - return t.process.id_ - - def check_adoption(date: str, new_todos: list[int]) -> None: - form_data = {'day_comment': '', 'new_todo': new_todos, - 'make_type': 'full'} - self.check_post(form_data, f'/day?date={date}&make_type=full', 302) - day_todos = Todo.by_date(self.db_conn, date) - day_todos.sort(key=key_order_func) - todo1 = day_todos[0] - todo2 = day_todos[1] - self.assertEqual(todo1.children, []) - self.assertEqual(todo1.parents, [todo2]) - self.assertEqual(todo2.children, [todo1]) - self.assertEqual(todo2.parents, []) - - def check_nesting_adoption(process_id: int, date: str, - new_top_steps: list[int]) -> None: - form_data = {'title': '', 'description': '', 'effort': 1, - 'step_of': [2]} - form_data = self.post_process(1, form_data) - form_data['new_top_step'] = new_top_steps - form_data['step_of'] = [] - form_data = self.post_process(process_id, form_data) - form_data = {'day_comment': '', 'new_todo': [process_id], - 'make_type': 'full'} - self.check_post(form_data, f'/day?date={date}&make_type=full', 302) - day_todos = Todo.by_date(self.db_conn, date) - day_todos.sort(key=key_order_func, reverse=True) - self.assertEqual(len(day_todos), 3) - todo1 = day_todos[0] # process of process_id - todo2 = day_todos[1] # process 2 - todo3 = day_todos[2] # process 1 - self.assertEqual(sorted(todo1.children), sorted([todo2, todo3])) - self.assertEqual(todo1.parents, []) - self.assertEqual(todo2.children, [todo3]) - self.assertEqual(todo2.parents, [todo1]) - self.assertEqual(todo3.children, []) - self.assertEqual(sorted(todo3.parents), sorted([todo2, todo1])) - - self.post_process(2, self._proc1_form_data | {'new_top_step': 1}) - check_adoption('2024-01-01', [1, 2]) - check_adoption('2024-01-02', [2, 1]) - check_nesting_adoption(3, '2024-01-03', [1, 2]) - check_nesting_adoption(4, '2024-01-04', [2, 1]) - - def test_do_POST_day_todo_doneness(self) -> None: - """Test Todo doneness can be posted to Day view.""" - form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'} - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - todo = Todo.by_date(self.db_conn, '2024-01-01')[0] - form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full', - 'comment': [''], 'done': [], 'effort': ['']} - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - todo = Todo.by_date(self.db_conn, '2024-01-01')[0] - self.assertEqual(todo.is_done, False) - form_data = {'day_comment': '', 'todo_id': [1], 'done': [1], - 'make_type': 'full', 'comment': [''], 'effort': ['']} - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - todo = Todo.by_date(self.db_conn, '2024-01-01')[0] - self.assertEqual(todo.is_done, True) diff --git a/tests/utils.py b/tests/utils.py index c428f4c..f256345 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -6,10 +6,11 @@ from threading import Thread from http.client import HTTPConnection from datetime import datetime, timedelta from time import sleep -from json import loads as json_loads +from json import loads as json_loads, dumps as json_dumps from urllib.parse import urlencode from uuid import uuid4 from os import remove as remove_file +from pprint import pprint from plomtask.db import DatabaseFile, DatabaseConnection from plomtask.http import TaskHandler, TaskServer from plomtask.processes import Process, ProcessStep @@ -499,93 +500,217 @@ class TestCaseWithDB(TestCaseAugmented): self.check_identity_with_cache_and_db([]) -class TestCaseWithServer(TestCaseWithDB): - """Module tests against our HTTP server/handler (and database).""" - - def setUp(self) -> None: - super().setUp() - self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler) - self.server_thread = Thread(target=self.httpd.serve_forever) - self.server_thread.daemon = True - self.server_thread.start() - self.conn = HTTPConnection(str(self.httpd.server_address[0]), - self.httpd.server_address[1]) - self.httpd.render_mode = 'json' +class Expected: + """Builder of (JSON-like) dict to compare against responses of test server. + + Collects all items and relations we expect expressed in the server's JSON + responses and puts them into the proper json.dumps-friendly dict structure, + accessibla via .as_dict, to compare them in TestsWithServer.check_json_get. + + On its own provides for .as_dict output only {"_library": …}, initialized + from .__init__ and to be directly manipulated via the .lib* methods. + Further structures of the expected response may be added and kept + up-to-date by subclassing .__init__, .recalc, and .d. + + NB: Lots of expectations towards server behavior will be made explicit here + (or in the subclasses) rather than in the actual TestCase methods' code. + """ + _default_dict: dict[str, Any] + _forced: dict[str, Any] + _fields: dict[str, Any] + _on_empty_make_temp: tuple[str, str] + + def __init__(self, + todos: list[dict[str, Any]] | None = None, + procs: list[dict[str, Any]] | None = None, + procsteps: list[dict[str, Any]] | None = None, + conds: list[dict[str, Any]] | None = None, + days: list[dict[str, Any]] | None = None + ) -> None: + # pylint: disable=too-many-arguments + for name in ['_default_dict', '_fields', '_forced']: + if not hasattr(self, name): + setattr(self, name, {}) + self._lib = {} + for title, items in [('Todo', todos), + ('Process', procs), + ('ProcessStep', procsteps), + ('Condition', conds), + ('Day', days)]: + if items: + self._lib[title] = self._as_refs(items) + for k, v in self._default_dict.items(): + if k not in self._fields: + self._fields[k] = v + + def recalc(self) -> None: + """Update internal dictionary by subclass-specific rules.""" + todos = self.lib_all('Todo') + for todo in todos: + todo['parents'] = [] + for todo in todos: + for child_id in todo['children']: + self.lib_get('Todo', child_id)['parents'] += [todo['id']] + todo['children'].sort() + procsteps = self.lib_all('ProcessStep') + procs = self.lib_all('Process') + for proc in procs: + proc['explicit_steps'] = [s['id'] for s in procsteps + if s['owner_id'] == proc['id']] + + @property + def as_dict(self) -> dict[str, Any]: + """Return dict to compare against test server JSON responses.""" + make_temp = False + if hasattr(self, '_on_empty_make_temp'): + category, dicter = getattr(self, '_on_empty_make_temp') + id_ = self._fields[category.lower()] + make_temp = not bool(self.lib_get(category, id_)) + if make_temp: + f = getattr(self, dicter) + self.lib_set(category, [f(id_)]) + self.recalc() + d = {'_library': self._lib} + for k, v in self._fields.items(): + # we expect everything sortable to be sorted + if isinstance(v, list) and k not in self._forced: + # NB: if we don't test for v being list, sorted() on an empty + # dict may return an empty list + try: + v = sorted(v) + except TypeError: + pass + d[k] = v + for k, v in self._forced.items(): + d[k] = v + if make_temp: + json = json_dumps(d) + self.lib_del(category, id_) + d = json_loads(json) + return d - def tearDown(self) -> None: - self.httpd.shutdown() - self.httpd.server_close() - self.server_thread.join() - super().tearDown() + def lib_get(self, category: str, id_: str | int) -> dict[str, Any]: + """From library, return item of category and id_, or empty dict.""" + str_id = str(id_) + if category in self._lib and str_id in self._lib[category]: + return self._lib[category][str_id] + return {} + + def lib_all(self, category: str) -> list[dict[str, Any]]: + """From library, return items of category, or [] if none.""" + if category in self._lib: + return list(self._lib[category].values()) + return [] + + def lib_set(self, category: str, items: list[dict[str, object]]) -> None: + """Update library for category with items.""" + if category not in self._lib: + self._lib[category] = {} + for k, v in self._as_refs(items).items(): + self._lib[category][k] = v + + def lib_del(self, category: str, id_: str | int) -> None: + """Remove category element of id_ from library.""" + del self._lib[category][str(id_)] + if 0 == len(self._lib[category]): + del self._lib[category] + + def lib_wipe(self, category: str) -> None: + """Remove category from library.""" + if category in self._lib: + del self._lib[category] + + def set(self, field_name: str, value: object) -> None: + """Set top-level .as_dict field.""" + self._fields[field_name] = value + + def force(self, field_name: str, value: object) -> None: + """Set ._forced field to ensure value in .as_dict.""" + self._forced[field_name] = value @staticmethod - def as_id_list(items: list[dict[str, object]]) -> list[int | str]: - """Return list of only 'id' fields of items.""" - # NB: To tighten the mypy test, consider to, instead of returning - # list[str | int], returnlist[int] | list[str]. But since so far to me - # the only way to make that work seems to be to repaclement of the - # currently active last line with complexity of the out-commented code - # block beneath, I currently opt for the status quo. - id_list = [] - for item in items: - assert isinstance(item['id'], (int, str)) - id_list += [item['id']] - return id_list - # if id_list: - # if isinstance(id_list[0], int): - # for id_ in id_list: - # assert isinstance(id_, int) - # l_int: list[int] = [id_ for id_ in id_list - # if isinstance(id_, int)] - # return l_int - # for id_ in id_list: - # assert isinstance(id_, str) - # l_str: list[str] = [id_ for id_ in id_list - # if isinstance(id_, str)] - # return l_str - # return [] - - @staticmethod - def as_refs(items: list[dict[str, object]] - ) -> dict[str, dict[str, object]]: + def _as_refs(items: list[dict[str, object]] + ) -> dict[str, dict[str, object]]: """Return dictionary of items by their 'id' fields.""" refs = {} for item in items: refs[str(item['id'])] = item return refs + @staticmethod + def as_ids(items: list[dict[str, Any]]) -> list[int] | list[str]: + """Return list of only 'id' fields of items.""" + return [item['id'] for item in items] + + @staticmethod + def day_as_dict(date: str, comment: str = '') -> dict[str, object]: + """Return JSON of Day to expect.""" + return {'id': date, 'comment': comment, 'todos': []} + + def set_day_from_post(self, date: str, d: dict[str, Any]) -> None: + """Set Day of date in library based on POST dict d.""" + day = self.day_as_dict(date) + for k, v in d.items(): + if 'day_comment' == k: + day['comment'] = v + elif 'new_todo' == k: + next_id = 1 + for todo in self.lib_all('Todo'): + if next_id <= todo['id']: + next_id = todo['id'] + 1 + for proc_id in sorted(v): + todo = self.todo_as_dict(next_id, proc_id, date) + self.lib_set('Todo', [todo]) + next_id += 1 + elif 'done' == k: + for todo_id in v: + self.lib_get('Todo', todo_id)['is_done'] = True + elif 'todo_id' == k: + for i, todo_id in enumerate(v): + t = self.lib_get('Todo', todo_id) + if 'comment' in d: + t['comment'] = d['comment'][i] + if 'effort' in d: + effort = d['effort'][i] if d['effort'][i] else None + t['effort'] = effort + self.lib_set('Day', [day]) + @staticmethod def cond_as_dict(id_: int = 1, is_active: bool = False, - titles: None | list[str] = None, - descriptions: None | list[str] = None + title: None | str = None, + description: None | str = None, ) -> dict[str, object]: """Return JSON of Condition to expect.""" - d = {'id': id_, - 'is_active': is_active, - '_versioned': { - 'title': {}, - 'description': {}}} - titles = titles if titles else [] - descriptions = descriptions if descriptions else [] - assert isinstance(d['_versioned'], dict) - for i, title in enumerate(titles): - d['_versioned']['title'][i] = title - for i, description in enumerate(descriptions): - d['_versioned']['description'][i] = description - return d - - @staticmethod - def procstep_as_dict(id_: int, - owner_id: int, - step_process_id: int, - parent_step_id: int | None = None - ) -> dict[str, object]: - """Return JSON of Process to expect.""" - return {'id': id_, - 'owner_id': owner_id, - 'step_process_id': step_process_id, - 'parent_step_id': parent_step_id} + versioned: dict[str, dict[str, object]] + versioned = {'title': {}, 'description': {}} + if title is not None: + versioned['title']['0'] = title + if description is not None: + versioned['description']['0'] = description + return {'id': id_, 'is_active': is_active, '_versioned': versioned} + + def set_cond_from_post(self, id_: int, d: dict[str, Any]) -> None: + """Set Condition of id_ in library based on POST dict d.""" + if d == {'delete': ''}: + self.lib_del('Condition', id_) + return + cond = self.lib_get('Condition', id_) + if cond: + cond['is_active'] = d['is_active'] + for category in ['title', 'description']: + if category in cond['_versioned']: + history = cond['_versioned'][category] + if len(history) > 0: + last_i = sorted([int(k) for k in history.keys()])[-1] + if d[category] != history[str(last_i)]: + history[str(last_i + 1)] = d[category] + continue + cond['_versioned'][category]['0'] = d[category] + else: + cond = self.cond_as_dict( + id_, d['is_active'], d['title'], d['description']) + self.lib_set('Condition', [cond]) @staticmethod def todo_as_dict(id_: int = 1, @@ -619,6 +744,36 @@ class TestCaseWithServer(TestCaseWithDB): 'enables': enables if enables else []} return d + def set_todo_from_post(self, id_: int, d: dict[str, Any]) -> None: + """Set Todo of id_ in library based on POST dict d.""" + corrected_kwargs: dict[str, Any] = {} + for k, v in d.items(): + if k in {'adopt', 'step_filler'}: + if 'children' not in corrected_kwargs: + corrected_kwargs['children'] = [] + new_children = v if isinstance(v, list) else [v] + corrected_kwargs['children'] += new_children + continue + if 'done' == k: + k = 'is_done' + if k in {'is_done', 'calendarize'}: + v = True + corrected_kwargs[k] = v + todo = self.todo_as_dict(id_, **corrected_kwargs) + self.lib_set('Todo', [todo]) + + @staticmethod + def procstep_as_dict(id_: int, + owner_id: int, + step_process_id: int, + parent_step_id: int | None = None + ) -> dict[str, object]: + """Return JSON of ProcessStep to expect.""" + return {'id': id_, + 'owner_id': owner_id, + 'step_process_id': step_process_id, + 'parent_step_id': parent_step_id} + @staticmethod def proc_as_dict(id_: int = 1, title: str = 'A', @@ -637,15 +792,109 @@ class TestCaseWithServer(TestCaseWithDB): 'suppressed_steps': [], 'explicit_steps': explicit_steps if explicit_steps else [], '_versioned': { - 'title': {0: title}, - 'description': {0: description}, - 'effort': {0: effort}}, + 'title': {'0': title}, + 'description': {'0': description}, + 'effort': {'0': effort}}, 'conditions': conditions if conditions else [], 'disables': disables if disables else [], 'enables': enables if enables else [], 'blockers': blockers if blockers else []} return d + def set_proc_from_post(self, id_: int, d: dict[str, Any]) -> None: + """Set Process of id_ in library based on POST dict d.""" + proc = self.lib_get('Process', id_) + if proc: + for k in ['title', 'description', 'effort']: + last_i = sorted(proc['_versioned'][k].keys())[-1] + if d[k] != proc['_versioned'][k][last_i]: + proc['_versioned'][k][last_i + 1] = d[k] + else: + proc = self.proc_as_dict(id_, + d['title'], d['description'], d['effort']) + ignore = {'title', 'description', 'effort', 'new_top_step', 'step_of', + 'keep_step', 'steps'} + for k, v in d.items(): + if k in ignore or k.startswith('step_'): + continue + if k in {'calendarize'}: + v = True + elif k in {'suppressed_steps', 'explicit_steps', 'conditions', + 'disables', 'enables', 'blockers'}: + if not isinstance(v, list): + v = [v] + proc[k] = v + self.lib_set('Process', [proc]) + + +class TestCaseWithServer(TestCaseWithDB): + """Module tests against our HTTP server/handler (and database).""" + + def setUp(self) -> None: + super().setUp() + self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler) + self.server_thread = Thread(target=self.httpd.serve_forever) + self.server_thread.daemon = True + self.server_thread.start() + self.conn = HTTPConnection(str(self.httpd.server_address[0]), + self.httpd.server_address[1]) + self.httpd.render_mode = 'json' + + def tearDown(self) -> None: + self.httpd.shutdown() + self.httpd.server_close() + self.server_thread.join() + super().tearDown() + + def post_exp_cond(self, + exps: list[Expected], + id_: int, + payload: dict[str, object], + path_suffix: str = '', + redir_suffix: str = '' + ) -> None: + """POST /condition(s), appropriately update Expecteds.""" + # pylint: disable=too-many-arguments + path = f'/condition{path_suffix}' + redir = f'/condition{redir_suffix}' + self.check_post(payload, path, redir=redir) + for exp in exps: + exp.set_cond_from_post(id_, payload) + + def post_exp_day(self, + exps: list[Expected], + payload: dict[str, Any], + date: str = '2024-01-01' + ) -> None: + """POST /day, appropriately update Expecteds.""" + if 'make_type' not in payload: + payload['make_type'] = 'empty' + if 'day_comment' not in payload: + payload['day_comment'] = '' + target = f'/day?date={date}' + redir_to = f'{target}&make_type={payload["make_type"]}' + self.check_post(payload, target, 302, redir_to) + for exp in exps: + exp.set_day_from_post(date, payload) + + def post_exp_process(self, + exps: list[Expected], + payload: dict[str, Any], + id_: int, + ) -> dict[str, object]: + """POST /process, appropriately update Expecteds.""" + if 'title' not in payload: + payload['title'] = 'foo' + if 'description' not in payload: + payload['description'] = 'foo' + if 'effort' not in payload: + payload['effort'] = 1.1 + self.check_post(payload, f'/process?id={id_}', + redir=f'/process?id={id_}') + for exp in exps: + exp.set_proc_from_post(id_, payload) + return payload + def check_redirect(self, target: str) -> None: """Check that self.conn answers with a 302 redirect to target.""" response = self.conn.getresponse() @@ -679,17 +928,7 @@ class TestCaseWithServer(TestCaseWithDB): self.check_get(f'/{path}?id=0', 500) self.check_get(f'{path}?id=1', 200) - def post_process(self, id_: int = 1, - form_data: dict[str, Any] | None = None - ) -> dict[str, Any]: - """POST basic Process.""" - if not form_data: - form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} - self.check_post(form_data, f'/process?id={id_}', - redir=f'/process?id={id_}') - return form_data - - def check_json_get(self, path: str, expected: dict[str, object]) -> None: + def check_json_get(self, path: str, expected: Expected) -> None: """Compare JSON on GET path with expected. To simplify comparison of VersionedAttribute histories, transforms @@ -700,24 +939,61 @@ class TestCaseWithServer(TestCaseWithDB): def rewrite_history_keys_in(item: Any) -> Any: if isinstance(item, dict): if '_versioned' in item.keys(): - for k in item['_versioned']: - vals = item['_versioned'][k].values() + for category in item['_versioned']: + vals = item['_versioned'][category].values() history = {} for i, val in enumerate(vals): - history[i] = val - item['_versioned'][k] = history - for k in list(item.keys()): - rewrite_history_keys_in(item[k]) + history[str(i)] = val + item['_versioned'][category] = history + for category in list(item.keys()): + rewrite_history_keys_in(item[category]) elif isinstance(item, list): item[:] = [rewrite_history_keys_in(i) for i in item] return item + def walk_diffs(path: str, cmp1: object, cmp2: object) -> None: + # pylint: disable=too-many-branches + def warn(intro: str, val: object) -> None: + if isinstance(val, (str, int, float)): + print(intro, val) + else: + print(intro) + pprint(val) + if cmp1 != cmp2: + if isinstance(cmp1, dict) and isinstance(cmp2, dict): + for k, v in cmp1.items(): + if k not in cmp2: + warn(f'DIFF {path}: retrieved lacks {k}', v) + elif v != cmp2[k]: + walk_diffs(f'{path}:{k}', v, cmp2[k]) + for k in [k for k in cmp2.keys() if k not in cmp1]: + warn(f'DIFF {path}: expected lacks retrieved\'s {k}', + cmp2[k]) + elif isinstance(cmp1, list) and isinstance(cmp2, list): + for i, v1 in enumerate(cmp1): + if i >= len(cmp2): + warn(f'DIFF {path}[{i}] retrieved misses:', v1) + elif v1 != cmp2[i]: + walk_diffs(f'{path}[{i}]', v1, cmp2[i]) + if len(cmp2) > len(cmp1): + for i, v2 in enumerate(cmp2[len(cmp1):]): + warn(f'DIFF {path}[{len(cmp1)+i}] misses:', v2) + else: + warn(f'DIFF {path} – for expected:', cmp1) + warn('… and for retrieved:', cmp2) + self.conn.request('GET', path) response = self.conn.getresponse() self.assertEqual(response.status, 200) retrieved = json_loads(response.read().decode()) rewrite_history_keys_in(retrieved) - # import pprint - # pprint.pprint(expected) - # pprint.pprint(retrieved) - self.assertEqual(expected, retrieved) + cmp = expected.as_dict + try: + self.assertEqual(cmp, retrieved) + except AssertionError as e: + print('EXPECTED:') + pprint(cmp) + print('RETRIEVED:') + pprint(retrieved) + walk_diffs('', cmp, retrieved) + raise e