This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 86aae4dc1cd6f364c36e9f47db84b20956b2b911 Author: XiaoxiangYu <x...@apache.org> AuthorDate: Sun Nov 22 22:04:30 2020 +0800 KYLIN-4813 Fix several bugs for executor-side log collection - add --files to upload user-defined log4j properties - fix ClassName error - fix executor.extraJavaOptions be overwrote in Driver side - fix security issue (HTTP response splitting) - code style etc. --- build/conf/spark-executor-log4j.properties | 4 +- .../org/apache/kylin/common/KylinConfigBase.java | 6 +- .../src/main/resources/kylin-defaults.properties | 6 +- .../kylin/job/execution/ExecutableManager.java | 26 +++---- .../common/logging/SparkExecutorHdfsAppender.java | 24 +++++- .../engine/spark/application/SparkApplication.java | 9 ++- .../kylin/engine/spark/job/NSparkExecutable.java | 87 +++++++++++++--------- .../scala/org/apache/spark/sql/KylinSession.scala | 4 +- .../kylin/rest/controller/JobController.java | 25 +++---- .../org/apache/kylin/rest/service/JobService.java | 4 +- 10 files changed, 110 insertions(+), 85 deletions(-) diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties index 7cc5b04..fb5b7e3 100644 --- a/build/conf/spark-executor-log4j.properties +++ b/build/conf/spark-executor-log4j.properties @@ -27,7 +27,7 @@ log4j.appender.stderr.target=System.err log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n -log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender +log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsAppender log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir} log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier} @@ -41,6 +41,6 @@ log4j.appender.hdfs.logQueueCapacity=5000 #flushPeriod count as millis log4j.appender.hdfs.flushInterval=5000 -log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout +log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout #Don't add line number (%L) as it's too costly! 
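The executor-side log4j configuration above only takes effect if two things arrive together on the executor: the properties file itself (shipped with --files) and the -Dkylin.* system properties referenced by the ${...} placeholders, because log4j 1.x resolves those placeholders from JVM system properties when the configuration is loaded. A minimal sketch of that interplay, assuming a log4j 1.x runtime; the class name below is illustrative and not part of this patch:

    // Hedged sketch: log4j 1.x substitutes ${kylin.hdfs.working.dir} and friends
    // from system properties, which is why spark.executor.extraJavaOptions must
    // carry the -D values alongside the spark-executor-log4j.properties file
    // uploaded through spark-submit --files.
    import org.apache.log4j.PropertyConfigurator;

    public class ExecutorLoggingBootstrapSketch {            // hypothetical class name
        public static void main(String[] args) {
            // Values normally injected through spark.executor.extraJavaOptions
            System.setProperty("kylin.hdfs.working.dir", "hdfs:///kylin");
            System.setProperty("kylin.metadata.identifier", "kylin_metadata");
            // With --files, the file lands in the executor's working directory,
            // so a relative name is enough for -Dlog4j.configuration.
            PropertyConfigurator.configure("spark-executor-log4j.properties");
        }
    }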
log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n \ No newline at end of file diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e2727fa..01dc374 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2863,8 +2863,10 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.query.intersect.separator", "|"); } - @ConfigTag(ConfigTag.Tag.NOT_CLEAR) - public String sparderFiles() { + /** + * Used to upload user-defined log4j configuration + */ + public String sparkUploadFiles() { try { File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf", "spark-executor-log4j.properties"); diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 8fba3bc..858278f 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false -kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone} +kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime @@ -269,7 +269,6 @@ kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -D # uncomment for HDP #kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -#kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current ### SPARK QUERY ENGINE CONFIGS (a.k.a. 
Sparder Context) ### # Enlarge cores and memory to improve query performance in production env, please check https://cwiki.apache.org/confluence/display/KYLIN/User+Manual+4.X @@ -287,11 +286,10 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ #kylin.query.spark-conf.spark.sql.shuffle.partitions=40 #kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/* -kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M +kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} # uncomment for HDP #kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -#kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current ### QUERY PUSH DOWN ### diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 590276d..42a9c99 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.job.execution; @@ -255,10 +255,6 @@ public class ExecutableManager { return result; } - public Output getOutputFromHDFSByJobId(String jobId) { - return getOutputFromHDFSByJobId(jobId, jobId); - } - public Output getOutputFromHDFSByJobId(String jobId, String stepId) { return getOutputFromHDFSByJobId(jobId, stepId, 100); } @@ -593,7 +589,7 @@ public class ExecutableManager { logger.warn("The job " + jobId + " has been discarded."); } throw new IllegalStateException( - "The job " + job.getId() + " has already been finished and cannot be discarded."); + "The job " + job.getId() + " has already been finished and cannot be discarded."); } if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); @@ -617,7 +613,7 @@ public class ExecutableManager { for (AbstractExecutable task : tasks) { if (task.getId().compareTo(stepId) >= 0) { logger.debug("rollback task : " + task); - updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath()); + updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", task.getLogPath()); } } } @@ -634,11 +630,11 @@ public class ExecutableManager { } if (!(job.getStatus() == ExecutableState.READY - || job.getStatus() == ExecutableState.RUNNING)) { + || job.getStatus() == ExecutableState.RUNNING)) { logger.warn("The status of job " + jobId + " is " + job.getStatus().toString() - + ". It's final state and cannot be transfer to be stopped!!!"); + + ". It's final state and cannot be transfer to be stopped!!!"); throw new IllegalStateException( - "The job " + job.getId() + " has already been finished and cannot be stopped."); + "The job " + job.getId() + " has already been finished and cannot be stopped."); } if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); @@ -662,7 +658,6 @@ public class ExecutableManager { } public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) { - // when if (Thread.currentThread().isInterrupted()) { throw new RuntimeException("Current thread is interruptted, aborting"); } @@ -701,7 +696,6 @@ public class ExecutableManager { } if (project != null) { - //write output to HDFS updateJobOutputToHDFS(project, jobId, output, logPath); } } @@ -715,7 +709,7 @@ public class ExecutableManager { jobOutput.setLogPath(logPath); } String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId); - + logger.debug("Update JobOutput To HDFS for {} to {} [{}]", jobId, outputHDFSPath, jobOutput.getContent() != null ? 
jobOutput.getContent().length() : -1); updateJobOutputToHDFS(outputHDFSPath, jobOutput); } @@ -786,7 +780,7 @@ public class ExecutableManager { if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) { continue; } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) { - updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null); + updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", null); } break; } diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java index 4a76022..381fd2d 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.kylin.engine.spark.common.logging; import com.google.common.annotations.VisibleForTesting; @@ -111,7 +129,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender { @Override String getAppenderName() { - return "SparkExecutorHdfsLogAppender"; + return "SparkExecutorHdfsAppender"; } @Override @@ -164,6 +182,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender { String rollingDir = dateFormat.format(new Date(event.getTimeStamp())); outPutPath = getOutPutDir(rollingDir); } + LogLog.warn("Update to " + outPutPath); } private String getOutPutDir(String rollingDir) { @@ -236,8 +255,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender { } private String parseHdfsWordingDir() { - return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/") - + StringUtils.replace(getMetadataIdentifier(), "/", "-"); + return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/"); } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 84737cb..4ec461a 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -135,8 +135,13 @@ public abstract class SparkApplication { } if (!config.getSparkConfigOverride().isEmpty()) { for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) { - logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue()); - sparkConf.set(entry.getKey(), entry.getValue()); + if (entry.getKey().contains("spark.executor.extraJavaOptions")) { + // Just let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded(in JobServer) to determine executor's JVM level configuration + logger.info("Do not override {}={}.", entry.getKey(), entry.getValue()); + } else { + logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue()); + sparkConf.set(entry.getKey(), entry.getValue()); + } } } } else if (!isJobOnCluster(sparkConf)) { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index 3cc1b60..987c215 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -122,8 +122,8 @@ public class NSparkExecutable extends AbstractExecutable { String hadoopConf = System.getProperty("kylin.hadoop.conf.dir"); logger.info("write hadoop conf is {} ", config.getBuildConf()); if (!config.getBuildConf().isEmpty()) { - logger.info("write hadoop conf is {} ", config.getBuildConf()); - hadoopConf = config.getBuildConf(); + logger.info("write hadoop conf is {} ", config.getBuildConf()); + hadoopConf = config.getBuildConf(); } if (StringUtils.isEmpty(hadoopConf) && !config.isUTEnv() && !config.isZKLocal()) { throw new RuntimeException( @@ -203,7 +203,7 @@ public class NSparkExecutable extends AbstractExecutable { String project = getParam(MetadataConstants.P_PROJECT_NAME); 
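The ClassName fix above matters because log4j 1.x instantiates, by reflection, whatever fully qualified class name appears on the right-hand side of log4j.appender.hdfs=..., and then injects bean-style properties such as hdfsWorkingDir through setters. So spark-executor-log4j.properties must name SparkExecutorHdfsAppender exactly. A rough sketch of the appender shape log4j expects, assuming the log4j 1.x API; this is not the Kylin implementation and the class and field names are illustrative:

    // Hedged sketch of a custom log4j 1.x appender skeleton (not Kylin's code).
    import org.apache.log4j.AppenderSkeleton;
    import org.apache.log4j.spi.LoggingEvent;

    public class MinimalHdfsAppenderSketch extends AppenderSkeleton {
        // Populated by log4j from log4j.appender.hdfs.hdfsWorkingDir=...
        private String hdfsWorkingDir;
        public void setHdfsWorkingDir(String dir) { this.hdfsWorkingDir = dir; }
        public String getHdfsWorkingDir() { return hdfsWorkingDir; }

        @Override
        protected void append(LoggingEvent event) {
            // A real appender would queue events and flush them to HDFS;
            // this only shows the contract log4j requires.
        }

        @Override public void close() { }
        @Override public boolean requiresLayout() { return true; }
    }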
Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty"); - HashMap<String, String> jobOverrides = new HashMap(); + HashMap<String, String> jobOverrides = new HashMap<>(); String parentId = getParentId(); jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId())); jobOverrides.put("job.project", project); @@ -250,7 +250,7 @@ public class NSparkExecutable extends AbstractExecutable { } private ExecuteResult runSparkSubmit(KylinConfig config, String hadoopConf, String jars, - String kylinJobJar, String appArgs, String jobId) { + String kylinJobJar, String appArgs, String jobId) { PatternedLogger patternedLogger; if (config.isJobLogPrintEnabled()) { patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() { @@ -302,16 +302,21 @@ public class NSparkExecutable extends AbstractExecutable { sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true"); } - replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride); + replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride); + replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride); return sparkConfigOverride; } - private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) { - String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions"; + /** + * Add property in spark.xxx.extraJavaOptions for AbstractHdfsLogAppender + * Please check following for detail. + * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties + * 2. AbstractHdfsLogAppender + */ + private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config, + Map<String, String> sparkConfigOverride) { + String sparkNodeExtraJavaOptionsKey = isDriver ? 
"spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions"; StringBuilder sb = new StringBuilder(); - if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) { - sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey)); - } String serverIp = "127.0.0.1"; try { @@ -320,31 +325,37 @@ public class NSparkExecutable extends AbstractExecutable { logger.warn("use the InetAddress get local ip failed!", e); } String serverPort = config.getServerPort(); - String hdfsWorkingDir = config.getHdfsWorkingDirectory(); - - String sparkDriverHdfsLogPath = null; - if (config instanceof KylinConfigExt) { - Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides(); - if (Objects.nonNull(extendedOverrides)) { - sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File"); + String log4jConfiguration; + + if (isDriver) { + String sparkDriverHdfsLogPath; + if (config instanceof KylinConfigExt) { + Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides(); + if (Objects.nonNull(extendedOverrides)) { + sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File"); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath)); + } } - } - - /* - if (config.isCloud()) { - String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory(); - if (StringUtils.isNotBlank(logLocalWorkingDirectory)) { - hdfsWorkingDir = logLocalWorkingDirectory; - sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath; + log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile(); + sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId())); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark")); + } else { + if (config instanceof KylinConfigExt) { + Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides(); + if (Objects.nonNull(extendedOverrides)) { + sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project"))); + sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId"))); + sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id"))); + } } + sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix())); } - */ - String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile(); - sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration)); sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled())); - if (config.isKerberosEnabled()) { sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal())); sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath())); @@ -354,16 +365,17 @@ public class NSparkExecutable extends AbstractExecutable { } } sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir)); - sb.append(String.format(Locale.ROOT, " 
-Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath)); - sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp)); - sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort)); - sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId())); - sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark")); - sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString()); + + if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) { + sb.append(" ").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey)); + } + logger.debug("Final property is set to <<{}>> for {} .", sb.toString(), sparkNodeExtraJavaOptionsKey); + // Here replace original Java options for Spark node + sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString()); } protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar, - String appArgs) { + String appArgs) { StringBuilder sb = new StringBuilder(); sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry "); @@ -377,11 +389,12 @@ public class NSparkExecutable extends AbstractExecutable { if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) { jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars"); } - + sb.append("--files ").append(config.sparkUploadFiles()).append(" "); sb.append("--name job_step_%s "); sb.append("--jars %s %s %s"); String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, KylinConfig.getSparkHome(), getId(), jars, kylinJobJar, appArgs); + // SparkConf still have a change to be changed in CubeBuildJob.java (Spark Driver) logger.info("spark submit cmd: {}", cmd); return cmd; } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala index fb09d38..7ae0937 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala @@ -174,10 +174,10 @@ object KylinSession extends Logging { if (sparkConf.get("spark.master").startsWith("yarn")) { sparkConf.set("spark.yarn.dist.jars", KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath) - sparkConf.set("spark.yarn.dist.files", conf.sparderFiles()) + sparkConf.set("spark.yarn.dist.files", conf.sparkUploadFiles()) } else { sparkConf.set("spark.jars", conf.sparderJars) - sparkConf.set("spark.files", conf.sparderFiles()) + sparkConf.set("spark.files", conf.sparkUploadFiles()) } val fileName = KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java index c28362c..cd63ac7 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -20,12 +20,14 @@ package org.apache.kylin.rest.controller; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Locale; +import 
org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; @@ -164,29 +166,20 @@ public class JobController extends BasicController { /** * Get a job step output - * - * @return - * @throws IOException */ - @RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody public Map<String, String> getStepOutput(@PathVariable String jobId, @PathVariable String stepId) { - Map<String, String> result = new HashMap<String, String>(); + Map<String, String> result = new HashMap<>(); result.put("jobId", jobId); result.put("stepId", String.valueOf(stepId)); - result.put("cmd_output", jobService.getJobOutput(jobId, stepId)); + result.put("cmd_output", jobService.getJobStepOutput(jobId, stepId)); return result; } /** - * Download a job step output from hdfs - * @param jobId - * @param stepId - * @param project - * @param response - * @return + * Download a step output(Spark driver log) from hdfs */ @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody @@ -196,10 +189,12 @@ public class JobController extends BasicController { checkRequiredArg("job_id", jobId); checkRequiredArg("step_id", stepId); checkRequiredArg("project", project); - String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId); + String validatedPrj = CliCommandExecutor.checkParameter(project); + String validatedStepId = CliCommandExecutor.checkParameter(stepId); + String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", validatedPrj, validatedStepId); - String jobOutput = jobService.getAllJobOutput(jobId, stepId); - setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response); + String jobOutput = jobService.getAllJobStepOutput(jobId, stepId); + setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes(StandardCharsets.UTF_8)), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response); return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", ""); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 2c0c20c..90ee782 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -480,12 +480,12 @@ public class JobService extends BasicService implements InitializingBean { return getExecutableManager().getOutput(id); } - public String getJobOutput(String jobId, String stepId) { + public String getJobStepOutput(String jobId, String stepId) { ExecutableManager executableManager = getExecutableManager(); return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg(); } - public String getAllJobOutput(String jobId, String stepId) { + public String getAllJobStepOutput(String jobId, String stepId) { ExecutableManager executableManager = getExecutableManager(); return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg(); }
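The response-splitting fix in JobController works by passing the user-supplied project and step id through CliCommandExecutor.checkParameter before formatting them into the download filename, so CR/LF sequences can no longer reach the HTTP response headers of the log download. A rough sketch of that kind of whitelist validation, assuming a simple alphanumeric policy; the class below is illustrative and the exact character set accepted by CliCommandExecutor.checkParameter may differ:

    // Hedged sketch of whitelist validation for header-bound parameters.
    import java.util.regex.Pattern;

    public final class HeaderParamSanitizerSketch {           // hypothetical class name
        private static final Pattern SAFE = Pattern.compile("^[A-Za-z0-9_.-]+$");

        public static String check(String value) {
            if (value == null || !SAFE.matcher(value).matches()) {
                throw new IllegalArgumentException("Illegal character in parameter: " + value);
            }
            return value;
        }

        public static void main(String[] args) {
            // A value like "proj\r\nSet-Cookie: x=1" is rejected instead of being
            // embedded in the Content-Disposition header of the download response.
            System.out.println(check("learn_kylin") + "_" + check("step-01") + ".log");
        }
    }

Rejecting bad input outright, rather than silently stripping characters, keeps the failure visible to the caller and avoids ambiguous filenames.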