"""Template for, and metadata for, Todos, and their arrangements."""
# pylint: disable=too-many-instance-attributes
table_name = 'processes'
- to_save = ['calendarize']
- to_save_versioned = ['title', 'description', 'effort']
+ to_save_simples = ['calendarize']
to_save_relations = [('process_conditions', 'process', 'conditions', 0),
('process_blockers', 'process', 'blockers', 0),
('process_enables', 'process', 'enables', 0),
('process_disables', 'process', 'disables', 0),
('process_step_suppressions', 'process',
'suppressed_steps', 0)]
+ add_to_dict = ['explicit_steps']
+ versioned_defaults = {'title': 'UNNAMED', 'description': '', 'effort': 1.0}
to_search = ['title.newest', 'description.newest']
+ can_create_by_id = True
+ sorters = {'steps': lambda p: len(p.explicit_steps),
+ 'owners': lambda p: p.n_owners,
+ 'effort': lambda p: p.effort.newest,
+ 'title': lambda p: p.title.newest}
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
BaseModel.__init__(self, id_)
ConditionsRelations.__init__(self)
- self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
- self.description = VersionedAttribute(self, 'process_descriptions', '')
- self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
+ for name in ['title', 'description', 'effort']:
+ attr = VersionedAttribute(self, f'process_{name}s',
+ self.versioned_defaults[name])
+ setattr(self, name, attr)
self.explicit_steps: list[ProcessStep] = []
self.suppressed_steps: list[ProcessStep] = []
self.calendarize = calendarize
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Process:
"""Make from DB row, with dependencies."""
- # pylint: disable=no-member
process = super().from_table_row(db_conn, row)
- assert isinstance(process.id_, int)
- for name in ('title', 'description', 'effort'):
- table = f'process_{name}s'
- for row_ in db_conn.row_where(table, 'parent', process.id_):
- getattr(process, name).history_from_row(row_)
- for row_ in db_conn.row_where('process_steps', 'owner',
- process.id_):
- step = ProcessStep.from_table_row(db_conn, row_)
- process.explicit_steps += [step]
- for row_ in db_conn.row_where('process_step_suppressions', 'process',
- process.id_):
- step = ProcessStep.by_id(db_conn, row_[1])
- process.suppressed_steps += [step]
+ assert process.id_ is not None
for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'process_{name}'
assert isinstance(process.id_, int)
'process', process.id_):
target = getattr(process, name)
target += [Condition.by_id(db_conn, c_id)]
+ for row_ in db_conn.row_where('process_steps', 'owner', process.id_):
+ step = ProcessStep.from_table_row(db_conn, row_)
+ process.explicit_steps += [step]
+ for row_ in db_conn.row_where('process_step_suppressions', 'process',
+ process.id_):
+ step = ProcessStep.by_id(db_conn, row_[1])
+ process.suppressed_steps += [step]
process.n_owners = len(process.used_as_step_by(db_conn))
return process
walk_steps(step)
assert isinstance(self.id_, int)
- for step in self.explicit_steps:
- step.uncache()
- self.explicit_steps = []
- db_conn.delete_where('process_steps', 'owner', self.id_)
- for step in steps:
- step.save(db_conn)
+ for step in [s for s in self.explicit_steps if s not in steps]:
+ step.remove(db_conn)
+ for step in [s for s in steps if s not in self.explicit_steps]:
if step.parent_step_id is not None:
try:
parent_step = ProcessStep.by_id(db_conn,
except NotFoundException:
step.parent_step_id = None
walk_steps(step)
- self.explicit_steps += [step]
+ step.save(db_conn)
def set_owners(self, db_conn: DatabaseConnection,
owner_ids: list[int]) -> None:
class ProcessStep(BaseModel[int]):
"""Sub-unit of Processes."""
table_name = 'process_steps'
- to_save = ['owner_id', 'step_process_id', 'parent_step_id']
+ to_save_simples = ['owner_id', 'step_process_id', 'parent_step_id']
def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
parent_step_id: int | None) -> None:
self.step_process_id = step_process_id
self.parent_step_id = parent_step_id
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Update into DB/cache, and owner's .explicit_steps."""
+ super().save(db_conn)
+ owner = Process.by_id(db_conn, self.owner_id)
+ if self not in owner.explicit_steps:
+ for s in [s for s in owner.explicit_steps if s.id_ == self.id_]:
+ s.remove(db_conn)
+ owner.explicit_steps += [self]
+ owner.explicit_steps.sort(key=hash)
+
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, and owner's .explicit_steps."""
owner = Process.by_id(db_conn, self.owner_id)