This is an automated email from the ASF dual-hosted git repository.

jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 5082123921 KYLIN-6010 Only InternalTableLoadingJob use gluten
5082123921 is described below

commit 5082123921f2ef5f3a25dabc3adb4e0dd2226ccf
Author: jlf <[email protected]>
AuthorDate: Sat Oct 12 15:57:03 2024 +0800

    KYLIN-6010 Only InternalTableLoadingJob use gluten
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   5 +-
 .../apache/kylin/common/KylinConfigBaseTest.java   |  27 +++++
 .../apache/kylin/job/common/ExecutableUtil.java    |  29 +++++
 .../kylin/job/constant/ExecutableConstants.java    |   6 +
 .../kylin/job/execution/AbstractExecutable.java    |   4 +
 .../kylin/job/common/ExecutableUtilTest.java       |  46 +++++++
 .../spark/job/InternalTableLoadingJobTest.java     | 134 +++++++++++++++++++++
 .../apache/kylin/query/relnode/OlapContext.java    |   9 +-
 .../apache/kylin/query/util/SchemaConverter.java   |   4 +
 .../apache/kylin/rest/service/AsyncQueryJob.java   |   7 +-
 .../kylin/query/engine/AsyncQueryApplication.java  |   6 +
 .../kylin/query/relnode/OlapContextTest.java       |  54 +++++++++
 .../kylin/query/util/SchemaConverterTest.java      |  29 +++--
 .../engine/spark/application/SparkApplication.java |  22 +++-
 .../engine/spark/job/InternalTableLoadingStep.java |   4 +
 .../kylin/job/execution/NSparkExecutable.java      |   7 +-
 .../spark/application/SparkApplicationTest.java    | 132 ++++++++++++++++++--
 .../streaming/jobs/impl/StreamingJobLauncher.java  |   2 +-
 18 files changed, 495 insertions(+), 32 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cb03a4ca63..0ad06127dc 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1564,7 +1564,7 @@ public abstract class KylinConfigBase implements Serializable {
         return 
SizeConvertUtil.byteStringAs(getOptional("kylin.streaming.custom-jar-size", 
"20mb"), ByteUnit.BYTE);
     }
 
