This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9a4df78f291cd7e34a30cbbdd9ec43124e6cd46f
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Nov 24 14:12:06 2020 +0800

    KYLIN-4813 Some adjustments for executor-side log collection
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 18 ++++---
 .../org/apache/kylin/common/KylinConfigExt.java    | 18 +++++--
 .../src/main/resources/kylin-defaults.properties   |  2 +-
 .../common/logging/SparkExecutorHdfsAppender.java  |  2 +-
 .../engine/spark/application/SparkApplication.java |  9 +---
 .../kylin/engine/spark/job/NSparkExecutable.java   | 61 ++++++++--------------
 .../kylin/engine/spark/utils/MetaDumpUtil.java     |  3 +-
 7 files changed, 53 insertions(+), 60 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 01dc374..964eb64 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -181,8 +181,8 @@ public abstract class KylinConfigBase implements Serializable {
 
     protected String getOptional(String prop, String dft) {
         final String property = System.getProperty(prop);
-        return property != null ? StrSubstitutor.replace(property, System.getenv())
-                : StrSubstitutor.replace(properties.getProperty(prop, dft), System.getenv());
+        return property != null ? getSubstitutor().replace(property, System.getenv())
+                : getSubstitutor().replace(properties.getProperty(prop, dft), System.getenv());
     }
 
     protected Properties getAllProperties() {
@@ -194,8 +194,7 @@ public abstract class KylinConfigBase implements Serializable {
      * @return properties which contained in propertyKeys
      */
     protected Properties getProperties(Collection<String> propertyKeys) {
-        Map<String, String> envMap = System.getenv();
-        StrSubstitutor sub = new StrSubstitutor(envMap);
+        final StrSubstitutor sub = getSubstitutor();
         Properties filteredProperties = new Properties();
 
         for (Entry<Object, Object> entry : this.properties.entrySet()) {
@@ -206,9 +205,17 @@
         return filteredProperties;
     }
 
+    protected StrSubstitutor getSubstitutor() {
+        // env > properties
+        final Map<String, Object> all = Maps.newHashMap();
+        all.putAll((Map) properties);
+        all.putAll(System.getenv());
+
+        return new StrSubstitutor(all);
+    }
+
     protected Properties getRawAllProperties() {
         return properties;
-
     }
 
     protected final Map<String, String> getPropertiesByPrefix(String prefix) {
@@ -259,7 +266,6 @@ public abstract class KylinConfigBase implements Serializable {
     final protected void reloadKylinConfig(Properties properties) {
         this.properties = BCC.check(properties);
         setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
-        setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
        setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
     }
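For illustration, a minimal standalone sketch (hypothetical class name and sample value, not part of this patch) of the env-over-properties lookup order that the new getSubstitutor() establishes:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.commons.lang.text.StrSubstitutor;

    public class SubstitutorPrecedenceSketch {
        public static void main(String[] args) {
            // Stand-in for kylin.properties content.
            Properties props = new Properties();
            props.put("hdfs.working.dir", "hdfs://ns/kylin");

            // Environment entries are added last, so they shadow same-named properties.
            Map<String, Object> all = new HashMap<>();
            all.putAll((Map) props);
            all.putAll(System.getenv());

            StrSubstitutor sub = new StrSubstitutor(all);
            // Prints "-Dkylin.hdfs.working.dir=hdfs://ns/kylin" unless the
            // environment also defines hdfs.working.dir, in which case env wins.
            System.out.println(sub.replace("-Dkylin.hdfs.working.dir=${hdfs.working.dir}"));
        }
    }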
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
index ddec154..fda4100 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -18,11 +18,12 @@
 
 package org.apache.kylin.common;
 
+import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.lang3.text.StrSubstitutor;
-
 /**
  * Extends a KylinConfig with additional overrides.
  */
@@ -59,7 +60,7 @@ public class KylinConfigExt extends KylinConfig {
     public String getOptional(String prop, String dft) {
         String value = overrides.get(prop);
         if (value != null)
-            return StrSubstitutor.replace(value, System.getenv());
+            return getSubstitutor().replace(value, System.getenv());
         else
             return super.getOptional(prop, dft);
     }
@@ -67,11 +68,20 @@ public class KylinConfigExt extends KylinConfig {
     @Override
     protected Properties getAllProperties() {
         Properties result = new Properties();
-        result.putAll(super.getRawAllProperties());
+        result.putAll(super.getAllProperties());
         result.putAll(overrides);
         return result;
     }
 
+    @Override
+    protected StrSubstitutor getSubstitutor() {
+        final Map<String, Object> all = Maps.newHashMap();
+        all.putAll((Map) properties);
+        all.putAll(System.getenv());
+        all.putAll(overrides);
+        return new StrSubstitutor(all);
+    }
+
     public Map<String, String> getExtendedOverrides() {
         return overrides;
     }
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 858278f..c5cd317 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
-kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${hdfs.working.dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index d8b250f..b2e4146 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -229,7 +229,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     @VisibleForTesting
     String getRootPathName() {
         if ("job".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs";
+            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs/executor/";
         } else if ("sparder".equals(getCategory())) {
             return parseHdfsWordingDir() + "/_sparder_logs";
         } else {
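For illustration, a minimal sketch (hypothetical class, not the appender's actual code) of how executor-side code can read the -Dkylin.* flags injected through spark.executor.extraJavaOptions above and derive the new per-project executor log root:

    public final class ExecutorLogContextSketch {
        public static void main(String[] args) {
            // These -D flags come from spark.executor.extraJavaOptions (see the
            // kylin-defaults.properties change above); values are filled per job.
            String workingDir = System.getProperty("kylin.hdfs.working.dir");
            String project = System.getProperty("kylin.spark.project");
            String category = System.getProperty("kylin.spark.category");

            // Mirrors getRootPathName() above: job logs now land under .../spark_logs/executor/
            if ("job".equals(category)) {
                System.out.println(workingDir + "/" + project + "/spark_logs/executor/");
            }
        }
    }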
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 4ec461a..84737cb 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -135,13 +135,8 @@ public abstract class SparkApplication {
             }
             if (!config.getSparkConfigOverride().isEmpty()) {
                 for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
-                    if (entry.getKey().contains("spark.executor.extraJavaOptions")) {
-                        // Just let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded(in JobServer) to determine executor's JVM level configuration
-                        logger.info("Do not override {}={}.", entry.getKey(), entry.getValue());
-                    } else {
-                        logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
-                        sparkConf.set(entry.getKey(), entry.getValue());
-                    }
+                    logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
+                    sparkConf.set(entry.getKey(), entry.getValue());
                 }
             }
         } else if (!isJobOnCluster(sparkConf)) {
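For illustration, a minimal sketch (assumed names and sample values, not the class's actual surroundings) of the simplified loop: since KylinConfigExt now expands the ${...} placeholders before the config reaches SparkApplication, spark.executor.extraJavaOptions no longer needs a special case and is applied like any other override:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.SparkConf;

    public class OverrideLoopSketch {
        public static void main(String[] args) {
            // Sample overrides; in Kylin they come from config.getSparkConfigOverride(),
            // with ${job.project}, ${job.id}, etc. already resolved.
            Map<String, String> overrides = new HashMap<>();
            overrides.put("spark.executor.extraJavaOptions",
                    "-Dkylin.spark.category=job -Dkylin.spark.project=demo_project");

            SparkConf sparkConf = new SparkConf();
            for (Map.Entry<String, String> entry : overrides.entrySet()) {
                // No executor-options special case any more; every entry is applied.
                sparkConf.set(entry.getKey(), entry.getValue());
            }
            System.out.println(sparkConf.get("spark.executor.extraJavaOptions"));
        }
    }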
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 987c215..7cd178c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -163,7 +163,7 @@ public class NSparkExecutable extends AbstractExecutable {
     void attachMetadataAndKylinProps(KylinConfig config) throws IOException {
         // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
         Set<String> dumpList = getMetadataDumpList(config);
-        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, KylinConfigExt.createInstance(config, new HashMap<>()), getDistMetaUrl());
+        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, config, getDistMetaUrl());
     }
 
     String dumpArgs() throws ExecuteException {
@@ -211,6 +211,7 @@ public class NSparkExecutable extends AbstractExecutable {
             jobOverrides.put("job.stepId", getId());
         }
         jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("hdfs.working.dir", KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
         jobOverrides.put("spark.driver.log4j.appender.hdfs.File",
                 Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
 
@@ -302,8 +303,7 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
-        replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride);
-        replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(config, sparkConfigOverride);
         return sparkConfigOverride;
     }
 
@@ -313,11 +313,13 @@
      * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties
      * 2. AbstractHdfsLogAppender
      */
-    private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config,
+    private void replaceSparkNodeJavaOpsConfIfNeeded(KylinConfig config,
             Map<String, String> sparkConfigOverride) {
-        String sparkNodeExtraJavaOptionsKey = isDriver ? "spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions";
+        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
         StringBuilder sb = new StringBuilder();
-
+        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
+            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        }
         String serverIp = "127.0.0.1";
         try {
             serverIp = InetAddress.getLocalHost().getHostAddress();
@@ -326,35 +328,17 @@
         }
         String serverPort = config.getServerPort();
         String hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        String log4jConfiguration;
-
-        if (isDriver) {
-            String sparkDriverHdfsLogPath;
-            if (config instanceof KylinConfigExt) {
-                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-                if (Objects.nonNull(extendedOverrides)) {
-                    sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
-                    sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
-                }
-            }
-            log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
-            sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
-        } else {
-            if (config instanceof KylinConfigExt) {
-                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-                if (Objects.nonNull(extendedOverrides)) {
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project")));
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId")));
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id")));
-                }
+
+        String sparkDriverHdfsLogPath = null;
+        if (config instanceof KylinConfigExt) {
+            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+            if (Objects.nonNull(extendedOverrides)) {
+                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
             }
-            sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix()));
         }
+
+        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
+        sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
         sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
         if (config.isKerberosEnabled()) {
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
@@ -365,13 +349,12 @@
             }
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
-
-        if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) {
").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey)); - } - logger.debug("Final property is set to <<{}>> for {} .", sb.toString(), sparkNodeExtraJavaOptionsKey); - // Here replace original Java options for Spark node - sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString()); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId())); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark")); + sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString()); } protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar, diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java index 32ca143..c2ef2c9 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.AutoDeleteDirectory; import org.apache.kylin.common.persistence.RawResource; @@ -64,7 +63,7 @@ public class MetaDumpUtil { return dumpList; } - public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, + public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, String metadataUrl) throws IOException { try (AutoDeleteDirectory tmpDir = new AutoDeleteDirectory("kylin_job_meta", "");