self.form_data.get_all_int('condition'))
process.set_fulfills(self.conn, self.form_data.get_all_int('fulfills'))
process.set_undoes(self.conn, self.form_data.get_all_int('undoes'))
- process.save_without_steps(self.conn)
+ process.save_id(self.conn)
assert process.id_ is not None # for mypy
process.explicit_steps = []
+ steps: list[tuple[int | None, int, int | None]] = []
for step_id in self.form_data.get_all_int('steps'):
- for step_process_id in\
- self.form_data.get_all_int(f'new_step_to_{step_id}'):
- process.add_step(self.conn, None, step_process_id, step_id)
+ for step_process_id in self.form_data.get_all_int(
+ f'new_step_to_{step_id}'):
+ steps += [(None, step_process_id, step_id)]
if step_id not in self.form_data.get_all_int('keep_step'):
continue
step_process_id = self.form_data.get_int(
f'step_{step_id}_process_id')
parent_id = self.form_data.get_int_or_none(
f'step_{step_id}_parent_id')
- process.add_step(self.conn, step_id, step_process_id, parent_id)
+ steps += [(step_id, step_process_id, parent_id)]
for step_process_id in self.form_data.get_all_int('new_top_step'):
- process.add_step(self.conn, None, step_process_id, None)
- process.fix_steps(self.conn)
+ steps += [(None, step_process_id, None)]
+ process.set_steps(self.conn, steps)
+ process.save(self.conn)
def do_POST_condition(self) -> None:
"""Update/insert Condition of ?id= and fields defined in postvars."""
"""Set self.undoes to Conditions identified by ids."""
self.set_conditions(db_conn, ids, 'undoes')
- def add_step(self, db_conn: DatabaseConnection, id_: int | None,
- step_process_id: int,
- parent_step_id: int | None) -> ProcessStep:
+ def _add_step(self,
+ db_conn: DatabaseConnection,
+ id_: int | None,
+ step_process_id: int,
+ parent_step_id: int | None) -> ProcessStep:
"""Create new ProcessStep, save and add it to self.explicit_steps.
Also checks against step recursion.
+
The new step's parent_step_id will fall back to None either if no
matching ProcessStep is found (which can be assumed in case it was
just deleted under its feet), or if the parent step would not be
step.save(db_conn) # NB: This ensures a non-None step.id_.
return step
- def save_without_steps(self, db_conn: DatabaseConnection) -> None:
- """Add (or re-write) self and connected VersionedAttributes to DB."""
+ def set_steps(self, db_conn: DatabaseConnection,
+ steps: list[tuple[int | None, int, int | None]]) -> None:
+ """Set self.explicit_steps in bulk."""
+ for step in self.explicit_steps:
+ assert step.id_ is not None
+ del db_conn.cached_process_steps[step.id_]
+ self.explicit_steps = []
+ db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
+ (self.id_,))
+ for step_tuple in steps:
+ self._add_step(db_conn, step_tuple[0],
+ step_tuple[1], step_tuple[2])
+
+ def save_id(self, db_conn: DatabaseConnection) -> None:
+ """Write bare-bones self (sans connected items), ensuring self.id_."""
cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
self.id_ = cursor.lastrowid
+
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Add (or re-write) self and connected items to DB."""
+ self.save_id(db_conn)
self.title.save(db_conn)
self.description.save(db_conn)
self.effort.save(db_conn)
db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
(self.id_, condition.id_))
assert self.id_ is not None
- db_conn.cached_processes[self.id_] = self
-
- def fix_steps(self, db_conn: DatabaseConnection) -> None:
- """Rewrite ProcessSteps from self.explicit_steps.
-
- This also fixes illegal Step.parent_step_id values, i.e. those pointing
- to steps now absent, or owned by a different Process, fall back into
- .parent_step_id=None
- """
db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
(self.id_,))
for step in self.explicit_steps:
- if step.parent_step_id is not None:
- try:
- parent_step = ProcessStep.by_id(db_conn,
- step.parent_step_id)
- if parent_step.owner_id != self.id_:
- step.parent_step_id = None
- except NotFoundException:
- step.parent_step_id = None
step.save(db_conn)
+ db_conn.cached_processes[self.id_] = self
class ProcessStep:
def setUp(self) -> None:
super().setUp()
self.proc1 = Process(None)
- self.proc1.save_without_steps(self.db_conn)
+ self.proc1.save(self.db_conn)
self.proc2 = Process(None)
- self.proc2.save_without_steps(self.db_conn)
+ self.proc2.save(self.db_conn)
self.proc3 = Process(None)
- self.proc3.save_without_steps(self.db_conn)
+ self.proc3.save(self.db_conn)
def test_Process_ids(self) -> None:
- """Test Process.save_without_steps() re Process.id_."""
+ """Test Process.save() re Process.id_."""
self.assertEqual(self.proc1.id_,
Process.by_id(self.db_conn, 1, create=False).id_)
self.assertEqual(self.proc2.id_,
Process.by_id(self.db_conn, 2, create=False).id_)
proc5 = Process(5)
- proc5.save_without_steps(self.db_conn)
+ proc5.save(self.db_conn)
self.assertEqual(proc5.id_,
Process.by_id(self.db_conn, 5, create=False).id_)
def test_Process_steps(self) -> None:
"""Test addition, nesting, and non-recursion of ProcessSteps"""
+ def add_step(proc: Process,
+ steps_proc: list[tuple[int | None, int, int | None]],
+ step_tuple: tuple[int | None, int, int | None],
+ expected_id: int) -> None:
+ steps_proc += [step_tuple]
+ proc.set_steps(self.db_conn, steps_proc)
+ steps_proc[-1] = (expected_id, step_tuple[1], step_tuple[2])
assert self.proc2.id_ is not None
assert self.proc3.id_ is not None
- self.proc1.add_step(self.db_conn, None, self.proc2.id_, None)
+ steps_proc1: list[tuple[int | None, int, int | None]] = []
+ add_step(self.proc1, steps_proc1, (None, self.proc2.id_, None), 1)
p_1_dict: dict[int, dict[str, Any]] = {1: {
'process': self.proc2, 'parent_id': None,
'is_explicit': True, 'steps': {}, 'seen': False
}}
self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
- s_b = self.proc1.add_step(self.db_conn, None, self.proc3.id_, None)
+ add_step(self.proc1, steps_proc1, (None, self.proc3.id_, None), 2)
+ step_2 = self.proc1.explicit_steps[-1]
p_1_dict[2] = {
'process': self.proc3, 'parent_id': None,
'is_explicit': True, 'steps': {}, 'seen': False
}
self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
- s_c = self.proc2.add_step(self.db_conn, None, self.proc3.id_, None)
- assert s_c.id_ is not None
+ steps_proc2: list[tuple[int | None, int, int | None]] = []
+ add_step(self.proc2, steps_proc2, (None, self.proc3.id_, None), 3)
p_1_dict[1]['steps'] = {3: {
'process': self.proc3, 'parent_id': None,
'is_explicit': False, 'steps': {}, 'seen': False
}}
self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
- self.proc1.add_step(self.db_conn, None, self.proc2.id_, s_b.id_)
+ add_step(self.proc1, steps_proc1, (None, self.proc2.id_, step_2.id_),
+ 4)
p_1_dict[2]['steps'][4] = {
- 'process': self.proc2, 'parent_id': s_b.id_, 'seen': False,
+ 'process': self.proc2, 'parent_id': step_2.id_, 'seen': False,
'is_explicit': True, 'steps': {3: {
'process': self.proc3, 'parent_id': None,
'is_explicit': False, 'steps': {}, 'seen': True
}}}
self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
- self.proc1.add_step(self.db_conn, None, self.proc3.id_, 999)
+ add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 999), 5)
p_1_dict[5] = {
'process': self.proc3, 'parent_id': None,
'is_explicit': True, 'steps': {}, 'seen': False
}
self.assertEqual(self.proc1.get_steps(self.db_conn, None), p_1_dict)
- self.proc1.add_step(self.db_conn, None, self.proc3.id_, 3)
+ add_step(self.proc1, steps_proc1, (None, self.proc3.id_, 3), 6)
p_1_dict[6] = {
'process': self.proc3, 'parent_id': None,
'is_explicit': True, 'steps': {}, 'seen': False
def test_ProcessStep_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
assert self.proc2.id_ is not None
- step = self.proc1.add_step(self.db_conn, None, self.proc2.id_, None)
+ self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
+ step = self.proc1.explicit_steps[-1]
assert step.id_ is not None
step_retrieved = ProcessStep.by_id(self.db_conn, step.id_)
step.parent_step_id = 99
def test_Process_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
assert self.proc2.id_ is not None
- self.proc1.add_step(self.db_conn, None, self.proc2.id_, None)
+ self.proc1.set_steps(self.db_conn, [(None, self.proc2.id_, None)])
p_retrieved = Process.by_id(self.db_conn, self.proc1.id_)
self.assertEqual(self.proc1.explicit_steps, p_retrieved.explicit_steps)