This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 3aa72f3  KYLIN-4875 Remove executor configurations when execute resource detect step (local mode)
3aa72f3 is described below

commit 3aa72f3fdc1265aa2a819e5a475f4419d8efcc71
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Mon Jan 18 17:21:14 2021 +0800

    KYLIN-4875 Remove executor configurations when execute resource detect step (local mode)
---
 build/conf/spark-driver-log4j.properties           |  2 --
 .../org/apache/kylin/common/KylinConfigBase.java   | 22 +++++++++++----
 .../common/logging/AbstractHdfsLogAppender.java    | 33 +++++++++++++++++++---
 .../engine/spark/job/NResourceDetectStep.java      |  9 ++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   | 16 +++++++++--
 5 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
index 8b0e82d..04c648c 100644
--- a/build/conf/spark-driver-log4j.properties
+++ b/build/conf/spark-driver-log4j.properties
@@ -20,9 +20,7 @@
 log4j.rootLogger=INFO,hdfs
 log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=WARN
 log4j.logger.org.apache.spark=WARN
-log4j.logger.org.apache.spark.ContextCleaner=WARN
 
 # hdfs file appender
 log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b297806..4941595 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2918,14 +2918,19 @@ public abstract class KylinConfigBase implements Serializable {
 
     /**
      * Used to upload user-defined log4j configuration
+     *
+     * @param isLocal run spark local mode or not
      */
-    public String sparkUploadFiles() {
+    public String sparkUploadFiles(boolean isLocal) {
         try {
-            File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
-                    "spark-executor-log4j.properties");
             String path1 = "";
-            if (storageFile != null) {
-                path1 = storageFile.getCanonicalPath();
+            if (!isLocal) {
+                File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
+                        "spark-executor-log4j.properties");
+                if (storageFile != null) {
+                    path1 = storageFile.getCanonicalPath();
+
+                }
             }
 
             return getOptional("kylin.query.engine.sparder-additional-files", path1);
@@ -2934,6 +2939,13 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    /**
+     * Used to upload user-defined log4j configuration
+     */
+    public String sparkUploadFiles() {
+        return sparkUploadFiles(false);
+    }
+
     @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String sparderJars() {
         try {
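In Spark local mode the executors run inside the driver JVM, so there is no executor-side log4j file worth shipping, and sparkUploadFiles(true) returns an empty string unless kylin.query.engine.sparder-additional-files is set. A minimal standalone sketch of that lookup rule, assuming a findFile helper (a hypothetical stand-in for the FileUtils.findFile call above) and a made-up KYLIN_HOME of /opt/kylin:

    import java.io.File;
    import java.io.IOException;

    public class UploadFilesSketch {

        // Hypothetical stand-in for the FileUtils.findFile used in KylinConfigBase.
        static File findFile(String dir, String name) {
            File candidate = new File(dir, name);
            return candidate.exists() ? candidate : null;
        }

        // Local mode skips the lookup entirely, so callers receive "" and can
        // drop the --files option from the spark-submit command line.
        static String sparkUploadFiles(boolean isLocal, String kylinHome) throws IOException {
            String path = "";
            if (!isLocal) {
                File f = findFile(kylinHome + "/conf", "spark-executor-log4j.properties");
                if (f != null) {
                    path = f.getCanonicalPath();
                }
            }
            return path;
        }

        public static void main(String[] args) throws IOException {
            System.out.println("local: '" + sparkUploadFiles(true, "/opt/kylin") + "'");
            System.out.println("yarn:  '" + sparkUploadFiles(false, "/opt/kylin") + "'");
        }
    }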
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index 5a90fb2..ff5240c 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -133,7 +133,7 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
         logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity());
         appendHdfsService = Executors.newSingleThreadExecutor();
         appendHdfsService.execute(this::checkAndFlushLog);
-        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+        Runtime.getRuntime().addShutdownHook(new Thread(this::closing));
 
         LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName()));
     }
@@ -153,20 +153,45 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
         }
     }
 
+    /**
+     * flush the log when the JVM is shutting down
+     */
+    public void closing() {
+        LogLog.warn(String.format(Locale.ROOT, "%s flush log on shutdown ...",
+                getAppenderName()));
+        synchronized (closeLock) {
+            if (!this.closed) {
+                List<LoggingEvent> transaction = Lists.newArrayList();
+                try {
+                    flushLog(getLogBufferQue().size(), transaction);
+                } catch (Exception e) {
+                    transaction.forEach(this::printLoggingEvent);
+                    try {
+                        while (!getLogBufferQue().isEmpty()) {
+                            printLoggingEvent(getLogBufferQue().take());
+                        }
+                    } catch (Exception ie) {
+                        LogLog.error("clear the logging buffer queue failed!", ie);
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     public void close() {
+        LogLog.warn(String.format(Locale.ROOT, "%s attempting to close ...",
+                getAppenderName()));
         synchronized (closeLock) {
             if (!this.closed) {
                 this.closed = true;
-                List<LoggingEvent> transaction = Lists.newArrayList();
                 try {
                     flushLog(getLogBufferQue().size(), transaction);
-
-                    closeWriter();
                     if (appendHdfsService != null && !appendHdfsService.isShutdown()) {
                         appendHdfsService.shutdownNow();
                     }
+                    closeWriter();
                 } catch (Exception e) {
                     transaction.forEach(this::printLoggingEvent);
                     try {

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
index 811dc11..2aed71e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
@@ -28,6 +28,10 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class NResourceDetectStep extends NSparkExecutable {
 
+    private final static String[] excludedSparkConf = new String[] {"spark.executor.cores",
+            "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions",
+            "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"};
+
     // called by reflection
     public NResourceDetectStep() {
 
@@ -62,6 +66,11 @@ public class NResourceDetectStep extends NSparkExecutable {
         sparkConfigOverride.put("spark.master", "local");
         sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1");
         sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
+        for (String sparkConf : excludedSparkConf) {
+            if (sparkConfigOverride.containsKey(sparkConf)) {
+                sparkConfigOverride.remove(sparkConf);
+            }
+        }
         return sparkConfigOverride;
     }
 }
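The resource detect step always pins spark.master=local (see getSparkConfigOverride above), where executor-side settings have no effect, so any user-supplied spark.executor.* overrides are stripped before the job is submitted. Note the containsKey guard in the loop is redundant, since Map.remove is already a no-op for absent keys. A self-contained sketch of the filtering, with made-up override values:

    import java.util.HashMap;
    import java.util.Map;

    public class ExcludedConfSketch {

        // Same keys as the excludedSparkConf array in NResourceDetectStep.
        private static final String[] EXCLUDED_SPARK_CONF = {
                "spark.executor.cores", "spark.executor.memoryOverhead",
                "spark.executor.extraJavaOptions", "spark.executor.instances",
                "spark.executor.memory", "spark.executor.extraClassPath"};

        public static void main(String[] args) {
            // Made-up user overrides for a build job.
            Map<String, String> conf = new HashMap<>();
            conf.put("spark.executor.memory", "4g");
            conf.put("spark.executor.instances", "10");
            conf.put("spark.sql.shuffle.partitions", "200");

            // The detect step then forces local mode, as in the diff above.
            conf.put("spark.master", "local");
            conf.put("spark.sql.autoBroadcastJoinThreshold", "-1");
            conf.put("spark.sql.adaptive.enabled", "false");

            // Strip executor-side settings; no containsKey guard is needed
            // because Map.remove tolerates absent keys.
            for (String key : EXCLUDED_SPARK_CONF) {
                conf.remove(key);
            }
            System.out.println(conf); // no spark.executor.* entries remain
        }
    }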
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 4558cf0..475713f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -337,6 +337,7 @@ public class NSparkExecutable extends AbstractExecutable {
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+        sb.append(String.format(Locale.ROOT, " -Dlog4j.debug=%s ", "true"));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
@@ -356,13 +357,18 @@ public class NSparkExecutable extends AbstractExecutable {
         for (Entry<String, String> entry : sparkConfs.entrySet()) {
             appendSparkConf(sb, entry.getKey(), entry.getValue());
         }
-        appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(kylinJobJar).getFileName().toString());
+        if (!isLocalMaster(sparkConfs)) {
+            appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(kylinJobJar).getFileName().toString());
+        }
         appendSparkConf(sb, "spark.driver.extraClassPath", kylinJobJar);
         if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) {
             jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars");
         }
 
-        sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
+        String sparkUploadFiles = config.sparkUploadFiles(isLocalMaster(sparkConfs));
+        if (StringUtils.isNotBlank(sparkUploadFiles)) {
+            sb.append("--files ").append(sparkUploadFiles).append(" ");
+        }
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
@@ -410,6 +416,12 @@ public class NSparkExecutable extends AbstractExecutable {
         }
     }
 
+    protected boolean isLocalMaster(Map<String, String> sparkConfs) {
+        String master = sparkConfs.getOrDefault("spark.master", "yarn");
+        return master.equalsIgnoreCase("local")
+                || master.toLowerCase(Locale.ROOT).startsWith("local[");
+    }
+
     public boolean needMergeMetadata() {
         return false;
     }
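The new isLocalMaster() helper drives both guards above: in local mode, spark.executor.extraClassPath would name a jar that spark-submit never ships, and with sparkUploadFiles returning an empty string a bare "--files " would likely be mis-parsed, consuming the next option as its value. The helper can be exercised on its own; the logic below mirrors the diff, including the assumption that a missing spark.master means "yarn":

    import java.util.Collections;
    import java.util.Locale;
    import java.util.Map;

    public class LocalMasterSketch {

        // Mirrors NSparkExecutable#isLocalMaster from the diff above.
        static boolean isLocalMaster(Map<String, String> sparkConfs) {
            String master = sparkConfs.getOrDefault("spark.master", "yarn");
            return master.equalsIgnoreCase("local")
                    || master.toLowerCase(Locale.ROOT).startsWith("local[");
        }

        public static void main(String[] args) {
            System.out.println(isLocalMaster(Collections.singletonMap("spark.master", "local")));    // true
            System.out.println(isLocalMaster(Collections.singletonMap("spark.master", "local[4]"))); // true
            System.out.println(isLocalMaster(Collections.singletonMap("spark.master", "LOCAL[*]"))); // true
            System.out.println(isLocalMaster(Collections.singletonMap("spark.master", "yarn")));     // false
            System.out.println(isLocalMaster(Collections.emptyMap()));                               // false: defaults to yarn
        }
    }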