home · contact · privacy
Minor tests refactoring.
[plomtask] / tests / processes.py
1 """Test Processes module."""
2 from unittest import TestCase
3 from tests.utils import TestCaseWithDB, TestCaseWithServer
4 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
5 from plomtask.conditions import Condition
6 from plomtask.todos import Todo
7 from plomtask.exceptions import NotFoundException, HandledException
8
9
class TestsSansDB(TestCase):
    """Tests of the Processes module that get by without a database."""

    def test_Process_versioned_defaults(self) -> None:
        """Check defaults of a fresh Process' VersionedAttributes."""
        expected_defaults = (('title', 'UNNAMED'),
                             ('description', ''),
                             ('effort', 1.0))
        for attr_name, default in expected_defaults:
            # a new unsaved Process per attribute, so nothing is shared
            versioned = getattr(Process(None), attr_name)
            self.assertEqual(versioned.newest, default)

    def test_Process_legal_ID(self) -> None:
        """Check that constructing a Process with id_=0 is rejected."""
        self.assertRaises(HandledException, Process, 0)
23
24
class TestsWithDB(TestCaseWithDB):
    """Module tests requiring DB setup."""

    def setUp(self) -> None:
        """Save three default Processes as self.proc1 to self.proc3."""
        super().setUp()
        self.proc1 = Process(None)
        self.proc1.save(self.db_conn)
        self.proc2 = Process(None)
        self.proc2.save(self.db_conn)
        self.proc3 = Process(None)
        self.proc3.save(self.db_conn)

    def test_Process_ids(self) -> None:
        """Test Process.save() re Process.id_."""
        # Processes created with id_=None in setUp are expected to be
        # retrievable under the IDs 1 and 2 once saved
        self.assertEqual(self.proc1.id_,
                         Process.by_id(self.db_conn, 1, create=False).id_)
        self.assertEqual(self.proc2.id_,
                         Process.by_id(self.db_conn, 2, create=False).id_)
        # an explicitly chosen ID is expected to survive saving
        proc5 = Process(5)
        proc5.save(self.db_conn)
        self.assertEqual(proc5.id_,
                         Process.by_id(self.db_conn, 5, create=False).id_)

    def test_Process_steps(self) -> None:
        """Test addition, nesting, and non-recursion of ProcessSteps"""
        def add_step(proc: Process,
                     steps_proc: list[tuple[int | None, int, int | None]],
                     step_tuple: tuple[int | None, int, int | None],
                     expected_id: int) -> None:
            """Append step_tuple to steps_proc and set result on proc.

            Afterwards the appended tuple's first element is replaced by
            expected_id, so later set_steps() calls re-submit the step
            under the ID it is expected to have been given.
            """
            steps_proc += [step_tuple]
            proc.set_steps(self.db_conn, steps_proc)
            steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
        # NOTE(review): step tuples look like (step ID, ID of Process used
        # as step, parent step ID) -- confirm against Process.set_steps()
        assert isinstance(self.proc2.id_, int)
        assert isinstance(self.proc3.id_, int)
        # proc1 gets proc2 as its first step
        steps_proc1: list[tuple[int | None, int, int | None]] = []
        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
        p_1_dict: dict[int, ProcessStepsNode] = {}
        p_1_dict[1] = ProcessStepsNode(self.proc2, None, True, {}, False)
        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
        # proc1 gets proc3 as its second step
        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
        step_2 = self.proc1.explicit_steps[-1]
        assert isinstance(step_2.id_, int)
        p_1_dict[2] = ProcessStepsNode(self.proc3, None, True, {}, False)
        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
        # proc2 gets proc3 as a step of its own, which is expected to show
        # up nested below proc1's step 1 (which uses proc2)
        steps_proc2: list[tuple[int | None, int, int | None]] = []
        add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3)
        p_1_dict[1].steps[3] = ProcessStepsNode(self.proc3, None,
                                                False, {}, False)
        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
        # proc1 gets proc2 once more, this time below step_2; proc2's own
        # step 3 is expected to appear there too, with differing last flag
        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_),
                 4)
        step_3 = ProcessStepsNode(self.proc3, None, False, {}, True)
        p_1_dict[2].steps[4] = ProcessStepsNode(self.proc2, step_2.id_, True,
                                                {3: step_3}, False)
        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
        # third tuple element 999 matches no step created in this test;
        # the step is expected to end up at the top level
        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5)
        p_1_dict[5] = ProcessStepsNode(self.proc3, None, True, {}, False)
        self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
        # third tuple element 3 is the ID expected for proc2's step rather
        # than one of proc1's; again expected at the top level
        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6)
        p_1_dict[6] = ProcessStepsNode(self.proc3, None, True, {}, False)
        self.assertEqual(self.proc1.get_steps(self.db_conn, None),
                         p_1_dict)
        # finally check the reverse relation
        self.assertEqual(self.proc1.used_as_step_by(self.db_conn),
                         [])
        self.assertEqual(self.proc2.used_as_step_by(self.db_conn),
                         [self.proc1])
        self.assertEqual(self.proc3.used_as_step_by(self.db_conn),
                         [self.proc1, self.proc2])

    def test_Process_conditions(self) -> None:
        """Test setting Process.conditions/enables/disables."""
        for target in ('conditions', 'enables', 'disables'):
            # fresh pair of saved Conditions per targeted attribute
            c1 = Condition(None, False)
            c1.save(self.db_conn)
            assert isinstance(c1.id_, int)
            c2 = Condition(None, False)
            c2.save(self.db_conn)
            assert isinstance(c2.id_, int)
            # each call is expected to replace the previous selection
            self.proc1.set_conditions(self.db_conn, [], target)
            self.assertEqual(getattr(self.proc1, target), [])
            self.proc1.set_conditions(self.db_conn, [c1.id_], target)
            self.assertEqual(getattr(self.proc1, target), [c1])
            self.proc1.set_conditions(self.db_conn, [c2.id_], target)
            self.assertEqual(getattr(self.proc1, target), [c2])
            self.proc1.set_conditions(self.db_conn, [c1.id_, c2.id_], target)
            self.assertEqual(getattr(self.proc1, target), [c1, c2])

    def test_Process_by_id(self) -> None:
        """Test Process.by_id()."""
        # without create=True, neither None nor 0 may yield a Process
        with self.assertRaises(NotFoundException):
            Process.by_id(self.db_conn, None, create=False)
        with self.assertRaises(NotFoundException):
            Process.by_id(self.db_conn, 0, create=False)
        # with create=True, ID None must not hand back the saved proc1
        self.assertNotEqual(self.proc1.id_,
                            Process.by_id(self.db_conn, None, create=True).id_)
        self.assertEqual(Process(2).id_,
                         Process.by_id(self.db_conn, 2, create=True).id_)

    def test_Process_all(self) -> None:
        """Test Process.all()."""
        self.assertEqual({self.proc1.id_, self.proc2.id_, self.proc3.id_},
                         {p.id_ for p in Process.all(self.db_conn)})

    def test_ProcessStep_singularity(self) -> None:
        """Test pointers made for single object keep pointing to it."""
        assert isinstance(self.proc2.id_, int)
        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
        step = self.proc1.explicit_steps[-1]
        assert isinstance(step.id_, int)
        step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
        # changed only on step, yet expected to be seen on step_retrieved
        step.parent_step_id = 99
        self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id)

    def test_Process_singularity(self) -> None:
        """Test pointers made for single object keep pointing to it, and
        subsequent retrievals don't overload relations."""
        assert isinstance(self.proc1.id_, int)
        assert isinstance(self.proc2.id_, int)
        c1 = Condition(None, False)
        c1.save(self.db_conn)
        assert isinstance(c1.id_, int)
        self.proc1.set_conditions(self.db_conn, [c1.id_])
        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
        self.proc1.save(self.db_conn)
        p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
        self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
        self.assertEqual(self.proc1.conditions, p_retrieved.conditions)
        self.proc1.save(self.db_conn)

    def test_Process_versioned_attributes_singularity(self) -> None:
        """Test behavior of VersionedAttributes on saving (with .title)."""
        assert isinstance(self.proc1.id_, int)
        self.proc1.title.set('named')
        p_loaded = Process.by_id(self.db_conn, self.proc1.id_)
        self.assertEqual(self.proc1.title.history, p_loaded.title.history)

    def test_Process_removal(self) -> None:
        """Test removal of Processes and ProcessSteps."""
        assert isinstance(self.proc3.id_, int)
        # a removed Process must no longer be listed
        self.proc1.remove(self.db_conn)
        self.assertEqual({self.proc2.id_, self.proc3.id_},
                         {p.id_ for p in Process.all(self.db_conn)})
        # while proc2 uses proc3 as a step, removing proc3 must fail
        self.proc2.set_steps(self.db_conn, [(None, self.proc3.id_, None)])
        with self.assertRaises(HandledException):
            self.proc3.remove(self.db_conn)
        # removing the step must leave proc2 without explicit steps
        self.proc2.explicit_steps[0].remove(self.db_conn)
        retrieved = Process.by_id(self.db_conn, self.proc2.id_)
        self.assertEqual(retrieved.explicit_steps, [])
        # removing proc2 must take its re-added step along
        self.proc2.set_steps(self.db_conn, [(None, self.proc3.id_, None)])
        step = retrieved.explicit_steps[0]
        self.proc2.remove(self.db_conn)
        with self.assertRaises(NotFoundException):
            ProcessStep.by_id(self.db_conn, step.id_)
        # a Todo built on proc3 must block proc3's removal until it is gone
        todo = Todo(None, self.proc3, False, '2024-01-01')
        todo.save(self.db_conn)
        with self.assertRaises(HandledException):
            self.proc3.remove(self.db_conn)
        todo.remove(self.db_conn)
        self.proc3.remove(self.db_conn)
