X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=blobdiff_plain;f=plomtask%2Fprocesses.py;h=e5851d0e0d5586215d6521e495a25da95f360e43;hb=7adcf651f0053f0dc5d719457a016a0d5b12253b;hp=45de9dbe8e879f3ec06aae5575305e980df8c0d4;hpb=c5fab0b28785bb8f3a8e2b8e455fd679cfe83d25;p=plomtask diff --git a/plomtask/processes.py b/plomtask/processes.py index 45de9db..e5851d0 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -1,6 +1,5 @@ """Collecting Processes and Process-related items.""" from __future__ import annotations -from sqlite3 import Row from typing import Any, Set from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute @@ -24,14 +23,6 @@ class Process(BaseModel): self.fulfills: list[Condition] = [] self.undoes: list[Condition] = [] - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process: - """Make Process from database row, with empty VersionedAttributes.""" - process = cls(row[0]) - assert isinstance(process.id_, int) - db_conn.cached_processes[process.id_] = process - return process - @classmethod def all(cls, db_conn: DatabaseConnection) -> list[Process]: """Collect all Processes and their connected VersionedAttributes.""" @@ -49,49 +40,35 @@ class Process(BaseModel): def by_id(cls, db_conn: DatabaseConnection, id_: int | None, create: bool = False) -> Process: """Collect Process, its VersionedAttributes, and its child IDs.""" - if id_ in db_conn.cached_processes.keys(): - process = db_conn.cached_processes[id_] - assert isinstance(process, Process) - return process process = None - for row in db_conn.exec('SELECT * FROM processes ' - 'WHERE id = ?', (id_,)): - process = cls(row[0]) - break + if id_: + process, _ = super()._by_id(db_conn, id_) if not process: if not create: raise NotFoundException(f'Process not found of id: {id_}') process = Process(id_) - for row in db_conn.exec('SELECT * FROM process_titles ' - 'WHERE parent_id = ?', (process.id_,)): - process.title.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_descriptions ' - 'WHERE parent_id = ?', (process.id_,)): - process.description.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_efforts ' - 'WHERE parent_id = ?', (process.id_,)): - process.effort.history[row[1]] = row[2] - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE owner_id = ?', (process.id_,)): - process.explicit_steps += [ProcessStep.from_table_row(db_conn, - row)] - for row in db_conn.exec('SELECT condition FROM process_conditions ' - 'WHERE process = ?', (process.id_,)): - process.conditions += [Condition.by_id(db_conn, row[0])] - for row in db_conn.exec('SELECT condition FROM process_fulfills ' - 'WHERE process = ?', (process.id_,)): - process.fulfills += [Condition.by_id(db_conn, row[0])] - for row in db_conn.exec('SELECT condition FROM process_undoes ' - 'WHERE process = ?', (process.id_,)): - process.undoes += [Condition.by_id(db_conn, row[0])] + if isinstance(process.id_, int): + for name in ('title', 'description', 'effort'): + table = f'process_{name}s' + for row in db_conn.all_where(table, 'parent', process.id_): + getattr(process, name).history_from_row(row) + for row in db_conn.all_where('process_steps', 'owner', + process.id_): + step = ProcessStep.from_table_row(db_conn, row) + process.explicit_steps += [step] + for name in ('conditions', 'fulfills', 'undoes'): + table = f'process_{name}' + for row in db_conn.all_where(table, 'process', process.id_): + target = getattr(process, name) + target += [Condition.by_id(db_conn, row[1])] assert isinstance(process, Process) return process def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]: """Return Processes using self for a ProcessStep.""" owner_ids = set() - for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE' - ' step_process_id = ?', (self.id_,)): + for owner_id in db_conn.exec('SELECT owner FROM process_steps WHERE' + ' step_process = ?', (self.id_,)): owner_ids.add(owner_id[0]) return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids] @@ -189,7 +166,7 @@ class Process(BaseModel): assert isinstance(step.id_, int) del db_conn.cached_process_steps[step.id_] self.explicit_steps = [] - db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', + db_conn.exec('DELETE FROM process_steps WHERE owner = ?', (self.id_,)) for step_tuple in steps: self._add_step(db_conn, step_tuple[0], @@ -198,26 +175,17 @@ class Process(BaseModel): def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" self.save_core(db_conn) + assert isinstance(self.id_, int) self.title.save(db_conn) self.description.save(db_conn) self.effort.save(db_conn) - db_conn.exec('DELETE FROM process_conditions WHERE process = ?', - (self.id_,)) - for condition in self.conditions: - db_conn.exec('INSERT INTO process_conditions VALUES (?,?)', - (self.id_, condition.id_)) - db_conn.exec('DELETE FROM process_fulfills WHERE process = ?', - (self.id_,)) - for condition in self.fulfills: - db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)', - (self.id_, condition.id_)) - db_conn.exec('DELETE FROM process_undoes WHERE process = ?', - (self.id_,)) - for condition in self.undoes: - db_conn.exec('INSERT INTO process_undoes VALUES (?,?)', - (self.id_, condition.id_)) - assert isinstance(self.id_, int) - db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', + db_conn.rewrite_relations('process_conditions', 'process', self.id_, + [[c.id_] for c in self.conditions]) + db_conn.rewrite_relations('process_fulfills', 'process', self.id_, + [[c.id_] for c in self.fulfills]) + db_conn.rewrite_relations('process_undoes', 'process', self.id_, + [[c.id_] for c in self.undoes]) + db_conn.exec('DELETE FROM process_steps WHERE owner = ?', (self.id_,)) for step in self.explicit_steps: step.save(db_conn) @@ -236,25 +204,13 @@ class ProcessStep(BaseModel): self.step_process_id = step_process_id self.parent_step_id = parent_step_id - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> ProcessStep: - """Make ProcessStep from database row, store in DB cache.""" - step = cls(row[0], row[1], row[2], row[3]) - assert isinstance(step.id_, int) - db_conn.cached_process_steps[step.id_] = step - return step - @classmethod def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep: """Retrieve ProcessStep by id_, or throw NotFoundException.""" - if id_ in db_conn.cached_process_steps.keys(): - step = db_conn.cached_process_steps[id_] + step, _ = super()._by_id(db_conn, id_) + if step: assert isinstance(step, ProcessStep) return step - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE step_id = ?', (id_,)): - return cls.from_table_row(db_conn, row) raise NotFoundException(f'found no ProcessStep of ID {id_}') def save(self, db_conn: DatabaseConnection) -> None: