home · contact · privacy
Slightly improve and re-organize Condition tests.
[plomtask] / plomtask / processes.py
index 23da7c10f365539661fbf7cb5815f61158a6c923..bb1de3a4a3356415473bc652d650e202886eb01b 100644 (file)
@@ -33,7 +33,13 @@ class Process(BaseModel[int], ConditionsRelations):
                          ('process_disables', 'process', 'disables', 0),
                          ('process_step_suppressions', 'process',
                           'suppressed_steps', 0)]
+    add_to_dict = ['explicit_steps']
     to_search = ['title.newest', 'description.newest']
+    can_create_by_id = True
+    sorters = {'steps': lambda p: len(p.explicit_steps),
+               'owners': lambda p: p.n_owners,
+               'effort': lambda p: p.effort.newest,
+               'title': lambda p: p.title.newest}
 
     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
         BaseModel.__init__(self, id_)
@@ -44,25 +50,14 @@ class Process(BaseModel[int], ConditionsRelations):
         self.explicit_steps: list[ProcessStep] = []
         self.suppressed_steps: list[ProcessStep] = []
         self.calendarize = calendarize
+        self.n_owners: int | None = None  # only set by from_table_row
 
     @classmethod
     def from_table_row(cls, db_conn: DatabaseConnection,
                        row: Row | list[Any]) -> Process:
         """Make from DB row, with dependencies."""
         process = super().from_table_row(db_conn, row)
-        assert isinstance(process.id_, int)
-        for name in ('title', 'description', 'effort'):
-            table = f'process_{name}s'
-            for row_ in db_conn.row_where(table, 'parent', process.id_):
-                getattr(process, name).history_from_row(row_)
-        for row_ in db_conn.row_where('process_steps', 'owner',
-                                      process.id_):
-            step = ProcessStep.from_table_row(db_conn, row_)
-            process.explicit_steps += [step]  # pylint: disable=no-member
-        for row_ in db_conn.row_where('process_step_suppressions', 'process',
-                                      process.id_):
-            step = ProcessStep.by_id(db_conn, row_[1])
-            process.suppressed_steps += [step]  # pylint: disable=no-member
+        assert process.id_ is not None
         for name in ('conditions', 'blockers', 'enables', 'disables'):
             table = f'process_{name}'
             assert isinstance(process.id_, int)
@@ -70,6 +65,14 @@ class Process(BaseModel[int], ConditionsRelations):
                                              'process', process.id_):
                 target = getattr(process, name)
                 target += [Condition.by_id(db_conn, c_id)]
+        for row_ in db_conn.row_where('process_steps', 'owner', process.id_):
+            step = ProcessStep.from_table_row(db_conn, row_)
+            process.explicit_steps += [step]
+        for row_ in db_conn.row_where('process_step_suppressions', 'process',
+                                      process.id_):
+            step = ProcessStep.by_id(db_conn, row_[1])
+            process.suppressed_steps += [step]
+        process.n_owners = len(process.used_as_step_by(db_conn))
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
@@ -146,12 +149,9 @@ class Process(BaseModel[int], ConditionsRelations):
                 walk_steps(step)
 
         assert isinstance(self.id_, int)
-        for step in self.explicit_steps:
-            step.uncache()
-        self.explicit_steps = []
-        db_conn.delete_where('process_steps', 'owner', self.id_)
-        for step in steps:
-            step.save(db_conn)
+        for step in [s for s in self.explicit_steps if s not in steps]:
+            step.remove(db_conn)
+        for step in [s for s in steps if s not in self.explicit_steps]:
             if step.parent_step_id is not None:
                 try:
                     parent_step = ProcessStep.by_id(db_conn,
@@ -161,7 +161,7 @@ class Process(BaseModel[int], ConditionsRelations):
                 except NotFoundException:
                     step.parent_step_id = None
             walk_steps(step)
-            self.explicit_steps += [step]
+            step.save(db_conn)
 
     def set_owners(self, db_conn: DatabaseConnection,
                    owner_ids: list[int]) -> None:
@@ -219,6 +219,16 @@ class ProcessStep(BaseModel[int]):
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
+    def save(self, db_conn: DatabaseConnection) -> None:
+        """Update into DB/cache, and owner's .explicit_steps."""
+        super().save(db_conn)
+        owner = Process.by_id(db_conn, self.owner_id)
+        if self not in owner.explicit_steps:
+            for s in [s for s in owner.explicit_steps if s.id_ == self.id_]:
+                s.remove(db_conn)
+            owner.explicit_steps += [self]
+        owner.explicit_steps.sort(key=hash)
+
     def remove(self, db_conn: DatabaseConnection) -> None:
         """Remove from DB, and owner's .explicit_steps."""
         owner = Process.by_id(db_conn, self.owner_id)