home · contact · privacy
861600821e5095397be3633cd8cca00e1a9b0742
[plomtask] / tests / processes.py
1 """Test Processes module."""
2 from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB
3 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
4 from plomtask.conditions import Condition
5 from plomtask.exceptions import HandledException, NotFoundException
6 from plomtask.todos import Todo
7
8
9 class TestsSansDB(TestCaseSansDB):
10     """Module tests not requiring DB setup."""
11     checked_class = Process
12
13     def test_Process_id_setting(self) -> None:
14         """Test .id_ being set and its legal range being enforced."""
15         self.check_id_setting()
16
17     def test_Process_versioned_defaults(self) -> None:
18         """Test defaults of VersionedAttributes."""
19         self.check_versioned_defaults({
20             'title': 'UNNAMED',
21             'description': '',
22             'effort': 1.0})
23
24
25 class TestsSansDBProcessStep(TestCaseSansDB):
26     """Module tests not requiring DB setup."""
27     checked_class = ProcessStep
28
29     def test_ProcessStep_id_setting(self) -> None:
30         """Test .id_ being set and its legal range being enforced."""
31         self.check_id_setting(2, 3, 4)
32
33
34 class TestsWithDB(TestCaseWithDB):
35     """Module tests requiring DB setup."""
36     checked_class = Process
37
38     def three_processes(self) -> tuple[Process, Process, Process]:
39         """Return three saved processes."""
40         p1, p2, p3 = Process(None), Process(None), Process(None)
41         for p in [p1, p2, p3]:
42             p.save(self.db_conn)
43         return p1, p2, p3
44
45     def test_Process_saving_and_caching(self) -> None:
46         """Test .save/.save_core."""
47         kwargs = {'id_': 1}
48         self.check_saving_and_caching(**kwargs)
49         p = Process(None)
50         p.title.set('t1')
51         p.title.set('t2')
52         p.description.set('d1')
53         p.description.set('d2')
54         p.effort.set(0.5)
55         p.effort.set(1.5)
56         c1, c2, c3 = Condition(None), Condition(None), Condition(None)
57         for c in [c1, c2, c3]:
58             c.save(self.db_conn)
59         assert isinstance(c1.id_, int)
60         assert isinstance(c2.id_, int)
61         assert isinstance(c3.id_, int)
62         p.set_conditions(self.db_conn, [c1.id_, c2.id_])
63         p.set_enables(self.db_conn, [c2.id_, c3.id_])
64         p.set_disables(self.db_conn, [c1.id_, c3.id_])
65         p.save(self.db_conn)
66         r = Process.by_id(self.db_conn, p.id_)
67         self.assertEqual(sorted(r.title.history.values()), ['t1', 't2'])
68         self.assertEqual(sorted(r.description.history.values()), ['d1', 'd2'])
69         self.assertEqual(sorted(r.effort.history.values()), [0.5, 1.5])
70         self.assertEqual(sorted(r.conditions), sorted([c1, c2]))
71         self.assertEqual(sorted(r.enables), sorted([c2, c3]))
72         self.assertEqual(sorted(r.disables), sorted([c1, c3]))
73
74     def test_Process_steps(self) -> None:
75         """Test addition, nesting, and non-recursion of ProcessSteps"""
76         def add_step(proc: Process,
77                      steps_proc: list[tuple[int | None, int, int | None]],
78                      step_tuple: tuple[int | None, int, int | None],
79                      expected_id: int) -> None:
80             steps_proc += [step_tuple]
81             proc.set_steps(self.db_conn, steps_proc)
82             steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
83         p1, p2, p3 = self.three_processes()
84         assert isinstance(p1.id_, int)
85         assert isinstance(p2.id_, int)
86         assert isinstance(p3.id_, int)
87         steps_p1: list[tuple[int | None, int, int | None]] = []
88         add_step(p1, steps_p1, (None, p2.id_, None), 1)
89         p1_dict: dict[int, ProcessStepsNode] = {}
90         p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False)
91         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
92         add_step(p1, steps_p1, (None, p3.id_, None), 2)
93         step_2 = p1.explicit_steps[-1]
94         p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False)
95         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
96         steps_p2: list[tuple[int | None, int, int | None]] = []
97         add_step(p2, steps_p2, (None, p3.id_, None), 3)
98         p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False)
99         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
100         add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4)
101         step_3 = ProcessStepsNode(p3, None, False, {}, True)
102         p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True,
103                                                {3: step_3}, False)
104         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
105         add_step(p1, steps_p1, (None, p3.id_, 999), 5)
106         p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False)
107         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
108         add_step(p1, steps_p1, (None, p3.id_, 3), 6)
109         p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False)
110         self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
111         self.assertEqual(p1.used_as_step_by(self.db_conn), [])
112         self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
113         self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
114
115     def test_Process_conditions(self) -> None:
116         """Test setting Process.conditions/enables/disables."""
117         p = Process(None)
118         p.save(self.db_conn)
119         for target in ('conditions', 'enables', 'disables'):
120             method = getattr(p, f'set_{target}')
121             c1, c2 = Condition(None), Condition(None)
122             c1.save(self.db_conn)
123             c2.save(self.db_conn)
124             assert isinstance(c1.id_, int)
125             assert isinstance(c2.id_, int)
126             method(self.db_conn, [])
127             self.assertEqual(getattr(p, target), [])
128             method(self.db_conn, [c1.id_])
129             self.assertEqual(getattr(p, target), [c1])
130             method(self.db_conn, [c2.id_])
131             self.assertEqual(getattr(p, target), [c2])
132             method(self.db_conn, [c1.id_, c2.id_])
133             self.assertEqual(getattr(p, target), [c1, c2])
134
135     def test_Process_by_id(self) -> None:
136         """Test .by_id(), including creation"""
137         self.check_by_id()
138
139     def test_Process_all(self) -> None:
140         """Test .all()."""
141         self.check_all()
142
143     def test_Process_singularity(self) -> None:
144         """Test pointers made for single object keep pointing to it."""
145         self.check_singularity('conditions', [Condition(None)])
146
147     def test_Process_versioned_attributes_singularity(self) -> None:
148         """Test behavior of VersionedAttributes on saving (with .title)."""
149         self.check_versioned_singularity()
150
151     def test_Process_removal(self) -> None:
152         """Test removal of Processes and ProcessSteps."""
153         self.check_remove()
154         p1, p2, p3 = self.three_processes()
155         assert isinstance(p1.id_, int)
156         assert isinstance(p2.id_, int)
157         assert isinstance(p3.id_, int)
158         p2.set_steps(self.db_conn, [(None, p1.id_, None)])
159         with self.assertRaises(HandledException):
160             p1.remove(self.db_conn)
161         step = p2.explicit_steps[0]
162         p2.set_steps(self.db_conn, [])
163         with self.assertRaises(NotFoundException):
164             ProcessStep.by_id(self.db_conn, step.id_)
165         p1.remove(self.db_conn)
166         p2.set_steps(self.db_conn, [(None, p3.id_, None)])
167         step = p2.explicit_steps[0]
168         p2.remove(self.db_conn)
169         with self.assertRaises(NotFoundException):
170             ProcessStep.by_id(self.db_conn, step.id_)
171         todo = Todo(None, p3, False, '2024-01-01')
172         todo.save(self.db_conn)
173         with self.assertRaises(HandledException):
174             p3.remove(self.db_conn)
175         todo.remove(self.db_conn)
176         p3.remove(self.db_conn)
177
178
179 class TestsWithDBForProcessStep(TestCaseWithDB):
180     """Module tests requiring DB setup."""
181     checked_class = ProcessStep
182
183     def test_ProcessStep_saving_and_caching(self) -> None:
184         """Test .save/.save_core."""
185         kwargs = {'id_': 1,
186                   'owner_id': 2,
187                   'step_process_id': 3,
188                   'parent_step_id': 4}
189         self.check_saving_and_caching(**kwargs)
190
191     def test_ProcessStep_from_table_row(self) -> None:
192         """Test .from_table_row() properly reads in class from DB"""
193         self.check_from_table_row(2, 3, None)
194
195     def test_ProcessStep_singularity(self) -> None:
196         """Test pointers made for single object keep pointing to it."""
197         self.check_singularity('parent_step_id', 1, 2, 3, None)
198
199     def test_ProcessStep_remove(self) -> None:
200         """Test .remove and unsetting of owner's .explicit_steps entry."""
201         p1 = Process(None)
202         p2 = Process(None)
203         p1.save(self.db_conn)
204         p2.save(self.db_conn)
205         assert isinstance(p2.id_, int)
206         p1.set_steps(self.db_conn, [(None, p2.id_, None)])
207         step = p1.explicit_steps[0]
208         step.remove(self.db_conn)
209         self.assertEqual(p1.explicit_steps, [])
210         self.check_storage([])
211
212
213 class TestsWithServer(TestCaseWithServer):
214     """Module tests against our HTTP server/handler (and database)."""
215
216     def test_do_POST_process(self) -> None:
217         """Test POST /process and its effect on the database."""
218         self.assertEqual(0, len(Process.all(self.db_conn)))
219         form_data = self.post_process()
220         self.assertEqual(1, len(Process.all(self.db_conn)))
221         self.check_post(form_data, '/process?id=FOO', 400)
222         self.check_post(form_data | {'effort': 'foo'}, '/process?id=', 400)
223         self.check_post({}, '/process?id=', 400)
224         self.check_post({'title': '', 'description': ''}, '/process?id=', 400)
225         self.check_post({'title': '', 'effort': 1.1}, '/process?id=', 400)
226         self.check_post({'description': '', 'effort': 1.0},
227                         '/process?id=', 400)
228         self.assertEqual(1, len(Process.all(self.db_conn)))
229         form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
230         self.post_process(2, form_data | {'condition': []})
231         self.check_post(form_data | {'condition': [1]}, '/process?id=', 404)
232         self.check_post({'title': 'foo', 'description': 'foo'},
233                         '/condition', 302, '/condition?id=1')
234         self.post_process(3, form_data | {'condition': [1]})
235         self.post_process(4, form_data | {'disables': [1]})
236         self.post_process(5, form_data | {'enables': [1]})
237         form_data['delete'] = ''
238         self.check_post(form_data, '/process?id=', 404)
239         self.check_post(form_data, '/process?id=6', 404)
240         self.check_post(form_data, '/process?id=5', 302, '/processes')
241
242     def test_do_GET(self) -> None:
243         """Test /process and /processes response codes."""
244         self.post_process()
245         self.check_get_defaults('/process')
246         self.check_get('/processes', 200)