This is an automated email from the ASF dual-hosted git repository.

caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 40ddcf8a30 KYLIN-6061 build enable gluten
40ddcf8a30 is described below

commit 40ddcf8a30aa1ce8259349c7984780e80dcee219
Author: jlf <[email protected]>
AuthorDate: Tue Dec 31 10:56:32 2024 +0800

    KYLIN-6061 build enable gluten
    
    1. Enable Gulten for Async query
    2. Enable Gulten for Build Job
    3. fix build stage execution progress exception
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  12 ++
 .../apache/kylin/common/KylinConfigBaseTest.java   |   8 +-
 .../spark/job/InternalTableLoadingJobTest.java     |   1 +
 .../apache/kylin/query/relnode/OlapContext.java    |  24 ++--
 .../apache/kylin/query/util/SchemaConverter.java   |   3 +-
 .../apache/kylin/rest/service/AsyncQueryJob.java   |   8 +-
 .../kylin/query/engine/AsyncQueryApplication.java  |  17 +++
 .../AsyncQueryApplicationWithMetadataTest.java     | 142 +++++++++++++++++++
 .../kylin/query/relnode/OlapContextTest.java       |  36 ++++-
 .../kylin/query/util/SchemaConverterTest.java      |  13 ++
 .../engine/spark/application/SparkApplication.java |  27 ++--
 .../kylin/engine/spark/job/NSparkExecutable.java   |   7 +-
 .../kylin/engine/spark/job/SegmentBuildJob.java    |   3 +-
 .../apache/kylin/engine/spark/job/StepExec.scala   |   9 +-
 .../engine/spark/job/step/ResourceWaitStage.scala  |   6 +-
 .../kylin/engine/spark/job/step/StageExec.scala    |   4 +-
 .../spark/job/step/build/BuildStepExec.scala}      |  16 +--
 .../spark/application/SparkApplicationTest.java    | 153 ++++++++++++++-------
 .../engine/spark/job/NSparkExecutableTest.java     |  14 ++
 .../spark/job/step/ResourceWaitStageTest.scala     |  27 ++++
 20 files changed, 426 insertions(+), 104 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cec9eda198..735813be56 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3739,6 +3739,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.metrics.prometheus-enabled", TRUE));
     }
 
+    public boolean getWaitResourceEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.build.wait-resource.enabled", TRUE));
+    }
+
     public boolean getCheckResourceEnabled() {
         return 
Boolean.parseBoolean(getOptional("kylin.build.resource.check-enabled", FALSE));
     }
@@ -4412,6 +4416,14 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(this.getOptional("kylin.query.index-use-gulten", TRUE));
     }
 
