X-Git-Url: https://plomlompom.com/repos/foo.html?a=blobdiff_plain;f=plomtask%2Fprocesses.py;h=0e8846d3f8591342d87c860f757ec9c2c8c43d07;hb=8f6b4ec61b126d6edbfda4f20d62398d92390a95;hp=76ed30df049b5d801b283ab904364da27f05f240;hpb=56b8aa166d51d139edb41f0ef250b1854a5e93e8;p=plomtask diff --git a/plomtask/processes.py b/plomtask/processes.py index 76ed30d..0e8846d 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -1,37 +1,27 @@ """Collecting Processes and Process-related items.""" from __future__ import annotations -from sqlite3 import Row from typing import Any, Set -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute from plomtask.conditions import Condition from plomtask.exceptions import NotFoundException, BadFormatException -class Process: +class Process(BaseModel): """Template for, and metadata for, Todos, and their arrangements.""" + table_name = 'processes' # pylint: disable=too-many-instance-attributes def __init__(self, id_: int | None) -> None: - if (id_ is not None) and id_ < 1: - raise BadFormatException(f'illegal Process ID, must be >=1: {id_}') - self.id_ = id_ + self.set_int_id(id_) self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED') self.description = VersionedAttribute(self, 'process_descriptions', '') self.effort = VersionedAttribute(self, 'process_efforts', 1.0) self.explicit_steps: list[ProcessStep] = [] self.conditions: list[Condition] = [] - self.fulfills: list[Condition] = [] - self.undoes: list[Condition] = [] - - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process: - """Make Process from database row, with empty VersionedAttributes.""" - process = cls(row[0]) - assert process.id_ is not None - db_conn.cached_processes[process.id_] = process - return process + self.enables: list[Condition] = [] + self.disables: list[Condition] = [] @classmethod def all(cls, db_conn: DatabaseConnection) -> list[Process]: @@ -40,9 +30,9 @@ class Process: for id_, process in db_conn.cached_processes.items(): processes[id_] = process already_recorded = processes.keys() - for row in db_conn.exec('SELECT id FROM processes'): - if row[0] not in already_recorded: - process = cls.by_id(db_conn, row[0]) + for id_ in db_conn.column_all('processes', 'id'): + if id_ not in already_recorded: + process = cls.by_id(db_conn, id_) processes[process.id_] = process return list(processes.values()) @@ -50,50 +40,39 @@ class Process: def by_id(cls, db_conn: DatabaseConnection, id_: int | None, create: bool = False) -> Process: """Collect Process, its VersionedAttributes, and its child IDs.""" - if id_ in db_conn.cached_processes.keys(): - process = db_conn.cached_processes[id_] - assert isinstance(process, Process) - return process process = None - for row in db_conn.exec('SELECT * FROM processes ' - 'WHERE id = ?', (id_,)): - process = cls(row[0]) - break + if id_: + process, _ = super()._by_id(db_conn, id_) if not process: if not create: raise NotFoundException(f'Process not found of id: {id_}') process = Process(id_) - for row in db_conn.exec('SELECT * FROM process_titles ' - 'WHERE parent_id = ?', (process.id_,)): - process.title.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_descriptions ' - 'WHERE parent_id = ?', (process.id_,)): - process.description.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_efforts ' - 'WHERE parent_id = ?', (process.id_,)): - process.effort.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE owner_id = ?', (process.id_,)): - process.explicit_steps += [ProcessStep.from_table_row(db_conn, - row)] - for row in db_conn.exec('SELECT condition FROM process_conditions ' - 'WHERE process = ?', (process.id_,)): - process.conditions += [Condition.by_id(db_conn, row[0])] - for row in db_conn.exec('SELECT condition FROM process_fulfills ' - 'WHERE process = ?', (process.id_,)): - process.fulfills += [Condition.by_id(db_conn, row[0])] - for row in db_conn.exec('SELECT condition FROM process_undoes ' - 'WHERE process = ?', (process.id_,)): - process.undoes += [Condition.by_id(db_conn, row[0])] + if isinstance(process.id_, int): + for name in ('title', 'description', 'effort'): + table = f'process_{name}s' + for row in db_conn.row_where(table, 'parent', process.id_): + getattr(process, name).history_from_row(row) + for row in db_conn.row_where('process_steps', 'owner', + process.id_): + step = ProcessStep.from_table_row(db_conn, row) + process.explicit_steps += [step] + for name in ('conditions', 'enables', 'disables'): + table = f'process_{name}' + for cond_id in db_conn.column_where(table, 'condition', + 'process', process.id_): + target = getattr(process, name) + target += [Condition.by_id(db_conn, cond_id)] assert isinstance(process, Process) return process def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]: """Return Processes using self for a ProcessStep.""" + if not self.id_: + return [] owner_ids = set() - for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE' - ' step_process_id = ?', (self.id_,)): - owner_ids.add(owner_id[0]) + for id_ in db_conn.column_where('process_steps', 'owner', + 'step_process', self.id_): + owner_ids.add(id_) return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids] def get_steps(self, db_conn: DatabaseConnection, external_owner: @@ -125,7 +104,7 @@ class Process: external_owner = self for step in [s for s in self.explicit_steps if s.parent_step_id is None]: - assert step.id_ is not None # for mypy + assert isinstance(step.id_, int) steps[step.id_] = make_node(step) for step_id, step_node in steps.items(): walk_steps(step_id, step_node) @@ -140,14 +119,15 @@ class Process: for id_ in ids: trgt_list += [Condition.by_id(db_conn, id_)] - def set_fulfills(self, db_conn: DatabaseConnection, - ids: list[int]) -> None: - """Set self.fulfills to Conditions identified by ids.""" - self.set_conditions(db_conn, ids, 'fulfills') + def set_enables(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.enables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'enables') - def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None: - """Set self.undoes to Conditions identified by ids.""" - self.set_conditions(db_conn, ids, 'undoes') + def set_disables(self, db_conn: DatabaseConnection, + ids: list[int]) -> None: + """Set self.disables to Conditions identified by ids.""" + self.set_conditions(db_conn, ids, 'disables') def _add_step(self, db_conn: DatabaseConnection, @@ -176,7 +156,7 @@ class Process: parent_step_id = None except NotFoundException: parent_step_id = None - assert self.id_ is not None + assert isinstance(self.id_, int) step = ProcessStep(id_, self.id_, step_process_id, parent_step_id) walk_steps(step) self.explicit_steps += [step] @@ -186,86 +166,56 @@ class Process: def set_steps(self, db_conn: DatabaseConnection, steps: list[tuple[int | None, int, int | None]]) -> None: """Set self.explicit_steps in bulk.""" + assert isinstance(self.id_, int) for step in self.explicit_steps: - assert step.id_ is not None + assert isinstance(step.id_, int) del db_conn.cached_process_steps[step.id_] self.explicit_steps = [] - db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', - (self.id_,)) + db_conn.delete_where('process_steps', 'owner', self.id_) for step_tuple in steps: self._add_step(db_conn, step_tuple[0], step_tuple[1], step_tuple[2]) - def save_id(self, db_conn: DatabaseConnection) -> None: - """Write bare-bones self (sans connected items), ensuring self.id_.""" - cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,)) - self.id_ = cursor.lastrowid - def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" - self.save_id(db_conn) + self.save_core(db_conn) + assert isinstance(self.id_, int) self.title.save(db_conn) self.description.save(db_conn) self.effort.save(db_conn) - db_conn.exec('DELETE FROM process_conditions WHERE process = ?', - (self.id_,)) - for condition in self.conditions: - db_conn.exec('INSERT INTO process_conditions VALUES (?,?)', - (self.id_, condition.id_)) - db_conn.exec('DELETE FROM process_fulfills WHERE process = ?', - (self.id_,)) - for condition in self.fulfills: - db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)', - (self.id_, condition.id_)) - db_conn.exec('DELETE FROM process_undoes WHERE process = ?', - (self.id_,)) - for condition in self.undoes: - db_conn.exec('INSERT INTO process_undoes VALUES (?,?)', - (self.id_, condition.id_)) - assert self.id_ is not None - db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', - (self.id_,)) + db_conn.rewrite_relations('process_conditions', 'process', self.id_, + [[c.id_] for c in self.conditions]) + db_conn.rewrite_relations('process_enables', 'process', self.id_, + [[c.id_] for c in self.enables]) + db_conn.rewrite_relations('process_disables', 'process', self.id_, + [[c.id_] for c in self.disables]) + db_conn.delete_where('process_steps', 'owner', self.id_) for step in self.explicit_steps: step.save(db_conn) db_conn.cached_processes[self.id_] = self -class ProcessStep: +class ProcessStep(BaseModel): """Sub-unit of Processes.""" + table_name = 'process_steps' + to_save = ['owner_id', 'step_process_id', 'parent_step_id'] def __init__(self, id_: int | None, owner_id: int, step_process_id: int, parent_step_id: int | None) -> None: - self.id_ = id_ + self.set_int_id(id_) self.owner_id = owner_id self.step_process_id = step_process_id self.parent_step_id = parent_step_id - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> ProcessStep: - """Make ProcessStep from database row, store in DB cache.""" - step = cls(row[0], row[1], row[2], row[3]) - assert step.id_ is not None - db_conn.cached_process_steps[step.id_] = step - return step - @classmethod def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep: """Retrieve ProcessStep by id_, or throw NotFoundException.""" - if id_ in db_conn.cached_process_steps.keys(): - step = db_conn.cached_process_steps[id_] + step, _ = super()._by_id(db_conn, id_) + if step: assert isinstance(step, ProcessStep) return step - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE step_id = ?', (id_,)): - return cls.from_table_row(db_conn, row) raise NotFoundException(f'found no ProcessStep of ID {id_}') def save(self, db_conn: DatabaseConnection) -> None: - """Save to database and cache.""" - cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)', - (self.id_, self.owner_id, self.step_process_id, - self.parent_step_id)) - self.id_ = cursor.lastrowid - assert self.id_ is not None - db_conn.cached_process_steps[self.id_] = self + """Default to simply calling self.save_core for simple cases.""" + self.save_core(db_conn)