home · contact · privacy
Refactor Process/ProcessStep setting and saving.
authorChristian Heller <c.heller@plomlompom.de>
Thu, 18 Apr 2024 20:28:51 +0000 (22:28 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Thu, 18 Apr 2024 20:28:51 +0000 (22:28 +0200)
plomtask/db.py
plomtask/http.py
plomtask/processes.py
tests/processes.py
tests/todos.py

index 4a82132ff9e89d990c5d3f04f1b4f7395bfb0fda..f53a94c4f085f3664d66f0b0cc38863b217741e1 100644 (file)
@@ -23,7 +23,7 @@ class DatabaseFile:  # pylint: disable=too-few-public-methods
         self._check()
 
     def _check(self) -> None:
-        """Check file exists and is of proper schema."""
+        """Check file exists, and is of proper schema."""
         self.exists = isfile(self.path)
         if self.exists:
             self._validate_schema()
index cb00825d469989935f88cdd97132cc96f69b1492..25be899044d652145f166ad1ebed5ff3528b0db0 100644 (file)
@@ -225,23 +225,25 @@ class TaskHandler(BaseHTTPRequestHandler):
                                self.form_data.get_all_int('condition'))
         process.set_fulfills(self.conn, self.form_data.get_all_int('fulfills'))
         process.set_undoes(self.conn, self.form_data.get_all_int('undoes'))
-        process.save_without_steps(self.conn)
+        process.save_id(self.conn)
         assert process.id_ is not None  # for mypy
         process.explicit_steps = []
+        steps: list[tuple[int | None, int, int | None]] = []
         for step_id in self.form_data.get_all_int('steps'):
-            for step_process_id in\
-                    self.form_data.get_all_int(f'new_step_to_{step_id}'):
-                process.add_step(self.conn, None, step_process_id, step_id)
+            for step_process_id in self.form_data.get_all_int(
+                    f'new_step_to_{step_id}'):
+                steps += [(None, step_process_id, step_id)]
             if step_id not in self.form_data.get_all_int('keep_step'):
                 continue
             step_process_id = self.form_data.get_int(
                     f'step_{step_id}_process_id')
             parent_id = self.form_data.get_int_or_none(
                     f'step_{step_id}_parent_id')
-            process.add_step(self.conn, step_id, step_process_id, parent_id)
+            steps += [(step_id, step_process_id, parent_id)]
         for step_process_id in self.form_data.get_all_int('new_top_step'):
-            process.add_step(self.conn, None, step_process_id, None)
-        process.fix_steps(self.conn)
+            steps += [(None, step_process_id, None)]
+        process.set_steps(self.conn, steps)
+        process.save(self.conn)
 
     def do_POST_condition(self) -> None:
         """Update/insert Condition of ?id= and fields defined in postvars."""
index dc0613f8dbb2b01f941603203f57cbddb3766693..76ed30df049b5d801b283ab904364da27f05f240 100644 (file)
@@ -149,12 +149,15 @@ class Process:
         """Set self.undoes to Conditions identified by ids."""
         self.set_conditions(db_conn, ids, 'undoes')
 
