home · contact · privacy
Further refactor Conditions handling.
[plomtask] / plomtask / processes.py
index e5851d0e0d5586215d6521e495a25da95f360e43..eb438958d4ac7748426116664032cae18f73a5dc 100644 (file)
@@ -3,11 +3,11 @@ from __future__ import annotations
 from typing import Any, Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
 from typing import Any, Set
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.misc import VersionedAttribute
-from plomtask.conditions import Condition
+from plomtask.conditions import Condition, ConditionsRelations
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
 from plomtask.exceptions import NotFoundException, BadFormatException
 
 
-class Process(BaseModel):
+class Process(BaseModel, ConditionsRelations):
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
     """Template for, and metadata for, Todos, and their arrangements."""
     table_name = 'processes'
 
@@ -20,8 +20,8 @@ class Process(BaseModel):
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.explicit_steps: list[ProcessStep] = []
         self.conditions: list[Condition] = []
         self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
         self.explicit_steps: list[ProcessStep] = []
         self.conditions: list[Condition] = []
-        self.fulfills: list[Condition] = []
-        self.undoes: list[Condition] = []
+        self.enables: list[Condition] = []
+        self.disables: list[Condition] = []
 
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
 
     @classmethod
     def all(cls, db_conn: DatabaseConnection) -> list[Process]:
@@ -30,9 +30,9 @@ class Process(BaseModel):
         for id_, process in db_conn.cached_processes.items():
             processes[id_] = process
         already_recorded = processes.keys()
         for id_, process in db_conn.cached_processes.items():
             processes[id_] = process
         already_recorded = processes.keys()
-        for row in db_conn.exec('SELECT id FROM processes'):
-            if row[0] not in already_recorded:
-                process = cls.by_id(db_conn, row[0])
+        for id_ in db_conn.column_all('processes', 'id'):
+            if id_ not in already_recorded:
+                process = cls.by_id(db_conn, id_)
                 processes[process.id_] = process
         return list(processes.values())
 
                 processes[process.id_] = process
         return list(processes.values())
 
@@ -50,26 +50,29 @@ class Process(BaseModel):
         if isinstance(process.id_, int):
             for name in ('title', 'description', 'effort'):
                 table = f'process_{name}s'
         if isinstance(process.id_, int):
             for name in ('title', 'description', 'effort'):
                 table = f'process_{name}s'
-                for row in db_conn.all_where(table, 'parent', process.id_):
+                for row in db_conn.row_where(table, 'parent', process.id_):
                     getattr(process, name).history_from_row(row)
                     getattr(process, name).history_from_row(row)
-            for row in db_conn.all_where('process_steps', 'owner',
+            for row in db_conn.row_where('process_steps', 'owner',
                                          process.id_):
                 step = ProcessStep.from_table_row(db_conn, row)
                 process.explicit_steps += [step]
                                          process.id_):
                 step = ProcessStep.from_table_row(db_conn, row)
                 process.explicit_steps += [step]
-            for name in ('conditions', 'fulfills', 'undoes'):
+            for name in ('conditions', 'enables', 'disables'):
                 table = f'process_{name}'
                 table = f'process_{name}'
-                for row in db_conn.all_where(table, 'process', process.id_):
+                for cond_id in db_conn.column_where(table, 'condition',
+                                                    'process', process.id_):
                     target = getattr(process, name)
                     target = getattr(process, name)
-                    target += [Condition.by_id(db_conn, row[1])]
+                    target += [Condition.by_id(db_conn, cond_id)]
         assert isinstance(process, Process)
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         """Return Processes using self for a ProcessStep."""
         assert isinstance(process, Process)
         return process
 
     def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
         """Return Processes using self for a ProcessStep."""
+        if not self.id_:
+            return []
         owner_ids = set()
         owner_ids = set()
-        for owner_id in db_conn.exec('SELECT owner FROM process_steps WHERE'
-                                     ' step_process = ?', (self.id_,)):
-            owner_ids.add(owner_id[0])
+        for id_ in db_conn.column_where('process_steps', 'owner',
+                                        'step_process', self.id_):
+            owner_ids.add(id_)
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
         return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
 
     def get_steps(self, db_conn: DatabaseConnection, external_owner:
@@ -107,24 +110,6 @@ class Process(BaseModel):
             walk_steps(step_id, step_node)
         return steps
 
             walk_steps(step_id, step_node)
         return steps
 
-    def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
-                       trgt: str = 'conditions') -> None:
-        """Set self.[target] to Conditions identified by ids."""
-        trgt_list = getattr(self, trgt)
-        while len(trgt_list) > 0:
-            trgt_list.pop()
-        for id_ in ids:
-            trgt_list += [Condition.by_id(db_conn, id_)]
-
-    def set_fulfills(self, db_conn: DatabaseConnection,
-                     ids: list[int]) -> None:
-        """Set self.fulfills to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'fulfills')
-
-    def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
-        """Set self.undoes to Conditions identified by ids."""
-        self.set_conditions(db_conn, ids, 'undoes')
-
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
     def _add_step(self,
                   db_conn: DatabaseConnection,
                   id_: int | None,
@@ -162,12 +147,12 @@ class Process(BaseModel):
     def set_steps(self, db_conn: DatabaseConnection,
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
     def set_steps(self, db_conn: DatabaseConnection,
                   steps: list[tuple[int | None, int, int | None]]) -> None:
         """Set self.explicit_steps in bulk."""
+        assert isinstance(self.id_, int)
         for step in self.explicit_steps:
             assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
         for step in self.explicit_steps:
             assert isinstance(step.id_, int)
             del db_conn.cached_process_steps[step.id_]
         self.explicit_steps = []
-        db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
-                     (self.id_,))
+        db_conn.delete_where('process_steps', 'owner', self.id_)
         for step_tuple in steps:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
         for step_tuple in steps:
             self._add_step(db_conn, step_tuple[0],
                            step_tuple[1], step_tuple[2])
@@ -181,12 +166,11 @@ class Process(BaseModel):
         self.effort.save(db_conn)
         db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                   [[c.id_] for c in self.conditions])
         self.effort.save(db_conn)
         db_conn.rewrite_relations('process_conditions', 'process', self.id_,
                                   [[c.id_] for c in self.conditions])
-        db_conn.rewrite_relations('process_fulfills', 'process', self.id_,
-                                  [[c.id_] for c in self.fulfills])
-        db_conn.rewrite_relations('process_undoes', 'process', self.id_,
-                                  [[c.id_] for c in self.undoes])
-        db_conn.exec('DELETE FROM process_steps WHERE owner = ?',
-                     (self.id_,))
+        db_conn.rewrite_relations('process_enables', 'process', self.id_,
+                                  [[c.id_] for c in self.enables])
+        db_conn.rewrite_relations('process_disables', 'process', self.id_,
+                                  [[c.id_] for c in self.disables])
+        db_conn.delete_where('process_steps', 'owner', self.id_)
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.cached_processes[self.id_] = self
         for step in self.explicit_steps:
             step.save(db_conn)
         db_conn.cached_processes[self.id_] = self