X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=tests%2Fprocesses.py;h=127a3f680d4a674030b1be83756f12921a78d4e5;hb=77e9d9dfa5681bc32e8aec8558d0fa449805e3ac;hp=3d861b752a92ae88888039764fdbb7351decd3a6;hpb=c6ad5c1fe2811225648a1d1c74c667ed38b7671e;p=plomtask diff --git a/tests/processes.py b/tests/processes.py index 3d861b7..127a3f6 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -1,49 +1,85 @@ """Test Processes module.""" -from unittest import TestCase -from tests.utils import TestCaseWithDB, TestCaseWithServer +from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB from plomtask.processes import Process, ProcessStep, ProcessStepsNode from plomtask.conditions import Condition +from plomtask.exceptions import HandledException, NotFoundException from plomtask.todos import Todo -from plomtask.exceptions import NotFoundException, HandledException -class TestsSansDB(TestCase): +class TestsSansDB(TestCaseSansDB): """Module tests not requiring DB setup.""" + checked_class = Process + do_id_test = True + versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '', + 'effort': 1.0} - def test_Process_versioned_defaults(self) -> None: - """Test defaults of Process' VersionedAttributes.""" - self.assertEqual(Process(None).title.newest, 'UNNAMED') - self.assertEqual(Process(None).description.newest, '') - self.assertEqual(Process(None).effort.newest, 1.0) - def test_Process_legal_ID(self) -> None: - """Test Process cannot be instantiated with id_=0.""" - with self.assertRaises(HandledException): - Process(0) +class TestsSansDBProcessStep(TestCaseSansDB): + """Module tests not requiring DB setup.""" + checked_class = ProcessStep + do_id_test = True + default_init_args = [2, 3, 4] class TestsWithDB(TestCaseWithDB): - """Mdule tests not requiring DB setup.""" - - def setUp(self) -> None: - super().setUp() - self.proc1 = Process(None) - self.proc1.save(self.db_conn) - self.proc2 = Process(None) - self.proc2.save(self.db_conn) - self.proc3 = Process(None) - self.proc3.save(self.db_conn) - - def test_Process_ids(self) -> None: - """Test Process.save() re Process.id_.""" - self.assertEqual(self.proc1.id_, - Process.by_id(self.db_conn, 1, create=False).id_) - self.assertEqual(self.proc2.id_, - Process.by_id(self.db_conn, 2, create=False).id_) - proc5 = Process(5) - proc5.save(self.db_conn) - self.assertEqual(proc5.id_, - Process.by_id(self.db_conn, 5, create=False).id_) + """Module tests requiring DB setup.""" + checked_class = Process + test_versioneds = {'title': str, 'description': str, 'effort': float} + + def three_processes(self) -> tuple[Process, Process, Process]: + """Return three saved processes.""" + p1, p2, p3 = Process(None), Process(None), Process(None) + for p in [p1, p2, p3]: + p.save(self.db_conn) + return p1, p2, p3 + + def p_of_conditions(self) -> tuple[Process, list[Condition], + list[Condition], list[Condition]]: + """Return Process and its three Condition sets.""" + p = Process(None) + c1, c2, c3 = Condition(None), Condition(None), Condition(None) + for c in [c1, c2, c3]: + c.save(self.db_conn) + assert isinstance(c1.id_, int) + assert isinstance(c2.id_, int) + assert isinstance(c3.id_, int) + set_1 = [c1, c2] + set_2 = [c2, c3] + set_3 = [c1, c3] + p.set_conditions(self.db_conn, [c.id_ for c in set_1 + if isinstance(c.id_, int)]) + p.set_enables(self.db_conn, [c.id_ for c in set_2 + if isinstance(c.id_, int)]) + p.set_disables(self.db_conn, [c.id_ for c in set_3 + if isinstance(c.id_, int)]) + p.save(self.db_conn) + return p, set_1, set_2, set_3 + + def test_Process_conditions_saving(self) -> None: + """Test .save/.save_core.""" + p, set1, set2, set3 = self.p_of_conditions() + p.uncache() + r = Process.by_id(self.db_conn, p.id_) + self.assertEqual(sorted(r.conditions), sorted(set1)) + self.assertEqual(sorted(r.enables), sorted(set2)) + self.assertEqual(sorted(r.disables), sorted(set3)) + + def test_Process_from_table_row(self) -> None: + """Test .from_table_row() properly reads in class from DB""" + self.check_from_table_row() + self.check_versioned_from_table_row('title', str) + self.check_versioned_from_table_row('description', str) + self.check_versioned_from_table_row('effort', float) + p, set1, set2, set3 = self.p_of_conditions() + p.save(self.db_conn) + assert isinstance(p.id_, int) + for row in self.db_conn.row_where(self.checked_class.table_name, + 'id', p.id_): + # pylint: disable=no-member + r = Process.from_table_row(self.db_conn, row) + self.assertEqual(sorted(r.conditions), sorted(set1)) + self.assertEqual(sorted(r.enables), sorted(set2)) + self.assertEqual(sorted(r.disables), sorted(set3)) def test_Process_steps(self) -> None: """Test addition, nesting, and non-recursion of ProcessSteps""" @@ -54,133 +90,128 @@ class TestsWithDB(TestCaseWithDB): steps_proc += [step_tuple] proc.set_steps(self.db_conn, steps_proc) steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) - assert isinstance(self.proc2.id_, int) - assert isinstance(self.proc3.id_, int) - steps_proc1: list[tuple[int | None, int, int | None]] = [] - add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1) - p_1_dict: dict[int, ProcessStepsNode] = {} - p_1_dict[1] = ProcessStepsNode(self.proc2, None, True, {}, False) - self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2) - step_2 = self.proc1.explicit_steps[-1] - assert isinstance(step_2.id_, int) - p_1_dict[2] = ProcessStepsNode(self.proc3, None, True, {}, False) - self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - steps_proc2: list[tuple[int | None, int, int | None]] = [] - add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3) - p_1_dict[1].steps[3] = ProcessStepsNode(self.proc3, None, - False, {}, False) - self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_), - 4) - step_3 = ProcessStepsNode(self.proc3, None, False, {}, True) - p_1_dict[2].steps[4] = ProcessStepsNode(self.proc2, step_2.id_, True, - {3: step_3}, False) - self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5) - p_1_dict[5] = ProcessStepsNode(self.proc3, None, True, {}, False) - self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6) - p_1_dict[6] = ProcessStepsNode(self.proc3, None, True, {}, False) - self.assertEqual(self.proc1.get_steps(self.db_conn, None), - p_1_dict) - self.assertEqual(self.proc1.used_as_step_by(self.db_conn), - []) - self.assertEqual(self.proc2.used_as_step_by(self.db_conn), - [self.proc1]) - self.assertEqual(self.proc3.used_as_step_by(self.db_conn), - [self.proc1, self.proc2]) + p1, p2, p3 = self.three_processes() + assert isinstance(p1.id_, int) + assert isinstance(p2.id_, int) + assert isinstance(p3.id_, int) + steps_p1: list[tuple[int | None, int, int | None]] = [] + add_step(p1, steps_p1, (None, p2.id_, None), 1) + p1_dict: dict[int, ProcessStepsNode] = {} + p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + add_step(p1, steps_p1, (None, p3.id_, None), 2) + step_2 = p1.explicit_steps[-1] + p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + steps_p2: list[tuple[int | None, int, int | None]] = [] + add_step(p2, steps_p2, (None, p3.id_, None), 3) + p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4) + step_3 = ProcessStepsNode(p3, None, False, {}, True) + p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True, + {3: step_3}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + add_step(p1, steps_p1, (None, p3.id_, 999), 5) + p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + add_step(p1, steps_p1, (None, p3.id_, 3), 6) + p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False) + self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + self.assertEqual(p1.used_as_step_by(self.db_conn), []) + self.assertEqual(p2.used_as_step_by(self.db_conn), [p1]) + self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2]) def test_Process_conditions(self) -> None: """Test setting Process.conditions/enables/disables.""" + p = Process(None) + p.save(self.db_conn) for target in ('conditions', 'enables', 'disables'): - c1 = Condition(None, False) + method = getattr(p, f'set_{target}') + c1, c2 = Condition(None), Condition(None) c1.save(self.db_conn) - assert isinstance(c1.id_, int) - c2 = Condition(None, False) c2.save(self.db_conn) + assert isinstance(c1.id_, int) assert isinstance(c2.id_, int) - self.proc1.set_conditions(self.db_conn, [], target) - self.assertEqual(getattr(self.proc1, target), []) - self.proc1.set_conditions(self.db_conn, [c1.id_], target) - self.assertEqual(getattr(self.proc1, target), [c1]) - self.proc1.set_conditions(self.db_conn, [c2.id_], target) - self.assertEqual(getattr(self.proc1, target), [c2]) - self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target) - self.assertEqual(getattr(self.proc1, target), [c1, c2]) + method(self.db_conn, []) + self.assertEqual(getattr(p, target), []) + method(self.db_conn, [c1.id_]) + self.assertEqual(getattr(p, target), [c1]) + method(self.db_conn, [c2.id_]) + self.assertEqual(getattr(p, target), [c2]) + method(self.db_conn, [c1.id_, c2.id_]) + self.assertEqual(getattr(p, target), [c1, c2]) def test_Process_by_id(self) -> None: - """Test Process.by_id().""" - with self.assertRaises(NotFoundException): - Process.by_id(self.db_conn, None, create=False) - with self.assertRaises(NotFoundException): - Process.by_id(self.db_conn, 0, create=False) - self.assertNotEqual(self.proc1.id_, - Process.by_id(self.db_conn, None, create=True).id_) - self.assertEqual(Process(2).id_, - Process.by_id(self.db_conn, 2, create=True).id_) + """Test .by_id(), including creation""" + self.check_by_id() def test_Process_all(self) -> None: - """Test Process.all().""" - self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_}, - set(p.id_ for p in Process.all(self.db_conn))) - - def test_ProcessStep_singularity(self) -> None: - """Test pointers made for single object keep pointing to it.""" - assert isinstance(self.proc2.id_, int) - self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) - step = self.proc1.explicit_steps[-1] - assert isinstance(step.id_, int) - step_retrieved = ProcessStep.by_id(self.db_conn, step.id_) - step.parent_step_id = 99 - self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id) + """Test .all().""" + self.check_all() def test_Process_singularity(self) -> None: - """Test pointers made for single object keep pointing to it, and - subsequent retrievals don't overload relations.""" - assert isinstance(self.proc1.id_, int) - assert isinstance(self.proc2.id_, int) - c1 = Condition(None, False) - c1.save(self.db_conn) - assert isinstance(c1.id_, int) - self.proc1.set_conditions(self.db_conn, [c1.id_]) - self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) - self.proc1.save(self.db_conn) - p_retrieved = Process.by_id(self.db_conn, self.proc1.id_) - self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps) - self.assertEqual(self.proc1.conditions, p_retrieved.conditions) - self.proc1.save(self.db_conn) + """Test pointers made for single object keep pointing to it.""" + self.check_singularity('conditions', [Condition(None)]) def test_Process_versioned_attributes_singularity(self) -> None: """Test behavior of VersionedAttributes on saving (with .title).""" - assert isinstance(self.proc1.id_, int) - self.proc1.title.set('named') - p_loaded = Process.by_id(self.db_conn, self.proc1.id_) - self.assertEqual(self.proc1.title.history, p_loaded.title.history) + self.check_versioned_singularity() def test_Process_removal(self) -> None: """Test removal of Processes and ProcessSteps.""" - assert isinstance(self.proc3.id_, int) - self.proc1.remove(self.db_conn) - self.assertEqual({self.proc2.id_, self.proc3.id_}, - set(p.id_ for p in Process.all(self.db_conn))) - self.proc2.set_steps(self.db_conn, [(None, self.proc3.id_, None)]) + self.check_remove() + p1, p2, p3 = self.three_processes() + assert isinstance(p1.id_, int) + assert isinstance(p2.id_, int) + assert isinstance(p3.id_, int) + p2.set_steps(self.db_conn, [(None, p1.id_, None)]) with self.assertRaises(HandledException): - self.proc3.remove(self.db_conn) - self.proc2.explicit_steps[0].remove(self.db_conn) - retrieved = Process.by_id(self.db_conn, self.proc2.id_) - self.assertEqual(retrieved.explicit_steps, []) - self.proc2.set_steps(self.db_conn, [(None, self.proc3.id_, None)]) - step = retrieved.explicit_steps[0] - self.proc2.remove(self.db_conn) + p1.remove(self.db_conn) + step = p2.explicit_steps[0] + p2.set_steps(self.db_conn, []) + with self.assertRaises(NotFoundException): + ProcessStep.by_id(self.db_conn, step.id_) + p1.remove(self.db_conn) + p2.set_steps(self.db_conn, [(None, p3.id_, None)]) + step = p2.explicit_steps[0] + p2.remove(self.db_conn) with self.assertRaises(NotFoundException): ProcessStep.by_id(self.db_conn, step.id_) - todo = Todo(None, self.proc3, False, '2024-01-01') + todo = Todo(None, p3, False, '2024-01-01') todo.save(self.db_conn) with self.assertRaises(HandledException): - self.proc3.remove(self.db_conn) + p3.remove(self.db_conn) todo.remove(self.db_conn) - self.proc3.remove(self.db_conn) + p3.remove(self.db_conn) + + +class TestsWithDBForProcessStep(TestCaseWithDB): + """Module tests requiring DB setup.""" + checked_class = ProcessStep + default_init_kwargs = {'owner_id': 2, 'step_process_id': 3, + 'parent_step_id': 4} + + def test_ProcessStep_from_table_row(self) -> None: + """Test .from_table_row() properly reads in class from DB""" + self.check_from_table_row(2, 3, None) + + def test_ProcessStep_singularity(self) -> None: + """Test pointers made for single object keep pointing to it.""" + self.check_singularity('parent_step_id', 1, 2, 3, None) + + def test_ProcessStep_remove(self) -> None: + """Test .remove and unsetting of owner's .explicit_steps entry.""" + p1 = Process(None) + p2 = Process(None) + p1.save(self.db_conn) + p2.save(self.db_conn) + assert isinstance(p2.id_, int) + p1.set_steps(self.db_conn, [(None, p2.id_, None)]) + step = p1.explicit_steps[0] + step.remove(self.db_conn) + self.assertEqual(p1.explicit_steps, []) + self.check_storage([]) class TestsWithServer(TestCaseWithServer): @@ -212,8 +243,59 @@ class TestsWithServer(TestCaseWithServer): self.check_post(form_data, '/process?id=6', 404) self.check_post(form_data, '/process?id=5', 302, '/processes') + def test_do_POST_process_steps(self) -> None: + """Test behavior of ProcessStep posting.""" + form_data_1 = self.post_process(1) + self.post_process(2) + self.post_process(3) + form_data_1['new_top_step'] = [2] + self.post_process(1, form_data_1) + retrieved_process = Process.by_id(self.db_conn, 1) + self.assertEqual(len(retrieved_process.explicit_steps), 1) + retrieved_step = retrieved_process.explicit_steps[0] + self.assertEqual(retrieved_step.step_process_id, 2) + self.assertEqual(retrieved_step.owner_id, 1) + self.assertEqual(retrieved_step.parent_step_id, None) + form_data_1['new_top_step'] = [] + self.post_process(1, form_data_1) + retrieved_process = Process.by_id(self.db_conn, 1) + self.assertEqual(retrieved_process.explicit_steps, []) + with self.assertRaises(NotFoundException): + ProcessStep.by_id(self.db_conn, retrieved_step.id_) + form_data_1['new_top_step'] = [3] + self.post_process(1, form_data_1) + retrieved_process = Process.by_id(self.db_conn, 1) + retrieved_step = retrieved_process.explicit_steps[0] + self.assertEqual(retrieved_step.step_process_id, 3) + self.assertEqual(retrieved_step.owner_id, 1) + self.assertEqual(retrieved_step.parent_step_id, None) + form_data_1['new_top_step'] = [] + form_data_1['steps'] = [retrieved_step.id_] + self.post_process(1, form_data_1) + retrieved_process = Process.by_id(self.db_conn, 1) + self.assertEqual(retrieved_process.explicit_steps, []) + form_data_1['steps'] = [] + form_data_1['keep_step'] = [retrieved_step.id_] + self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1') + form_data_1['steps'] = [retrieved_step.id_] + form_data_1['keep_step'] = [retrieved_step.id_] + self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1') + form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2] + self.post_process(1, form_data_1) + retrieved_process = Process.by_id(self.db_conn, 1) + self.assertEqual(len(retrieved_process.explicit_steps), 1) + retrieved_step = retrieved_process.explicit_steps[0] + self.assertEqual(retrieved_step.step_process_id, 2) + self.assertEqual(retrieved_step.owner_id, 1) + self.assertEqual(retrieved_step.parent_step_id, None) + form_data_1['new_top_step'] = ['foo'] + form_data_1['steps'] = [] + form_data_1['keep_step'] = [] + self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1') + retrieved_process = Process.by_id(self.db_conn, 1) + self.assertEqual(len(retrieved_process.explicit_steps), 1) + def test_do_GET(self) -> None: """Test /process and /processes response codes.""" - self.post_process() self.check_get_defaults('/process') self.check_get('/processes', 200)