From: Christian Heller Date: Wed, 5 Jun 2024 22:07:45 +0000 (+0200) Subject: Refactor ProcessStep code and undo replacement of implicit steps by explicit ones. X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/static/process?a=commitdiff_plain;h=0952d4a17e7df265cf0c50b66df1e1391075b821;p=plomtask Refactor ProcessStep code and undo replacement of implicit steps by explicit ones. --- diff --git a/plomtask/http.py b/plomtask/http.py index 249de32..2e0fc76 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -10,7 +10,7 @@ from plomtask.days import Day from plomtask.exceptions import HandledException, BadFormatException, \ NotFoundException from plomtask.db import DatabaseConnection, DatabaseFile -from plomtask.processes import Process +from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition from plomtask.todos import Todo @@ -378,7 +378,8 @@ class TaskHandler(BaseHTTPRequestHandler): process.set_disables(self.conn, self.form_data.get_all_int('disables')) process.calendarize = self.form_data.get_all_str('calendarize') != [] process.save(self.conn) - steps: list[tuple[int | None, int, int | None]] = [] + assert isinstance(process.id_, int) + steps: list[ProcessStep] = [] for step_id in self.form_data.get_all_int('keep_step'): if step_id not in self.form_data.get_all_int('steps'): raise BadFormatException('trying to keep unknown step') @@ -389,13 +390,15 @@ class TaskHandler(BaseHTTPRequestHandler): f'step_{step_id}_process_id') parent_id = self.form_data.get_int_or_none( f'step_{step_id}_parent_id') - steps += [(step_id, step_process_id, parent_id)] + steps += [ProcessStep(step_id, process.id_, step_process_id, + parent_id)] for step_id in self.form_data.get_all_int('steps'): for step_process_id in self.form_data.get_all_int( f'new_step_to_{step_id}'): - steps += [(None, step_process_id, step_id)] + steps += [ProcessStep(None, process.id_, step_process_id, + step_id)] for step_process_id in self.form_data.get_all_int('new_top_step'): - steps += [(None, step_process_id, None)] + steps += [ProcessStep(None, process.id_, step_process_id, None)] process.uncache() process.set_steps(self.conn, steps) process.save(self.conn) diff --git a/plomtask/processes.py b/plomtask/processes.py index 089a710..4ad6e75 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -93,13 +93,13 @@ class Process(BaseModel[int], ConditionsRelations): for child in explicit_children: assert isinstance(child.id_, int) node.steps[child.id_] = make_node(child) - # ensure that one (!) explicit step of process replaces - # one (!) implicit step of same process - for i in [i for i, s in node.steps.items() - if not s.is_explicit - and s.process.id_ == child.step_process_id]: - del node.steps[i] - break + # # ensure that one (!) explicit step of process replaces + # # one (!) implicit step of same process + # for i in [i for i, s in node.steps.items() + # if not s.process_step.owner_id == child.id_ + # and s.process.id_ == child.step_process_id]: + # del node.steps[i] + # break node.seen = node_id in seen_step_ids seen_step_ids.add(node_id) for id_, step in node.steps.items(): @@ -117,19 +117,12 @@ class Process(BaseModel[int], ConditionsRelations): walk_steps(step_id, step_node) return steps - def _add_step(self, - db_conn: DatabaseConnection, - id_: int | None, - step_process_id: int, - parent_step_id: int | None) -> ProcessStep: - """Create new ProcessStep, save and add it to self.explicit_steps. - - Also checks against step recursion. + def set_steps(self, db_conn: DatabaseConnection, + steps: list[ProcessStep]) -> None: + """Set self.explicit_steps in bulk. - The new step's parent_step_id will fall back to None either if no - matching ProcessStep is found (which can be assumed in case it was - just deleted under its feet), or if the parent step would not be - owned by the current Process. + Checks against recursion, and turns into top-level steps any of + unknown or non-owned parent. """ def walk_steps(node: ProcessStep) -> None: if node.step_process_id == self.id_: @@ -138,31 +131,23 @@ class Process(BaseModel[int], ConditionsRelations): for step in step_process.explicit_steps: walk_steps(step) - if parent_step_id is not None: - try: - parent_step = ProcessStep.by_id(db_conn, parent_step_id) - if parent_step.owner_id != self.id_: - parent_step_id = None - except NotFoundException: - parent_step_id = None - assert isinstance(self.id_, int) - step = ProcessStep(id_, self.id_, step_process_id, parent_step_id) - walk_steps(step) - self.explicit_steps += [step] - step.save(db_conn) # NB: This ensures a non-None step.id_. - return step - - def set_steps(self, db_conn: DatabaseConnection, - steps: list[tuple[int | None, int, int | None]]) -> None: - """Set self.explicit_steps in bulk.""" assert isinstance(self.id_, int) for step in self.explicit_steps: step.uncache() self.explicit_steps = [] db_conn.delete_where('process_steps', 'owner', self.id_) - for step_tuple in steps: - self._add_step(db_conn, step_tuple[0], - step_tuple[1], step_tuple[2]) + for step in steps: + step.save(db_conn) + if step.parent_step_id is not None: + try: + parent_step = ProcessStep.by_id(db_conn, + step.parent_step_id) + if parent_step.owner_id != self.id_: + step.parent_step_id = None + except NotFoundException: + step.parent_step_id = None + walk_steps(step) + self.explicit_steps += [step] def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" diff --git a/tests/processes.py b/tests/processes.py index 7d1d0f1..930e560 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -83,60 +83,76 @@ class TestsWithDB(TestCaseWithDB): def test_Process_steps(self) -> None: """Test addition, nesting, and non-recursion of ProcessSteps""" - def add_step(proc: Process, - steps_proc: list[tuple[int | None, int, int | None]], - step_tuple: tuple[int | None, int, int | None], - expected_id: int) -> None: - steps_proc += [step_tuple] - proc.set_steps(self.db_conn, steps_proc) - steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) + # pylint: disable=too-many-locals p1, p2, p3 = self.three_processes() assert isinstance(p1.id_, int) assert isinstance(p2.id_, int) assert isinstance(p3.id_, int) - steps_p1: list[tuple[int | None, int, int | None]] = [] + steps_p1: list[ProcessStep] = [] # add step of process p2 as first (top-level) step to p1 - add_step(p1, steps_p1, (None, p2.id_, None), 1) + s_p2_to_p1 = ProcessStep(None, p1.id_, p2.id_, None) + steps_p1 += [s_p2_to_p1] + p1.set_steps(self.db_conn, steps_p1) p1_dict: dict[int, ProcessStepsNode] = {} p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False) self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) # add step of process p3 as second (top-level) step to p1 - add_step(p1, steps_p1, (None, p3.id_, None), 2) - step_2 = p1.explicit_steps[-1] + s_p3_to_p1 = ProcessStep(None, p1.id_, p3.id_, None) + steps_p1 += [s_p3_to_p1] + p1.set_steps(self.db_conn, steps_p1) p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False) self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) # add step of process p3 as first (top-level) step to p2, + steps_p2: list[ProcessStep] = [] + s_p3_to_p2 = ProcessStep(None, p2.id_, p3.id_, None) + steps_p2 += [s_p3_to_p2] + p2.set_steps(self.db_conn, steps_p2) # expect it as implicit sub-step of p1's second (p3) step - steps_p2: list[tuple[int | None, int, int | None]] = [] - add_step(p2, steps_p2, (None, p3.id_, None), 3) - p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False) + p2_dict = {3: ProcessStepsNode(p3, None, False, {}, False)} + p1_dict[1].steps[3] = p2_dict[3] self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) - # add step of process p2 as explicit sub-step to p1's first sub-step - add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4) - step_3 = ProcessStepsNode(p3, None, False, {}, True) - p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True, - {3: step_3}, False) + # add step of process p2 as explicit sub-step to p1's second sub-step + s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_) + steps_p1 += [s_p2_to_p1_first] + p1.set_steps(self.db_conn, steps_p1) + seen_3 = ProcessStepsNode(p3, None, False, {}, True) + p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True, + {3: seen_3}, False) self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) # add step of process p3 as explicit sub-step to non-existing p1 # sub-step (of id=999), expect it to become another p1 top-level step - add_step(p1, steps_p1, (None, p3.id_, 999), 5) + s_p3_to_p1_999 = ProcessStep(None, p1.id_, p3.id_, 999) + steps_p1 += [s_p3_to_p1_999] + p1.set_steps(self.db_conn, steps_p1) p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False) self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) # add step of process p3 as explicit sub-step to p1's implicit p3 # sub-step, expect it to become another p1 top-level step - add_step(p1, steps_p1, (None, p3.id_, 3), 6) + s_p3_to_p1_impl_p3 = ProcessStep(None, p1.id_, p3.id_, s_p3_to_p2.id_) + steps_p1 += [s_p3_to_p1_impl_p3] + p1.set_steps(self.db_conn, steps_p1) p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False) self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) self.assertEqual(p1.used_as_step_by(self.db_conn), []) self.assertEqual(p2.used_as_step_by(self.db_conn), [p1]) self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2]) - # add step of process p2 as explicit sub-step to p1's second (p3) - # top-level step - add_step(p1, steps_p1, (None, p3.id_, 2), 7) - p1_dict[2].steps[7] = ProcessStepsNode(p3, 2, True, {}, False) - # import pprint - # pprint.pp(p1.get_steps(self.db_conn, None)) - # pprint.pp(p1_dict) + # # add step of process p3 as explicit sub-step to p1's first sub-step, + # # expect it to eliminate implicit p3 sub-step + # s_p3_to_p1_first_explicit = ProcessStep(None, p1.id_, p3.id_, + # s_p2_to_p1.id_) + # p1_dict[1].steps = {7: ProcessStepsNode(p3, 1, True, {}, False)} + # p1_dict[2].steps[4].steps[3].seen = False + # steps_p1 += [s_p3_to_p1_first_explicit] + # p1.set_steps(self.db_conn, steps_p1) + # self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) + # ensure implicit steps non-top explicit steps are shown + s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_) + steps_p2 += [s_p3_to_p2_first] + p2.set_steps(self.db_conn, steps_p2) + p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, + False) + p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, + {}, True) self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict) def test_Process_conditions(self) -> None: @@ -182,16 +198,16 @@ class TestsWithDB(TestCaseWithDB): assert isinstance(p1.id_, int) assert isinstance(p2.id_, int) assert isinstance(p3.id_, int) - p2.set_steps(self.db_conn, [(None, p1.id_, None)]) + step = ProcessStep(None, p2.id_, p1.id_, None) + p2.set_steps(self.db_conn, [step]) with self.assertRaises(HandledException): p1.remove(self.db_conn) - step = p2.explicit_steps[0] p2.set_steps(self.db_conn, []) with self.assertRaises(NotFoundException): ProcessStep.by_id(self.db_conn, step.id_) p1.remove(self.db_conn) - p2.set_steps(self.db_conn, [(None, p3.id_, None)]) - step = p2.explicit_steps[0] + step = ProcessStep(None, p2.id_, p3.id_, None) + p2.set_steps(self.db_conn, [step]) p2.remove(self.db_conn) with self.assertRaises(NotFoundException): ProcessStep.by_id(self.db_conn, step.id_) @@ -223,9 +239,10 @@ class TestsWithDBForProcessStep(TestCaseWithDB): p2 = Process(None) p1.save(self.db_conn) p2.save(self.db_conn) + assert isinstance(p1.id_, int) assert isinstance(p2.id_, int) - p1.set_steps(self.db_conn, [(None, p2.id_, None)]) - step = p1.explicit_steps[0] + step = ProcessStep(None, p1.id_, p2.id_, None) + p1.set_steps(self.db_conn, [step]) step.remove(self.db_conn) self.assertEqual(p1.explicit_steps, []) self.check_storage([]) diff --git a/tests/todos.py b/tests/todos.py index ecf2089..c45171a 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -1,7 +1,7 @@ """Test Todos module.""" from tests.utils import TestCaseWithDB, TestCaseWithServer from plomtask.todos import Todo, TodoNode -from plomtask.processes import Process +from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition from plomtask.exceptions import (NotFoundException, BadFormatException, HandledException) @@ -167,11 +167,13 @@ class TestsWithDB(TestCaseWithDB): proc4.save(self.db_conn) assert isinstance(proc4.id_, int) # make proc4 step of proc3 - proc3.set_steps(self.db_conn, [(None, proc4.id_, None)]) + step = ProcessStep(None, proc3.id_, proc4.id_, None) + proc3.set_steps(self.db_conn, [step]) # give proc2 three steps; 2× proc1, 1× proc3 - proc2.set_steps(self.db_conn, [(None, self.proc.id_, None), - (None, self.proc.id_, None), - (None, proc3.id_, None)]) + step1 = ProcessStep(None, proc2.id_, self.proc.id_, None) + step2 = ProcessStep(None, proc2.id_, self.proc.id_, None) + step3 = ProcessStep(None, proc2.id_, proc3.id_, None) + proc2.set_steps(self.db_conn, [step1, step2, step3]) # test mere creation does nothing todo_ignore = Todo(None, proc2, False, self.date1) todo_ignore.save(self.db_conn)