X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=tests%2Fprocesses.py;h=9413822291a87439aafda0cd39459c93b3905661;hb=8e1a5416151dbcf506f2435823362e21d85aed2d;hp=02f664477fa5147b0396538e9a63263dca9ce73a;hpb=13845c83a9e3e107aa7c40e86d8a0cda1a317f8a;p=plomtask diff --git a/tests/processes.py b/tests/processes.py index 02f6644..9413822 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -1,8 +1,9 @@ """Test Processes module.""" from unittest import TestCase from tests.utils import TestCaseWithDB, TestCaseWithServer -from plomtask.processes import Process -from plomtask.exceptions import NotFoundException, BadFormatException +from plomtask.processes import Process, ProcessStep, ProcessStepsNode +from plomtask.conditions import Condition +from plomtask.exceptions import NotFoundException, HandledException class TestsSansDB(TestCase): @@ -14,41 +15,98 @@ class TestsSansDB(TestCase): self.assertEqual(Process(None).description.newest, '') self.assertEqual(Process(None).effort.newest, 1.0) + def test_Process_legal_ID(self) -> None: + """Test Process cannot be instantiated with id_=0.""" + with self.assertRaises(HandledException): + Process(0) + class TestsWithDB(TestCaseWithDB): """Mdule tests not requiring DB setup.""" - def test_Process_save(self) -> None: - """Test Process.save().""" - p_saved = Process(None) - p_saved.save(self.db_conn) - self.assertEqual(p_saved.id_, + def setUp(self) -> None: + super().setUp() + self.proc1 = Process(None) + self.proc1.save(self.db_conn) + self.proc2 = Process(None) + self.proc2.save(self.db_conn) + self.proc3 = Process(None) + self.proc3.save(self.db_conn) + + def test_Process_ids(self) -> None: + """Test Process.save() re Process.id_.""" + self.assertEqual(self.proc1.id_, Process.by_id(self.db_conn, 1, create=False).id_) - with self.assertRaises(BadFormatException): - p_saved = Process(0) - p_saved = Process(5) - p_saved.save(self.db_conn) - self.assertEqual(p_saved.id_, + self.assertEqual(self.proc2.id_, + Process.by_id(self.db_conn, 2, create=False).id_) + proc5 = Process(5) + proc5.save(self.db_conn) + self.assertEqual(proc5.id_, Process.by_id(self.db_conn, 5, create=False).id_) - p_saved.title.set('named') - p_loaded = Process.by_id(self.db_conn, p_saved.id_) - self.assertNotEqual(p_saved.title.history, p_loaded.title.history) - p_saved.save(self.db_conn) - p_loaded = Process.by_id(self.db_conn, p_saved.id_) - self.assertEqual(p_saved.title.history, p_loaded.title.history) - p_9 = Process(9) - p_9.child_ids = [4] - with self.assertRaises(NotFoundException): - p_9.save(self.db_conn) - p_9.child_ids = [5] - p_9.save(self.db_conn) - p_5 = Process.by_id(self.db_conn, 5) - p_5.child_ids = [1] - p_5.save(self.db_conn) - p_1 = Process.by_id(self.db_conn, 1) - p_1.child_ids = [9] - with self.assertRaises(BadFormatException): - p_1.save(self.db_conn) + + def test_Process_steps(self) -> None: + """Test addition, nesting, and non-recursion of ProcessSteps""" + def add_step(proc: Process, + steps_proc: list[tuple[int | None, int, int | None]], + step_tuple: tuple[int | None, int, int | None], + expected_id: int) -> None: + steps_proc += [step_tuple] + proc.set_steps(self.db_conn, steps_proc) + steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) + assert isinstance(self.proc2.id_, int) + assert isinstance(self.proc3.id_, int) + steps_proc1: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1) + p_1_dict: dict[int, ProcessStepsNode] = {} + p_1_dict[1] = ProcessStepsNode(self.proc2, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2) + step_2 = self.proc1.explicit_steps[-1] + assert isinstance(step_2.id_, int) + p_1_dict[2] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + steps_proc2: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3) + p_1_dict[1].steps[3] = ProcessStepsNode(self.proc3, None, + False, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_), + 4) + step_3 = ProcessStepsNode(self.proc3, None, False, {}, True) + p_1_dict[2].steps[4] = ProcessStepsNode(self.proc2, step_2.id_, True, + {3: step_3}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5) + p_1_dict[5] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6) + p_1_dict[6] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), + p_1_dict) + self.assertEqual(self.proc1.used_as_step_by(self.db_conn), + []) + self.assertEqual(self.proc2.used_as_step_by(self.db_conn), + [self.proc1]) + self.assertEqual(self.proc3.used_as_step_by(self.db_conn), + [self.proc1, self.proc2]) + + def test_Process_conditions(self) -> None: + """Test setting Process.conditions/enables/disables.""" + for target in ('conditions', 'enables', 'disables'): + c1 = Condition(None, False) + c1.save(self.db_conn) + assert isinstance(c1.id_, int) + c2 = Condition(None, False) + c2.save(self.db_conn) + assert isinstance(c2.id_, int) + self.proc1.set_conditions(self.db_conn, [], target) + self.assertEqual(getattr(self.proc1, target), []) + self.proc1.set_conditions(self.db_conn, [c1.id_], target) + self.assertEqual(getattr(self.proc1, target), [c1]) + self.proc1.set_conditions(self.db_conn, [c2.id_], target) + self.assertEqual(getattr(self.proc1, target), [c2]) + self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target) + self.assertEqual(getattr(self.proc1, target), [c1, c2]) def test_Process_by_id(self) -> None: """Test Process.by_id().""" @@ -56,32 +114,51 @@ class TestsWithDB(TestCaseWithDB): Process.by_id(self.db_conn, None, create=False) with self.assertRaises(NotFoundException): Process.by_id(self.db_conn, 0, create=False) - with self.assertRaises(NotFoundException): - Process.by_id(self.db_conn, 1, create=False) - self.assertNotEqual(Process(1).id_, + self.assertNotEqual(self.proc1.id_, Process.by_id(self.db_conn, None, create=True).id_) - self.assertEqual(Process(1).id_, - Process.by_id(self.db_conn, 1, create=True).id_) self.assertEqual(Process(2).id_, Process.by_id(self.db_conn, 2, create=True).id_) def test_Process_all(self) -> None: """Test Process.all().""" - p_1 = Process(None) - p_1.save(self.db_conn) - p_2 = Process(None) - p_2.save(self.db_conn) - self.assertEqual({p_1.id_, p_2.id_}, + self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_}, set(p.id_ for p in Process.all(self.db_conn))) + def test_ProcessStep_singularity(self) -> None: + """Test pointers made for single object keep pointing to it.""" + assert isinstance(self.proc2.id_, int) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + step = self.proc1.explicit_steps[-1] + assert isinstance(step.id_, int) + step_retrieved = ProcessStep.by_id(self.db_conn, step.id_) + step.parent_step_id = 99 + self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id) + + def test_Process_singularity(self) -> None: + """Test pointers made for single object keep pointing to it.""" + assert isinstance(self.proc1.id_, int) + assert isinstance(self.proc2.id_, int) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + p_retrieved = Process.by_id(self.db_conn, self.proc1.id_) + self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps) + + def test_Process_versioned_attributes_singularity(self) -> None: + """Test behavior of VersionedAttributes on saving (with .title).""" + assert isinstance(self.proc1.id_, int) + self.proc1.title.set('named') + p_loaded = Process.by_id(self.db_conn, self.proc1.id_) + self.assertEqual(self.proc1.title.history, p_loaded.title.history) + class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" def test_do_POST_process(self) -> None: """Test POST /process and its effect on the database.""" + self.assertEqual(0, len(Process.all(self.db_conn))) form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} self.check_post(form_data, '/process?id=', 302, '/') + self.assertEqual(1, len(Process.all(self.db_conn))) self.check_post(form_data, '/process?id=FOO', 400) form_data['effort'] = 'foo' self.check_post(form_data, '/process?id=', 400) @@ -92,35 +169,24 @@ class TestsWithServer(TestCaseWithServer): self.check_post(form_data, '/process?id=', 400) form_data = {'description': '', 'effort': 1.0} self.check_post(form_data, '/process?id=', 400) - form_data = {'title': '', 'description': '', - 'effort': 1.1, 'children': [1]} + self.assertEqual(1, len(Process.all(self.db_conn))) + form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0, + 'condition': []} + self.check_post(form_data, '/process?id=', 302, '/') + form_data['condition'] = [1] + self.check_post(form_data, '/process?id=', 404) + form_data_cond = {'title': 'foo', 'description': 'foo'} + self.check_post(form_data_cond, '/condition', 302, '/') + self.check_post(form_data, '/process?id=', 302, '/') + form_data['disables'] = [1] + self.check_post(form_data, '/process?id=', 302, '/') + form_data['enables'] = [1] self.check_post(form_data, '/process?id=', 302, '/') - form_data['children'] = 1.1 - self.check_post(form_data, '/process?id=', 400) - form_data['children'] = 'a' - self.check_post(form_data, '/process?id=', 400) - form_data['children'] = [1.2] - self.check_post(form_data, '/process?id=', 400) - form_data['children'] = ['b'] - self.check_post(form_data, '/process?id=', 400) - form_data['children'] = [2] - self.check_post(form_data, '/process?id=1', 400) - retrieved_1 = Process.by_id(self.db_conn, 1) - self.assertEqual(retrieved_1.title.newest, 'foo') - self.assertEqual(retrieved_1.child_ids, []) - retrieved_2 = Process.by_id(self.db_conn, 2) - self.assertEqual(retrieved_2.child_ids, [1]) - form_data = {'title': 'bar', 'description': 'bar', 'effort': 1.1} - self.check_post(form_data, '/process?id=1', 302, '/') - retrieved_1 = Process.by_id(self.db_conn, 1) - self.assertEqual(retrieved_1.title.newest, 'bar') - self.assertEqual([p.id_ for p in Process.all(self.db_conn)], - [retrieved_1.id_, retrieved_2.id_]) def test_do_GET(self) -> None: """Test /process and /processes response codes.""" self.check_get('/process', 200) self.check_get('/process?id=', 200) - self.check_get('/process?id=0', 400) + self.check_get('/process?id=0', 500) self.check_get('/process?id=FOO', 400) self.check_get('/processes', 200)