This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new c2f261b  KYLIN-5023 Some fix for spark standalone
c2f261b is described below

commit c2f261b09c17db9b7e9ee21117c5c7d783811fab
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Jul 13 14:26:30 2021 +0800

    KYLIN-5023 Some fix for spark standalone
---
 .../java/org/apache/kylin/common/KylinConfigBase.java    | 16 ++++++++++++++--
 .../apache/kylin/engine/spark/job/NSparkExecutable.java  |  5 ++++-
 .../org/apache/spark/deploy/SparkApplicationClient.scala |  7 ++++---
 .../main/scala/org/apache/spark/sql/SparderContext.scala |  4 +++-
 4 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bccda65..8915669 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1591,8 +1591,20 @@ public abstract class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.flink-conf.");
     }
 
-    public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
-        return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
+    public String getSparkEngineConfigOverrideWithSpecificName(String configName) {
+        Map<String, String> config = getPropertiesByPrefix("kylin.engine.spark-conf." + configName);
+        if (config.size() != 0) {
+            return String.valueOf(config.values().iterator().next());
+        }
+        return null;
+    }
+
+    public String getSparderConfigOverrideWithSpecificName(String configName) {
+        Map<String, String> config = getPropertiesByPrefix("kylin.query.spark-conf." + configName);
+        if (config.size() != 0) {
+            return String.valueOf(config.values().iterator().next());
+        }
+        return null;
     }
 
     public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
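
A quick note for anyone consuming this change: the replaced getSparkConfigOverrideWithSpecificName returned a whole map of properties under a name-specific prefix, while the two new helpers look up a single property and return its first matching value, or null when the key is unset. A minimal usage sketch, assuming a standard kylin.properties on the classpath (the class name and the printed output are illustrative only):

    import org.apache.kylin.common.KylinConfig;

    public class SparkMasterLookupSketch {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();

            // Build-engine side: resolves kylin.engine.spark-conf.spark.master
            String engineMaster = config.getSparkEngineConfigOverrideWithSpecificName("spark.master");

            // Query (Sparder) side: resolves kylin.query.spark-conf.spark.master
            String sparderMaster = config.getSparderConfigOverrideWithSpecificName("spark.master");

            // Both helpers return null when the key is absent, so callers must null-check.
            System.out.println("engine master: " + engineMaster);
            System.out.println("sparder master: " + sparderMaster);
        }
    }
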
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 6859e65..fbfae2e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -161,7 +161,10 @@ public class NSparkExecutable extends AbstractExecutable {
             return runLocalMode(filePath, config);
         } else {
             logger.info("Task id: {}", getId());
-            killOrphanApplicationIfExists(config, getId());
+            if ("yarn".equals(config.getSparkEngineConfigOverrideWithSpecificName("spark.master"))) {
+                logger.info("Try to kill orphan application on yarn.");
+                killOrphanApplicationIfExists(config, getId());
+            }
             return runSparkSubmit(config, hadoopConf, jars, kylinJobJar,
                     "-className " + getSparkSubmitClassName() + " " + 
filePath, getParent().getId());
         }
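
The new guard matters because killOrphanApplicationIfExists queries the YARN ResourceManager, which does not exist in standalone or local deployments, so the orphan check now runs only when spark.master resolves to "yarn" (and "yarn".equals(...) is null-safe for an unset key). A small sketch of the decision; the class and the empty method body are illustrative stand-ins, not the real implementation:

    public class OrphanKillGuardSketch {
        static void killOrphanApplicationIfExists(String jobId) {
            // the real method asks the YARN ResourceManager about earlier attempts
        }

        static void beforeSubmit(String master, String jobId) {
            if ("yarn".equals(master)) {
                // YARN tracks applications, so a leftover attempt from a
                // previous retry can be found and killed before resubmitting.
                killOrphanApplicationIfExists(jobId);
            }
            // standalone (spark://host:7077) and local masters skip the check:
            // there is no ResourceManager to query.
        }

        public static void main(String[] args) {
            beforeSubmit("spark://master:7077", "job-1"); // check skipped
            beforeSubmit("yarn", "job-2");                // check runs
        }
    }
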
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
index 5b2caf4..db562b1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -43,11 +43,12 @@ object SparkApplicationClient extends Logging {
       case STANDALONE_CLUSTER =>
         var appState = StandaloneAppClient.getAppState(stepId)
         while (true) {
+          appState = StandaloneAppClient.getAppState(stepId)
           logInfo(s"$stepId state is $appState .")
-          if (!finalStates.contains(appState)) {
-            Thread.sleep(10000)
+          if (finalStates.contains(appState)) {
+            return appState
           }
-          appState = StandaloneAppClient.getAppState(stepId)
+          Thread.sleep(10000)
         }
         appState
      case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 530cbae..c6916d2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -136,10 +136,12 @@ object SparderContext extends Logging {
                 case "true" =>
                   "local"
                 case _ =>
-                  "yarn-client"
+                  kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
               }
+              logInfo("SparderContext deploy with spark master: " + master)
               val sparkSession = SparkSession.builder
                 .master(master)
+                .config("spark.submit.deployMode", "client")
                 .appName(kylinConf.getSparderAppName)
                 .withExtensions { ext =>
                   ext.injectPlannerStrategy(_ => KylinSourceStrategy)
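
Finally, SparderContext drops the hardcoded "yarn-client" master, a string deprecated since Spark 2.0, in favor of the value configured under kylin.query.spark-conf.spark.master, and pins spark.submit.deployMode to client since the Sparder driver runs inside the Kylin server process. A sketch of the equivalent bootstrap through the public SparkSession builder; the master URL and app name are placeholders for the configured values:

    import org.apache.spark.sql.SparkSession;

    public class SparderBootstrapSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    // placeholder for kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
                    .master("spark://master:7077")
                    // the Sparder driver lives in the Kylin server, so deploy mode is client
                    .config("spark.submit.deployMode", "client")
                    .appName("sparder") // placeholder for kylinConf.getSparderAppName()
                    .getOrCreate();
            spark.stop();
        }
    }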
