This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2dca9ef9a52cf1adb12684e0ba576b91e96b3792
Author: yaqian.zhang <[email protected]>
AuthorDate: Thu Nov 12 17:50:47 2020 +0800

    KYLIN-4813 Refactor spark build log
---
 .../engine/mr/common/MapReduceExecutable.java       |   7 +-
 build/bin/kylin-port-replace-util.sh                |   1 +
 build/conf/kylin-parquet-log4j.properties           |  33 --
 build/conf/kylin-spark-log4j.properties             |  43 ---
 build/conf/spark-driver-log4j.properties            |  45 +++
 .../org/apache/kylin/common/KylinConfigBase.java    |  26 +-
 .../common/logging/SensitivePatternLayout.java      |  62 ++++
 .../src/main/resources/kylin-defaults.properties    |   3 +
 .../apache/kylin/job/dao/ExecutableOutputPO.java    |  11 +
 .../kylin/job/execution/AbstractExecutable.java     |  19 +-
 .../job/execution/DefaultChainedExecutable.java     |  18 +-
 .../kylin/job/execution/ExecutableManager.java      | 263 +++++++++++++-
 .../apache/kylin/job/ExecutableManagerTest.java     |  20 +-
 .../job/impl/threadpool/DefaultSchedulerTest.java   |   4 +-
 .../common/logging/AbstractHdfsLogAppender.java     | 377 +++++++++++++++++++++
 .../common/logging/SparkDriverHdfsLogAppender.java  | 112 ++++++
 .../kylin/engine/spark/job/NSparkExecutable.java    |  81 +++--
 .../kylin/rest/controller/BasicController.java      |  16 +
 .../kylin/rest/controller/JobController.java        |  34 +-
 .../org/apache/kylin/rest/service/JobService.java   |  11 +
 20 files changed, 1044 insertions(+), 142 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index a33f171..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -57,6 +57,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +86,7 @@ public class MapReduceExecutable extends AbstractExecutable {
         if (output.getExtra().containsKey(START_TIME)) {
             final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
             if (mrJobId == null) {
-                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
                 return;
             }
             try {
@@ -96,7 +97,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                 //remove previous mr job info
                 super.onExecuteStart(executableContext);
             } else {
-                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
             }
         } catch (IOException | ParseException e) {
             logger.warn("error get hadoop status");
@@ -180,7 +181,7 @@ public class MapReduceExecutable extends AbstractExecutable {
             JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
 
             if (status == JobStepStatusEnum.KILLED) {
-                mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin", null);
                 if (isDiscarded()) {
                     if (getIsNeedLock()) {
                         releaseLock(lock);
diff --git
a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh index ea96791..12a2abd 100755 --- a/build/bin/kylin-port-replace-util.sh +++ b/build/bin/kylin-port-replace-util.sh @@ -96,6 +96,7 @@ then sed -i "s/^kylin\.stream\.node=.*$/$stream_node/g" ${KYLIN_CONFIG_FILE} sed -i "s/#*kylin.server.cluster-servers=\(.*\).*:\(.*\)/kylin.server.cluster-servers=\1:${new_kylin_port}/g" ${KYLIN_CONFIG_FILE} + sed -i "s/#*server.port=.*$/server.port=${new_kylin_port}/g" ${KYLIN_CONFIG_FILE} echo "New kylin port is : ${new_kylin_port}" diff --git a/build/conf/kylin-parquet-log4j.properties b/build/conf/kylin-parquet-log4j.properties deleted file mode 100644 index 36b7dd4..0000000 --- a/build/conf/kylin-parquet-log4j.properties +++ /dev/null @@ -1,33 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -#overall config -log4j.rootLogger=WARN,stdout -log4j.logger.org.apache.kylin=DEBUG -log4j.logger.org.springframework=WARN -log4j.logger.org.springframework.security=WARN -log4j.logger.org.apache.spark=WARN -# For the purpose of getting Tracking URL -log4j.logger.org.apache.spark.deploy.yarn=INFO -log4j.logger.org.apache.spark.ContextCleaner=WARN - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.err -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n \ No newline at end of file diff --git a/build/conf/kylin-spark-log4j.properties b/build/conf/kylin-spark-log4j.properties deleted file mode 100644 index 948fb32..0000000 --- a/build/conf/kylin-spark-log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -log4j.rootCategory=WARN,stderr,stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n - -log4j.appender.stderr=org.apache.log4j.ConsoleAppender -log4j.appender.stderr.Target=System.err -log4j.appender.stderr.layout=org.apache.log4j.PatternLayout -log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n - - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.spark-project.jetty=WARN -log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO -log4j.logger.org.apache.parquet=ERROR -log4j.logger.parquet=ERROR - -# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL -log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR -log4j.logger.org.apache.spark.sql=WARN - -log4j.logger.org.apache.kylin=DEBUG \ No newline at end of file diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties new file mode 100644 index 0000000..8b0e82d --- /dev/null +++ b/build/conf/spark-driver-log4j.properties @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#overall config +log4j.rootLogger=INFO,hdfs +log4j.logger.org.apache.kylin=DEBUG +log4j.logger.org.springframework=WARN +log4j.logger.org.springframework.security=WARN +log4j.logger.org.apache.spark=WARN +log4j.logger.org.apache.spark.ContextCleaner=WARN + +# hdfs file appender +log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender +log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled} +log4j.appender.hdfs.kerberosPrincipal=${kylin.kerberos.principal} +log4j.appender.hdfs.kerberosKeytab=${kylin.kerberos.keytab} +log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir} +log4j.appender.hdfs.logPath=${spark.driver.log4j.appender.hdfs.File} +log4j.appender.hdfs.logQueueCapacity=5000 +#flushPeriod count as millis +log4j.appender.hdfs.flushInterval=5000 +log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout +#Don't add line number (%L) as it's too costly! 
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n + +log4j.appender.logFile=org.apache.log4j.FileAppender +log4j.appender.logFile.Threshold=DEBUG +log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log +log4j.appender.logFile.layout=org.apache.kylin.common.logging.SensitivePatternLayout +log4j.appender.logFile.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n \ No newline at end of file diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8e53a9b..081d23f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -258,6 +258,8 @@ public abstract class KylinConfigBase implements Serializable { final protected void reloadKylinConfig(Properties properties) { this.properties = BCC.check(properties); + setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix()); + setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile()); } private Map<Integer, String> convertKeyToInteger(Map<String, String> map) { @@ -465,6 +467,10 @@ public abstract class KylinConfigBase implements Serializable { return getMetadataUrl().getIdentifier(); } + public String getServerPort() { + return getOptional("server.port", "7070"); + } + public Map<String, String> getResourceStoreImpls() { Map<String, String> r = Maps.newLinkedHashMap(); // ref constants in ISourceAware @@ -866,6 +872,14 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.job.log-dir", "/tmp/kylin/logs"); } + public String getKylinLogDir() { + String kylinHome = getKylinHome(); + if (kylinHome == null) { + kylinHome = System.getProperty("KYLIN_HOME"); + } + return kylinHome + File.separator + "logs"; + } + public boolean getRunAsRemoteCommand() { return Boolean.parseBoolean(getOptional("kylin.job.use-remote-cli")); } @@ -2664,6 +2678,10 @@ public abstract class KylinConfigBase implements Serializable { return getHdfsWorkingDirectory() + project + "/job_tmp/"; } + public String getSparkLogDir(String project) { + return getHdfsWorkingDirectory() + project + "/spark_logs/driver/"; + } + @ConfigTag(ConfigTag.Tag.NOT_CLEAR) public int getPersistFlatTableThreshold() { return Integer.parseInt(getOptional("kylin.engine.persist-flattable-threshold", "1")); @@ -2807,8 +2825,8 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.query.spark-engine.enabled", "true")); } - public String getLogSparkPropertiesFile() { - return getLogPropertyFile("kylin-parquet-log4j.properties"); + public String getLogSparkDriverPropertiesFile() { + return getLogPropertyFile("spark-driver-log4j.properties"); } private String getLogPropertyFile(String filename) { @@ -2875,6 +2893,10 @@ public abstract class KylinConfigBase implements Serializable { .parseBoolean(this.getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true")); } + public String getJobOutputStorePath(String project, String jobId) { + return getSparkLogDir(project) + getNestedPath(jobId) + "execute_output.json"; + } + public int getBaseShufflePartitionSize() { return Integer.parseInt(this.getOptional("kylin.query.pushdown.base-shuffle-partition-size", "48")); } diff --git a/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java 
b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java new file mode 100644 index 0000000..013d42d --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.logging; + +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.log4j.spi.LoggingEvent; + +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SensitivePatternLayout extends PatternLayout { + private static final String PREFIX_GROUP_NAME = "prefix"; + private static final String SENSITIVE_GROUP_NAME = "sensitive"; + private static final String MASK = "******"; + private static final Pattern SENSITIVE_PATTERN = Pattern.compile( + String.format(Locale.ROOT, "(?<%s>password\\s*[:=])(?<%s>[^,.!]*)", PREFIX_GROUP_NAME, SENSITIVE_GROUP_NAME), + Pattern.CASE_INSENSITIVE); + + @Override + public String format(LoggingEvent event) { + if (event.getMessage() instanceof String) { + String maskedMessage = mask(event.getRenderedMessage()); + + Throwable throwable = event.getThrowableInformation() != null + ? event.getThrowableInformation().getThrowable() + : null; + LoggingEvent maskedEvent = new LoggingEvent(event.fqnOfCategoryClass, + Logger.getLogger(event.getLoggerName()), event.timeStamp, event.getLevel(), maskedMessage, + throwable); + + return super.format(maskedEvent); + } + return super.format(event); + } + + private String mask(String message) { + Matcher matcher = SENSITIVE_PATTERN.matcher(message); + if (matcher.find()) { + return matcher.replaceAll(String.format(Locale.ROOT, "${%s}%s", PREFIX_GROUP_NAME, MASK)); + } + return message; + } +} + diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index eb8398b..3d8268f 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -44,6 +44,9 @@ kylin.env.zookeeper-connect-string=sandbox.hortonworks.com # Kylin server mode, valid value [all, query, job] kylin.server.mode=all +## Kylin server port +server.port=7070 + # List of web servers in use, this enables one web server instance to sync up with other servers. 
kylin.server.cluster-servers=localhost:7070 diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java index fdc6cc4..27a2831 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java @@ -38,6 +38,9 @@ public class ExecutableOutputPO extends RootPersistentEntity { @JsonProperty("status") private String status = "READY"; + @JsonProperty("log_path") + private String logPath; + @JsonProperty("info") private Map<String, String> info = Maps.newHashMap(); @@ -64,4 +67,12 @@ public class ExecutableOutputPO extends RootPersistentEntity { public void setInfo(Map<String, String> info) { this.info = info; } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public String getLogPath() { + return logPath; + } } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 202d22e..d815e5b 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -82,6 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { private Map<String, String> params = Maps.newHashMap(); protected Integer priority; private CubeBuildTypeEnum jobType; + private String logPath; protected String project; private String targetSubject; private List<String> targetSegments = Lists.newArrayList();//uuid of related segments @@ -95,6 +96,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent { this.config = config; } + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public String getLogPath() { + return logPath; + } + protected KylinConfig getConfig() { return config; } @@ -107,7 +116,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { checkJobPaused(); Map<String, String> info = Maps.newHashMap(); info.put(START_TIME, Long.toString(System.currentTimeMillis())); - getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath()); } public KylinConfig getCubeSpecificConfig() { @@ -151,9 +160,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent { setEndTime(System.currentTimeMillis()); if (!isDiscarded() && !isRunnable()) { if (result.succeed()) { - getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output()); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, result.output(), getLogPath()); } else { - getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath()); } } } @@ -168,7 +177,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { exception.printStackTrace(new PrintWriter(out)); output = out.toString(); } - getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, output, getLogPath()); } 
} @@ -587,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { public final String getProject() { if (project == null) { - throw new IllegalStateException("project is not set for abstract executable " + getId()); + logger.error("project is not set for abstract executable " + getId()); } return project; } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index bbeaf6a..c0f6d87 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -101,11 +101,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai protected void onExecuteStart(ExecutableContext executableContext) { final long startTime = getStartTime(); if (startTime > 0) { - getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath()); } else { Map<String, String> info = Maps.newHashMap(); info.put(START_TIME, Long.toString(System.currentTimeMillis())); - getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath()); } getManager().addJobInfo(getId(), BUILD_INSTANCE, DistributedLockFactory.processAndHost()); } @@ -137,8 +137,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n" + "it might cause endless state, will retry to fetch subtask's state.", task.getId(), task.getName()); - getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null, - "killed due to inconsistent state"); + getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.ERROR, null, + "killed due to inconsistent state", getLogPath()); hasError = true; } @@ -156,21 +156,21 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai } if (allSucceed) { setEndTime(System.currentTimeMillis()); - mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null); + mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, null, getLogPath()); onStatusChange(executableContext, result, ExecutableState.SUCCEED); } else if (hasError) { setEndTime(System.currentTimeMillis()); - mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null); + mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, null, getLogPath()); onStatusChange(executableContext, result, ExecutableState.ERROR); } else if (hasDiscarded) { setEndTime(System.currentTimeMillis()); - mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null); + mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.DISCARDED, null, null, getLogPath()); } else { - mgr.updateJobOutput(getId(), ExecutableState.READY, null, null); + mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.READY, null, null, getLogPath()); } } else { setEndTime(System.currentTimeMillis()); - mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); + 
mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath()); onStatusChange(executableContext, result, ExecutableState.ERROR); } } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index ecc2c2e..c9ae8a4 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -23,7 +23,12 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID; import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL; import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.HashMap; import java.util.IllegalFormatException; @@ -31,18 +36,28 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Deque; +import java.util.ArrayDeque; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.JobProcessContext; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.dao.ExecutableDao; import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.exception.IllegalStateTranferException; import org.apache.kylin.job.exception.PersistentException; +import org.apache.kylin.metadata.MetadataConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,6 +255,197 @@ public class ExecutableManager { return result; } + public Output getOutputFromHDFSByJobId(String jobId) { + return getOutputFromHDFSByJobId(jobId, jobId); + } + + public Output getOutputFromHDFSByJobId(String jobId, String stepId) { + return getOutputFromHDFSByJobId(jobId, stepId, 100); + } + + /** + * get job output from hdfs json file; + * if json file contains logPath, + * the logPath is spark driver log hdfs path(*.json.log), read sample data from log file. 
+ * + * @param jobId + * @return + */ + public Output getOutputFromHDFSByJobId(String jobId, String stepId, int nLines) { + AbstractExecutable jobInstance = getJob(jobId); + String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId); + ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath); + assertOutputNotNull(jobOutput, outputStorePath); + + if (Objects.nonNull(jobOutput.getLogPath())) { + if (isHdfsPathExists(jobOutput.getLogPath())) { + jobOutput.setContent(getSampleDataFromHDFS(jobOutput.getLogPath(), nLines)); + } else if (StringUtils.isEmpty(jobOutput.getContent()) && Objects.nonNull(getJob(jobId)) + && getJob(jobId).getStatus() == ExecutableState.RUNNING) { + jobOutput.setContent("Wait a moment ... "); + } + } + + return parseOutput(jobOutput); + } + + public ExecutableOutputPO getJobOutputFromHDFS(String resPath) { + DataInputStream din = null; + try { + Path path = new Path(resPath); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + if (!fs.exists(path)) { + ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); + executableOutputPO.setContent("job output not found, please check kylin.log"); + return executableOutputPO; + } + + din = fs.open(path); + return JsonUtil.readValue(din, ExecutableOutputPO.class); + } catch (Exception e) { + // If the output file on hdfs is corrupt, give an empty output + logger.error("get job output [{}] from HDFS failed.", resPath, e); + ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); + executableOutputPO.setContent("job output broken, please check kylin.log"); + return executableOutputPO; + } finally { + IOUtils.closeQuietly(din); + } + } + + private void assertOutputNotNull(ExecutableOutputPO output, String idOrPath) { + com.google.common.base.Preconditions.checkArgument(output != null, "there is no related output for job :" + idOrPath); + } + + /** + * check the hdfs path exists. + * + * @param hdfsPath + * @return + */ + public boolean isHdfsPathExists(String hdfsPath) { + if (StringUtils.isBlank(hdfsPath)) { + return false; + } + + Path path = new Path(hdfsPath); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + try { + return fs.exists(path); + } catch (IOException e) { + logger.error("check the hdfs path [{}] exists failed, ", hdfsPath, e); + } + + return false; + } + + /** + * get sample data from hdfs log file. + * specified the lines, will get the first num lines and last num lines. 
+ * + * @param resPath + * @return + */ + public String getSampleDataFromHDFS(String resPath, final int nLines) { + try { + Path path = new Path(resPath); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + if (!fs.exists(path)) { + return null; + } + + FileStatus fileStatus = fs.getFileStatus(path); + try (FSDataInputStream din = fs.open(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(din, "UTF-8"))) { + + String line; + StringBuilder sampleData = new StringBuilder(); + for (int i = 0; i < nLines && (line = reader.readLine()) != null; i++) { + if (sampleData.length() > 0) { + sampleData.append('\n'); + } + sampleData.append(line); + } + + int offset = sampleData.toString().getBytes("UTF-8").length + 1; + if (offset < fileStatus.getLen()) { + sampleData.append("\n================================================================\n"); + sampleData.append(tailHdfsFileInputStream(din, offset, fileStatus.getLen(), nLines)); + } + return sampleData.toString(); + } + } catch (IOException e) { + logger.error("get sample data from hdfs log file [{}] failed!", resPath, e); + return null; + } + } + + /** + * get the last N_LINES lines from the end of hdfs file input stream; + * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958 + * + * @param hdfsDin + * @param startPos + * @param endPos + * @param nLines + * @return + * @throws IOException + */ + private String tailHdfsFileInputStream(FSDataInputStream hdfsDin, final long startPos, final long endPos, + final int nLines) throws IOException { + com.google.common.base.Preconditions.checkNotNull(hdfsDin); + com.google.common.base.Preconditions.checkArgument(startPos < endPos && startPos >= 0); + com.google.common.base.Preconditions.checkArgument(nLines >= 0); + + Deque<String> deque = new ArrayDeque<>(); + int buffSize = 8192; + byte[] byteBuf = new byte[buffSize]; + + long pos = endPos; + + // cause by log last char is \n + hdfsDin.seek(pos - 1); + int lastChar = hdfsDin.read(); + if ('\n' == lastChar) { + pos--; + } + + int bytesRead = (int) ((pos - startPos) % buffSize); + if (bytesRead == 0) { + bytesRead = buffSize; + } + + pos -= bytesRead; + int lines = nLines; + while (lines > 0 && pos >= startPos) { + bytesRead = hdfsDin.read(pos, byteBuf, 0, bytesRead); + + int last = bytesRead; + for (int i = bytesRead - 1; i >= 0 && lines > 0; i--) { + if (byteBuf[i] == '\n') { + deque.push(new String(byteBuf, i, last - i, StandardCharsets.UTF_8)); + lines--; + last = i; + } + } + + if (lines > 0 && last > 0) { + deque.push(new String(byteBuf, 0, last, StandardCharsets.UTF_8)); + } + + bytesRead = buffSize; + pos -= bytesRead; + } + + StringBuilder sb = new StringBuilder(); + while (!deque.isEmpty()) { + sb.append(deque.pop()); + } + + return sb.length() > 0 && sb.charAt(0) == '\n' ? 
sb.substring(1) : sb.toString(); + } + + public List<AbstractExecutable> getAllExecutables() { try { List<AbstractExecutable> ret = Lists.newArrayList(); @@ -342,12 +548,12 @@ public class ExecutableManager { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { if (task.getStatus() == ExecutableState.RUNNING) { - updateJobOutput(task.getId(), ExecutableState.READY, null, null); + updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, null, task.getLogPath()); break; } } } - updateJobOutput(jobId, ExecutableState.READY, null, null); + updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, null, null, job.getLogPath()); } public void resumeJob(String jobId) { @@ -360,7 +566,7 @@ public class ExecutableManager { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { if (task.getStatus() == ExecutableState.ERROR || task.getStatus() == ExecutableState.STOPPED) { - updateJobOutput(task.getId(), ExecutableState.READY, null, "no output"); + updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, "no output", task.getLogPath()); break; } } @@ -372,7 +578,7 @@ public class ExecutableManager { info.remove(AbstractExecutable.END_TIME); } } - updateJobOutput(jobId, ExecutableState.READY, info, null); + updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, info, null, job.getLogPath()); } public void discardJob(String jobId) { @@ -394,11 +600,11 @@ public class ExecutableManager { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { if (!task.getStatus().isFinalState()) { - updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null); + updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.DISCARDED, null, null, task.getLogPath()); } } } - updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); + updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.DISCARDED, null, null, job.getLogPath()); } public void rollbackJob(String jobId, String stepId) { @@ -412,13 +618,13 @@ public class ExecutableManager { for (AbstractExecutable task : tasks) { if (task.getId().compareTo(stepId) >= 0) { logger.debug("rollback task : " + task); - updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), ""); + updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath()); } } } if (job.getStatus() == ExecutableState.SUCCEED) { - updateJobOutput(job.getId(), ExecutableState.READY, null, null); + updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), job.getId(), ExecutableState.READY, null, null, job.getLogPath()); } } @@ -439,12 +645,12 @@ public class ExecutableManager { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { if (!task.getStatus().isFinalState()) { - updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null); + updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.STOPPED, null, null, task.getLogPath()); break; } } } - updateJobOutput(jobId, ExecutableState.STOPPED, null, null); + 
updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.STOPPED, null, null, job.getLogPath()); } public ExecutableOutputPO getJobOutput(String jobId) { @@ -456,7 +662,7 @@ public class ExecutableManager { } } - public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) { + public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) { // when if (Thread.currentThread().isInterrupted()) { throw new RuntimeException("Current thread is interruptted, aborting"); @@ -494,6 +700,39 @@ public class ExecutableManager { logger.error("error change job:" + jobId + " to " + newStatus); throw new RuntimeException(e); } + + if (project != null) { + //write output to HDFS + updateJobOutputToHDFS(project, jobId, output, logPath); + } + } + + public void updateJobOutputToHDFS(String project, String jobId, String output, String logPath) { + ExecutableOutputPO jobOutput = getJobOutput(jobId); + if (null != output) { + jobOutput.setContent(output); + } + if (null != logPath) { + jobOutput.setLogPath(logPath); + } + String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId); + + updateJobOutputToHDFS(outputHDFSPath, jobOutput); + } + + public void updateJobOutputToHDFS(String resPath, ExecutableOutputPO obj) { + DataOutputStream dout = null; + try { + Path path = new Path(resPath); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + dout = fs.create(path, true); + JsonUtil.writeValue(dout, obj); + } catch (Exception e) { + // the operation to update output to hdfs failed, next task should not be interrupted. + logger.error("update job output [{}] to HDFS failed.", resPath, e); + } finally { + IOUtils.closeQuietly(dout); + } } private boolean needDestroyProcess(ExecutableState from, ExecutableState to) { @@ -548,7 +787,7 @@ public class ExecutableManager { if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) { continue; } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) { - updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), ""); + updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null); } break; } diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java index 3535d37..3341dff 100644 --- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java @@ -75,7 +75,7 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase { AbstractExecutable another = service.getJob(executable.getId()); assertJobEqual(executable, another); - service.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, "test output"); + service.updateJobOutput(null, executable.getId(), ExecutableState.RUNNING, null, "test output", null); assertJobEqual(executable, service.getJob(executable.getId())); } @@ -98,21 +98,21 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase { SucceedTestExecutable job = new SucceedTestExecutable(); String id = job.getId(); service.addJob(job); - service.updateJobOutput(id, ExecutableState.RUNNING, null, null); - service.updateJobOutput(id, ExecutableState.ERROR, null, null); - service.updateJobOutput(id, ExecutableState.READY, null, null); - 
service.updateJobOutput(id, ExecutableState.RUNNING, null, null); - service.updateJobOutput(id, ExecutableState.READY, null, null); - service.updateJobOutput(id, ExecutableState.RUNNING, null, null); - service.updateJobOutput(id, ExecutableState.SUCCEED, null, null); + service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null); + service.updateJobOutput(null, id, ExecutableState.ERROR, null, null, null); + service.updateJobOutput(null, id, ExecutableState.READY, null, null, null); + service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null); + service.updateJobOutput(null, id, ExecutableState.READY, null, null, null); + service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null); + service.updateJobOutput(null, id, ExecutableState.SUCCEED, null, null, null); } @Test(expected = IllegalStateTranferException.class) public void testInvalidStateTransfer() { SucceedTestExecutable job = new SucceedTestExecutable(); service.addJob(job); - service.updateJobOutput(job.getId(), ExecutableState.ERROR, null, null); - service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null); + service.updateJobOutput(null, job.getId(), ExecutableState.ERROR, null, null, null); + service.updateJobOutput(null, job.getId(), ExecutableState.STOPPED, null, null, null); } private static void assertJobEqual(Executable one, Executable another) { diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index 0855012..71fc19f 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -145,8 +145,8 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { job.addTask(task1); job.addTask(task2); execMgr.addJob(job); - ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(task2.getId(), - ExecutableState.RUNNING, null, null); + ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(null, task2.getId(), + ExecutableState.RUNNING, null, null, null); waitForJobFinish(job.getId(), MAX_WAIT_TIME); Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java new file mode 100644 index 0000000..5a90fb2 --- /dev/null +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.common.logging; + +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.helpers.LogLog; +import org.apache.log4j.spi.LoggingEvent; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractHdfsLogAppender extends AppenderSkeleton { + private final Object flushLogLock = new Object(); + private final Object initWriterLock = new Object(); + private final Object closeLock = new Object(); + private final Object fileSystemLock = new Object(); + + private FSDataOutputStream outStream = null; + private BufferedWriter bufferedWriter = null; + + private FileSystem fileSystem = null; + + private ExecutorService appendHdfsService = null; + + private BlockingDeque<LoggingEvent> logBufferQue = null; + private static final double QUEUE_FLUSH_THRESHOLD = 0.2; + + //configurable + private int logQueueCapacity = 8192; + private int flushInterval = 5000; + private String hdfsWorkingDir; + + public int getLogQueueCapacity() { + return logQueueCapacity; + } + + public void setLogQueueCapacity(int logQueueCapacity) { + this.logQueueCapacity = logQueueCapacity; + } + + public BlockingDeque<LoggingEvent> getLogBufferQue() { + return logBufferQue; + } + + public int getFlushInterval() { + return flushInterval; + } + + public void setFlushInterval(int flushInterval) { + this.flushInterval = flushInterval; + } + + public String getHdfsWorkingDir() { + return hdfsWorkingDir; + } + + public void setHdfsWorkingDir(String hdfsWorkingDir) { + this.hdfsWorkingDir = hdfsWorkingDir; + } + + public FileSystem getFileSystem() { + if (null == fileSystem) { + return getFileSystem(new Configuration()); + } + return fileSystem; + } + + private FileSystem getFileSystem(Configuration conf) { + synchronized (fileSystemLock) { + if (null == fileSystem) { + try { + fileSystem = new Path(hdfsWorkingDir).getFileSystem(conf); + } catch (IOException e) { + LogLog.error("Failed to create the file system, ", e); + throw new RuntimeException(e); + } + } + } + return fileSystem; + } + + public boolean isWriterInited() { + synchronized (initWriterLock) { + return null != bufferedWriter; + } + } + + abstract void init(); + + abstract String getAppenderName(); + + /** + * init the load resource. 
+ */ + @Override + public void activateOptions() { + LogLog.warn(String.format(Locale.ROOT, "%s starting ...", getAppenderName())); + LogLog.warn("hdfsWorkingDir -> " + getHdfsWorkingDir()); + + init(); + + logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity()); + appendHdfsService = Executors.newSingleThreadExecutor(); + appendHdfsService.execute(this::checkAndFlushLog); + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); + + LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName())); + } + + @Override + public void append(LoggingEvent loggingEvent) { + try { + boolean offered = logBufferQue.offer(loggingEvent, 10, TimeUnit.SECONDS); + if (!offered) { + LogLog.error("LogEvent cannot put into the logBufferQue, log event content:"); + printLoggingEvent(loggingEvent); + } + } catch (InterruptedException e) { + LogLog.warn("Append logging event interrupted!", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } + } + + @Override + public void close() { + synchronized (closeLock) { + if (!this.closed) { + this.closed = true; + + List<LoggingEvent> transaction = Lists.newArrayList(); + try { + flushLog(getLogBufferQue().size(), transaction); + + closeWriter(); + if (appendHdfsService != null && !appendHdfsService.isShutdown()) { + appendHdfsService.shutdownNow(); + } + } catch (Exception e) { + transaction.forEach(this::printLoggingEvent); + try { + while (!getLogBufferQue().isEmpty()) { + printLoggingEvent(getLogBufferQue().take()); + } + } catch (Exception ie) { + LogLog.error("clear the logging buffer queue failed!", ie); + } + LogLog.error(String.format(Locale.ROOT, "close %s failed!", getAppenderName()), e); + } + LogLog.warn(String.format(Locale.ROOT, "%s closed ...", getAppenderName())); + } + } + } + + private void closeWriter() { + IOUtils.closeQuietly(bufferedWriter); + IOUtils.closeQuietly(outStream); + } + + @Override + public boolean requiresLayout() { + return true; + } + + /** + * some times need to wait the component init ok. + * + * @return + */ + abstract boolean isSkipCheckAndFlushLog(); + + /** + * clear the log buffer queue when it was full. + */ + private void clearLogBufferQueueWhenBlocked() { + if (logBufferQue.size() >= getLogQueueCapacity()) { + int removeNum = getLogQueueCapacity() / 5; + while (removeNum > 0) { + try { + LoggingEvent loggingEvent = logBufferQue.take(); + printLoggingEvent(loggingEvent); + } catch (Exception ex) { + LogLog.error("Take event interrupted!", ex); + } + removeNum--; + } + } + } + + /** + * print the logging event to stderr + * @param loggingEvent + */ + private void printLoggingEvent(LoggingEvent loggingEvent) { + try { + String log = getLayout().format(loggingEvent); + LogLog.error(log.endsWith("\n") ? log.substring(0, log.length() - 1) : log); + if (null != loggingEvent.getThrowableStrRep()) { + for (String stack : loggingEvent.getThrowableStrRep()) { + LogLog.error(stack); + } + } + } catch (Exception e) { + LogLog.error("print logging event failed!", e); + } + } + + /** + * flush the log to hdfs when conditions are satisfied. 
+ */ + protected void checkAndFlushLog() { + long start = System.currentTimeMillis(); + do { + List<LoggingEvent> transaction = Lists.newArrayList(); + try { + if (isSkipCheckAndFlushLog()) { + continue; + } + + int eventSize = getLogBufferQue().size(); + if (eventSize > getLogQueueCapacity() * QUEUE_FLUSH_THRESHOLD + || System.currentTimeMillis() - start > getFlushInterval()) { + // update start time before doing flushLog to avoid exception when flushLog + start = System.currentTimeMillis(); + flushLog(eventSize, transaction); + } else { + Thread.sleep(getFlushInterval() / 100); + } + } catch (Exception e) { + transaction.forEach(this::printLoggingEvent); + clearLogBufferQueueWhenBlocked(); + LogLog.error("Error occurred when consume event", e); + } + } while (!closed); + } + + /** + * init the hdfs writer and create the hdfs file with outPath. + * need kerberos authentic, so fileSystem init here. + * + * @param outPath + */ + protected boolean initHdfsWriter(Path outPath, Configuration conf) { + synchronized (initWriterLock) { + closeWriter(); + bufferedWriter = null; + outStream = null; + + int retry = 10; + while (retry-- > 0) { + try { + fileSystem = getFileSystem(conf); + outStream = fileSystem.create(outPath, true); + break; + } catch (Exception e) { + LogLog.error("fail to create stream for path: " + outPath, e); + } + + try { + initWriterLock.wait(1000);//waiting for acl to turn to current user + } catch (InterruptedException e) { + LogLog.warn("Init writer interrupted!", e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } + } + if (null != outStream) { + bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream, StandardCharsets.UTF_8)); + return true; + } + } + + return false; + } + + /** + * write the data into the buffer. + * + * @param message + * @throws IOException + */ + protected void write(String message) throws IOException { + bufferedWriter.write(message); + } + + /** + * write the error stack info into buffer + * + * @param loggingEvent + * @throws IOException + */ + protected void writeLogEvent(LoggingEvent loggingEvent) throws IOException { + if (null != loggingEvent) { + write(getLayout().format(loggingEvent)); + + if (null != loggingEvent.getThrowableStrRep()) { + for (String message : loggingEvent.getThrowableStrRep()) { + write(message); + write("\n"); + } + } + } + } + + /** + * do write log to the buffer. + * + * @param eventSize + * @throws IOException + * @throws InterruptedException + */ + abstract void doWriteLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException; + + /** + * flush the buffer data to HDFS. + * + * @throws IOException + */ + private void flush() throws IOException { + bufferedWriter.flush(); + outStream.hsync(); + } + + /** + * take the all events from queue and write into the HDFS immediately. 
+ * + * @param eventSize + * @throws IOException + * @throws InterruptedException + */ + protected void flushLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException { + if (eventSize <= 0) { + return; + } + + synchronized (flushLogLock) { + if (eventSize > getLogBufferQue().size()) { + eventSize = getLogBufferQue().size(); + } + + doWriteLog(eventSize, transaction); + + flush(); + } + } +} diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java new file mode 100644 index 0000000..0e5ad3c --- /dev/null +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.common.logging; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.helpers.LogLog; +import org.apache.log4j.spi.LoggingEvent; + +import java.io.IOException; +import java.util.List; + +public class SparkDriverHdfsLogAppender extends AbstractHdfsLogAppender { + + private String logPath; + + // kerberos + private boolean kerberosEnable = false; + private String kerberosPrincipal; + private String kerberosKeytab; + + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public boolean isKerberosEnable() { + return kerberosEnable; + } + + public void setKerberosEnable(boolean kerberosEnable) { + this.kerberosEnable = kerberosEnable; + } + + public String getKerberosPrincipal() { + return kerberosPrincipal; + } + + public void setKerberosPrincipal(String kerberosPrincipal) { + this.kerberosPrincipal = kerberosPrincipal; + } + + public String getKerberosKeytab() { + return kerberosKeytab; + } + + public void setKerberosKeytab(String kerberosKeytab) { + this.kerberosKeytab = kerberosKeytab; + } + + @Override + public void init() { + LogLog.warn("spark.driver.log4j.appender.hdfs.File -> " + getLogPath()); + LogLog.warn("kerberosEnable -> " + isKerberosEnable()); + if (isKerberosEnable()) { + LogLog.warn("kerberosPrincipal -> " + getKerberosPrincipal()); + LogLog.warn("kerberosKeytab -> " + getKerberosKeytab()); + } + } + + @Override + String getAppenderName() { + return "SparkDriverHdfsLogAppender"; + } + + @Override + public boolean isSkipCheckAndFlushLog() { + return false; + } + + @Override + public void doWriteLog(int eventSize, List<LoggingEvent> transaction) + throws 
IOException, InterruptedException { + if (!isWriterInited()) { + Configuration conf = new Configuration(); + if (isKerberosEnable()) { + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab()); + } + if (!initHdfsWriter(new Path(getLogPath()), conf)) { + LogLog.error("init the hdfs writer failed!"); + } + } + + while (eventSize > 0) { + LoggingEvent loggingEvent = getLogBufferQue().take(); + transaction.add(loggingEvent); + writeLogEvent(loggingEvent); + eventSize--; + } + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index 98f63a1..3cc1b60 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -26,15 +26,14 @@ import java.net.UnknownHostException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; - import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Map.Entry; - import java.util.Set; +import java.util.Objects; +import java.util.Map.Entry; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -108,6 +107,7 @@ public class NSparkExecutable extends AbstractExecutable { CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(this.getCubeName()); KylinConfig config = cube.getConfig(); + this.setLogPath(getSparkDriverLogHdfsPath(context.getConfig())); config = wrapConfig(config); String sparkHome = KylinConfig.getSparkHome(); @@ -189,15 +189,12 @@ public class NSparkExecutable extends AbstractExecutable { /** * generate the spark driver log hdfs path format, json path + timestamp + .log - * - * @param - * @return */ - /*public String getSparkDriverLogHdfsPath(KylinConfig config) { - return String.format("%s.%s.log", config.getJobTmpOutputStorePath(getProject(), getId()), + public String getSparkDriverLogHdfsPath(KylinConfig config) { + return String.format(Locale.ROOT, "%s.%s.log", config.getJobOutputStorePath(getParam(MetadataConstants.P_PROJECT_NAME), getId()), System.currentTimeMillis()); - }*/ - + } + protected KylinConfig wrapConfig(ExecutableContext context) { return wrapConfig(context.getConfig()); } @@ -206,15 +203,18 @@ public class NSparkExecutable extends AbstractExecutable { String project = getParam(MetadataConstants.P_PROJECT_NAME); Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty"); + HashMap<String, String> jobOverrides = new HashMap(); String parentId = getParentId(); - originalConfig.setProperty("job.id", StringUtils.defaultIfBlank(parentId, getId())); - originalConfig.setProperty("job.project", project); + jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId())); + jobOverrides.put("job.project", project); if (StringUtils.isNotBlank(parentId)) { - originalConfig.setProperty("job.stepId", getId()); + jobOverrides.put("job.stepId", getId()); } - originalConfig.setProperty("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone()); + jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone()); + 
jobOverrides.put("spark.driver.log4j.appender.hdfs.File", + Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath()); - return originalConfig; + return KylinConfigExt.createInstance(originalConfig, jobOverrides); } private void killOrphanApplicationIfExists(KylinConfig config, String jobId) { @@ -302,27 +302,64 @@ public class NSparkExecutable extends AbstractExecutable { sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true"); } + replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride); + return sparkConfigOverride; + } + + private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) { + String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions"; + StringBuilder sb = new StringBuilder(); + if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) { + sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey)); + } + String serverIp = "127.0.0.1"; try { serverIp = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { logger.warn("use the InetAddress get local ip failed!", e); } + String serverPort = config.getServerPort(); - String log4jConfiguration = "file:" + config.getLogSparkPropertiesFile(); + String hdfsWorkingDir = config.getHdfsWorkingDirectory(); - String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions"; - StringBuilder sb = new StringBuilder(); - if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) { - sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey)); + String sparkDriverHdfsLogPath = null; + if (config instanceof KylinConfigExt) { + Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides(); + if (Objects.nonNull(extendedOverrides)) { + sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File"); + } } + /* + if (config.isCloud()) { + String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory(); + if (StringUtils.isNotBlank(logLocalWorkingDirectory)) { + hdfsWorkingDir = logLocalWorkingDirectory; + sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath; + } + } + */ + + String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile(); sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration)); + sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled())); + + if (config.isKerberosEnabled()) { + sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal())); + sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath())); + if (config.getPlatformZKEnable()) { + sb.append(String.format(Locale.ROOT, " -Djava.security.auth.login.config=%s", config.getKerberosJaasConfPath())); + sb.append(String.format(Locale.ROOT, " -Djava.security.krb5.conf=%s", config.getKerberosKrb5ConfPath())); + } + } + sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath)); sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp)); + sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort)); sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId())); - + sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
index 8607348..dba8132 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
@@ -50,6 +50,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.ResponseStatus;
 
+import static org.apache.kylin.shaded.com.google.common.net.HttpHeaders.CONTENT_DISPOSITION;
+
 /**
  */
 public class BasicController {
@@ -132,6 +134,20 @@ public class BasicController {
         }
     }
 
+    protected void setDownloadResponse(InputStream inputStream, String fileName, String contentType,
+            final HttpServletResponse response) throws IOException {
+        try (OutputStream output = response.getOutputStream()) {
+            response.reset();
+            response.setContentType(contentType);
+            response.setHeader(CONTENT_DISPOSITION, "attachment; filename=\"" + fileName + "\"");
+            IOUtils.copyLarge(inputStream, output);
+            output.flush();
+        } catch (IOException e) {
+            logger.error("Failed download log File!");
+            throw e;
+        }
+    }
+
     public boolean isAdmin() {
         boolean isAdmin = false;
         Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
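Because setDownloadResponse accepts any InputStream and copies it with IOUtils.copyLarge, a caller could also stream a large step log straight from HDFS instead of first materializing it as a Java String, which is what the JobController endpoint below does via getAllJobOutput. The following sketch is a hypothetical alternative use of the helper, not something this patch contains; the class, path and filename are placeholders, and it is assumed to live in a package where extending BasicController is sensible.

    // Hypothetical follow-on use of setDownloadResponse: stream a log file directly from HDFS.
    import java.io.IOException;
    import java.io.InputStream;
    import javax.servlet.http.HttpServletResponse;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.springframework.http.MediaType;

    public class StreamedLogDownload extends BasicController {
        public void downloadFromHdfs(String logHdfsPath, String fileName, HttpServletResponse response) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            try (InputStream in = fs.open(new Path(logHdfsPath))) {
                // The whole file never has to fit in memory; it is copied block by block to the servlet output.
                setDownloadResponse(in, fileName, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
            }
        }
    }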
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 65c036c..c28362c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -18,11 +18,13 @@ package org.apache.kylin.rest.controller;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Locale;
 
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -30,16 +32,22 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.MediaType;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import javax.servlet.http.HttpServletResponse;
 
 @Controller
 @RequestMapping(value = "jobs")
@@ -160,6 +168,7 @@ public class JobController extends BasicController {
      * @return
      * @throws IOException
      */
+
     @RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
             "application/json" })
     @ResponseBody
@@ -167,11 +176,34 @@ public class JobController extends BasicController {
         Map<String, String> result = new HashMap<String, String>();
         result.put("jobId", jobId);
         result.put("stepId", String.valueOf(stepId));
-        result.put("cmd_output", jobService.getExecutableManager().getOutput(stepId).getVerboseMsg());
+        result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
         return result;
     }
 
     /**
+     * Download a job step output from hdfs
+     * @param jobId
+     * @param stepId
+     * @param project
+     * @param response
+     * @return
+     */
+    @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
+    @ResponseBody
+    public EnvelopeResponse<String> downloadLogFile(@PathVariable("job_id") String jobId,
+            @PathVariable("step_id") String stepId, @RequestParam(value = "project") String project,
+            HttpServletResponse response) throws IOException {
+        checkRequiredArg("job_id", jobId);
+        checkRequiredArg("step_id", stepId);
+        checkRequiredArg("project", project);
+        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+
+        String jobOutput = jobService.getAllJobOutput(jobId, stepId);
+        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+        return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+    }
+
+    /**
      * Resume a cube job
      *
      * @return
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index f7be7d1..2c0c20c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -64,6 +64,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -479,6 +480,16 @@ public class JobService extends BasicService implements InitializingBean {
         return getExecutableManager().getOutput(id);
     }
 
+    public String getJobOutput(String jobId, String stepId) {
+        ExecutableManager executableManager = getExecutableManager();
+        return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
+    }
+
+    public String getAllJobOutput(String jobId, String stepId) {
+        ExecutableManager executableManager = getExecutableManager();
+        return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
+    }
+
     protected JobInstance getSingleJobInstance(AbstractExecutable job) {
         Message msg = MsgPicker.getMsg();
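With the JobService methods above, getJobOutput backs the step-output dialog (default-sized read from HDFS) while getAllJobOutput fetches the complete log (Integer.MAX_VALUE lines) for the new download endpoint GET /jobs/{job_id}/steps/{step_id}/log?project=... . The client sketch below is only an illustration of exercising that endpoint; the /kylin/api base path, port 7070 and the ADMIN:KYLIN Basic-auth credentials are assumptions about a typical deployment, not something this patch defines.

    // Illustrative client for the new step-log download endpoint.
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.Base64;

    public class StepLogDownloadClient {
        public static void main(String[] args) throws Exception {
            String jobId = args[0], stepId = args[1], project = args[2];
            URL url = new URL("http://localhost:7070/kylin/api/jobs/" + jobId + "/steps/" + stepId + "/log?project=" + project);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestProperty("Authorization",
                    "Basic " + Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes(StandardCharsets.UTF_8)));
            try (InputStream in = conn.getInputStream()) {
                // The response body is the log file itself (served with Content-Disposition: attachment).
                Files.copy(in, Paths.get(project + "_" + stepId + ".log"), StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }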