184
185
class TestsWithServer(TestCaseWithServer):
    """Tests that drive our HTTP server/handler (and its database)."""

    def test_do_POST_process(self) -> None:
        """Check POST /process responses and resulting Process counts."""
        url_new = '/process?id='

        def count_processes() -> int:
            """Return how many Processes Process.all() currently yields."""
            return len(Process.all(self.db_conn))

        self.assertEqual(0, count_processes())
        # a complete form is expected to create Process 1
        form: dict[str, object] = {'title': 'foo', 'description': 'foo',
                                   'effort': 1.1}
        self.check_post(form, url_new, 302, '/process?id=1')
        self.assertEqual(1, count_processes())
        # bad ID, bad effort, and incomplete forms are expected to fail
        self.check_post(form, '/process?id=FOO', 400)
        form['effort'] = 'foo'
        self.check_post(form, url_new, 400)
        self.check_post({}, url_new, 400)
        self.check_post({'title': '', 'description': ''}, url_new, 400)
        self.check_post({'title': '', 'effort': 1.1}, url_new, 400)
        self.check_post({'description': '', 'effort': 1.0}, url_new, 400)
        self.assertEqual(1, count_processes())
        # condition ID 1 is only accepted once that Condition was posted
        cond_form: dict[str, object] = {'title': 'foo', 'description': 'foo',
                                        'effort': 1.0, 'condition': []}
        self.check_post(cond_form, url_new, 302, '/process?id=2')
        cond_form['condition'] = [1]
        self.check_post(cond_form, url_new, 404)
        self.check_post({'title': 'foo', 'description': 'foo'},
                        '/condition', 302, '/condition?id=1')
        self.check_post(cond_form, url_new, 302, '/process?id=3')
        cond_form['disables'] = [1]
        self.check_post(cond_form, url_new, 302, '/process?id=4')
        cond_form['enables'] = [1]
        self.check_post(cond_form, url_new, 302, '/process?id=5')
        # deletion is expected to work only on an existing Process ID
        cond_form['delete'] = ''
        self.check_post(cond_form, url_new, 404)
        self.check_post(cond_form, '/process?id=6', 404)
        self.check_post(cond_form, '/process?id=5', 302, '/processes')

    def test_do_GET(self) -> None:
        """Check responses to GET /process and GET /processes."""
        process_form = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
        self.check_post(process_form, '/process?id=', 302, '/process?id=1')
        self.check_get_defaults('/process')
        self.check_get('/processes', 200)