+    public boolean buildUseGlutenEnabled() {
+        return 
Boolean.parseBoolean(this.getOptional("kylin.engine.gluten.enabled", FALSE));
+    }
+
+    public boolean uniqueAsyncQueryUseGlutenEnabled() {
+        return 
Boolean.parseBoolean(this.getOptional("kylin.unique-async-query.gluten.enabled",
 FALSE));
+    }
+
     public boolean queryUseGlutenEnabled() {
         return 
Boolean.parseBoolean(this.getOptional("kylin.storage.columnar.spark-conf.spark.gluten.enabled",
 FALSE))
                 && 
this.getOptional("kylin.storage.columnar.spark-conf.spark.plugins", 
"").contains("GlutenPlugin");
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 6539d2d8f7..5bacdfc38e 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -915,10 +915,10 @@ class KylinConfigBaseTest {
         map.put("decimalOperationsAllowPrecisionLoss", new PropertiesEntity(
                 
"kylin.storage.columnar.spark-conf.spark.sql.decimalOperations.allowPrecisionLoss",
 "true", true));
         map.put("isSparkUIAclEnabled", new 
PropertiesEntity("kylin.query.engine.spark-ui-acl.enabled", "", false));
-        map.put("isInternalTablePreloadCacheEnabled",
-                new 
PropertiesEntity("kylin.internal-table.preloaded-cache.enabled", "true", 
false));
-        map.put("isIndexPreloadCacheEnabled",
-                new PropertiesEntity("kylin.index.preloaded-cache.enabled", 
"true", false));
+        map.put("getWaitResourceEnabled", new 
PropertiesEntity("kylin.build.wait-resource.enabled", "", false));
+        map.put("buildUseGlutenEnabled", new 
PropertiesEntity("kylin.engine.gluten.enabled", "", false));
+        map.put("uniqueAsyncQueryUseGlutenEnabled",
+                new 
PropertiesEntity("kylin.unique-async-query.gluten.enabled", "", false));
         map.put("getConcurrentRunningThresholdForGlutenCache",
                 new 
PropertiesEntity("kylin.cache.gluten-cache-concurrent-running-threshold", "20", 
20));
     }
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
index 8e1c9cb4fd..6422d2ff8b 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
@@ -126,6 +126,7 @@ class InternalTableLoadingJobTest extends AbstractTestCase {
         internalTableLoadingJob.getTasks().forEach(task -> {
             if (task instanceof InternalTableLoadingStep) {
                 Assertions.assertTrue(task.isInternalTableSparkJob());
+                Assertions.assertTrue(((InternalTableLoadingStep) 
task).needRemoveGlutenParams(config));
             } else {
                 Assertions.assertFalse(task.isInternalTableSparkJob());
             }
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
index 3aafa90102..cc3c7c6c5a 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
@@ -469,11 +469,13 @@ public class OlapContext {
             if (tableDesc == null) {
                 return policy;
             }
-            if (olapConfig.isInternalTableEnabled() && 
tableDesc.isHasInternal() && !isAsyncQuery(olapConfig)) {
-                logger.info("Hit internal table {}", factTable);
-                policy = getSQLDigest().isDigestOfRawQuery()//
-                        ? NLookupCandidate.Policy.INTERNAL_TABLE
-                        : NLookupCandidate.Policy.AGG_THEN_INTERNAL_TABLE;
+            if (olapConfig.isInternalTableEnabled() && 
tableDesc.isHasInternal()) {
+                if (!isUniqueAsyncQuery(olapConfig) || 
asyncQueryUseGlutenEnabled(olapConfig)) {
+                    logger.info("Hit internal table {}", factTable);
+                    policy = getSQLDigest().isDigestOfRawQuery()//
+                            ? NLookupCandidate.Policy.INTERNAL_TABLE
+                            : NLookupCandidate.Policy.AGG_THEN_INTERNAL_TABLE;
+                }
             } else if (!olapConfig.isInternalTableEnabled() && 
!StringUtils.isBlank(tableDesc.getLastSnapshotPath())) {
                 logger.info("Hit the snapshot {}, the path is: {}", factTable, 
tableDesc.getLastSnapshotPath());
                 policy = getSQLDigest().isDigestOfRawQuery() //
@@ -484,10 +486,14 @@ public class OlapContext {
         return policy;
     }
 
-    public boolean isAsyncQuery(KylinConfig olapConfig) {
+    public boolean isUniqueAsyncQuery(KylinConfig olapConfig) {
         return QueryContext.current().getQueryTagInfo().isAsyncQuery() && 
olapConfig.isUniqueAsyncQueryYarnQueue();
     }
 
+    public boolean asyncQueryUseGlutenEnabled(KylinConfig olapConfig) {
+        return isUniqueAsyncQuery(olapConfig) && 
olapConfig.uniqueAsyncQueryUseGlutenEnabled();
+    }
+
     public String incapableMsg() {
         StringBuilder buf = new StringBuilder("OlapContext");
         if (incapableInfo.getReason() != null) {
@@ -653,10 +659,8 @@ public class OlapContext {
                     maxList[colId] = 
convertToColumnDataType(rangeInfo.getMax(), dataType);
                 } else {
                     RelDataType sqlType = OlapTable.createSqlType(typeFactory, 
c.getUpgradedType(), c.isNullable());
-                    minList[colId] = 
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMin(), sqlType,
-                            false);
-                    maxList[colId] = 
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMax(), sqlType,
-                            false);
+                    minList[colId] = 
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMin(), sqlType, 
false);
+                    maxList[colId] = 
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMax(), sqlType, 
false);
                 }
             }
 
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
index 0df7bc20b0..04860c8319 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
@@ -60,7 +60,8 @@ public class SchemaConverter implements IPushDownConverter {
             log.debug("Pushdown tag is not found, skip it.");
             return originSql;
         }
-        if (QueryContext.current().getQueryTagInfo().isAsyncQuery() && 
config.isUniqueAsyncQueryYarnQueue()) {
+        if (QueryContext.current().getQueryTagInfo().isAsyncQuery() && 
config.isUniqueAsyncQueryYarnQueue()
+                && !config.uniqueAsyncQueryUseGlutenEnabled()) {
             log.debug("Async query, skip it");
             return originSql;
         }
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
index e237b93cb6..4bb4b4df4e 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
@@ -148,7 +148,7 @@ public class AsyncQueryJob extends NSparkExecutable {
 
     @Override
     protected String getExtJar() {
-        return getConfig().getKylinExtJarsPath(false);
+        return 
getConfig().getKylinExtJarsPath(getConfig().uniqueAsyncQueryUseGlutenEnabled());
     }
 
     @Override
@@ -233,4 +233,10 @@ public class AsyncQueryJob extends NSparkExecutable {
         props.put("kylin.internal-table-enabled", KylinConfig.FALSE);
         
props.remove("kylin.storage.columnar.spark-conf.spark.sql.catalog.INTERNAL_CATALOG");
     }
+
+    @Override
+    public boolean needRemoveGlutenParams(KylinConfig config) {
+        // need to remove gulten params, return false
+        return config.buildUseGlutenEnabled();
+    }
 }
diff --git 
a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
 
b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
index a5e9b166c4..bc0db031ff 100644
--- 
a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
+++ 
b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.scheduler.JobFailed;
+import org.apache.kylin.job.common.ExecutableUtil;
 import org.apache.kylin.metadata.query.QueryHistorySql;
 import org.apache.kylin.metadata.query.QueryHistorySqlParam;
 import org.apache.kylin.metadata.query.QueryMetricsContext;
@@ -93,6 +94,14 @@ public class AsyncQueryApplication extends SparkApplication {
         }
     }
 
+    @Override
+    public void disableCurrentThreadGlutenIfNeed() {
+        if (!config.uniqueAsyncQueryUseGlutenEnabled()) {
+            
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
+            logger.info("Disable current thread gluten for Async Query");
+        }
+    }
+
     @Override
     public void reportSparkJobExtraInfo(SparkSession sparkSession) {
         // do nothing
@@ -118,6 +127,14 @@ public class AsyncQueryApplication extends 
SparkApplication {
         // do nothing
     }
 
+    @Override
+    public Map<String, String> removeGlutenParamsIfNeed(Map<String, String> 
baseSparkConf) {
+        if (!config.uniqueAsyncQueryUseGlutenEnabled()) {
+            return ExecutableUtil.removeGultenParams(baseSparkConf);
+        }
+        return baseSparkConf;
+    }
+
     private void saveQueryHistory(QueryContext queryContext, QueryParams 
queryParams) {
         if (StringUtils.isEmpty(queryContext.getMetrics().getCorrectedSql())) {
             
queryContext.getMetrics().setCorrectedSql(queryContext.getUserSQL());
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryApplicationWithMetadataTest.java
 
b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryApplicationWithMetadataTest.java
new file mode 100644
index 0000000000..82b90252e5
--- /dev/null
+++ 
b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryApplicationWithMetadataTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.engine;
+
+import static 
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static 
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.util.TestUtils;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.streaming.ReflectionUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import lombok.val;
+
+@MetadataInfo
+class AsyncQueryApplicationWithMetadataTest {
+
+    @Test
+    void testDisableCurrentThreadGlutenIfNeed() throws Exception {
+        val asyncQueryApplication = new AsyncQueryApplication();
+        val config = TestUtils.getTestConfig();
+        ReflectionUtils.setField(asyncQueryApplication, "config", config);
+
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "local[111]");
+        assertFalseEnableForCurrentThread(asyncQueryApplication, sparkConf);
+
+        config.setProperty("kylin.unique-async-query.gluten.enabled", 
KylinConfigBase.TRUE);
+        assertNullEnableForCurrentThread(asyncQueryApplication, sparkConf);
+
+        config.setProperty("kylin.unique-async-query.gluten.enabled", 
KylinConfigBase.FALSE);
+        assertFalseEnableForCurrentThread(asyncQueryApplication, sparkConf);
+    }
+
+    private void assertNullEnableForCurrentThread(SparkApplication 
application, SparkConf sparkConf) {
+        try (val sparkSession = new 
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+            ReflectionUtils.setField(application, "ss", sparkSession);
+
+            application.disableCurrentThreadGlutenIfNeed();
+
+            val ss = (SparkSession) ReflectionUtils.getField(application, 
"ss");
+            
assertNull(ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+        }
+    }
+
+    private void assertFalseEnableForCurrentThread(SparkApplication 
application, SparkConf sparkConf) {
+        try (val sparkSession = new 
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+            ReflectionUtils.setField(application, "ss", sparkSession);
+
+            application.disableCurrentThreadGlutenIfNeed();
+
+            val ss = (SparkSession) ReflectionUtils.getField(application, 
"ss");
+            assertEquals("false", 
ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+        }
+    }
+
+    @Test
+    void testRemoveGlutenParamsIfNeed() throws Exception {
+        val asyncQueryApplication = new AsyncQueryApplication() {
+            @Override
+            protected void doExecute() {
+                // do nothing
+            }
+        };
+        val config = TestUtils.getTestConfig();
+        ReflectionUtils.setField(asyncQueryApplication, "config", config);
+        val sparkPrefix = "kylin.query.async-query.spark-conf.";
+        config.setProperty("kylin.env", "PROD");
+        config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN + 
",org.apache.spark.kyuubi.KyuubiPlugin");
+        config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
+        config.setProperty(sparkPrefix + "spark.master", "yarn");
+        config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
+        config.setProperty(sparkPrefix + SPARK_SHUFFLE_MANAGER, 
COLUMNAR_SHUFFLE_MANAGER);
+        config.setProperty("kylin.engine.gluten.enabled", "true");
+
+        assertWithOutGluten(asyncQueryApplication);
+
+        config.setProperty("kylin.unique-async-query.gluten.enabled", 
KylinConfigBase.TRUE);
+        assertWithGluten(asyncQueryApplication);
+
+        config.setProperty("kylin.unique-async-query.gluten.enabled", 
KylinConfigBase.FALSE);
+        assertWithOutGluten(asyncQueryApplication);
+
+    }
+
+    private static void assertWithGluten(SparkApplication application) throws 
Exception {
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "yarn");
+        sparkConf.set("spark.eventLog.enabled", "false");
+        application.exchangeSparkConf(sparkConf);
+        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
+                "atomicSparkConf"));
+        val actalSparkConf = atomicSparkConf.get();
+        assertEquals(COLUMNAR_SHUFFLE_MANAGER, 
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+        assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
+        assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
+        assertEquals("yarn", actalSparkConf.get("spark.master"));
+        assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
+    }
+
+    private static void assertWithOutGluten(SparkApplication application) 
throws Exception {
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "yarn");
+        sparkConf.set("spark.eventLog.enabled", "false");
+        application.exchangeSparkConf(sparkConf);
+        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
+                "atomicSparkConf"));
+        val actalSparkConf = atomicSparkConf.get();
+        assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf -> 
conf._1.contains("gluten")));
+        assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
+    }
+}
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java 
b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
index 5dbeb0cbc6..0558902d7b 100644
--- 
a/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
+++ 
b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
@@ -35,20 +35,48 @@ class OlapContextTest {
     }
 
     @Test
-    void isAsyncQuery() {
+    void isUniqueAsyncQuery() {
         OlapContext mock = new OlapContext(1);
         val config = Mockito.mock(KylinConfig.class);
         Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
         QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
-        boolean asyncQuery = mock.isAsyncQuery(config);
+        boolean asyncQuery = mock.isUniqueAsyncQuery(config);
         Assertions.assertTrue(asyncQuery);
 
         Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
-        asyncQuery = mock.isAsyncQuery(config);
+        asyncQuery = mock.isUniqueAsyncQuery(config);
         Assertions.assertFalse(asyncQuery);
 
         QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
-        asyncQuery = mock.isAsyncQuery(config);
+        Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
+        asyncQuery = mock.isUniqueAsyncQuery(config);
+        Assertions.assertFalse(asyncQuery);
+    }
+
+    @Test
+    void asyncQueryUseGlutenEnabled() {
+        OlapContext mock = new OlapContext(1);
+        val config = Mockito.mock(KylinConfig.class);
+        Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
+        
Mockito.when(config.uniqueAsyncQueryUseGlutenEnabled()).thenReturn(true);
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+        boolean asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+        Assertions.assertTrue(asyncQuery);
+
+        Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
+        asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+        Assertions.assertFalse(asyncQuery);
+
+        
Mockito.when(config.uniqueAsyncQueryUseGlutenEnabled()).thenReturn(false);
+        asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+        Assertions.assertFalse(asyncQuery);
+
+        Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
+        asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+        Assertions.assertFalse(asyncQuery);
+
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+        asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
         Assertions.assertFalse(asyncQuery);
     }
 }
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java 
b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
index 612233e66e..b47cf429d9 100644
--- 
a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
+++ 
b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
@@ -58,8 +58,15 @@ public class SchemaConverterTest {
 
         QueryContext.current().getQueryTagInfo().setPushdown(true);
 
+        checkAsyncQuery(sql, expectedSql);
+
+        Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
+    }
+
+    private void checkAsyncQuery(String sql, String expectedSql) {
         QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
         
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "true");
+        getTestConfig().setProperty("kylin.unique-async-query.gluten.enabled", 
"false");
         Assertions.assertEquals(sql, converter.convert(sql, "default", null));
 
         QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
@@ -68,9 +75,15 @@ public class SchemaConverterTest {
         
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "false");
         Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
 
+        getTestConfig().setProperty("kylin.unique-async-query.gluten.enabled", 
"true");
+        Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
+
         QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+        
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "false");
         Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
 
