"""Module tests requiring DB setup."""
checked_class = Process
- def three_processes(self) -> tuple[Process, Process, Process]:
- """Return three saved processes."""
- p1, p2, p3 = Process(None), Process(None), Process(None)
- for p in [p1, p2, p3]:
- p.save(self.db_conn)
- return p1, p2, p3
-
- def p_of_conditions(self) -> tuple[Process, list[Condition],
- list[Condition], list[Condition]]:
- """Return Process and its three Condition sets."""
- p = Process(None)
- c1, c2, c3 = Condition(None), Condition(None), Condition(None)
- for c in [c1, c2, c3]:
- c.save(self.db_conn)
- assert isinstance(c1.id_, int)
- assert isinstance(c2.id_, int)
- assert isinstance(c3.id_, int)
- set_1 = [c1, c2]
- set_2 = [c2, c3]
- set_3 = [c1, c3]
- conds = [c.id_ for c in set_1 if isinstance(c.id_, int)]
- enables = [c.id_ for c in set_2 if isinstance(c.id_, int)]
- disables = [c.id_ for c in set_3 if isinstance(c.id_, int)]
- p.set_condition_relations(self.db_conn, conds, [], enables, disables)
- p.save(self.db_conn)
- return p, set_1, set_2, set_3
-
- def test_Process_conditions_saving(self) -> None:
- """Test .save/.save_core."""
- p, set1, set2, set3 = self.p_of_conditions()
- assert p.id_ is not None
- r = Process.by_id(self.db_conn, p.id_)
- self.assertEqual(sorted(r.conditions), sorted(set1))
- self.assertEqual(sorted(r.enables), sorted(set2))
- self.assertEqual(sorted(r.disables), sorted(set3))
-
def test_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class from DB."""
super().test_from_table_row()
self.assertEqual(sorted(r.enables), sorted(set2))
self.assertEqual(sorted(r.disables), sorted(set3))
- # def test_Process_steps(self) -> None:
- # """Test addition, nesting, and non-recursion of ProcessSteps"""
- # # pylint: disable=too-many-locals
- # # pylint: disable=too-many-statements
- # p1, p2, p3 = self.three_processes()
- # assert isinstance(p1.id_, int)
- # assert isinstance(p2.id_, int)
- # assert isinstance(p3.id_, int)
- # steps_p1: list[ProcessStep] = []
- # # add step of process p2 as first (top-level) step to p1
- # s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None)
- # steps_p1 += [s_p2_to_p1]
- # p1.set_steps(self.db_conn, steps_p1)
- # p1_dict: dict[int, ProcessStepsNode] = {}
- # p1_dict[1] = ProcessStepsNode(p2, None, True, {})
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # add step of process p3 as second (top-level) step to p1
- # s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None)
- # steps_p1 += [s_p3_to_p1]
- # p1.set_steps(self.db_conn, steps_p1)
- # p1_dict[2] = ProcessStepsNode(p3, None, True, {})
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # add step of process p3 as first (top-level) step to p2,
- # steps_p2: list[ProcessStep] = []
- # s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None)
- # steps_p2 += [s_p3_to_p2]
- # p2.set_steps(self.db_conn, steps_p2)
- # # expect it as implicit sub-step of p1's second (p3) step
- # p2_dict = {3: ProcessStepsNode(p3, None, False, {})}
- # p1_dict[1].steps[3] = p2_dict[3]
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # add step of process p2 as explicit sub-step to p1's second sub-step
- # s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_)
- # steps_p1 += [s_p2_to_p1_first]
- # p1.set_steps(self.db_conn, steps_p1)
- # seen_3 = ProcessStepsNode(p3, None, False, {}, False)
- # p1_dict[1].steps[3].seen = True
- # p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True,
- # {3: seen_3})
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # add step of process p3 as explicit sub-step to non-existing p1
- # # sub-step (of id=999), expect it to become another p1 top-level step
- # s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999)
- # steps_p1 += [s_p3_to_p1_999]
- # p1.set_steps(self.db_conn, steps_p1)
- # p1_dict[5] = ProcessStepsNode(p3, None, True, {})
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # add step of process p3 as explicit sub-step to p1's implicit p3
- # # sub-step, expect it to become another p1 top-level step
- # s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_,
- # s_p3_to_p2.id_)
- # steps_p1 += [s_p3_to_p1_impl_p3]
- # p1.set_steps(self.db_conn, steps_p1)
- # p1_dict[6] = ProcessStepsNode(p3, None, True, {})
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # self.assertEqual(p1.used_as_step_by(self.db_conn), [])
- # self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
- # self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
- # # # add step of process p3 as explicit sub-step to p1's first
- # # # sub-step, expect it to eliminate implicit p3 sub-step
- # # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_,
- # # s_p2_to_p1.id_)
- # # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})}
- # # p1_dict[2].steps[4].steps[3].seen = False
- # # steps_p1 += [s_p3_to_p1_first_explicit]
- # # p1.set_steps(self.db_conn, steps_p1)
- # # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # ensure implicit steps non-top explicit steps are shown
- # s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_)
- # steps_p2 += [s_p3_to_p2_first]
- # p2.set_steps(self.db_conn, steps_p2)
- # p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {},
- # True)
- # p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(
- # p3, 3, False, {}, False)
- # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
- # # ensure suppressed step nodes are hidden
- # assert isinstance(s_p3_to_p2.id_, int)
- # p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_])
- # p1_dict[1].steps[3].steps = {}
- # p1_dict[1].steps[3].is_suppressed = True
- # p1_dict[2].steps[4].steps[3].steps = {}
- # p1_dict[2].steps[4].steps[3].is_suppressed = True
- # self.assertEqual(p1.get_steps(self.db_conn), p1_dict)
-
- def test_Process_conditions(self) -> None:
- """Test setting Process.conditions/enables/disables."""
- p = Process(None)
- p.save(self.db_conn)
- targets = ['conditions', 'blockers', 'enables', 'disables']
- for i, target in enumerate(targets):
- c1, c2 = Condition(None), Condition(None)
- c1.save(self.db_conn)
- c2.save(self.db_conn)
- assert isinstance(c1.id_, int)
- assert isinstance(c2.id_, int)
- args: list[list[int]] = [[], [], [], []]
- args[i] = []
- p.set_condition_relations(self.db_conn, *args)
- self.assertEqual(getattr(p, target), [])
- args[i] = [c1.id_]
- p.set_condition_relations(self.db_conn, *args)
- self.assertEqual(getattr(p, target), [c1])
- args[i] = [c2.id_]
- p.set_condition_relations(self.db_conn, *args)
- self.assertEqual(getattr(p, target), [c2])
- args[i] = [c1.id_, c2.id_]
- p.set_condition_relations(self.db_conn, *args)
- self.assertEqual(getattr(p, target), [c1, c2])
+ def test_Process_steps(self) -> None:
+ """Test addition, nesting, and non-recursion of ProcessSteps"""
+ # pylint: disable=too-many-locals
+ # pylint: disable=too-many-statements
+ p1, p2, p3 = self.three_processes()
+ assert isinstance(p1.id_, int)
+ assert isinstance(p2.id_, int)
+ assert isinstance(p3.id_, int)
+ steps_p1: list[ProcessStep] = []
+ # # add step of process p2 as first (top-level) step to p1
+ # s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None)
+ # steps_p1 += [s_p2_to_p1]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # p1_dict: dict[int, ProcessStepsNode] = {}
+ # p1_dict[1] = ProcessStepsNode(p2, None, True, {})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p3 as second (top-level) step to p1
+ # s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None)
+ # steps_p1 += [s_p3_to_p1]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # p1_dict[2] = ProcessStepsNode(p3, None, True, {})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p3 as first (top-level) step to p2,
+ # steps_p2: list[ProcessStep] = []
+ # s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None)
+ # steps_p2 += [s_p3_to_p2]
+ # p2.set_steps(self.db_conn, steps_p2)
+ # # expect it as implicit sub-step of p1's second (p3) step
+ # p2_dict = {3: ProcessStepsNode(p3, None, False, {})}
+ # p1_dict[1].steps[3] = p2_dict[3]
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # # add step of process p2 as explicit sub-step to p1's second sub-step
+ # s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_)
+ # steps_p1 += [s_p2_to_p1_first]
+ # p1.set_steps(self.db_conn, steps_p1)
+ # seen_3 = ProcessStepsNode(p3, None, False, {}, False)
+ # p1_dict[1].steps[3].seen = True
+ # p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True,
+ # {3: seen_3})
+ # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+
+ # add step of process p3 as explicit sub-step to non-existing p1
+ # sub-step (of id=999), expect it to become another p1 top-level step
+ s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999)
+ steps_p1 += [s_p3_to_p1_999]
+ p1.set_steps(self.db_conn, steps_p1)
+ p1_dict[5] = ProcessStepsNode(p3, None, True, {})
+ self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # add step of process p3 as explicit sub-step to p1's implicit p3
+ # sub-step, expect it to become another p1 top-level step
+ s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_,
+ s_p3_to_p2.id_)
+ steps_p1 += [s_p3_to_p1_impl_p3]
+ p1.set_steps(self.db_conn, steps_p1)
+ p1_dict[6] = ProcessStepsNode(p3, None, True, {})
+ self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ self.assertEqual(p1.used_as_step_by(self.db_conn), [])
+ self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
+ self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
+ # add step of process p3 as explicit sub-step to p1's first
+ # sub-step, expect it to eliminate implicit p3 sub-step
+ s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_,
+ s_p2_to_p1.id_)
+ p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {})}
+ p1_dict[2].steps[4].steps[3].seen = False
+ steps_p1 += [s_p3_to_p1_first_explicit]
+ p1.set_steps(self.db_conn, steps_p1)
+ self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # ensure implicit steps non-top explicit steps are shown
+ s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_)
+ steps_p2 += [s_p3_to_p2_first]
+ p2.set_steps(self.db_conn, steps_p2)
+ p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {},
+ True)
+ p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(
+ p3, 3, False, {}, False)
+ self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # ensure suppressed step nodes are hidden
+ assert isinstance(s_p3_to_p2.id_, int)
+ p1.set_step_suppressions(self.db_conn, [s_p3_to_p2.id_])
+ p1_dict[1].steps[3].steps = {}
+ p1_dict[1].steps[3].is_suppressed = True
+ p1_dict[2].steps[4].steps[3].steps = {}
+ p1_dict[2].steps[4].steps[3].is_suppressed = True
+ self.assertEqual(p1.get_steps(self.db_conn), p1_dict)
def test_remove(self) -> None:
"""Test removal of Processes and ProcessSteps."""
super().test_remove()
- p1, p2, p3 = self.three_processes()
+ p1, p2, p3 = Process(None), Process(None), Process(None)
+ for p in [p1, p2, p3]:
+ p.save(self.db_conn)
assert isinstance(p1.id_, int)
assert isinstance(p2.id_, int)
assert isinstance(p3.id_, int)
self.check_json_get('/process?id=1', exp)
self.post_exp_process([exp], {}, 1)
self.check_json_get('/process?id=1', exp)
+ # check conditions posting
+ for i in range(3):
+ self.post_exp_cond([exp], {}, i+1)
+ p = {'conditions': [1, 2], 'disables': [1],
+ 'blockers': [3], 'enables': [2, 3]}
+ self.post_exp_process([exp], p, 1)
+ self.check_json_get('/process?id=1', exp)
# check n_todos field
self.post_exp_day([], {'new_todo': ['1']}, '2024-01-01')
self.post_exp_day([], {'new_todo': ['1']}, '2024-01-02')
self.check_post({'delete': ''}, '/process?id=4', 302, '/processes')
exp = ExpectedGetProcess(4)
exp.set('is_new', True)
- self.post_exp_process([exp], {}, 1)
- self.post_exp_process([exp], {}, 2)
- self.post_exp_process([exp], {}, 3)
+ for i in range(3):
+ self.post_exp_cond([exp], {}, i+1)
+ self.post_exp_process([exp], {}, i+1)
exp.force('process_candidates', [1, 2, 3])
self.check_json_get('/process?id=4', exp)
url = '/process?id=1'
exp = ExpectedGetProcess(1)
self.post_exp_process([exp], {}, 1)
- # post first (top-level) step of proc 2 to proc 1 by 'step_of' in 2
+ # post first (top-level) step of proc2 to proc1 by 'step_of' in 2
self.post_exp_process([exp], {'step_of': 1}, 2)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)])
- exp.set('steps', [exp.stepnode_as_dict(1, 2)])
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, owner_id=1, step_process_id=2)])
+ exp.set('steps', [
+ exp.stepnode_as_dict(
+ step_id=1,
+ proc_id=2)])
self.check_json_get(url, exp)
# post empty/absent steps list to process, expect clean slate, and old
# step to completely disappear
exp.lib_wipe('ProcessStep')
exp.set('steps', [])
self.check_json_get(url, exp)
- # post new step of proc2 to proc1 by 'new_top_step'
+ # post anew (as only step yet) step of proc2 to proc1 by 'new_top_step'
self.post_exp_process([exp], {'new_top_step': 2}, 1)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 2)])
+ exp.lib_set('ProcessStep',
+ [exp.procstep_as_dict(1, owner_id=1, step_process_id=2)])
self.post_exp_process([exp], {'kept_steps': [1]}, 1)
- exp.set('steps', [exp.stepnode_as_dict(1, 2)])
+ step_nodes = [exp.stepnode_as_dict(step_id=1, proc_id=2)]
+ exp.set('steps', step_nodes)
self.check_json_get(url, exp)
- # fail on single- and multi-step recursion
+ # fail on single--step recursion
p_min = {'title': '', 'description': '', 'effort': 0}
self.check_post(p_min | {'new_top_step': 1}, url, 400)
self.check_post(p_min | {'step_of': 1}, url, 400)
- self.post_exp_process([exp], {'new_top_step': 1}, 2)
- self.check_post(p_min | {'step_of': 2, 'new_top_step': 2}, url, 400)
- self.post_exp_process([exp], {}, 3)
- self.post_exp_process([exp], {'step_of': 3}, 4)
- self.check_post(p_min | {'new_top_step': 3, 'step_of': 4}, url, 400)
# post sibling steps
+ self.post_exp_process([exp], {}, 3)
+ self.post_exp_process([exp], {'kept_steps': [1], 'new_top_step': 3}, 1)
+ exp.lib_set('ProcessStep',
+ [exp.procstep_as_dict(2, owner_id=1, step_process_id=3)])
+ step_nodes += [exp.stepnode_as_dict(step_id=2, proc_id=3)]
+ self.check_json_get(url, exp)
+ # # post implicit sub-step via post to proc2
self.post_exp_process([exp], {}, 4)
- self.post_exp_process([exp], {'new_top_step': 4}, 1)
- self.post_exp_process([exp], {'kept_steps': [1], 'new_top_step': 4}, 1)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(1, 1, 4),
- exp.procstep_as_dict(2, 1, 4)])
- exp.set('steps', [exp.stepnode_as_dict(1, 4),
- exp.stepnode_as_dict(2, 4)])
+ self.post_exp_process([exp], {'step_of': [1], 'new_top_step': 4}, 2)
+ exp.lib_set('ProcessStep',
+ [exp.procstep_as_dict(3, owner_id=2, step_process_id=4)])
+ step_nodes[0]['steps'] = [
+ exp.stepnode_as_dict(step_id=3, proc_id=4, is_explicit=False)]
self.check_json_get(url, exp)
- # post sub-step chain
+ # post explicit sub-step via post to proc1
p = {'kept_steps': [1, 2], 'new_step_to_2': 4}
self.post_exp_process([exp], p, 1)
- exp.lib_set('ProcessStep', [exp.procstep_as_dict(3, 1, 4, 2)])
- exp.set('steps', [exp.stepnode_as_dict(1, 4),
- exp.stepnode_as_dict(2, 4, steps=[
- exp.stepnode_as_dict(3, 4)])])
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(
+ 4, owner_id=1, step_process_id=4, parent_step_id=2)])
+ step_nodes[1]['steps'] = [
+ exp.stepnode_as_dict(step_id=4, proc_id=4)]
self.check_json_get(url, exp)
- # fail posting sub-step that would cause recursion
+ # fail on multi-step recursion via new step(s)
+ self.post_exp_process([exp], {}, 5)
+ self.post_exp_process([exp], {'new_top_step': 1}, 5)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(
+ 5, owner_id=5, step_process_id=1)])
+ self.check_post(p_min | {'step_of': 5, 'new_top_step': 5}, url, 400)
self.post_exp_process([exp], {}, 6)
- self.post_exp_process([exp], {'new_top_step': 6}, 5)
- p = p_min | {'kept_steps': [1, 2, 3], 'new_step_to_2': 5, 'step_of': 6}
- self.check_post(p, url, 400)
+ self.post_exp_process([exp], {'new_top_step': 5}, 6)
+ exp.lib_set('ProcessStep', [exp.procstep_as_dict(
+ 6, owner_id=6, step_process_id=5)])
+ self.check_post(p_min | {'step_of': 5, 'new_top_step': 6}, url, 400)
+ # fail on multi-step recursion via explicit sub-step
+ self.check_json_get(url, exp)
+ p = {'step_of': 5, 'kept_steps': [1, 2, 4], 'new_step_to_2': 6}
+ self.check_post(p_min | p, url, 400)
def test_fail_GET_process(self) -> None:
"""Test invalid GET /process params."""