home · contact · privacy
Enable deletion of Conditions.
[plomtask] / plomtask / processes.py
index 32eee4d350db5ed78f4501f7016688f5c174c7cb..e67b13414e26d40a87e77545ce57d238bc050028 100644 (file)
@@ -1,7 +1,8 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 from dataclasses import dataclass
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 from dataclasses import dataclass
-from typing import Set
+from typing import Set, Any
+from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.conditions import Condition, ConditionsRelations
@@ -18,14 +19,14 @@ class ProcessStepsNode:
     seen: bool
 
 
     seen: bool
 
 
-class Process(BaseModel, ConditionsRelations):
+class Process(BaseModel[int], ConditionsRelations):
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
     # pylint: disable=too-many-instance-attributes
 
     def __init__(self, id_: int | None) -> None:
-        self.set_int_id(id_)
+        super().__init__(id_)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
@@ -35,45 +36,26 @@ class Process(BaseModel, ConditionsRelations):
         self.disables: list[Condition] = []
 
     @classmethod
         self.disables: list[Condition] = []
 
     @classmethod
-    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
-        """Collect all Processes and their connected VersionedAttributes."""
-        processes = {}
-        for id_, process in db_conn.cached_processes.items():
-            processes[id_] = process
-        already_recorded = processes.keys()
-        for id_ in db_conn.column_all('processes', 'id'):
-            if id_ not in already_recorded:
-                process = cls.by_id(db_conn, id_)
-                processes[process.id_] = process
-        return list(processes.values())
-
-    @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
-              create: bool = False) -> Process:
-        """Collect Process, its VersionedAttributes, and its child IDs."""
-        process = None
-        if id_:
-            process, _ = super()._by_id(db_conn, id_)
-        if not process:
-            if not create:
-                raise NotFoundException(f'Process not found of id: {id_}')
-            process = Process(id_)
-        if isinstance(process.id_, int):
-            for name in ('title', 'description', 'effort'):
-                table = f'process_{name}s'
-                for row in db_conn.row_where(table, 'parent', process.id_):
-                    getattr(process, name).history_from_row(row)
-            for row in db_conn.row_where('process_steps', 'owner',
-                                         process.id_):
-                step = ProcessStep.from_table_row(db_conn, row)
-                process.explicit_steps += [step]
-            for name in ('conditions', 'enables', 'disables'):
-                table = f'process_{name}'
-                for cond_id in db_conn.column_where(table, 'condition',
-                                                    'process', process.id_):
-                    target = getattr(process, name)
-                    target += [Condition.by_id(db_conn, cond_id)]
-        assert isinstance(process, Process)
+    def from_table_row(cls, db_conn: DatabaseConnection,
+                       row: Row | list[Any]) -> Process:
+        """Make from DB row, with dependencies."""
+        process = super().from_table_row(db_conn, row)
+        assert isinstance(process.id_, int)
+        for name in ('title', 'description', 'effort'):
+            table = f'process_{name}s'
+            for row_ in db_conn.row_where(table, 'parent', process.id_):
+                getattr(process, name).history_from_row(row_)
+        for row_ in db_conn.row_where('process_steps', 'owner',
+                                      process.id_):
+            step = ProcessStep.from_table_row(db_conn, row_)
+            process.explicit_steps += [step]  # pylint: disable=no-member
+        for name in ('conditions', 'enables', 'disables'):
+            table = f'process_{name}'
+            assert isinstance(process.id_, int)
+            for c_id in db_conn.column_where(table, 'condition',
+                                             'process', process.id_):
+                target = getattr(process, name)
+                target += [Condition.by_id(db_conn, c_id)]
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
@@ -163,8 +145,7 @@ class Process(BaseModel, ConditionsRelations):
         """Set self.explicit_steps in bulk."""
         assert isinstance(self.id_, int)
         for step in self.explicit_steps:
         """Set self.explicit_steps in bulk."""
         assert isinstance(self.id_, int)
         for step in self.explicit_steps:
-            assert isinstance(step.id_, int)
-            del db_conn.cached_process_steps[step.id_]
+            step.uncache()
         self.explicit_steps = []
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
         self.explicit_steps = []
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
@@ -187,30 +168,36 @@ class Process(BaseModel, ConditionsRelations):
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
-        db_conn.cached_processes[self.id_] = self
 
 
+    def remove(self, db_conn: DatabaseConnection) -> None:
+        """Remove from DB, with dependencies."""
+        assert isinstance(self.id_, int)
+        db_conn.delete_where('process_conditions', 'process', self.id_)
+        db_conn.delete_where('process_enables', 'process', self.id_)
+        db_conn.delete_where('process_disables', 'process', self.id_)
+        for step in self.explicit_steps:
+            step.remove(db_conn)
+        super().remove(db_conn)
 
 
-class ProcessStep(BaseModel):
+
+class ProcessStep(BaseModel[int]):
     """Sub-unit of Processes."""
     table_name = 'process_steps'
     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
     """Sub-unit of Processes."""
     table_name = 'process_steps'
     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
-        self.set_int_id(id_)
+        super().__init__(id_)
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
-    @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
-        """Retrieve ProcessStep by id_, or throw NotFoundException."""
-        step, _ = super()._by_id(db_conn, id_)
-        if step:
-            assert isinstance(step, ProcessStep)
-            return step
-        raise NotFoundException(f'found no ProcessStep of ID {id_}')
-
     def save(self, db_conn: DatabaseConnection) -> None:
         """Default to simply calling self.save_core for simple cases."""
         self.save_core(db_conn)
     def save(self, db_conn: DatabaseConnection) -> None:
         """Default to simply calling self.save_core for simple cases."""
         self.save_core(db_conn)
+
+    def remove(self, db_conn: DatabaseConnection) -> None:
+        """Remove from DB, and owner's .explicit_steps."""
+        owner = Process.by_id(db_conn, self.owner_id)
+        owner.explicit_steps.remove(self)
+        super().remove(db_conn)