This is an automated email from the ASF dual-hosted git repository.
jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 5082123921 KYLIN-6010 Only InternalTableLoadingJob use gluten
5082123921 is described below
commit 5082123921f2ef5f3a25dabc3adb4e0dd2226ccf
Author: jlf <[email protected]>
AuthorDate: Sat Oct 12 15:57:03 2024 +0800
KYLIN-6010 Only InternalTableLoadingJob use gluten
---
.../org/apache/kylin/common/KylinConfigBase.java | 5 +-
.../apache/kylin/common/KylinConfigBaseTest.java | 27 +++++
.../apache/kylin/job/common/ExecutableUtil.java | 29 +++++
.../kylin/job/constant/ExecutableConstants.java | 6 +
.../kylin/job/execution/AbstractExecutable.java | 4 +
.../kylin/job/common/ExecutableUtilTest.java | 46 +++++++
.../spark/job/InternalTableLoadingJobTest.java | 134 +++++++++++++++++++++
.../apache/kylin/query/relnode/OlapContext.java | 9 +-
.../apache/kylin/query/util/SchemaConverter.java | 4 +
.../apache/kylin/rest/service/AsyncQueryJob.java | 7 +-
.../kylin/query/engine/AsyncQueryApplication.java | 6 +
.../kylin/query/relnode/OlapContextTest.java | 54 +++++++++
.../kylin/query/util/SchemaConverterTest.java | 29 +++--
.../engine/spark/application/SparkApplication.java | 22 +++-
.../engine/spark/job/InternalTableLoadingStep.java | 4 +
.../kylin/job/execution/NSparkExecutable.java | 7 +-
.../spark/application/SparkApplicationTest.java | 132 ++++++++++++++++++--
.../streaming/jobs/impl/StreamingJobLauncher.java | 2 +-
18 files changed, 495 insertions(+), 32 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cb03a4ca63..0ad06127dc 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1564,7 +1564,7 @@ public abstract class KylinConfigBase implements
Serializable {
return
SizeConvertUtil.byteStringAs(getOptional("kylin.streaming.custom-jar-size",
"20mb"), ByteUnit.BYTE);
}
- public String getKylinExtJarsPath() {
+ public String getKylinExtJarsPath(Boolean withGluten) {
String kylinHome = getKylinHome();
if (StringUtils.isEmpty(kylinHome)) {
return "";
@@ -1575,6 +1575,9 @@ public abstract class KylinConfigBase implements
Serializable {
}
StringBuilder extJar = new StringBuilder();
for (File file : files) {
+ if (!withGluten && StringUtils.containsIgnoreCase(file.getName(),
"gluten")) {
+ continue;
+ }
extJar.append(",");
extJar.append(file.getAbsolutePath());
}
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 3d995ae7af..3ef0e75bc6 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -44,6 +44,7 @@ import static
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURC
import static
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_NAME_KEY;
import static org.apache.kylin.common.constant.Constants.SNAPSHOT_AUTO_REFRESH;
+import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -60,6 +61,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
@@ -1522,6 +1524,31 @@ class KylinConfigBaseTest {
Assertions.assertEquals(3003, map.get(9).get(2));
Assertions.assertEquals(5002, map.get(13).get(1));
}
+
+ @Test
+ public void getKylinExtJarsPath() throws Exception {
+ val config = KylinConfig.getInstanceFromEnv();
+ val kylinHome = KylinConfig.getKylinHome();
+ val libExtDir = new File(kylinHome + File.separator + "lib/ext");
+ FileUtils.forceMkdir(libExtDir);
+ val glutenJar = new File(libExtDir, "gluten.jar");
+ FileUtils.write(glutenJar, "gluten jar");
+ val celebornJar = new File(libExtDir, "celeborn-client-spark-3.jar");
+ FileUtils.write(celebornJar, "celeborn client spark3 jar");
+ val mysqlJar = new File(libExtDir, "mysqlJar.jar");
+ FileUtils.write(mysqlJar, "mysqlJar jar");
+ val glutenCelebornJar = new File(libExtDir,
"gluten-celeborn-clickhouse-jar-with-dependencies.jar");
+ FileUtils.write(glutenCelebornJar, "gluten celeborn jar");
+
+ val withGluten = config.getKylinExtJarsPath(true);
+ val withGlutenExpected = "," + glutenJar.getAbsolutePath() + "," +
celebornJar.getAbsolutePath() + ","
+ + mysqlJar.getAbsolutePath() + "," +
glutenCelebornJar.getAbsolutePath();
+ Assertions.assertEquals(withGlutenExpected, withGluten);
+
+ val withoutGluten = config.getKylinExtJarsPath(false);
+ val withoutExpected = "," + celebornJar.getAbsolutePath() + "," +
mysqlJar.getAbsolutePath();
+ Assertions.assertEquals(withoutExpected, withoutGluten);
+ }
}
class EnvironmentUpdateUtils {
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java b/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java
index 824ea19fa2..29c33ee0c9 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/common/ExecutableUtil.java
@@ -20,8 +20,15 @@ package org.apache.kylin.job.common;
import static
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_INDEX_FAIL;
import static
org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_EMPTY;
+import static
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PREFIX;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeProducer;
@@ -151,4 +159,25 @@ public abstract class ExecutableUtil {
*/
public void computePartitions(JobParam jobParam) {
}
+
+ public static Map<String, String> removeGultenParams(Map<String, String>
params) {
+ params.computeIfPresent(SPARK_PLUGINS, (pluginKey, pluginValue) -> {
+ String tempPluginValue = pluginValue;
+ String comma = ",";
+ if (StringUtils.contains(pluginValue, GLUTEN_PLUGIN)) {
+ tempPluginValue = Arrays.stream(tempPluginValue.split(comma))
+ .filter(p -> !StringUtils.equals(p,
GLUTEN_PLUGIN)).collect(Collectors.joining(comma));
+ }
+ return tempPluginValue;
+ });
+ params.computeIfPresent(SPARK_SHUFFLE_MANAGER, (pluginKey,
pluginValue) -> {
+ String tempPluginValue = pluginValue;
+ if (StringUtils.equals(pluginValue, COLUMNAR_SHUFFLE_MANAGER)) {
+ tempPluginValue = "sort";
+ }
+ return tempPluginValue;
+ });
+ return params.entrySet().stream().filter(e ->
!e.getKey().startsWith(GLUTEN_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 29ddd7cc95..eaf115e9ee 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -25,6 +25,12 @@ public final class ExecutableConstants {
private ExecutableConstants() {
}
+ public static final String SPARK_PLUGINS = "spark.plugins";
+ public static final String GLUTEN_PREFIX = "spark.gluten.";
+ public static final String GLUTEN_PLUGIN =
"org.apache.gluten.GlutenPlugin";
+ public static final String COLUMNAR_SHUFFLE_MANAGER =
"org.apache.spark.shuffle.sort.ColumnarShuffleManager";
+ public static final String SPARK_SHUFFLE_MANAGER = "spark.shuffle.manager";
+
public static final String YARN_APP_ID = "yarn_application_id";
public static final String YARN_APP_IDS = "yarn_application_ids";
public static final String YARN_APP_IDS_DELIMITER = ",";
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 2bdaf17928..cf11a5d502 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -958,4 +958,8 @@ public abstract class AbstractExecutable extends
AbstractJobExecutable implement
return true;
});
}
+
+ public boolean isInternalTableSparkJob() {
+ return false;
+ }
}
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java b/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java
index 129564e59a..de116146ab 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/common/ExecutableUtilTest.java
@@ -18,6 +18,10 @@
package org.apache.kylin.job.common;
+import static
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -26,9 +30,13 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.model.JobParam;
import org.junit.Test;
+import lombok.val;
+import lombok.var;
+
public class ExecutableUtilTest {
@Test
@@ -45,4 +53,42 @@ public class ExecutableUtilTest {
e.toString());
}
}
+
+ @Test
+ public void removeGultenParams() {
+ val requestMap = Maps.<String, String> newHashMap();
+ requestMap.put(SPARK_PLUGINS, GLUTEN_PLUGIN);
+ var params = ExecutableUtil.removeGultenParams(requestMap);
+ assertEquals(1, params.size());
+ assertEquals("", params.get(SPARK_PLUGINS));
+
+ requestMap.put(SPARK_PLUGINS,
+ GLUTEN_PLUGIN +
",org.apache.gluten.GlutenPlugin,org.apache.spark.kyuubi.KyuubiPlugin");
+ params = ExecutableUtil.removeGultenParams(requestMap);
+ assertEquals(1, params.size());
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
params.get(SPARK_PLUGINS));
+
+ requestMap.put("spark.gluten.enable", "true");
+ params = ExecutableUtil.removeGultenParams(requestMap);
+ assertEquals(1, params.size());
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
params.get(SPARK_PLUGINS));
+
+ requestMap.put(SPARK_SHUFFLE_MANAGER,
"org.apache.spark.shuffle.sort.SortShuffleManager");
+ params = ExecutableUtil.removeGultenParams(requestMap);
+ assertEquals(2, params.size());
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
params.get(SPARK_PLUGINS));
+ assertEquals("org.apache.spark.shuffle.sort.SortShuffleManager",
params.get(SPARK_SHUFFLE_MANAGER));
+
+ requestMap.put(SPARK_SHUFFLE_MANAGER, "sort");
+ params = ExecutableUtil.removeGultenParams(requestMap);
+ assertEquals(2, params.size());
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
params.get(SPARK_PLUGINS));
+ assertEquals("sort", params.get(SPARK_SHUFFLE_MANAGER));
+
+ requestMap.put(SPARK_SHUFFLE_MANAGER, COLUMNAR_SHUFFLE_MANAGER);
+ params = ExecutableUtil.removeGultenParams(requestMap);
+ assertEquals(2, params.size());
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
params.get(SPARK_PLUGINS));
+ assertEquals("sort", params.get(SPARK_SHUFFLE_MANAGER));
+ }
}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
new file mode 100644
index 0000000000..156d24faa1
--- /dev/null
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJobTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import static org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_BUILD;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.AbstractTestCase;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
+import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
+import org.apache.kylin.job.handler.InternalTableJobHandler;
+import org.apache.kylin.job.model.JobParam;
+import org.apache.kylin.job.service.InternalTableLoadingService;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.model.NBatchConstants;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.streaming.ReflectionUtils;
+import org.apache.kylin.metadata.table.InternalTableDesc;
+import org.apache.kylin.metadata.table.InternalTableManager;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.service.InternalTableService;
+import org.apache.kylin.rest.service.TableService;
+import org.apache.kylin.rest.util.AclEvaluate;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import lombok.val;
+
+@MetadataInfo
+class InternalTableLoadingJobTest extends AbstractTestCase {
+ protected static final String PROJECT = "default";
+ protected static final String TABLE_INDENTITY = "DEFAULT.TEST_KYLIN_FACT";
+ protected static final String DATE_COL = "CAL_DT";
+
+ @Mock
+ protected AclEvaluate aclEvaluate = Mockito.spy(AclEvaluate.class);
+ @Spy
+ protected InternalTableLoadingService internalTableLoadingService =
Mockito.spy(new InternalTableLoadingService());
+ @InjectMocks
+ protected InternalTableService internalTableService = Mockito.spy(new
InternalTableService());
+
+ @InjectMocks
+ protected TableService tableService = mock(TableService.class);
+
+ @BeforeAll
+ public static void beforeClass() {
+ NLocalWithSparkSessionTestBase.beforeClass();
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ NLocalWithSparkSessionTestBase.afterClass();
+ }
+
+ @BeforeEach
+ void setUp() throws Exception {
+ MockitoAnnotations.openMocks(this);
+ SecurityContextHolder.getContext()
+ .setAuthentication(new TestingAuthenticationToken("ADMIN",
"ADMIN", Constant.ROLE_ADMIN));
+ SparkJobFactoryUtils.initJobFactory();
+ overwriteSystemProp("kylin.source.provider.9",
"org.apache.kylin.engine.spark.mockup.CsvSource");
+ ReflectionUtils.setField(internalTableService, "aclEvaluate",
aclEvaluate);
+ ReflectionUtils.setField(internalTableService,
"internalTableLoadingService", internalTableLoadingService);
+ }
+
+ protected InternalTableDesc getInternalTableDesc(KylinConfig config)
throws Exception {
+ NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, PROJECT);
+ TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+ String[] partitionCols = new String[] { DATE_COL };
+ Map<String, String> tblProperties = new HashMap<>();
+ val datePartitionFormat = "yyyy-MM-dd";
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn(datePartitionFormat);
+ internalTableService.createInternalTable(PROJECT, table.getName(),
table.getDatabase(), partitionCols,
+ "yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.PARQUET.name());
+ InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+ Assertions.assertNotNull(internalTable);
+ return internalTable;
+ }
+
+ @Test
+ void isInternalTableSparkJob() throws Exception {
+ val config = KylinConfig.getInstanceFromEnv();
+ val internalTable = getInternalTableDesc(config);
+ val jobParam = new
JobParam().withProject(PROJECT).withTable(internalTable.getIdentity()).withYarnQueue(null)
+ .withJobTypeEnum(INTERNAL_TABLE_BUILD).withOwner("UT")
+ .addExtParams(NBatchConstants.P_INCREMENTAL_BUILD,
String.valueOf(false))
+ .addExtParams(NBatchConstants.P_OUTPUT_MODE,
String.valueOf(false))
+ .addExtParams(NBatchConstants.P_START_DATE,
"").addExtParams(NBatchConstants.P_END_DATE, "");
+ val internalTableJobParam = new
InternalTableJobHandler.InternalTableJobBuildParam(jobParam);
+ val internalTableLoadingJob =
InternalTableLoadingJob.create(internalTableJobParam);
+ Assertions.assertFalse(internalTableLoadingJob.isInternalTableSparkJob());
+ internalTableLoadingJob.getTasks().forEach(task -> {
+ if (task instanceof InternalTableLoadingStep) {
+ Assertions.assertTrue(task.isInternalTableSparkJob());
+ } else {
+ Assertions.assertFalse(task.isInternalTableSparkJob());
+ }
+ });
+ }
+}
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
index 1e0ef2e8f9..04b241223c 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OlapContext.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
@@ -76,7 +77,7 @@ public class OlapContext {
private static final Logger logger =
LoggerFactory.getLogger(OlapContext.class);
public static final String PRM_ACCEPT_PARTIAL_RESULT =
"AcceptPartialResult";
- public static final Set<String> UNSUPPORTED_FUNCTION_IN_LOOKUP = new
HashSet<>(
+ public static final HashSet<String> UNSUPPORTED_FUNCTION_IN_LOOKUP = new
HashSet<>(
Collections.singleton(FunctionDesc.FUNC_INTERSECT_COUNT));
private final int id;
@@ -464,7 +465,7 @@ public class OlapContext {
if (tableDesc == null) {
return policy;
}
- if (olapConfig.isInternalTableEnabled() &&
tableDesc.isHasInternal()) {
+ if (olapConfig.isInternalTableEnabled() &&
tableDesc.isHasInternal() && !isAsyncQuery(olapConfig)) {
logger.info("Hit internal table {}", factTable);
policy = getSQLDigest().isDigestOfRawQuery()//
? NLookupCandidate.Policy.INTERNAL_TABLE
@@ -479,6 +480,10 @@ public class OlapContext {
return policy;
}
+ public boolean isAsyncQuery(KylinConfig olapConfig) {
+ return QueryContext.current().getQueryTagInfo().isAsyncQuery() &&
olapConfig.isUniqueAsyncQueryYarnQueue();
+ }
+
public String incapableMsg() {
StringBuilder buf = new StringBuilder("OlapContext");
if (incapableInfo.getReason() != null) {
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
index dbbb0b4291..f2f429850b 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
@@ -55,6 +55,10 @@ public class SchemaConverter implements IPushDownConverter {
log.debug("Pushdown tag is not found, skip it.");
return originSql;
}
+ if (QueryContext.current().getQueryTagInfo().isAsyncQuery() &&
config.isUniqueAsyncQueryYarnQueue()) {
+ log.debug("Async query, skip it");
+ return originSql;
+ }
try {
String transformedSql = transform(originSql, project, config);
QueryContext.current().setPushdownEngine("GLUTEN");
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
index b62c7bd8e6..f7699318ce 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/AsyncQueryJob.java
@@ -61,6 +61,7 @@ import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.spark.job.DefaultSparkBuildJobHandler;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.job.common.ExecutableUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.JobTypeEnum;
@@ -137,7 +138,7 @@ public class AsyncQueryJob extends NSparkExecutable {
if (UserGroupInformation.isSecurityEnabled()) {
overrides.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
}
- return overrides;
+ return ExecutableUtil.removeGultenParams(overrides);
}
@Override
@@ -147,7 +148,7 @@ public class AsyncQueryJob extends NSparkExecutable {
@Override
protected String getExtJar() {
- return getConfig().getKylinExtJarsPath();
+ return getConfig().getKylinExtJarsPath(false);
}
@Override
@@ -229,5 +230,7 @@ public class AsyncQueryJob extends NSparkExecutable {
if (!KylinInfoExtension.getFactory().checkKylinInfo()) {
props.setProperty("kylin.streaming.enabled", KylinConfig.FALSE);
}
+ props.put("kylin.internal-table-enabled", KylinConfig.FALSE);
+ props.remove("kylin.storage.columnar.spark-conf.spark.sql.catalog.INTERNAL_CATALOG");
}
}
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
index b1cf0a6571..e76a2d2b3d 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
@@ -42,6 +42,7 @@ import org.apache.kylin.metadata.query.util.QueryHistoryUtil;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.query.util.QueryParams;
import org.apache.spark.sql.KapFunctions;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.udf.UdfManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +93,11 @@ public class AsyncQueryApplication extends SparkApplication {
}
}
+ @Override
+ public void reportSparkJobExtraInfo(SparkSession sparkSession) {
+ // do nothing
+ }
+
@Override
protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
return config.getAsyncQuerySparkConfigOverride();
diff --git a/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
new file mode 100644
index 0000000000..5dbeb0cbc6
--- /dev/null
+++ b/src/query/src/test/java/org/apache/kylin/query/relnode/OlapContextTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.relnode;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import lombok.val;
+
+class OlapContextTest {
+
+ @AfterEach
+ void tearDown() {
+ QueryContext.current().close();
+ }
+
+ @Test
+ void isAsyncQuery() {
+ OlapContext mock = new OlapContext(1);
+ val config = Mockito.mock(KylinConfig.class);
+ Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(true);
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+ boolean asyncQuery = mock.isAsyncQuery(config);
+ Assertions.assertTrue(asyncQuery);
+
+ Mockito.when(config.isUniqueAsyncQueryYarnQueue()).thenReturn(false);
+ asyncQuery = mock.isAsyncQuery(config);
+ Assertions.assertFalse(asyncQuery);
+
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+ asyncQuery = mock.isAsyncQuery(config);
+ Assertions.assertFalse(asyncQuery);
+ }
+}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
index 3ae2e79f35..647f33244a 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
@@ -35,6 +35,15 @@ public class SchemaConverterTest {
@Test
void testCatalogConvert() {
+ String sql = "select t1.id t_id from SSB.P_LINEORDER t1 left join
\"SSB\".\"PART\" on t1.PARTKEY = PART.PARTKEY "
+ + "union all select * from SSB.LINEORDER union all select *
from \"DEFAULT\".TEST_COUNTRY t";
+
+ String expectedSql = "select t1.id t_id from
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"P_LINEORDER\" t1 "
+ + "left join \"INTERNAL_CATALOG\".\"default\".\"SSB\".\"PART\"
on t1.PARTKEY = PART.PARTKEY "
+ + "union all select * from
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"LINEORDER\" "
+ + "union all select * from
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_COUNTRY\" t";
+ Assertions.assertEquals(sql, converter.convert(sql, "default", null));
+
getTestConfig().setProperty("kylin.internal-table-enabled", "true");
InternalTableManager innerTableMgr =
InternalTableManager.getInstance(getTestConfig(), "default");
@@ -43,17 +52,23 @@ public class SchemaConverterTest {
innerTableMgr.createInternalTable(new
InternalTableDesc(tableDesc));
}
- String sql = "select t1.id t_id from SSB.P_LINEORDER t1 left join
\"SSB\".\"PART\" on t1.PARTKEY = PART.PARTKEY "
- + "union all select * from SSB.LINEORDER union all select *
from \"DEFAULT\".TEST_COUNTRY t";
+ Assertions.assertEquals(sql, converter.convert(sql, "default", null));
- String expectedSql = "select t1.id t_id from
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"P_LINEORDER\" t1 "
- + "left join \"INTERNAL_CATALOG\".\"default\".\"SSB\".\"PART\"
on t1.PARTKEY = PART.PARTKEY "
- + "union all select * from
\"INTERNAL_CATALOG\".\"default\".\"SSB\".\"LINEORDER\" "
- + "union all select * from
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_COUNTRY\" t";
+ QueryContext.current().getQueryTagInfo().setPushdown(true);
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+ getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled", "true");
Assertions.assertEquals(sql, converter.convert(sql, "default", null));
- QueryContext.current().getQueryTagInfo().setPushdown(true);
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(false);
+ Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+
+ getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled", "false");
+ Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+
+ QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+ Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
+
Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
}
}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index b0ccbbf592..3d082c68d3 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -79,6 +79,7 @@ import org.apache.kylin.engine.spark.utils.HDFSUtils;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.job.common.ExecutableUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.NSparkExecutable;
@@ -130,6 +131,7 @@ public abstract class SparkApplication implements
Application {
protected String project;
protected int layoutSize = -1;
protected BuildJobInfos infos;
+ protected String className;
protected ConcurrentHashMap<String, Boolean> skipFollowingStagesMap = new
ConcurrentHashMap<>();
/**
@@ -286,6 +288,7 @@ public abstract class SparkApplication implements
Application {
String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL);
jobId = getParam(NBatchConstants.P_JOB_ID);
project = getParam(NBatchConstants.P_PROJECT_NAME);
+ className = getParam(NBatchConstants.P_CLASS_NAME);
if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) {
layoutSize =
StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length;
}
@@ -339,7 +342,6 @@ public abstract class SparkApplication implements
Application {
Unsafe.setProperty("kylin.env", config.getDeployEnv());
}
- String className = getParam(NBatchConstants.P_CLASS_NAME);
if (className != null &&
!className.equals(InternalTableLoadJob.class.getName())) {
ss.sparkContext().setLocalProperty("gluten.enabledForCurrentThread", "false");
logger.info("Disable gluten for normal build");
@@ -651,6 +653,10 @@ public abstract class SparkApplication implements
Application {
void exchangeSparkConf(SparkConf sparkConf) throws Exception {
if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
Map<String, String> baseSparkConf = getSparkConfigOverride(config);
+ if (className != null &&
!className.equals(InternalTableLoadJob.class.getName())) {
+ baseSparkConf =
ExecutableUtil.removeGultenParams(baseSparkConf);
+ }
+
if (!baseSparkConf.isEmpty()) {
baseSparkConf.forEach(sparkConf::set);
String baseSparkConfStr =
JsonUtil.writeValueAsString(baseSparkConf);
@@ -687,11 +693,7 @@ public abstract class SparkApplication implements
Application {
}
sparkSession = createSpark(sparkConf);
- if (!config.isUTEnv()) {
- Map<String, String> extraInfo = getTrackingInfo(sparkSession,
config.isTrackingUrlIpAddressEnabled());
- extraInfo.put("job_last_running_start_time",
getParam(JOB_LAST_RUNNING_START_TIME));
- getReport().updateSparkJobExtraInfo(getReportParams(),
"/kylin/api/jobs/spark", project, jobId, extraInfo);
- }
+ reportSparkJobExtraInfo(sparkSession);
// for spark metrics
JobMetricsUtils.registerListener(sparkSession);
@@ -705,6 +707,14 @@ public abstract class SparkApplication implements
Application {
atomicSparkSession.set(sparkSession);
}
+ public void reportSparkJobExtraInfo(SparkSession sparkSession) {
+ if (!config.isUTEnv()) {
+ Map<String, String> extraInfo = getTrackingInfo(sparkSession,
config.isTrackingUrlIpAddressEnabled());
+ extraInfo.put("job_last_running_start_time",
getParam(JOB_LAST_RUNNING_START_TIME));
+ getReport().updateSparkJobExtraInfo(getReportParams(),
"/kylin/api/jobs/spark", project, jobId, extraInfo);
+ }
+ }
+
private void prepareSparkSession() throws NoRetryException {
SparkConf sparkConf = atomicSparkConf.get();
if (Objects.isNull(sparkConf)) {
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
index 3a84a01356..b9b718e8eb 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingStep.java
@@ -91,4 +91,8 @@ public class InternalTableLoadingStep extends
NSparkExecutable {
return result;
}
+ @Override
+ public boolean isInternalTableSparkJob() {
+ return true;
+ }
}
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
index 9a541a67dc..4f23fa9b05 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/job/execution/NSparkExecutable.java
@@ -18,6 +18,8 @@
package org.apache.kylin.job.execution;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
@@ -60,6 +62,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.JobContext;
+import org.apache.kylin.job.common.ExecutableUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.JobStoppedException;
import org.apache.kylin.job.util.JobContextUtil;
@@ -118,7 +121,6 @@ public class NSparkExecutable extends AbstractExecutable
implements ChainedStage
public static final String JOB_LAST_RUNNING_START_TIME =
"jobLastRunningStartTime";
- protected static final String SPARK_PLUGINS = "spark.plugins";
protected ISparkJobHandler sparkJobHandler;
private final transient List<StageBase> stages =
Lists.newCopyOnWriteArrayList();
@@ -474,6 +476,9 @@ public class NSparkExecutable extends AbstractExecutable
implements ChainedStage
if (UserGroupInformation.isSecurityEnabled()) {
confMap.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
}
+ if (!isInternalTableSparkJob()) {
+ return ExecutableUtil.removeGultenParams(confMap);
+ }
return confMap;
}
diff --git
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index c37b1b6ccf..e27af8fdc7 100644
---
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -18,12 +18,22 @@
package org.apache.kylin.engine.spark.application;
+import static
org.apache.kylin.job.constant.ExecutableConstants.COLUMNAR_SHUFFLE_MANAGER;
+import static org.apache.kylin.job.constant.ExecutableConstants.GLUTEN_PLUGIN;
+import static org.apache.kylin.job.constant.ExecutableConstants.SPARK_PLUGINS;
+import static
org.apache.kylin.job.constant.ExecutableConstants.SPARK_SHUFFLE_MANAGER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -31,12 +41,15 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
import org.apache.kylin.engine.spark.job.BuildJobInfos;
+import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.MockJobProgressReport;
import org.apache.kylin.engine.spark.job.ParamsConstants;
import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.engine.spark.job.SegmentBuildJob;
import org.apache.kylin.engine.spark.scheduler.JobFailed;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
@@ -49,9 +62,11 @@ import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.application.MockClusterManager;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
@@ -64,7 +79,6 @@ import lombok.val;
public class SparkApplicationTest extends NLocalWithSparkSessionTestBase {
-
File tempDir = new File("./temp/");
File file1 = new File(tempDir, "temp1_" + ResourceDetectUtils.fileName());
File file2 = new File(tempDir, "temp2_" + ResourceDetectUtils.fileName());
@@ -98,7 +112,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
map2.put("1", 200L);
ResourceDetectUtils.write(new Path(file2.getAbsolutePath()), map2);
- Assert.assertEquals("300b", application.chooseContentSize(new
Path(tempDir.getAbsolutePath())));
+ assertEquals("300b", application.chooseContentSize(new
Path(tempDir.getAbsolutePath())));
}
@Test
@@ -134,7 +148,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
params.put(ParamsConstants.JOB_TMP_DIR,
getTestConfig().getJobTmpDir("test_job_output", true));
Mockito.doReturn(Boolean.TRUE).when(report).updateSparkJobInfo(params,
"/kylin/api/jobs/spark", payloadJson);
- Assert.assertTrue(report.updateSparkJobExtraInfo(params,
"/kylin/api/jobs/spark", "test_job_output",
+ assertTrue(report.updateSparkJobExtraInfo(params,
"/kylin/api/jobs/spark", "test_job_output",
"cb91189b-2b12-4527-aa35-0130e7d54ec0", extraInfo));
Mockito.verify(report).updateSparkJobInfo(params,
"/kylin/api/jobs/spark", payloadJson);
@@ -144,7 +158,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
Mockito.doReturn("http://sandbox.hortonworks.com:8088/proxy/application_1561370224051_0160/").when(application)
.getTrackingUrl(null, ss);
Mockito.doReturn(Boolean.FALSE).when(report).updateSparkJobInfo(params,
"/kylin/api/jobs/spark", payloadJson);
- Assert.assertFalse(report.updateSparkJobExtraInfo(params,
"/kylin/api/jobs/spark", "test_job_output",
+ assertFalse(report.updateSparkJobExtraInfo(params,
"/kylin/api/jobs/spark", "test_job_output",
"cb91189b-2b12-4527-aa35-0130e7d54ec0", extraInfo));
Mockito.verify(report, Mockito.times(3)).updateSparkJobInfo(params,
"/kylin/api/jobs/spark", payloadJson);
@@ -180,7 +194,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
Mockito.when(sparkApplication.checkRangePartitionTableIsExist(Mockito.any())).thenCallRealMethod();
tableRefs.add(tableRef);
nDataModel.setAllTableRefs(tableRefs);
-
Assert.assertFalse(sparkApplication.checkRangePartitionTableIsExist(nDataModel));
+
assertFalse(sparkApplication.checkRangePartitionTableIsExist(nDataModel));
NDataModel nDataModel2 = new NDataModel();
nDataModel2.setUuid(UUID.randomUUID().toString());
@@ -196,7 +210,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
tableRefs.clear();
tableRefs.add(tableRef);
nDataModel2.setAllTableRefs(tableRefs);
-
Assert.assertTrue(sparkApplication.checkRangePartitionTableIsExist(nDataModel2));
+
assertTrue(sparkApplication.checkRangePartitionTableIsExist(nDataModel2));
}
@Test
@@ -206,15 +220,16 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
SparkApplication application = new SparkApplication() {
@Override
protected void doExecute() {
+ // do nothing
}
};
File upload = new File(path);
FileUtils.forceMkdir(upload);
- Assert.assertTrue(upload.exists());
+ assertTrue(upload.exists());
config.setProperty(config.getKubernetesUploadPathKey(), path);
ReflectionTestUtils.setField(application, "config", config);
application.extraDestroy();
- Assert.assertFalse(upload.exists());
+ assertFalse(upload.exists());
}
@Test
@@ -223,6 +238,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
SparkApplication application = new SparkApplication() {
@Override
protected void doExecute() {
+ // do nothing
}
};
application.config = config;
@@ -240,17 +256,70 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
sparkConf.set("spark.eventLog.enabled", "false");
sparkConf.set("spark.eventLog.dir", notExistedLogDir.toString());
application.exchangeSparkConf(sparkConf);
- assert !fs.exists(notExistedLogDir);
+ assertFalse(fs.exists(notExistedLogDir));
sparkConf.set("spark.eventLog.enabled", "true");
application.exchangeSparkConf(sparkConf);
- assert fs.exists(notExistedLogDir);
+ assertTrue(fs.exists(notExistedLogDir));
sparkConf.set("spark.eventLog.dir", existedLogDir.toString());
application.exchangeSparkConf(sparkConf);
- assert fs.exists(existedLogDir);
+ assertTrue(fs.exists(existedLogDir));
sparkConf.set("spark.eventLog.dir", "");
application.exchangeSparkConf(sparkConf);
}
+ @Test
+ public void testNotInternalTableLoadJobRemoveGluten() throws Exception {
+ val sparkPrefix = "kylin.engine.spark-conf.";
+ val config = getTestConfig();
+ config.setProperty("kylin.env", "PROD");
+ config.setProperty(sparkPrefix + SPARK_PLUGINS, GLUTEN_PLUGIN +
",org.apache.spark.kyuubi.KyuubiPlugin");
+ config.setProperty(sparkPrefix + "spark.gluten.enable", "true");
+ config.setProperty(sparkPrefix + "spark.master", "yarn");
+ config.setProperty(sparkPrefix + "spark.eventLog.enabled", "false");
+ val application = new SparkApplication() {
+ @Override
+ protected void doExecute() {
+ // do nothing
+ }
+ };
+ application.config = config;
+ assertWithGluten(application);
+
+ application.className = InternalTableLoadJob.class.getName();
+ assertWithGluten(application);
+
+ application.className = SegmentBuildJob.class.getName();
+ assertWithOutGluten(application);
+ }
+
+ private static void assertWithGluten(SparkApplication application) throws
Exception {
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "yarn");
+ sparkConf.set("spark.eventLog.enabled", "false");
+ application.exchangeSparkConf(sparkConf);
+ val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
+ "atomicSparkConf"));
+ val actalSparkConf = atomicSparkConf.get();
+ assertEquals(COLUMNAR_SHUFFLE_MANAGER,
actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+ assertEquals("true", actalSparkConf.get("spark.gluten.enable"));
+ assertEquals(GLUTEN_PLUGIN + ",org.apache.spark.kyuubi.KyuubiPlugin",
actalSparkConf.get(SPARK_PLUGINS));
+ assertEquals("yarn", actalSparkConf.get("spark.master"));
+ assertEquals("false", actalSparkConf.get("spark.eventLog.enabled"));
+ }
+
+ private static void assertWithOutGluten(SparkApplication application)
throws Exception {
+ val sparkConf = new SparkConf();
+ sparkConf.set("spark.master", "yarn");
+ sparkConf.set("spark.eventLog.enabled", "false");
+ application.exchangeSparkConf(sparkConf);
+ val atomicSparkConf = ((AtomicReference<SparkConf>)
ReflectionTestUtils.getField(application,
+ "atomicSparkConf"));
+ val actalSparkConf = atomicSparkConf.get();
+ assertFalse(Arrays.stream(actalSparkConf.getAll()).anyMatch(conf ->
conf._1.contains("gluten")));
+ assertEquals("sort", actalSparkConf.get(SPARK_SHUFFLE_MANAGER));
+ assertEquals("org.apache.spark.kyuubi.KyuubiPlugin",
actalSparkConf.get(SPARK_PLUGINS));
+ }
+
@Test
public void testUpdateJobErrorInfo() throws JsonProcessingException {
val config = getTestConfig();
@@ -258,6 +327,7 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
SparkApplication application = Mockito.spy(new SparkApplication() {
@Override
protected void doExecute() {
+ // do nothing
}
});
@@ -292,4 +362,42 @@ public class SparkApplicationTest extends
NLocalWithSparkSessionTestBase {
Mockito.verify(application.getReport(),
Mockito.times(1)).updateSparkJobInfo(paramsMap, "/kylin/api/jobs/error",
json);
}
+
+ @Test
+ public void reportSparkJobExtraInfo() {
+ overwriteSystemProp("kylin.env", "PROD");
+ overwriteSystemProp("kylin.engine.spark.cluster-manager-class-name",
+ MockClusterManager.class.getCanonicalName());
+ val appId = RandomUtil.randomUUIDStr();
+ val config = getTestConfig();
+ val sparkSession = Mockito.mock(SparkSession.class);
+ val sparkContext = Mockito.mock(SparkContext.class);
+ Mockito.when(sparkSession.sparkContext()).thenReturn(sparkContext);
+ Mockito.when(sparkContext.applicationId()).thenReturn(appId);
+ Mockito.when(sparkContext.conf()).thenReturn(new SparkConf());
+ val application = Mockito.spy(new SparkApplication() {
+ @Override
+ protected void doExecute() {
+ // only for test
+ }
+ });
+ MockJobProgressReport mockJobProgressReport = Mockito.spy(new
MockJobProgressReport());
+
Mockito.when(application.getReport()).thenReturn(mockJobProgressReport);
+ ReflectionTestUtils.setField(application, "config", config);
+ val atomicBuildEnv = new
AtomicReference<KylinBuildEnv>(KylinBuildEnv.getOrCreate(config));
+ ReflectionTestUtils.setField(application, "atomicBuildEnv",
atomicBuildEnv);
+ application.reportSparkJobExtraInfo(sparkSession);
+
+ val paramsMap = Maps.<String, String> newHashMap();
+ paramsMap.put(ParamsConstants.TIME_OUT,
String.valueOf(config.getUpdateJobInfoTimeout()));
+ paramsMap.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(null,
true));
+ val json = Maps.<String, String> newHashMap();
+ json.put("queue_name", "default");
+ json.put("job_last_running_start_time", null);
+ json.put("cores", "0");
+ json.put("memory", "0");
+ json.put("yarn_app_id", appId);
+ Mockito.verify(application.getReport(),
Mockito.times(1)).updateSparkJobExtraInfo(paramsMap,
+ "/kylin/api/jobs/spark", null, null, json);
+ }
}
diff --git
a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
index d3898c481e..0dda19355e 100644
---
a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
+++
b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
@@ -397,7 +397,7 @@ public class StreamingJobLauncher extends
AbstractSparkJobLauncher {
.setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH,
Paths.get(kylinJobJar).getFileName().toString())
.setConf(SPARK_DRIVER_OPTS, wrapDriverJavaOptions(sparkConf))
.setConf(SPARK_EXECUTOR_OPTS,
wrapExecutorJavaOptions(sparkConf))
- .setConf(SPARK_YARN_AM_OPTS,
wrapYarnAmJavaOptions(sparkConf)).addJar(config.getKylinExtJarsPath())
+ .setConf(SPARK_YARN_AM_OPTS,
wrapYarnAmJavaOptions(sparkConf)).addJar(config.getKylinExtJarsPath(true))
.addFile(config.getLogSparkStreamingExecutorPropertiesFile()).setAppResource(kylinJobJar)
.setMainClass(mainClazz).addAppArgs(appArgs);
handler = sparkLauncher.startApplication(listener);