This is an automated email from the ASF dual-hosted git repository.
caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 40ddcf8a30 KYLIN-6061 build enable gluten
40ddcf8a30 is described below
commit 40ddcf8a30aa1ce8259349c7984780e80dcee219
Author: jlf <[email protected]>
AuthorDate: Tue Dec 31 10:56:32 2024 +0800
KYLIN-6061 build enable gluten
1. Enable Gluten for Async query
2. Enable Gluten for Build Job
3. fix build stage execution progress exception
---
.../org/apache/kylin/common/KylinConfigBase.java | 12 ++
.../apache/kylin/common/KylinConfigBaseTest.java | 8 +-
.../spark/job/InternalTableLoadingJobTest.java | 1 +
.../apache/kylin/query/relnode/OlapContext.java | 24 ++--
.../apache/kylin/query/util/SchemaConverter.java | 3 +-
.../apache/kylin/rest/service/AsyncQueryJob.java | 8 +-
.../kylin/query/engine/AsyncQueryApplication.java | 17 +++
.../AsyncQueryApplicationWithMetadataTest.java | 142 +++++++++++++++++++
.../kylin/query/relnode/OlapContextTest.java | 36 ++++-
.../kylin/query/util/SchemaConverterTest.java | 13 ++
.../engine/spark/application/SparkApplication.java | 27 ++--
.../kylin/engine/spark/job/NSparkExecutable.java | 7 +-
.../kylin/engine/spark/job/SegmentBuildJob.java | 3 +-
.../apache/kylin/engine/spark/job/StepExec.scala | 9 +-
.../engine/spark/job/step/ResourceWaitStage.scala | 6 +-
.../kylin/engine/spark/job/step/StageExec.scala | 4 +-
.../spark/job/step/build/BuildStepExec.scala} | 16 +--
.../spark/application/SparkApplicationTest.java | 153 ++++++++++++++-------
.../engine/spark/job/NSparkExecutableTest.java | 14 ++
.../spark/job/step/ResourceWaitStageTest.scala | 27 ++++
20 files changed, 426 insertions(+), 104 deletions(-)
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cec9eda198..735813be56 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3739,6 +3739,10 @@ public abstract class KylinConfigBase implements
Serializable {
return
Boolean.parseBoolean(getOptional("kylin.metrics.prometheus-enabled", TRUE));
}
+ public boolean getWaitResourceEnabled() {
+ return
Boolean.parseBoolean(getOptional("kylin.build.wait-resource.enabled", TRUE));
+ }
+
public boolean getCheckResourceEnabled() {
return
Boolean.parseBoolean(getOptional("kylin.build.resource.check-enabled", FALSE));
}
@@ -4412,6 +4416,14 @@ public abstract class KylinConfigBase implements
Serializable {
return
Boolean.parseBoolean(this.getOptional("kylin.query.index-use-gulten", TRUE));
}
+ public boolean buildUseGlutenEnabled() {
+ return
Boolean.parseBoolean(this.getOptional("kylin.engine.gluten.enabled", FALSE));
+ }
+
+ public boolean uniqueAsyncQueryUseGlutenEnabled() {
+ return
Boolean.parseBoolean(this.getOptional("kylin.unique-async-query.gluten.enabled",
FALSE));
+ }
+
public boolean queryUseGlutenEnabled() {
return
Boolean.parseBoolean(this.getOptional("kylin.storage.columnar.spark-conf.spark.gluten.enabled",
FALSE))
&&
this.getOptional("kylin.storage.columnar.spark-conf.spark.plugins",
"").contains("GlutenPlugin");
diff --git
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 6539d2d8f7..5bacdfc38e 100644
---
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -915,10 +915,10 @@ class KylinConfigBaseTest {
map.put("decimalOperationsAllowPrecisionLoss", new PropertiesEntity(
"kylin.storage.columnar.spark-conf.spark.sql.decimalOperations.allowPrecisionLoss",
"true", true));
map.put("isSparkUIAclEnabled", new
PropertiesEntity("kylin.query.engine.spark-ui-acl.enabled", "", false));
- map.put("isInternalTablePreloadCacheEnabled",
- new
PropertiesEntity("kylin.internal-table.preloaded-cache.enabled", "true",
false));
- map.put("isIndexPreloadCacheEnabled",
- new PropertiesEntity("kylin.index.preloaded-cache.enabled",
"true", false));
+ map.put("getWaitResourceEnabled", new
PropertiesEntity("kylin.build.wait-resource.enabled", "", false));
+ map.put("buildUseGlutenEnabled", new
PropertiesEntity("kylin.engine.gluten.enabled", "", false));
+ map.put("uniqueAsyncQueryUseGlutenEnabled",
+ new
PropertiesEntity("kylin.unique-async-query.gluten.enabled", "", false));
map.put("getConcurrentRunningThresholdForGlutenCache",
new
PropertiesEntity("kylin.cache.gluten-cache-concurrent-running-threshold", "20",
20));
}
diff --git
a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
index 8e1c9cb4fd..6422d2ff8b 100644
---
a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
+++
b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
@@ -126,6 +126,7 @@ class InternalTableLoadingJobTest extends AbstractTestCase {
internalTableLoadingJob.getTasks().forEach(task -> {
if (task instanceof InternalTableLoadingStep) {
Assertions.assertTrue(task.isInternalTableSparkJob());
+ Assertions.assertTrue(((InternalTableLoadingStep)
task).needRemoveGlutenParams(config));
} else {
Assertions.assertFalse(task.isInternalTableSparkJob());
}
diff --git
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
index 3aafa90102..cc3c7c6c5a 100644
---
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
+++
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
@@ -469,11 +469,13 @@ public class OlapContext {
if (tableDesc == null) {
return policy;
}
- if (olapConfig.isInternalTableEnabled() &&
tableDesc.isHasInternal() && !isAsyncQuery(olapConfig)) {
- logger.info("Hit internal table {}", factTable);
- policy = getSQLDigest().isDigestOfRawQuery()//
- ? NLookupCandidate.Policy.INTERNAL_TABLE
- : NLookupCandidate.Policy.AGG_THEN_INTERNAL_TABLE;
+ if (olapConfig.isInternalTableEnabled() &&
tableDesc.isHasInternal()) {
+ if (!isUniqueAsyncQuery(olapConfig) ||
asyncQueryUseGlutenEnabled(olapConfig)) {
+ logger.info("Hit internal table {}", factTable);
+ policy = getSQLDigest().isDigestOfRawQuery()//
+ ? NLookupCandidate.Policy.INTERNAL_TABLE
+ : NLookupCandidate.Policy.AGG_THEN_INTERNAL_TABLE;
+ }
} else if (!olapConfig.isInternalTableEnabled() &&
!StringUtils.isBlank(tableDesc.getLastSnapshotPath())) {
logger.info("Hit the snapshot {}, the path is: {}", factTable,
tableDesc.getLastSnapshotPath());
policy = getSQLDigest().isDigestOfRawQuery() //
@@ -484,10 +486,14 @@ public class OlapContext {
return policy;
}
- public boolean isAsyncQuery(KylinConfig olapConfig) {
+ public boolean isUniqueAsyncQuery(KylinConfig olapConfig) {
return QueryContext.current().getQueryTagInfo().isAsyncQuery() &&
olapConfig.isUniqueAsyncQueryYarnQueue();
}
+ public boolean asyncQueryUseGlutenEnabled(KylinConfig olapConfig) {
+ return isUniqueAsyncQuery(olapConfig) &&
olapConfig.uniqueAsyncQueryUseGlutenEnabled();
+ }
+
public String incapableMsg() {
StringBuilder buf = new StringBuilder("OlapContext");
if (incapableInfo.getReason() != null) {
@@ -653,10 +659,8 @@ public class OlapContext {
maxList[colId] =
convertToColumnDataType(rangeInfo.getMax(), dataType);
} else {
RelDataType sqlType = OlapTable.createSqlType(typeFactory,
c.getUpgradedType(), c.isNullable());
- minList[colId] =
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMin(), sqlType,
- false);
- maxList[colId] =
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMax(), sqlType,
- false);
+ minList[colId] =
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMin(), sqlType,
false);
+ maxList[colId] =
SparderTypeUtil.convertToStringWithCalciteType(rangeInfo.getMax(), sqlType,
false);
}
}
diff --git
a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
index 0df7bc20b0..04860c8319 100644
---
a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
+++
b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
@@ -60,7 +60,8 @@ public class SchemaConverter implements IPushDownConverter {
log.debug("Pushdown tag is not found, skip it.");
return originSql;
}
- if (QueryContext.current().getQueryTagInfo().isAsyncQuery() &&
config.isUniqueAsyncQueryYarnQueue()) {
+ if (QueryContext.current().getQueryTagInfo().isAsyncQuery() &&
config.isUniqueAsyncQueryYarnQueue()
+ && !config.uniqueAsyncQueryUseGlutenEnabled()) {
log.debug("Async query, skip it");
return originSql;
}
diff --git
a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
index e237b93cb6..4bb4b4df4e 100644
---
a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
+++
b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
@@ -148,7 +148,7 @@ public class AsyncQueryJob extends NSparkExecutable {
@Override
protected String getExtJar() {
- return getConfig().getKylinExtJarsPath(false);
+ return
getConfig().getKylinExtJarsPath(getConfig().uniqueAsyncQueryUseGlutenEnabled());
}
@Override
@@ -233,4 +233,10 @@ public class AsyncQueryJob extends NSparkExecutable {
props.put("kylin.internal-table-enabled", KylinConfig.FALSE);
props.remove("kylin.storage.columnar.spark-conf.spark.sql.catalog.INTERNAL_CATALOG");
}
+
+ @Override
+ public boolean needRemoveGlutenParams(KylinConfig config) {
+ // need to remove gulten params, return false
+ return config.buildUseGlutenEnabled();
+ }
}
diff --git
a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
index a5e9b166c4..bc0db031ff 100644
---
a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
+++
b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.scheduler.JobFailed;
+import org.apache.kylin.job.common.ExecutableUtil;
import org.apache.kylin.metadata.query.QueryHistorySql;
import org.apache.kylin.metadata.query.QueryHistorySqlParam;
import org.apache.kylin.metadata.query.QueryMetricsContext;
@@ -93,6 +94,14 @@ public class AsyncQueryApplication extends SparkApplication {
}
}
+ @Override
+ public void disableCurrentThreadGlutenIfNeed() {
+ if (!config.uniqueAsyncQueryUseGlutenEnabled()) {
+
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
+ logger.info("Disable current thread gluten for Async Query");
+ }
+ }
+
@Override
public void reportSparkJobExtraInfo(SparkSession sparkSession) {
// do nothing
@@ -118,6 +127,14 @@ public class AsyncQueryApplication extends
SparkApplication {
// do nothing
}
+ @Override
+ public Map<String, String> removeGlutenParamsIfNeed(Map<String, String>
baseSparkConf) {
+ if (!config.uniqueAsyncQueryUseGlutenEnabled()) {
+ return ExecutableUtil.removeGultenParams(baseSparkConf);
+ }
+ return baseSparkConf;
+ }
+
private void saveQueryHistory(QueryContext queryContext, QueryParams
queryParams) {
if (StringUtils.isEmpty(queryContext.getMetrics().getCorrectedSql())) {
queryContext.getMetrics().setCorrectedSql(queryContext.getUserSQL());
diff --git
a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryApplicationWithMetadataTest.java
b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryApplicationWithMetadataTest.java
new file mode 100644
index 0000000000..82b90252e5
--- /dev/null
+++
b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryApplicationWithMetadataTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.engine;
+
+import static
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.util.TestUtils;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.streaming.ReflectionUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import lombok.val;
+
+@MetadataInfo
+class AsyncQueryApplicationWithMetadataTest {
+
+ @Test
+ void testDisableCurrentThreadGlutenIfNeed() throws Exception {
+ val asyncQueryApplication = new AsyncQueryApplication();
+ val config = TestUtils.getTestConfig();
+ ReflectionUtils.setField(asyncQueryApplication, "config", config);
+
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "local[111]");
+ assertFalseEnableForCurrentThread(asyncQueryApplication, sparkConf);
+
+ config.setProperty("kylin.unique-async-query.gluten.enabled",
KylinConfigBase.TRUE);
+ assertNullEnableForCurrentThread(asyncQueryApplication, sparkConf);
+
+ config.setProperty("kylin.unique-async-query.gluten.enabled",
KylinConfigBase.FALSE);
+ assertFalseEnableForCurrentThread(asyncQueryApplication, sparkConf);
+ }
+
+ private void assertNullEnableForCurrentThread(SparkApplication
application, SparkConf sparkConf) {
+ try (val sparkSession = new
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+ ReflectionUtils.setField(application, "ss", sparkSession);
+
+ application.disableCurrentThreadGlutenIfNeed();
+
+ val ss = (SparkSession) ReflectionUtils.getField(application,
"ss");
+
assertNull(ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+ }
+ }
+
+ private void assertFalseEnableForCurrentThread(SparkApplication
application, SparkConf sparkConf) {
+ try (val sparkSession = new
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+ ReflectionUtils.setField(application, "ss", sparkSession);
+
+ application.disableCurrentThreadGlutenIfNeed();
+
+ val ss = (SparkSession) ReflectionUtils.getField(application,
"ss");
+ assertEquals("false",
ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+ }
+ }
+
+ @Test
+ void testRemoveGlutenParamsIfNeed() throws Exception {
+ val asyncQueryApplication = new AsyncQueryApplication() {
+ @Override
+ protected void doExecute() {
+ // do nothing
+ }
+ };
+ val config = TestUtils.getTestConfig();
+ ReflectionUtils.setField(asyncQueryApplication, "config", config);
+ val sparkPrefix = "kylin.query.async-query.spark-conf.";
+ config.setProperty("kylin.env", "PROD");
+ config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN +
",org.apache.spark.kyuubi.KyuubiPlugin");
+ config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
+ config.setProperty(sparkPrefix + "spark.master", "yarn");
+ config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
+ config.setProperty(sparkPrefix + SPARK_SHUFFLE_MANAGER,
COLUMNAR_SHUFFLE_MANAGER);
+ config.setProperty("kylin.engine.gluten.enabled", "true");
+
+ assertWithOutGluten(asyncQueryApplication);
+
+ config.setProperty("kylin.unique-async-query.gluten.enabled",
KylinConfigBase.TRUE);
+ assertWithGluten(asyncQueryApplication);
+
+ config.setProperty("kylin.unique-async-query.gluten.enabled",
KylinConfigBase.FALSE);
+ assertWithOutGluten(asyncQueryApplication);
+
+ }
+
+ private static void assertWithGluten(SparkApplication application) throws
Exception {
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "yarn");
+ sparkConf.set("spark.eventLog.enabled", "false");
+ application.exchangeSparkConf(sparkConf);
+ val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
+ "atomicSparkConf"));
+ val actalSparkConf = atomicSparkConf.get();
+ assertEquals(COLUMNAR_SHUFFLE_MANAGER,
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+ assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
+ assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin",
actalSparkConf.get(SPARK_PLUGINS));
+ assertEquals("yarn", actalSparkConf.get("spark.master"));
+ assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
+ }
+
+ private static void assertWithOutGluten(SparkApplication application)
throws Exception {
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "yarn");
+ sparkConf.set("spark.eventLog.enabled", "false");
+ application.exchangeSparkConf(sparkConf);
+ val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
+ "atomicSparkConf"));
+ val actalSparkConf = atomicSparkConf.get();
+ assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf ->
conf._1.contains("gluten")));
+ assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
actalSparkConf.get(SPARK_PLUGINS));
+ }
+}
diff --git
a/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
index 5dbeb0cbc6..0558902d7b 100644
---
a/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
+++
b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
@@ -35,20 +35,48 @@ class OlapContextTest {
}
@Test
- void isAsyncQuery() {
+ void isUniqueAsyncQuery() {
OlapContext mock = new OlapContext(1);
val config = Mockito.mock(KylinConfig.class);
Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
- boolean asyncQuery = mock.isAsyncQuery(config);
+ boolean asyncQuery = mock.isUniqueAsyncQuery(config);
Assertions.assertTrue(asyncQuery);
Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
- asyncQuery = mock.isAsyncQuery(config);
+ asyncQuery = mock.isUniqueAsyncQuery(config);
Assertions.assertFalse(asyncQuery);
QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
- asyncQuery = mock.isAsyncQuery(config);
+ Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
+ asyncQuery = mock.isUniqueAsyncQuery(config);
+ Assertions.assertFalse(asyncQuery);
+ }
+
+ @Test
+ void asyncQueryUseGlutenEnabled() {
+ OlapContext mock = new OlapContext(1);
+ val config = Mockito.mock(KylinConfig.class);
+ Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
+
Mockito.when(config.uniqueAsyncQueryUseGlutenEnabled()).thenReturn(true);
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+ boolean asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+ Assertions.assertTrue(asyncQuery);
+
+ Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
+ asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+ Assertions.assertFalse(asyncQuery);
+
+
Mockito.when(config.uniqueAsyncQueryUseGlutenEnabled()).thenReturn(false);
+ asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+ Assertions.assertFalse(asyncQuery);
+
+ Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
+ asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
+ Assertions.assertFalse(asyncQuery);
+
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+ asyncQuery = mock.asyncQueryUseGlutenEnabled(config);
Assertions.assertFalse(asyncQuery);
}
}
diff --git
a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
index 612233e66e..b47cf429d9 100644
---
a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
+++
b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
@@ -58,8 +58,15 @@ public class SchemaConverterTest {
QueryContext.current().getQueryTagInfo().setPushdown(true);
+ checkAsyncQuery(sql, expectedSql);
+
+ Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+ }
+
+ private void checkAsyncQuery(String sql, String expectedSql) {
QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
"true");
+ getTestConfig().setProperty("kylin.unique-async-query.gluten.enabled",
"false");
Assertions.assertEquals(sql, converter.convert(sql, "default", null));
QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
@@ -68,9 +75,15 @@ public class SchemaConverterTest {
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
"false");
Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+ getTestConfig().setProperty("kylin.unique-async-query.gluten.enabled",
"true");
+ Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+
QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
"false");
Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
"true");
+ getTestConfig().setProperty("kylin.unique-async-query.gluten.enabled",
"true");
Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
}
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 7e03bbfc6b..1fcfdb97c8 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -342,10 +342,7 @@ public abstract class SparkApplication implements
Application {
Unsafe.setProperty("kylin.env", config.getDeployEnv());
}
- if (className != null &&
!className.equals(InternalTableLoadJob.class.getName())) {
-
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
- logger.info("Disable gluten for normal build");
- }
+ disableCurrentThreadGlutenIfNeed();
logger.info("Start job");
infos.startJob();
@@ -372,6 +369,13 @@ public abstract class SparkApplication implements
Application {
}
}
+ public void disableCurrentThreadGlutenIfNeed() {
+ if (!config.buildUseGlutenEnabled() &&
!className.equals(InternalTableLoadJob.class.getName())) {
+
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
+ logger.info("Disable current thread gluten for Build Job");
+ }
+ }
+
protected void handleException(Exception e) throws Exception {
if (e instanceof AccessControlException) {
interceptAccessControlException(e);
@@ -649,14 +653,17 @@ public abstract class SparkApplication implements
Application {
}
}
- @VisibleForTesting
- void exchangeSparkConf(SparkConf sparkConf) throws Exception {
+ public Map<String, String> removeGlutenParamsIfNeed(Map<String, String>
baseSparkConf) {
+ if (!config.buildUseGlutenEnabled() &&
!className.equals(InternalTableLoadJob.class.getName())) {
+ return ExecutableUtil.removeGultenParams(baseSparkConf);
+ }
+ return baseSparkConf;
+ }
+
+ public void exchangeSparkConf(SparkConf sparkConf) throws Exception {
if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
Map<String, String> baseSparkConf = getSparkConfigOverride(config);
- if (className != null &&
!className.equals(InternalTableLoadJob.class.getName())) {
- baseSparkConf =
ExecutableUtil.removeGultenParams(baseSparkConf);
- }
-
+ baseSparkConf = removeGlutenParamsIfNeed(baseSparkConf);
if (!baseSparkConf.isEmpty()) {
baseSparkConf.forEach(sparkConf::set);
String baseSparkConfStr =
JsonUtil.writeValueAsString(baseSparkConf);
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 93b5f40258..a1d3cf4d77 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -481,12 +481,17 @@ public class NSparkExecutable extends AbstractExecutable
implements ChainedStage
if (UserGroupInformation.isSecurityEnabled()) {
confMap.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
}
- if (!isInternalTableSparkJob()) {
+ if (!needRemoveGlutenParams(config)) {
return ExecutableUtil.removeGultenParams(confMap);
}
return confMap;
}
+ public boolean needRemoveGlutenParams(KylinConfig config) {
+ // need to remove gulten params, return false
+ return isInternalTableSparkJob() || config.buildUseGlutenEnabled();
+ }
+
private ExecuteResult runLocalMode(String appArgs) {
try {
Class<?> appClz = ClassUtil.forName(getSparkSubmitClassName(),
Object.class);
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
index 2af01a3f18..83db0d919b 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/SegmentBuildJob.java
@@ -44,6 +44,7 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.builder.SnapshotBuilder;
import org.apache.kylin.engine.spark.job.step.ParamPropagation;
import org.apache.kylin.engine.spark.job.step.StageExec;
+import org.apache.kylin.engine.spark.job.step.build.BuildStepExec;
import org.apache.kylin.fileseg.FileSegments;
import org.apache.kylin.guava30.shaded.common.base.Throwables;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
@@ -115,7 +116,7 @@ public class SegmentBuildJob extends SegmentJob {
infos.clearCuboidsNumPerLayer(segment.getId());
val stepId = StringUtils.replace(infos.getJobStepId(),
JOB_NAME_PREFIX, "");
- val step = new StepExec(stepId);
+ val step = new BuildStepExec(stepId);
final ParamPropagation params = new ParamPropagation();
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
index 5e7fb62f8e..2ae949001c 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/StepExec.scala
@@ -18,13 +18,12 @@
package org.apache.kylin.engine.spark.job
-import java.io.IOException
-import java.util
-import java.util.Locale
-
import org.apache.kylin.engine.spark.job.step.StageExec
import org.apache.spark.internal.Logging
+import java.io.IOException
+import java.util
+import java.util.Locale
import scala.collection.JavaConverters._
class StepExec(stepId: String) extends Logging {
@@ -35,7 +34,7 @@ class StepExec(stepId: String) extends Logging {
}
def addStage(stage: StageExec): Unit = {
- val stageId = subStageList.size + 1
+ val stageId = getStageId()
stage.setStageId(getStepId + "_" + String.format(Locale.ROOT, "%02d",
Integer.valueOf(stageId)))
this.subStageList.add(stage)
}
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
index 7c93796593..6d3e985802 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStage.scala
@@ -39,8 +39,12 @@ class ResourceWaitStage(jobContext: SparkApplication)
extends StageExec {
jobStepId + "_00"
}
+ def needCheckResource: Boolean = {
+ jobContext.isJobOnCluster(sparkConf) && config.getWaitResourceEnabled
+ }
+
override def execute(): Unit = {
- if (jobContext.isJobOnCluster(sparkConf)) {
+ if (needCheckResource) {
val sleepSeconds = (Math.random * 60L).toLong
logInfo(s"Sleep $sleepSeconds seconds to avoid submitting too many spark
job at the same time.")
KylinBuildEnv.get().buildJobInfos.startWait()
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
index 2e6c5c4af9..19b2a805ce 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/StageExec.scala
@@ -46,7 +46,7 @@ trait StageExec extends Logging {
}
def onStageStart(): Unit = {
- if(getJobContext.isSkipFollowingStages(getSegmentId)){
+ if (getJobContext.isSkipFollowingStages(getSegmentId)) {
return
}
updateStageInfo(ExecutableState.RUNNING.toString, null, null)
@@ -100,7 +100,7 @@ trait StageExec extends Logging {
onStageStart()
var state: ExecutableState = ExecutableState.SUCCEED
try {
- if(getJobContext.isSkipFollowingStages(getSegmentId)){
+ if (getJobContext.isSkipFollowingStages(getSegmentId)) {
state = ExecutableState.SKIP
return
}
diff --git
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStepExec.scala
similarity index 62%
copy from
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
copy to
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStepExec.scala
index bd0dd4116c..fd058610a6 100644
---
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStepExec.scala
@@ -16,19 +16,13 @@
* limitations under the License.
*/
-package org.apache.kylin.engine.spark.job.step
+package org.apache.kylin.engine.spark.job.step.build
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.junit.Assert
-import org.mockito.Mockito
-import org.scalatest.funsuite.AnyFunSuite
+import org.apache.kylin.engine.spark.job.StepExec
-class ResourceWaitStageTest extends AnyFunSuite {
+class BuildStepExec(id: String) extends StepExec(id) {
- test("test ResourceWaitStage getStageName") {
- val sparkApplication = Mockito.mock(classOf[SparkApplication])
-
- val wfr = new ResourceWaitStage(sparkApplication)
- Assert.assertEquals("ResourceWaitStage", wfr.getStageName)
+ override def getStageId(): Int = {
+ subStageList.size + 2
}
}
diff --git
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index e27af8fdc7..9d3296aeec 100644
---
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -24,6 +24,7 @@ import static
org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
import static
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -267,59 +268,6 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
application.exchangeSparkConf(sparkConf);
}
- @Test
- public void testNotInternalTableLoadJobRemoveGluten() throws Exception {
- val sparkPrefix = "kylin.engine.spark-conf.";
- val config = getTestConfig();
- config.setProperty("kylin.env", "PROD");
- config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN +
",org.apache.spark.kyuubi.KyuubiPlugin");
- config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
- config.setProperty(sparkPrefix + "spark.master", "yarn");
- config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
- val application = new SparkApplication() {
- @Override
- protected void doExecute() {
- // do nothing
- }
- };
- application.config = config;
- assertWithGluten(application);
-
- application.className = InternalTableLoadJob.class.getName();
- assertWithGluten(application);
-
- application.className = SegmentBuildJob.class.getName();
- assertWithOutGluten(application);
- }
-
- private static void assertWithGluten(SparkApplication application) throws
Exception {
- val sparkConf = new SparkConf();
- sparkConf.set("spark.master", "yarn");
- sparkConf.set("spark.eventLog.enabled", "false");
- application.exchangeSparkConf(sparkConf);
- val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
- "atomicSparkConf"));
- val actalSparkConf = atomicSparkConf.get();
- assertEquals(COLUMNAR_SHUFFLE_MANAGER,
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
- assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
- assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin",
actalSparkConf.get(SPARK_PLUGINS));
- assertEquals("yarn", actalSparkConf.get("spark.master"));
- assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
- }
-
- private static void assertWithOutGluten(SparkApplication application)
throws Exception {
- val sparkConf = new SparkConf();
- sparkConf.set("spark.master", "yarn");
- sparkConf.set("spark.eventLog.enabled", "false");
- application.exchangeSparkConf(sparkConf);
- val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
- "atomicSparkConf"));
- val actalSparkConf = atomicSparkConf.get();
- assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf ->
conf._1.contains("gluten")));
- assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
- assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
actalSparkConf.get(SPARK_PLUGINS));
- }
-
@Test
public void testUpdateJobErrorInfo() throws JsonProcessingException {
val config = getTestConfig();
@@ -400,4 +348,103 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
Mockito.verify(application.getReport(),
Mockito.times(1)).updateSparkJobExtraInfo(paramsMap,
"/kylin/api/jobs/spark", null, null, json);
}
+
+ @Test
+ public void testRemoveGlutenParamsIfNeed() throws Exception {
+ val sparkPrefix = "kylin.engine.spark-conf.";
+ val config = getTestConfig();
+ config.setProperty("kylin.env", "PROD");
+ config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN +
",org.apache.spark.kyuubi.KyuubiPlugin");
+ config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
+ config.setProperty(sparkPrefix + "spark.master", "yarn");
+ config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
+ config.setProperty("kylin.engine.gluten.enabled", "true");
+ val application = new SparkApplication() {
+ @Override
+ protected void doExecute() {
+ // do nothing
+ }
+ };
+ application.className = SegmentBuildJob.class.getName();
+ application.config = config;
+ assertWithGluten(application);
+
+ config.setProperty("kylin.engine.gluten.enabled", "false");
+ assertWithOutGluten(application);
+
+ application.className = InternalTableLoadJob.class.getName();
+ assertWithGluten(application);
+
+ application.className = SegmentBuildJob.class.getName();
+ assertWithOutGluten(application);
+ }
+
+ private static void assertWithGluten(SparkApplication application) throws
Exception {
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "yarn");
+ sparkConf.set("spark.eventLog.enabled", "false");
+ application.exchangeSparkConf(sparkConf);
+ val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
+ "atomicSparkConf"));
+ val actualSparkConf = atomicSparkConf.get();
+ assertEquals(COLUMNAR_SHUFFLE_MANAGER,
actualSparkConf.get(SPARK_SHUFFLE_MANAGER));
+ assertEquals("true", actualSparkConf.get("spark.gluten.enable"));
+ assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin",
actualSparkConf.get(SPARK_PLUGINS));
+ assertEquals("yarn", actualSparkConf.get("spark.master"));
+ assertEquals("false", actualSparkConf.get("spark.eventLog.enabled"));
+ }
+
+ private static void assertWithOutGluten(SparkApplication application)
throws Exception {
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "yarn");
+ sparkConf.set("spark.eventLog.enabled", "false");
+ application.exchangeSparkConf(sparkConf);
+ val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
+ "atomicSparkConf"));
+ val actualSparkConf = atomicSparkConf.get();
+ assertFalse(Arrays.stream(actualSparkConf.getAll()).anyMatch(conf ->
conf._1.contains("gluten")));
+ assertEquals("sort", actualSparkConf.get(SPARK_SHUFFLE_MANAGER));
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
actualSparkConf.get(SPARK_PLUGINS));
+ }
+
+ @Test
+ public void testDisableCurrentThreadGlutenIfNeed() throws Exception {
+ val config = getTestConfig();
+ config.setProperty("kylin.engine.gluten.enabled", "true");
+ val application = new SparkApplication() {
+ @Override
+ protected void doExecute() {
+ // do nothing
+ }
+ };
+ application.className = SegmentBuildJob.class.getName();
+ application.config = config;
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "local[111]");
+ assertNullEnableForCurrentThread(application, sparkConf);
+
+ config.setProperty("kylin.engine.gluten.enabled", "false");
+ assertFalseEnableForCurrentThread(application, sparkConf);
+
+ application.className = InternalTableLoadJob.class.getName();
+ assertNullEnableForCurrentThread(application, sparkConf);
+ }
+
+ private void assertNullEnableForCurrentThread(SparkApplication
application, SparkConf sparkConf) {
+ try (val sparkSession = new
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+ application.ss = sparkSession;
+
+ application.disableCurrentThreadGlutenIfNeed();
+
assertNull(application.ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+ }
+ }
+
+ private void assertFalseEnableForCurrentThread(SparkApplication
application, SparkConf sparkConf) {
+ try (val sparkSession = new
SparkSession.Builder().config(sparkConf).getOrCreate();) {
+ application.ss = sparkSession;
+
+ application.disableCurrentThreadGlutenIfNeed();
+ assertEquals("false",
application.ss.sparkContext().getLocalProperty("gluten.enabledForCurrentThread"));
+ }
+ }
}
diff --git
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
index dfb897247c..ce369a610a 100644
---
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
+++
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
@@ -23,6 +23,7 @@ import java.util.Random;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.RandomUtil;
@@ -258,4 +259,17 @@ public class NSparkExecutableTest extends
NLocalFileMetadataTestCase {
kylinConfigExt = sparkExecutable.getKylinConfigExt(kylinConfig,
"default");
Assert.assertEquals("123",
kylinConfigExt.getOptional("kylin.engine.spark-conf.test", null));
}
+
+ @Test
+ public void testNeedRemoveGlutenParams() {
+ KylinConfig kylinConfig =
KylinConfig.createKylinConfig(getTestConfig());
+ NSparkExecutable sparkExecutable = new NSparkExecutable();
+
+
Assert.assertFalse(sparkExecutable.needRemoveGlutenParams(kylinConfig));
+
+ kylinConfig.setProperty("kylin.engine.gluten.enabled",
KylinConfigBase.TRUE);
+ Assert.assertTrue(sparkExecutable.needRemoveGlutenParams(kylinConfig));
+ kylinConfig.setProperty("kylin.engine.gluten.enabled",
KylinConfigBase.FALSE);
+
+ }
}
diff --git
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
index bd0dd4116c..d96dfd8723 100644
---
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
+++
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/step/ResourceWaitStageTest.scala
@@ -18,7 +18,10 @@
package org.apache.kylin.engine.spark.job.step
+import org.apache.kylin.common.KylinConfig
import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.metadata.streaming.ReflectionUtils
+import org.apache.spark.SparkConf
import org.junit.Assert
import org.mockito.Mockito
import org.scalatest.funsuite.AnyFunSuite
@@ -31,4 +34,28 @@ class ResourceWaitStageTest extends AnyFunSuite {
val wfr = new ResourceWaitStage(sparkApplication)
Assert.assertEquals("ResourceWaitStage", wfr.getStageName)
}
+
+ test("test ResourceWaitStage needCheckResource") {
+ val sparkApplication = Mockito.mock(classOf[SparkApplication])
+ val sparkConf = Mockito.mock(classOf[SparkConf])
+ val kylinConfig = Mockito.mock(classOf[KylinConfig])
+
+ val wfr = new ResourceWaitStage(sparkApplication)
+ ReflectionUtils.setField(wfr, "sparkConf", sparkConf)
+ ReflectionUtils.setField(wfr, "config", kylinConfig)
+
+ Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(true)
+ Mockito.when(kylinConfig.getWaitResourceEnabled).thenReturn(true)
+ Assert.assertTrue(wfr.needCheckResource)
+
+ Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(false)
+ Assert.assertFalse(wfr.needCheckResource)
+
+ Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(true)
+ Mockito.when(kylinConfig.getWaitResourceEnabled).thenReturn(false)
+ Assert.assertFalse(wfr.needCheckResource)
+
+ Mockito.when(sparkApplication.isJobOnCluster(sparkConf)).thenReturn(false)
+ Assert.assertFalse(wfr.needCheckResource)
+ }
}