X-Git-Url: https://plomlompom.com/repos/?a=blobdiff_plain;f=tests%2Fprocesses.py;h=06ed257689b0601e814a074c454cabe6ea0b6c98;hb=8c0cbef8f467d125ba7c987b3eb1f5bef7d38120;hp=e8967f70273e2345df966d599d50fdbeb4069e39;hpb=13bce001bd86de2c20dff2f93d9f08cfae95a16b;p=plomtask diff --git a/tests/processes.py b/tests/processes.py index e8967f7..06ed257 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -1,9 +1,10 @@ """Test Processes module.""" from unittest import TestCase -from urllib.parse import urlencode from tests.utils import TestCaseWithDB, TestCaseWithServer -from plomtask.processes import Process -from plomtask.exceptions import NotFoundException, BadFormatException +from plomtask.processes import Process, ProcessStep, ProcessStepsNode +from plomtask.conditions import Condition +from plomtask.todos import Todo +from plomtask.exceptions import NotFoundException, HandledException class TestsSansDB(TestCase): @@ -15,28 +16,98 @@ class TestsSansDB(TestCase): self.assertEqual(Process(None).description.newest, '') self.assertEqual(Process(None).effort.newest, 1.0) + def test_Process_legal_ID(self) -> None: + """Test Process cannot be instantiated with id_=0.""" + with self.assertRaises(HandledException): + Process(0) + class TestsWithDB(TestCaseWithDB): """Mdule tests not requiring DB setup.""" - def test_Process_save(self) -> None: - """Test Process.save().""" - p_saved = Process(None) - p_saved.save(self.db_conn) - self.assertEqual(p_saved.id_, + def setUp(self) -> None: + super().setUp() + self.proc1 = Process(None) + self.proc1.save(self.db_conn) + self.proc2 = Process(None) + self.proc2.save(self.db_conn) + self.proc3 = Process(None) + self.proc3.save(self.db_conn) + + def test_Process_ids(self) -> None: + """Test Process.save() re Process.id_.""" + self.assertEqual(self.proc1.id_, Process.by_id(self.db_conn, 1, create=False).id_) - with self.assertRaises(BadFormatException): - p_saved = Process(0) - p_saved = Process(5) - p_saved.save(self.db_conn) - self.assertEqual(p_saved.id_, + self.assertEqual(self.proc2.id_, + Process.by_id(self.db_conn, 2, create=False).id_) + proc5 = Process(5) + proc5.save(self.db_conn) + self.assertEqual(proc5.id_, Process.by_id(self.db_conn, 5, create=False).id_) - p_saved.title.set('named') - p_loaded = Process.by_id(self.db_conn, p_saved.id_) - self.assertNotEqual(p_saved.title.history, p_loaded.title.history) - p_saved.save(self.db_conn) - p_loaded = Process.by_id(self.db_conn, p_saved.id_) - self.assertEqual(p_saved.title.history, p_loaded.title.history) + + def test_Process_steps(self) -> None: + """Test addition, nesting, and non-recursion of ProcessSteps""" + def add_step(proc: Process, + steps_proc: list[tuple[int | None, int, int | None]], + step_tuple: tuple[int | None, int, int | None], + expected_id: int) -> None: + steps_proc += [step_tuple] + proc.set_steps(self.db_conn, steps_proc) + steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) + assert isinstance(self.proc2.id_, int) + assert isinstance(self.proc3.id_, int) + steps_proc1: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1) + p_1_dict: dict[int, ProcessStepsNode] = {} + p_1_dict[1] = ProcessStepsNode(self.proc2, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2) + step_2 = self.proc1.explicit_steps[-1] + assert isinstance(step_2.id_, int) + p_1_dict[2] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + steps_proc2: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3) + p_1_dict[1].steps[3] = ProcessStepsNode(self.proc3, None, + False, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_), + 4) + step_3 = ProcessStepsNode(self.proc3, None, False, {}, True) + p_1_dict[2].steps[4] = ProcessStepsNode(self.proc2, step_2.id_, True, + {3: step_3}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5) + p_1_dict[5] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6) + p_1_dict[6] = ProcessStepsNode(self.proc3, None, True, {}, False) + self.assertEqual(self.proc1.get_steps(self.db_conn, None), + p_1_dict) + self.assertEqual(self.proc1.used_as_step_by(self.db_conn), + []) + self.assertEqual(self.proc2.used_as_step_by(self.db_conn), + [self.proc1]) + self.assertEqual(self.proc3.used_as_step_by(self.db_conn), + [self.proc1, self.proc2]) + + def test_Process_conditions(self) -> None: + """Test setting Process.conditions/enables/disables.""" + for target in ('conditions', 'enables', 'disables'): + c1 = Condition(None, False) + c1.save(self.db_conn) + assert isinstance(c1.id_, int) + c2 = Condition(None, False) + c2.save(self.db_conn) + assert isinstance(c2.id_, int) + self.proc1.set_conditions(self.db_conn, [], target) + self.assertEqual(getattr(self.proc1, target), []) + self.proc1.set_conditions(self.db_conn, [c1.id_], target) + self.assertEqual(getattr(self.proc1, target), [c1]) + self.proc1.set_conditions(self.db_conn, [c2.id_], target) + self.assertEqual(getattr(self.proc1, target), [c2]) + self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target) + self.assertEqual(getattr(self.proc1, target), [c1, c2]) def test_Process_by_id(self) -> None: """Test Process.by_id().""" @@ -44,60 +115,114 @@ class TestsWithDB(TestCaseWithDB): Process.by_id(self.db_conn, None, create=False) with self.assertRaises(NotFoundException): Process.by_id(self.db_conn, 0, create=False) - with self.assertRaises(NotFoundException): - Process.by_id(self.db_conn, 1, create=False) - self.assertNotEqual(Process(1).id_, + self.assertNotEqual(self.proc1.id_, Process.by_id(self.db_conn, None, create=True).id_) - self.assertEqual(Process(1).id_, - Process.by_id(self.db_conn, 1, create=True).id_) self.assertEqual(Process(2).id_, Process.by_id(self.db_conn, 2, create=True).id_) def test_Process_all(self) -> None: """Test Process.all().""" - p_1 = Process(None) - p_1.save(self.db_conn) - p_2 = Process(None) - p_2.save(self.db_conn) - self.assertEqual({p_1.id_, p_2.id_}, + self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_}, set(p.id_ for p in Process.all(self.db_conn))) + def test_ProcessStep_singularity(self) -> None: + """Test pointers made for single object keep pointing to it.""" + assert isinstance(self.proc2.id_, int) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + step = self.proc1.explicit_steps[-1] + assert isinstance(step.id_, int) + step_retrieved = ProcessStep.by_id(self.db_conn, step.id_) + step.parent_step_id = 99 + self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id) + + def test_Process_singularity(self) -> None: + """Test pointers made for single object keep pointing to it, and + subsequent retrievals don't overload relations.""" + assert isinstance(self.proc1.id_, int) + assert isinstance(self.proc2.id_, int) + c1 = Condition(None, False) + c1.save(self.db_conn) + assert isinstance(c1.id_, int) + self.proc1.set_conditions(self.db_conn, [c1.id_]) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + self.proc1.save(self.db_conn) + p_retrieved = Process.by_id(self.db_conn, self.proc1.id_) + self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps) + self.assertEqual(self.proc1.conditions, p_retrieved.conditions) + self.proc1.save(self.db_conn) + + def test_Process_versioned_attributes_singularity(self) -> None: + """Test behavior of VersionedAttributes on saving (with .title).""" + assert isinstance(self.proc1.id_, int) + self.proc1.title.set('named') + p_loaded = Process.by_id(self.db_conn, self.proc1.id_) + self.assertEqual(self.proc1.title.history, p_loaded.title.history) + + def test_Process_removal(self) -> None: + """Test removal of Processes and ProcessSteps.""" + assert isinstance(self.proc3.id_, int) + self.proc1.remove(self.db_conn) + self.assertEqual({self.proc2.id_, self.proc3.id_}, + set(p.id_ for p in Process.all(self.db_conn))) + self.proc2.set_steps(self.db_conn, [(None, self.proc3.id_, None)]) + with self.assertRaises(HandledException): + self.proc3.remove(self.db_conn) + self.proc2.explicit_steps[0].remove(self.db_conn) + retrieved = Process.by_id(self.db_conn, self.proc2.id_) + self.assertEqual(retrieved.explicit_steps, []) + self.proc2.set_steps(self.db_conn, [(None, self.proc3.id_, None)]) + step = retrieved.explicit_steps[0] + self.proc2.remove(self.db_conn) + with self.assertRaises(NotFoundException): + ProcessStep.by_id(self.db_conn, step.id_) + todo = Todo(None, self.proc3, False, '2024-01-01') + todo.save(self.db_conn) + with self.assertRaises(HandledException): + self.proc3.remove(self.db_conn) + todo.remove(self.db_conn) + self.proc3.remove(self.db_conn) + class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" def test_do_POST_process(self) -> None: """Test POST /process and its effect on the database.""" - def post_data_to_expect(form_data: dict[str, object], - to_: str, expect: int) -> None: - encoded_form_data = urlencode(form_data).encode('utf-8') - headers = {'Content-Type': 'application/x-www-form-urlencoded', - 'Content-Length': str(len(encoded_form_data))} - self.conn.request('POST', to_, - body=encoded_form_data, headers=headers) - self.assertEqual(self.conn.getresponse().status, expect) - form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0} - post_data_to_expect(form_data, '/process?id=FOO', 400) + self.assertEqual(0, len(Process.all(self.db_conn))) + form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} + self.check_post(form_data, '/process?id=', 302, '/process?id=1') + self.assertEqual(1, len(Process.all(self.db_conn))) + self.check_post(form_data, '/process?id=FOO', 400) form_data['effort'] = 'foo' - post_data_to_expect(form_data, '/process?id=', 400) - form_data['effort'] = None - post_data_to_expect(form_data, '/process?id=', 400) - form_data = {'title': None, 'description': 1, 'effort': 1.0} - post_data_to_expect(form_data, '/process?id=', 302) - retrieved = Process.by_id(self.db_conn, 1) - self.assertEqual(retrieved.title.newest, 'None') - self.assertEqual([p.id_ for p in Process.all(self.db_conn)], - [retrieved.id_]) + self.check_post(form_data, '/process?id=', 400) + self.check_post({}, '/process?id=', 400) + form_data = {'title': '', 'description': ''} + self.check_post(form_data, '/process?id=', 400) + form_data = {'title': '', 'effort': 1.1} + self.check_post(form_data, '/process?id=', 400) + form_data = {'description': '', 'effort': 1.0} + self.check_post(form_data, '/process?id=', 400) + self.assertEqual(1, len(Process.all(self.db_conn))) + form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0, + 'condition': []} + self.check_post(form_data, '/process?id=', 302, '/process?id=2') + form_data['condition'] = [1] + self.check_post(form_data, '/process?id=', 404) + form_data_cond = {'title': 'foo', 'description': 'foo'} + self.check_post(form_data_cond, '/condition', 302, '/condition?id=1') + self.check_post(form_data, '/process?id=', 302, '/process?id=3') + form_data['disables'] = [1] + self.check_post(form_data, '/process?id=', 302, '/process?id=4') + form_data['enables'] = [1] + self.check_post(form_data, '/process?id=', 302, '/process?id=5') + form_data['delete'] = '' + self.check_post(form_data, '/process?id=', 404) + self.check_post(form_data, '/process?id=6', 404) + self.check_post(form_data, '/process?id=5', 302, '/processes') def test_do_GET(self) -> None: """Test /process and /processes response codes.""" - self.conn.request('GET', '/process') - self.assertEqual(self.conn.getresponse().status, 200) - self.conn.request('GET', '/process?id=') - self.assertEqual(self.conn.getresponse().status, 200) - self.conn.request('GET', '/process?id=0') - self.assertEqual(self.conn.getresponse().status, 400) - self.conn.request('GET', '/process?id=FOO') - self.assertEqual(self.conn.getresponse().status, 400) - self.conn.request('GET', '/processes') - self.assertEqual(self.conn.getresponse().status, 200) + form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1} + self.check_post(form_data, '/process?id=', 302, '/process?id=1') + self.check_get_defaults('/process') + self.check_get('/processes', 200)