"""Test Conditions module."""
-from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB
+from typing import Any
+from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer,
+ Expected)
from plomtask.conditions import Condition
from plomtask.processes import Process
from plomtask.todos import Todo
for depender in (proc, todo):
c = Condition(None)
c.save(self.db_conn)
+ assert isinstance(c.id_, int)
depender.set_condition_relations(self.db_conn, [c.id_], [], [], [])
depender.save(self.db_conn)
with self.assertRaises(HandledException):
c.remove(self.db_conn)
+class ExpectedGetConditions(Expected):
+ """Builder of expectations for GET /conditions."""
+ _default_dict = {'sort_by': 'title', 'pattern': ''}
+
+ def recalc(self) -> None:
+ """Update internal dictionary by subclass-specific rules."""
+ super().recalc()
+ self._fields['conditions'] = self.as_ids(self.lib_all('Condition'))
+
+
+class ExpectedGetCondition(Expected):
+ """Builder of expectations for GET /condition."""
+ _on_empty_make_temp = ('Condition', 'cond_as_dict')
+
+ def __init__(self, id_: int, *args: Any, **kwargs: Any) -> None:
+ self._fields = {'condition': id_}
+ super().__init__(*args, **kwargs)
+
+ def recalc(self) -> None:
+ """Update internal dictionary by subclass-specific rules."""
+ super().recalc()
+ for p_field, c_field in [('conditions', 'enabled_processes'),
+ ('disables', 'disabling_processes'),
+ ('blockers', 'disabled_processes'),
+ ('enables', 'enabling_processes')]:
+ self._fields[c_field] = self.as_ids([
+ p for p in self.lib_all('Process')
+ if self._fields['condition'] in p[p_field]])
+ self._fields['is_new'] = False
+
+
class TestsWithServer(TestCaseWithServer):
"""Module tests against our HTTP server/handler (and database)."""
- @classmethod
- def GET_condition_dict(cls, cond: dict[str, object]) -> dict[str, object]:
- """Return JSON of GET /condition to expect."""
- return {'is_new': False,
- 'enabled_processes': [],
- 'disabled_processes': [],
- 'enabling_processes': [],
- 'disabling_processes': [],
- 'condition': cond['id'],
- '_library': {'Condition': cls.as_refs([cond])}}
-
- @classmethod
- def GET_conditions_dict(cls, conds: list[dict[str, object]]
- ) -> dict[str, object]:
- """Return JSON of GET /conditions to expect."""
- library = {'Condition': cls.as_refs(conds)} if conds else {}
- d: dict[str, object] = {'conditions': cls.as_id_list(conds),
- 'sort_by': 'title',
- 'pattern': '',
- '_library': library}
- return d
-
def test_fail_POST_condition(self) -> None:
"""Test malformed/illegal POST /condition requests."""
# check incomplete POST payloads
def test_POST_condition(self) -> None:
"""Test (valid) POST /condition and its effect on GET /condition[s]."""
- # test valid POST's effect on …
+ exp_single = ExpectedGetCondition(1)
+ exp_all = ExpectedGetConditions()
+ all_exps = [exp_single, exp_all]
+ # test valid POST's effect on single /condition and full /conditions
post = {'title': 'foo', 'description': 'oof', 'is_active': False}
- self.check_post(post, '/condition', redir='/condition?id=1')
- # … single /condition
- expected_cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
- assert isinstance(expected_cond['_versioned'], dict)
- expected_single = self.GET_condition_dict(expected_cond)
- self.check_json_get('/condition?id=1', expected_single)
- # … full /conditions
- expected_all = self.GET_conditions_dict([expected_cond])
- self.check_json_get('/conditions', expected_all)
+ self.post_exp_cond(all_exps, 1, post, '', '?id=1')
+ self.check_json_get('/condition?id=1', exp_single)
+ self.check_json_get('/conditions', exp_all)
# test (no) effect of invalid POST to existing Condition on /condition
self.check_post({}, '/condition?id=1', 400)
- self.check_json_get('/condition?id=1', expected_single)
+ self.check_json_get('/condition?id=1', exp_single)
# test effect of POST changing title and activeness
post = {'title': 'bar', 'description': 'oof', 'is_active': True}
- self.check_post(post, '/condition?id=1')
- expected_cond['_versioned']['title'][1] = 'bar'
- expected_cond['is_active'] = True
- self.check_json_get('/condition?id=1', expected_single)
- # test deletion POST's effect, both to return id=1 into empty single, …
- self.check_post({'delete': ''}, '/condition?id=1', redir='/conditions')
- expected_cond = self.cond_as_dict()
- assert isinstance(expected_single['_library'], dict)
- expected_single['_library']['Condition'] = self.as_refs(
- [expected_cond])
- self.check_json_get('/condition?id=1', expected_single)
- # … and full /conditions into empty list
- expected_all['conditions'] = []
- expected_all['_library'] = {}
- self.check_json_get('/conditions', expected_all)
+ self.post_exp_cond(all_exps, 1, post, '?id=1', '?id=1')
+ self.check_json_get('/condition?id=1', exp_single)
+ self.check_json_get('/conditions', exp_all)
+ # test deletion POST's effect, both to return id=1 into empty single,
+ # full /conditions into empty list
+ self.post_exp_cond(all_exps, 1, {'delete': ''}, '?id=1', 's')
+ self.check_json_get('/condition?id=1', exp_single)
+ self.check_json_get('/conditions', exp_all)
def test_GET_condition(self) -> None:
"""More GET /condition testing, especially for Process relations."""
# check expected default status codes
self.check_get_defaults('/condition')
# make Condition and two Processes that among them establish all
- # possible ConditionsRelations to it, …
+ # possible ConditionsRelations to it, check /condition displays all
+ exp = ExpectedGetCondition(1)
cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
- self.check_post(cond_post, '/condition', redir='/condition?id=1')
- proc1_post = {'title': 'A', 'description': '', 'effort': 1.0,
+ self.post_exp_cond([exp], 1, cond_post, '', '?id=1')
+ proc1_post = {'title': 'A', 'description': '', 'effort': 1.1,
'conditions': [1], 'disables': [1]}
- proc2_post = {'title': 'B', 'description': '', 'effort': 1.0,
+ proc2_post = {'title': 'B', 'description': '', 'effort': 0.9,
'enables': [1], 'blockers': [1]}
- self.post_process(1, proc1_post)
- self.post_process(2, proc2_post)
- # … then check /condition displays all these properly.
- cond_expected = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
- assert isinstance(cond_expected['id'], int)
- proc1 = self.proc_as_dict(conditions=[cond_expected['id']],
- disables=[cond_expected['id']])
- proc2 = self.proc_as_dict(2, 'B',
- blockers=[cond_expected['id']],
- enables=[cond_expected['id']])
- display_expected = self.GET_condition_dict(cond_expected)
- assert isinstance(display_expected['_library'], dict)
- display_expected['enabled_processes'] = self.as_id_list([proc1])
- display_expected['disabled_processes'] = self.as_id_list([proc2])
- display_expected['enabling_processes'] = self.as_id_list([proc2])
- display_expected['disabling_processes'] = self.as_id_list([proc1])
- display_expected['_library']['Process'] = self.as_refs([proc1, proc2])
- self.check_json_get('/condition?id=1', display_expected)
+ self.post_exp_process([exp], proc1_post, 1)
+ self.post_exp_process([exp], proc2_post, 2)
+ self.check_json_get('/condition?id=1', exp)
def test_GET_conditions(self) -> None:
"""Test GET /conditions."""
# test empty result on empty DB, default-settings on empty params
- expected = self.GET_conditions_dict([])
- self.check_json_get('/conditions', expected)
+ exp = ExpectedGetConditions()
+ self.check_json_get('/conditions', exp)
# test ignorance of meaningless non-empty params (incl. unknown key),
# that 'sort_by' default to 'title' (even if set to something else, as
# long as without handler) and 'pattern' get preserved
- expected['pattern'] = 'bar' # preserved despite zero effect!
- expected['sort_by'] = 'title' # for clarity (actually already set)
- url = '/conditions?sort_by=foo&pattern=bar&foo=x'
- self.check_json_get(url, expected)
+ exp.set('pattern', 'bar') # preserved despite zero effect!
+ exp.set('sort_by', 'title') # for clarity (already default)
+ self.check_json_get('/conditions?sort_by=foo&pattern=bar&foo=x', exp)
# test non-empty result, automatic (positive) sorting by title
post_cond1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
post_cond2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
post_cond3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
- self.check_post(post_cond1, '/condition', redir='/condition?id=1')
- self.check_post(post_cond2, '/condition', redir='/condition?id=2')
- self.check_post(post_cond3, '/condition', redir='/condition?id=3')
- cond1 = self.cond_as_dict(1, False, ['foo'], ['oof'])
- cond2 = self.cond_as_dict(2, False, ['bar'], ['rab'])
- cond3 = self.cond_as_dict(3, True, ['baz'], ['zab'])
- expected = self.GET_conditions_dict([cond2, cond3, cond1])
- self.check_json_get('/conditions', expected)
+ for i, post in enumerate([post_cond1, post_cond2, post_cond3]):
+ self.post_exp_cond([exp], i+1, post, '', f'?id={i+1}')
+ exp.set('pattern', '')
+ exp.force('conditions', [2, 3, 1])
+ self.check_json_get('/conditions', exp)
# test other sortings
- expected['sort_by'] = '-title'
- assert isinstance(expected['conditions'], list)
- expected['conditions'].reverse()
- self.check_json_get('/conditions?sort_by=-title', expected)
- expected['sort_by'] = 'is_active'
- expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
- self.check_json_get('/conditions?sort_by=is_active', expected)
- expected['sort_by'] = '-is_active'
- expected['conditions'].reverse()
- self.check_json_get('/conditions?sort_by=-is_active', expected)
+ exp.set('sort_by', '-title')
+ exp.force('conditions', [1, 3, 2])
+ self.check_json_get('/conditions?sort_by=-title', exp)
+ exp.set('sort_by', 'is_active')
+ exp.force('conditions', [1, 2, 3])
+ self.check_json_get('/conditions?sort_by=is_active', exp)
+ exp.set('sort_by', '-is_active')
+ exp.force('conditions', [3, 2, 1])
+ self.check_json_get('/conditions?sort_by=-is_active', exp)
# test pattern matching on title
- expected = self.GET_conditions_dict([cond2, cond3])
- expected['pattern'] = 'ba'
- self.check_json_get('/conditions?pattern=ba', expected)
+ exp.set('sort_by', 'title')
+ exp.set('pattern', 'ba')
+ exp.force('conditions', [2, 3])
+ exp.lib_del('Condition', 1)
+ self.check_json_get('/conditions?pattern=ba', exp)
# test pattern matching on description
- assert isinstance(expected['_library'], dict)
- expected['pattern'] = 'of'
- expected['conditions'] = self.as_id_list([cond1])
- expected['_library']['Condition'] = self.as_refs([cond1])
- self.check_json_get('/conditions?pattern=of', expected)
+ exp.set('pattern', 'of')
+ exp.lib_wipe('Condition')
+ exp.set_cond_from_post(1, post_cond1)
+ exp.force('conditions', [1])
+ self.check_json_get('/conditions?pattern=of', exp)
"""Test Days module."""
from datetime import datetime, timedelta
-from typing import Callable
-from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
+from typing import Any
+from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer,
+ Expected)
from plomtask.dating import date_in_n_days as tested_date_in_n_days
from plomtask.days import Day
TESTING_DATE_FORMAT = '%Y-%m-%d'
+def _testing_date_in_n_days(n: int) -> str:
+ """Return in TEST_DATE_FORMAT date from today + n days.
+
+ As with TESTING_DATE_FORMAT, we assume this equal the original's code
+ at plomtask.dating.date_in_n_days, but want to state our expectations
+ explicitly to rule out importing issues from the original.
+ """
+ date = datetime.now() + timedelta(days=n)
+ return date.strftime(TESTING_DATE_FORMAT)
+
+
class TestsSansDB(TestCaseSansDB):
"""Days module tests not requiring DB setup."""
checked_class = Day
self.assertEqual(result, [yesterday, today, tomorrow])
-class TestsWithServer(TestCaseWithServer):
- """Tests against our HTTP server/handler (and database)."""
+class ExpectedGetCalendar(Expected):
+ """Builder of expectations for GET /calendar."""
- @staticmethod
- def _testing_date_in_n_days(n: int) -> str:
- """Return in TEST_DATE_FORMAT date from today + n days.
+ def __init__(self, start: int, end: int, *args: Any, **kwargs: Any
+ ) -> None:
+ self._fields = {'start': _testing_date_in_n_days(start),
+ 'end': _testing_date_in_n_days(end),
+ 'today': _testing_date_in_n_days(0)}
+ self._fields['days'] = [_testing_date_in_n_days(i)
+ for i in range(start, end+1)]
+ super().__init__(*args, **kwargs)
+ for date in self._fields['days']:
+ self.lib_set('Day', [self.day_as_dict(date)])
- As with TESTING_DATE_FORMAT, we assume this equal the original's code
- at plomtask.dating.date_in_n_days, but want to state our expectations
- explicitly to rule out importing issues from the original.
- """
- date = datetime.now() + timedelta(days=n)
- return date.strftime(TESTING_DATE_FORMAT)
-
- @staticmethod
- def _day_as_dict(date: str) -> dict[str, object]:
- return {'id': date, 'comment': '', 'todos': []}
-
- @staticmethod
- def _todo_node_as_dict(todo_id: int) -> dict[str, object]:
- """Return JSON of TodoNode to expect."""
- return {'children': [], 'seen': False, 'todo': todo_id}
-
- @staticmethod
- def _post_args_return_expectation(
- args: list[object],
- names_of_simples: list[str],
- names_of_versioneds: list[str],
- f_as_dict: Callable[..., dict[str, object]],
- f_to_post: Callable[..., None | dict[str, object]]
- ) -> dict[str, object]:
- """Create expected=f_as_dict(*args), post as names_* with f_to_post."""
- expected = f_as_dict(*args)
- assert isinstance(expected['_versioned'], dict)
- to_post = {}
- for name in names_of_simples:
- to_post[name] = expected[name]
- for name in names_of_versioneds:
- to_post[name] = expected['_versioned'][name][0]
- f_to_post(expected['id'], to_post)
- return expected
-
- def _post_day(self, params: str = '',
- form_data: None | dict[str, object] = None,
- redir_to: str = '',
- status: int = 302,
- ) -> None:
- """POST /day?{params} with form_data."""
- if not form_data:
- form_data = {'day_comment': '', 'make_type': ''}
- target = f'/day?{params}'
- if not redir_to:
- redir_to = f'{target}&make_type={form_data["make_type"]}'
- self.check_post(form_data, target, status, redir_to)
-
- @classmethod
- def GET_day_dict(cls, date: str) -> dict[str, object]:
- """Return JSON of GET /day to expect."""
- day = cls._day_as_dict(date)
- d: dict[str, object] = {'day': date,
- 'top_nodes': [],
- 'make_type': '',
- 'enablers_for': {},
- 'disablers_for': {},
- 'conditions_present': [],
- 'processes': [],
- '_library': {'Day': cls.as_refs([day])}}
- return d
-
- @classmethod
- def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]:
- """Return JSON of GET /calendar to expect.
-
- NB: the date string list to key 'days' implies/expects a continuous (=
- gaps filled) alphabetical order of dates by virtue of range(start,
- end+1) and date_in_n_days.
- """
- today_date = cls._testing_date_in_n_days(0)
- start_date = cls._testing_date_in_n_days(start)
- end_date = cls._testing_date_in_n_days(end)
- dates = [cls._testing_date_in_n_days(i) for i in range(start, end+1)]
- days = [cls._day_as_dict(d) for d in dates]
- library = {'Day': cls.as_refs(days)} if len(days) > 0 else {}
- return {'today': today_date, 'start': start_date, 'end': end_date,
- 'days': dates, '_library': library}
+
+class ExpectedGetDay(Expected):
+ """Builder of expectations for GET /day."""
+ _default_dict = {'make_type': ''}
+ _on_empty_make_temp = ('Day', 'day_as_dict')
+
+ def __init__(self, date: str, *args: Any, **kwargs: Any) -> None:
+ self._fields = {'day': date}
+ super().__init__(*args, **kwargs)
+
+ def recalc(self) -> None:
+ super().recalc()
+ todos = [t for t in self.lib_all('Todo')
+ if t['date'] == self._fields['day']]
+ self.lib_get('Day', self._fields['day'])['todos'] = self.as_ids(todos)
+ self._fields['top_nodes'] = [
+ {'children': [], 'seen': False, 'todo': todo['id']}
+ for todo in todos]
+ for todo in todos:
+ proc = self.lib_get('Process', todo['process_id'])
+ for title in ['conditions', 'enables', 'blockers', 'disables']:
+ todo[title] = proc[title]
+ conds_present = set()
+ for todo in todos:
+ for title in ['conditions', 'enables', 'blockers', 'disables']:
+ for cond_id in todo[title]:
+ conds_present.add(cond_id)
+ self._fields['conditions_present'] = list(conds_present)
+ for prefix in ['en', 'dis']:
+ blers = {}
+ for cond_id in conds_present:
+ blers[str(cond_id)] = self.as_ids(
+ [t for t in todos if cond_id in t[f'{prefix}ables']])
+ self._fields[f'{prefix}ablers_for'] = blers
+ self._fields['processes'] = self.as_ids(self.lib_all('Process'))
+
+
+class TestsWithServer(TestCaseWithServer):
+ """Tests against our HTTP server/handler (and database)."""
def test_basic_GET_day(self) -> None:
"""Test basic (no Processes/Conditions/Todos) GET /day basics."""
self.check_get('/day?date=foo', 400)
self.check_get('/day?date=2024-02-30', 400)
# check undefined day
- date = self._testing_date_in_n_days(0)
- expected = self.GET_day_dict(date)
- self.check_json_get('/day', expected)
+ date = _testing_date_in_n_days(0)
+ exp = ExpectedGetDay(date)
+ self.check_json_get('/day', exp)
# check defined day, with and without make_type parameter
date = '2024-01-01'
- expected = self.GET_day_dict(date)
- expected['make_type'] = 'bar'
- self.check_json_get(f'/day?date={date}&make_type=bar', expected)
+ exp = ExpectedGetDay(date)
+ exp.set('make_type', 'bar')
+ self.check_json_get(f'/day?date={date}&make_type=bar', exp)
# check parsing of 'yesterday', 'today', 'tomorrow'
for name, dist in [('yesterday', -1), ('today', 0), ('tomorrow', +1)]:
- date = self._testing_date_in_n_days(dist)
- expected = self.GET_day_dict(date)
- self.check_json_get(f'/day?date={name}', expected)
+ date = _testing_date_in_n_days(dist)
+ exp = ExpectedGetDay(date)
+ self.check_json_get(f'/day?date={name}', exp)
def test_fail_POST_day(self) -> None:
"""Test malformed/illegal POST /day requests."""
self.check_post({'day_comment': ''}, url, 400)
self.check_post({'make_type': ''}, url, 400)
# to next check illegal new_todo values, we need an actual Process
- self.post_process(1)
+ self.post_exp_process([], {}, 1)
# check illegal new_todo values
post: dict[str, object]
post = {'make_type': '', 'day_comment': '', 'new_todo': ['foo']}
('today', 0, 'b'),
('yesterday', -1, 'c'),
('tomorrow', +1, 'd')]:
- date = name if dist is None else self._testing_date_in_n_days(dist)
+ date = name if dist is None else _testing_date_in_n_days(dist)
post = {'day_comment': test_str, 'make_type': f'x:{test_str}'}
post_url = f'/day?date={name}'
redir_url = f'{post_url}&make_type={post["make_type"]}'
self.check_post(post, post_url, 302, redir_url)
- expected = self.GET_day_dict(date)
- assert isinstance(expected['_library'], dict)
- expected['_library']['Day'][date]['comment'] = test_str
- self.check_json_get(post_url, expected)
+ exp = ExpectedGetDay(date)
+ exp.set_day_from_post(date, post)
+ self.check_json_get(post_url, exp)
def test_GET_day_with_processes_and_todos(self) -> None:
"""Test GET /day displaying Processes and Todos (no trees)."""
date = '2024-01-01'
+ exp = ExpectedGetDay(date)
# check Processes get displayed in ['processes'] and ['_library'],
# even without any Todos referencing them
- procs_data = [[1, 'foo', 'oof', 1.1], # id, title, desc, effort
- [2, 'bar', 'rab', 0.9]]
- procs_expected = []
- for p_data in procs_data:
- procs_expected += [self._post_args_return_expectation(
- p_data, [], ['title', 'description', 'effort'],
- self.proc_as_dict, self.post_process)]
- expected = self.GET_day_dict(date)
- assert isinstance(expected['_library'], dict)
- expected['processes'] = self.as_id_list(procs_expected)
- expected['_library']['Process'] = self.as_refs(procs_expected)
- self.check_json_get(f'/day?date={date}', expected)
+ proc_posts = [{'title': 'foo', 'description': 'oof', 'effort': 1.1},
+ {'title': 'bar', 'description': 'rab', 'effort': 0.9}]
+ for i, proc_post in enumerate(proc_posts):
+ self.post_exp_process([exp], proc_post, i+1)
+ self.check_json_get(f'/day?date={date}', exp)
# post Todos of either process and check their display
- post_day: dict[str, object]
- post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
- todos = [self.todo_as_dict(1, 1, date), self.todo_as_dict(2, 2, date)]
- expected['_library']['Todo'] = self.as_refs(todos)
- expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
- nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
- expected['top_nodes'] = nodes
- self._post_day(f'date={date}', post_day)
- self.check_json_get(f'/day?date={date}', expected)
+ self.post_exp_day([exp], {'new_todo': [1, 2]})
+ self.check_json_get(f'/day?date={date}', exp)
+ # test malformed Todo manipulation posts
+ post_day = {'day_comment': '', 'make_type': '', 'comment': [''],
+ 'new_todo': [], 'done': [1], 'effort': [2.3]}
+ self.check_post(post_day, f'/day?date={date}', 400) # no todo_id
+ post_day['todo_id'] = [2] # not identifying Todo refered by done
+ self.check_post(post_day, f'/day?date={date}', 400)
+ post_day['todo_id'] = [1, 2] # imply range beyond that of effort etc.
+ self.check_post(post_day, f'/day?date={date}', 400)
+ post_day['comment'] = ['FOO', '']
+ self.check_post(post_day, f'/day?date={date}', 400)
+ post_day['effort'] = [2.3, '']
+ post_day['comment'] = ['']
+ self.check_post(post_day, f'/day?date={date}', 400)
# add a comment to one Todo and set the other's doneness and effort
- post_day = {'day_comment': '', 'make_type': '', 'new_todo': [],
- 'todo_id': [1, 2], 'done': [2], 'comment': ['FOO', ''],
- 'effort': [2.3, '']}
- expected['_library']['Todo']['1']['comment'] = 'FOO'
- expected['_library']['Todo']['1']['effort'] = 2.3
- expected['_library']['Todo']['2']['is_done'] = True
- self._post_day(f'date={date}', post_day)
- self.check_json_get(f'/day?date={date}', expected)
+ post_day['comment'] = ['FOO', '']
+ self.post_exp_day([exp], post_day)
+ self.check_json_get(f'/day?date={date}', exp)
+ # invert effort and comment between both Todos
+ # (cannot invert doneness, /day only collects positive setting)
+ post_day['comment'] = ['', 'FOO']
+ post_day['effort'] = ['', 2.3]
+ self.post_exp_day([exp], post_day)
+ self.check_json_get(f'/day?date={date}', exp)
+
+ def test_POST_day_todo_make_types(self) -> None:
+ """Test behavior of POST /todo on 'make_type'='full' and 'empty'."""
+ date = '2024-01-01'
+ exp = ExpectedGetDay(date)
+ # create two Processes, with second one step of first one
+ self.post_exp_process([exp], {}, 2)
+ self.post_exp_process([exp], {'new_top_step': 2}, 1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None)])
+ self.check_json_get(f'/day?date={date}', exp)
+ # post Todo of adopting Process, with make_type=full
+ self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1]})
+ exp.lib_get('Todo', 1)['children'] = [2]
+ exp.lib_set('Todo', [exp.todo_as_dict(2, 2)])
+ top_nodes = [{'todo': 1,
+ 'seen': False,
+ 'children': [{'todo': 2,
+ 'seen': False,
+ 'children': []}]}]
+ exp.force('top_nodes', top_nodes)
+ self.check_json_get(f'/day?date={date}', exp)
+ # post another Todo of adopting Process, expect to adopt existing
+ self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1]})
+ exp.lib_set('Todo', [exp.todo_as_dict(3, 1, children=[2])])
+ top_nodes += [{'todo': 3,
+ 'seen': False,
+ 'children': [{'todo': 2,
+ 'seen': True,
+ 'children': []}]}]
+ exp.force('top_nodes', top_nodes)
+ self.check_json_get(f'/day?date={date}', exp)
+ # post another Todo of adopting Process, make_type=empty
+ self.post_exp_day([exp], {'make_type': 'empty', 'new_todo': [1]})
+ exp.lib_set('Todo', [exp.todo_as_dict(4, 1)])
+ top_nodes += [{'todo': 4,
+ 'seen': False,
+ 'children': []}]
+ exp.force('top_nodes', top_nodes)
+ self.check_json_get(f'/day?date={date}', exp)
+
+ def test_POST_day_new_todo_order_commutative(self) -> None:
+ """Check that order of 'new_todo' values in POST /day don't matter."""
+ date = '2024-01-01'
+ exp = ExpectedGetDay(date)
+ self.post_exp_process([exp], {}, 2)
+ self.post_exp_process([exp], {'new_top_step': 2}, 1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None)])
+ # make-full-day-post batch of Todos of both Processes in one order …,
+ self.post_exp_day([exp], {'make_type': 'full', 'new_todo': [1, 2]})
+ top_nodes: list[dict[str, Any]] = [{'todo': 1,
+ 'seen': False,
+ 'children': [{'todo': 2,
+ 'seen': False,
+ 'children': []}]}]
+ exp.force('top_nodes', top_nodes)
+ exp.lib_get('Todo', 1)['children'] = [2]
+ self.check_json_get(f'/day?date={date}', exp)
+ # … and then in the other, expecting same node tree / relations
+ exp.lib_del('Day', date)
+ date = '2024-01-02'
+ exp.set('day', date)
+ day_post = {'make_type': 'full', 'new_todo': [2, 1]}
+ self.post_exp_day([exp], day_post, date)
+ exp.lib_del('Todo', 1)
+ exp.lib_del('Todo', 2)
+ top_nodes[0]['todo'] = 3 # was: 1
+ top_nodes[0]['children'][0]['todo'] = 4 # was: 2
+ exp.lib_get('Todo', 3)['children'] = [4]
+ self.check_json_get(f'/day?date={date}', exp)
def test_GET_day_with_conditions(self) -> None:
"""Test GET /day displaying Conditions and their relations."""
date = '2024-01-01'
- # add Process with Conditions and their Todos, check display
- conds_data = [[1, False, ['A'], ['a']], # id, is_active, title, desc
- [2, True, ['B'], ['b']]]
- conds_expected = []
- for c_data in conds_data:
- conds_expected += [self._post_args_return_expectation(
- c_data, ['is_active'], ['title', 'description'],
- self.cond_as_dict,
- lambda x, y: self.check_post(y, f'/condition?id={x}'))]
- procs_data = [ # id, title, desc, effort,
- # conditions, disables, blockers, enables
- [1, 'foo', 'oof', 1.1, [1], [1], [2], [2]],
- [2, 'bar', 'rab', 0.9, [2], [2], [1], [1]]]
- procs_expected = []
- for p_data in procs_data:
- procs_expected += [self._post_args_return_expectation(
- p_data,
- ['conditions', 'disables', 'blockers', 'enables'],
- ['title', 'description', 'effort'],
- self.proc_as_dict, self.post_process)]
- expected = self.GET_day_dict(date)
- assert isinstance(expected['_library'], dict)
- expected['processes'] = self.as_id_list(procs_expected)
- expected['_library']['Process'] = self.as_refs(procs_expected)
- expected['_library']['Condition'] = self.as_refs(conds_expected)
- self._post_day(f'date={date}')
- self.check_json_get(f'/day?date={date}', expected)
- # add Todos in relation to Conditions, check consequences
- post_day: dict[str, object]
- post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
- todos = [ # id, process_id, date, conds, disables, blockers, enables
- self.todo_as_dict(1, 1, date, [1], [1], [2], [2]),
- self.todo_as_dict(2, 2, date, [2], [2], [1], [1])]
- expected['_library']['Todo'] = self.as_refs(todos)
- expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
- nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
- expected['top_nodes'] = nodes
- expected['disablers_for'] = {'1': [1], '2': [2]}
- expected['enablers_for'] = {'1': [2], '2': [1]}
- expected['conditions_present'] = self.as_id_list(conds_expected)
- self._post_day(f'date={date}', post_day)
- self.check_json_get(f'/day?date={date}', expected)
+ exp = ExpectedGetDay(date)
+ # check non-referenced Conditions not shown
+ cond_posts = [{'is_active': False, 'title': 'A', 'description': 'a'},
+ {'is_active': True, 'title': 'B', 'description': 'b'}]
+ for i, cond_post in enumerate(cond_posts):
+ self.check_post(cond_post, f'/condition?id={i+1}')
+ self.check_json_get(f'/day?date={date}', exp)
+ # add Processes with Conditions, check Conditions now shown
+ for i, (c1, c2) in enumerate([(1, 2), (2, 1)]):
+ post = {'conditions': [c1], 'disables': [c1],
+ 'blockers': [c2], 'enables': [c2]}
+ self.post_exp_process([exp], post, i+1)
+ for i, cond_post in enumerate(cond_posts):
+ exp.set_cond_from_post(i+1, cond_post)
+ self.check_json_get(f'/day?date={date}', exp)
+ # add Todos in relation to Conditions, check consequence relations
+ self.post_exp_day([exp], {'new_todo': [1, 2]})
+ self.check_json_get(f'/day?date={date}', exp)
def test_GET_calendar(self) -> None:
"""Test GET /calendar responses based on various inputs, DB states."""
self.check_get('/calendar?start=foo', 400)
self.check_get('/calendar?end=foo', 400)
# check default range for expected selection/order without saved days
- expected = self.GET_calendar_dict(-1, 366)
- self.check_json_get('/calendar', expected)
- self.check_json_get('/calendar?start=&end=', expected)
+ exp = ExpectedGetCalendar(-1, 366)
+ self.check_json_get('/calendar', exp)
+ self.check_json_get('/calendar?start=&end=', exp)
# check with named days as delimiters
- expected = self.GET_calendar_dict(-1, +1)
- self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected)
+ exp = ExpectedGetCalendar(-1, +1)
+ self.check_json_get('/calendar?start=yesterday&end=tomorrow', exp)
# check zero-element range
- expected = self.GET_calendar_dict(+1, 0)
- self.check_json_get('/calendar?start=tomorrow&end=today', expected)
+ exp = ExpectedGetCalendar(+1, 0)
+ self.check_json_get('/calendar?start=tomorrow&end=today', exp)
# check saved day shows up in results, proven by its comment
- post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''}
- date = self._testing_date_in_n_days(-2)
- self._post_day(f'date={date}', post_day)
- start_date = self._testing_date_in_n_days(-5)
- end_date = self._testing_date_in_n_days(+5)
+ start_date = _testing_date_in_n_days(-5)
+ date = _testing_date_in_n_days(-2)
+ end_date = _testing_date_in_n_days(+5)
+ exp = ExpectedGetCalendar(-5, +5)
+ self.post_exp_day([exp], {'day_comment': 'foo'}, date)
url = f'/calendar?start={start_date}&end={end_date}'
- expected = self.GET_calendar_dict(-5, +5)
- assert isinstance(expected['_library'], dict)
- expected['_library']['Day'][date]['comment'] = post_day['day_comment']
- self.check_json_get(url, expected)
+ self.check_json_get(url, exp)
"""Test Processes module."""
from typing import Any
-from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB
-from plomtask.processes import Process, ProcessStep, ProcessStepsNode
+from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer,
+ Expected)
+from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
from plomtask.exceptions import HandledException, NotFoundException
from plomtask.todos import Todo
self.assertEqual(sorted(r.enables), sorted(set2))
self.assertEqual(sorted(r.disables), sorted(set3))
- def test_Process_steps(self) -> None:
- """Test addition, nesting, and non-recursion of ProcessSteps"""
- # pylint: disable=too-many-locals
- # pylint: disable=too-many-statements
- p1, p2, p3 = self.three_processes()
- assert isinstance(p1.id_, int)
- assert isinstance(p2.id_, int)
- assert isinstance(p3.id_, int)
- steps_p1: list[ProcessStep] = []
- # add step of process p2 as first (top-level) step to p1
- s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None)
- steps_p1 += [s_p2_to_p1]
- p1.set_steps(self.db_conn, steps_p1)
- p1_dict: dict[int, ProcessStepsNode] = {}
- p1_dict[1] = ProcessStepsNode(p2, None, True, {})
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # add step of process p3 as second (top-level) step to p1
- s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None)
- steps_p1 += [s_p3_to_p1]
- p1.set_steps(self.db_conn, steps_p1)
- p1_dict[2] = ProcessStepsNode(p3, None, True, {})
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # add step of process p3 as first (top-level) step to p2,
- steps_p2: list[ProcessStep] = []
- s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None)
- steps_p2 += [s_p3_to_p2]
- p2.set_steps(self.db_conn, steps_p2)
- # expect it as implicit sub-step of p1's second (p3) step
- p2_dict = {3: ProcessStepsNode(p3, None, False, {})}
- p1_dict[1].steps[3] = p2_dict[3]
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # add step of process p2 as explicit sub-step to p1's second sub-step
- s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_)
- steps_p1 += [s_p2_to_p1_first]
- p1.set_steps(self.db_conn, steps_p1)
- seen_3 = ProcessStepsNode(p3, None, False, {}, False)
- p1_dict[1].steps[3].seen = True
- p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True,
- {3: seen_3})
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # add step of process p3 as explicit sub-step to non-existing p1
- # sub-step (of id=999), expect it to become another p1 top-level step
- s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999)
- steps_p1 += [s_p3_to_p1_999]
- p1.set_steps(self.db_conn, steps_p1)
- p1_dict[5] = ProcessStepsNode(p3, None, True, {})
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # add step of process p3 as explicit sub-step to p1's implicit p3
- # sub-step, expect it to become another p1 top-level step
- s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_, s_p3_to_p2.id_)
- steps_p1 += [s_p3_to_p1_impl_p3]
- p1.set_steps(self.db_conn, steps_p1)
- p1_dict[6] = ProcessStepsNode(p3, None, True, {})
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- self.assertEqual(p1.used_as_step_by(self.db_conn), [])
- self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
- self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
- # # add step of process p3 as explicit sub-step to p1's first sub-step,
- # # expect it to eliminate implicit p3 sub-step
- # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_,
- # s_p2_to_p1.id_)
- # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})}
- # p1_dict[2].steps[4].steps[3].seen = False
- # steps_p1 += [s_p3_to_p1_first_explicit]
- # p1.set_steps(self.db_conn, steps_p1)
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # ensure implicit steps non-top explicit steps are shown
- s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_)
- steps_p2 += [s_p3_to_p2_first]
- p2.set_steps(self.db_conn, steps_p2)
- p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, True)
- p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(p3, 3, False,
- {}, False)
- self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # ensure suppressed step nodes are hidden
- assert isinstance(s_p3_to_p2.id_, int)
- p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_])
- p1_dict[1].steps[3].steps = {}
- p1_dict[1].steps[3].is_suppressed = True
- p1_dict[2].steps[4].steps[3].steps = {}
- p1_dict[2].steps[4].steps[3].is_suppressed = True
- self.assertEqual(p1.get_steps(self.db_conn), p1_dict)
+ # def test_Process_steps(self) -> None:
+ # """Test addition, nesting, and non-recursion of ProcessSteps"""
+ # # pylint: disable=too-many-locals
+ # # pylint: disable=too-many-statements
+ # p1, p2, p3 = self.three_processes()
+ # assert isinstance(p1.id_, int)
+ # assert isinstance(p2.id_, int)
+ # assert isinstance(p3.id_, int)
+ # steps_p1: list[ProcessStep] = []
+ # # add step of process p2 as first (top-level) step to p1
+ # s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None)
+ # steps_p1 += [s_p2_to_p1]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # p1_dict: dict[int, ProcessStepsNode] = {}
+ # p1_dict[1] = ProcessStepsNode(p2, None, True, {})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p3 as second (top-level) step to p1
+ # s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None)
+ # steps_p1 += [s_p3_to_p1]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # p1_dict[2] = ProcessStepsNode(p3, None, True, {})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p3 as first (top-level) step to p2,
+ # steps_p2: list[ProcessStep] = []
+ # s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None)
+ # steps_p2 += [s_p3_to_p2]
+ # p2.set_steps(self.db_conn, steps_p2)
+ # # expect it as implicit sub-step of p1's second (p3) step
+ # p2_dict = {3: ProcessStepsNode(p3, None, False, {})}
+ # p1_dict[1].steps[3] = p2_dict[3]
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p2 as explicit sub-step to p1's second sub-step
+ # s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_)
+ # steps_p1 += [s_p2_to_p1_first]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # seen_3 = ProcessStepsNode(p3, None, False, {}, False)
+ # p1_dict[1].steps[3].seen = True
+ # p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True,
+ # {3: seen_3})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p3 as explicit sub-step to non-existing p1
+ # # sub-step (of id=999), expect it to become another p1 top-level step
+ # s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999)
+ # steps_p1 += [s_p3_to_p1_999]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # p1_dict[5] = ProcessStepsNode(p3, None, True, {})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p3 as explicit sub-step to p1's implicit p3
+ # # sub-step, expect it to become another p1 top-level step
+ # s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_,
+ # s_p3_to_p2.id_)
+ # steps_p1 += [s_p3_to_p1_impl_p3]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # p1_dict[6] = ProcessStepsNode(p3, None, True, {})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # self.assertEqual(p1.used_as_step_by(self.db_conn), [])
+ # self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
+ # self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
+ # # # add step of process p3 as explicit sub-step to p1's first
+ # # # sub-step, expect it to eliminate implicit p3 sub-step
+ # # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_,
+ # # s_p2_to_p1.id_)
+ # # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})}
+ # # p1_dict[2].steps[4].steps[3].seen = False
+ # # steps_p1 += [s_p3_to_p1_first_explicit]
+ # # p1.set_steps(self.db_conn, steps_p1)
+ # # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # ensure implicit steps non-top explicit steps are shown
+ # s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_)
+ # steps_p2 += [s_p3_to_p2_first]
+ # p2.set_steps(self.db_conn, steps_p2)
+ # p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {},
+ # True)
+ # p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(
+ # p3, 3, False, {}, False)
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # ensure suppressed step nodes are hidden
+ # assert isinstance(s_p3_to_p2.id_, int)
+ # p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_])
+ # p1_dict[1].steps[3].steps = {}
+ # p1_dict[1].steps[3].is_suppressed = True
+ # p1_dict[2].steps[4].steps[3].steps = {}
+ # p1_dict[2].steps[4].steps[3].is_suppressed = True
+ # self.assertEqual(p1.get_steps(self.db_conn), p1_dict)
def test_Process_conditions(self) -> None:
"""Test setting Process.conditions/enables/disables."""
self.check_identity_with_cache_and_db([])
+class ExpectedGetProcesses(Expected):
+ """Builder of expectations for GET /processes."""
+ _default_dict = {'sort_by': 'title', 'pattern': ''}
+
+ def recalc(self) -> None:
+ """Update internal dictionary by subclass-specific rules."""
+ super().recalc()
+ self._fields['processes'] = self.as_ids(self.lib_all('Process'))
+
+
class TestsWithServer(TestCaseWithServer):
"""Module tests against our HTTP server/handler (and database)."""
+ def _post_process(self, id_: int = 1,
+ form_data: dict[str, Any] | None = None
+ ) -> dict[str, Any]:
+ """POST basic Process."""
+ if not form_data:
+ form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
+ self.check_post(form_data, f'/process?id={id_}',
+ redir=f'/process?id={id_}')
+ return form_data
+
def test_do_POST_process(self) -> None:
"""Test POST /process and its effect on the database."""
self.assertEqual(0, len(Process.all(self.db_conn)))
- form_data = self.post_process()
+ form_data = self._post_process()
self.assertEqual(1, len(Process.all(self.db_conn)))
self.check_post(form_data, '/process?id=FOO', 400)
self.check_post(form_data | {'effort': 'foo'}, '/process?id=', 400)
'/process?id=', 400)
self.assertEqual(1, len(Process.all(self.db_conn)))
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
- self.post_process(2, form_data | {'conditions': []})
+ self._post_process(2, form_data | {'conditions': []})
self.check_post(form_data | {'conditions': [1]}, '/process?id=', 404)
self.check_post({'title': 'foo', 'description': 'foo',
'is_active': False},
'/condition', 302, '/condition?id=1')
- self.post_process(3, form_data | {'conditions': [1]})
- self.post_process(4, form_data | {'disables': [1]})
- self.post_process(5, form_data | {'enables': [1]})
+ self._post_process(3, form_data | {'conditions': [1]})
+ self._post_process(4, form_data | {'disables': [1]})
+ self._post_process(5, form_data | {'enables': [1]})
form_data['delete'] = ''
self.check_post(form_data, '/process?id=', 404)
self.check_post(form_data, '/process?id=6', 404)
def test_do_POST_process_steps(self) -> None:
"""Test behavior of ProcessStep posting."""
# pylint: disable=too-many-statements
- form_data_1 = self.post_process(1)
- self.post_process(2)
- self.post_process(3)
+ form_data_1 = self._post_process(1)
+ self._post_process(2)
+ self._post_process(3)
# post first (top-level) step of process 2 to process 1
form_data_1['new_top_step'] = [2]
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 1)
retrieved_step = retrieved_process.explicit_steps[0]
# post empty steps list to process, expect clean slate, and old step to
# completely disappear
form_data_1['new_top_step'] = []
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(retrieved_process.explicit_steps, [])
assert retrieved_step_id is not None
ProcessStep.by_id(self.db_conn, retrieved_step_id)
# post new first (top_level) step of process 3 to process 1
form_data_1['new_top_step'] = [3]
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
retrieved_step = retrieved_process.explicit_steps[0]
self.assertEqual(retrieved_step.step_process_id, 3)
# post to process steps list without keeps, expect clean slate
form_data_1['new_top_step'] = []
form_data_1['steps'] = [retrieved_step.id_]
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(retrieved_process.explicit_steps, [])
# post to process empty steps list but keep, expect 400
self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
# post to process steps list with keep and process ID, expect 200
form_data_1[f'step_{retrieved_step_id}_process_id'] = [2]
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 1)
retrieved_step = retrieved_process.explicit_steps[0]
form_data_1['new_top_step'] = [3]
form_data_1['steps'] = [retrieved_step.id_]
form_data_1['keep_step'] = [retrieved_step.id_]
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 2)
retrieved_step_0 = retrieved_process.explicit_steps[1]
# post sub-step to step
form_data_1[f'step_{retrieved_step_0.id_}_process_id'] = [3]
form_data_1[f'new_step_to_{retrieved_step_0.id_}'] = [3]
- self.post_process(1, form_data_1)
+ self._post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 3)
retrieved_step_0 = retrieved_process.explicit_steps[1]
# of ID=1 here so we know the 404 comes from step_to=2 etc. (that tie
# the Process displayed by /process to others), not from not finding
# the main Process itself
- self.post_process(1)
+ self.post_exp_process([], {}, 1)
self.check_get('/process?id=1&step_to=2', 404)
self.check_get('/process?id=1&has_step=2', 404)
- @classmethod
- def GET_processes_dict(cls, procs: list[dict[str, object]]
- ) -> dict[str, object]:
- """Return JSON of GET /processes to expect."""
- library = {'Process': cls.as_refs(procs)} if procs else {}
- d: dict[str, object] = {'processes': cls.as_id_list(procs),
- 'sort_by': 'title',
- 'pattern': '',
- '_library': library}
- return d
-
def test_GET_processes(self) -> None:
"""Test GET /processes."""
# pylint: disable=too-many-statements
# test empty result on empty DB, default-settings on empty params
- expected = self.GET_processes_dict([])
- self.check_json_get('/processes', expected)
+ exp = ExpectedGetProcesses()
+ self.check_json_get('/processes', exp)
# test on meaningless non-empty params (incl. entirely un-used key),
# that 'sort_by' default to 'title' (even if set to something else, as
# long as without handler) and 'pattern' get preserved
- expected['pattern'] = 'bar' # preserved despite zero effect!
+ exp.set('pattern', 'bar') # preserved despite zero effect!
url = '/processes?sort_by=foo&pattern=bar&foo=x'
- self.check_json_get(url, expected)
+ self.check_json_get(url, exp)
# test non-empty result, automatic (positive) sorting by title
- post1: dict[str, Any]
- post2: dict[str, Any]
- post3: dict[str, Any]
- post1 = {'title': 'foo', 'description': 'oof', 'effort': 1.0}
- post2 = {'title': 'bar', 'description': 'rab', 'effort': 1.1}
- post2['new_top_step'] = 1
- post3 = {'title': 'baz', 'description': 'zab', 'effort': 0.9}
- post3['new_top_step'] = 1
- self.post_process(1, post1)
- self.post_process(2, post2)
- self.post_process(3, post3)
- post3['new_top_step'] = 2
- post3['keep_step'] = 2
- post3['steps'] = [2]
- post3['step_2_process_id'] = 1
- self.post_process(3, post3)
- proc1 = self.proc_as_dict(1, post1['title'],
- post1['description'], post1['effort'])
- proc2 = self.proc_as_dict(2, post2['title'],
- post2['description'], post2['effort'])
- proc3 = self.proc_as_dict(3, post3['title'],
- post3['description'], post3['effort'])
- proc2['explicit_steps'] = [1]
- proc3['explicit_steps'] = [2, 3]
- step1 = self.procstep_as_dict(1, 2, 1)
- step2 = self.procstep_as_dict(2, 3, 1)
- step3 = self.procstep_as_dict(3, 3, 2)
- expected = self.GET_processes_dict([proc2, proc3, proc1])
- assert isinstance(expected['_library'], dict)
- expected['_library']['ProcessStep'] = self.as_refs([step1, step2,
- step3])
- self.check_json_get('/processes', expected)
+ proc1_post = {'title': 'foo', 'description': 'oof', 'effort': 1.0}
+ self.post_exp_process([exp], proc1_post, 1)
+ proc2_post = {'title': 'bar', 'description': 'rab', 'effort': 1.1}
+ self.post_exp_process([exp], proc2_post | {'new_top_step': [1]}, 2)
+ proc3_post = {'title': 'baz', 'description': 'zab', 'effort': 0.9}
+ self.post_exp_process([exp], proc3_post | {'new_top_step': [1]}, 3)
+ proc3_post = proc3_post | {'new_top_step': [2], 'keep_step': [2],
+ 'steps': [2], 'step_2_process_id': 1}
+ self.post_exp_process([exp], proc3_post, 3)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1),
+ exp.procstep_as_dict(2, 3, 1),
+ exp.procstep_as_dict(3, 3, 2)])
+ exp.lib_get('Process', '')
+ exp.set('pattern', '')
+ exp.force('processes', [2, 3, 1])
+ self.check_json_get('/processes', exp)
# test other sortings
- expected['sort_by'] = '-title'
- expected['processes'] = self.as_id_list([proc1, proc3, proc2])
- self.check_json_get('/processes?sort_by=-title', expected)
- expected['sort_by'] = 'effort'
- expected['processes'] = self.as_id_list([proc3, proc1, proc2])
- self.check_json_get('/processes?sort_by=effort', expected)
- expected['sort_by'] = '-effort'
- expected['processes'] = self.as_id_list([proc2, proc1, proc3])
- self.check_json_get('/processes?sort_by=-effort', expected)
- expected['sort_by'] = 'steps'
- expected['processes'] = self.as_id_list([proc1, proc2, proc3])
- self.check_json_get('/processes?sort_by=steps', expected)
- expected['sort_by'] = '-steps'
- expected['processes'] = self.as_id_list([proc3, proc2, proc1])
- self.check_json_get('/processes?sort_by=-steps', expected)
- expected['sort_by'] = 'owners'
- expected['processes'] = self.as_id_list([proc3, proc2, proc1])
- self.check_json_get('/processes?sort_by=owners', expected)
- expected['sort_by'] = '-owners'
- expected['processes'] = self.as_id_list([proc1, proc2, proc3])
- self.check_json_get('/processes?sort_by=-owners', expected)
+ exp.set('sort_by', '-title')
+ exp.force('processes', [1, 3, 2])
+ self.check_json_get('/processes?sort_by=-title', exp)
+ exp.set('sort_by', 'effort')
+ exp.force('processes', [3, 1, 2])
+ self.check_json_get('/processes?sort_by=effort', exp)
+ exp.set('sort_by', '-effort')
+ exp.force('processes', [2, 1, 3])
+ self.check_json_get('/processes?sort_by=-effort', exp)
+ exp.set('sort_by', 'steps')
+ exp.force('processes', [1, 2, 3])
+ self.check_json_get('/processes?sort_by=steps', exp)
+ exp.set('sort_by', '-steps')
+ exp.force('processes', [3, 2, 1])
+ self.check_json_get('/processes?sort_by=-steps', exp)
+ exp.set('sort_by', 'owners')
+ exp.force('processes', [3, 2, 1])
+ self.check_json_get('/processes?sort_by=owners', exp)
+ exp.set('sort_by', '-owners')
+ exp.force('processes', [1, 2, 3])
+ self.check_json_get('/processes?sort_by=-owners', exp)
# test pattern matching on title
- expected = self.GET_processes_dict([proc2, proc3])
- assert isinstance(expected['_library'], dict)
- expected['pattern'] = 'ba'
- expected['_library']['ProcessStep'] = self.as_refs([step1, step2,
- step3])
- self.check_json_get('/processes?pattern=ba', expected)
+ exp.set('pattern', 'ba')
+ exp.set('sort_by', 'title')
+ exp.lib_del('Process', '1')
+ exp.force('processes', [2, 3])
+ self.check_json_get('/processes?pattern=ba', exp)
# test pattern matching on description
- expected['processes'] = self.as_id_list([proc1])
- expected['_library'] = {'Process': self.as_refs([proc1])}
- expected['pattern'] = 'of'
- self.check_json_get('/processes?pattern=of', expected)
+ exp.set('pattern', 'of')
+ exp.lib_wipe('Process')
+ exp.lib_wipe('ProcessStep')
+ self.post_exp_process([exp], {'description': 'oof', 'effort': 1.0}, 1)
+ exp.force('processes', [1])
+ self.check_json_get('/processes?pattern=of', exp)
"""Test Todos module."""
from typing import Any
-from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
+from tests.utils import (TestCaseSansDB, TestCaseWithDB, TestCaseWithServer,
+ Expected)
from plomtask.todos import Todo, TodoNode
from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
cmp_1_dict = todo_node_as_dict(node_0)
self.assertEqual(cmp_0_dict, cmp_1_dict)
- def test_Todo_create_with_children(self) -> None:
- """Test parenthood guarantees of Todo.create_with_children."""
+ def test_Todo_ensure_children(self) -> None:
+ """Test parenthood guarantees of Todo.ensure_children."""
assert isinstance(self.proc.id_, int)
proc2 = Process(None)
proc2.save(self.db_conn)
todo_ignore.save(self.db_conn)
self.assertEqual(todo_ignore.children, [])
# test create_with_children on step-less does nothing
- todo_1 = Todo.create_with_children(self.db_conn, self.proc.id_,
- self.date1)
+ todo_1 = Todo(None, self.proc, False, self.date1)
+ todo_1.save(self.db_conn)
+ todo_1.ensure_children(self.db_conn)
self.assertEqual(todo_1.children, [])
self.assertEqual(len(Todo.all(self.db_conn)), 2)
# test create_with_children adopts and creates, and down tree too
- todo_2 = Todo.create_with_children(self.db_conn, proc2.id_, self.date1)
+ todo_2 = Todo(None, proc2, False, self.date1)
+ todo_2.save(self.db_conn)
+ todo_2.ensure_children(self.db_conn)
self.assertEqual(3, len(todo_2.children))
self.assertEqual(todo_1, todo_2.children[0])
self.assertEqual(self.proc, todo_2.children[2].process)
self.assertEqual(todo_3.children[0].process, proc4)
-class TestsWithServer(TestCaseWithServer):
- """Tests against our HTTP server/handler (and database)."""
-
- def setUp(self) -> None:
- super().setUp()
- self._proc1_form_data: Any = self.post_process(1)
- self._date = '2024-01-01'
-
- @classmethod
- def GET_todo_dict(cls,
- target_id: int,
- todos: list[dict[str, object]],
- processes: list[dict[str, object]],
- process_steps: list[dict[str, object]] | None = None,
- conditions: list[dict[str, object]] | None = None
- ) -> dict[str, object]:
- """Return JSON of GET /todo to expect."""
- # pylint: disable=too-many-arguments
- library = {'Todo': cls.as_refs(todos),
- 'Process': cls.as_refs(processes)}
- if process_steps:
- library['ProcessStep'] = cls.as_refs(process_steps)
- conditions = conditions if conditions else []
- if conditions:
- library['Condition'] = cls.as_refs(conditions)
- return {'todo': target_id,
- 'steps_todo_to_process': [],
- 'adoption_candidates_for': {},
- 'process_candidates': [p['id'] for p in processes],
- 'todo_candidates': [t['id'] for t in todos
- if t['id'] != target_id],
- 'condition_candidates': [c['id'] for c in conditions],
- '_library': library}
+class ExpectedGetTodo(Expected):
+ """Builder of expectations for GET /todo."""
+
+ def __init__(self,
+ todo_id: int,
+ *args: Any, **kwargs: Any) -> None:
+ self._fields = {'todo': todo_id,
+ 'steps_todo_to_process': []}
+ super().__init__(*args, **kwargs)
+
+ def recalc(self) -> None:
+ """Update internal dictionary by subclass-specific rules."""
+
+ def walk_steps(step: dict[str, Any]) -> None:
+ if not step['todo']:
+ proc_id = step['process']
+ cands = self.as_ids(
+ [t for t in todos if proc_id == t['process_id']
+ and t['id'] in self._fields['todo_candidates']])
+ self._fields['adoption_candidates_for'][str(proc_id)] = cands
+ for child in step['children']:
+ walk_steps(child)
+
+ super().recalc()
+ self.lib_wipe('Day')
+ todos = self.lib_all('Todo')
+ procs = self.lib_all('Process')
+ conds = self.lib_all('Condition')
+ self._fields['todo_candidates'] = self.as_ids(
+ [t for t in todos if t['id'] != self._fields['todo']])
+ self._fields['process_candidates'] = self.as_ids(procs)
+ self._fields['condition_candidates'] = self.as_ids(conds)
+ self._fields['adoption_candidates_for'] = {}
+ for step in self._fields['steps_todo_to_process']:
+ walk_steps(step)
@staticmethod
- def _step_as_dict(node_id: int,
- children: list[dict[str, object]],
- process: int | None = None,
- todo: int | None = None,
- fillable: bool = False,
- ) -> dict[str, object]:
+ def step_as_dict(node_id: int,
+ children: list[dict[str, object]],
+ process: int | None = None,
+ todo: int | None = None,
+ fillable: bool = False,
+ ) -> dict[str, object]:
+ """Return JSON of TodoOrProcStepsNode to expect."""
return {'node_id': node_id,
'children': children,
'process': process,
'fillable': fillable,
'todo': todo}
- def _make_todo_via_day_post(self, proc_id: int) -> None:
- payload = {'day_comment': '',
- 'new_todo': proc_id,
- 'make_type': 'empty'}
- self.check_post(payload, f'/day?date={self._date}&make_type=empty')
+
+class TestsWithServer(TestCaseWithServer):
+ """Tests against our HTTP server/handler (and database)."""
+
+ def _post_exp_todo(
+ self, id_: int, payload: dict[str, Any], exp: Expected) -> None:
+ self.check_post(payload, f'/todo?id={id_}')
+ exp.set_todo_from_post(id_, payload)
def test_basic_fail_POST_todo(self) -> None:
"""Test basic malformed/illegal POST /todo requests."""
+ self.post_exp_process([], {}, 1)
# test we cannot just POST into non-existing Todo
self.check_post({}, '/todo', 404)
self.check_post({}, '/todo?id=FOO', 400)
self.check_post({}, '/todo?id=0', 404)
self.check_post({}, '/todo?id=1', 404)
# test malformed values on existing Todo
- self._make_todo_via_day_post(1)
+ self.post_exp_day([], {'new_todo': [1]})
for name in [
'adopt', 'effort', 'make_full', 'make_empty', 'step_filler',
'conditions', 'disables', 'blockers', 'enables']:
def test_basic_POST_todo(self) -> None:
"""Test basic POST /todo manipulations."""
- self._make_todo_via_day_post(1)
+ exp = ExpectedGetTodo(1)
+ self.post_exp_process([exp], {}, 1)
+ self.post_exp_day([exp], {'new_todo': [1]})
# test posting naked entity at first changes nothing
- todo_dict = self.todo_as_dict(1, 1)
- proc_dict = self.proc_as_dict(**self._proc1_form_data)
- expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
- self.check_json_get('/todo?id=1', expected)
+ self.check_json_get('/todo?id=1', exp)
self.check_post({}, '/todo?id=1')
- self.check_json_get('/todo?id=1', expected)
+ self.check_json_get('/todo?id=1', exp)
# test posting doneness, comment, calendarization, effort
- todo_post = {'done': '', 'calendarize': '', 'comment': 'foo',
- 'effort': 2.3}
- todo_dict = self.todo_as_dict(1, 1, is_done=True, calendarize=True,
- comment='foo', effort=2.3)
- expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
- self.check_post(todo_post, '/todo?id=1')
- self.check_json_get('/todo?id=1', expected)
+ todo_post = {'done': '', 'calendarize': '',
+ 'comment': 'foo', 'effort': 2.3}
+ self._post_exp_todo(1, todo_post, exp)
+ self.check_json_get('/todo?id=1', exp)
# test implicitly un-setting all of those except effort by empty post
self.check_post({}, '/todo?id=1')
- todo_dict = self.todo_as_dict(1, 1, effort=2.3)
- expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
- self.check_json_get('/todo?id=1', expected)
- # test empty effort post can be explicitly unset by "" post
+ exp.lib_set('Todo', [exp.todo_as_dict(effort=2.3)])
+ self.check_json_get('/todo?id=1', exp)
+ # test effort post can be explicitly unset by "effort":"" post
self.check_post({'effort': ''}, '/todo?id=1')
- todo_dict['effort'] = None
- self.check_json_get('/todo?id=1', expected)
+ exp.lib_set('Todo', [exp.todo_as_dict(effort=None)])
+ self.check_json_get('/todo?id=1', exp)
# test Condition posts
c1_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
c2_post = {'title': 'bar', 'description': 'rab', 'is_active': True}
- self.check_post(c1_post, '/condition', redir='/condition?id=1')
- self.check_post(c2_post, '/condition', redir='/condition?id=2')
- c1_dict = self.cond_as_dict(1, False, ['foo'], ['oof'])
- c2_dict = self.cond_as_dict(2, True, ['bar'], ['rab'])
- conditions = [c1_dict, c2_dict]
+ self.post_exp_cond([exp], 1, c1_post, '?id=1', '?id=1')
+ self.post_exp_cond([exp], 2, c2_post, '?id=2', '?id=2')
todo_post = {'conditions': [1], 'disables': [1],
'blockers': [2], 'enables': [2]}
- for k, v in todo_post.items():
- todo_dict[k] = v
- self.check_post(todo_post, '/todo?id=1')
- expected = self.GET_todo_dict(1, [todo_dict], [proc_dict],
- conditions=conditions)
- self.check_json_get('/todo?id=1', expected)
+ self._post_exp_todo(1, todo_post, exp)
+ self.check_json_get('/todo?id=1', exp)
def test_POST_todo_deletion(self) -> None:
"""Test deletions via POST /todo."""
- self._make_todo_via_day_post(1)
- todo_dict = self.todo_as_dict(1, process_id=1)
- proc_dict = self.proc_as_dict(**self._proc1_form_data)
- expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
+ exp = ExpectedGetTodo(1)
+ self.post_exp_process([exp], {}, 1)
# test failure of deletion on non-existing Todo
self.check_post({'delete': ''}, '/todo?id=2', 404, '/')
# test deletion of existing Todo
+ self.post_exp_day([exp], {'new_todo': [1]})
self.check_post({'delete': ''}, '/todo?id=1', 302, '/')
self.check_get('/todo?id=1', 404)
+ exp.lib_del('Todo', 1)
# test deletion of adopted Todo
- self._make_todo_via_day_post(1)
- self._make_todo_via_day_post(1)
+ self.post_exp_day([exp], {'new_todo': [1]})
+ self.post_exp_day([exp], {'new_todo': [1]})
self.check_post({'adopt': 2}, '/todo?id=1')
self.check_post({'delete': ''}, '/todo?id=2', 302, '/')
- self.check_json_get('/todo?id=1', expected)
+ exp.lib_del('Todo', 2)
+ self.check_get('/todo?id=2', 404)
+ self.check_json_get('/todo?id=1', exp)
# test deletion of adopting Todo
- self._make_todo_via_day_post(1)
+ self.post_exp_day([exp], {'new_todo': [1]})
self.check_post({'adopt': 2}, '/todo?id=1')
self.check_post({'delete': ''}, '/todo?id=1', 302, '/')
- todo_dict['id'] = 2
- expected = self.GET_todo_dict(2, [todo_dict], [proc_dict])
- self.check_json_get('/todo?id=2', expected)
+ exp.set('todo', 2)
+ exp.lib_del('Todo', 1)
+ self.check_json_get('/todo?id=2', exp)
# test cannot delete Todo with comment or effort
self.check_post({'comment': 'foo'}, '/todo?id=2')
self.check_post({'delete': ''}, '/todo?id=2', 500, '/')
self.check_post({'effort': 5}, '/todo?id=2')
self.check_post({'delete': ''}, '/todo?id=2', 500, '/')
- # test deletion via effort < 0, but only once deletable
+ # test deletion via effort < 0, but only if deletable
self.check_post({'effort': -1, 'comment': 'foo'}, '/todo?id=2')
- todo_dict['comment'] = 'foo'
- todo_dict['effort'] = -1
- self.check_json_get('/todo?id=2', expected)
self.check_post({}, '/todo?id=2')
self.check_get('/todo?id=2', 404)
def test_POST_todo_adoption(self) -> None:
"""Test adoption via POST /todo with "adopt"."""
- # pylint: disable=too-many-locals
- # pylint: disable=too-many-statements
# post two Todos to Day, have first adopt second
- self._make_todo_via_day_post(1)
- self._make_todo_via_day_post(1)
- proc1_dict = self.proc_as_dict(**self._proc1_form_data)
- todo1_dict = self.todo_as_dict(1, process_id=1, children=[2])
- todo2_dict = self.todo_as_dict(2, process_id=1, parents=[1])
- todos = [todo1_dict, todo2_dict]
- expected = self.GET_todo_dict(1, todos, [proc1_dict])
- expected['steps_todo_to_process'] = [self._step_as_dict(1, [], todo=2)]
- self.check_post({'adopt': 2}, '/todo?id=1')
- self.check_json_get('/todo?id=1', expected)
+ exp = ExpectedGetTodo(1)
+ self.post_exp_process([exp], {}, 1)
+ self.post_exp_day([exp], {'new_todo': [1]})
+ self.post_exp_day([exp], {'new_todo': [1]})
+ self._post_exp_todo(1, {'adopt': 2}, exp)
+ exp.set('steps_todo_to_process', [exp.step_as_dict(1, [], todo=2)])
+ self.check_json_get('/todo?id=1', exp)
# test Todo un-adopting by just not sending an adopt
- self.check_post({}, '/todo?id=1')
- todo1_dict['children'] = []
- todo2_dict['parents'] = []
- expected['steps_todo_to_process'] = []
- self.check_json_get('/todo?id=1', expected)
+ self._post_exp_todo(1, {}, exp)
+ exp.set('steps_todo_to_process', [])
+ self.check_json_get('/todo?id=1', exp)
# test fail on trying to adopt non-existing Todo
self.check_post({'adopt': 3}, '/todo?id=1', 404)
# test cannot self-adopt
self.check_post({'adopt': 1}, '/todo?id=1', 400)
# test cannot do 1-step circular adoption
- self.check_post({'adopt': 1}, '/todo?id=2')
- todo1_dict['parents'] = [2]
- todo2_dict['children'] = [1]
+ self._post_exp_todo(2, {'adopt': 1}, exp)
self.check_post({'adopt': 2}, '/todo?id=1', 400)
# test cannot do 2-step circular adoption
- self._make_todo_via_day_post(1)
- self.check_post({'adopt': 2}, '/todo?id=3')
- todo3_dict = self.todo_as_dict(3, process_id=1, children=[2])
- todo2_dict['parents'] = [3]
- todos += [todo3_dict]
+ self.post_exp_day([exp], {'new_todo': [1]})
+ self._post_exp_todo(3, {'adopt': 2}, exp)
self.check_post({'adopt': 3}, '/todo?id=1', 400)
# test can adopt Todo into ProcessStep chain via its Process (with key
# 'step_filler' equivalent to single-element 'adopt' if intable)
- proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
- self.post_process(3, proc_post)
- self.post_process(2, proc_post)
- self.post_process(1, self._proc1_form_data | {'new_top_step': [2, 3]})
- self._make_todo_via_day_post(2)
- self._make_todo_via_day_post(3)
- self.check_post({'step_filler': 5, 'adopt': [4]}, '/todo?id=1')
- proc3_dict = self.proc_as_dict(3)
- proc2_dict = self.proc_as_dict(2)
- proc1_dict['explicit_steps'] = [1, 2]
- procs = [proc1_dict, proc2_dict, proc3_dict]
- procsteps = [self.procstep_as_dict(1, 1, 2),
- self.procstep_as_dict(2, 1, 3)]
- todo1_dict['children'] = [4, 5]
- todo4_dict = self.todo_as_dict(4, process_id=2, parents=[1])
- todo5_dict = self.todo_as_dict(5, process_id=3, parents=[1])
- todos += [todo4_dict, todo5_dict]
- expected = self.GET_todo_dict(1, todos, procs, procsteps)
- step_proc2 = self._step_as_dict(1, [], 2, 4, True)
- step_proc3 = self._step_as_dict(2, [], 3, 5, True)
- expected['steps_todo_to_process'] = [step_proc2, step_proc3]
- self.check_json_get('/todo?id=1', expected)
+ self.post_exp_process([exp], {}, 2)
+ self.post_exp_process([exp], {}, 3)
+ self.post_exp_process([exp], {'new_top_step': [2, 3]}, 1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2),
+ exp.procstep_as_dict(2, 1, 3)])
+ step1_proc2 = exp.step_as_dict(1, [], 2, None, True)
+ step2_proc3 = exp.step_as_dict(2, [], 3, None, True)
+ exp.set('steps_todo_to_process', [step1_proc2, step2_proc3])
+ self.post_exp_day([exp], {'new_todo': [2]})
+ self.post_exp_day([exp], {'new_todo': [3]})
+ self.check_json_get('/todo?id=1', exp)
+ self._post_exp_todo(1, {'step_filler': 5, 'adopt': [4]}, exp)
+ step1_proc2 = exp.step_as_dict(1, [], 2, 4, True)
+ step2_proc3 = exp.step_as_dict(2, [], 3, 5, True)
+ exp.set('steps_todo_to_process', [step1_proc2, step2_proc3])
+ self.check_json_get('/todo?id=1', exp)
# test 'ignore' values for 'step_filler' are ignored, and intable
# 'step_filler' values are interchangeable with those of 'adopt'
todo_post = {'adopt': 5, 'step_filler': ['ignore', 4]}
self.check_post(todo_post, '/todo?id=1')
- self.check_json_get('/todo?id=1', expected)
- # test cannot adopt into non-top-level elements of chain
- self.post_process(4, proc_post)
- self.post_process(3, proc_post | {'new_top_step': 4, 'step_of': [1]})
- proc4_dict = self.proc_as_dict(4)
- proc3_dict['explicit_steps'] = [3]
- procs += [proc4_dict]
- procsteps += [self.procstep_as_dict(3, 3, 4)]
- step_proc4 = self._step_as_dict(3, [], 4, None, True)
- step_proc3['children'] = [step_proc4]
- self._make_todo_via_day_post(4)
- self.check_post({'adopt': [4, 5, 6]}, '/todo?id=1')
- todo6_dict = self.todo_as_dict(6, process_id=4, parents=[1])
- todo1_dict['children'] = [4, 5, 6]
- todos += [todo6_dict]
- expected = self.GET_todo_dict(1, todos, procs, procsteps)
- step2_proc4 = self._step_as_dict(4, [], None, 6, False)
- expected['steps_todo_to_process'] = [step_proc2, step_proc3,
- step2_proc4]
- expected['adoption_candidates_for'] = {'4': [6]}
- self.check_json_get('/todo?id=1', expected)
+ self.check_json_get('/todo?id=1', exp)
+ # test cannot adopt into non-top-level elements of chain, instead
+ # creating new top-level steps when adopting of respective Process
+ self.post_exp_process([exp], {}, 4)
+ self.post_exp_process([exp], {'new_top_step': 4, 'step_of': [1]}, 3)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 3, 4)])
+ step3_proc4 = exp.step_as_dict(3, [], 4, None, True)
+ step2_proc3 = exp.step_as_dict(2, [step3_proc4], 3, 5, True)
+ exp.set('steps_todo_to_process', [step1_proc2, step2_proc3])
+ self.post_exp_day([exp], {'new_todo': [4]})
+ self._post_exp_todo(1, {'adopt': [4, 5, 6]}, exp)
+ step4_todo6 = exp.step_as_dict(4, [], None, 6, False)
+ exp.set('steps_todo_to_process', [step1_proc2, step2_proc3,
+ step4_todo6])
+ self.check_json_get('/todo?id=1', exp)
def test_POST_todo_make_full(self) -> None:
"""Test creation and adoption via POST /todo with "make_full"."""
- # pylint: disable=too-many-locals
# create chain of Processes
- proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
- self.post_process(2, proc_post | {'new_top_step': 1})
- self.post_process(3, proc_post | {'new_top_step': 2})
- self.post_process(4, proc_post | {'new_top_step': 3})
- proc1_dict = self.proc_as_dict(**self._proc1_form_data)
- proc2_dict = self.proc_as_dict(2, explicit_steps=[1])
- proc3_dict = self.proc_as_dict(3, explicit_steps=[2])
- proc4_dict = self.proc_as_dict(4, explicit_steps=[3])
- procs = [proc1_dict, proc2_dict, proc3_dict, proc4_dict]
- procsteps = [self.procstep_as_dict(1, 2, 1),
- self.procstep_as_dict(2, 3, 2),
- self.procstep_as_dict(3, 4, 3)]
+ exp = ExpectedGetTodo(1)
+ self.post_exp_process([exp], {}, 1)
+ for i in range(1, 4):
+ self.post_exp_process([exp], {'new_top_step': i}, i+1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1),
+ exp.procstep_as_dict(2, 3, 2),
+ exp.procstep_as_dict(3, 4, 3)])
+ step3_proc1 = exp.step_as_dict(3, [], 1, None, False)
+ step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, False)
+ step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True)
+ exp.set('steps_todo_to_process', [step1_proc3])
# post (childless) Todo of chain end, then make_full on next in line
- self._make_todo_via_day_post(4)
- todo1_dict = self.todo_as_dict(1, 4, children=[2])
- todo2_dict = self.todo_as_dict(2, 3, children=[3], parents=[1])
- todo3_dict = self.todo_as_dict(3, 2, parents=[2], children=[4])
- todo4_dict = self.todo_as_dict(4, 1, parents=[3])
- todos = [todo1_dict, todo2_dict, todo3_dict, todo4_dict]
- expected = self.GET_todo_dict(1, todos, procs, procsteps)
- step_proc1 = self._step_as_dict(3, [], 1, 4, True)
- step_proc2 = self._step_as_dict(2, [step_proc1], 2, 3, True)
- step_proc3 = self._step_as_dict(1, [step_proc2], 3, 2, True)
- expected['steps_todo_to_process'] = [step_proc3]
+ self.post_exp_day([exp], {'new_todo': [4]})
self.check_post({'step_filler': 'make_full_3'}, '/todo?id=1')
- self.check_json_get('/todo?id=1', expected)
+ exp.set_todo_from_post(4, {'process_id': 1})
+ exp.set_todo_from_post(3, {'process_id': 2, 'children': [4]})
+ exp.set_todo_from_post(2, {'process_id': 3, 'children': [3]})
+ exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]})
+ step3_proc1 = exp.step_as_dict(3, [], 1, 4, True)
+ step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, 3, True)
+ step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, 2, True)
+ exp.set('steps_todo_to_process', [step1_proc3])
+ self.check_json_get('/todo?id=1', exp)
# make new chain next to expected, find steps_todo_to_process extended,
# expect existing Todo demanded by new chain be adopted into new chain
self.check_post({'make_full': 2, 'adopt': [2]}, '/todo?id=1')
- todo5_dict = self.todo_as_dict(5, 2, parents=[1], children=[4])
- todo1_dict['children'] = [2, 5]
- todo4_dict['parents'] = [3, 5]
- todos += [todo5_dict]
- step2_proc1 = self._step_as_dict(5, [], None, 4)
- step2_proc2 = self._step_as_dict(4, [step2_proc1], None, 5)
- expected = self.GET_todo_dict(1, todos, procs, procsteps)
- expected['steps_todo_to_process'] = [step_proc3, step2_proc2]
- self.check_json_get('/todo?id=1', expected)
+ exp.set_todo_from_post(5, {'process_id': 2, 'children': [4]})
+ exp.set_todo_from_post(1, {'process_id': 4, 'children': [2, 5]})
+ step5_todo4 = exp.step_as_dict(5, [], None, 4)
+ step4_todo5 = exp.step_as_dict(4, [step5_todo4], None, 5)
+ exp.set('steps_todo_to_process', [step1_proc3, step4_todo5])
+ self.check_json_get('/todo?id=1', exp)
# fail on trying to call make_full on non-existing Process
self.check_post({'make_full': 5}, '/todo?id=1', 404)
def test_POST_todo_make_empty(self) -> None:
"""Test creation and adoption via POST /todo with "make_empty"."""
- # pylint: disable=too-many-locals
# create chain of Processes
- proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
- self.post_process(2, proc_post | {'new_top_step': 1})
- self.post_process(3, proc_post | {'new_top_step': 2})
- self.post_process(4, proc_post | {'new_top_step': 3})
- proc1_dict = self.proc_as_dict(**self._proc1_form_data)
- proc2_dict = self.proc_as_dict(2, explicit_steps=[1])
- proc3_dict = self.proc_as_dict(3, explicit_steps=[2])
- proc4_dict = self.proc_as_dict(4, explicit_steps=[3])
- procs = [proc1_dict, proc2_dict, proc3_dict, proc4_dict]
- procsteps = [self.procstep_as_dict(1, 2, 1),
- self.procstep_as_dict(2, 3, 2),
- self.procstep_as_dict(3, 4, 3)]
+ exp = ExpectedGetTodo(1)
+ self.post_exp_process([exp], {}, 1)
+ for i in range(1, 4):
+ self.post_exp_process([exp], {'new_top_step': i}, i+1)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 2, 1),
+ exp.procstep_as_dict(2, 3, 2),
+ exp.procstep_as_dict(3, 4, 3)])
# post (childless) Todo of chain end, then make empty on next in line
- self._make_todo_via_day_post(4)
- todo1_dict = self.todo_as_dict(1, 4, children=[2])
- todo2_dict = self.todo_as_dict(2, 3, parents=[1])
- todos = [todo1_dict, todo2_dict]
- expected = self.GET_todo_dict(1, todos, procs, procsteps)
- step_proc1 = self._step_as_dict(3, [], 1, None)
- step_proc2 = self._step_as_dict(2, [step_proc1], 2, None, True)
- step_proc3 = self._step_as_dict(1, [step_proc2], 3, 2, True)
- expected['steps_todo_to_process'] = [step_proc3]
- expected['adoption_candidates_for'] = {'1': [], '2': []}
+ self.post_exp_day([exp], {'new_todo': [4]})
+ step3_proc1 = exp.step_as_dict(3, [], 1)
+ step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2)
+ step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, None, True)
+ exp.set('steps_todo_to_process', [step1_proc3])
+ self.check_json_get('/todo?id=1', exp)
self.check_post({'step_filler': 'make_empty_3'}, '/todo?id=1')
- self.check_json_get('/todo?id=1', expected)
+ exp.set_todo_from_post(2, {'process_id': 3})
+ exp.set_todo_from_post(1, {'process_id': 4, 'children': [2]})
+ step2_proc2 = exp.step_as_dict(2, [step3_proc1], 2, None, True)
+ step1_proc3 = exp.step_as_dict(1, [step2_proc2], 3, 2, True)
+ exp.set('steps_todo_to_process', [step1_proc3])
+ self.check_json_get('/todo?id=1', exp)
# make new top-level Todo without chain implied by its Process
self.check_post({'make_empty': 2, 'adopt': [2]}, '/todo?id=1')
- todo3_dict = self.todo_as_dict(3, 2, parents=[1], children=[])
- todo1_dict['children'] = [2, 3]
- todos += [todo3_dict]
- step2_proc2 = self._step_as_dict(4, [], None, 3)
- expected = self.GET_todo_dict(1, todos, procs, procsteps)
- expected['steps_todo_to_process'] = [step_proc3, step2_proc2]
- expected['adoption_candidates_for'] = {'1': [], '2': [3]}
- self.check_json_get('/todo?id=1', expected)
+ exp.set_todo_from_post(3, {'process_id': 2})
+ exp.set_todo_from_post(1, {'process_id': 4, 'children': [2, 3]})
+ step4_todo3 = exp.step_as_dict(4, [], None, 3)
+ exp.set('steps_todo_to_process', [step1_proc3, step4_todo3])
+ self.check_json_get('/todo?id=1', exp)
# fail on trying to call make_empty on non-existing Process
self.check_post({'make_full': 5}, '/todo?id=1', 404)
- def test_do_GET_todo(self) -> None:
+ def test_GET_todo(self) -> None:
"""Test GET /todo response codes."""
- self._make_todo_via_day_post(1)
# test malformed or illegal parameter values
self.check_get('/todo', 404)
self.check_get('/todo?id=', 404)
self.check_get('/todo?id=0', 404)
self.check_get('/todo?id=2', 404)
# test all existing Processes are shown as available
- proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
- self.post_process(2, proc_post)
- todo1_dict = self.todo_as_dict(1, process_id=1)
- proc1_dict = self.proc_as_dict(1, **self._proc1_form_data)
- proc2_dict = self.proc_as_dict(2)
- procs = [proc1_dict, proc2_dict]
- expected = self.GET_todo_dict(1, [todo1_dict], procs)
- self.check_json_get('/todo?id=1', expected)
+ exp = ExpectedGetTodo(1)
+ self.post_exp_process([exp], {}, 1)
+ self.post_exp_day([exp], {'new_todo': [1]})
+ self.post_exp_process([exp], {}, 2)
+ self.check_json_get('/todo?id=1', exp)
# test chain of Processes shown as potential step nodes
- self.post_process(2, proc_post)
- self.post_process(3, proc_post)
- self.post_process(4, proc_post)
- self.post_process(1, self._proc1_form_data | {'new_top_step': 2})
- self.post_process(2, proc_post | {'new_top_step': 3, 'step_of': [1]})
- self.post_process(3, proc_post | {'new_top_step': 4, 'step_of': [2]})
- proc1_dict['explicit_steps'] = [1]
- proc2_dict['explicit_steps'] = [2]
- proc3_dict = self.proc_as_dict(3, explicit_steps=[3])
- proc4_dict = self.proc_as_dict(4)
- procs += [proc3_dict, proc4_dict]
- procsteps = [self.procstep_as_dict(1, 1, 2, None),
- self.procstep_as_dict(2, 2, 3, None),
- self.procstep_as_dict(3, 3, 4, None)]
- expected = self.GET_todo_dict(1, [todo1_dict], procs, procsteps)
- step_proc4 = self._step_as_dict(3, [], 4)
- step_proc3 = self._step_as_dict(2, [step_proc4], 3)
- step_proc2 = self._step_as_dict(1, [step_proc3], 2, fillable=True)
- expected['steps_todo_to_process'] = [step_proc2]
- expected['adoption_candidates_for'] = {'2': [], '3': [], '4': []}
- self.check_json_get('/todo?id=1', expected)
+ self.post_exp_process([exp], {}, 3)
+ self.post_exp_process([exp], {}, 4)
+ self.post_exp_process([exp], {'new_top_step': 2}, 1)
+ self.post_exp_process([exp], {'new_top_step': 3, 'step_of': [1]}, 2)
+ self.post_exp_process([exp], {'new_top_step': 4, 'step_of': [2]}, 3)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2, None),
+ exp.procstep_as_dict(2, 2, 3, None),
+ exp.procstep_as_dict(3, 3, 4, None)])
+ step3_proc4 = exp.step_as_dict(3, [], 4)
+ step2_proc3 = exp.step_as_dict(2, [step3_proc4], 3)
+ step1_proc2 = exp.step_as_dict(1, [step2_proc3], 2, fillable=True)
+ exp.set('steps_todo_to_process', [step1_proc2])
+ self.check_json_get('/todo?id=1', exp)
# test display of parallel chains
proc_steps_post = {'new_top_step': 4, 'keep_step': [1],
'step_1_process_id': 2, 'steps': [1, 4]}
- self.post_process(1, self._proc1_form_data | proc_steps_post)
- proc1_dict['explicit_steps'] = [1, 4]
- step2_proc4 = self._step_as_dict(4, [], 4, fillable=True)
- procsteps += [self.procstep_as_dict(4, 1, 4, None)]
- expected = self.GET_todo_dict(1, [todo1_dict], procs, procsteps)
- expected['steps_todo_to_process'] = [step_proc2, step2_proc4]
- expected['adoption_candidates_for'] = {'2': [], '3': [], '4': []}
- self.check_json_get('/todo?id=1', expected)
-
- def test_do_POST_doneness_relations(self) -> None:
+ self.post_exp_process([], proc_steps_post, 1)
+ step4_proc4 = exp.step_as_dict(4, [], 4, fillable=True)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(4, 1, 4, None)])
+ exp.set('steps_todo_to_process', [step1_proc2, step4_proc4])
+ self.check_json_get('/todo?id=1', exp)
+
+ def test_POST_todo_doneness_relations(self) -> None:
"""Test Todo.is_done Condition, adoption relations for /todo POSTs."""
+ self.post_exp_process([], {}, 1)
# test Todo with adoptee can only be set done if adoptee is done too
- self._make_todo_via_day_post(1)
- self._make_todo_via_day_post(1)
+ self.post_exp_day([], {'new_todo': [1]})
+ self.post_exp_day([], {'new_todo': [1]})
self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 400)
self.check_post({'done': ''}, '/todo?id=2')
self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 302)
self.check_post({'disables': [1]}, '/todo?id=1')
self.check_post({'disables': [1], 'done': ''}, '/todo?id=1')
self.check_post({'blockers': [1]}, '/todo?id=2')
-
- def test_do_POST_day_todo_adoption(self) -> None:
- """Test Todos posted to Day view may adopt existing Todos."""
- form_data = self.post_process(
- 2, self._proc1_form_data | {'new_top_step': 1})
- form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- form_data['new_todo'] = 2
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [todo2])
- self.assertEqual(todo2.children, [todo1])
- self.assertEqual(todo2.parents, [])
-
- def test_do_POST_day_todo_multiple_inner_adoption(self) -> None:
- """Test multiple Todos can be posted to Day view w. inner adoption."""
-
- def key_order_func(t: Todo) -> int:
- assert isinstance(t.process.id_, int)
- return t.process.id_
-
- def check_adoption(date: str, new_todos: list[int]) -> None:
- form_data = {'day_comment': '', 'new_todo': new_todos,
- 'make_type': 'full'}
- self.check_post(form_data, f'/day?date={date}&make_type=full', 302)
- day_todos = Todo.by_date(self.db_conn, date)
- day_todos.sort(key=key_order_func)
- todo1 = day_todos[0]
- todo2 = day_todos[1]
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [todo2])
- self.assertEqual(todo2.children, [todo1])
- self.assertEqual(todo2.parents, [])
-
- def check_nesting_adoption(process_id: int, date: str,
- new_top_steps: list[int]) -> None:
- form_data = {'title': '', 'description': '', 'effort': 1,
- 'step_of': [2]}
- form_data = self.post_process(1, form_data)
- form_data['new_top_step'] = new_top_steps
- form_data['step_of'] = []
- form_data = self.post_process(process_id, form_data)
- form_data = {'day_comment': '', 'new_todo': [process_id],
- 'make_type': 'full'}
- self.check_post(form_data, f'/day?date={date}&make_type=full', 302)
- day_todos = Todo.by_date(self.db_conn, date)
- day_todos.sort(key=key_order_func, reverse=True)
- self.assertEqual(len(day_todos), 3)
- todo1 = day_todos[0] # process of process_id
- todo2 = day_todos[1] # process 2
- todo3 = day_todos[2] # process 1
- self.assertEqual(sorted(todo1.children), sorted([todo2, todo3]))
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo2.children, [todo3])
- self.assertEqual(todo2.parents, [todo1])
- self.assertEqual(todo3.children, [])
- self.assertEqual(sorted(todo3.parents), sorted([todo2, todo1]))
-
- self.post_process(2, self._proc1_form_data | {'new_top_step': 1})
- check_adoption('2024-01-01', [1, 2])
- check_adoption('2024-01-02', [2, 1])
- check_nesting_adoption(3, '2024-01-03', [1, 2])
- check_nesting_adoption(4, '2024-01-04', [2, 1])
-
- def test_do_POST_day_todo_doneness(self) -> None:
- """Test Todo doneness can be posted to Day view."""
- form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
- form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full',
- 'comment': [''], 'done': [], 'effort': ['']}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
- self.assertEqual(todo.is_done, False)
- form_data = {'day_comment': '', 'todo_id': [1], 'done': [1],
- 'make_type': 'full', 'comment': [''], 'effort': ['']}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
- self.assertEqual(todo.is_done, True)
from http.client import HTTPConnection
from datetime import datetime, timedelta
from time import sleep
-from json import loads as json_loads
+from json import loads as json_loads, dumps as json_dumps
from urllib.parse import urlencode
from uuid import uuid4
from os import remove as remove_file
+from pprint import pprint
from plomtask.db import DatabaseFile, DatabaseConnection
from plomtask.http import TaskHandler, TaskServer
from plomtask.processes import Process, ProcessStep
self.check_identity_with_cache_and_db([])
-class TestCaseWithServer(TestCaseWithDB):
- """Module tests against our HTTP server/handler (and database)."""
-
- def setUp(self) -> None:
- super().setUp()
- self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler)
- self.server_thread = Thread(target=self.httpd.serve_forever)
- self.server_thread.daemon = True
- self.server_thread.start()
- self.conn = HTTPConnection(str(self.httpd.server_address[0]),
- self.httpd.server_address[1])
- self.httpd.render_mode = 'json'
+class Expected:
+ """Builder of (JSON-like) dict to compare against responses of test server.
+
+ Collects all items and relations we expect expressed in the server's JSON
+ responses and puts them into the proper json.dumps-friendly dict structure,
+ accessibla via .as_dict, to compare them in TestsWithServer.check_json_get.
+
+ On its own provides for .as_dict output only {"_library": …}, initialized
+ from .__init__ and to be directly manipulated via the .lib* methods.
+ Further structures of the expected response may be added and kept
+ up-to-date by subclassing .__init__, .recalc, and .d.
+
+ NB: Lots of expectations towards server behavior will be made explicit here
+ (or in the subclasses) rather than in the actual TestCase methods' code.
+ """
+ _default_dict: dict[str, Any]
+ _forced: dict[str, Any]
+ _fields: dict[str, Any]
+ _on_empty_make_temp: tuple[str, str]
+
+ def __init__(self,
+ todos: list[dict[str, Any]] | None = None,
+ procs: list[dict[str, Any]] | None = None,
+ procsteps: list[dict[str, Any]] | None = None,
+ conds: list[dict[str, Any]] | None = None,
+ days: list[dict[str, Any]] | None = None
+ ) -> None:
+ # pylint: disable=too-many-arguments
+ for name in ['_default_dict', '_fields', '_forced']:
+ if not hasattr(self, name):
+ setattr(self, name, {})
+ self._lib = {}
+ for title, items in [('Todo', todos),
+ ('Process', procs),
+ ('ProcessStep', procsteps),
+ ('Condition', conds),
+ ('Day', days)]:
+ if items:
+ self._lib[title] = self._as_refs(items)
+ for k, v in self._default_dict.items():
+ if k not in self._fields:
+ self._fields[k] = v
+
+ def recalc(self) -> None:
+ """Update internal dictionary by subclass-specific rules."""
+ todos = self.lib_all('Todo')
+ for todo in todos:
+ todo['parents'] = []
+ for todo in todos:
+ for child_id in todo['children']:
+ self.lib_get('Todo', child_id)['parents'] += [todo['id']]
+ todo['children'].sort()
+ procsteps = self.lib_all('ProcessStep')
+ procs = self.lib_all('Process')
+ for proc in procs:
+ proc['explicit_steps'] = [s['id'] for s in procsteps
+ if s['owner_id'] == proc['id']]
+
+ @property
+ def as_dict(self) -> dict[str, Any]:
+ """Return dict to compare against test server JSON responses."""
+ make_temp = False
+ if hasattr(self, '_on_empty_make_temp'):
+ category, dicter = getattr(self, '_on_empty_make_temp')
+ id_ = self._fields[category.lower()]
+ make_temp = not bool(self.lib_get(category, id_))
+ if make_temp:
+ f = getattr(self, dicter)
+ self.lib_set(category, [f(id_)])
+ self.recalc()
+ d = {'_library': self._lib}
+ for k, v in self._fields.items():
+ # we expect everything sortable to be sorted
+ if isinstance(v, list) and k not in self._forced:
+ # NB: if we don't test for v being list, sorted() on an empty
+ # dict may return an empty list
+ try:
+ v = sorted(v)
+ except TypeError:
+ pass
+ d[k] = v
+ for k, v in self._forced.items():
+ d[k] = v
+ if make_temp:
+ json = json_dumps(d)
+ self.lib_del(category, id_)
+ d = json_loads(json)
+ return d
- def tearDown(self) -> None:
- self.httpd.shutdown()
- self.httpd.server_close()
- self.server_thread.join()
- super().tearDown()
+ def lib_get(self, category: str, id_: str | int) -> dict[str, Any]:
+ """From library, return item of category and id_, or empty dict."""
+ str_id = str(id_)
+ if category in self._lib and str_id in self._lib[category]:
+ return self._lib[category][str_id]
+ return {}
+
+ def lib_all(self, category: str) -> list[dict[str, Any]]:
+ """From library, return items of category, or [] if none."""
+ if category in self._lib:
+ return list(self._lib[category].values())
+ return []
+
+ def lib_set(self, category: str, items: list[dict[str, object]]) -> None:
+ """Update library for category with items."""
+ if category not in self._lib:
+ self._lib[category] = {}
+ for k, v in self._as_refs(items).items():
+ self._lib[category][k] = v
+
+ def lib_del(self, category: str, id_: str | int) -> None:
+ """Remove category element of id_ from library."""
+ del self._lib[category][str(id_)]
+ if 0 == len(self._lib[category]):
+ del self._lib[category]
+
+ def lib_wipe(self, category: str) -> None:
+ """Remove category from library."""
+ if category in self._lib:
+ del self._lib[category]
+
+ def set(self, field_name: str, value: object) -> None:
+ """Set top-level .as_dict field."""
+ self._fields[field_name] = value
+
+ def force(self, field_name: str, value: object) -> None:
+ """Set ._forced field to ensure value in .as_dict."""
+ self._forced[field_name] = value
@staticmethod
- def as_id_list(items: list[dict[str, object]]) -> list[int | str]:
- """Return list of only 'id' fields of items."""
- # NB: To tighten the mypy test, consider to, instead of returning
- # list[str | int], returnlist[int] | list[str]. But since so far to me
- # the only way to make that work seems to be to repaclement of the
- # currently active last line with complexity of the out-commented code
- # block beneath, I currently opt for the status quo.
- id_list = []
- for item in items:
- assert isinstance(item['id'], (int, str))
- id_list += [item['id']]
- return id_list
- # if id_list:
- # if isinstance(id_list[0], int):
- # for id_ in id_list:
- # assert isinstance(id_, int)
- # l_int: list[int] = [id_ for id_ in id_list
- # if isinstance(id_, int)]
- # return l_int
- # for id_ in id_list:
- # assert isinstance(id_, str)
- # l_str: list[str] = [id_ for id_ in id_list
- # if isinstance(id_, str)]
- # return l_str
- # return []
-
- @staticmethod
- def as_refs(items: list[dict[str, object]]
- ) -> dict[str, dict[str, object]]:
+ def _as_refs(items: list[dict[str, object]]
+ ) -> dict[str, dict[str, object]]:
"""Return dictionary of items by their 'id' fields."""
refs = {}
for item in items:
refs[str(item['id'])] = item
return refs
+ @staticmethod
+ def as_ids(items: list[dict[str, Any]]) -> list[int] | list[str]:
+ """Return list of only 'id' fields of items."""
+ return [item['id'] for item in items]
+
+ @staticmethod
+ def day_as_dict(date: str, comment: str = '') -> dict[str, object]:
+ """Return JSON of Day to expect."""
+ return {'id': date, 'comment': comment, 'todos': []}
+
+ def set_day_from_post(self, date: str, d: dict[str, Any]) -> None:
+ """Set Day of date in library based on POST dict d."""
+ day = self.day_as_dict(date)
+ for k, v in d.items():
+ if 'day_comment' == k:
+ day['comment'] = v
+ elif 'new_todo' == k:
+ next_id = 1
+ for todo in self.lib_all('Todo'):
+ if next_id <= todo['id']:
+ next_id = todo['id'] + 1
+ for proc_id in sorted(v):
+ todo = self.todo_as_dict(next_id, proc_id, date)
+ self.lib_set('Todo', [todo])
+ next_id += 1
+ elif 'done' == k:
+ for todo_id in v:
+ self.lib_get('Todo', todo_id)['is_done'] = True
+ elif 'todo_id' == k:
+ for i, todo_id in enumerate(v):
+ t = self.lib_get('Todo', todo_id)
+ if 'comment' in d:
+ t['comment'] = d['comment'][i]
+ if 'effort' in d:
+ effort = d['effort'][i] if d['effort'][i] else None
+ t['effort'] = effort
+ self.lib_set('Day', [day])
+
@staticmethod
def cond_as_dict(id_: int = 1,
is_active: bool = False,
- titles: None | list[str] = None,
- descriptions: None | list[str] = None
+ title: None | str = None,
+ description: None | str = None,
) -> dict[str, object]:
"""Return JSON of Condition to expect."""
- d = {'id': id_,
- 'is_active': is_active,
- '_versioned': {
- 'title': {},
- 'description': {}}}
- titles = titles if titles else []
- descriptions = descriptions if descriptions else []
- assert isinstance(d['_versioned'], dict)
- for i, title in enumerate(titles):
- d['_versioned']['title'][i] = title
- for i, description in enumerate(descriptions):
- d['_versioned']['description'][i] = description
- return d
-
- @staticmethod
- def procstep_as_dict(id_: int,
- owner_id: int,
- step_process_id: int,
- parent_step_id: int | None = None
- ) -> dict[str, object]:
- """Return JSON of Process to expect."""
- return {'id': id_,
- 'owner_id': owner_id,
- 'step_process_id': step_process_id,
- 'parent_step_id': parent_step_id}
+ versioned: dict[str, dict[str, object]]
+ versioned = {'title': {}, 'description': {}}
+ if title is not None:
+ versioned['title']['0'] = title
+ if description is not None:
+ versioned['description']['0'] = description
+ return {'id': id_, 'is_active': is_active, '_versioned': versioned}
+
+ def set_cond_from_post(self, id_: int, d: dict[str, Any]) -> None:
+ """Set Condition of id_ in library based on POST dict d."""
+ if d == {'delete': ''}:
+ self.lib_del('Condition', id_)
+ return
+ cond = self.lib_get('Condition', id_)
+ if cond:
+ cond['is_active'] = d['is_active']
+ for category in ['title', 'description']:
+ if category in cond['_versioned']:
+ history = cond['_versioned'][category]
+ if len(history) > 0:
+ last_i = sorted([int(k) for k in history.keys()])[-1]
+ if d[category] != history[str(last_i)]:
+ history[str(last_i + 1)] = d[category]
+ continue
+ cond['_versioned'][category]['0'] = d[category]
+ else:
+ cond = self.cond_as_dict(
+ id_, d['is_active'], d['title'], d['description'])
+ self.lib_set('Condition', [cond])
@staticmethod
def todo_as_dict(id_: int = 1,
'enables': enables if enables else []}
return d
+ def set_todo_from_post(self, id_: int, d: dict[str, Any]) -> None:
+ """Set Todo of id_ in library based on POST dict d."""
+ corrected_kwargs: dict[str, Any] = {}
+ for k, v in d.items():
+ if k in {'adopt', 'step_filler'}:
+ if 'children' not in corrected_kwargs:
+ corrected_kwargs['children'] = []
+ new_children = v if isinstance(v, list) else [v]
+ corrected_kwargs['children'] += new_children
+ continue
+ if 'done' == k:
+ k = 'is_done'
+ if k in {'is_done', 'calendarize'}:
+ v = True
+ corrected_kwargs[k] = v
+ todo = self.todo_as_dict(id_, **corrected_kwargs)
+ self.lib_set('Todo', [todo])
+
+ @staticmethod
+ def procstep_as_dict(id_: int,
+ owner_id: int,
+ step_process_id: int,
+ parent_step_id: int | None = None
+ ) -> dict[str, object]:
+ """Return JSON of ProcessStep to expect."""
+ return {'id': id_,
+ 'owner_id': owner_id,
+ 'step_process_id': step_process_id,
+ 'parent_step_id': parent_step_id}
+
@staticmethod
def proc_as_dict(id_: int = 1,
title: str = 'A',
'suppressed_steps': [],
'explicit_steps': explicit_steps if explicit_steps else [],
'_versioned': {
- 'title': {0: title},
- 'description': {0: description},
- 'effort': {0: effort}},
+ 'title': {'0': title},
+ 'description': {'0': description},
+ 'effort': {'0': effort}},
'conditions': conditions if conditions else [],
'disables': disables if disables else [],
'enables': enables if enables else [],
'blockers': blockers if blockers else []}
return d
+ def set_proc_from_post(self, id_: int, d: dict[str, Any]) -> None:
+ """Set Process of id_ in library based on POST dict d."""
+ proc = self.lib_get('Process', id_)
+ if proc:
+ for k in ['title', 'description', 'effort']:
+ last_i = sorted(proc['_versioned'][k].keys())[-1]
+ if d[k] != proc['_versioned'][k][last_i]:
+ proc['_versioned'][k][last_i + 1] = d[k]
+ else:
+ proc = self.proc_as_dict(id_,
+ d['title'], d['description'], d['effort'])
+ ignore = {'title', 'description', 'effort', 'new_top_step', 'step_of',
+ 'keep_step', 'steps'}
+ for k, v in d.items():
+ if k in ignore or k.startswith('step_'):
+ continue
+ if k in {'calendarize'}:
+ v = True
+ elif k in {'suppressed_steps', 'explicit_steps', 'conditions',
+ 'disables', 'enables', 'blockers'}:
+ if not isinstance(v, list):
+ v = [v]
+ proc[k] = v
+ self.lib_set('Process', [proc])
+
+
+class TestCaseWithServer(TestCaseWithDB):
+ """Module tests against our HTTP server/handler (and database)."""
+
+ def setUp(self) -> None:
+ super().setUp()
+ self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler)
+ self.server_thread = Thread(target=self.httpd.serve_forever)
+ self.server_thread.daemon = True
+ self.server_thread.start()
+ self.conn = HTTPConnection(str(self.httpd.server_address[0]),
+ self.httpd.server_address[1])
+ self.httpd.render_mode = 'json'
+
+ def tearDown(self) -> None:
+ self.httpd.shutdown()
+ self.httpd.server_close()
+ self.server_thread.join()
+ super().tearDown()
+
+ def post_exp_cond(self,
+ exps: list[Expected],
+ id_: int,
+ payload: dict[str, object],
+ path_suffix: str = '',
+ redir_suffix: str = ''
+ ) -> None:
+ """POST /condition(s), appropriately update Expecteds."""
+ # pylint: disable=too-many-arguments
+ path = f'/condition{path_suffix}'
+ redir = f'/condition{redir_suffix}'
+ self.check_post(payload, path, redir=redir)
+ for exp in exps:
+ exp.set_cond_from_post(id_, payload)
+
+ def post_exp_day(self,
+ exps: list[Expected],
+ payload: dict[str, Any],
+ date: str = '2024-01-01'
+ ) -> None:
+ """POST /day, appropriately update Expecteds."""
+ if 'make_type' not in payload:
+ payload['make_type'] = 'empty'
+ if 'day_comment' not in payload:
+ payload['day_comment'] = ''
+ target = f'/day?date={date}'
+ redir_to = f'{target}&make_type={payload["make_type"]}'
+ self.check_post(payload, target, 302, redir_to)
+ for exp in exps:
+ exp.set_day_from_post(date, payload)
+
+ def post_exp_process(self,
+ exps: list[Expected],
+ payload: dict[str, Any],
+ id_: int,
+ ) -> dict[str, object]:
+ """POST /process, appropriately update Expecteds."""
+ if 'title' not in payload:
+ payload['title'] = 'foo'
+ if 'description' not in payload:
+ payload['description'] = 'foo'
+ if 'effort' not in payload:
+ payload['effort'] = 1.1
+ self.check_post(payload, f'/process?id={id_}',
+ redir=f'/process?id={id_}')
+ for exp in exps:
+ exp.set_proc_from_post(id_, payload)
+ return payload
+
def check_redirect(self, target: str) -> None:
"""Check that self.conn answers with a 302 redirect to target."""
response = self.conn.getresponse()
self.check_get(f'/{path}?id=0', 500)
self.check_get(f'{path}?id=1', 200)
- def post_process(self, id_: int = 1,
- form_data: dict[str, Any] | None = None
- ) -> dict[str, Any]:
- """POST basic Process."""
- if not form_data:
- form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, f'/process?id={id_}',
- redir=f'/process?id={id_}')
- return form_data
-
- def check_json_get(self, path: str, expected: dict[str, object]) -> None:
+ def check_json_get(self, path: str, expected: Expected) -> None:
"""Compare JSON on GET path with expected.
To simplify comparison of VersionedAttribute histories, transforms
def rewrite_history_keys_in(item: Any) -> Any:
if isinstance(item, dict):
if '_versioned' in item.keys():
- for k in item['_versioned']:
- vals = item['_versioned'][k].values()
+ for category in item['_versioned']:
+ vals = item['_versioned'][category].values()
history = {}
for i, val in enumerate(vals):
- history[i] = val
- item['_versioned'][k] = history
- for k in list(item.keys()):
- rewrite_history_keys_in(item[k])
+ history[str(i)] = val
+ item['_versioned'][category] = history
+ for category in list(item.keys()):
+ rewrite_history_keys_in(item[category])
elif isinstance(item, list):
item[:] = [rewrite_history_keys_in(i) for i in item]
return item
+ def walk_diffs(path: str, cmp1: object, cmp2: object) -> None:
+ # pylint: disable=too-many-branches
+ def warn(intro: str, val: object) -> None:
+ if isinstance(val, (str, int, float)):
+ print(intro, val)
+ else:
+ print(intro)
+ pprint(val)
+ if cmp1 != cmp2:
+ if isinstance(cmp1, dict) and isinstance(cmp2, dict):
+ for k, v in cmp1.items():
+ if k not in cmp2:
+ warn(f'DIFF {path}: retrieved lacks {k}', v)
+ elif v != cmp2[k]:
+ walk_diffs(f'{path}:{k}', v, cmp2[k])
+ for k in [k for k in cmp2.keys() if k not in cmp1]:
+ warn(f'DIFF {path}: expected lacks retrieved\'s {k}',
+ cmp2[k])
+ elif isinstance(cmp1, list) and isinstance(cmp2, list):
+ for i, v1 in enumerate(cmp1):
+ if i >= len(cmp2):
+ warn(f'DIFF {path}[{i}] retrieved misses:', v1)
+ elif v1 != cmp2[i]:
+ walk_diffs(f'{path}[{i}]', v1, cmp2[i])
+ if len(cmp2) > len(cmp1):
+ for i, v2 in enumerate(cmp2[len(cmp1):]):
+ warn(f'DIFF {path}[{len(cmp1)+i}] misses:', v2)
+ else:
+ warn(f'DIFF {path} – for expected:', cmp1)
+ warn('… and for retrieved:', cmp2)
+
self.conn.request('GET', path)
response = self.conn.getresponse()
self.assertEqual(response.status, 200)
retrieved = json_loads(response.read().decode())
rewrite_history_keys_in(retrieved)
- # import pprint
- # pprint.pprint(expected)
- # pprint.pprint(retrieved)
- self.assertEqual(expected, retrieved)
+ cmp = expected.as_dict
+ try:
+ self.assertEqual(cmp, retrieved)
+ except AssertionError as e:
+ print('EXPECTED:')
+ pprint(cmp)
+ print('RETRIEVED:')
+ pprint(retrieved)
+ walk_diffs('', cmp, retrieved)
+ raise e