('process_disables', 'process', 'disables', 0),
('process_step_suppressions', 'process',
'suppressed_steps', 0)]
+ add_to_dict = ['explicit_steps']
to_search = ['title.newest', 'description.newest']
+ can_create_by_id = True
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
BaseModel.__init__(self, id_)
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Process:
"""Make from DB row, with dependencies."""
- # pylint: disable=no-member
process = super().from_table_row(db_conn, row)
- assert isinstance(process.id_, int)
- for name in ('title', 'description', 'effort'):
- table = f'process_{name}s'
- for row_ in db_conn.row_where(table, 'parent', process.id_):
- getattr(process, name).history_from_row(row_)
- for row_ in db_conn.row_where('process_steps', 'owner',
- process.id_):
- step = ProcessStep.from_table_row(db_conn, row_)
- process.explicit_steps += [step]
- for row_ in db_conn.row_where('process_step_suppressions', 'process',
- process.id_):
- step = ProcessStep.by_id(db_conn, row_[1])
- process.suppressed_steps += [step]
+ assert process.id_ is not None
for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'process_{name}'
assert isinstance(process.id_, int)
'process', process.id_):
target = getattr(process, name)
target += [Condition.by_id(db_conn, c_id)]
+ for row_ in db_conn.row_where('process_steps', 'owner', process.id_):
+ step = ProcessStep.from_table_row(db_conn, row_)
+ process.explicit_steps += [step]
+ for row_ in db_conn.row_where('process_step_suppressions', 'process',
+ process.id_):
+ step = ProcessStep.by_id(db_conn, row_[1])
+ process.suppressed_steps += [step]
process.n_owners = len(process.used_as_step_by(db_conn))
return process
walk_steps(step)
assert isinstance(self.id_, int)
- for step in self.explicit_steps:
- step.uncache()
- self.explicit_steps = []
- db_conn.delete_where('process_steps', 'owner', self.id_)
- for step in steps:
- step.save(db_conn)
+ for step in [s for s in self.explicit_steps if s not in steps]:
+ step.remove(db_conn)
+ for step in [s for s in steps if s not in self.explicit_steps]:
if step.parent_step_id is not None:
try:
parent_step = ProcessStep.by_id(db_conn,
except NotFoundException:
step.parent_step_id = None
walk_steps(step)
- self.explicit_steps += [step]
+ step.save(db_conn)
def set_owners(self, db_conn: DatabaseConnection,
owner_ids: list[int]) -> None:
self.step_process_id = step_process_id
self.parent_step_id = parent_step_id
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Update into DB/cache, and owner's .explicit_steps."""
+ super().save(db_conn)
+ owner = Process.by_id(db_conn, self.owner_id)
+ if self not in owner.explicit_steps:
+ for s in [s for s in owner.explicit_steps if s.id_ == self.id_]:
+ s.remove(db_conn)
+ owner.explicit_steps += [self]
+ owner.explicit_steps.sort(key=hash)
+
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, and owner's .explicit_steps."""
owner = Process.by_id(db_conn, self.owner_id)