This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new c2f261b KYLIN-5023 Some fix for spark standalone
c2f261b is described below
commit c2f261b09c17db9b7e9ee21117c5c7d783811fab
Author: yaqian.zhang <[email protected]>
AuthorDate: Tue Jul 13 14:26:30 2021 +0800
KYLIN-5023 Some fix for spark standalone
---
.../java/org/apache/kylin/common/KylinConfigBase.java | 16 ++++++++++++++--
.../apache/kylin/engine/spark/job/NSparkExecutable.java | 5 ++++-
.../org/apache/spark/deploy/SparkApplicationClient.scala | 7 ++++---
.../main/scala/org/apache/spark/sql/SparderContext.scala | 4 +++-
4 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bccda65..8915669 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1591,8 +1591,20 @@ public abstract class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.engine.flink-conf.");
}
- public Map<String, String> getSparkConfigOverrideWithSpecificName(String
configName) {
- return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName +
".");
+ public String getSparkEngineConfigOverrideWithSpecificName(String
configName) {
+ Map<String, String> config =
getPropertiesByPrefix("kylin.engine.spark-conf." + configName);
+ if (config.size() != 0) {
+ return String.valueOf(config.values().iterator().next());
+ }
+ return null;
+ }
+
+ public String getSparderConfigOverrideWithSpecificName(String configName) {
+ Map<String, String> config =
getPropertiesByPrefix("kylin.query.spark-conf." + configName);
+ if (config.size() != 0) {
+ return String.valueOf(config.values().iterator().next());
+ }
+ return null;
}
public Map<String, String> getFlinkConfigOverrideWithSpecificName(String
configName) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 6859e65..fbfae2e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -161,7 +161,10 @@ public class NSparkExecutable extends AbstractExecutable {
return runLocalMode(filePath, config);
} else {
logger.info("Task id: {}", getId());
- killOrphanApplicationIfExists(config, getId());
+ if
("yarn".equals(config.getSparkEngineConfigOverrideWithSpecificName("spark.master")))
{
+ logger.info("Try to kill orphan application on yarn.");
+ killOrphanApplicationIfExists(config, getId());
+ }
return runSparkSubmit(config, hadoopConf, jars, kylinJobJar,
"-className " + getSparkSubmitClassName() + " " +
filePath, getParent().getId());
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
index 5b2caf4..db562b1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -43,11 +43,12 @@ object SparkApplicationClient extends Logging {
case STANDALONE_CLUSTER =>
var appState = StandaloneAppClient.getAppState(stepId)
while (true) {
+ appState = StandaloneAppClient.getAppState(stepId)
logInfo(s"$stepId state is $appState .")
- if (!finalStates.contains(appState)) {
- Thread.sleep(10000)
+ if (finalStates.contains(appState)) {
+ return appState
}
- appState = StandaloneAppClient.getAppState(stepId)
+ Thread.sleep(10000)
}
appState
case m => throw new UnsupportedOperationException("waitAndCheckAppState
" + m)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 530cbae..c6916d2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -136,10 +136,12 @@ object SparderContext extends Logging {
case "true" =>
"local"
case _ =>
- "yarn-client"
+
kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
}
+ logInfo("SparderContext deploy with spark master: " + master)
val sparkSession = SparkSession.builder
.master(master)
+ .config("spark.submit.deployMode", "client")
.appName(kylinConf.getSparderAppName)
.withExtensions { ext =>
ext.injectPlannerStrategy(_ => KylinSourceStrategy)