return {'process': process,
'steps': process.get_steps(self.conn),
'owners': process.used_as_step_by(self.conn),
- 'process_candidates': Process.all(self.conn),
+ 'step_candidates': Process.all(self.conn),
'condition_candidates': Condition.all(self.conn)}
def do_GET_processes(self) -> dict[str, object]:
create: bool = False) -> Process:
"""Collect Process, its VersionedAttributes, and its child IDs."""
process = None
+ from_cache = False
if id_:
- process, _ = super()._by_id(db_conn, id_)
- if not process:
- if not create:
- raise NotFoundException(f'Process not found of id: {id_}')
- process = Process(id_)
- if isinstance(process.id_, int):
- for name in ('title', 'description', 'effort'):
- table = f'process_{name}s'
- for row in db_conn.row_where(table, 'parent', process.id_):
- getattr(process, name).history_from_row(row)
- for row in db_conn.row_where('process_steps', 'owner',
- process.id_):
- step = ProcessStep.from_table_row(db_conn, row)
- process.explicit_steps += [step]
- for name in ('conditions', 'enables', 'disables'):
- table = f'process_{name}'
- for cond_id in db_conn.column_where(table, 'condition',
- 'process', process.id_):
- target = getattr(process, name)
- target += [Condition.by_id(db_conn, cond_id)]
+ process, from_cache = super()._by_id(db_conn, id_)
+ if not from_cache:
+ if not process:
+ if not create:
+ raise NotFoundException(f'Process not found of id: {id_}')
+ process = Process(id_)
+ if isinstance(process.id_, int):
+ for name in ('title', 'description', 'effort'):
+ table = f'process_{name}s'
+ for row in db_conn.row_where(table, 'parent', process.id_):
+ getattr(process, name).history_from_row(row)
+ for row in db_conn.row_where('process_steps', 'owner',
+ process.id_):
+ step = ProcessStep.from_table_row(db_conn, row)
+ process.explicit_steps += [step]
+ for name in ('conditions', 'enables', 'disables'):
+ table = f'process_{name}'
+ for c_id in db_conn.column_where(table, 'condition',
+ 'process', process.id_):
+ target = getattr(process, name)
+ target += [Condition.by_id(db_conn, c_id)]
assert isinstance(process, Process)
return process
{% if step_node.is_explicit %}
<input type="checkbox" name="keep_step" value="{{step_id}}" checked />
<input type="hidden" name="step_{{step_id}}_process_id" value="{{step_node.process.id_}}" />
-<input type="hidden" name="step_{{step_id}}_condition_id" value="{{step_node.condition.id_}}" />
<input type="hidden" name="step_{{step_id}}_parent_id" value="{{step_node.parent_id or ''}}" />
{% endif %}
</td>
add disables: <input name="disables" list="condition_candidates" autocomplete="off" />
<h4>steps</h4>
<table>
-{% for step_node in steps %}
+{% for step_id, step_node in steps.items() %}
{{ step_with_steps(step_id, step_node, 0) }}
{% endfor %}
</table>
self.assertEqual(step.parent_step_id, step_retrieved.parent_step_id)
def test_Process_singularity(self) -> None:
- """Test pointers made for single object keep pointing to it."""
+ """Test pointers made for single object keep pointing to it, and
+ subsequent retrievals don't overload relations."""
assert isinstance(self.proc1.id_, int)
assert isinstance(self.proc2.id_, int)
+ c1 = Condition(None, False)
+ c1.save(self.db_conn)
+ assert isinstance(c1.id_, int)
+ self.proc1.set_conditions(self.db_conn, [c1.id_])
self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+ self.proc1.save(self.db_conn)
p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)
+ self.assertEqual(self.proc1.conditions, p_retrieved.conditions)
+ self.proc1.save(self.db_conn)
def test_Process_versioned_attributes_singularity(self) -> None:
"""Test behavior of VersionedAttributes on saving (with .title)."""