seen: bool
-class Process(BaseModel, ConditionsRelations):
+class Process(BaseModel[int], ConditionsRelations):
"""Template for, and metadata for, Todos, and their arrangements."""
table_name = 'processes'
def all(cls, db_conn: DatabaseConnection) -> list[Process]:
"""Collect all Processes and their connected VersionedAttributes."""
processes = {}
- for id_, process in db_conn.cached_processes.items():
+ for id_, process in cls.cache_.items():
processes[id_] = process
already_recorded = processes.keys()
for id_ in db_conn.column_all('processes', 'id'):
if id_ not in already_recorded:
process = cls.by_id(db_conn, id_)
+ assert isinstance(process.id_, int)
processes[process.id_] = process
return list(processes.values())
create: bool = False) -> Process:
"""Collect Process, its VersionedAttributes, and its child IDs."""
process = None
+ from_cache = False
if id_:
- process, _ = super()._by_id(db_conn, id_)
- if not process:
- if not create:
- raise NotFoundException(f'Process not found of id: {id_}')
- process = Process(id_)
- if isinstance(process.id_, int):
- for name in ('title', 'description', 'effort'):
- table = f'process_{name}s'
- for row in db_conn.row_where(table, 'parent', process.id_):
- getattr(process, name).history_from_row(row)
- for row in db_conn.row_where('process_steps', 'owner',
- process.id_):
- step = ProcessStep.from_table_row(db_conn, row)
- process.explicit_steps += [step]
- for name in ('conditions', 'enables', 'disables'):
- table = f'process_{name}'
- for cond_id in db_conn.column_where(table, 'condition',
- 'process', process.id_):
- target = getattr(process, name)
- target += [Condition.by_id(db_conn, cond_id)]
+ process, from_cache = super()._by_id(db_conn, id_)
+ if not from_cache:
+ if not process:
+ if not create:
+ raise NotFoundException(f'Process not found of id: {id_}')
+ process = Process(id_)
+ if isinstance(process.id_, int):
+ for name in ('title', 'description', 'effort'):
+ table = f'process_{name}s'
+ for row in db_conn.row_where(table, 'parent', process.id_):
+ getattr(process, name).history_from_row(row)
+ for row in db_conn.row_where('process_steps', 'owner',
+ process.id_):
+ step = ProcessStep.from_table_row(db_conn, row)
+ process.explicit_steps += [step]
+ for name in ('conditions', 'enables', 'disables'):
+ table = f'process_{name}'
+ for c_id in db_conn.column_where(table, 'condition',
+ 'process', process.id_):
+ target = getattr(process, name)
+ target += [Condition.by_id(db_conn, c_id)]
assert isinstance(process, Process)
return process
"""Set self.explicit_steps in bulk."""
assert isinstance(self.id_, int)
for step in self.explicit_steps:
- assert isinstance(step.id_, int)
- del db_conn.cached_process_steps[step.id_]
+ step.uncache()
self.explicit_steps = []
db_conn.delete_where('process_steps', 'owner', self.id_)
for step_tuple in steps:
db_conn.delete_where('process_steps', 'owner', self.id_)
for step in self.explicit_steps:
step.save(db_conn)
- db_conn.cached_processes[self.id_] = self
-class ProcessStep(BaseModel):
+class ProcessStep(BaseModel[int]):
"""Sub-unit of Processes."""
table_name = 'process_steps'
to_save = ['owner_id', 'step_process_id', 'parent_step_id']
"""Retrieve ProcessStep by id_, or throw NotFoundException."""
step, _ = super()._by_id(db_conn, id_)
if step:
- assert isinstance(step, ProcessStep)
return step
raise NotFoundException(f'found no ProcessStep of ID {id_}')