home · contact · privacy
Slightly improve and re-organize Condition tests.
[plomtask] / plomtask / processes.py
index 8082c3cb1d412fa7aa7806735e20ee908f27c404..bb1de3a4a3356415473bc652d650e202886eb01b 100644 (file)
@@ -33,7 +33,13 @@ class Process(BaseModel[int], ConditionsRelations):
                          ('process_disables', 'process', 'disables', 0),
                          ('process_step_suppressions', 'process',
                           'suppressed_steps', 0)]
+    add_to_dict = ['explicit_steps']
     to_search = ['title.newest', 'description.newest']
+    can_create_by_id = True
+    sorters = {'steps': lambda p: len(p.explicit_steps),
+               'owners': lambda p: p.n_owners,
+               'effort': lambda p: p.effort.newest,
+               'title': lambda p: p.title.newest}
 
     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
         BaseModel.__init__(self, id_)
@@ -50,21 +56,8 @@ class Process(BaseModel[int], ConditionsRelations):
     def from_table_row(cls, db_conn: DatabaseConnection,
                        row: Row | list[Any]) -> Process:
         """Make from DB row, with dependencies."""
-        # pylint: disable=no-member
         process = super().from_table_row(db_conn, row)
-        assert isinstance(process.id_, int)
-        for name in ('title', 'description', 'effort'):
-            table = f'process_{name}s'
-            for row_ in db_conn.row_where(table, 'parent', process.id_):
-                getattr(process, name).history_from_row(row_)
-        for row_ in db_conn.row_where('process_steps', 'owner',
-                                      process.id_):
-            step = ProcessStep.from_table_row(db_conn, row_)
-            process.explicit_steps += [step]
-        for row_ in db_conn.row_where('process_step_suppressions', 'process',
-                                      process.id_):
-            step = ProcessStep.by_id(db_conn, row_[1])
-            process.suppressed_steps += [step]
+        assert process.id_ is not None
         for name in ('conditions', 'blockers', 'enables', 'disables'):
             table = f'process_{name}'
             assert isinstance(process.id_, int)
@@ -72,6 +65,13 @@ class Process(BaseModel[int], ConditionsRelations):
                                              'process', process.id_):
                 target = getattr(process, name)
                 target += [Condition.by_id(db_conn, c_id)]
+        for row_ in db_conn.row_where('process_steps', 'owner', process.id_):
+            step = ProcessStep.from_table_row(db_conn, row_)
+            process.explicit_steps += [step]
+        for row_ in db_conn.row_where('process_step_suppressions', 'process',
+                                      process.id_):
+            step = ProcessStep.by_id(db_conn, row_[1])
+            process.suppressed_steps += [step]
         process.n_owners = len(process.used_as_step_by(db_conn))
         return process
 
@@ -149,12 +149,9 @@ class Process(BaseModel[int], ConditionsRelations):
                 walk_steps(step)
 
         assert isinstance(self.id_, int)
-        for step in self.explicit_steps:
-            step.uncache()
-        self.explicit_steps = []
-        db_conn.delete_where('process_steps', 'owner', self.id_)
-        for step in steps:
-            step.save(db_conn)
+        for step in [s for s in self.explicit_steps if s not in steps]:
+            step.remove(db_conn)
+        for step in [s for s in steps if s not in self.explicit_steps]:
             if step.parent_step_id is not None:
                 try:
                     parent_step = ProcessStep.by_id(db_conn,
@@ -164,7 +161,7 @@ class Process(BaseModel[int], ConditionsRelations):
                 except NotFoundException:
                     step.parent_step_id = None
             walk_steps(step)
-            self.explicit_steps += [step]
+            step.save(db_conn)
 
     def set_owners(self, db_conn: DatabaseConnection,
                    owner_ids: list[int]) -> None:
@@ -222,6 +219,16 @@ class ProcessStep(BaseModel[int]):
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
+    def save(self, db_conn: DatabaseConnection) -> None:
+        """Update into DB/cache, and owner's .explicit_steps."""
+        super().save(db_conn)
+        owner = Process.by_id(db_conn, self.owner_id)
+        if self not in owner.explicit_steps:
+            for s in [s for s in owner.explicit_steps if s.id_ == self.id_]:
+                s.remove(db_conn)
+            owner.explicit_steps += [self]
+        owner.explicit_steps.sort(key=hash)
+
     def remove(self, db_conn: DatabaseConnection) -> None:
         """Remove from DB, and owner's .explicit_steps."""
         owner = Process.by_id(db_conn, self.owner_id)