From: Christian Heller Date: Thu, 18 Apr 2024 20:28:51 +0000 (+0200) Subject: Refactor Process/ProcessStep setting and saving. X-Git-Url: https://plomlompom.com/repos/%7B%7Bprefix%7D%7D/static/%7B%7Bdb.prefix%7D%7D/%7B%7Byoutube_prefix%7D%7D%7B%7Bvideo_id%7D%7D?a=commitdiff_plain;h=56b8aa166d51d139edb41f0ef250b1854a5e93e8;p=plomtask Refactor Process/ProcessStep setting and saving. --- diff --git a/plomtask/db.py b/plomtask/db.py index 4a82132..f53a94c 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -23,7 +23,7 @@ class DatabaseFile: # pylint: disable=too-few-public-methods self._check() def _check(self) -> None: - """Check file exists and is of proper schema.""" + """Check file exists, and is of proper schema.""" self.exists = isfile(self.path) if self.exists: self._validate_schema() diff --git a/plomtask/http.py b/plomtask/http.py index cb00825..25be899 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -225,23 +225,25 @@ class TaskHandler(BaseHTTPRequestHandler): self.form_data.get_all_int('condition')) process.set_fulfills(self.conn, self.form_data.get_all_int('fulfills')) process.set_undoes(self.conn, self.form_data.get_all_int('undoes')) - process.save_without_steps(self.conn) + process.save_id(self.conn) assert process.id_ is not None # for mypy process.explicit_steps = [] + steps: list[tuple[int | None, int, int | None]] = [] for step_id in self.form_data.get_all_int('steps'): - for step_process_id in\ - self.form_data.get_all_int(f'new_step_to_{step_id}'): - process.add_step(self.conn, None, step_process_id, step_id) + for step_process_id in self.form_data.get_all_int( + f'new_step_to_{step_id}'): + steps += [(None, step_process_id, step_id)] if step_id not in self.form_data.get_all_int('keep_step'): continue step_process_id = self.form_data.get_int( f'step_{step_id}_process_id') parent_id = self.form_data.get_int_or_none( f'step_{step_id}_parent_id') - process.add_step(self.conn, step_id, step_process_id, parent_id) + steps += [(step_id, step_process_id, parent_id)] for step_process_id in self.form_data.get_all_int('new_top_step'): - process.add_step(self.conn, None, step_process_id, None) - process.fix_steps(self.conn) + steps += [(None, step_process_id, None)] + process.set_steps(self.conn, steps) + process.save(self.conn) def do_POST_condition(self) -> None: """Update/insert Condition of ?id= and fields defined in postvars.""" diff --git a/plomtask/processes.py b/plomtask/processes.py index dc0613f..76ed30d 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -149,12 +149,15 @@ class Process: """Set self.undoes to Conditions identified by ids.""" self.set_conditions(db_conn, ids, 'undoes') - def add_step(self, db_conn: DatabaseConnection, id_: int | None, - step_process_id: int, - parent_step_id: int | None) -> ProcessStep: + def _add_step(self, + db_conn: DatabaseConnection, + id_: int | None, + step_process_id: int, + parent_step_id: int | None) -> ProcessStep: """Create new ProcessStep, save and add it to self.explicit_steps. Also checks against step recursion. + The new step's parent_step_id will fall back to None either if no matching ProcessStep is found (which can be assumed in case it was just deleted under its feet), or if the parent step would not be @@ -180,10 +183,27 @@ class Process: step.save(db_conn) # NB: This ensures a non-None step.id_. return step - def save_without_steps(self, db_conn: DatabaseConnection) -> None: - """Add (or re-write) self and connected VersionedAttributes to DB.""" + def set_steps(self, db_conn: DatabaseConnection, + steps: list[tuple[int | None, int, int | None]]) -> None: + """Set self.explicit_steps in bulk.""" + for step in self.explicit_steps: + assert step.id_ is not None + del db_conn.cached_process_steps[step.id_] + self.explicit_steps = [] + db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', + (self.id_,)) + for step_tuple in steps: + self._add_step(db_conn, step_tuple[0], + step_tuple[1], step_tuple[2]) + + def save_id(self, db_conn: DatabaseConnection) -> None: + """Write bare-bones self (sans connected items), ensuring self.id_.""" cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,)) self.id_ = cursor.lastrowid + + def save(self, db_conn: DatabaseConnection) -> None: + """Add (or re-write) self and connected items to DB.""" + self.save_id(db_conn) self.title.save(db_conn) self.description.save(db_conn) self.effort.save(db_conn) @@ -203,27 +223,11 @@ class Process: db_conn.exec('INSERT INTO process_undoes VALUES (?,?)', (self.id_, condition.id_)) assert self.id_ is not None - db_conn.cached_processes[self.id_] = self - - def fix_steps(self, db_conn: DatabaseConnection) -> None: - """Rewrite ProcessSteps from self.explicit_steps. - - This also fixes illegal Step.parent_step_id values, i.e. those pointing - to steps now absent, or owned by a different Process, fall back into - .parent_step_id=None - """ db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', (self.id_,)) for step in self.explicit_steps: - if step.parent_step_id is not None: - try: - parent_step = ProcessStep.by_id(db_conn, - step.parent_step_id) - if parent_step.owner_id != self.id_: - step.parent_step_id = None - except NotFoundException: - step.parent_step_id = None step.save(db_conn) + db_conn.cached_processes[self.id_] = self class ProcessStep: diff --git a/tests/processes.py b/tests/processes.py index 390b130..960fd70 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -28,61 +28,71 @@ class TestsWithDB(TestCaseWithDB): def setUp(self) -> None: super().setUp() self.proc1 = Process(None) - self.proc1.save_without_steps(self.db_conn) + self.proc1.save(self.db_conn) self.proc2 = Process(None) - self.proc2.save_without_steps(self.db_conn) + self.proc2.save(self.db_conn) self.proc3 = Process(None) - self.proc3.save_without_steps(self.db_conn) + self.proc3.save(self.db_conn) def test_Process_ids(self) -> None: - """Test Process.save_without_steps() re Process.id_.""" + """Test Process.save() re Process.id_.""" self.assertEqual(self.proc1.id_, Process.by_id(self.db_conn, 1, create=False).id_) self.assertEqual(self.proc2.id_, Process.by_id(self.db_conn, 2, create=False).id_) proc5 = Process(5) - proc5.save_without_steps(self.db_conn) + proc5.save(self.db_conn) self.assertEqual(proc5.id_, Process.by_id(self.db_conn, 5, create=False).id_) def test_Process_steps(self) -> None: """Test addition, nesting, and non-recursion of ProcessSteps""" + def add_step(proc: Process, + steps_proc: list[tuple[int | None, int, int | None]], + step_tuple: tuple[int | None, int, int | None], + expected_id: int) -> None: + steps_proc += [step_tuple] + proc.set_steps(self.db_conn, steps_proc) + steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2]) assert self.proc2.id_ is not None assert self.proc3.id_ is not None - self.proc1.add_step(self.db_conn, None, self.proc2.id_, None) + steps_proc1: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1) p_1_dict: dict[int, dict[str, Any]] = {1: { 'process': self.proc2, 'parent_id': None, 'is_explicit': True, 'steps': {}, 'seen': False }} self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - s_b = self.proc1.add_step(self.db_conn, None, self.proc3.id_, None) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2) + step_2 = self.proc1.explicit_steps[-1] p_1_dict[2] = { 'process': self.proc3, 'parent_id': None, 'is_explicit': True, 'steps': {}, 'seen': False } self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - s_c = self.proc2.add_step(self.db_conn, None, self.proc3.id_, None) - assert s_c.id_ is not None + steps_proc2: list[tuple[int | None, int, int | None]] = [] + add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3) p_1_dict[1]['steps'] = {3: { 'process': self.proc3, 'parent_id': None, 'is_explicit': False, 'steps': {}, 'seen': False }} self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - self.proc1.add_step(self.db_conn, None, self.proc2.id_, s_b.id_) + add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_), + 4) p_1_dict[2]['steps'][4] = { - 'process': self.proc2, 'parent_id': s_b.id_, 'seen': False, + 'process': self.proc2, 'parent_id': step_2.id_, 'seen': False, 'is_explicit': True, 'steps': {3: { 'process': self.proc3, 'parent_id': None, 'is_explicit': False, 'steps': {}, 'seen': True }}} self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - self.proc1.add_step(self.db_conn, None, self.proc3.id_, 999) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5) p_1_dict[5] = { 'process': self.proc3, 'parent_id': None, 'is_explicit': True, 'steps': {}, 'seen': False } self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict) - self.proc1.add_step(self.db_conn, None, self.proc3.id_, 3) + add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6) p_1_dict[6] = { 'process': self.proc3, 'parent_id': None, 'is_explicit': True, 'steps': {}, 'seen': False @@ -133,7 +143,8 @@ class TestsWithDB(TestCaseWithDB): def test_ProcessStep_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" assert self.proc2.id_ is not None - step = self.proc1.add_step(self.db_conn, None, self.proc2.id_, None) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) + step = self.proc1.explicit_steps[-1] assert step.id_ is not None step_retrieved = ProcessStep.by_id(self.db_conn, step.id_) step.parent_step_id = 99 @@ -142,7 +153,7 @@ class TestsWithDB(TestCaseWithDB): def test_Process_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" assert self.proc2.id_ is not None - self.proc1.add_step(self.db_conn, None, self.proc2.id_, None) + self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)]) p_retrieved = Process.by_id(self.db_conn, self.proc1.id_) self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps) diff --git a/tests/todos.py b/tests/todos.py index f33534d..6bc5a05 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -18,7 +18,7 @@ class TestsWithDB(TestCaseWithDB): self.day2 = Day('2024-01-02') self.day2.save(self.db_conn) self.proc = Process(None) - self.proc.save_without_steps(self.db_conn) + self.proc.save(self.db_conn) self.cond1 = Condition(None) self.cond1.save(self.db_conn) self.cond2 = Condition(None) @@ -30,7 +30,7 @@ class TestsWithDB(TestCaseWithDB): todo = Todo(None, process_unsaved, False, self.day1) with self.assertRaises(NotFoundException): todo.save(self.db_conn) - process_unsaved.save_without_steps(self.db_conn) + process_unsaved.save(self.db_conn) todo.save(self.db_conn) self.assertEqual(Todo.by_id(self.db_conn, 1), todo) with self.assertRaises(NotFoundException):