This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a7531287d6d3733bcadec935f5f2225b413a9e18
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Apr 28 17:08:49 2023 +0800

    KYLIN-5650 fix Building global dict can't read meta file on S3
---
 .../engine/spark/builder/DFDictionaryBuilder.scala | 21 +++++++--
 .../apache/spark/dict/NGlobalDictHDFSStore.java    | 54 +++++++++++++++++-----
 2 files changed, 61 insertions(+), 14 deletions(-)
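
For context before the diff: S3's listing and metadata can briefly lag a
completed write, so a freshly written dictionary meta file may look absent on
the first check. The store therefore now polls for the file instead of failing
on the first miss, and only does so on s3 schemes. A minimal, self-contained
Scala sketch of that retry shape (openWithRetry is an illustrative name; the
real helper is getMetaFileInputStream in the diff below):

    import java.io.FileNotFoundException
    import java.util.concurrent.TimeUnit
    import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

    // Poll once per second, up to maxAttempts times, but only for S3-backed
    // stores; on any other filesystem a missing file is a hard error.
    def openWithRetry(fs: FileSystem, path: Path, maxAttempts: Int = 300): FSDataInputStream = {
      if (fs.exists(path)) return fs.open(path)
      if (!fs.getScheme.startsWith("s3"))
        throw new FileNotFoundException(s"No retry, file[$path] not found.")
      var attempts = 0
      while (attempts < maxAttempts) {
        TimeUnit.SECONDS.sleep(1L) // one-second pause between polls, as in the patch
        attempts += 1
        if (fs.exists(path)) return fs.open(path)
      }
      throw new FileNotFoundException(s"Retry timeout, file[$path] not found.")
    }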

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
index 5dfc35eeac..78a5c2cd97 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
@@ -26,6 +26,7 @@ import org.apache.kylin.metadata.model.TblColRef
 import org.apache.spark.TaskContext
 import org.apache.spark.application.NoRetryException
 import org.apache.spark.dict.NGlobalDictionaryV2
+import org.apache.spark.sql.execution.{ExplainMode, ExtendedMode}
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
@@ -94,6 +95,17 @@ class DFDictionaryBuilder(
     originalAQE.toBoolean
   }
 
+  def dictBuilderInfo(bucketPartitionSize: Int, df: Dataset[Row] ) : String = {
+      s"""
+         |==========================[DICT REPARTITION INFO]===============================
+         |Partition Size :${df.rdd.getNumPartitions}
+         |Bucket Partition Size: $bucketPartitionSize
+         |AQE Enabled: ${ss.conf.get(AQE)}
+         |Physical Plan:\n ${df.queryExecution.explainString(ExplainMode.fromString(ExtendedMode.name))}
+         |==========================[DICT REPARTITION INFO]===============================
+      """.stripMargin
+  }
+
   @throws[IOException]
   private[builder] def build(ref: TblColRef, bucketPartitionSize: Int,
                             afterDistinct: Dataset[Row]): Unit = logTime(s"building global dictionaries V2 for ${ref.getIdentity}") {
@@ -105,10 +117,13 @@ class DFDictionaryBuilder(
     ss.sparkContext.setJobDescription("Build dict " + ref.getIdentity)
 
     val dictCol = col(afterDistinct.schema.fields.head.name)
-    afterDistinct.filter(dictCol.isNotNull)
+    // https://issues.apache.org/jira/browse/SPARK-32051
+    val afterDistinctRepartition = afterDistinct.filter(dictCol.isNotNull)
       .repartition(bucketPartitionSize, dictCol)
-      // https://issues.apache.org/jira/browse/SPARK-32051
-      .foreachPartition((iter: Iterator[Row]) => {
+
+    logInfo(dictBuilderInfo(bucketPartitionSize, afterDistinctRepartition))
+
+    afterDistinctRepartition.foreachPartition((iter: Iterator[Row]) => {
         val partitionID = TaskContext.get().partitionId()
         logInfo(s"Build partition dict col: ${ref.getIdentity}, partitionId: 
$partitionID")
         val broadcastGlobalDict = broadcastDict.value
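
On the Scala side, the old code chained filter -> repartition ->
foreachPartition, leaving nothing to inspect before the action ran. Binding
the repartitioned Dataset to a val first is what lets dictBuilderInfo log the
partition count, the AQE flag, and the physical plan up front. A standalone
sketch of the same pattern (the SparkSession setup and the dict_col name are
illustrative, not from the patch):

    import org.apache.spark.sql.execution.{ExplainMode, ExtendedMode}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.{Row, SparkSession}

    val ss = SparkSession.builder().master("local[2]").appName("dict-sketch").getOrCreate()
    import ss.implicits._

    val bucketPartitionSize = 4
    val afterDistinct = Seq("a", "b", "c").toDF("dict_col")

    // Bind the repartitioned Dataset so it can be inspected before the action.
    val repartitioned = afterDistinct
      .filter(col("dict_col").isNotNull)
      .repartition(bucketPartitionSize, col("dict_col"))

    // The same facts the new dictBuilderInfo method logs:
    println(s"Partition Size: ${repartitioned.rdd.getNumPartitions}")
    println(s"Bucket Partition Size: $bucketPartitionSize")
    println(s"AQE Enabled: ${ss.conf.get("spark.sql.adaptive.enabled")}")
    println(repartitioned.queryExecution.explainString(ExplainMode.fromString(ExtendedMode.name)))

    // The explicitly typed lambda avoids the Dataset.foreachPartition overload ambiguity.
    repartitioned.foreachPartition((iter: Iterator[Row]) => {
      println(s"rows in this partition: ${iter.size}")
    })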
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
index 3034b2e8bc..37d2375739 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
@@ -18,9 +18,11 @@
 
 package org.apache.spark.dict;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +31,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,20 +102,19 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
                 path -> path.getName().startsWith(DICT_METADATA_NAME));
 
         if (metaFiles.length == 0) {
-            logger.info("because metaFiles.length is 0, metaInfo is null");
-            return null;
-        }
-
-        String metaFile = metaFiles[0].getPath().getName();
-        Path metaPath = new Path(versionDir, metaFile);
-        if (!fileSystem.exists(metaPath)) {
-            logger.info("because metaPath[{}] is not exists, metaInfo is 
null", metaPath);
-            return null;
+            FileStatus[] allFilesUnderVersionDir = fileSystem.listStatus(versionDir);
+            StringBuilder dictFiles = new StringBuilder();
+            for (FileStatus file : allFilesUnderVersionDir) {
+                dictFiles.append("\t-> ").append(file.getPath().toString()).append("\n");
+            }
+            logger.warn(
+                    "Because metaFiles.length is 0, metaInfo is null. Only the 
following files exist in the current version folder:\n{}",
+                    dictFiles);
         }
 
         NGlobalDictMetaInfo metaInfo;
-
-        try (FSDataInputStream is = fileSystem.open(metaPath)) {
+        Path metaPath = new Path(versionDir, DICT_METADATA_NAME);
+        try (FSDataInputStream is = getMetaFileInputStream(fileSystem, metaPath, "Meta file not exists.")) {
             int bucketSize = is.readInt();
             long[] bucketOffsets = new long[bucketSize];
             long[] bucketCount = new long[bucketSize];
@@ -129,6 +131,36 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
         return metaInfo;
     }
 
+    public FSDataInputStream getMetaFileInputStream(FileSystem fs, Path fp, String errorMsg) throws IOException {
+        if (fs.exists(fp)) {
+            return fs.open(fp);
+        } else if (!fs.getScheme().startsWith("s3")) {
+            throw new FileNotFoundException(
+                    String.format("%s. No Retry, file[%s] not found.", 
errorMsg, fp.toString()));
+        } else {
+            logger.info("Try to open {}", fp);
+            int retryTimes = 0;
+            // Retry once per second. The default timeout is 5 minutes
+            while (retryTimes < 300) {
+                logger.info("Open file operation will retry after 1 s, file 
path: {}", fp);
+                try {
+                    TimeUnit.SECONDS.sleep(1L);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(String.format(
+                            "unexpected things happened[%s] when sleeping for 
retry open file:", ie.getMessage()));
+                }
+                retryTimes += 1;
+                if (fs.exists(fp)) {
+                    return fs.open(fp);
+                }
+            }
+            throw new FileNotFoundException(
+                    String.format("%s. Retry timeout, file[%s] not found.", 
errorMsg, fp.toString()));
+        }
+    }
+
+
     @Override
    public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId)
             throws IOException {
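
One way to sanity-check the new fail-fast branch (a hypothetical snippet, not
part of this commit; it assumes a store instance of NGlobalDictHDFSStore is
available): on the local filesystem the scheme is "file", not "s3", so a
missing meta path should throw immediately rather than enter the 300-attempt
loop:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // store is assumed to be an NGlobalDictHDFSStore built elsewhere.
    val fs = FileSystem.getLocal(new Configuration())
    val missing = new Path("/tmp/no-such-version-dir/DICT_METADATA") // illustrative path
    try {
      store.getMetaFileInputStream(fs, missing, "Meta file not exists.")
    } catch {
      case e: java.io.FileNotFoundException =>
        println(s"fail-fast as expected: ${e.getMessage}") // "... No Retry, file[...] not found."
    }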
