From: Christian Heller Date: Fri, 9 Aug 2024 16:15:04 +0000 (+0200) Subject: Clean up do_POST_process code. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/decks/%7B%7Byoutube_prefix%7D%7D%7B%7Bvideo_id%7D%7D?a=commitdiff_plain;h=5a1ad2652cd2984a73eb9c1dc3f1c23f361a2365;p=plomtask Clean up do_POST_process code. --- diff --git a/plomtask/http.py b/plomtask/http.py index c2e1f5d..51b35cb 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -645,7 +645,6 @@ class TaskHandler(BaseHTTPRequestHandler): cond_rels = [self._form.get_all_int(name) for name in ['conditions', 'blockers', 'enables', 'disables']] effort_or_not = self._form.get_str('effort') - # if effort_or_not is not None: if effort_or_not == '': to_update['effort'] = None @@ -655,7 +654,6 @@ class TaskHandler(BaseHTTPRequestHandler): except ValueError as e: msg = 'cannot float form field value for key: effort' raise BadFormatException(msg) from e - todo.set_condition_relations(self._conn, *cond_rels) for filler in [f for f in step_fillers if f != 'ignore']: target_id: int to_int = filler @@ -673,6 +671,8 @@ class TaskHandler(BaseHTTPRequestHandler): to_make['full'] += [target_id] else: adopted_child_ids += [target_id] + # + todo.set_condition_relations(self._conn, *cond_rels) to_remove = [] for child in todo.children: if child.id_ and (child.id_ not in adopted_child_ids): @@ -713,6 +713,16 @@ class TaskHandler(BaseHTTPRequestHandler): def do_POST_process(self, process: Process) -> str: """Update or insert Process of ?id= and fields defined in postvars.""" # pylint: disable=too-many-locals + + def id_or_title(l_id_or_title: list[str]) -> tuple[str, list[int]]: + l_ids, title = [], '' + for id_or_title in l_id_or_title: + try: + l_ids += [int(id_or_title)] + except ValueError: + title = id_or_title + return title, l_ids + versioned = {'title': self._form.get_str_or_fail('title'), 'description': self._form.get_str_or_fail('description'), 'effort': self._form.get_float_or_fail('effort')} @@ -720,55 +730,36 @@ class TaskHandler(BaseHTTPRequestHandler): in ['conditions', 'blockers', 'enables', 'disables']] calendarize = self._form.get_bool_or_none('calendarize') step_of = self._form.get_all_str('step_of') - suppresses = self._form.get_all_int('suppresses') + suppressions = self._form.get_all_int('suppresses') kept_steps = self._form.get_all_int('kept_steps') - new_top_steps = self._form.get_all_str('new_top_step') + new_top_step_procs = self._form.get_all_str('new_top_step') new_steps_to = {} for step_id in kept_steps: name = f'new_step_to_{step_id}' new_steps_to[step_id] = self._form.get_all_int(name) + new_owner_title, owners_to_set = id_or_title(step_of) + new_step_title, new_top_step_proc_ids = id_or_title(new_top_step_procs) # for k, v in versioned.items(): getattr(process, k).set(v) - process.set_condition_relations(self._conn, *cond_rels) if calendarize is not None: process.calendarize = calendarize process.save(self._conn) assert isinstance(process.id_, int) - # set relations to, and if non-existant yet: create, other Processes - # pylint: disable=fixme - # TODO: in what order to set owners, owneds, and possibly step - # suppressions can make the difference between recursion checks - # failing; should probably be handled class-internally to Process - # rather than here! - # 1. owners (upwards) - owners_to_set = [] - new_owner_title = None - for owner_identifier in step_of: - try: - owners_to_set += [int(owner_identifier)] - except ValueError: - new_owner_title = owner_identifier - process.set_owners(self._conn, owners_to_set) - # 2. owneds (downwards) - new_step_title = None - steps: list[ProcessStep] = [ProcessStep.by_id(self._conn, step_id) - for step_id in kept_steps] - for step_id in kept_steps: - new_sub_steps = [ + # set relations to Conditions and ProcessSteps / other Processes + process.set_condition_relations(self._conn, *cond_rels) + owned_steps = [] + for step_id in kept_steps: # collecting sub-steps + owned_steps += [ProcessStep.by_id(self._conn, step_id)] + owned_steps += [ # new sub-steps ProcessStep(None, process.id_, step_process_id, step_id) for step_process_id in new_steps_to[step_id]] - steps += new_sub_steps - for step_id_or_new_title in new_top_steps: - try: - step_process_id = int(step_id_or_new_title) - step = ProcessStep(None, process.id_, step_process_id, None) - steps += [step] - except ValueError: - new_step_title = step_id_or_new_title - process.set_steps(self._conn, steps) - process.set_step_suppressions(self._conn, suppresses) - # encode titles for potentially newly created Processes up or down + for step_process_id in new_top_step_proc_ids: + owned_steps += [ProcessStep(None, process.id_, step_process_id, + None)] + process.set_step_relations(self._conn, owners_to_set, suppressions, + owned_steps) + # encode titles for potential newly-to-create Processes up or down params = f'id={process.id_}' if new_step_title: title_b64_encoded = b64encode(new_step_title.encode()).decode() diff --git a/plomtask/processes.py b/plomtask/processes.py index 56dd282..23eb624 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -129,16 +129,54 @@ class Process(BaseModel[int], ConditionsRelations): walk_steps(step_node) return step_nodes - def set_step_suppressions(self, db_conn: DatabaseConnection, - step_ids: list[int]) -> None: + def set_step_relations(self, + db_conn: DatabaseConnection, + owners: list[int], + suppressions: list[int], + owned_steps: list[ProcessStep] + ) -> None: + """Set step owners, suppressions, and owned steps.""" + self._set_owners(db_conn, owners) + self._set_step_suppressions(db_conn, suppressions) + self.set_steps(db_conn, owned_steps) + + def _set_step_suppressions(self, + db_conn: DatabaseConnection, + step_ids: list[int] + ) -> None: """Set self.suppressed_steps from step_ids.""" assert isinstance(self.id_, int) db_conn.delete_where('process_step_suppressions', 'process', self.id_) self.suppressed_steps = [ProcessStep.by_id(db_conn, s) for s in step_ids] - def set_steps(self, db_conn: DatabaseConnection, - steps: list[ProcessStep]) -> None: + def _set_owners(self, + db_conn: DatabaseConnection, + owner_ids: list[int] + ) -> None: + """Re-set owners to those identified in owner_ids.""" + owners_old = self.used_as_step_by(db_conn) + losers = [o for o in owners_old if o.id_ not in owner_ids] + owners_old_ids = [o.id_ for o in owners_old] + winners = [Process.by_id(db_conn, id_) for id_ in owner_ids + if id_ not in owners_old_ids] + steps_to_remove = [] + for loser in losers: + steps_to_remove += [s for s in loser.explicit_steps + if s.step_process_id == self.id_] + for step in steps_to_remove: + step.remove(db_conn) + for winner in winners: + assert isinstance(winner.id_, int) + assert isinstance(self.id_, int) + new_step = ProcessStep(None, winner.id_, self.id_, None) + new_explicit_steps = winner.explicit_steps + [new_step] + winner.set_steps(db_conn, new_explicit_steps) + + def set_steps(self, + db_conn: DatabaseConnection, + steps: list[ProcessStep] + ) -> None: """Set self.explicit_steps in bulk. Checks against recursion, and turns into top-level steps any of @@ -166,27 +204,6 @@ class Process(BaseModel[int], ConditionsRelations): walk_steps(step) step.save(db_conn) - def set_owners(self, db_conn: DatabaseConnection, - owner_ids: list[int]) -> None: - """Re-set owners to those identified in owner_ids.""" - owners_old = self.used_as_step_by(db_conn) - losers = [o for o in owners_old if o.id_ not in owner_ids] - owners_old_ids = [o.id_ for o in owners_old] - winners = [Process.by_id(db_conn, id_) for id_ in owner_ids - if id_ not in owners_old_ids] - steps_to_remove = [] - for loser in losers: - steps_to_remove += [s for s in loser.explicit_steps - if s.step_process_id == self.id_] - for step in steps_to_remove: - step.remove(db_conn) - for winner in winners: - assert isinstance(winner.id_, int) - assert isinstance(self.id_, int) - new_step = ProcessStep(None, winner.id_, self.id_, None) - new_explicit_steps = winner.explicit_steps + [new_step] - winner.set_steps(db_conn, new_explicit_steps) - def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" super().save(db_conn)