home · contact · privacy
Minor fixes.
[plomtask] / plomtask / processes.py
index 7872c335eebd702e327db08481a83a77827fbc5f..32eee4d350db5ed78f4501f7016688f5c174c7cb 100644 (file)
@@ -1,13 +1,24 @@
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
 """Collecting Processes and Process-related items."""
 from __future__ import annotations
-from typing import Any, Set
+from dataclasses import dataclass
+from typing import Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
-from plomtask.conditions import Condition
+from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
-class Process(BaseModel):
+@dataclass
+class ProcessStepsNode:
+    """Collects what's useful to know for ProcessSteps tree display."""
+    process: Process
+    parent_id: int | None
+    is_explicit: bool
+    steps: dict[int, ProcessStepsNode]
+    seen: bool
+
+
+class Process(BaseModel, ConditionsRelations):
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
@@ -20,8 +31,8 @@ class Process(BaseModel):
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.explicit_steps: list[ProcessStep] = []
         self.conditions: list[Condition] = []
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.explicit_steps: list[ProcessStep] = []
         self.conditions: list[Condition] = []
-        self.fulfills: list[Condition] = []
-        self.undoes: list[Condition] = []
+        self.enables: list[Condition] = []
+        self.disables: list[Condition] = []
 
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
 
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
@@ -30,9 +41,9 @@ class Process(BaseModel):
         for id_, process in db_conn.cached_processes.items():
             processes[id_] = process
         already_recorded = processes.keys()
         for id_, process in db_conn.cached_processes.items():
             processes[id_] = process
         already_recorded = processes.keys()
-        for row in db_conn.exec('SELECT id FROM processes'):
-            if row[0] not in already_recorded:
-                process = cls.by_id(db_conn, row[0])
+        for id_ in db_conn.column_all('processes', 'id'):
+            if id_ not in already_recorded:
+                process = cls.by_id(db_conn, id_)
                 processes[process.id_] = process
         return list(processes.values())
 
                 processes[process.id_] = process
         return list(processes.values())
 
@@ -47,63 +58,59 @@ class Process(BaseModel):
             if not create:
                 raise NotFoundException(f'Process not found of id: {id_}')
             process = Process(id_)
             if not create:
                 raise NotFoundException(f'Process not found of id: {id_}')
             process = Process(id_)
-        for row in db_conn.exec('SELECT * FROM process_titles '
-                                'WHERE parent_id = ?', (process.id_,)):
-            process.title.history[row[1]] = row[2]
-        for row in db_conn.exec('SELECT * FROM process_descriptions '
-                                'WHERE parent_id = ?', (process.id_,)):
-            process.description.history[row[1]] = row[2]
-        for row in db_conn.exec('SELECT * FROM process_efforts '
-                                'WHERE parent_id = ?', (process.id_,)):
-            process.effort.history[row[1]] = row[2]
-        for row in db_conn.exec('SELECT * FROM process_steps '
-                                'WHERE owner_id = ?', (process.id_,)):
-            process.explicit_steps += [ProcessStep.from_table_row(db_conn,
-                                                                  row)]
-        for row in db_conn.exec('SELECT condition FROM process_conditions '
-                                'WHERE process = ?', (process.id_,)):
-            process.conditions += [Condition.by_id(db_conn, row[0])]
-        for row in db_conn.exec('SELECT condition FROM process_fulfills '
-                                'WHERE process = ?', (process.id_,)):
-            process.fulfills += [Condition.by_id(db_conn, row[0])]
-        for row in db_conn.exec('SELECT condition FROM process_undoes '
-                                'WHERE process = ?', (process.id_,)):
-            process.undoes += [Condition.by_id(db_conn, row[0])]
+        if isinstance(process.id_, int):
+            for name in ('title', 'description', 'effort'):
+                table = f'process_{name}s'
+                for row in db_conn.row_where(table, 'parent', process.id_):
+                    getattr(process, name).history_from_row(row)
+            for row in db_conn.row_where('process_steps', 'owner',
+                                         process.id_):
+                step = ProcessStep.from_table_row(db_conn, row)
+                process.explicit_steps += [step]
+            for name in ('conditions', 'enables', 'disables'):
+                table = f'process_{name}'
+                for cond_id in db_conn.column_where(table, 'condition',
+                                                    'process', process.id_):
+                    target = getattr(process, name)
+                    target += [Condition.by_id(db_conn, cond_id)]
         assert isinstance(process, Process)
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         """Return Processes using self for a ProcessStep."""
         assert isinstance(process, Process)
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         """Return Processes using self for a ProcessStep."""
+        if not self.id_:
+            return []
         owner_ids = set()
         owner_ids = set()
-        for owner_id in db_conn.exec('SELECT owner_id FROM process_steps WHERE'
-                                     ' step_process_id = ?', (self.id_,)):
-            owner_ids.add(owner_id[0])
+        for id_ in db_conn.column_where('process_steps', 'owner',
+                                        'step_process', self.id_):
+            owner_ids.add(id_)
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
-                  Process | None = None) -> dict[int, dict[str, object]]:
+                  Process | None = None) -> dict[int, ProcessStepsNode]:
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
         """Return tree of depended-on explicit and implicit ProcessSteps."""
 
