seen: bool
-class Process(BaseModel, ConditionsRelations):
+class Process(BaseModel[int], ConditionsRelations):
"""Template for, and metadata for, Todos, and their arrangements."""
table_name = 'processes'
self.enables: list[Condition] = []
self.disables: list[Condition] = []
- @classmethod
- def all(cls, db_conn: DatabaseConnection) -> list[Process]:
- """Collect all Processes and their connected VersionedAttributes."""
- processes = {}
- for id_, process in db_conn.cached_processes.items():
- processes[id_] = process
- already_recorded = processes.keys()
- for id_ in db_conn.column_all('processes', 'id'):
- if id_ not in already_recorded:
- process = cls.by_id(db_conn, id_)
- processes[process.id_] = process
- return list(processes.values())
-
@classmethod
def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
create: bool = False) -> Process:
"""Set self.explicit_steps in bulk."""
assert isinstance(self.id_, int)
for step in self.explicit_steps:
- assert isinstance(step.id_, int)
- del db_conn.cached_process_steps[step.id_]
+ step.uncache()
self.explicit_steps = []
db_conn.delete_where('process_steps', 'owner', self.id_)
for step_tuple in steps:
db_conn.delete_where('process_steps', 'owner', self.id_)
for step in self.explicit_steps:
step.save(db_conn)
- db_conn.cached_processes[self.id_] = self
-class ProcessStep(BaseModel):
+class ProcessStep(BaseModel[int]):
"""Sub-unit of Processes."""
table_name = 'process_steps'
to_save = ['owner_id', 'step_process_id', 'parent_step_id']
"""Retrieve ProcessStep by id_, or throw NotFoundException."""
step, _ = super()._by_id(db_conn, id_)
if step:
- assert isinstance(step, ProcessStep)
return step
raise NotFoundException(f'found no ProcessStep of ID {id_}')