-    public String getKylinExtJarsPath() {
+    public String getKylinExtJarsPath(Boolean withGluten) {
         String kylinHome = getKylinHome();
         if (StringUtils.isEmpty(kylinHome)) {
             return "";
@@ -1575,6 +1575,9 @@ public abstract class KylinConfigBase implements Serializable {
         }
         StringBuilder extJar = new StringBuilder();
         for (File file : files) {
+            if (!withGluten && StringUtils.containsIgnoreCase(file.getName(), 
"gluten")) {
+                continue;
+            }
             extJar.append(",");
             extJar.append(file.getAbsolutePath());
         }
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 3d995ae7af..3ef0e75bc6 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -44,6 +44,7 @@ import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURC
 import static 
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_NAME_KEY;
 import static org.apache.kylin.common.constant.Constants.SNAPSHOT_AUTO_REFRESH;
 
+import java.io.File;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -60,6 +61,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell;
@@ -1522,6 +1524,31 @@ class KylinConfigBaseTest {
         Assertions.assertEquals(3003, map.get(9).get(2));
         Assertions.assertEquals(5002, map.get(13).get(1));
     }
+
+    @Test
+    public void getKylinExtJarsPath() throws Exception {
+        val config = KylinConfig.getInstanceFromEnv();
+        val kylinHome = KylinConfig.getKylinHome();
+        val libExtDir = new File(kylinHome + File.separator + "lib/ext");
+        FileUtils.forceMkdir(libExtDir);
+        val glutenJar = new File(libExtDir, "gluten.jar");
+        FileUtils.write(glutenJar, "gluten jar");
+        val celebornJar = new File(libExtDir, "celeborn-client-spark-3.jar");
+        FileUtils.write(celebornJar, "celeborn client spark3 jar");
+        val mysqlJar = new File(libExtDir, "mysqlJar.jar");
+        FileUtils.write(mysqlJar, "mysqlJar jar");
+        val glutenCelebornJar = new File(libExtDir, 
"gluten-celeborn-clickhouse-jar-with-dependencies.jar");
+        FileUtils.write(glutenCelebornJar, "gluten celeborn jar");
+
+        val withGluten = config.getKylinExtJarsPath(true);
+        val withGlutenExpected = "," + glutenJar.getAbsolutePath() + "," + 
celebornJar.getAbsolutePath() + ","
+                + mysqlJar.getAbsolutePath() + "," + 
glutenCelebornJar.getAbsolutePath();
+        Assertions.assertEquals(withGlutenExpected, withGluten);
+
+        val withoutGluten = config.getKylinExtJarsPath(false);
+        val withoutExpected = "," + celebornJar.getAbsolutePath() + "," + 
mysqlJar.getAbsolutePath();
+        Assertions.assertEquals(withoutExpected, withoutGluten);
+    }
 }
 
 class EnvironmentUpdateUtils {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java
index 824ea19fa2..29c33ee0c9 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java
@@ -20,8 +20,15 @@ package org.apache.kylin.job.common;
 
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_INDEX_FAIL;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY;
+import static 
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PREFIX;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static 
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
 
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.code.ErrorCodeProducer;
@@ -151,4 +159,25 @@ public abstract class ExecutableUtil {
      */
     public void computePartitions(JobParam jobParam) {
     }
+
+    public static Map<String, String> removeGultenParams(Map<String, String> 
params) {
+        params.computeIfPresent(SPARK_PLUGINS, (pluginKey, pluginValue) -> {
+            String tempPluginValue = pluginValue;
+            String comma = ",";
+            if (StringUtils.contains(pluginValue, GLUTEN_PLUGIN)) {
+                tempPluginValue = Arrays.stream(tempPluginValue.split(comma))
+                        .filter(p -> !StringUtils.equals(p, 
GLUTEN_PLUGIN)).collect(Collectors.joining(comma));
+            }
+            return tempPluginValue;
+        });
+        params.computeIfPresent(SPARK_SHUFFLE_MANAGER, (pluginKey, 
pluginValue) -> {
+            String tempPluginValue = pluginValue;
+            if (StringUtils.equals(pluginValue, COLUMNAR_SHUFFLE_MANAGER)) {
+                tempPluginValue = "sort";
+            }
+            return tempPluginValue;
+        });
+        return params.entrySet().stream().filter(e -> 
!e.getKey().startsWith(GLUTEN_PREFIX))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 29ddd7cc95..eaf115e9ee 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -25,6 +25,12 @@ public final class ExecutableConstants {
     private ExecutableConstants() {
     }
 
+    public static final String SPARK_PLUGINS = "spark.plugins";
+    public static final String GLUTEN_PREFIX = "spark.gluten.";
+    public static final String GLUTEN_PLUGIN = 
"org.apache.gluten.GlutenPlugin";
+    public static final String COLUMNAR_SHUFFLE_MANAGER = 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager";
+    public static final String SPARK_SHUFFLE_MANAGER = "spark.shuffle.manager";
+
     public static final String YARN_APP_ID = "yarn_application_id";
     public static final String YARN_APP_IDS = "yarn_application_ids";
     public static final String YARN_APP_IDS_DELIMITER = ",";
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 2bdaf17928..cf11a5d502 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -958,4 +958,8 @@ public abstract class AbstractExecutable extends AbstractJobExecutable implement
             return true;
         });
     }
+
+    public boolean isInternalTableSparkJob() {
+        return false;
+    }
 }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java b/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java
index 129564e59a..de116146ab 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.job.common;
 
+import static 
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static 
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -26,9 +30,13 @@ import static org.mockito.Mockito.when;
 import java.util.Collections;
 
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.job.model.JobParam;
 import org.junit.Test;
 
+import lombok.val;
+import lombok.var;
+
 public class ExecutableUtilTest {
 
     @Test
@@ -45,4 +53,42 @@ public class ExecutableUtilTest {
                     e.toString());
         }
     }
+
+    @Test
+    public void removeGultenParams() {
+        val requestMap = Maps.<String, String> newHashMap();
+        requestMap.put(SPARK_PLUGINS, GLUTEN_PLUGIN);
+        var params = ExecutableUtil.removeGultenParams(requestMap);
+        assertEquals(1, params.size());
+        assertEquals("", params.get(SPARK_PLUGINS));
+
+        requestMap.put(SPARK_PLUGINS,
+                GLUTEN_PLUGIN + 
",org.apache.gluten.GlutenPlugin,org.apache.spark.kyuubi.KyuubiPlugin");
+        params = ExecutableUtil.removeGultenParams(requestMap);
+        assertEquals(1, params.size());
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
params.get(SPARK_PLUGINS));
+
+        requestMap.put("spark.gluten.enable", "true");
+        params = ExecutableUtil.removeGultenParams(requestMap);
+        assertEquals(1, params.size());
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
params.get(SPARK_PLUGINS));
+
+        requestMap.put(SPARK_SHUFFLE_MANAGER, 
"org.apache.spark.shuffle.sort.SortShuffleManager");
+        params = ExecutableUtil.removeGultenParams(requestMap);
+        assertEquals(2, params.size());
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
params.get(SPARK_PLUGINS));
+        assertEquals("org.apache.spark.shuffle.sort.SortShuffleManager", 
params.get(SPARK_SHUFFLE_MANAGER));
+
+        requestMap.put(SPARK_SHUFFLE_MANAGER, "sort");
+        params = ExecutableUtil.removeGultenParams(requestMap);
+        assertEquals(2, params.size());
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
params.get(SPARK_PLUGINS));
+        assertEquals("sort", params.get(SPARK_SHUFFLE_MANAGER));
+
+        requestMap.put(SPARK_SHUFFLE_MANAGER, COLUMNAR_SHUFFLE_MANAGER);
+        params = ExecutableUtil.removeGultenParams(requestMap);
+        assertEquals(2, params.size());
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
params.get(SPARK_PLUGINS));
+        assertEquals("sort", params.get(SPARK_SHUFFLE_MANAGER));
+    }
 }
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
new file mode 100644
index 0000000000..156d24faa1
--- /dev/null
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import static org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_BUILD;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.AbstractTestCase;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
+import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
+import org.apache.kylin.job.handler.InternalTableJobHandler;
+import org.apache.kylin.job.model.JobParam;
+import org.apache.kylin.job.service.InternalTableLoadingService;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.streaming.ReflectionUtils;
+import org.apache.kylin.metadata.table.InternalTableDesc;
+import org.apache.kylin.metadata.table.InternalTableManager;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.service.InternalTableService;
+import org.apache.kylin.rest.service.TableService;
+import org.apache.kylin.rest.util.AclEvaluate;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import lombok.val;
+
+@MetadataInfo
+class InternalTableLoadingJobTest extends AbstractTestCase {
+    protected static final String PROJECT = "default";
+    protected static final String TABLE_INDENTITY = "DEFAULT.TEST_KYLIN_FACT";
+    protected static final String DATE_COL = "CAL_DT";
+
+    @Mock
+    protected AclEvaluate aclEvaluate = Mockito.spy(AclEvaluate.class);
+    @Spy
+    protected InternalTableLoadingService internalTableLoadingService = 
Mockito.spy(new InternalTableLoadingService());
+    @InjectMocks
+    protected InternalTableService internalTableService = Mockito.spy(new 
InternalTableService());
+
+    @InjectMocks
+    protected TableService tableService = mock(TableService.class);
+
+    @BeforeAll
+    public static void beforeClass() {
+        NLocalWithSparkSessionTestBase.beforeClass();
+    }
+
+    @AfterAll
+    public static void afterClass() {
+        NLocalWithSparkSessionTestBase.afterClass();
+    }
+
+    @BeforeEach
+    void setUp() throws Exception {
+        MockitoAnnotations.openMocks(this);
+        SecurityContextHolder.getContext()
+                .setAuthentication(new TestingAuthenticationToken("ADMIN", 
"ADMIN", Constant.ROLE_ADMIN));
+        SparkJobFactoryUtils.initJobFactory();
+        overwriteSystemProp("kylin.source.provider.9", 
"org.apache.kylin.engine.spark.mockup.CsvSource");
+        ReflectionUtils.setField(internalTableService, "aclEvaluate", 
aclEvaluate);
+        ReflectionUtils.setField(internalTableService, 
"internalTableLoadingService", internalTableLoadingService);
+    }
+
+    protected InternalTableDesc getInternalTableDesc(KylinConfig config) 
throws Exception {
+        NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, PROJECT);
+        TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+        String[] partitionCols = new String[] { DATE_COL };
+        Map<String, String> tblProperties = new HashMap<>();
+        val datePartitionFormat = "yyyy-MM-dd";
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn(datePartitionFormat);
+        internalTableService.createInternalTable(PROJECT, table.getName(), 
table.getDatabase(), partitionCols,
+                "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.PARQUET.name());
+        InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+        Assertions.assertNotNull(internalTable);
+        return internalTable;
+    }
+
+    @Test
+    void isInternalTableSparkJob() throws Exception {
+        val config = KylinConfig.getInstanceFromEnv();
+        val internalTable = getInternalTableDesc(config);
+        val jobParam = new 
JobParam().withProject(PROJECT).withTable(internalTable.getIdentity()).withYarnQueue(null)
+                .withJobTypeEnum(INTERNAL_TABLE_BUILD).withOwner("UT")
+                .addExtParams(NBatchConstants.P_INCREMENTAL_BUILD, 
String.valueOf(false))
+                .addExtParams(NBatchConstants.P_OUTPUT_MODE, 
String.valueOf(false))
+                .addExtParams(NBatchConstants.P_START_DATE, 
"").addExtParams(NBatchConstants.P_END_DATE, "");
+        val internalTableJobParam = new 
InternalTableJobHandler.InternalTableJobBuildParam(jobParam);
+        val internalTableLoadingJob = 
InternalTableLoadingJob.create(internalTableJobParam);
+        
Assertions.assertFalse(internalTableLoadingJob.isInternalTableSparkJob());
+        internalTableLoadingJob.getTasks().forEach(task -> {
+            if (task instanceof InternalTableLoadingStep) {
+                Assertions.assertTrue(task.isInternalTableSparkJob());
+            } else {
+                Assertions.assertFalse(task.isInternalTableSparkJob());
+            }
+        });
+    }
+}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
index 1e0ef2e8f9..04b241223c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
@@ -76,7 +77,7 @@ public class OlapContext {
 
     private static final Logger logger = 
LoggerFactory.getLogger(OlapContext.class);
     public static final String PRM_ACCEPT_PARTIAL_RESULT = 
"AcceptPartialResult";
-    public static final Set<String> UNSUPPORTED_FUNCTION_IN_LOOKUP = new 
HashSet<>(
+    public static final HashSet<String> UNSUPPORTED_FUNCTION_IN_LOOKUP = new 
HashSet<>(
             Collections.singleton(FunctionDesc.FUNC_INTERSECT_COUNT));
 
     private final int id;
@@ -464,7 +465,7 @@ public class OlapContext {
             if (tableDesc == null) {
                 return policy;
             }
-            if (olapConfig.isInternalTableEnabled() && 
tableDesc.isHasInternal()) {
+            if (olapConfig.isInternalTableEnabled() && 
tableDesc.isHasInternal() && !isAsyncQuery(olapConfig)) {
                 logger.info("Hit internal table {}", factTable);
                 policy = getSQLDigest().isDigestOfRawQuery()//
                         ? NLookupCandidate.Policy.INTERNAL_TABLE
@@ -479,6 +480,10 @@ public class OlapContext {
         return policy;
     }
 
+    public boolean isAsyncQuery(KylinConfig olapConfig) {
+        return QueryContext.current().getQueryTagInfo().isAsyncQuery() && 
olapConfig.isUniqueAsyncQueryYarnQueue();
+    }
+
     public String incapableMsg() {
         StringBuilder buf = new StringBuilder("OlapContext");
         if (incapableInfo.getReason() != null) {
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
index dbbb0b4291..f2f429850b 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
@@ -55,6 +55,10 @@ public class SchemaConverter implements IPushDownConverter {
             log.debug("Pushdown tag is not found, skip it.");
             return originSql;
         }
+        if (QueryContext.current().getQueryTagInfo().isAsyncQuery() && 
config.isUniqueAsyncQueryYarnQueue()) {
+            log.debug("Async query, skip it");
+            return originSql;
+        }
         try {
             String transformedSql = transform(originSql, project, config);
             QueryContext.current().setPushdownEngine("GLUTEN");
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
index b62c7bd8e6..f7699318ce 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
@@ -61,6 +61,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.job.DefaultSparkBuildJobHandler;
 import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.job.common.ExecutableUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.JobTypeEnum;
@@ -137,7 +138,7 @@ public class AsyncQueryJob extends NSparkExecutable {
         if (UserGroupInformation.isSecurityEnabled()) {
             overrides.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
-        return overrides;
+        return ExecutableUtil.removeGultenParams(overrides);
     }
 
     @Override
@@ -147,7 +148,7 @@ public class AsyncQueryJob extends NSparkExecutable {
 
     @Override
     protected String getExtJar() {
-        return getConfig().getKylinExtJarsPath();
+        return getConfig().getKylinExtJarsPath(false);
     }
 
     @Override
@@ -229,5 +230,7 @@ public class AsyncQueryJob extends NSparkExecutable {
         if (!KylinInfoExtension.getFactory().checkKylinInfo()) {
             props.setProperty("kylin.streaming.enabled", KylinConfig.FALSE);
         }
+        props.put("kylin.internal-table-enabled", KylinConfig.FALSE);
+        
props.remove("kylin.storage.columnar.spark-conf.spark.sql.catalog.INTERNAL_CATALOG");
     }
 }
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
index b1cf0a6571..e76a2d2b3d 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
@@ -42,6 +42,7 @@ import org.apache.kylin.metadata.query.util.QueryHistoryUtil;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.spark.sql.KapFunctions;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.udf.UdfManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,6 +93,11 @@ public class AsyncQueryApplication extends SparkApplication {
         }
     }
 
+    @Override
+    public void reportSparkJobExtraInfo(SparkSession sparkSession) {
+        // do nothing
+    }
+
     @Override
     protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
         return config.getAsyncQuerySparkConfigOverride();
diff --git a/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
new file mode 100644
index 0000000000..5dbeb0cbc6
--- /dev/null
+++ b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.relnode;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import lombok.val;
+
+class OlapContextTest {
+
+    @AfterEach
+    void tearDown() {
+        QueryContext.current().close();
+    }
+
+    @Test
+    void isAsyncQuery() {
+        OlapContext mock = new OlapContext(1);
+        val config = Mockito.mock(KylinConfig.class);
+        Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+        boolean asyncQuery = mock.isAsyncQuery(config);
+        Assertions.assertTrue(asyncQuery);
+
+        Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
+        asyncQuery = mock.isAsyncQuery(config);
+        Assertions.assertFalse(asyncQuery);
+
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+        asyncQuery = mock.isAsyncQuery(config);
+        Assertions.assertFalse(asyncQuery);
+    }
+}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
index 3ae2e79f35..647f33244a 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
@@ -35,6 +35,15 @@ public class SchemaConverterTest {
 
     @Test
     void testCatalogConvert() {
+        String sql = "select t1.id t_id from SSB.P_LINEORDER t1 left join 
\"SSB\".\"PART\" on t1.PARTKEY = PART.PARTKEY "
+                + "union all select * from SSB.LINEORDER union all select * 
from \"DEFAULT\".TEST_COUNTRY t";
+
+        String expectedSql = "select t1.id t_id from 
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"P_LINEORDER\" t1 "
+                + "left join \"INTERNAL_CATALOG\".\"default\".\"SSB\".\"PART\" 
on t1.PARTKEY = PART.PARTKEY "
+                + "union all select * from 
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"LINEORDER\" "
+                + "union all select * from 
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_COUNTRY\" t";
+        Assertions.assertEquals(sql, converter.convert(sql, "default", null));
+
         getTestConfig().setProperty("kylin.internal-table-enabled", "true");
 
         InternalTableManager innerTableMgr = 
InternalTableManager.getInstance(getTestConfig(), "default");
@@ -43,17 +52,23 @@ public class SchemaConverterTest {
             innerTableMgr.createInternalTable(new 
InternalTableDesc(tableDesc));
         }
 
-        String sql = "select t1.id t_id from SSB.P_LINEORDER t1 left join 
\"SSB\".\"PART\" on t1.PARTKEY = PART.PARTKEY "
-                + "union all select * from SSB.LINEORDER union all select * 
from \"DEFAULT\".TEST_COUNTRY t";
+        Assertions.assertEquals(sql, converter.convert(sql, "default", null));
 
-        String expectedSql = "select t1.id t_id from 
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"P_LINEORDER\" t1 "
-                + "left join \"INTERNAL_CATALOG\".\"default\".\"SSB\".\"PART\" 
on t1.PARTKEY = PART.PARTKEY "
-                + "union all select * from 
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"LINEORDER\" "
-                + "union all select * from 
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_COUNTRY\" t";
+        QueryContext.current().getQueryTagInfo().setPushdown(true);
 
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+        
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "true");
         Assertions.assertEquals(sql, converter.convert(sql, "default", null));
 
-        QueryContext.current().getQueryTagInfo().setPushdown(true);
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+        Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
+
+        
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "false");
+        Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
+
+        QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+        Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
+
         Assertions.assertEquals(expectedSql, converter.convert(sql, "default", 
null));
     }
 }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index b0ccbbf592..3d082c68d3 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -79,6 +79,7 @@ import org.apache.kylin.engine.spark.utils.HDFSUtils;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
 import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.job.common.ExecutableUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NSparkExecutable;
@@ -130,6 +131,7 @@ public abstract class SparkApplication implements Application {
     protected String project;
     protected int layoutSize = -1;
     protected BuildJobInfos infos;
+    protected String className;
 
     protected ConcurrentHashMap<String, Boolean> skipFollowingStagesMap = new 
ConcurrentHashMap<>();
     /**
@@ -286,6 +288,7 @@ public abstract class SparkApplication implements Application {
         String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL);
         jobId = getParam(NBatchConstants.P_JOB_ID);
         project = getParam(NBatchConstants.P_PROJECT_NAME);
+        className = getParam(NBatchConstants.P_CLASS_NAME);
         if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) {
             layoutSize = 
StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length;
         }
@@ -339,7 +342,6 @@ public abstract class SparkApplication implements Application {
                 Unsafe.setProperty("kylin.env", config.getDeployEnv());
             }
 
-            String className = getParam(NBatchConstants.P_CLASS_NAME);
             if (className != null && 
!className.equals(InternalTableLoadJob.class.getName())) {
                 
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
                 logger.info("Disable gluten for normal build");
@@ -651,6 +653,10 @@ public abstract class SparkApplication implements Application {
     void exchangeSparkConf(SparkConf sparkConf) throws Exception {
         if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
             Map<String, String> baseSparkConf = getSparkConfigOverride(config);
+            if (className != null && 
!className.equals(InternalTableLoadJob.class.getName())) {
+                baseSparkConf = 
ExecutableUtil.removeGultenParams(baseSparkConf);
+            }
+
             if (!baseSparkConf.isEmpty()) {
                 baseSparkConf.forEach(sparkConf::set);
                 String baseSparkConfStr = 
JsonUtil.writeValueAsString(baseSparkConf);
@@ -687,11 +693,7 @@ public abstract class SparkApplication implements Application {
         }
 
         sparkSession = createSpark(sparkConf);
-        if (!config.isUTEnv()) {
-            Map<String, String> extraInfo = getTrackingInfo(sparkSession, 
config.isTrackingUrlIpAddressEnabled());
-            extraInfo.put("job_last_running_start_time", 
getParam(JOB_LAST_RUNNING_START_TIME));
-            getReport().updateSparkJobExtraInfo(getReportParams(), 
"/kylin/api/jobs/spark", project, jobId, extraInfo);
-        }
+        reportSparkJobExtraInfo(sparkSession);
 
         // for spark metrics
         JobMetricsUtils.registerListener(sparkSession);
@@ -705,6 +707,14 @@ public abstract class SparkApplication implements Application {
         atomicSparkSession.set(sparkSession);
     }
 
+    public void reportSparkJobExtraInfo(SparkSession sparkSession) {
+        if (!config.isUTEnv()) {
+            Map<String, String> extraInfo = getTrackingInfo(sparkSession, 
config.isTrackingUrlIpAddressEnabled());
+            extraInfo.put("job_last_running_start_time", 
getParam(JOB_LAST_RUNNING_START_TIME));
+            getReport().updateSparkJobExtraInfo(getReportParams(), 
"/kylin/api/jobs/spark", project, jobId, extraInfo);
+        }
+    }
+
     private void prepareSparkSession() throws NoRetryException {
         SparkConf sparkConf = atomicSparkConf.get();
         if (Objects.isNull(sparkConf)) {
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
index 3a84a01356..b9b718e8eb 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
@@ -91,4 +91,8 @@ public class InternalTableLoadingStep extends NSparkExecutable {
         return result;
     }
 
+    @Override
+    public boolean isInternalTableSparkJob() {
+        return true;
+    }
 }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
index 9a541a67dc..4f23fa9b05 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.job.execution;
 
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -60,6 +62,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.JobContext;
+import org.apache.kylin.job.common.ExecutableUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.exception.JobStoppedException;
 import org.apache.kylin.job.util.JobContextUtil;
@@ -118,7 +121,6 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
 
     public static final String JOB_LAST_RUNNING_START_TIME = 
"jobLastRunningStartTime";
 
-    protected static final String SPARK_PLUGINS = "spark.plugins";
     protected ISparkJobHandler sparkJobHandler;
 
     private final transient List<StageBase> stages = 
Lists.newCopyOnWriteArrayList();
@@ -474,6 +476,9 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         if (UserGroupInformation.isSecurityEnabled()) {
             confMap.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
+        if (!isInternalTableSparkJob()) {
+            return ExecutableUtil.removeGultenParams(confMap);
+        }
         return confMap;
     }
 
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index c37b1b6ccf..e27af8fdc7 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -18,12 +18,22 @@
 
 package org.apache.kylin.engine.spark.application;
 
+import static 
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static 
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -31,12 +41,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
 import org.apache.kylin.engine.spark.job.BuildJobInfos;
+import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
 import org.apache.kylin.engine.spark.job.MockJobProgressReport;
 import org.apache.kylin.engine.spark.job.ParamsConstants;
 import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.engine.spark.job.SegmentBuildJob;
 import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
@@ -49,9 +62,11 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.application.MockClusterManager;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.MockedStatic;
@@ -64,7 +79,6 @@ import lombok.val;
 
 public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
 
-
     File tempDir = new File("./temp/");
     File file1 = new File(tempDir, "temp1_" + ResourceDetectUtils.fileName());
     File file2 = new File(tempDir, "temp2_" + ResourceDetectUtils.fileName());
@@ -98,7 +112,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         map2.put("1", 200L);
         ResourceDetectUtils.write(new Path(file2.getAbsolutePath()), map2);
 
-        Assert.assertEquals("300b", application.chooseContentSize(new 
Path(tempDir.getAbsolutePath())));
+        assertEquals("300b", application.chooseContentSize(new 
Path(tempDir.getAbsolutePath())));
     }
 
     @Test
@@ -134,7 +148,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         params.put(ParamsConstants.JOB_TMP_DIR, 
getTestConfig().getJobTmpDir("test_job_output", true));
         Mockito.doReturn(Boolean.TRUE).when(report).updateSparkJobInfo(params, 
"/kylin/api/jobs/spark", payloadJson);
 
-        Assert.assertTrue(report.updateSparkJobExtraInfo(params, 
"/kylin/api/jobs/spark", "test_job_output",
+        assertTrue(report.updateSparkJobExtraInfo(params, 
"/kylin/api/jobs/spark", "test_job_output",
                 "cb91189b-2b12-4527-aa35-0130e7d54ec0", extraInfo));
 
         Mockito.verify(report).updateSparkJobInfo(params, 
"/kylin/api/jobs/spark", payloadJson);
@@ -144,7 +158,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         
Mockito.doReturn("http://sandbox.hortonworks.com:8088/proxy/application_1561370224051_0160/";).when(application)
                 .getTrackingUrl(null, ss);
         
Mockito.doReturn(Boolean.FALSE).when(report).updateSparkJobInfo(params, 
"/kylin/api/jobs/spark", payloadJson);
-        Assert.assertFalse(report.updateSparkJobExtraInfo(params, 
"/kylin/api/jobs/spark", "test_job_output",
+        assertFalse(report.updateSparkJobExtraInfo(params, 
"/kylin/api/jobs/spark", "test_job_output",
                 "cb91189b-2b12-4527-aa35-0130e7d54ec0", extraInfo));
 
         Mockito.verify(report, Mockito.times(3)).updateSparkJobInfo(params, 
"/kylin/api/jobs/spark", payloadJson);
@@ -180,7 +194,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         
Mockito.when(sparkApplication.checkRangePartitionTableIsExist(Mockito.any())).thenCallRealMethod();
         tableRefs.add(tableRef);
         nDataModel.setAllTableRefs(tableRefs);
-        
Assert.assertFalse(sparkApplication.checkRangePartitionTableIsExist(nDataModel));
+        
assertFalse(sparkApplication.checkRangePartitionTableIsExist(nDataModel));
 
         NDataModel nDataModel2 = new NDataModel();
         nDataModel2.setUuid(UUID.randomUUID().toString());
@@ -196,7 +210,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         tableRefs.clear();
         tableRefs.add(tableRef);
         nDataModel2.setAllTableRefs(tableRefs);
-        
Assert.assertTrue(sparkApplication.checkRangePartitionTableIsExist(nDataModel2));
+        
assertTrue(sparkApplication.checkRangePartitionTableIsExist(nDataModel2));
     }
 
     @Test
@@ -206,15 +220,16 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         SparkApplication application = new SparkApplication() {
             @Override
             protected void doExecute() {
+                // do nothing
             }
         };
         File upload = new File(path);
         FileUtils.forceMkdir(upload);
-        Assert.assertTrue(upload.exists());
+        assertTrue(upload.exists());
         config.setProperty(config.getKubernetesUploadPathKey(), path);
         ReflectionTestUtils.setField(application, "config", config);
         application.extraDestroy();
-        Assert.assertFalse(upload.exists());
+        assertFalse(upload.exists());
     }
 
     @Test
@@ -223,6 +238,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         SparkApplication application = new SparkApplication() {
             @Override
             protected void doExecute() {
+                // do nothing
             }
         };
         application.config = config;
@@ -240,17 +256,70 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         sparkConf.set("spark.eventLog.enabled", "false");
         sparkConf.set("spark.eventLog.dir", notExistedLogDir.toString());
         application.exchangeSparkConf(sparkConf);
-        assert !fs.exists(notExistedLogDir);
+        assertFalse(fs.exists(notExistedLogDir));
         sparkConf.set("spark.eventLog.enabled", "true");
         application.exchangeSparkConf(sparkConf);
-        assert fs.exists(notExistedLogDir);
+        assertTrue(fs.exists(notExistedLogDir));
         sparkConf.set("spark.eventLog.dir", existedLogDir.toString());
         application.exchangeSparkConf(sparkConf);
-        assert fs.exists(existedLogDir);
+        assertTrue(fs.exists(existedLogDir));
         sparkConf.set("spark.eventLog.dir", "");
         application.exchangeSparkConf(sparkConf);
     }
 
+    @Test
+    public void testNotInternalTableLoadJobRemoveGluten() throws Exception {
+        val sparkPrefix = "kylin.engine.spark-conf.";
+        val config = getTestConfig();
+        config.setProperty("kylin.env", "PROD");
+        config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN + 
",org.apache.spark.kyuubi.KyuubiPlugin");
+        config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
+        config.setProperty(sparkPrefix + "spark.master", "yarn");
+        config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
+        val application = new SparkApplication() {
+            @Override
+            protected void doExecute() {
+                // do nothing
+            }
+        };
+        application.config = config;
+        assertWithGluten(application);
+
+        application.className = InternalTableLoadJob.class.getName();
+        assertWithGluten(application);
+
+        application.className = SegmentBuildJob.class.getName();
+        assertWithOutGluten(application);
+    }
+
+    private static void assertWithGluten(SparkApplication application) throws 
Exception {
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "yarn");
+        sparkConf.set("spark.eventLog.enabled", "false");
+        application.exchangeSparkConf(sparkConf);
+        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
+                "atomicSparkConf"));
+        val actalSparkConf = atomicSparkConf.get();
+        assertEquals(COLUMNAR_SHUFFLE_MANAGER, 
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+        assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
+        assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
+        assertEquals("yarn", actalSparkConf.get("spark.master"));
+        assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
+    }
+
+    private static void assertWithOutGluten(SparkApplication application) 
throws Exception {
+        val sparkConf = new SparkConf();
+        sparkConf.set("spark.master", "yarn");
+        sparkConf.set("spark.eventLog.enabled", "false");
+        application.exchangeSparkConf(sparkConf);
+        val atomicSparkConf = ((AtomicReference<SparkConf>) 
ReflectionTestUtils.getField(application,
+                "atomicSparkConf"));
+        val actalSparkConf = atomicSparkConf.get();
+        assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf -> 
conf._1.contains("gluten")));
+        assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+        assertEquals("org.apache.spark.kyuubi.KyuubiPlugin", 
actalSparkConf.get(SPARK_PLUGINS));
+    }
+
     @Test
     public void testUpdateJobErrorInfo() throws JsonProcessingException {
         val config = getTestConfig();
@@ -258,6 +327,7 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         SparkApplication application = Mockito.spy(new SparkApplication() {
             @Override
             protected void doExecute() {
+                // do nothing
             }
         });
 
@@ -292,4 +362,42 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
         Mockito.verify(application.getReport(), 
Mockito.times(1)).updateSparkJobInfo(paramsMap, "/kylin/api/jobs/error",
                 json);
     }
+
+    @Test
+    public void reportSparkJobExtraInfo() {
+        overwriteSystemProp("kylin.env", "PROD");
+        overwriteSystemProp("kylin.engine.spark.cluster-manager-class-name",
+                MockClusterManager.class.getCanonicalName());
+        val appId = RandomUtil.randomUUIDStr();
+        val config = getTestConfig();
+        val sparkSession = Mockito.mock(SparkSession.class);
+        val sparkContext = Mockito.mock(SparkContext.class);
+        Mockito.when(sparkSession.sparkContext()).thenReturn(sparkContext);
+        Mockito.when(sparkContext.applicationId()).thenReturn(appId);
+        Mockito.when(sparkContext.conf()).thenReturn(new SparkConf());
+        val application = Mockito.spy(new SparkApplication() {
+            @Override
+            protected void doExecute() {
+                // only for test
+            }
+        });
+        MockJobProgressReport mockJobProgressReport = Mockito.spy(new 
MockJobProgressReport());
+        
Mockito.when(application.getReport()).thenReturn(mockJobProgressReport);
+        ReflectionTestUtils.setField(application, "config", config);
+        val atomicBuildEnv = new 
AtomicReference<KylinBuildEnv>(KylinBuildEnv.getOrCreate(config));
+        ReflectionTestUtils.setField(application, "atomicBuildEnv", 
atomicBuildEnv);
+        application.reportSparkJobExtraInfo(sparkSession);
+
+        val paramsMap = Maps.<String, String> newHashMap();
+        paramsMap.put(ParamsConstants.TIME_OUT, 
String.valueOf(config.getUpdateJobInfoTimeout()));
+        paramsMap.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(null, 
true));
+        val json = Maps.<String, String> newHashMap();
+        json.put("queue_name", "default");
+        json.put("job_last_running_start_time", null);
+        json.put("cores", "0");
+        json.put("memory", "0");
+        json.put("yarn_app_id", appId);
+        Mockito.verify(application.getReport(), 
Mockito.times(1)).updateSparkJobExtraInfo(paramsMap,
+                "/kylin/api/jobs/spark", null, null, json);
+    }
 }
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
index d3898c481e..0dda19355e 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
@@ -397,7 +397,7 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
                 .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, 
Paths.get(kylinJobJar).getFileName().toString())
                 .setConf(SPARK_DRIVER_OPTS, wrapDriverJavaOptions(sparkConf))
                 .setConf(SPARK_EXECUTOR_OPTS, 
wrapExecutorJavaOptions(sparkConf))
-                .setConf(SPARK_YARN_AM_OPTS, 
wrapYarnAmJavaOptions(sparkConf)).addJar(config.getKylinExtJarsPath())
+                .setConf(SPARK_YARN_AM_OPTS, 
wrapYarnAmJavaOptions(sparkConf)).addJar(config.getKylinExtJarsPath(true))
                 
.addFile(config.getLogSparkStreamingExecutorPropertiesFile()).setAppResource(kylinJobJar)
                 .setMainClass(mainClazz).addAppArgs(appArgs);
         handler = sparkLauncher.startApplication(listener);

Reply via email to