This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 3aa72f3  KYLIN-4875 Remove executor configurations when execute resource detect step (local mode)
3aa72f3 is described below

commit 3aa72f3fdc1265aa2a819e5a475f4419d8efcc71
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Mon Jan 18 17:21:14 2021 +0800

    KYLIN-4875 Remove executor configurations when execute resource detect step (local mode)
---
 build/conf/spark-driver-log4j.properties           |  2 --
 .../org/apache/kylin/common/KylinConfigBase.java   | 22 +++++++++++----
 .../common/logging/AbstractHdfsLogAppender.java    | 33 +++++++++++++++++++---
 .../engine/spark/job/NResourceDetectStep.java      |  9 ++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   | 16 +++++++++--
 5 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
index 8b0e82d..04c648c 100644
--- a/build/conf/spark-driver-log4j.properties
+++ b/build/conf/spark-driver-log4j.properties
@@ -20,9 +20,7 @@
 log4j.rootLogger=INFO,hdfs
 log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=WARN
 log4j.logger.org.apache.spark=WARN
-log4j.logger.org.apache.spark.ContextCleaner=WARN
 
 # hdfs file appender
 log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
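
The two logger entries removed above were redundant: in log4j 1.x a logger
inherits the effective level of its nearest configured ancestor, so
org.springframework.security already resolves to WARN through
org.springframework, and org.apache.spark.ContextCleaner through
org.apache.spark. A minimal sketch (not part of the commit) showing that
inheritance:

    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class LoggerInheritanceCheck {
        public static void main(String[] args) {
            // only the parent logger is configured, as in the file above
            Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
            // the child has no level of its own and inherits WARN
            Logger child = Logger.getLogger("org.apache.spark.ContextCleaner");
            System.out.println(child.getEffectiveLevel()); // prints WARN
        }
    }
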
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b297806..4941595 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2918,14 +2918,19 @@ public abstract class KylinConfigBase implements Serializable {
 
     /**
      * Used to upload user-defined log4j configuration
+     *
+     * @param isLocal run spark local mode or not
      */
-    public String sparkUploadFiles() {
+    public String sparkUploadFiles(boolean isLocal) {
         try {
-            File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
-                    "spark-executor-log4j.properties");
             String path1 = "";
-            if (storageFile != null) {
-                path1 = storageFile.getCanonicalPath();
+            if (!isLocal) {
+                File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
+                        "spark-executor-log4j.properties");
+                if (storageFile != null) {
+                    path1 = storageFile.getCanonicalPath();
+
+                }
             }
 
             return getOptional("kylin.query.engine.sparder-additional-files", path1);
@@ -2934,6 +2939,13 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    /**
+     * Used to upload user-defined log4j configuration
+     */
+    public String sparkUploadFiles() {
+        return sparkUploadFiles(false);
+    }
+
     @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String sparderJars() {
         try {
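
The new boolean overload above skips locating spark-executor-log4j.properties
when Spark runs in local mode, where there are no separate executor JVMs to
ship the file to; the no-arg overload keeps the old behavior for existing
callers. A minimal usage sketch (hypothetical call sites, not part of the
commit):

    KylinConfig config = KylinConfig.getInstanceFromEnv();

    // cluster mode: the canonical path of spark-executor-log4j.properties
    // under $KYLIN_HOME/conf is the default, unless the
    // kylin.query.engine.sparder-additional-files property overrides it;
    // equivalent to the no-arg sparkUploadFiles()
    String clusterFiles = config.sparkUploadFiles(false);

    // local mode: the file lookup is skipped, so this is empty unless the
    // kylin.query.engine.sparder-additional-files property is set
    String localFiles = config.sparkUploadFiles(true);
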
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index 5a90fb2..ff5240c 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -133,7 +133,7 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
         logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity());
         appendHdfsService = Executors.newSingleThreadExecutor();
         appendHdfsService.execute(this::checkAndFlushLog);
-        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+        Runtime.getRuntime().addShutdownHook(new Thread(this::closing));
 
         LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName()));
     }
@@ -153,20 +153,45 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
         }
     }
 
+    /**
+     * flush log when shutting down
+     */
+    public void closing() {
+        LogLog.warn(String.format(Locale.ROOT, "%s flushing log on shutdown ...",
+                getAppenderName()));
+        synchronized (closeLock) {
+            if (!this.closed) {
+                List<LoggingEvent> transaction = Lists.newArrayList();
+                try {
+                    flushLog(getLogBufferQue().size(), transaction);
+                } catch (Exception e) {
+                    transaction.forEach(this::printLoggingEvent);
+                    try {
+                        while (!getLogBufferQue().isEmpty()) {
+                            printLoggingEvent(getLogBufferQue().take());
+                        }
+                    } catch (Exception ie) {
+                        LogLog.error("clear the logging buffer queue failed!", ie);
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     public void close() {
+        LogLog.warn(String.format(Locale.ROOT, "%s attempting to close ...",
+                getAppenderName()));
         synchronized (closeLock) {
             if (!this.closed) {
                 this.closed = true;
-
                 List<LoggingEvent> transaction = Lists.newArrayList();
                 try {
                     flushLog(getLogBufferQue().size(), transaction);
-
-                    closeWriter();
                     if (appendHdfsService != null && !appendHdfsService.isShutdown()) {
                         appendHdfsService.shutdownNow();
                     }
+                    closeWriter();
                 } catch (Exception e) {
                     transaction.forEach(this::printLoggingEvent);
                     try {
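
The shutdown hook now runs closing() instead of close(): closing() only
drains and flushes the buffered events, leaving the closed flag, the writer,
and the flusher executor to the regular close() path, while close() now shuts
the flusher executor down before closing the writer so the background thread
cannot write to an already-closed stream. A minimal sketch of the flush-only
hook pattern (appender is a hypothetical instance, not part of the commit):

    // flush buffered events on JVM shutdown, but leave full teardown
    // (closed flag, writer, executor service) to the appender's close()
    Runtime.getRuntime().addShutdownHook(new Thread(appender::closing));
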
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
index 811dc11..2aed71e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
@@ -28,6 +28,10 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class NResourceDetectStep extends NSparkExecutable {
 
+    private static final String[] excludedSparkConf = new String[] {"spark.executor.cores",
+            "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions",
+            "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"};
+
     // called by reflection
     public NResourceDetectStep() {
 
@@ -62,6 +66,11 @@ public class NResourceDetectStep extends NSparkExecutable {
         sparkConfigOverride.put("spark.master", "local");
         sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1");
         sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
+        for (String sparkConf : excludedSparkConf) {
+            if (sparkConfigOverride.containsKey(sparkConf)) {
+                sparkConfigOverride.remove(sparkConf);
+            }
+        }
         return sparkConfigOverride;
     }
 }
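
Resource detection always runs with spark.master=local, so executor-side
settings inherited from the job-level Spark configuration have no effect
there, and options such as spark.executor.extraClassPath may point at files
that are only distributed in cluster mode; the loop above strips them.
Because Map.remove() is already a no-op for absent keys, the containsKey
guard could be dropped, as in this equivalent sketch (not part of the
commit):

    for (String sparkConf : excludedSparkConf) {
        sparkConfigOverride.remove(sparkConf);
    }
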
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 4558cf0..475713f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -337,6 +337,7 @@ public class NSparkExecutable extends AbstractExecutable {
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+        sb.append(String.format(Locale.ROOT, " -Dlog4j.debug=%s ", "true"));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
@@ -356,13 +357,18 @@ public class NSparkExecutable extends AbstractExecutable {
         for (Entry<String, String> entry : sparkConfs.entrySet()) {
             appendSparkConf(sb, entry.getKey(), entry.getValue());
         }
-        appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(kylinJobJar).getFileName().toString());
+        if (!isLocalMaster(sparkConfs)) {
+            appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(kylinJobJar).getFileName().toString());
+        }
         appendSparkConf(sb, "spark.driver.extraClassPath", kylinJobJar);
 
         if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) {
             jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars");
         }
-        sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
+        String sparkUploadFiles = config.sparkUploadFiles(isLocalMaster(sparkConfs));
+        if (StringUtils.isNotBlank(sparkUploadFiles)) {
+            sb.append("--files ").append(sparkUploadFiles).append(" ");
+        }
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
@@ -410,6 +416,12 @@ public class NSparkExecutable extends AbstractExecutable {
         }
     }
 
+    protected boolean isLocalMaster(Map<String, String> sparkConfs) {
+        String master = sparkConfs.getOrDefault("spark.master", "yarn");
+        return (master.equalsIgnoreCase("local")) || (master.toLowerCase(Locale.ROOT)
+                .startsWith("local["));
+    }
+
     public boolean needMergeMetadata() {
         return false;
     }
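
isLocalMaster() treats a spark.master of local or any local[...] form
(local[4], local[*], and so on) as local mode and defaults to yarn when the
key is absent; in local mode the executor class-path entry is skipped and the
--files argument is omitted when nothing remains to upload. A usage sketch
with hypothetical inputs (not part of the commit):

    Map<String, String> confs = new HashMap<>();
    confs.put("spark.master", "local[4]");
    isLocalMaster(confs);   // true: skip executor class path and --files
    confs.put("spark.master", "spark://host:7077");
    isLocalMaster(confs);   // false: cluster behavior, ship the log4j file
    confs.remove("spark.master");
    isLocalMaster(confs);   // false: defaults to "yarn"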
