This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2d6d8b55ff384d6ce393e1111feecd35987600d0 Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Thu Aug 27 15:04:38 2020 +0800 KYLIN-4695 Rewrite application id to file when restart sparder application --- .gitignore | 3 ++- .../org/apache/spark/sql/SparderContext.scala | 15 +++++++++++- ...rkerContext.java => InitialSparderContext.java} | 28 ++++++---------------- server/src/main/resources/applicationContext.xml | 2 +- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index b02ae49..9213fd0 100644 --- a/.gitignore +++ b/.gitignore @@ -95,6 +95,7 @@ webapp/package-lock.json #kylin examples/test_case_data/* +examples/sparkappid src/spark-project/examples/test_metadata/* # stream_index -stream-receiver/stream_index \ No newline at end of file +stream-receiver/stream_index diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala index 1c9f500..d40e2d1 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.lang.{Boolean => JBoolean, String => JString} +import java.nio.file.Paths import org.apache.spark.memory.MonitorEnv import org.apache.spark.util.Utils @@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.KylinSession._ import java.util.concurrent.atomic.AtomicReference +import org.apache.commons.io.FileUtils import org.apache.kylin.common.KylinConfig import org.apache.kylin.spark.classloader.ClassLoaderUtils import org.apache.spark.{SparkConf, SparkContext, SparkEnv} @@ -146,7 +148,18 @@ object SparderContext extends Logging { .getOrCreateKylinSession() } spark = sparkSession - sparkSession.sparkContext.applicationId + val appid = sparkSession.sparkContext.applicationId + // write application id to file 'sparkappid' + val kylinHomePath = KylinConfig.getKylinHomeAtBestEffort().getCanonicalPath + try { + val appidFile = Paths.get(kylinHomePath, "sparkappid").toFile + FileUtils.writeStringToFile(appidFile, appid) + logInfo("Spark application id is " + appid) + } catch { + case e: Exception => + logError("Failed to generate spark application id[" + appid + "] file", e) + } + logInfo("Spark context started successfully with stack trace:") logInfo(Thread.currentThread().getStackTrace.mkString("\n")) logInfo( diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java similarity index 61% rename from server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java rename to server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java index 34f977a..05559e9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparkerContext.java +++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java @@ -18,23 +18,16 @@ package org.apache.kylin.rest.init; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.ServerMode; import org.apache.spark.sql.SparderContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; -import java.io.File; -import java.nio.file.Paths; +public class InitialSparderContext implements InitializingBean { -/** - * Created by zhangzc on 8/26/20. - */ -public class InitialSparkerContext implements InitializingBean { - - private static final Logger logger = LoggerFactory.getLogger(InitialSparkerContext.class); + private static final Logger logger = LoggerFactory.getLogger(InitialSparderContext.class); @Override public void afterPropertiesSet() throws Exception { @@ -42,18 +35,11 @@ public class InitialSparkerContext implements InitializingBean { } private void runInitialSparder() { + if (ServerMode.isJobOnly(KylinConfig.getInstanceFromEnv())) { + logger.info("This is job node, do not need to start Spark"); + return; + } logger.info("Spark is starting....."); SparderContext.init(); - final String kylinHome = StringUtils.defaultIfBlank(KylinConfig.getKylinHome(), "./"); - final File appidFile = Paths.get(kylinHome, "sparkappid").toFile(); - String appid = null; - try { - appid = SparderContext.getSparkSession().sparkContext().applicationId(); - FileUtils.writeStringToFile(appidFile, appid); - logger.info("Spark application id is {}", appid); - } catch (Exception e) { - logger.error("Failed to generate spark application id[{}] file", - StringUtils.defaultString(appid), e); - } } } diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml index dcf249b..be13a89 100644 --- a/server/src/main/resources/applicationContext.xml +++ b/server/src/main/resources/applicationContext.xml @@ -38,7 +38,7 @@ <aop:aspectj-autoproxy/> <bean class="org.apache.kylin.rest.init.InitialTaskManager" /> - <bean class="org.apache.kylin.rest.init.InitialSparkerContext" /> + <bean class="org.apache.kylin.rest.init.InitialSparderContext" /> <context:component-scan base-package="org.apache.kylin.rest"/>