- @classmethod
- def all(cls, db_conn: DatabaseConnection) -> list[Condition]:
- """Collect all Conditions and their VersionedAttributes."""
- conditions = {}
- for id_, condition in db_conn.cached_conditions.items():
- conditions[id_] = condition
- already_recorded = conditions.keys()
- for id_ in db_conn.column_all('conditions', 'id'):
- if id_ not in already_recorded:
- condition = cls.by_id(db_conn, id_)
- conditions[condition.id_] = condition
- return list(conditions.values())
-
- @classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
- create: bool = False) -> Condition:
- """Collect (or create) Condition and its VersionedAttributes."""
- condition = None
- if id_:
- condition, _ = super()._by_id(db_conn, id_)
- if not condition:
- if not create:
- raise NotFoundException(f'Condition not found of id: {id_}')
- condition = cls(id_, False)
- condition.save(db_conn)
- assert isinstance(condition, Condition)
- return condition
-
- def save(self, db_conn: DatabaseConnection) -> None:
- """Save self and its VersionedAttributes to DB and cache."""
- self.save_core(db_conn)
- self.title.save(db_conn)
- self.description.save(db_conn)
- assert isinstance(self.id_, int)
- db_conn.cached_conditions[self.id_] = self
+ Checks for Todos and Processes that depend on Condition, prohibits
+ deletion if found.
+ """
+ if self.id_ is None:
+ raise HandledException('cannot remove unsaved item')
+ for item in ('process', 'todo'):
+ for attr in ('conditions', 'blockers', 'enables', 'disables'):
+ table_name = f'{item}_{attr}'
+ for _ in db_conn.row_where(table_name, 'condition', self.id_):
+ raise HandledException('cannot remove Condition in use')
+ super().remove(db_conn)