This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9a4df78f291cd7e34a30cbbdd9ec43124e6cd46f
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Nov 24 14:12:06 2020 +0800

    KYLIN-4813 Some adjustments for executor-side log collection
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 18 ++++---
 .../org/apache/kylin/common/KylinConfigExt.java    | 18 +++++--
 .../src/main/resources/kylin-defaults.properties   |  2 +-
 .../common/logging/SparkExecutorHdfsAppender.java  |  2 +-
 .../engine/spark/application/SparkApplication.java |  9 +---
 .../kylin/engine/spark/job/NSparkExecutable.java   | 61 ++++++++--------------
 .../kylin/engine/spark/utils/MetaDumpUtil.java     |  3 +-
 7 files changed, 53 insertions(+), 60 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 01dc374..964eb64 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -181,8 +181,8 @@ public abstract class KylinConfigBase implements Serializable {
     protected String getOptional(String prop, String dft) {
 
         final String property = System.getProperty(prop);
-        return property != null ? StrSubstitutor.replace(property, System.getenv())
-                : StrSubstitutor.replace(properties.getProperty(prop, dft), System.getenv());
+        return property != null ? getSubstitutor().replace(property, System.getenv())
+                : getSubstitutor().replace(properties.getProperty(prop, dft), System.getenv());
     }
 
     protected Properties getAllProperties() {
@@ -194,8 +194,7 @@ public abstract class KylinConfigBase implements Serializable {
      * @return properties which contained in propertyKeys
      */
     protected Properties getProperties(Collection<String> propertyKeys) {
-        Map<String, String> envMap = System.getenv();
-        StrSubstitutor sub = new StrSubstitutor(envMap);
+        final StrSubstitutor sub = getSubstitutor();
 
         Properties filteredProperties = new Properties();
         for (Entry<Object, Object> entry : this.properties.entrySet()) {
@@ -206,9 +205,17 @@ public abstract class KylinConfigBase implements Serializable {
         return filteredProperties;
     }
 
+    protected StrSubstitutor getSubstitutor() {
+        // env > properties
+        final Map<String, Object> all = Maps.newHashMap();
+        all.putAll((Map) properties);
+        all.putAll(System.getenv());
+
+        return new StrSubstitutor(all);
+    }
+
     protected Properties getRawAllProperties() {
         return properties;
-
     }
 
     protected final Map<String, String> getPropertiesByPrefix(String prefix) {
@@ -259,7 +266,6 @@ public abstract class KylinConfigBase implements Serializable {
     final protected void reloadKylinConfig(Properties properties) {
         this.properties = BCC.check(properties);
         setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
-        setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
         setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
     }
 
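Note: getOptional() previously substituted only environment variables into
property values; with getSubstitutor() the lookup map also contains the Kylin
properties themselves, with env entries added last so they win on a key
collision. One property can therefore reference another via ${...}. A minimal
sketch of the behavior, with illustrative names and values rather than Kylin
code:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.commons.lang.text.StrSubstitutor;

    public class SubstitutorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("hdfs.working.dir", "hdfs://cluster/kylin");
            props.setProperty("log.dir", "${hdfs.working.dir}/spark_logs");

            // Same merge order as the patch: properties first, env second.
            Map<String, Object> all = new HashMap<>();
            all.putAll((Map) props);
            all.putAll(System.getenv());

            // Prints "hdfs://cluster/kylin/spark_logs", unless an environment
            // variable named "hdfs.working.dir" shadows the property.
            System.out.println(new StrSubstitutor(all)
                    .replace(props.getProperty("log.dir")));
        }
    }
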
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
index ddec154..fda4100 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -18,11 +18,12 @@
 
 package org.apache.kylin.common;
 
+import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.lang3.text.StrSubstitutor;
-
 /**
  * Extends a KylinConfig with additional overrides.
  */
@@ -59,7 +60,7 @@ public class KylinConfigExt extends KylinConfig {
     public String getOptional(String prop, String dft) {
         String value = overrides.get(prop);
         if (value != null)
-            return   StrSubstitutor.replace(value, System.getenv());
+            return   getSubstitutor().replace(value, System.getenv());
         else
             return super.getOptional(prop, dft);
     }
@@ -67,11 +68,20 @@ public class KylinConfigExt extends KylinConfig {
     @Override
     protected Properties getAllProperties() {
         Properties result = new Properties();
-        result.putAll(super.getRawAllProperties());
+        result.putAll(super.getAllProperties());
         result.putAll(overrides);
         return result;
     }
 
+    @Override
+    protected StrSubstitutor getSubstitutor() {
+        final Map<String, Object> all = Maps.newHashMap();
+        all.putAll((Map) properties);
+        all.putAll(System.getenv());
+        all.putAll(overrides);
+        return new StrSubstitutor(all);
+    }
+
     public Map<String, String> getExtendedOverrides() {
         return overrides;
     }
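
Note: in KylinConfigExt the merge order becomes properties, then env, then
overrides, so a job-level override beats both the base properties and the
environment. getAllProperties() also now layers the overrides on top of
super.getAllProperties() (the substituted view) instead of the raw properties.
A hedged usage sketch, with an illustrative override value (createInstance and
getInstanceFromEnv appear elsewhere in this commit):

    Map<String, String> overrides = new HashMap<>();
    overrides.put("job.project", "my_project"); // illustrative value
    KylinConfigExt ext = KylinConfigExt.createInstance(
            KylinConfig.getInstanceFromEnv(), overrides);
    // "${job.project}" inside any property value now resolves to "my_project".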
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 858278f..c5cd317 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
-kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${hdfs.working.dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
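
Note: the new ${hdfs.working.dir}, ${kylin.metadata.url.identifier},
${job.project}, ${job.id}, ${job.stepId} and ${user.timezone} placeholders are
resolved through the substitutor changes above, from values NSparkExecutable
puts into the job overrides (see the hunks below). With illustrative values,
the executor options expand to roughly:

    -Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=hdfs://cluster/kylin -Dkylin.metadata.identifier=kylin_metadata -Dkylin.spark.category=job -Dkylin.spark.project=my_project -Dkylin.spark.identifier=<job-uuid> -Dkylin.spark.jobName=<job-uuid>_01 -Duser.timezone=GMT+8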
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index d8b250f..b2e4146 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -229,7 +229,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     @VisibleForTesting
     String getRootPathName() {
         if ("job".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs";
+            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs/executor/";
         } else if ("sparder".equals(getCategory())) {
             return parseHdfsWordingDir() + "/_sparder_logs";
         } else {
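
Note: executor logs for the "job" category now land in a dedicated
subdirectory. With an illustrative working directory and project name,
getRootPathName() resolves to:

    hdfs://cluster/kylin/my_project/spark_logs/executor/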
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 4ec461a..84737cb 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -135,13 +135,8 @@ public abstract class SparkApplication {
                 }
                 if (!config.getSparkConfigOverride().isEmpty()) {
                     for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
-                        if (entry.getKey().contains("spark.executor.extraJavaOptions")) {
-                            // Just let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded(in JobServer) to determine executor's JVM level configuration
-                            logger.info("Do not override {}={}.", entry.getKey(), entry.getValue());
-                        } else {
-                            logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
-                            sparkConf.set(entry.getKey(), entry.getValue());
-                        }
+                        logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
+                        sparkConf.set(entry.getKey(), entry.getValue());
                     }
                 }
             } else if (!isJobOnCluster(sparkConf)) {
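
Note: the executor special case is gone because spark.executor.extraJavaOptions
no longer has to be assembled by the job server at submit time; its ${...}
placeholders (see kylin-defaults.properties above) are already resolved by the
time the value reaches this loop, and replaceSparkNodeJavaOpsConfIfNeeded below
now builds only the driver options.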
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 987c215..7cd178c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -163,7 +163,7 @@ public class NSparkExecutable extends AbstractExecutable {
     void attachMetadataAndKylinProps(KylinConfig config) throws IOException {
         // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
         Set<String> dumpList = getMetadataDumpList(config);
-        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, KylinConfigExt.createInstance(config, new HashMap<>()), getDistMetaUrl());
+        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, config, getDistMetaUrl());
     }
 
     String dumpArgs() throws ExecuteException {
@@ -211,6 +211,7 @@ public class NSparkExecutable extends AbstractExecutable {
             jobOverrides.put("job.stepId", getId());
         }
         jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("hdfs.working.dir", KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
         jobOverrides.put("spark.driver.log4j.appender.hdfs.File",
                 Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
 
@@ -302,8 +303,7 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
-        replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride);
-        replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(config, sparkConfigOverride);
         return sparkConfigOverride;
     }
 
@@ -313,11 +313,13 @@ public class NSparkExecutable extends AbstractExecutable {
      * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties
      * 2. AbstractHdfsLogAppender
      */
-    private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config,
+    private void replaceSparkNodeJavaOpsConfIfNeeded(KylinConfig config,
                                                      Map<String, String> sparkConfigOverride) {
-        String sparkNodeExtraJavaOptionsKey = isDriver ? "spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions";
+        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
         StringBuilder sb = new StringBuilder();
-
+        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
+            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        }
         String serverIp = "127.0.0.1";
         try {
             serverIp = InetAddress.getLocalHost().getHostAddress();
@@ -326,35 +328,17 @@ public class NSparkExecutable extends AbstractExecutable {
         }
         String serverPort = config.getServerPort();
         String hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        String log4jConfiguration;
-
-        if (isDriver) {
-            String sparkDriverHdfsLogPath;
-            if (config instanceof KylinConfigExt) {
-                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-                if (Objects.nonNull(extendedOverrides)) {
-                    sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
-                    sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
-                }
-            }
-            log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
-            sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
-        } else {
-            if (config instanceof KylinConfigExt) {
-                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-                if (Objects.nonNull(extendedOverrides)) {
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project")));
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId")));
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id")));
-                }
+
+        String sparkDriverHdfsLogPath = null;
+        if (config instanceof KylinConfigExt) {
+            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+            if (Objects.nonNull(extendedOverrides)) {
+                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
             }
-            sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix()));
         }
 
+        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
+        sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
         sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
         if (config.isKerberosEnabled()) {
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
@@ -365,13 +349,12 @@ public class NSparkExecutable extends AbstractExecutable {
             }
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
-
-        if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) {
-            sb.append(" ").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey));
-        }
-        logger.debug("Final property is set to <<{}>> for {} .", sb.toString(), sparkNodeExtraJavaOptionsKey);
-        // Here replace original Java options for Spark node
-        sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString());
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
+        sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
     }
 
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
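
Note: putting the pieces together, a hedged sketch of how the executor JVM
options now get their values (method names are taken from the hunks above; the
literal values are illustrative):

    Map<String, String> jobOverrides = new HashMap<>();
    jobOverrides.put("job.project", "my_project"); // illustrative values
    jobOverrides.put("job.id", "job-uuid");
    jobOverrides.put("job.stepId", "job-uuid_01");
    jobOverrides.put("user.timezone",
            KylinConfig.getInstanceFromEnv().getTimeZone());
    jobOverrides.put("hdfs.working.dir",
            KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());

    KylinConfigExt config = KylinConfigExt.createInstance(
            KylinConfig.getInstanceFromEnv(), jobOverrides);
    // getSparkConfigOverride() returns keys without the
    // "kylin.engine.spark-conf." prefix; the ${job.project}, ${job.id}, ...
    // placeholders are already expanded by the merged substitutor, so this
    // value is submit-ready:
    String executorOpts = config.getSparkConfigOverride()
            .get("spark.executor.extraJavaOptions");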
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
index 32ca143..c2ef2c9 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.persistence.AutoDeleteDirectory;
 import org.apache.kylin.common.persistence.RawResource;
@@ -64,7 +63,7 @@ public class MetaDumpUtil {
         return dumpList;
     }
 
-    public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig,
+    public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig,
             String metadataUrl) throws IOException {
 
         try (AutoDeleteDirectory tmpDir = new AutoDeleteDirectory("kylin_job_meta", "");
