1 """Test Processes module."""
2 from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB
3 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
4 from plomtask.conditions import Condition
5 from plomtask.exceptions import HandledException, NotFoundException
6 from plomtask.todos import Todo
9 class TestsSansDB(TestCaseSansDB):
10 """Module tests not requiring DB setup."""
11 checked_class = Process
13 def test_Process_id_setting(self) -> None:
14 """Test .id_ being set and its legal range being enforced."""
15 self.check_id_setting()
17 def test_Process_versioned_defaults(self) -> None:
18 """Test defaults of VersionedAttributes."""
19 self.check_versioned_defaults({
25 class TestsSansDBProcessStep(TestCaseSansDB):
26 """Module tests not requiring DB setup."""
27 checked_class = ProcessStep
29 def test_ProcessStep_id_setting(self) -> None:
30 """Test .id_ being set and its legal range being enforced."""
31 self.check_id_setting(2, 3, 4)
34 class TestsWithDB(TestCaseWithDB):
35 """Module tests requiring DB setup."""
36 checked_class = Process
38 def three_processes(self) -> tuple[Process, Process, Process]:
39 """Return three saved processes."""
40 p1, p2, p3 = Process(None), Process(None), Process(None)
41 for p in [p1, p2, p3]:
45 def test_Process_saving_and_caching(self) -> None:
46 """Test .save/.save_core."""
48 self.check_saving_and_caching(**kwargs)
52 p.description.set('d1')
53 p.description.set('d2')
56 c1, c2, c3 = Condition(None), Condition(None), Condition(None)
57 for c in [c1, c2, c3]:
59 assert isinstance(c1.id_, int)
60 assert isinstance(c2.id_, int)
61 assert isinstance(c3.id_, int)
62 p.set_conditions(self.db_conn, [c1.id_, c2.id_])
63 p.set_enables(self.db_conn, [c2.id_, c3.id_])
64 p.set_disables(self.db_conn, [c1.id_, c3.id_])
66 r = Process.by_id(self.db_conn, p.id_)
67 self.assertEqual(sorted(r.title.history.values()), ['t1', 't2'])
68 self.assertEqual(sorted(r.description.history.values()), ['d1', 'd2'])
69 self.assertEqual(sorted(r.effort.history.values()), [0.5, 1.5])
70 self.assertEqual(sorted(r.conditions), sorted([c1, c2]))
71 self.assertEqual(sorted(r.enables), sorted([c2, c3]))
72 self.assertEqual(sorted(r.disables), sorted([c1, c3]))
74 def test_Process_steps(self) -> None:
75 """Test addition, nesting, and non-recursion of ProcessSteps"""
76 def add_step(proc: Process,
77 steps_proc: list[tuple[int | None, int, int | None]],
78 step_tuple: tuple[int | None, int, int | None],
79 expected_id: int) -> None:
80 steps_proc += [step_tuple]
81 proc.set_steps(self.db_conn, steps_proc)
82 steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
83 p1, p2, p3 = self.three_processes()
84 assert isinstance(p1.id_, int)
85 assert isinstance(p2.id_, int)
86 assert isinstance(p3.id_, int)
87 steps_p1: list[tuple[int | None, int, int | None]] = []
88 add_step(p1, steps_p1, (None, p2.id_, None), 1)
89 p1_dict: dict[int, ProcessStepsNode] = {}
90 p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False)
91 self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
92 add_step(p1, steps_p1, (None, p3.id_, None), 2)
93 step_2 = p1.explicit_steps[-1]
94 p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False)
95 self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
96 steps_p2: list[tuple[int | None, int, int | None]] = []
97 add_step(p2, steps_p2, (None, p3.id_, None), 3)
98 p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False)
99 self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
100 add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4)
101 step_3 = ProcessStepsNode(p3, None, False, {}, True)
102 p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True,
104 self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
105 add_step(p1, steps_p1, (None, p3.id_, 999), 5)
106 p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False)
107 self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
108 add_step(p1, steps_p1, (None, p3.id_, 3), 6)
109 p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False)
110 self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
111 self.assertEqual(p1.used_as_step_by(self.db_conn), [])
112 self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
113 self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
115 def test_Process_conditions(self) -> None:
116 """Test setting Process.conditions/enables/disables."""
119 for target in ('conditions', 'enables', 'disables'):
120 method = getattr(p, f'set_{target}')
121 c1, c2 = Condition(None), Condition(None)
122 c1.save(self.db_conn)
123 c2.save(self.db_conn)
124 assert isinstance(c1.id_, int)
125 assert isinstance(c2.id_, int)
126 method(self.db_conn, [])
127 self.assertEqual(getattr(p, target), [])
128 method(self.db_conn, [c1.id_])
129 self.assertEqual(getattr(p, target), [c1])
130 method(self.db_conn, [c2.id_])
131 self.assertEqual(getattr(p, target), [c2])
132 method(self.db_conn, [c1.id_, c2.id_])
133 self.assertEqual(getattr(p, target), [c1, c2])
135 def test_Process_by_id(self) -> None:
136 """Test .by_id(), including creation"""
139 def test_Process_all(self) -> None:
143 def test_Process_singularity(self) -> None:
144 """Test pointers made for single object keep pointing to it."""
145 self.check_singularity('conditions', [Condition(None)])
147 def test_Process_versioned_attributes_singularity(self) -> None:
148 """Test behavior of VersionedAttributes on saving (with .title)."""
149 self.check_versioned_singularity()
151 def test_Process_removal(self) -> None:
152 """Test removal of Processes and ProcessSteps."""
154 p1, p2, p3 = self.three_processes()
155 assert isinstance(p1.id_, int)
156 assert isinstance(p2.id_, int)
157 assert isinstance(p3.id_, int)
158 p2.set_steps(self.db_conn, [(None, p1.id_, None)])
159 with self.assertRaises(HandledException):
160 p1.remove(self.db_conn)
161 step = p2.explicit_steps[0]
162 p2.set_steps(self.db_conn, [])
163 with self.assertRaises(NotFoundException):
164 ProcessStep.by_id(self.db_conn, step.id_)
165 p1.remove(self.db_conn)
166 p2.set_steps(self.db_conn, [(None, p3.id_, None)])
167 step = p2.explicit_steps[0]
168 p2.remove(self.db_conn)
169 with self.assertRaises(NotFoundException):
170 ProcessStep.by_id(self.db_conn, step.id_)
171 todo = Todo(None, p3, False, '2024-01-01')
172 todo.save(self.db_conn)
173 with self.assertRaises(HandledException):
174 p3.remove(self.db_conn)
175 todo.remove(self.db_conn)
176 p3.remove(self.db_conn)
179 class TestsWithDBForProcessStep(TestCaseWithDB):
180 """Module tests requiring DB setup."""
181 checked_class = ProcessStep
183 def test_ProcessStep_saving_and_caching(self) -> None:
184 """Test .save/.save_core."""
187 'step_process_id': 3,
189 self.check_saving_and_caching(**kwargs)
191 def test_ProcessStep_from_table_row(self) -> None:
192 """Test .from_table_row() properly reads in class from DB"""
193 self.check_from_table_row(2, 3, None)
195 def test_ProcessStep_singularity(self) -> None:
196 """Test pointers made for single object keep pointing to it."""
197 self.check_singularity('parent_step_id', 1, 2, 3, None)
199 def test_ProcessStep_remove(self) -> None:
200 """Test .remove and unsetting of owner's .explicit_steps entry."""
203 p1.save(self.db_conn)
204 p2.save(self.db_conn)
205 assert isinstance(p2.id_, int)
206 p1.set_steps(self.db_conn, [(None, p2.id_, None)])
207 step = p1.explicit_steps[0]
208 step.remove(self.db_conn)
209 self.assertEqual(p1.explicit_steps, [])
210 self.check_storage([])
213 class TestsWithServer(TestCaseWithServer):
214 """Module tests against our HTTP server/handler (and database)."""
216 def test_do_POST_process(self) -> None:
217 """Test POST /process and its effect on the database."""
218 self.assertEqual(0, len(Process.all(self.db_conn)))
219 form_data = self.post_process()
220 self.assertEqual(1, len(Process.all(self.db_conn)))
221 self.check_post(form_data, '/process?id=FOO', 400)
222 self.check_post(form_data | {'effort': 'foo'}, '/process?id=', 400)
223 self.check_post({}, '/process?id=', 400)
224 self.check_post({'title': '', 'description': ''}, '/process?id=', 400)
225 self.check_post({'title': '', 'effort': 1.1}, '/process?id=', 400)
226 self.check_post({'description': '', 'effort': 1.0},
228 self.assertEqual(1, len(Process.all(self.db_conn)))
229 form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
230 self.post_process(2, form_data | {'condition': []})
231 self.check_post(form_data | {'condition': [1]}, '/process?id=', 404)
232 self.check_post({'title': 'foo', 'description': 'foo'},
233 '/condition', 302, '/condition?id=1')
234 self.post_process(3, form_data | {'condition': [1]})
235 self.post_process(4, form_data | {'disables': [1]})
236 self.post_process(5, form_data | {'enables': [1]})
237 form_data['delete'] = ''
238 self.check_post(form_data, '/process?id=', 404)
239 self.check_post(form_data, '/process?id=6', 404)
240 self.check_post(form_data, '/process?id=5', 302, '/processes')
242 def test_do_GET(self) -> None:
243 """Test /process and /processes response codes."""
245 self.check_get_defaults('/process')
246 self.check_get('/processes', 200)