home · contact · privacy
Hide (almost all) remaining SQL code in DB module.
[plomtask] / plomtask / processes.py
index e5851d0e0d5586215d6521e495a25da95f360e43..490acc38db705d25a7d783c1c0420f6bcc552f9c 100644 (file)
@@ -30,9 +30,9 @@ class Process(BaseModel):
         for id_, process in db_conn.cached_processes.items():
             processes[id_] = process
         already_recorded = processes.keys()
-        for row in db_conn.exec('SELECT id FROM processes'):
-            if row[0] not in already_recorded:
-                process = cls.by_id(db_conn, row[0])
+        for id_ in db_conn.column_all('processes', 'id'):
+            if id_ not in already_recorded:
+                process = cls.by_id(db_conn, id_)
                 processes[process.id_] = process
         return list(processes.values())
 
@@ -50,26 +50,29 @@ class Process(BaseModel):
         if isinstance(process.id_, int):
             for name in ('title', 'description', 'effort'):
                 table = f'process_{name}s'
-                for row in db_conn.all_where(table, 'parent', process.id_):
+                for row in db_conn.row_where(table, 'parent', process.id_):
                     getattr(process, name).history_from_row(row)
-            for row in db_conn.all_where('process_steps', 'owner',
+            for row in db_conn.row_where('process_steps', 'owner',
                                          process.id_):
                 step = ProcessStep.from_table_row(db_conn, row)
                 process.explicit_steps += [step]
             for name in ('conditions', 'fulfills', 'undoes'):
                 table = f'process_{name}'
-                for row in db_conn.all_where(table, 'process', process.id_):
+                for cond_id in db_conn.column_where(table, 'condition',
+                                                    'process', process.id_):
                     target = getattr(process, name)
-                    target += [Condition.by_id(db_conn, row[1])]
+                    target += [Condition.by_id(db_conn, cond_id)]
         assert isinstance(process, Process)
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         """Return Processes using self for a ProcessStep."""
+        if not self.id_:
+            return []
         owner_ids = set()
-        for owner_id in db_conn.exec('SELECT owner FROM process_steps WHERE'
-                                     ' step_process = ?', (self.id_,)):
-            owner_ids.add(owner_id[0])
+        for id_ in db_conn.column_where('process_steps', 'owner',
+                                        'step_process', self.id_):
+            owner_ids.add(id_)
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
@@ -162,12 +165,12 @@ class Process(BaseModel):
     def set_steps(self, db_conn: DatabaseConnection,
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
+        assert isinstance(self.id_, int)
         for step in self.explicit_steps:
             assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
-        db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
-                     (self.id_,))
+        db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
@@ -185,8 +188,7 @@ class Process(BaseModel):
                                   [[c.id_] for c in self.fulfills])
         db_conn.rewrite_relations('process_undoes', 'process', self.id_,
                                   [[c.id_] for c in self.undoes])
-        db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
-                     (self.id_,))
+        db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.cached_processes[self.id_] = self