X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/decks/%7B%7Bdeck_id%7D%7D/cards/%7B%7Bcard_id%7D%7D/static/git-logo.png?a=blobdiff_plain;f=plomtask%2Fprocesses.py;h=7872c335eebd702e327db08481a83a77827fbc5f;hb=5a5d713ce0b223ab2f6ef34c15bb82b614bdda98;hp=76ed30df049b5d801b283ab904364da27f05f240;hpb=56b8aa166d51d139edb41f0ef250b1854a5e93e8;p=plomtask diff --git a/plomtask/processes.py b/plomtask/processes.py index 76ed30d..7872c33 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -1,22 +1,20 @@ """Collecting Processes and Process-related items.""" from __future__ import annotations -from sqlite3 import Row from typing import Any, Set -from plomtask.db import DatabaseConnection +from plomtask.db import DatabaseConnection, BaseModel from plomtask.misc import VersionedAttribute from plomtask.conditions import Condition from plomtask.exceptions import NotFoundException, BadFormatException -class Process: +class Process(BaseModel): """Template for, and metadata for, Todos, and their arrangements.""" + table_name = 'processes' # pylint: disable=too-many-instance-attributes def __init__(self, id_: int | None) -> None: - if (id_ is not None) and id_ < 1: - raise BadFormatException(f'illegal Process ID, must be >=1: {id_}') - self.id_ = id_ + self.set_int_id(id_) self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED') self.description = VersionedAttribute(self, 'process_descriptions', '') self.effort = VersionedAttribute(self, 'process_efforts', 1.0) @@ -25,14 +23,6 @@ class Process: self.fulfills: list[Condition] = [] self.undoes: list[Condition] = [] - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process: - """Make Process from database row, with empty VersionedAttributes.""" - process = cls(row[0]) - assert process.id_ is not None - db_conn.cached_processes[process.id_] = process - return process - @classmethod def all(cls, db_conn: DatabaseConnection) -> list[Process]: """Collect all Processes and their connected VersionedAttributes.""" @@ -50,15 +40,9 @@ class Process: def by_id(cls, db_conn: DatabaseConnection, id_: int | None, create: bool = False) -> Process: """Collect Process, its VersionedAttributes, and its child IDs.""" - if id_ in db_conn.cached_processes.keys(): - process = db_conn.cached_processes[id_] - assert isinstance(process, Process) - return process process = None - for row in db_conn.exec('SELECT * FROM processes ' - 'WHERE id = ?', (id_,)): - process = cls(row[0]) - break + if id_: + process, _ = super()._by_id(db_conn, id_) if not process: if not create: raise NotFoundException(f'Process not found of id: {id_}') @@ -125,7 +109,7 @@ class Process: external_owner = self for step in [s for s in self.explicit_steps if s.parent_step_id is None]: - assert step.id_ is not None # for mypy + assert isinstance(step.id_, int) steps[step.id_] = make_node(step) for step_id, step_node in steps.items(): walk_steps(step_id, step_node) @@ -176,7 +160,7 @@ class Process: parent_step_id = None except NotFoundException: parent_step_id = None - assert self.id_ is not None + assert isinstance(self.id_, int) step = ProcessStep(id_, self.id_, step_process_id, parent_step_id) walk_steps(step) self.explicit_steps += [step] @@ -187,7 +171,7 @@ class Process: steps: list[tuple[int | None, int, int | None]]) -> None: """Set self.explicit_steps in bulk.""" for step in self.explicit_steps: - assert step.id_ is not None + assert isinstance(step.id_, int) del db_conn.cached_process_steps[step.id_] self.explicit_steps = [] db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', @@ -196,14 +180,9 @@ class Process: self._add_step(db_conn, step_tuple[0], step_tuple[1], step_tuple[2]) - def save_id(self, db_conn: DatabaseConnection) -> None: - """Write bare-bones self (sans connected items), ensuring self.id_.""" - cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,)) - self.id_ = cursor.lastrowid - def save(self, db_conn: DatabaseConnection) -> None: """Add (or re-write) self and connected items to DB.""" - self.save_id(db_conn) + self.save_core(db_conn) self.title.save(db_conn) self.description.save(db_conn) self.effort.save(db_conn) @@ -222,7 +201,7 @@ class Process: for condition in self.undoes: db_conn.exec('INSERT INTO process_undoes VALUES (?,?)', (self.id_, condition.id_)) - assert self.id_ is not None + assert isinstance(self.id_, int) db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?', (self.id_,)) for step in self.explicit_steps: @@ -230,42 +209,27 @@ class Process: db_conn.cached_processes[self.id_] = self -class ProcessStep: +class ProcessStep(BaseModel): """Sub-unit of Processes.""" + table_name = 'process_steps' + to_save = ['owner_id', 'step_process_id', 'parent_step_id'] def __init__(self, id_: int | None, owner_id: int, step_process_id: int, parent_step_id: int | None) -> None: - self.id_ = id_ + self.set_int_id(id_) self.owner_id = owner_id self.step_process_id = step_process_id self.parent_step_id = parent_step_id - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, - row: Row) -> ProcessStep: - """Make ProcessStep from database row, store in DB cache.""" - step = cls(row[0], row[1], row[2], row[3]) - assert step.id_ is not None - db_conn.cached_process_steps[step.id_] = step - return step - @classmethod def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep: """Retrieve ProcessStep by id_, or throw NotFoundException.""" - if id_ in db_conn.cached_process_steps.keys(): - step = db_conn.cached_process_steps[id_] + step, _ = super()._by_id(db_conn, id_) + if step: assert isinstance(step, ProcessStep) return step - for row in db_conn.exec('SELECT * FROM process_steps ' - 'WHERE step_id = ?', (id_,)): - return cls.from_table_row(db_conn, row) raise NotFoundException(f'found no ProcessStep of ID {id_}') def save(self, db_conn: DatabaseConnection) -> None: - """Save to database and cache.""" - cursor = db_conn.exec('REPLACE INTO process_steps VALUES (?, ?, ?, ?)', - (self.id_, self.owner_id, self.step_process_id, - self.parent_step_id)) - self.id_ = cursor.lastrowid - assert self.id_ is not None - db_conn.cached_process_steps[self.id_] = self + """Default to simply calling self.save_core for simple cases.""" + self.save_core(db_conn)