home · contact · privacy
Add text-based search/filter for Conditions and Processes.
[plomtask] / plomtask / processes.py
index 490acc38db705d25a7d783c1c0420f6bcc552f9c..684dec81dde47e6bd409ffaa52edc3a251246e8d 100644 (file)
@@ -1,68 +1,67 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from typing import Any, Set
+from dataclasses import dataclass
+from typing import Set, Any
+from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.misc import VersionedAttribute
-from plomtask.conditions import Condition
-from plomtask.exceptions import NotFoundException, BadFormatException
+from plomtask.versioned_attributes import VersionedAttribute
+from plomtask.conditions import Condition, ConditionsRelations
+from plomtask.exceptions import (NotFoundException, BadFormatException,
+                                 HandledException)
 
 
 
 
-class Process(BaseModel):
-    """Template for, and metadata for, Todos, and their arrangements."""
-    table_name = 'processes'
+@dataclass
+class ProcessStepsNode:
+    """Collects what's useful to know for ProcessSteps tree display."""
+    process: Process
+    parent_id: int | None
+    is_explicit: bool
+    steps: dict[int, ProcessStepsNode]
+    seen: bool
 
 
-    # pylint: disable=too-many-instance-attributes
 
 
-    def __init__(self, id_: int | None) -> None:
-        self.set_int_id(id_)
+class Process(BaseModel[int], ConditionsRelations):
+    """Template for, and metadata for, Todos, and their arrangements."""
+    # pylint: disable=too-many-instance-attributes
+    table_name = 'processes'
+    to_save = ['calendarize']
+    to_save_versioned = ['title', 'description', 'effort']
+    to_save_relations = [('process_conditions', 'process', 'conditions'),
+                         ('process_blockers', 'process', 'blockers'),
+                         ('process_enables', 'process', 'enables'),
+                         ('process_disables', 'process', 'disables')]
+    to_search = ['title.newest', 'description.newest']
+
+    def __init__(self, id_: int | None, calendarize: bool = False) -> None:
+        BaseModel.__init__(self, id_)
+        ConditionsRelations.__init__(self)
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.explicit_steps: list[ProcessStep] = []
         self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
         self.description = VersionedAttribute(self, 'process_descriptions', '')
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.explicit_steps: list[ProcessStep] = []
-        self.conditions: list[Condition] = []
-        self.fulfills: list[Condition] = []
-        self.undoes: list[Condition] = []
+        self.calendarize = calendarize
 
     @classmethod
 
     @classmethod
-    def all(cls, db_conn: DatabaseConnection) -> list[Process]:
-        """Collect all Processes and their connected VersionedAttributes."""
-        processes = {}
-        for id_, process in db_conn.cached_processes.items():
-            processes[id_] = process
-        already_recorded = processes.keys()
-        for id_ in db_conn.column_all('processes', 'id'):
-            if id_ not in already_recorded:
-                process = cls.by_id(db_conn, id_)
-                processes[process.id_] = process
-        return list(processes.values())
-
-    @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
-              create: bool = False) -> Process:
-        """Collect Process, its VersionedAttributes, and its child IDs."""
-        process = None
-        if id_:
-            process, _ = super()._by_id(db_conn, id_)
-        if not process:
-            if not create:
-                raise NotFoundException(f'Process not found of id: {id_}')
-            process = Process(id_)
-        if isinstance(process.id_, int):
-            for name in ('title', 'description', 'effort'):
-                table = f'process_{name}s'
-                for row in db_conn.row_where(table, 'parent', process.id_):
-                    getattr(process, name).history_from_row(row)
-            for row in db_conn.row_where('process_steps', 'owner',
-                                         process.id_):
-                step = ProcessStep.from_table_row(db_conn, row)
-                process.explicit_steps += [step]
-            for name in ('conditions', 'fulfills', 'undoes'):
-                table = f'process_{name}'
-                for cond_id in db_conn.column_where(table, 'condition',
-                                                    'process', process.id_):
-                    target = getattr(process, name)
-                    target += [Condition.by_id(db_conn, cond_id)]
-        assert isinstance(process, Process)
+    def from_table_row(cls, db_conn: DatabaseConnection,
+                       row: Row | list[Any]) -> Process:
+        """Make from DB row, with dependencies."""
+        process = super().from_table_row(db_conn, row)
+        assert isinstance(process.id_, int)
+        for name in ('title', 'description', 'effort'):
+            table = f'process_{name}s'
+            for row_ in db_conn.row_where(table, 'parent', process.id_):
+                getattr(process, name).history_from_row(row_)
+        for row_ in db_conn.row_where('process_steps', 'owner',
+                                      process.id_):
+            step = ProcessStep.from_table_row(db_conn, row_)
+            process.explicit_steps += [step]  # pylint: disable=no-member
+        for name in ('conditions', 'blockers', 'enables', 'disables'):
+            table = f'process_{name}'
+            assert isinstance(process.id_, int)
+            for c_id in db_conn.column_where(table, 'condition',
+                                             'process', process.id_):
+                target = getattr(process, name)
+                target += [Condition.by_id(db_conn, c_id)]
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
@@ -76,29 +75,30 @@ class Process(BaseModel):
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
-                  Process | None = None) -> dict[int, dict[str, object]]:
+                  Process | None = None) -> dict[int, ProcessStepsNode]:
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
-        def make_node(step: ProcessStep) -> dict[str, object]:
+        def make_node(step: ProcessStep) -> ProcessStepsNode:
             is_explicit = False
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             step_steps = process.get_steps(db_conn, external_owner)
             is_explicit = False
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             step_steps = process.get_steps(db_conn, external_owner)
-            return {'process': process, 'parent_id': step.parent_step_id,
-                    'is_explicit': is_explicit, 'steps': step_steps}
+            return ProcessStepsNode(process, step.parent_step_id,
+                                    is_explicit, step_steps, False)
 
 
-        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
+        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
             explicit_children = [s for s in self.explicit_steps
                                  if s.parent_step_id == node_id]
             for child in explicit_children:
             explicit_children = [s for s in self.explicit_steps
                                  if s.parent_step_id == node_id]
             for child in explicit_children:
