X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=blobdiff_plain;f=plomtask%2Fprocesses.py;h=7872c335eebd702e327db08481a83a77827fbc5f;hb=5a5d713ce0b223ab2f6ef34c15bb82b614bdda98;hp=45de9dbe8e879f3ec06aae5575305e980df8c0d4;hpb=c5fab0b28785bb8f3a8e2b8e455fd679cfe83d25;p=plomtask diff --git a/plomtask/processes.py b/plomtask/processes.py index 45de9db..7872c33 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -1,6 +1,5 @@ """Collecting Processes and Process-related items.""" from __future__ import annotations -from sqlite3 import Row from typing import Any, Set from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute @@ -24,14 +23,6 @@ class Process(BaseModel): self.fulfills: list[Condition] = [] self.undoes: list[Condition] = [] - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process: - """Make Process from database row, with empty VersionedAttributes.""" - process = cls(row[0]) - assert isinstance(process.id_, int) - db_conn.cached_processes[process.id_] = process - return process - @classmethod def all(cls, db_conn: DatabaseConnection) -> list[Process]: """Collect all Processes and their connected VersionedAttributes.""" @@ -49,15 +40,9 @@ class Process(BaseModel): def by_id(cls, db_conn: DatabaseConnection, id_: int | None, create: bool = False) -> Process: """Collect Process, its VersionedAttributes, and its child IDs.""" - if id_ in db_conn.cached_processes.keys(): - process = db_conn.cached_processes[id_] - assert isinstance(process, Process) - return process process = None - for row in db_conn.exec('SELECT * FROM processes ' - 'WHERE id = ?', (id_,)): - process = cls(row[0]) - break + if id_: + process, _ = super()._by_id(db_conn, id_) if not process: if not create: raise NotFoundException(f'Process not found of id: {id_}') @@ -236,25 +221,13 @@ class ProcessStep(BaseModel): self.step_process_id = step_process_id self.parent_step_id = parent_step_id - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> ProcessStep: - """Make ProcessStep from database row, store in DB cache.""" - step = cls(row[0], row[1], row[2], row[3]) - assert isinstance(step.id_, int) - db_conn.cached_process_steps[step.id_] = step - return step - @classmethod def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep: """Retrieve ProcessStep by id_, or throw NotFoundException.""" - if id_ in db_conn.cached_process_steps.keys(): - step = db_conn.cached_process_steps[id_] + step, _ = super()._by_id(db_conn, id_) + if step: assert isinstance(step, ProcessStep) return step - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE step_id = ?', (id_,)): - return cls.from_table_row(db_conn, row) raise NotFoundException(f'found no ProcessStep of ID {id_}') def save(self, db_conn: DatabaseConnection) -> None: