X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=tests%2Fprocesses.py;h=9413822291a87439aafda0cd39459c93b3905661;hb=8e1a5416151dbcf506f2435823362e21d85aed2d;hp=87b0b09809398ab1a5aa59af78bf1ac11d8bf1e4;hpb=b3ff25deb388919c9a205ceb1997ff3c42e93bc8;p=plomtask diff --git a/tests/processes.py b/tests/processes.py index 87b0b09..9413822 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -1,9 +1,9 @@ """Test Processes module.""" from unittest import TestCase -from typing import Any from tests.utils import TestCaseWithDB, TestCaseWithServer -from plomtask.processes import Process -from plomtask.exceptions import NotFoundException, BadFormatException +from plomtask.processes import Process, ProcessStep, ProcessStepsNode +from plomtask.conditions import Condition +from plomtask.exceptions import NotFoundException, HandledException class TestsSansDB(TestCase): @@ -17,92 +17,96 @@ class TestsSansDB(TestCase): def test_Process_legal_ID(self) -> None: """Test Process cannot be instantiated with id_=0.""" - with self.assertRaises(BadFormatException): + with self.assertRaises(HandledException): Process(0) class TestsWithDB(TestCaseWithDB): """Mdule tests not requiring DB setup.""" + def setUp(self) -> None: + super().setUp() + self.proc1 = Process(None) + self.proc1.save(self.db_conn) + self.proc2 = Process(None) + self.proc2.save(self.db_conn) + self.proc3 = Process(None) + self.proc3.save(self.db_conn) + def test_Process_ids(self) -> None: - """Test Process.save_without_steps() re Process.id_.""" - p = Process(None) - p.save_without_steps(self.db_conn) - self.assertEqual(p.id_, + """Test Process.save() re Process.id_.""" + self.assertEqual(self.proc1.id_, Process.by_id(self.db_conn, 1, create=False).id_) - p = Process(None) - p.save_without_steps(self.db_conn) - self.assertEqual(p.id_, + self.assertEqual(self.proc2.id_, Process.by_id(self.db_conn, 2, create=False).id_) - p = Process(5) - p.save_without_steps(self.db_conn) - self.assertEqual(p.id_, + proc5 = Process(5) + proc5.save(self.db_conn) + self.assertEqual(proc5.id_, Process.by_id(self.db_conn, 5, create=False).id_) - def test_Process_versioned_attributes(self) -> None: - """Test behavior of VersionedAttributes on saving (with .title).""" - p = Process(None) - p.save_without_steps(self.db_conn) - p.title.set('named') - p_loaded = Process.by_id(self.db_conn, p.id_) - self.assertNotEqual(p.title.history, p_loaded.title.history) - p.save_without_steps(self.db_conn) - p_loaded = Process.by_id(self.db_conn, p.id_) - self.assertEqual(p.title.history, p_loaded.title.history) - def test_Process_steps(self) -> None: """Test addition, nesting, and non-recursion of ProcessSteps""" - p_1 = Process(1) - p_1.save_without_steps(self.db_conn) - assert p_1.id_ is not None - p_2 = Process(2) - p_2.save_without_steps(self.db_conn) - assert p_2.id_ is not None - p_3 = Process(3) - p_3.save_without_steps(self.db_conn) - assert p_3.id_ is not None - p_1.add_step(self.db_conn, None, p_2.id_, None) - p_1_dict: dict[int, dict[str, Any]] = {1: { - 'process': p_2, 'parent_id': None, - 'is_explicit': True, 'steps': {}, 'seen': False - }} - self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict) - s_b = p_1.add_step(self.db_conn, None, p_3.id_, None) - p_1_dict[2] = { - 'process': p_3, 'parent_id': None, - 'is_explicit': True, 'steps': {}, 'seen': False - } - self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict) - s_c = p_2.add_step(self.db_conn, None, p_3.id_, None) - assert s_c.id_ is not None - p_1_dict[1]['steps'] = {3: { - 'process': p_3, 'parent_id': None, - 'is_explicit': False, 'steps': {}, 'seen': False - }} - self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict) - p_1.add_step(self.db_conn, None, p_2.id_, s_b.id_) - p_1_dict[2]['steps'][4] = { - 'process': p_2, 'parent_id': s_b.id_, 'seen': False, - 'is_explicit': True, 'steps': {3: { - 'process': p_3, 'parent_id': None, - 'is_explicit': False, 'steps': {}, 'seen': True - }}} - self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict) - p_1.add_step(self.db_conn, None, p_3.id_, 999) - p_1_dict[5] = { - 'process': p_3, 'parent_id': None, - 'is_explicit': True, 'steps': {}, 'seen': False - } - self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict) - p_1.add_step(self.db_conn, None, p_3.id_, 3) - p_1_dict[6] = { - 'process': p_3, 'parent_id': None, - 'is_explicit': True, 'steps': {}, 'seen': False - } - self.assertEqual(p_1.get_steps(self.db_conn, None), p_1_dict) - self.assertEqual(p_1.used_as_step_by(self.db_conn), []) - self.assertEqual(p_2.used_as_step_by(self.db_conn), [p_1]) - self.assertEqual(p_3.used_as_step_by(self.db_conn), [p_1, p_2]) + def add_step(proc: Process, + steps_proc: list[tuple[int | None, int, int | None]], + step_tuple: tuple[int | None, int, int | None], + expected_id: int) -> None: + steps_proc += [step_tuple] + proc.set_steps(self.db_conn, steps_proc) + steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) + assert isinstance(self.proc2.id_, int) + assert isinstance(self.proc3.id_, int) + steps_proc1: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1) + p_1_dict: dict[int, ProcessStepsNode] = {} + p_1_dict[1] = ProcessStepsNode(self.proc2, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2) + step_2 = self.proc1.explicit_steps[-1] + assert isinstance(step_2.id_, int) + p_1_dict[2] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + steps_proc2: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3) + p_1_dict[1].steps[3] = ProcessStepsNode(self.proc3, None, + False, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_), + 4) + step_3 = ProcessStepsNode(self.proc3, None, False, {}, True) + p_1_dict[2].steps[4] = ProcessStepsNode(self.proc2, step_2.id_, True, + {3: step_3}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5) + p_1_dict[5] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6) + p_1_dict[6] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), + p_1_dict) + self.assertEqual(self.proc1.used_as_step_by(self.db_conn), + []) + self.assertEqual(self.proc2.used_as_step_by(self.db_conn), + [self.proc1]) + self.assertEqual(self.proc3.used_as_step_by(self.db_conn), + [self.proc1, self.proc2]) + + def test_Process_conditions(self) -> None: + """Test setting Process.conditions/enables/disables.""" + for target in ('conditions', 'enables', 'disables'): + c1 = Condition(None, False) + c1.save(self.db_conn) + assert isinstance(c1.id_, int) + c2 = Condition(None, False) + c2.save(self.db_conn) + assert isinstance(c2.id_, int) + self.proc1.set_conditions(self.db_conn, [], target) + self.assertEqual(getattr(self.proc1, target), []) + self.proc1.set_conditions(self.db_conn, [c1.id_], target) + self.assertEqual(getattr(self.proc1, target), [c1]) + self.proc1.set_conditions(self.db_conn, [c2.id_], target) + self.assertEqual(getattr(self.proc1, target), [c2]) + self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target) + self.assertEqual(getattr(self.proc1, target), [c1, c2]) def test_Process_by_id(self) -> None: """Test Process.by_id().""" @@ -110,24 +114,41 @@ class TestsWithDB(TestCaseWithDB): Process.by_id(self.db_conn, None, create=False) with self.assertRaises(NotFoundException): Process.by_id(self.db_conn, 0, create=False) - with self.assertRaises(NotFoundException): - Process.by_id(self.db_conn, 1, create=False) - self.assertNotEqual(Process(1).id_, + self.assertNotEqual(self.proc1.id_, Process.by_id(self.db_conn, None, create=True).id_) - self.assertEqual(Process(1).id_, - Process.by_id(self.db_conn, 1, create=True).id_) self.assertEqual(Process(2).id_, Process.by_id(self.db_conn, 2, create=True).id_) def test_Process_all(self) -> None: """Test Process.all().""" - p_1 = Process(None) - p_1.save_without_steps(self.db_conn) - p_2 = Process(None) - p_2.save_without_steps(self.db_conn) - self.assertEqual({p_1.id_, p_2.id_}, + self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_}, set(p.id_ for p in Process.all(self.db_conn))) + def test_ProcessStep_singularity(self) -> None: + """Test pointers made for single object keep pointing to it.""" + assert isinstance(self.proc2.id_, int) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + step = self.proc1.explicit_steps[-1] + assert isinstance(step.id_, int) + step_retrieved = ProcessStep.by_id(self.db_conn, step.id_) + step.parent_step_id = 99 + self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id) + + def test_Process_singularity(self) -> None: + """Test pointers made for single object keep pointing to it.""" + assert isinstance(self.proc1.id_, int) + assert isinstance(self.proc2.id_, int) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + p_retrieved = Process.by_id(self.db_conn, self.proc1.id_) + self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps) + + def test_Process_versioned_attributes_singularity(self) -> None: + """Test behavior of VersionedAttributes on saving (with .title).""" + assert isinstance(self.proc1.id_, int) + self.proc1.title.set('named') + p_loaded = Process.by_id(self.db_conn, self.proc1.id_) + self.assertEqual(self.proc1.title.history, p_loaded.title.history) + class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" @@ -149,11 +170,23 @@ class TestsWithServer(TestCaseWithServer): form_data = {'description': '', 'effort': 1.0} self.check_post(form_data, '/process?id=', 400) self.assertEqual(1, len(Process.all(self.db_conn))) + form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0, + 'condition': []} + self.check_post(form_data, '/process?id=', 302, '/') + form_data['condition'] = [1] + self.check_post(form_data, '/process?id=', 404) + form_data_cond = {'title': 'foo', 'description': 'foo'} + self.check_post(form_data_cond, '/condition', 302, '/') + self.check_post(form_data, '/process?id=', 302, '/') + form_data['disables'] = [1] + self.check_post(form_data, '/process?id=', 302, '/') + form_data['enables'] = [1] + self.check_post(form_data, '/process?id=', 302, '/') def test_do_GET(self) -> None: """Test /process and /processes response codes.""" self.check_get('/process', 200) self.check_get('/process?id=', 200) - self.check_get('/process?id=0', 400) + self.check_get('/process?id=0', 500) self.check_get('/process?id=FOO', 400) self.check_get('/processes', 200)