-        def make_node(step: ProcessStep) -> dict[str, object]:
+        def make_node(step: ProcessStep) -> ProcessStepsNode:
             is_explicit = False
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             step_steps = process.get_steps(db_conn, external_owner)
             is_explicit = False
             if external_owner is not None:
                 is_explicit = step.owner_id == external_owner.id_
             process = self.__class__.by_id(db_conn, step.step_process_id)
             step_steps = process.get_steps(db_conn, external_owner)
-            return {'process': process, 'parent_id': step.parent_step_id,
-                    'is_explicit': is_explicit, 'steps': step_steps}
+            return ProcessStepsNode(process, step.parent_step_id,
+                                    is_explicit, step_steps, False)
 
 
-        def walk_steps(node_id: int, node: dict[str, Any]) -> None:
+        def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
             explicit_children = [s for s in self.explicit_steps
                                  if s.parent_step_id == node_id]
             for child in explicit_children:
             explicit_children = [s for s in self.explicit_steps
                                  if s.parent_step_id == node_id]
             for child in explicit_children:
-                node['steps'][child.id_] = make_node(child)
-            node['seen'] = node_id in seen_step_ids
+                assert isinstance(child.id_, int)
+                node.steps[child.id_] = make_node(child)
+            node.seen = node_id in seen_step_ids
             seen_step_ids.add(node_id)
             seen_step_ids.add(node_id)
-            for id_, step in node['steps'].items():
+            for id_, step in node.steps.items():
                 walk_steps(id_, step)
 
                 walk_steps(id_, step)
 
-        steps: dict[int, dict[str, object]] = {}
+        steps: dict[int, ProcessStepsNode] = {}
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
         seen_step_ids: Set[int] = set()
         if external_owner is None:
             external_owner = self
@@ -115,24 +122,6 @@ class Process(BaseModel):
             walk_steps(step_id, step_node)
         return steps
 
             walk_steps(step_id, step_node)
         return steps
 
-    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
-                       trgt: str = 'conditions') -> None:
-        """Set self.[target] to Conditions identified by ids."""
-        trgt_list = getattr(self, trgt)
-        while len(trgt_list) > 0:
-            trgt_list.pop()
-        for id_ in ids:
-            trgt_list += [Condition.by_id(db_conn, id_)]
-
-    def set_fulfills(self, db_conn: DatabaseConnection,
-                     ids: list[int]) -> None:
-        """Set self.fulfills to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'fulfills')
-
-    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
-        """Set self.undoes to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'undoes')
-
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
@@ -147,12 +136,14 @@ class Process(BaseModel):
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
         just deleted under its feet), or if the parent step would not be
         owned by the current Process.
         """
+
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
             step_process = self.by_id(db_conn, node.step_process_id)
             for step in step_process.explicit_steps:
                 walk_steps(step)
         def walk_steps(node: ProcessStep) -> None:
             if node.step_process_id == self.id_:
                 raise BadFormatException('bad step selection causes recursion')
             step_process = self.by_id(db_conn, node.step_process_id)
             for step in step_process.explicit_steps:
                 walk_steps(step)
+
         if parent_step_id is not None:
             try:
                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
         if parent_step_id is not None:
             try:
                 parent_step = ProcessStep.by_id(db_conn, parent_step_id)
@@ -170,12 +161,12 @@ class Process(BaseModel):
     def set_steps(self, db_conn: DatabaseConnection,
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
     def set_steps(self, db_conn: DatabaseConnection,
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
+        assert isinstance(self.id_, int)
         for step in self.explicit_steps:
             assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
         for step in self.explicit_steps:
             assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
-        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
-                     (self.id_,))
+        db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
         for step_tuple in steps:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
@@ -183,27 +174,17 @@ class Process(BaseModel):
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
         self.save_core(db_conn)
     def save(self, db_conn: DatabaseConnection) -> None:
         """Add (or re-write) self and connected items to DB."""
         self.save_core(db_conn)
+        assert isinstance(self.id_, int)
         self.title.save(db_conn)
         self.description.save(db_conn)
         self.effort.save(db_conn)
         self.title.save(db_conn)
         self.description.save(db_conn)
         self.effort.save(db_conn)
-        db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
-                     (self.id_,))
-        for condition in self.conditions:
-            db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
-                         (self.id_, condition.id_))
-        db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
-                     (self.id_,))
-        for condition in self.fulfills:
-            db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
-                         (self.id_, condition.id_))
-        db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
-                     (self.id_,))
-        for condition in self.undoes:
-            db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
-                         (self.id_, condition.id_))
-        assert isinstance(self.id_, int)
-        db_conn.exec('DELETE FROM process_steps WHERE owner_id = ?',
-                     (self.id_,))
+        db_conn.rewrite_relations('process_conditions', 'process', self.id_,
+                                  [[c.id_] for c in self.conditions])
+        db_conn.rewrite_relations('process_enables', 'process', self.id_,
+                                  [[c.id_] for c in self.enables])
+        db_conn.rewrite_relations('process_disables', 'process', self.id_,
+                                  [[c.id_] for c in self.disables])
+        db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.cached_processes[self.id_] = self
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.cached_processes[self.id_] = self