+        
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "true");
+        getTestConfig().setProperty("kylin.unique-async-query.gluten.enabled", 
"true");
         Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
     }
 
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 7e03bbfc6b..1fcfdb97c8 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -342,10 +342,7 @@ public abstract class SparkApplication implements 
Application {
                 Unsafe.setProperty("kylin.env", config.getDeployEnv());
             }
 
-            if (className != null && 
!className.equals(InternalTableLoadJob.class.getName())) {
-                
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
-                logger.info("Disable gluten for normal build");
-            }
+            disableCurrentThreadGlutenIfNeed();
 
             logger.info("Start job");
             infos.startJob();
@@ -372,6 +369,13 @@ public abstract class SparkApplication implements 
Application {
         }
     }
 
+    public void disableCurrentThreadGlutenIfNeed() {
+        if (!config.buildUseGlutenEnabled() && 
!className.equals(InternalTableLoadJob.class.getName())) {
+            
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
+            logger.info("Disable current thread gluten for Build Job");
+        }
+    }
+
     protected void handleException(Exception e) throws Exception {
         if (e instanceof AccessControlException) {
             interceptAccessControlException(e);
@@ -649,14 +653,17 @@ public abstract class SparkApplication implements 
Application {
         }
     }
 
-    @VisibleForTesting
-    void exchangeSparkConf(SparkConf sparkConf) throws Exception {
+    public Map<String, String> removeGlutenParamsIfNeed(Map<String, String> 
baseSparkConf) {
+        if (!config.buildUseGlutenEnabled() && 
!className.equals(InternalTableLoadJob.class.getName())) {
+            return ExecutableUtil.removeGultenParams(baseSparkConf);
+        }
+        return baseSparkConf;
+    }
+
+    public void exchangeSparkConf(SparkConf sparkConf) throws Exception {
         if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
             Map<String, String> baseSparkConf = getSparkConfigOverride(config);
-            if (className != null && 
!className.equals(InternalTableLoadJob.class.getName())) {
-                baseSparkConf = 
ExecutableUtil.removeGultenParams(baseSparkConf);
-            }
-
+            baseSparkConf = removeGlutenParamsIfNeed(baseSparkConf);
             if (!baseSparkConf.isEmpty()) {
                 baseSparkConf.forEach(sparkConf::set);
                 String baseSparkConfStr = 
JsonUtil.writeValueAsString(baseSparkConf);
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 93b5f40258..a1d3cf4d77 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -481,12 +481,17 @@ public class NSparkExecutable extends AbstractExecutable 
implements ChainedStage
         if (UserGroupInformation.isSecurityEnabled()) {
             confMap.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
-        if (!isInternalTableSparkJob()) {
+        if (!needRemoveGlutenParams(config)) {
             return ExecutableUtil.removeGultenParams(confMap);
         }
         return confMap;
     }
 
+    public boolean needRemoveGlutenParams(KylinConfig config) {
+        // need to remove gulten params, return false
+        return isInternalTableSparkJob() || config.buildUseGlutenEnabled();
+    }
+
     private ExecuteResult runLocalMode(String appArgs) {
         try {
             Class<?> appClz = ClassUtil.forName(getSparkSubmitClassName(), 
Object.class);
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index 2af01a3f18..83db0d919b 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -44,6 +44,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
 import org.apache.kylin.engine.spark.job.step.ParamPropagation;
 import org.apache.kylin.engine.spark.job.step.StageExec;
+import org.apache.kylin.engine.spark.job.step.build.BuildStepExec;
 import org.apache.kylin.fileseg.FileSegments;
 import org.apache.kylin.guava30.shaded.common.base.Throwables;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
@@ -115,7 +116,7 @@ public class SegmentBuildJob extends SegmentJob {
                 infos.clearCuboidsNumPerLayer(segment.getId());
 
                 val stepId = StringUtils.replace(infos.getJobStepId(), 
JOB_NAME_PREFIX, "");
-                val step = new StepExec(stepId);
+                val step = new BuildStepExec(stepId);
 
                 final ParamPropagation params = new ParamPropagation();
 
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
index 5e7fb62f8e..2ae949001c 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
@@ -18,13 +18,12 @@
 
 package org.apache.kylin.engine.spark.job
 
-import java.io.IOException
-import java.util
-import java.util.Locale
-
 import org.apache.kylin.engine.spark.job.step.StageExec
 import org.apache.spark.internal.Logging
 
+import java.io.IOException
+import java.util
+import java.util.Locale
 import scala.collection.JavaConverters._
 
 class StepExec(stepId: String) extends Logging {
@@ -35,7 +34,7 @@ class StepExec(stepId: String) extends Logging {
   }
 
   def addStage(stage: StageExec): Unit = {
-    val stageId = subStageList.size + 1
+    val stageId = getStageId()
     stage.setStageId(getStepId + "_" + String.format(Locale.ROOT, "%02d", 
Integer.valueOf(stageId)))
     this.subStageList.add(stage)
   }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
index 7c93796593..6d3e985802 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
@@ -39,8 +39,12 @@ class ResourceWaitStage(jobContext: SparkApplication) 
extends StageExec {
     jobStepId + "_00"
   }
 
+  def needCheckResource: Boolean = {
+    jobContext.isJobOnCluster(sparkConf) && config.getWaitResourceEnabled
+  }
+
   override def execute(): Unit = {
-    if (jobContext.isJobOnCluster(sparkConf)) {
+    if (needCheckResource) {
       val sleepSeconds = (Math.random * 60L).toLong
       logInfo(s"Sleep $sleepSeconds seconds to avoid submitting too many spark 
job at the same time.")
       KylinBuildEnv.get().buildJobInfos.startWait()
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
index 2e6c5c4af9..19b2a805ce 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
@@ -46,7 +46,7 @@ trait StageExec extends Logging {
   }
 
   def onStageStart(): Unit = {
-    if(getJobContext.isSkipFollowingStages(getSegmentId)){
+    if (getJobContext.isSkipFollowingStages(getSegmentId)) {
       return
     }
     updateStageInfo(ExecutableState.RUNNING.toString, null, null)
@@ -100,7 +100,7 @@ trait StageExec extends Logging {
     onStageStart()
     var state: ExecutableState = ExecutableState.SUCCEED
     try {
-      if(getJobContext.isSkipFollowingStages(getSegmentId)){
+      if (getJobContext.isSkipFollowingStages(getSegmentId)) {
         state = ExecutableState.SKIP
         return
       }
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStepExec.scala
similarity index 62%
copy from 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
copy to 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStepExec.scala
index bd0dd4116c..fd058610a6 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStepExec.scala
@@ -16,19 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.step
+package org.apache.kylin.engine.spark.job.step.build
 
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.junit.Assert
-import org.mockito.Mockito
-import org.scalatest.funsuite.AnyFunSuite
+import org.apache.kylin.engine.spark.job.StepExec
 
-class ResourceWaitStageTest extends AnyFunSuite {
+class BuildStepExec(id: String) extends StepExec(id) {
 
-  test("test ResourceWaitStage getStageName") {
-    val sparkApplication = Mockito.mock(classOf[SparkApplication])
-
-    val wfr = new ResourceWaitStage(sparkApplication)
-    Assert.assertEquals("ResourceWaitStage", wfr.getStageName)
+  override def getStageId(): Int = {
+    subStageList.size + 2
   }
 }
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index e27af8fdc7..9d3296aeec 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
 import static 
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -267,59 +268,6 @@ public class SparkApplicationTest extends 
NLocalWithSparkSessionTestBase {
         application.exchangeSparkConf(sparkConf);
     }
 
-    @Test
-    public void testNotInternalTableLoadJobRemoveGluten() throws Exception {
-        val sparkPrefix = "kylin.engine.spark-conf.";
-        val config = getTestConfig();
-        config.setProperty("kylin.env", "PROD");
-        config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN + 
",org.apache.spark.kyuubi.KyuubiPlugin");
-        config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
-        config.setProperty(sparkPrefix + "spark.master", "yarn");
-        config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
-        val application = new SparkApplication() {
-            @Override
-            protected void doExecute() {
-                // do nothing
-            }
-        };
-        application.config = config;
-        assertWithGluten(application);
-
-        application.className = InternalTableLoadJob.class.getName();
-        assertWithGluten(application);
-
-        application.className = SegmentBuildJob.class.getName();
-        assertWithOutGluten(application);
-    }
-
-    private static void assertWithGluten(SparkApplication application) throws 
Exception {
-        val sparkConf = new SparkConf();
-        sparkConf.set("spark.master", "yarn");
-        sparkConf.set("spark.eventLog.enabled", "false");
-        application.exchangeSparkConf(sparkConf);
-        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
-                "atomicSparkConf"));
-        val actalSparkConf = atomicSparkConf.get();
-        assertEquals(COLUMNAR_SHUFFLE_MANAGER, 
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
-        assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
-        assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
-        assertEquals("yarn", actalSparkConf.get("spark.master"));
-        assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
-    }
-
-    private static void assertWithOutGluten(SparkApplication application) 
throws Exception {
-        val sparkConf = new SparkConf();
-        sparkConf.set("spark.master", "yarn");
-        sparkConf.set("spark.eventLog.enabled", "false");
-        application.exchangeSparkConf(sparkConf);
-        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
-                "atomicSparkConf"));
-        val actalSparkConf = atomicSparkConf.get();
-        assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf -> 
conf._1.contains("gluten")));
-        assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
-        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
-    }
-
     @Test
     public void testUpdateJobErrorInfo() throws JsonProcessingException {
         val config = getTestConfig();
@@ -400,4 +348,103 @@ public class SparkApplicationTest extends 
NLocalWithSparkSessionTestBase {
         Mockito.verify(application.getReport(), 
Mockito.times(1)).updateSparkJobExtraInfo(paramsMap,
                 "/kylin/api/jobs/spark", null, null, json);
     }
+
+    @Test
+    public void testRemoveGlutenParamsIfNeed() throws Exception {
+        val sparkPrefix = "kylin.engine.spark-conf.";
+        val config = getTestConfig();
+        config.setProperty("kylin.env", "PROD");
+        config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN + 
",org.apache.spark.kyuubi.KyuubiPlugin");
+        config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
+        config.setProperty(sparkPrefix + "spark.master", "yarn");
+        config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
+        config.setProperty("kylin.engine.gluten.enabled", "true");
+        val application = new SparkApplication() {
+            @Override
+            protected void doExecute() {
+                // do nothing
+            }
+        };
+        application.className = SegmentBuildJob.class.getName();
+        application.config = config;
+        assertWithGluten(application);
+
+        config.setProperty("kylin.engine.gluten.enabled", "false");
+        assertWithOutGluten(application);
+
+        application.className = InternalTableLoadJob.class.getName();
+        assertWithGluten(application);
+
+        application.className = SegmentBuildJob.class.getName();
+        assertWithOutGluten(application);
+    }
+
+    private static void assertWithGluten(SparkApplication application) throws 
Exception {
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "yarn");
+        sparkConf.set("spark.eventLog.enabled", "false");
+        application.exchangeSparkConf(sparkConf);
+        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
+                "atomicSparkConf"));
+        val actalSparkConf = atomicSparkConf.get();
+        assertEquals(COLUMNAR_SHUFFLE_MANAGER, 
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+        assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
+        assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
+        assertEquals("yarn", actalSparkConf.get("spark.master"));
+        assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
+    }
+
+    private static void assertWithOutGluten(SparkApplication application) 
throws Exception {
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "yarn");
+        sparkConf.set("spark.eventLog.enabled", "false");
+        application.exchangeSparkConf(sparkConf);
+        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
+                "atomicSparkConf"));
+        val actalSparkConf = atomicSparkConf.get();
+        assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf -> 
conf._1.contains("gluten")));
+        assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
+    }
+
+    @Test
+    public void testDisableCurrentThreadGlutenIfNeed() throws Exception {
+        val config = getTestConfig();
+        config.setProperty("kylin.engine.gluten.enabled", "true");
+        val application = new SparkApplication() {
+            @Override
+            protected void doExecute() {
+                // do nothing
+            }
+        };
+        application.className = SegmentBuildJob.class.getName();
+        application.config = config;
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "local[111]");
+        assertNullEnableForCurrentThread(application, sparkConf);
+
+        config.setProperty("kylin.engine.gluten.enabled", "false");
+        assertFalseEnableForCurrentThread(application, sparkConf);
+
+        application.className = InternalTableLoadJob.class.getName();
+        assertNullEnableForCurrentThread(application, sparkConf);
+    }
+
+    private void assertNullEnableForCurrentThread(SparkApplication 
application, SparkConf sparkConf) {
+        try (val sparkSession = new 
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+            application.ss = sparkSession;
+
+            application.disableCurrentThreadGlutenIfNeed();
+            
assertNull(ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+        }
+    }
+
+    private void assertFalseEnableForCurrentThread(SparkApplication 
application, SparkConf sparkConf) {
+        try (val sparkSession = new 
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+            application.ss = sparkSession;
+
+            application.disableCurrentThreadGlutenIfNeed();
+            assertEquals("false", 
application.ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+        }
+    }
 }
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
index dfb897247c..ce369a610a 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
@@ -23,6 +23,7 @@ import java.util.Random;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
@@ -258,4 +259,17 @@ public class NSparkExecutableTest extends 
NLocalFileMetadataTestCase {
         kylinConfigExt = sparkExecutable.getKylinConfigExt(kylinConfig, 
"default");
         Assert.assertEquals("123", 
kylinConfigExt.getOptional("kylin.engine.spark-conf.test", null));
     }
+
+    @Test
+    public void testNeedRemoveGlutenParams() {
+        KylinConfig kylinConfig = 
KylinConfig.createKylinConfig(getTestConfig());
+        NSparkExecutable sparkExecutable = new NSparkExecutable();
+
+        
Assert.assertFalse(sparkExecutable.needRemoveGlutenParams(kylinConfig));
+
+        kylinConfig.setProperty("kylin.engine.gluten.enabled", 
KylinConfigBase.TRUE);
+        Assert.assertTrue(sparkExecutable.needRemoveGlutenParams(kylinConfig));
+        kylinConfig.setProperty("kylin.engine.gluten.enabled", 
KylinConfigBase.FALSE);
+
+    }
 }
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
index bd0dd4116c..d96dfd8723 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
@@ -18,7 +18,10 @@
 
 package org.apache.kylin.engine.spark.job.step
 
+import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.metadata.streaming.ReflectionUtils
+import org.apache.spark.SparkConf
 import org.junit.Assert
 import org.mockito.Mockito
 import org.scalatest.funsuite.AnyFunSuite
@@ -31,4 +34,28 @@ class ResourceWaitStageTest extends AnyFunSuite {
     val wfr = new ResourceWaitStage(sparkApplication)
     Assert.assertEquals("ResourceWaitStage", wfr.getStageName)
   }
+
+  test("test ResourceWaitStage needCheckResource") {
+    val sparkApplication = Mockito.mock(classOf[SparkApplication])
+    val sparkConf = Mockito.mock(classOf[SparkConf])
+    val kylinConfig = Mockito.mock(classOf[KylinConfig])
+
+    val wfr = new ResourceWaitStage(sparkApplication)
+    ReflectionUtils.setField(wfr, "sparkConf", sparkConf)
+    ReflectionUtils.setField(wfr, "config", kylinConfig)
+
+    Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(true)
+    Mockito.when(kylinConfig.getWaitResourceEnabled).thenReturn(true)
+    Assert.assertTrue(wfr.needCheckResource)
+
+    Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(false)
+    Assert.assertFalse(wfr.needCheckResource)
+
+    Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(true)
+    Mockito.when(kylinConfig.getWaitResourceEnabled).thenReturn(false)
+    Assert.assertFalse(wfr.needCheckResource)
+
+    Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(false)
+    Assert.assertFalse(wfr.needCheckResource)
+  }
 }


Reply via email to