This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 88ad6ec  KYLIN-4917 Fix some problem of logger system in kylin4
88ad6ec is described below

commit 88ad6ec31b1d5ba0308bebd96d9c65d93935d3d8
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Wed Feb 24 17:25:30 2021 +0800

    KYLIN-4917 Fix some problem of logger system in kylin4
---
 .../common/logging/AbstractHdfsLogAppender.java    | 10 ++--
 .../common/logging/SparkExecutorHdfsAppender.java  | 54 ++++++++++++++++++----
 .../apache/kylin/rest/job/StorageCleanupJob.java   |  2 +-
 3 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index ff5240c..17422a8 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -336,7 +336,9 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
      * @throws IOException
      */
    protected void write(String message) throws IOException {
-        bufferedWriter.write(message);
+        if (isWriterInited()) {
+            bufferedWriter.write(message);
+        }
    }

    /**
@@ -373,8 +375,10 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
      * @throws IOException
      */
    private void flush() throws IOException {
-        bufferedWriter.flush();
-        outStream.hsync();
+        if (isWriterInited()) {
+            bufferedWriter.flush();
+            outStream.hsync();
+        }
    }

    /**
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index b2e4146..0bbd3e7 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.SparkHadoopUtil;
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+import scala.runtime.BoxedUnit;

 import java.io.File;
 import java.io.IOException;
@@ -160,12 +163,21 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
                 String user = System.getenv("USER");
                 LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
                         + " USER is " + user);
-                if (!initHdfsWriter(file, new Configuration())) {
-                    LogLog.error("Failed to init the hdfs writer!");
-                }
-                doRollingClean(loggingEvent);
+                SparkHadoopUtil.get().runAsSparkUser(new scala.runtime.AbstractFunction0<scala.runtime.BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply() {
+                        if (!initHdfsWriter(file, new Configuration())) {
+                            LogLog.error("Failed to init the hdfs writer!");
+                        }
+                        try {
+                            doRollingClean(loggingEvent);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        return null;
+                    }
+                });
             }
-
             transaction.add(loggingEvent);
             writeLogEvent(loggingEvent);
             size--;
@@ -229,9 +241,9 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     @VisibleForTesting
     String getRootPathName() {
         if ("job".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs/executor/";
+            return getHdfsWorkingDir() + "/" + getProject() + "/spark_logs/executor/";
         } else if ("sparder".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/_sparder_logs";
+            return parseHdfsWorkingDir() + "/_sparder_logs";
         } else {
             throw new IllegalArgumentException("illegal category: " + getCategory());
         }
@@ -254,8 +266,32 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
         return false;
     }

-    private String parseHdfsWordingDir() {
-        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/");
+    private String parseHdfsWorkingDir() {
+        String root = getHdfsWorkingDir();
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
+
+        try {
+            FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        // append metadata-url prefix
+        String metaId = getMetadataIdentifier().replace(':', '-');
+        //transform relative path for local metadata
+        if (metaId.startsWith("../")) {
+            metaId = metaId.replace("../", "");
+            metaId = metaId.replace('/', '-');
+        }
+
+        root = new Path(path, metaId).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+        return root;
     }

 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 2b7569e..f4ad269 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -82,7 +82,7 @@ public class StorageCleanupJob extends AbstractApplication {
     protected long storageTimeCut;

-    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc");
+    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs");

     protected static PathFilter pathFilter = status -> !protectedDir.contains(status.getName());

     public StorageCleanupJob() throws IOException {
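
A note for readers following the SparkExecutorHdfsAppender change: runAsSparkUser expects a Scala () => Unit, which Java code supplies by subclassing scala.runtime.AbstractFunction0<BoxedUnit>, exactly as the patch does. Below is a minimal standalone sketch of that pattern; the class name and the printed message are illustrative only, not part of the patch.

    import org.apache.spark.deploy.SparkHadoopUtil;
    import scala.runtime.AbstractFunction0;
    import scala.runtime.BoxedUnit;

    public class RunAsSparkUserSketch {
        public static void main(String[] args) {
            // runAsSparkUser runs the callback under a doAs for the Spark
            // user, so anything opened inside it (e.g. the HDFS log stream
            // in the patch) is created with that user's credentials.
            SparkHadoopUtil.get().runAsSparkUser(new AbstractFunction0<BoxedUnit>() {
                @Override
                public BoxedUnit apply() {
                    System.out.println("running as the Spark user");
                    return BoxedUnit.UNIT; // Scala's Unit; returning null, as the patch does, also works
                }
            });
        }
    }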
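
The rewritten parseHdfsWorkingDir boils down to three steps: qualify the configured working directory against the default filesystem, append the metadata identifier, and normalize the trailing slash. Here is a sketch of the same steps in isolation; the values of root and metaId are made up for illustration (in Kylin they come from kylin.env.hdfs-working-dir and the metadata url).

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifyWorkingDirSketch {
        public static void main(String[] args) throws IOException {
            String root = "/kylin";           // stands in for kylin.env.hdfs-working-dir
            String metaId = "kylin_metadata"; // stands in for the metadata identifier

            Path path = new Path(root);
            if (!path.isAbsolute())
                throw new IllegalArgumentException("working dir must be absolute, but got " + root);

            // makeQualified prepends the default scheme and authority,
            // e.g. /kylin -> hdfs://namenode:8020/kylin
            FileSystem fs = path.getFileSystem(new Configuration());
            path = fs.makeQualified(path);

            String dir = new Path(path, metaId).toString();
            if (!dir.endsWith("/"))
                dir += "/";
            System.out.println(dir);
        }
    }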
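
Finally, the StorageCleanupJob change works because protectedDir backs a Hadoop PathFilter that is applied whenever the job lists deletion candidates, so a directory on the list never enters the candidate set at all. A minimal sketch of that filtering follows; the /kylin listing path is a placeholder.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class ProtectedDirSketch {
        static final List<String> PROTECTED = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs");
        // PathFilter has a single accept(Path) method, so a lambda works,
        // just as in StorageCleanupJob
        static final PathFilter FILTER = path -> !PROTECTED.contains(path.getName());

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // protected directories are filtered out during listing and
            // therefore never show up as cleanup candidates
            for (FileStatus status : fs.listStatus(new Path("/kylin"), FILTER)) {
                System.out.println("cleanup candidate: " + status.getPath());
            }
        }
    }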