for child in explicit_children:
assert isinstance(child.id_, int)
node.steps[child.id_] = make_node(child)
- # ensure that one (!) explicit step of process replaces
- # one (!) implicit step of same process
- for i in [i for i, s in node.steps.items()
- if not s.is_explicit
- and s.process.id_ == child.step_process_id]:
- del node.steps[i]
- break
+ # # ensure that one (!) explicit step of process replaces
+ # # one (!) implicit step of same process
+ # for i in [i for i, s in node.steps.items()
+ # if not s.process_step.owner_id == child.id_
+ # and s.process.id_ == child.step_process_id]:
+ # del node.steps[i]
+ # break
node.seen = node_id in seen_step_ids
seen_step_ids.add(node_id)
for id_, step in node.steps.items():
walk_steps(step_id, step_node)
return steps
- def _add_step(self,
- db_conn: DatabaseConnection,
- id_: int | None,
- step_process_id: int,
- parent_step_id: int | None) -> ProcessStep:
- """Create new ProcessStep, save and add it to self.explicit_steps.
-
- Also checks against step recursion.
+ def set_steps(self, db_conn: DatabaseConnection,
+ steps: list[ProcessStep]) -> None:
+ """Set self.explicit_steps in bulk.
- The new step's parent_step_id will fall back to None either if no
- matching ProcessStep is found (which can be assumed in case it was
- just deleted under its feet), or if the parent step would not be
- owned by the current Process.
+ Checks against recursion, and turns into top-level steps any of
+ unknown or non-owned parent.
"""
def walk_steps(node: ProcessStep) -> None:
if node.step_process_id == self.id_:
for step in step_process.explicit_steps:
walk_steps(step)
- if parent_step_id is not None:
- try:
- parent_step = ProcessStep.by_id(db_conn, parent_step_id)
- if parent_step.owner_id != self.id_:
- parent_step_id = None
- except NotFoundException:
- parent_step_id = None
- assert isinstance(self.id_, int)
- step = ProcessStep(id_, self.id_, step_process_id, parent_step_id)
- walk_steps(step)
- self.explicit_steps += [step]
- step.save(db_conn) # NB: This ensures a non-None step.id_.
- return step
-
- def set_steps(self, db_conn: DatabaseConnection,
- steps: list[tuple[int | None, int, int | None]]) -> None:
- """Set self.explicit_steps in bulk."""
assert isinstance(self.id_, int)
for step in self.explicit_steps:
step.uncache()
self.explicit_steps = []
db_conn.delete_where('process_steps', 'owner', self.id_)
- for step_tuple in steps:
- self._add_step(db_conn, step_tuple[0],
- step_tuple[1], step_tuple[2])
+ for step in steps:
+ step.save(db_conn)
+ if step.parent_step_id is not None:
+ try:
+ parent_step = ProcessStep.by_id(db_conn,
+ step.parent_step_id)
+ if parent_step.owner_id != self.id_:
+ step.parent_step_id = None
+ except NotFoundException:
+ step.parent_step_id = None
+ walk_steps(step)
+ self.explicit_steps += [step]
def save(self, db_conn: DatabaseConnection) -> None:
"""Add (or re-write) self and connected items to DB."""