This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bb1e8e5ca [AIP-51] Add helper to import default executor class 
(#27974)
8bb1e8e5ca is described below

commit 8bb1e8e5ca2a122a5d4915b553eeca3422bc6766
Author: Niko <[email protected]>
AuthorDate: Sat Dec 3 09:05:28 2022 -0800

    [AIP-51] Add helper to import default executor class (#27974)
    
    Also add missing testing for executor loader import mechanism
---
 airflow/executors/executor_loader.py    | 28 ++++++++++++++++++++++------
 tests/executors/test_executor_loader.py | 32 +++++++++++++++++++++++++++++++-
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/airflow/executors/executor_loader.py 
b/airflow/executors/executor_loader.py
index 56802017e4..0852b5206b 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -65,18 +65,23 @@ class ExecutorLoader:
         DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
     }
 
+    @classmethod
+    def get_default_executor_name(cls) -> str:
+        """Returns the default executor name from Airflow configuration.
+
+        :return: executor name from Airflow configuration
+        """
+        from airflow.configuration import conf
+
+        return conf.get_mandatory_value("core", "EXECUTOR")
+
     @classmethod
     def get_default_executor(cls) -> BaseExecutor:
         """Creates a new instance of the configured executor if none exists 
and returns it."""
         if cls._default_executor is not None:
             return cls._default_executor
 
-        from airflow.configuration import conf
-
-        executor_name = conf.get_mandatory_value("core", "EXECUTOR")
-        cls._default_executor = cls.load_executor(executor_name)
-
-        return cls._default_executor
+        return cls.load_executor(cls.get_default_executor_name())
 
     @classmethod
     def load_executor(cls, executor_name: str) -> BaseExecutor:
@@ -134,6 +139,17 @@ class ExecutorLoader:
                 return import_string(f"airflow.executors.{executor_name}"), 
ConnectorSource.PLUGIN
         return import_string(executor_name), ConnectorSource.CUSTOM_PATH
 
+    @classmethod
+    def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], 
ConnectorSource]:
+        """
+        Imports the default executor class.
+
+        :return: executor class and executor import source
+        """
+        executor_name = cls.get_default_executor_name()
+
+        return cls.import_executor_cls(executor_name)
+
     @classmethod
     def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
         celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
diff --git a/tests/executors/test_executor_loader.py 
b/tests/executors/test_executor_loader.py
index 180e7b961c..96d7030e17 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -21,7 +21,7 @@ from unittest import mock
 import pytest
 
 from airflow import plugins_manager
-from airflow.executors.executor_loader import ExecutorLoader
+from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader
 from tests.test_utils.config import conf_vars
 
 # Plugin Manager creates new modules, which is difficult to mock, so we use 
test isolation by a unique name.
@@ -73,3 +73,33 @@ class TestExecutorLoader:
             executor = ExecutorLoader.get_default_executor()
             assert executor is not None
             assert "FakeExecutor" == executor.__class__.__name__
+
+    @pytest.mark.parametrize(
+        "executor_name",
+        [
+            "CeleryExecutor",
+            "CeleryKubernetesExecutor",
+            "DebugExecutor",
+            "KubernetesExecutor",
+            "LocalExecutor",
+        ],
+    )
+    def test_should_support_import_executor_from_core(self, executor_name):
+        with conf_vars({("core", "executor"): executor_name}):
+            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
+            assert executor_name == executor.__name__
+            assert import_source == ConnectorSource.CORE
+
+    @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
+    @mock.patch("airflow.plugins_manager.executors_modules", None)
+    def test_should_support_import_plugins(self):
+        with conf_vars({("core", "executor"): 
f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
+            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
+            assert "FakeExecutor" == executor.__name__
+            assert import_source == ConnectorSource.PLUGIN
+
+    def test_should_support_import_custom_path(self):
+        with conf_vars({("core", "executor"): 
"tests.executors.test_executor_loader.FakeExecutor"}):
+            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
+            assert "FakeExecutor" == executor.__name__
+            assert import_source == ConnectorSource.CUSTOM_PATH

Reply via email to