-                node['steps'][child.id_] = make_node(child)
-            node['seen'] = node_id in seen_step_ids
+                assert isinstance(child.id_, int)
+                node.steps[child.id_] = make_node(child)
+            node.seen = node_id in seen_step_ids
             seen_step_ids.add(node_id)
             seen_step_ids.add(node_id)
-            for id_, step in node['steps'].items():
+            for id_, step in node.steps.items():
                 walk_steps(id_, step)
 
                 walk_steps(id_, step)
 
-        steps: dict[int, dict[str, object]] = {}
+        steps: dict[int, ProcessStepsNode] = {}
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
@@ -110,24 +110,6 @@ class Process(BaseModel):
             walk_steps(step_id, step_node)
         return steps
 
             walk_steps(step_id, step_node)
         return steps
 
-    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
-                       trgt: str = 'conditions') -> None:
-        """Set self.[target] to Conditions identified by ids."""
-        trgt_list = getattr(self, trgt)
-        while len(trgt_list) > 0:
-            trgt_list.pop()
-        for id_ in ids:
-            trgt_list += [Condition.by_id(db_conn, id_)]
-
-    def set_fulfills(self, db_conn: DatabaseConnection,
-                     ids: list[int]) -> None:
-        """Set self.fulfills to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'fulfills')
-
-    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
-        """Set self.undoes to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'undoes')
-
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
@@ -142,12 +124,14 @@ class Process(BaseModel):
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
+
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
             step_process = self.by_id(db_conn, node.step_process_id)
             for step in step_process.explicit_steps:
                 walk_steps(step)
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
             step_process = self.by_id(db_conn, node.step_process_id)
             for step in step_process.explicit_steps:
                 walk_steps(step)
+
         if parent_step_id is not None:
             try:
                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
         if parent_step_id is not None:
             try:
                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
@@ -167,8 +151,7 @@ class Process(BaseModel):
         """Set self.explicit_steps in bulk."""
         assert isinstance(self.id_, int)
         for step in self.explicit_steps:
         """Set self.explicit_steps in bulk."""
         assert isinstance(self.id_, int)
         for step in self.explicit_steps:
-            assert isinstance(step.id_, int)
-            del db_conn.cached_process_steps[step.id_]
+            step.uncache()
         self.explicit_steps = []
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
         self.explicit_steps = []
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
@@ -177,44 +160,41 @@ class Process(BaseModel):
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
-        self.save_core(db_conn)
+        super().save(db_conn)
         assert isinstance(self.id_, int)
         assert isinstance(self.id_, int)
-        self.title.save(db_conn)
-        self.description.save(db_conn)
-        self.effort.save(db_conn)
-        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
-                                  [[c.id_] for c in self.conditions])
-        db_conn.rewrite_relations('process_fulfills', 'process', self.id_,
-                                  [[c.id_] for c in self.fulfills])
-        db_conn.rewrite_relations('process_undoes', 'process', self.id_,
-                                  [[c.id_] for c in self.undoes])
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
-        db_conn.cached_processes[self.id_] = self
 
 
+    def remove(self, db_conn: DatabaseConnection) -> None:
+        """Remove from DB, with dependencies.
 
 
-class ProcessStep(BaseModel):
+        Guard against removal of Processes in use.
+        """
+        assert isinstance(self.id_, int)
+        for _ in db_conn.row_where('process_steps', 'step_process', self.id_):
+            raise HandledException('cannot remove Process in use')
+        for _ in db_conn.row_where('todos', 'process', self.id_):
+            raise HandledException('cannot remove Process in use')
+        for step in self.explicit_steps:
+            step.remove(db_conn)
+        super().remove(db_conn)
+
+
+class ProcessStep(BaseModel[int]):
     """Sub-unit of Processes."""
     table_name = 'process_steps'
     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
     """Sub-unit of Processes."""
     table_name = 'process_steps'
     to_save = ['owner_id', 'step_process_id', 'parent_step_id']
 
     def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
                  parent_step_id: int | None) -> None:
-        self.set_int_id(id_)
+        super().__init__(id_)
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
         self.owner_id = owner_id
         self.step_process_id = step_process_id
         self.parent_step_id = parent_step_id
 
-    @classmethod
-    def by_id(cls, db_conn: DatabaseConnection, id_: int) -> ProcessStep:
-        """Retrieve ProcessStep by id_, or throw NotFoundException."""
-        step, _ = super()._by_id(db_conn, id_)
-        if step:
-            assert isinstance(step, ProcessStep)
-            return step
-        raise NotFoundException(f'found no ProcessStep of ID {id_}')
-
-    def save(self, db_conn: DatabaseConnection) -> None:
-        """Default to simply calling self.save_core for simple cases."""
-        self.save_core(db_conn)
+    def remove(self, db_conn: DatabaseConnection) -> None:
+        """Remove from DB, and owner's .explicit_steps."""
+        owner = Process.by_id(db_conn, self.owner_id)
+        owner.explicit_steps.remove(self)
+        super().remove(db_conn)