This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a7531287d6d3733bcadec935f5f2225b413a9e18
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Apr 28 17:08:49 2023 +0800

    KYLIN-5650 fix Building global dict can't read meta file on S3
---
 .../engine/spark/builder/DFDictionaryBuilder.scala | 21 +++++++--
 .../apache/spark/dict/NGlobalDictHDFSStore.java    | 54 +++++++++++++++++-----
 2 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
index 5dfc35eeac..78a5c2cd97 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
@@ -26,6 +26,7 @@ import org.apache.kylin.metadata.model.TblColRef
 import org.apache.spark.TaskContext
 import org.apache.spark.application.NoRetryException
 import org.apache.spark.dict.NGlobalDictionaryV2
+import org.apache.spark.sql.execution.{ExplainMode, ExtendedMode}
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
@@ -94,6 +95,17 @@ class DFDictionaryBuilder(
     originalAQE.toBoolean
   }
 
+  def dictBuilderInfo(bucketPartitionSize: Int, df: Dataset[Row]): String = {
+    s"""
+       |==========================[DICT REPARTITION INFO]===============================
+       |Partition Size: ${df.rdd.getNumPartitions}
+       |Bucket Partition Size: $bucketPartitionSize
+       |AQE Enabled: ${ss.conf.get(AQE)}
+       |Physical Plan:\n ${df.queryExecution.explainString(ExplainMode.fromString(ExtendedMode.name))}
+       |==========================[DICT REPARTITION INFO]===============================
+    """.stripMargin
+  }
+
   @throws[IOException]
   private[builder] def build(ref: TblColRef, bucketPartitionSize: Int,
                              afterDistinct: Dataset[Row]): Unit = logTime(s"building global dictionaries V2 for ${ref.getIdentity}") {
@@ -105,10 +117,13 @@ class DFDictionaryBuilder(
     ss.sparkContext.setJobDescription("Build dict " + ref.getIdentity)
 
     val dictCol = col(afterDistinct.schema.fields.head.name)
-    afterDistinct.filter(dictCol.isNotNull)
+    // https://issues.apache.org/jira/browse/SPARK-32051
+    val afterDistinctRepartition = afterDistinct.filter(dictCol.isNotNull)
       .repartition(bucketPartitionSize, dictCol)
-      // https://issues.apache.org/jira/browse/SPARK-32051
-      .foreachPartition((iter: Iterator[Row]) => {
+
+    logInfo(dictBuilderInfo(bucketPartitionSize, afterDistinctRepartition))
+
+    afterDistinctRepartition.foreachPartition((iter: Iterator[Row]) => {
       val partitionID = TaskContext.get().partitionId()
       logInfo(s"Build partition dict col: ${ref.getIdentity}, partitionId: $partitionID")
       val broadcastGlobalDict = broadcastDict.value
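Note on the DFDictionaryBuilder change: naming the repartitioned dataset in a
val, instead of chaining straight into foreachPartition, lets the builder log
the actual partition count, the AQE setting, and the physical plan before the
action runs, so a mismatch between the shuffle layout and the dictionary
buckets shows up in the logs. The sketch below illustrates the same
filter -> repartition -> foreachPartition pattern in isolation; the
SparkSession setup, sample data, column name, and println calls are
illustrative stand-ins, not Kylin code. The explicit (iter: Iterator[Row])
type ascription matches the overload workaround referenced by the
SPARK-32051 comment in the diff.

import org.apache.spark.TaskContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

object DictRepartitionSketch {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[2]").appName("dict-repartition-sketch").getOrCreate()
    import ss.implicits._

    val bucketPartitionSize = 4
    // Stand-in for the distinct column values that feed the global dictionary.
    val afterDistinct = Seq("a", "b", "c", "d", "e").toDF("dict_col").distinct()
    val dictCol = col(afterDistinct.schema.fields.head.name)

    // Materialize the repartitioned dataset in a val so its layout can be
    // inspected and logged before the action runs.
    val repartitioned = afterDistinct.filter(dictCol.isNotNull)
      .repartition(bucketPartitionSize, dictCol)
    println(s"partitions = ${repartitioned.rdd.getNumPartitions}, buckets = $bucketPartitionSize")

    // The explicit Iterator[Row] type pins the Scala overload of foreachPartition.
    repartitioned.foreachPartition((iter: Iterator[Row]) => {
      val partitionId = TaskContext.get().partitionId()
      iter.foreach(row => println(s"bucket $partitionId <- ${row.getString(0)}"))
    })

    ss.stop()
  }
}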
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
index 3034b2e8bc..37d2375739 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
@@ -18,9 +18,11 @@
 package org.apache.spark.dict;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +31,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,20 +102,19 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
                 path -> path.getName().startsWith(DICT_METADATA_NAME));
 
         if (metaFiles.length == 0) {
-            logger.info("because metaFiles.length is 0, metaInfo is null");
-            return null;
-        }
-
-        String metaFile = metaFiles[0].getPath().getName();
-        Path metaPath = new Path(versionDir, metaFile);
-        if (!fileSystem.exists(metaPath)) {
-            logger.info("because metaPath[{}] is not exists, metaInfo is null", metaPath);
-            return null;
+            FileStatus[] allFilesUnderVersionDir = fileSystem.listStatus(versionDir);
+            StringBuilder dictFiles = new StringBuilder();
+            for (FileStatus file : allFilesUnderVersionDir) {
+                dictFiles.append("\t-> ").append(file.getPath().toString()).append("\n");
+            }
+            logger.warn(
+                    "Because metaFiles.length is 0, metaInfo is null. Only the following files exist in the current version folder:\n{}",
+                    dictFiles);
         }
 
         NGlobalDictMetaInfo metaInfo;
-
-        try (FSDataInputStream is = fileSystem.open(metaPath)) {
+        Path metaPath = new Path(versionDir, DICT_METADATA_NAME);
+        try (FSDataInputStream is = getMetaFileInputStream(fileSystem, metaPath, "Meta file not exists.")) {
             int bucketSize = is.readInt();
             long[] bucketOffsets = new long[bucketSize];
             long[] bucketCount = new long[bucketSize];
@@ -129,6 +131,36 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
         return metaInfo;
     }
 
+    public FSDataInputStream getMetaFileInputStream(FileSystem fs, Path fp, String errorMsg) throws IOException {
+        if (fs.exists(fp)) {
+            return fs.open(fp);
+        } else if (!fs.getScheme().startsWith("s3")) {
+            throw new FileNotFoundException(
+                    String.format("%s. No Retry, file[%s] not found.", errorMsg, fp.toString()));
+        } else {
+            logger.info("Try to open {}", fp);
+            int retryTimes = 0;
+            // Retry once per second. The default timeout is 5 minutes
+            while (retryTimes < 300) {
+                logger.info("Open file operation will retry after 1 s, file path: {}", fp);
+                try {
+                    TimeUnit.SECONDS.sleep(1L);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(String.format(
+                            "unexpected things happened[%s] when sleeping for retry open file:", ie.getMessage()));
+                }
+                retryTimes += 1;
+                if (fs.exists(fp)) {
+                    return fs.open(fp);
+                }
+            }
+            throw new FileNotFoundException(
+                    String.format("%s. Retry timeout, file[%s] not found.", errorMsg, fp.toString()));
+        }
+    }
+
+    @Override
     public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId)
             throws IOException {
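Note on the NGlobalDictHDFSStore change: a dictionary meta file that was just
written to S3 may not yet be visible to exists()/listStatus(), so instead of
returning null and failing the build, the patch now logs the version folder's
contents and opens DICT_METADATA_NAME through getMetaFileInputStream, which
polls for up to 300 seconds on s3 schemes while other filesystems still fail
fast. Below is a compact Scala sketch of the same wait-and-retry idea against
the Hadoop FileSystem API; the helper name openWithRetry, the demo path, and
the unconditional polling (the committed Java code retries only when
fs.getScheme() starts with "s3") are illustrative simplifications, not the
committed implementation.

import java.io.FileNotFoundException
import java.util.concurrent.TimeUnit
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

object MetaFileRetrySketch {
  // Poll once per second until the file appears or maxRetries is exhausted,
  // mirroring the 300 x 1 s budget in the commit.
  def openWithRetry(fs: FileSystem, fp: Path, maxRetries: Int = 300): FSDataInputStream = {
    var retries = 0
    while (!fs.exists(fp) && retries < maxRetries) {
      TimeUnit.SECONDS.sleep(1L)
      retries += 1
    }
    if (fs.exists(fp)) fs.open(fp)
    else throw new FileNotFoundException(s"Retry timeout, file[$fp] not found.")
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val metaPath = new Path("/tmp/dict/meta_dict") // hypothetical location
    val is = openWithRetry(fs, metaPath)
    try println(s"bucket size = ${is.readInt()}") finally is.close()
  }
}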