-    def add_step(self, db_conn: DatabaseConnection, id_: int | None,
-                 step_process_id: int,
-                 parent_step_id: int | None) -> ProcessStep:
+    def _add_step(self,
+                  db_conn: DatabaseConnection,
+                  id_: int | None,
+                  step_process_id: int,
+                  parent_step_id: int | None) -> ProcessStep:
         """Create new ProcessStep, save and add it to self.explicit_steps.
 
         Also checks against step recursion.
+
         The new step's parent_step_id will fall back to None either if no
         matching ProcessStep is found (which can be assumed in case it was
         just deleted under its feet), or if the parent step would not be
@@ -180,10 +183,27 @@ class Process:
         step.save(db_conn)  # NB: This ensures a non-None step.id_.
         return step
 
-    def save_without_steps(self, db_conn: DatabaseConnection) -> None:
-        """Add (or re-write) self and connected VersionedAttributes to DB."""
+    def set_steps(self, db_conn: DatabaseConnection,
+                  steps: list[tuple[int | None, int, int | None]]) -> None:
+        """Set self.explicit_steps in bulk."""
+        for step in self.explicit_steps:
+            assert step.id_ is not None
+            del db_conn.cached_process_steps[step.id_]
+        self.explicit_steps = []
+        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
+                     (self.id_,))
+        for step_tuple in steps:
+            self._add_step(db_conn, step_tuple[0],
+                           step_tuple[1], step_tuple[2])
+
+    def save_id(self, db_conn: DatabaseConnection) -> None:
+        """Write bare-bones self (sans connected items), ensuring self.id_."""
         cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
         self.id_ = cursor.lastrowid
+
+    def save(self, db_conn: DatabaseConnection) -> None:
+        """Add (or re-write) self and connected items to DB."""
+        self.save_id(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
         self.effort.save(db_conn)
@@ -203,27 +223,11 @@ class Process:
             db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
                          (self.id_, condition.id_))
         assert self.id_ is not None
-        db_conn.cached_processes[self.id_] = self
-
-    def fix_steps(self, db_conn: DatabaseConnection) -> None:
-        """Rewrite ProcessSteps from self.explicit_steps.
-
-        This also fixes illegal Step.parent_step_id values, i.e. those pointing
-        to steps now absent, or owned by a different Process, fall back into
-        .parent_step_id=None
-        """
         db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
                      (self.id_,))
         for step in self.explicit_steps:
-            if step.parent_step_id is not None:
-                try:
-                    parent_step = ProcessStep.by_id(db_conn,
-                                                    step.parent_step_id)
-                    if parent_step.owner_id != self.id_:
-                        step.parent_step_id = None
-                except NotFoundException:
-                    step.parent_step_id = None
             step.save(db_conn)
+        db_conn.cached_processes[self.id_] = self
 
 
 class ProcessStep:
index 390b1302932dce72da1ebc375a5f1ec6f59d8eec..960fd707eee4345cb4350c065d9dcc974e94405f 100644 (file)
@@ -28,61 +28,71 @@ class TestsWithDB(TestCaseWithDB):
     def setUp(self) -> None:
         super().setUp()
         self.proc1 = Process(None)
-        self.proc1.save_without_steps(self.db_conn)
+        self.proc1.save(self.db_conn)
         self.proc2 = Process(None)
-        self.proc2.save_without_steps(self.db_conn)
+        self.proc2.save(self.db_conn)
         self.proc3 = Process(None)
-        self.proc3.save_without_steps(self.db_conn)
+        self.proc3.save(self.db_conn)
 
     def test_Process_ids(self) -> None:
-        """Test Process.save_without_steps() re Process.id_."""
+        """Test Process.save() re Process.id_."""
         self.assertEqual(self.proc1.id_,
                          Process.by_id(self.db_conn, 1, create=False).id_)
         self.assertEqual(self.proc2.id_,
                          Process.by_id(self.db_conn, 2, create=False).id_)
         proc5 = Process(5)
-        proc5.save_without_steps(self.db_conn)
+        proc5.save(self.db_conn)
         self.assertEqual(proc5.id_,
                          Process.by_id(self.db_conn, 5, create=False).id_)
 
     def test_Process_steps(self) -> None:
         """Test addition, nesting, and non-recursion of ProcessSteps"""
+        def add_step(proc: Process,
+                     steps_proc: list[tuple[int | None, int, int | None]],
+                     step_tuple: tuple[int | None, int, int | None],
+                     expected_id: int) -> None:
+            steps_proc += [step_tuple]
+            proc.set_steps(self.db_conn, steps_proc)
+            steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
         assert self.proc2.id_ is not None
         assert self.proc3.id_ is not None
-        self.proc1.add_step(self.db_conn, None, self.proc2.id_, None)
+        steps_proc1: list[tuple[int | None, int, int | None]] = []
+        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
         p_1_dict: dict[int, dict[str, Any]] = {1: {
             'process': self.proc2, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }}
         self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
-        s_b = self.proc1.add_step(self.db_conn, None, self.proc3.id_, None)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
+        step_2 = self.proc1.explicit_steps[-1]
         p_1_dict[2] = {
             'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }
         self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
-        s_c = self.proc2.add_step(self.db_conn, None, self.proc3.id_, None)
-        assert s_c.id_ is not None
+        steps_proc2: list[tuple[int | None, int, int | None]] = []
+        add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3)
         p_1_dict[1]['steps'] = {3: {
             'process': self.proc3, 'parent_id': None,
             'is_explicit': False, 'steps': {}, 'seen': False
         }}
         self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
-        self.proc1.add_step(self.db_conn, None, self.proc2.id_, s_b.id_)
+        add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_),
+                 4)
         p_1_dict[2]['steps'][4] = {
-            'process': self.proc2, 'parent_id': s_b.id_, 'seen': False,
+            'process': self.proc2, 'parent_id': step_2.id_, 'seen': False,
             'is_explicit': True, 'steps': {3: {
                 'process': self.proc3, 'parent_id': None,
                 'is_explicit': False, 'steps': {}, 'seen': True
                 }}}
         self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
-        self.proc1.add_step(self.db_conn, None, self.proc3.id_, 999)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5)
         p_1_dict[5] = {
             'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
         }
         self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
-        self.proc1.add_step(self.db_conn, None, self.proc3.id_, 3)
+        add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6)
         p_1_dict[6] = {
             'process': self.proc3, 'parent_id': None,
             'is_explicit': True, 'steps': {}, 'seen': False
@@ -133,7 +143,8 @@ class TestsWithDB(TestCaseWithDB):
     def test_ProcessStep_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
         assert self.proc2.id_ is not None
-        step = self.proc1.add_step(self.db_conn, None, self.proc2.id_, None)
+        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+        step = self.proc1.explicit_steps[-1]
         assert step.id_ is not None
         step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
         step.parent_step_id = 99
@@ -142,7 +153,7 @@ class TestsWithDB(TestCaseWithDB):
     def test_Process_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
         assert self.proc2.id_ is not None
-        self.proc1.add_step(self.db_conn, None, self.proc2.id_, None)
+        self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
         p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
         self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
 
index f33534d2b199586a10fb6fd071343d6c3c2236e9..6bc5a05fbe465c57d2cdef1d69735d4c0c00cb2e 100644 (file)
@@ -18,7 +18,7 @@ class TestsWithDB(TestCaseWithDB):
         self.day2 = Day('2024-01-02')
         self.day2.save(self.db_conn)
         self.proc = Process(None)
-        self.proc.save_without_steps(self.db_conn)
+        self.proc.save(self.db_conn)
         self.cond1 = Condition(None)
         self.cond1.save(self.db_conn)
         self.cond2 = Condition(None)
@@ -30,7 +30,7 @@ class TestsWithDB(TestCaseWithDB):
         todo = Todo(None, process_unsaved, False, self.day1)
         with self.assertRaises(NotFoundException):
             todo.save(self.db_conn)
-        process_unsaved.save_without_steps(self.db_conn)
+        process_unsaved.save(self.db_conn)
         todo.save(self.db_conn)
         self.assertEqual(Todo.by_id(self.db_conn, 1), todo)
         with self.assertRaises(NotFoundException):