This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new c2f261b  KYLIN-5023 Some fix for spark standalone
c2f261b is described below

commit c2f261b09c17db9b7e9ee21117c5c7d783811fab
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Jul 13 14:26:30 2021 +0800

    KYLIN-5023 Some fix for spark standalone
---
 .../java/org/apache/kylin/common/KylinConfigBase.java    | 16 ++++++++++++++--
 .../apache/kylin/engine/spark/job/NSparkExecutable.java  |  5 ++++-
 .../org/apache/spark/deploy/SparkApplicationClient.scala |  7 ++++---
 .../main/scala/org/apache/spark/sql/SparderContext.scala |  4 +++-
 4 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bccda65..8915669 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1591,8 +1591,20 @@ public abstract class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.flink-conf.");
     }
 
-    public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
-        return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
+    public String getSparkEngineConfigOverrideWithSpecificName(String configName) {
+        Map<String, String> config = getPropertiesByPrefix("kylin.engine.spark-conf." + configName);
+        if (config.size() != 0) {
+            return String.valueOf(config.values().iterator().next());
+        }
+        return null;
+    }
+
+    public String getSparderConfigOverrideWithSpecificName(String configName) {
+        Map<String, String> config = getPropertiesByPrefix("kylin.query.spark-conf." + configName);
+        if (config.size() != 0) {
+            return String.valueOf(config.values().iterator().next());
+        }
+        return null;
     }
 
     public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 6859e65..fbfae2e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -161,7 +161,10 @@ public class NSparkExecutable extends AbstractExecutable {
             return runLocalMode(filePath, config);
         } else {
             logger.info("Task id: {}", getId());
-            killOrphanApplicationIfExists(config, getId());
+            if ("yarn".equals(config.getSparkEngineConfigOverrideWithSpecificName("spark.master"))) {
+                logger.info("Try to kill orphan application on yarn.");
+                killOrphanApplicationIfExists(config, getId());
+            }
             return runSparkSubmit(config, hadoopConf, jars, kylinJobJar,
                     "-className " + getSparkSubmitClassName() + " " + filePath, getParent().getId());
         }
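The two accessors added to KylinConfigBase resolve a single value from kylin.properties by key prefix: getSparkEngineConfigOverrideWithSpecificName looks under "kylin.engine.spark-conf.<name>", getSparderConfigOverrideWithSpecificName under "kylin.query.spark-conf.<name>", and both return null when the key is absent. Since only the first matching value is returned (values().iterator().next()), each key should be set at most once. A minimal Scala sketch of the lookup and of the guard in NSparkExecutable that consumes it (illustrative only, not Kylin code; the sample master URL is an assumption):

    // Sketch of the prefix lookup behind the new accessors; a plain Map
    // stands in for KylinConfig's backing properties.
    object PrefixLookupSketch {
      // Keep every entry whose key starts with the prefix, keyed by the remainder.
      def getPropertiesByPrefix(props: Map[String, String], prefix: String): Map[String, String] =
        props.collect { case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v }

      // First value under "kylin.engine.spark-conf.<configName>", or null when unset.
      def sparkEngineOverride(props: Map[String, String], configName: String): String =
        getPropertiesByPrefix(props, "kylin.engine.spark-conf." + configName)
          .values.headOption.orNull

      def main(args: Array[String]): Unit = {
        val props = Map("kylin.engine.spark-conf.spark.master" -> "spark://sparkmaster:7077")
        // Prints spark://sparkmaster:7077. When the value is anything other than
        // "yarn" (or the key is unset), the new guard in NSparkExecutable skips
        // killOrphanApplicationIfExists, which only makes sense on YARN.
        println(sparkEngineOverride(props, "spark.master"))
      }
    }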
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
index 5b2caf4..db562b1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -43,11 +43,12 @@ object SparkApplicationClient extends Logging {
       case STANDALONE_CLUSTER =>
         var appState = StandaloneAppClient.getAppState(stepId)
         while (true) {
+          appState = StandaloneAppClient.getAppState(stepId)
           logInfo(s"$stepId state is $appState .")
-          if (!finalStates.contains(appState)) {
-            Thread.sleep(10000)
+          if (finalStates.contains(appState)) {
+            return appState
           }
-          appState = StandaloneAppClient.getAppState(stepId)
+          Thread.sleep(10000)
         }
         appState
       case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 530cbae..c6916d2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -136,10 +136,12 @@ object SparderContext extends Logging {
             case "true" => "local"
             case _ =>
-              "yarn-client"
+              kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
           }
+          logInfo("SparderContext deploy with spark master: " + master)
           val sparkSession = SparkSession.builder
             .master(master)
+            .config("spark.submit.deployMode", "client")
             .appName(kylinConf.getSparderAppName)
             .withExtensions { ext =>
               ext.injectPlannerStrategy(_ => KylinSourceStrategy)
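Two behavioral fixes are visible in the Scala hunks above. In SparkApplicationClient the wait loop now refreshes the application state at the top of every iteration and returns as soon as a final state is reached; the old code logged a stale value and re-fetched the state only after sleeping. In SparderContext the query engine no longer hard-codes the deprecated "yarn-client" master but takes it from kylin.query.spark-conf.spark.master and pins spark.submit.deployMode to client, so a standalone master such as spark://host:7077 works too. A condensed sketch of the corrected polling pattern (stand-in names; in Kylin the state comes from StandaloneAppClient.getAppState):

    // Poll until a final state: read fresh state first, exit immediately on a
    // final state, otherwise sleep and retry, mirroring the fixed loop.
    def waitForFinalState(getState: () => String, finalStates: Set[String]): String = {
      while (true) {
        val state = getState()            // refreshed on every iteration
        if (finalStates.contains(state)) {
          return state                    // e.g. FINISHED, FAILED, KILLED
        }
        Thread.sleep(10000)               // 10 s between polls, as in the patch
      }
      throw new IllegalStateException("unreachable")
    }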