- for child_id in self.child_ids:
- walk_descendants(child_id)
- db_conn.exec('INSERT INTO process_children VALUES (?, ?)',
- (self.id_, child_id))
-
-
-class VersionedAttribute:
- """Attributes whose values are recorded as a timestamped history."""
-
- def __init__(self,
- parent: Process, name: str, default: str | float) -> None:
- self.parent = parent
- self.name = name
- self.default = default
- self.history: dict[str, str | float] = {}
-
- @property
- def _newest_timestamp(self) -> str:
- """Return most recent timestamp."""
- return sorted(self.history.keys())[-1]
-
- @property
- def newest(self) -> str | float:
- """Return most recent value, or self.default if self.history empty."""
- if 0 == len(self.history):
- return self.default
- return self.history[self._newest_timestamp]
-
- def set(self, value: str | float) -> None:
- """Add to self.history if and only if not same value as newest one."""
- if 0 == len(self.history) \
- or value != self.history[self._newest_timestamp]:
- self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
-
- def at(self, queried_time: str) -> str | float:
- """Retrieve value of timestamp nearest queried_time from the past."""
- sorted_timestamps = sorted(self.history.keys())
- if 0 == len(sorted_timestamps):
- return self.default
- selected_timestamp = sorted_timestamps[0]
- for timestamp in sorted_timestamps[1:]:
- if timestamp > queried_time:
- break
- selected_timestamp = timestamp
- return self.history[selected_timestamp]
+ for condition in self.undoes:
+ db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
+ (self.id_, condition.id_))
+ assert self.id_ is not None
+ db_conn.cached_processes[self.id_] = self
+
+ def fix_steps(self, db_conn: DatabaseConnection) -> None:
+ """Rewrite ProcessSteps from self.explicit_steps.
+
+ This also fixes illegal Step.parent_step_id values, i.e. those pointing
+ to steps now absent, or owned by a different Process, fall back into
+ .parent_step_id=None
+ """
+ db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
+ (self.id_,))
+ for step in self.explicit_steps:
+ if step.parent_step_id is not None:
+ try:
+ parent_step = ProcessStep.by_id(db_conn,
+ step.parent_step_id)
+ if parent_step.owner_id != self.id_:
+ step.parent_step_id = None
+ except NotFoundException:
+ step.parent_step_id = None
+ step.save(db_conn)
+
+
+class ProcessStep:
+ """Sub-unit of Processes."""
+
+ def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
+ parent_step_id: int | None) -> None:
+ self.id_ = id_
+ self.owner_id = owner_id
+ self.step_process_id = step_process_id
+ self.parent_step_id = parent_step_id
+
+ @classmethod
+ def from_table_row(cls, db_conn: DatabaseConnection,
+ row: Row) -> ProcessStep:
+ """Make ProcessStep from database row, store in DB cache."""
+ step = cls(row[0], row[1], row[2], row[3])
+ assert step.id_ is not None
+ db_conn.cached_process_steps[step.id_] = step
+ return step
+
+ @classmethod
+ def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
+ """Retrieve ProcessStep by id_, or throw NotFoundException."""
+ if id_ in db_conn.cached_process_steps.keys():
+ step = db_conn.cached_process_steps[id_]
+ assert isinstance(step, ProcessStep)
+ return step
+ for row in db_conn.exec('SELECT * FROM process_steps '
+ 'WHERE step_id = ?', (id_,)):
+ return cls.from_table_row(db_conn, row)
+ raise NotFoundException(f'found no ProcessStep of ID {id_}')