This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c61dc4189968f29f8e56e98f72a16f100e9d6e2b
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Sep 22 16:13:50 2023 +0800

    KYLIN-5828 Concurrent dict v2 jobs lead to abnormal encoding result
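
    In short: the patch threads one build version (a millisecond timestamp chosen once per
    job) from flat-table construction through dictionary building and column encoding, so
    concurrent v2 dict jobs read and write the same dictionary version instead of whichever
    version happens to be newest; NO_VERSION_SPECIFIED (-1L) keeps the old "read the latest
    version" behaviour. A minimal sketch of the intended call flow (ds, seg, ss, dictCols
    and encodeCols are assumed to come from the build job, as in FlatTableAndDictBase):

        // Sketch only, not part of the patch: one buildVersion pins dict build and encoding.
        import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED

        var buildVersion = System.currentTimeMillis()   // one version per build job
        if (seg.isDictReady) {
          buildVersion = NO_VERSION_SPECIFIED           // -1L: only read the latest dict
        } else {
          new DFDictionaryBuilder(ds, seg, ss, dictCols).buildDictSet(buildVersion)
        }
        // Encoding reads exactly the version written above, not whatever is newest.
        DFTableEncoder.encodeTable(ds, seg, encodeCols, buildVersion)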
---
 .../engine/spark/builder/CreateFlatTable.scala     |  15 +--
 .../engine/spark/builder/DFDictionaryBuilder.scala |  16 +--
 .../engine/spark/builder/DFTableEncoder.scala      |  13 ++-
 .../spark/builder/DictionaryBuilderHelper.java     |   5 +-
 .../job/stage/build/FlatTableAndDictBase.scala     |  18 ++--
 .../engine/spark/dict/NGlobalDictionaryV2Test.java |  77 +++++++++------
 .../kylin/engine/spark/builder/TestDFChooser.scala |   5 +-
 .../engine/spark/builder/TestGlobalDictBuild.scala |  58 +++++++++--
 .../v3dict/GlobalDictionaryUpdateSuite.scala       |   6 +-
 .../scala/org/apache/spark/sql/KapFunctions.scala  |   4 +-
 .../sql/catalyst/expressions/KapExpresssions.scala |  24 +++--
 .../org/apache/spark/sql/udf/DictEncodeImpl.scala  |   9 +-
 .../org/apache/spark/dict/NBucketDictionary.java   |  62 +++++++++---
 .../spark/dict/NGlobalDictBuilderAssist.scala      |   5 +-
 .../apache/spark/dict/NGlobalDictHDFSStore.java    |  10 +-
 .../org/apache/spark/dict/NGlobalDictS3Store.java  |  16 +--
 .../org/apache/spark/dict/NGlobalDictStore.java    |   4 +-
 .../org/apache/spark/dict/NGlobalDictionaryV2.java | 107 +++++++++++++++++----
 18 files changed, 326 insertions(+), 128 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
index b659b87969..654c5b9f6f 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
@@ -31,6 +31,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.cuboid.NSpanningTree
 import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, 
NDataSegment}
 import org.apache.kylin.metadata.model._
+import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
@@ -115,12 +116,14 @@ class CreateFlatTable(val flatTable: IJoinedFlatTableDesc,
                                dictCols: Set[TblColRef],
                                encodeCols: Set[TblColRef]): Dataset[Row] = {
     val ccDataset = withColumn(ds, ccCols)
+    var buildVersion = System.currentTimeMillis()
     if (seg.isDictReady) {
       logInfo(s"Skip already built dict, segment: ${seg.getId} of dataflow: 
${seg.getDataflow.getId}")
+      buildVersion = NO_VERSION_SPECIFIED
     } else {
-      buildDict(ccDataset, dictCols)
+      buildDict(ccDataset, dictCols, buildVersion)
     }
-    encodeColumn(ccDataset, encodeCols)
+    encodeColumn(ccDataset, encodeCols, buildVersion)
   }
 
   private def withColumn(ds: Dataset[Row], withCols: Set[TblColRef]): 
Dataset[Row] = {
@@ -131,21 +134,21 @@ class CreateFlatTable(val flatTable: IJoinedFlatTableDesc,
     withDs
   }
 
-  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = {
+  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef], 
buildVersion: Long): Unit = {
     val matchedCols = if (seg.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) {
       filterOutIntegerFamilyType(ds, dictCols)
     } else {
       selectColumnsInTable(ds, dictCols)
     }
     val builder = new DFDictionaryBuilder(ds, seg, ss, 
Sets.newHashSet(matchedCols.asJavaCollection))
-    builder.buildDictSet()
+    builder.buildDictSet(buildVersion)
   }
 
-  private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef]): 
Dataset[Row] = {
+  private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef], 
buildVersion: Long): Dataset[Row] = {
     val matchedCols = selectColumnsInTable(ds, encodeCols)
     var encodeDs = ds
     if (matchedCols.nonEmpty) {
-      encodeDs = DFTableEncoder.encodeTable(ds, seg, matchedCols.asJava)
+      encodeDs = DFTableEncoder.encodeTable(ds, seg, matchedCols.asJava, 
buildVersion)
     }
     encodeDs
   }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
index af8399ca3a..d930fd178b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
@@ -43,8 +43,8 @@ class DFDictionaryBuilder(
                            val colRefSet: util.Set[TblColRef]) extends LogEx 
with Serializable {
 
   @throws[IOException]
-  def buildDictSet(): Unit = {
-    colRefSet.asScala.foreach(col => safeBuild(col))
+  def buildDictSet(buildVersion: Long): Unit = {
+    colRefSet.asScala.foreach(col => safeBuild(col, buildVersion))
     changeAQEConfig(true)
   }
 
@@ -52,7 +52,7 @@ class DFDictionaryBuilder(
   private val originalAQE = ss.conf.get(AQE)
 
   @throws[IOException]
-  private[builder] def safeBuild(ref: TblColRef): Unit = {
+  private[builder] def safeBuild(ref: TblColRef, buildVersion: Long): Unit = {
     val sourceColumn = ref.getIdentity
     ZKHelper.tryZKJaasConfiguration(ss)
     val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory
@@ -64,7 +64,7 @@ class DFDictionaryBuilder(
       val bucketPartitionSize = logTime(s"calculating bucket size for 
$sourceColumn") {
         DictionaryBuilderHelper.calculateBucketSize(seg, ref, dictColDistinct)
       }
-      build(ref, bucketPartitionSize, dictColDistinct)
+      build(ref, bucketPartitionSize, dictColDistinct, buildVersion)
     } finally lock.unlock()
   }
 
@@ -90,10 +90,12 @@ class DFDictionaryBuilder(
       """.stripMargin
   }
 
+
   @throws[IOException]
   private[builder] def build(ref: TblColRef, bucketPartitionSize: Int,
-                             afterDistinct: Dataset[Row]): Unit = 
logTime(s"building global dictionaries V2 for ${ref.getIdentity}") {
-    val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, 
ref.getName, seg.getConfig.getHdfsWorkingDirectory)
+                             afterDistinct: Dataset[Row],
+                             buildVersion: Long): Unit = logTime(s"building 
global dictionaries V2 for ${ref.getIdentity}") {
+    val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, 
ref.getName, seg.getConfig.getHdfsWorkingDirectory, buildVersion)
     globalDict.prepareWrite()
     val broadcastDict = ss.sparkContext.broadcast(globalDict)
 
@@ -121,7 +123,7 @@ class DFDictionaryBuilder(
 
     if (seg.getConfig.isGlobalDictCheckEnabled) {
       logInfo(s"Start to check the correctness of the global dict, table: 
${ref.getTableAlias}, col: ${ref.getName}")
-      val latestGD = new NGlobalDictionaryV2(seg.getProject, ref.getTable, 
ref.getName, seg.getConfig.getHdfsWorkingDirectory)
+      val latestGD = new NGlobalDictionaryV2(seg.getProject, ref.getTable, 
ref.getName, seg.getConfig.getHdfsWorkingDirectory, buildVersion)
       for (bid <- 0 until globalDict.getMetaInfo.getBucketSize) {
         val dMap = latestGD.loadBucketDictionary(bid).getAbsoluteDictMap
         val vdCount = dMap.values().stream().distinct().count()
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala
index 9e3ca24734..a49c090838 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala
@@ -22,6 +22,7 @@ import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.kylin.metadata.model.TblColRef
 import org.apache.spark.dict.NGlobalDictionaryV2
+import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.KapFunctions._
 import org.apache.spark.sql.functions.{col, _}
@@ -34,7 +35,7 @@ import scala.collection.mutable._
 
 object DFTableEncoder extends Logging {
 
-  def encodeTable(ds: Dataset[Row], seg: NDataSegment, cols: 
util.Set[TblColRef]): Dataset[Row] = {
+  def encodeTable(ds: Dataset[Row], seg: NDataSegment, cols: 
util.Set[TblColRef], buildVersion: Long): Dataset[Row] = {
     val structType = ds.schema
     var partitionedDs = ds
 
@@ -61,17 +62,21 @@ object DFTableEncoder extends Logging {
 
     val encodingArgs = encodingCols.map {
       ref =>
-        val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, 
ref.getName, seg.getConfig.getHdfsWorkingDirectory)
+        val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable,
+          ref.getName, seg.getConfig.getHdfsWorkingDirectory, buildVersion)
         val bucketSize = 
globalDict.getBucketSizeOrDefault(seg.getConfig.getGlobalDictV2MinHashPartitions)
         val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * 
bucketSize).toInt
-
+        logInfo(s"[EncodeTable/${buildVersion}] bucketSize:$bucketSize")
         val encodeColRef = convertFromDot(ref.getBackTickIdentity)
         val columnIndex = structType.fieldIndex(encodeColRef)
 
         val dictParams = Array(seg.getProject, ref.getTable, ref.getName, 
seg.getConfig.getHdfsWorkingDirectory)
           .mkString(SEPARATOR)
         val aliasName = 
structType.apply(columnIndex).name.concat(ENCODE_SUFFIX)
-        val encodeCol = dict_encode(col(encodeColRef).cast(StringType), 
lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
+        val encodeCol = dict_encode(col(encodeColRef).cast(StringType),
+                                    lit(dictParams),
+                                    lit(bucketSize).cast(StringType),
+                                    
lit(buildVersion).cast(LongType)).as(aliasName)
         val columns = encodeCol
         (enlargedBucketSize, col(encodeColRef).cast(StringType), columns, 
aliasName,
           bucketSize == 1)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
index df0288164d..97ef4f433c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
@@ -58,8 +58,9 @@ public class DictionaryBuilderHelper {
      */
     public static int calculateBucketSize(NDataSegment seg, TblColRef col, 
Dataset<Row> afterDistinct)
             throws IOException {
+        long resizeVersion = System.currentTimeMillis();
         NGlobalDictionaryV2 globalDict = new 
NGlobalDictionaryV2(seg.getProject(), col.getTable(), col.getName(),
-                seg.getConfig().getHdfsWorkingDirectory());
+                seg.getConfig().getHdfsWorkingDirectory(), resizeVersion);
         int bucketPartitionSize = 
globalDict.getBucketSizeOrDefault(seg.getConfig().getGlobalDictV2MinHashPartitions());
         int bucketThreshold = 
seg.getConfig().getGlobalDictV2ThresholdBucketSize();
         int resizeBucketSize = bucketPartitionSize;
@@ -107,7 +108,7 @@ public class DictionaryBuilderHelper {
             if (resizeBucketSize != bucketPartitionSize) {
                 logger.info("Start building a global dictionary column for {}, 
need resize from {} to {} ",
                         col.getName(), bucketPartitionSize, resizeBucketSize);
-                resize(col, seg, resizeBucketSize, 
afterDistinct.sparkSession());
+                resize(col, seg, resizeBucketSize, 
afterDistinct.sparkSession(), resizeVersion);
                 logger.info("End building a global dictionary column for {}, 
need resize from {} to {} ", col.getName(),
                         bucketPartitionSize, resizeBucketSize);
             }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 935eaefaea..bacd24c225 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -44,6 +44,7 @@ import 
org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBu
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.kylin.metadata.cube.planner.CostBasePlannerUtils
 import org.apache.kylin.metadata.model._
+import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED
 import org.apache.spark.sql.KapFunctions.dict_encode_v3
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.JoinType
@@ -563,18 +564,20 @@ abstract class FlatTableAndDictBase(private val 
jobContext: SegmentJob,
     if (dictCols.isEmpty && encodeCols.isEmpty) {
       return table
     }
+    var buildVersion = System.currentTimeMillis()
     if (dataSegment.isDictReady) {
       logInfo(s"Skip DICTIONARY segment $segmentId")
+      buildVersion = NO_VERSION_SPECIFIED
     } else {
       // ensure at least one worker was registered before dictionary lock 
added.
       waitTillWorkerRegistered()
-      buildDict(table, dictCols)
+      buildDict(table, dictCols, buildVersion)
     }
 
     if (config.isV3DictEnable) {
       buildV3DictIfNeeded(table, encodeCols)
     } else {
-      encodeColumn(table, encodeCols)
+      encodeColumn(table, encodeCols, buildVersion)
     }
   }
 
@@ -632,23 +635,24 @@ abstract class FlatTableAndDictBase(private val 
jobContext: SegmentJob,
     tableWithCcs
   }
 
-  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = {
+  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef], 
buildVersion: Long): Unit = {
     if (config.isV2DictEnable) {
-      logInfo("Build v2 dict default.")
+      logInfo(s"Build v2 dict default. " +
+        s"[${dataSegment.getModel.getAlias}] model dict build version is 
$buildVersion")
       var matchedCols = selectColumnsInTable(ds, dictCols)
       if (dataSegment.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) {
         matchedCols = matchedCols.filterNot(_.getType.isIntegerFamily)
       }
       val builder = new DFDictionaryBuilder(ds, dataSegment, sparkSession, 
Sets.newHashSet(matchedCols.asJavaCollection))
-      builder.buildDictSet()
+      builder.buildDictSet(buildVersion)
     }
   }
 
-  private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef]): 
Dataset[Row] = {
+  private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef], 
buildVersion: Long): Dataset[Row] = {
     val matchedCols = selectColumnsInTable(ds, encodeCols)
     var encodeDs = ds
     if (matchedCols.nonEmpty) {
-      encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, 
matchedCols.asJava)
+      encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, 
matchedCols.asJava, buildVersion)
     }
     encodeDs
   }
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java
index 8d29579e4a..6f30eb2fc3 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java
@@ -35,6 +35,7 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.dict.NBucketDictionary;
 import org.apache.spark.dict.NGlobalDictHDFSStore;
 import org.apache.spark.dict.NGlobalDictMetaInfo;
+import org.apache.spark.dict.NGlobalDictS3Store;
 import org.apache.spark.dict.NGlobalDictStore;
 import org.apache.spark.dict.NGlobalDictionaryV2;
 import org.apache.spark.sql.Dataset;
@@ -70,33 +71,33 @@ public class NGlobalDictionaryV2Test extends 
NLocalWithSparkSessionTest {
 
     @Test
     public void testGlobalDictHDFSStoreRoundTest() throws IOException {
-        testAll();
+        testAll(false);
     }
 
     @Test
     public void testGlobalDictS3StoreRoundTest() throws IOException {
         // global s3 dict store
         overwriteSystemProp("kylin.engine.global-dict.store.impl", 
"org.apache.spark.dict.NGlobalDictS3Store");
-        testAll();
+        testAll(true);
     }
 
-    private void testAll() throws IOException {
-        roundTest(5);
-        roundTest(50);
-        roundTest(500);
+    private void testAll(boolean isS3Store) throws IOException {
+        roundTest(5, isS3Store);
+        roundTest(50, isS3Store);
+        roundTest(500, isS3Store);
     }
 
-    private void roundTest(int size) throws IOException {
+    private void roundTest(int size, boolean isS3Store) throws IOException {
         System.out.println("NGlobalDictionaryV2Test -> roundTest -> " + 
System.currentTimeMillis());
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        NGlobalDictionaryV2 dict1 = new NGlobalDictionaryV2("t1", "a", 
"spark", config.getHdfsWorkingDirectory());
-        NGlobalDictionaryV2 dict2 = new NGlobalDictionaryV2("t2", "a", 
"local", config.getHdfsWorkingDirectory());
+        NGlobalDictionaryV2 dict1 = new NGlobalDictionaryV2("t1", "a", 
"spark", config.getHdfsWorkingDirectory(), System.currentTimeMillis());
+        NGlobalDictionaryV2 dict2 = new NGlobalDictionaryV2("t2", "a", 
"local", config.getHdfsWorkingDirectory(), System.currentTimeMillis());
         List<String> stringList = generateRandomData(size);
         Collections.sort(stringList);
-        runWithSparkBuildGlobalDict(dict1, stringList);
-        runWithLocalBuildGlobalDict(dict2, stringList);
-        compareTwoVersionDict(dict1, dict2);
-        compareTwoModeVersionNum(dict1, dict2);
+        runWithSparkBuildGlobalDict(dict1, stringList, isS3Store);
+        runWithLocalBuildGlobalDict(dict2, stringList, isS3Store);
+        compareTwoVersionDict(dict1, dict2, isS3Store);
+        compareTwoModeVersionNum(dict1, dict2, isS3Store);
     }
 
     private List<String> generateRandomData(int size) {
@@ -107,9 +108,14 @@ public class NGlobalDictionaryV2Test extends 
NLocalWithSparkSessionTest {
         return stringList;
     }
 
-    private void runWithSparkBuildGlobalDict(NGlobalDictionaryV2 dict, 
List<String> stringSet) throws IOException {
+    private void runWithSparkBuildGlobalDict(NGlobalDictionaryV2 dict, 
List<String> stringSet, boolean isS3Store) throws IOException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        dict.prepareWrite();
+        if (isS3Store) {
+            dict.prepareWriteS3();
+        } else {
+            dict.prepareWrite();
+        }
+
         List<Row> rowList = Lists.newLinkedList();
         for (String str : stringSet) {
             rowList.add(RowFactory.create(str));
@@ -122,7 +128,7 @@ public class NGlobalDictionaryV2Test extends 
NLocalWithSparkSessionTest {
             return new Tuple2<>(row.get(0).toString(), null);
         }).sortByKey().partitionBy(new 
HashPartitioner(BUCKET_SIZE)).mapPartitionsWithIndex(
                 (Function2<Integer, Iterator<Tuple2<String, String>>, 
Iterator<Object>>) (bucketId, tuple2Iterator) -> {
-                    NBucketDictionary bucketDict = 
dict.loadBucketDictionary(bucketId);
+                    NBucketDictionary bucketDict = isS3Store? 
dict.loadBucketDictionaryForTestS3(bucketId) : 
dict.loadBucketDictionary(bucketId);
                     while (tuple2Iterator.hasNext()) {
                         Tuple2<String, String> tuple2 = tuple2Iterator.next();
                         bucketDict.addRelativeValue(tuple2._1);
@@ -131,12 +137,20 @@ public class NGlobalDictionaryV2Test extends 
NLocalWithSparkSessionTest {
                     return Collections.emptyIterator();
                 }, true).count();
 
-        dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), 
config.getGlobalDictV2VersionTTL());
+        if (isS3Store) {
+            dict.writeMetaDictForTestS3(BUCKET_SIZE, 
config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL());
+        } else {
+            dict.writeMetaDict(BUCKET_SIZE, 
config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL());
+        }
     }
 
-    private void runWithLocalBuildGlobalDict(NGlobalDictionaryV2 dict, 
List<String> stringSet) throws IOException {
+    private void runWithLocalBuildGlobalDict(NGlobalDictionaryV2 dict, 
List<String> stringSet, boolean isS3Store) throws IOException {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        dict.prepareWrite();
+        if (isS3Store) {
+            dict.prepareWriteS3();
+        } else {
+            dict.prepareWrite();
+        }
         HashPartitioner partitioner = new HashPartitioner(BUCKET_SIZE);
         Map<Integer, List<String>> vmap = new HashMap<>();
         for (int i = 0; i < BUCKET_SIZE; i++) {
@@ -148,31 +162,36 @@ public class NGlobalDictionaryV2Test extends 
NLocalWithSparkSessionTest {
         }
 
         for (Map.Entry<Integer, List<String>> entry : vmap.entrySet()) {
-            NBucketDictionary bucketDict = 
dict.loadBucketDictionary(entry.getKey());
+            NBucketDictionary bucketDict = isS3Store ? 
dict.loadBucketDictionaryForTestS3(entry.getKey()):dict.loadBucketDictionary(entry.getKey());
             for (String s : entry.getValue()) {
                 bucketDict.addRelativeValue(s);
             }
             bucketDict.saveBucketDict(entry.getKey());
         }
 
-        dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), 
config.getGlobalDictV2VersionTTL());
+        if (isS3Store) {
+            dict.writeMetaDictForTestS3(BUCKET_SIZE, 
config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL());
+        } else {
+            dict.writeMetaDict(BUCKET_SIZE, 
config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL());
+        }
+
     }
 
-    private void compareTwoModeVersionNum(NGlobalDictionaryV2 dict1, 
NGlobalDictionaryV2 dict2) throws IOException {
-        NGlobalDictStore store1 = new 
NGlobalDictHDFSStore(dict1.getResourceDir());
-        NGlobalDictStore store2 = new 
NGlobalDictHDFSStore(dict2.getResourceDir());
+    private void compareTwoModeVersionNum(NGlobalDictionaryV2 dict1, 
NGlobalDictionaryV2 dict2, boolean isS3Store) throws IOException {
+        NGlobalDictStore store1 = isS3Store? new 
NGlobalDictS3Store(dict1.getResourceDir()) : new 
NGlobalDictHDFSStore(dict1.getResourceDir());
+        NGlobalDictStore store2 = isS3Store? new 
NGlobalDictS3Store(dict2.getResourceDir()) : new 
NGlobalDictHDFSStore(dict2.getResourceDir());
         Assert.assertEquals(store1.listAllVersions().length, 
store2.listAllVersions().length);
     }
 
-    private void compareTwoVersionDict(NGlobalDictionaryV2 dict1, 
NGlobalDictionaryV2 dict2) throws IOException {
-        NGlobalDictMetaInfo metadata1 = dict1.getMetaInfo();
-        NGlobalDictMetaInfo metadata2 = dict2.getMetaInfo();
+    private void compareTwoVersionDict(NGlobalDictionaryV2 dict1, 
NGlobalDictionaryV2 dict2, boolean isS3Store) throws IOException {
+        NGlobalDictMetaInfo metadata1 = isS3Store? 
dict1.getMetaInfoForTestS3():dict1.getMetaInfo();
+        NGlobalDictMetaInfo metadata2 = isS3Store? 
dict2.getMetaInfoForTestS3():dict2.getMetaInfo();
         // compare dict meta info
         Assert.assertEquals(metadata1, metadata2);
 
         for (int i = 0; i < metadata1.getBucketSize(); i++) {
-            NBucketDictionary bucket1 = dict1.loadBucketDictionary(i);
-            NBucketDictionary bucket2 = dict2.loadBucketDictionary(i);
+            NBucketDictionary bucket1 = isS3Store? 
dict1.loadBucketDictionaryForTestS3(i):dict1.loadBucketDictionary(i);
+            NBucketDictionary bucket2 = isS3Store? 
dict2.loadBucketDictionaryForTestS3(i):dict2.loadBucketDictionary(i);
 
             Object2LongMap<String> map1 = bucket1.getAbsoluteDictMap();
             Object2LongMap<String> map2 = bucket2.getAbsoluteDictMap();
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala
index 9dd526d6bd..5ebedddd13 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala
@@ -160,8 +160,9 @@ class TestDFChooser extends SparderBaseFunSuite with 
SharedSparkSession with Loc
         val dict1 = new NGlobalDictionaryV2(seg.getProject, col.getTable, 
col.getName, seg.getConfig.getHdfsWorkingDirectory)
         val meta1 = dict1.getMetaInfo
         val needResizeBucketSize = 
dict1.getBucketSizeOrDefault(seg.getConfig.getGlobalDictV2MinHashPartitions) + 
10
-        NGlobalDictBuilderAssist.resize(col, seg, needResizeBucketSize, spark)
-        val dict2 = new NGlobalDictionaryV2(seg.getProject, col.getTable, 
col.getName, seg.getConfig.getHdfsWorkingDirectory)
+        val resizeVersion = System.currentTimeMillis()
+        NGlobalDictBuilderAssist.resize(col, seg, needResizeBucketSize, spark, 
resizeVersion)
+        val dict2 = new NGlobalDictionaryV2(seg.getProject, col.getTable, 
col.getName, seg.getConfig.getHdfsWorkingDirectory, resizeVersion)
         Assert.assertEquals(meta1.getDictCount, dict2.getMetaInfo.getDictCount)
         Assert.assertEquals(meta1.getBucketSize + 10, 
dict2.getMetaInfo.getBucketSize)
       }
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index 764229ff44..016844e7c8 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -17,10 +17,12 @@
  */
 package org.apache.kylin.engine.spark.builder
 
-import org.apache.commons.lang3.RandomStringUtils
+import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.kylin.common.KylinConfig
+import 
org.apache.kylin.engine.spark.builder.v3dict.GlobalDictionaryBuilderHelper.genRandomData
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.kylin.metadata.cube.cuboid.{AdaptiveSpanningTree, 
NSpanningTreeFactory}
 import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow, 
NDataflowManager}
 import org.apache.kylin.metadata.model.TblColRef
@@ -48,6 +50,40 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with 
SharedSparkSession wi
     config
   }
 
+  test("build v2 dict and encode flattable") {
+    val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, 
DEFAULT_PROJECT)
+    val df: NDataflow = dsMgr.getDataflow(CUBE_NAME)
+    val seg = df.getLastSegment
+    val nSpanningTree = 
NSpanningTreeFactory.fromLayouts(seg.getIndexPlan.getAllLayouts, df.getUuid)
+    val dictColSet = 
DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(seg, 
nSpanningTree.getAllIndexEntities)
+
+    val dictCol = dictColSet.iterator().next()
+    val encodeColName: String = StringUtils.split(dictCol.getTable, 
".").apply(1) + NSparkCubingUtil.SEPARATOR + dictCol.getName
+    val randomDF = genRandomData(spark, encodeColName, 1000, 10)
+
+    val dictionaryBuilder = new DFDictionaryBuilder(randomDF, seg, 
randomDF.sparkSession, dictColSet)
+    val colName = dictColSet.iterator().next()
+    val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, 
colName, randomDF)
+    val buildVersion = System.currentTimeMillis()
+    dictionaryBuilder.build(colName, bucketPartitionSize, randomDF, 
buildVersion)
+    val dict = new NGlobalDictionaryV2(seg.getProject,
+      colName.getTable,
+      colName.getName,
+      seg.getConfig.getHdfsWorkingDirectory,
+      buildVersion)
+    val meta1 = dict.getMetaInfo
+    Assert.assertEquals(1000, meta1.getDictCount)
+    val encode = encodeColumn(randomDF, seg, dictColSet, buildVersion)
+    val rowsWithZero = encode.filter(encode(encode.columns(1)) === 0)
+    Assert.assertEquals(true, rowsWithZero.isEmpty)
+    // clean all
+    val cleanCol = dictColSet.iterator().next()
+    val cleanDict = new NGlobalDictionaryV2(seg.getProject, cleanCol.getTable, 
cleanCol.getName, seg.getConfig.getHdfsWorkingDirectory)
+    val cleanDictPath = new Path(seg.getConfig.getHdfsWorkingDirectory + 
cleanDict.getResourceDir)
+    val fileSystem = cleanDictPath.getFileSystem(new Configuration())
+    fileSystem.delete(cleanDictPath, true)
+  }
+
   test("global dict build and checkout bucket resize strategy") {
     val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, 
DEFAULT_PROJECT)
     
Assert.assertTrue(getTestConfig.getHdfsWorkingDirectory.startsWith("file:"))
@@ -150,10 +186,11 @@ class TestGlobalDictBuild extends SparderBaseFunSuite 
with SharedSparkSession wi
     val dictColumn = 
col(dataflow.getModel.getColumnIdByColumnName(dictCol.getIdentity).toString)
     val distinctDS = sampleDS.select(dictColumn).distinct()
     val dictBuilder = new DFDictionaryBuilder(sampleDS, segment, spark, 
dictColSet)
-    dictBuilder.build(dictCol, shufflePartitionSizeInt, distinctDS)
+    val buildVersion = System.currentTimeMillis()
+    dictBuilder.build(dictCol, shufflePartitionSizeInt, distinctDS, 
buildVersion)
 
     val dictStore = 
NGlobalDictStoreFactory.getResourceStore(segment.getConfig.getHdfsWorkingDirectory
 + dictV2.getResourceDir)
-    val versionPath = dictStore.getVersionDir(dictStore.listAllVersions()(0))
+    val versionPath = dictStore.getVersionDir(buildVersion)
 
     val dictDirSize = fileSystem.listStatus(versionPath, new PathFilter {
       override def accept(path: Path): Boolean = {
@@ -184,7 +221,7 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with 
SharedSparkSession wi
     seg.getConfig.setProperty("kylin.engine.global-dict-aqe-enabled", "FALSE")
     dictionaryBuilder.changeAQEConfig(false)
     Assert.assertFalse(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)
-    dictionaryBuilder.build(col, bucketPartitionSize, ds)
+    dictionaryBuilder.build(col, bucketPartitionSize, ds, 
System.currentTimeMillis())
     
Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
 
     // false true
@@ -196,7 +233,7 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with 
SharedSparkSession wi
     dictionaryBuilder.changeAQEConfig(false)
     
Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
 
-    dictionaryBuilder.build(col, bucketPartitionSize, ds)
+    dictionaryBuilder.build(col, bucketPartitionSize, ds, 
System.currentTimeMillis())
     
Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE))
 
     // true true
@@ -210,12 +247,19 @@ class TestGlobalDictBuild extends SparderBaseFunSuite 
with SharedSparkSession wi
     val col = dictColSet.iterator().next()
     val ds = randomDataSet.select("26").distinct()
     val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, 
col, ds)
-    dictionaryBuilder.build(col, bucketPartitionSize, ds)
+    val buildVersion = System.currentTimeMillis()
+    dictionaryBuilder.build(col, bucketPartitionSize, ds, buildVersion)
     val dict = new NGlobalDictionaryV2(seg.getProject, col.getTable, 
col.getName,
-      seg.getConfig.getHdfsWorkingDirectory)
+      seg.getConfig.getHdfsWorkingDirectory, buildVersion)
     dict.getMetaInfo
   }
 
+  def encodeColumn(ds: Dataset[Row], dataSegment: NDataSegment,
+                   encodeCols: Set[TblColRef], buildVersion: Long): 
Dataset[Row] = {
+    val encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, encodeCols, 
buildVersion)
+    encodeDs
+  }
+
   def generateOriginData(count: Int, length: Int): Dataset[Row] = {
     var schema = new StructType
 
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala
index 71b49a0924..e089abcdb8 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala
@@ -87,11 +87,13 @@ class GlobalDictionaryUpdateSuite extends 
SparderBaseFunSuite with LocalMetadata
     val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, seg, 
randomDataSet.sparkSession, dictColSet)
     val colName = dictColSet.iterator().next()
     val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, 
colName, randomDataSet)
-    dictionaryBuilder.build(colName, bucketPartitionSize, randomDataSet)
+    val buildVersion = System.currentTimeMillis()
+    dictionaryBuilder.build(colName, bucketPartitionSize, randomDataSet, 
buildVersion)
     val dict = new NGlobalDictionaryV2(seg.getProject,
       colName.getTable,
       colName.getName,
-      seg.getConfig.getHdfsWorkingDirectory)
+      seg.getConfig.getHdfsWorkingDirectory,
+      buildVersion)
     dict.getMetaInfo
   }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
index 77cce256e1..bf18b4aee7 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala
@@ -247,8 +247,8 @@ object KapFunctions {
     }
   }
 
-  def dict_encode(column: Column, dictParams: Column, bucketSize: Column): 
Column = {
-    Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr))
+  def dict_encode(column: Column, dictParams: Column, bucketSize: Column, 
buildVersion: Column): Column = {
+    Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr, 
buildVersion.expr))
   }
 
   def dict_encode_v3(column: Column, colName: String): Column = {
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
index 74beffaae3..a27fff2ad1 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
@@ -366,7 +366,9 @@ case class DictEncodeV3(child: Expression, col: String) 
extends UnaryExpression
   override protected def nullSafeEval(input: Any): Any = 
super.nullSafeEval(input)
 }
 
-case class DictEncode(left: Expression, mid: Expression, right: Expression) 
extends TernaryExpression with ExpectsInputTypes {
+case class DictEncode(left: Expression, mid: Expression,
+                      right: Expression, buildVersion: Expression)
+  extends QuaternaryExpression with ExpectsInputTypes {
 
   def maxFields: Int = SQLConf.get.maxToStringFields
 
@@ -376,7 +378,9 @@ case class DictEncode(left: Expression, mid: Expression, 
right: Expression) exte
 
   override def third: Expression = right
 
-  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
StringType, StringType)
+  override def fourth: Expression = buildVersion
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
StringType, StringType, LongType)
 
   override protected def doGenCode(ctx: CodegenContext,
                                    ev: ExprCode): ExprCode = {
@@ -395,13 +399,14 @@ case class DictEncode(left: Expression, mid: Expression, 
right: Expression) exte
 
     val dictParamsTerm = mid.simpleString(maxFields)
     val bucketSizeTerm = right.simpleString(maxFields).toInt
+    val version = buildVersion.simpleString(maxFields).toLong
 
     val initBucketDictFuncName = 
ctx.addNewFunction(s"init${bucketDictTerm.replace("[", "").replace("]", 
"")}BucketDict",
       s"""
          | private void init${bucketDictTerm.replace("[", "").replace("]", 
"")}BucketDict(int idx) {
          |   try {
          |     int bucketId = idx % $bucketSizeTerm;
-         |     $globalDictTerm = new 
org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm");
+         |     $globalDictTerm = new 
org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm",${version}L);
          |     $bucketDictTerm = 
$globalDictTerm.loadBucketDictionary(bucketId, true);
          |   } catch (Exception e) {
          |     throw new RuntimeException(e);
@@ -411,13 +416,13 @@ case class DictEncode(left: Expression, mid: Expression, 
right: Expression) exte
 
     
ctx.addPartitionInitializationStatement(s"$initBucketDictFuncName(partitionIndex);");
 
-    defineCodeGen(ctx, ev, (arg1, arg2, arg3) => {
+    defineCodeGen(ctx, ev, (arg1, arg2, arg3, arg4) => {
       s"""$bucketDictTerm.encode($arg1)"""
     })
   }
 
-  override protected def nullSafeEval(input1: Any, input2: Any, input3: Any): 
Any = {
-    DictEncodeImpl.evaluate(input1.toString, input2.toString, input3.toString)
+  override protected def nullSafeEval(input1: Any, input2: Any, input3: Any, 
input4: Any): Any = {
+    DictEncodeImpl.evaluate(input1.toString, input2.toString, input3.toString, 
input4.toString)
   }
 
   override def eval(input: InternalRow): Any = {
@@ -432,8 +437,11 @@ case class DictEncode(left: Expression, mid: Expression, 
right: Expression) exte
 
   override def prettyName: String = "DICTENCODE"
 
-  override protected def withNewChildrenInternal(newFirst: Expression, 
newSecond: Expression, newThird: Expression): Expression = {
-    val newChildren = Seq(newFirst, newSecond, newThird)
+  override protected def withNewChildrenInternal(newFirst: Expression,
+                                                 newSecond: Expression,
+                                                 newThird: Expression,
+                                                 newFourth: Expression): 
Expression = {
+    val newChildren = Seq(newFirst, newSecond, newThird, newFourth)
     super.legacyWithNewChildren(newChildren)
   }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala
index a33f29d736..afd1d8fbcd 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala
@@ -30,19 +30,18 @@ object DictEncodeImpl {
       override def initialValue(): util.HashMap[String, NBucketDictionary] = 
new util.HashMap[String, NBucketDictionary]()
     }
 
-  def evaluate(inputValue: String, dictParams: String, bucketSize: String): 
Long = {
+  def evaluate(inputValue: String, dictParams: String, bucketSize: String, 
buildVersion: String): Long = {
     var cachedBucketDict = DictEncodeImpl.cacheBucketDict.get().get(dictParams)
     if (cachedBucketDict == null) {
-      cachedBucketDict = initBucketDict(dictParams, bucketSize)
+      cachedBucketDict = initBucketDict(dictParams, bucketSize, buildVersion)
     }
     cachedBucketDict.encode(inputValue)
   }
 
-  private def initBucketDict(dictParams: String, bucketSize: String): 
NBucketDictionary = {
+  private def initBucketDict(dictParams: String, bucketSize: String, 
buildVersion: String): NBucketDictionary = {
     val partitionID = TaskContext.get.partitionId
     val encodeBucketId = partitionID % bucketSize.toInt
-    val globalDict = new NGlobalDictionaryV2(dictParams)
-
+    val globalDict = new NGlobalDictionaryV2(dictParams, buildVersion.toLong)
     val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId, 
true)
     DictEncodeImpl.cacheBucketDict.get.put(dictParams, cachedBucketDict)
     TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
index e7fb062c1d..024b88c907 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
@@ -18,6 +18,9 @@
 package org.apache.spark.dict;
 
 import java.io.IOException;
+import java.util.Arrays;
+
+import com.alibaba.nacos.common.JustForTest;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 
+import static org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED;
+
 public class NBucketDictionary {
 
     protected static final Logger logger = 
LoggerFactory.getLogger(NBucketDictionary.class);
@@ -33,30 +38,58 @@ public class NBucketDictionary {
 
     private int bucketId;
 
+    private long buildVersion;
+
     private Object2LongMap<String> absoluteDictMap;
     // Relative dictionary needs to calculate dictionary code according to 
NGlobalDictMetaInfo's bucketOffsets
     private Object2LongMap<String> relativeDictMap;
 
-    NBucketDictionary(String baseDir, String workingDir, int bucketId, 
NGlobalDictMetaInfo metainfo)
-            throws IOException {
-        this(baseDir, workingDir, bucketId, metainfo, false);
-    }
-
-    NBucketDictionary(String baseDir, String workingDir, int bucketId, 
NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding)
-            throws IOException {
-        this.workingDir = workingDir;
-        this.bucketId = bucketId;
-        final NGlobalDictStore globalDictStore = 
NGlobalDictStoreFactory.getResourceStore(baseDir);
+    private void initDictMap(int bucketId, NGlobalDictMetaInfo metainfo,
+                             boolean isForColumnEncoding, long buildVersion, 
NGlobalDictStore globalDictStore) throws IOException {
         Long[] versions = globalDictStore.listAllVersions();
         logger.debug("versions.length is {}", versions.length);
         if (versions.length == 0) {
             this.absoluteDictMap = new Object2LongOpenHashMap<>();
-        } else {
+        } else if (buildVersion == NO_VERSION_SPECIFIED || 
!Arrays.asList(versions).contains(buildVersion)) {
+            logger.info("Initializes dict map with the latest 
version:{}",versions[versions.length - 1]);
             this.absoluteDictMap = 
globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, 
bucketId, isForColumnEncoding);
+        } else {
+            logger.info("Initializes dict map with the specified 
version:{}",buildVersion);
+            this.absoluteDictMap = globalDictStore.getBucketDict(buildVersion, 
metainfo, bucketId, isForColumnEncoding);
         }
         this.relativeDictMap = new Object2LongOpenHashMap<>();
     }
 
+    private void initDictMap(String baseDir, int bucketId, NGlobalDictMetaInfo 
metainfo, boolean isForColumnEncoding,
+            long buildVersion) throws IOException {
+        final NGlobalDictStore globalDictStore = 
NGlobalDictStoreFactory.getResourceStore(baseDir);
+        initDictMap(bucketId, metainfo, isForColumnEncoding, buildVersion, 
globalDictStore);
+    }
+
+    @JustForTest
+    private void initDictMapForS3Test(String baseDir, int bucketId, 
NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding, long buildVersion) 
throws IOException {
+        final NGlobalDictStore globalDictStore = new 
NGlobalDictS3Store(baseDir);
+        initDictMap(bucketId, metainfo, isForColumnEncoding, buildVersion, 
globalDictStore);
+    }
+
+    @JustForTest
+    NBucketDictionary(String baseDir, String workingDir, int bucketId, 
NGlobalDictMetaInfo metainfo
+            , boolean isForColumnEncoding, long buildVersion, boolean 
justForTest)
+            throws IOException {
+        this.workingDir = workingDir;
+        this.bucketId = bucketId;
+        this.buildVersion = buildVersion;
+        initDictMapForS3Test(baseDir, bucketId, metainfo, isForColumnEncoding, 
this.buildVersion);
+    }
+
+    NBucketDictionary(String baseDir, String workingDir, int bucketId, 
NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding, long buildVersion)
+            throws IOException {
+        this.workingDir = workingDir;
+        this.bucketId = bucketId;
+        this.buildVersion = buildVersion;
+        initDictMap(baseDir, bucketId, metainfo, isForColumnEncoding, 
this.buildVersion);
+    }
+
     NBucketDictionary(String workingDir) {
         this.workingDir = workingDir;
         this.absoluteDictMap = new Object2LongOpenHashMap<>();
@@ -78,7 +111,12 @@ public class NBucketDictionary {
     }
 
     public long encode(Object value) {
-        return absoluteDictMap.getLong(value.toString());
+        long encodeValue = absoluteDictMap.getLong(value.toString());
+        if (encodeValue == 0){
+            throw new IllegalArgumentException(String.format("DFTable encode 
key:%s with error value:%s",
+                    value.toString(), encodeValue ));
+        }
+        return encodeValue;
     }
 
     public void saveBucketDict(int bucketId) throws IOException {
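
The new guard in NBucketDictionary.encode() works because fastutil's Object2LongOpenHashMap
returns 0 for absent keys by default, while codes handed out by the dictionary builder are
positive (the new test in TestGlobalDictBuild asserts that no encoded row is 0). A zero
therefore means the value is missing from the dictionary version being read, which is
exactly the symptom of two concurrent jobs stepping on each other. A small illustration
(names are illustrative, not from the patch):

    import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap

    // Object2LongOpenHashMap returns 0L for keys it does not contain, which is what
    // happens when a row is encoded against a dict version built by another job.
    val dict = new Object2LongOpenHashMap[String]()
    dict.put("seen-value", 1L)       // builder-assigned codes are >= 1
    dict.getLong("seen-value")       // 1L, a valid code
    dict.getLong("missing-value")    // 0L, so encode() now fails fast instead of
                                     // silently writing 0 into the flat table
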
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index 41c4389d52..76b39cd67d 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -34,8 +34,9 @@ import scala.collection.JavaConverters._
 object NGlobalDictBuilderAssist extends Logging {
 
   @throws[IOException]
-  def resize(ref: TblColRef, seg: NDataSegment, bucketPartitionSize: Int, ss: 
SparkSession): Unit = {
-    val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, 
ref.getName, seg.getConfig.getHdfsWorkingDirectory)
+  def resize(ref: TblColRef, seg: NDataSegment, bucketPartitionSize: Int, ss: 
SparkSession, resizeVersion: Long): Unit = {
+    val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable,
+      ref.getName, seg.getConfig.getHdfsWorkingDirectory, resizeVersion)
 
     val broadcastDict = ss.sparkContext.broadcast(globalDict)
     globalDict.prepareWrite()
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
index 6aa9ade586..49c6a30ec1 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 
+import static org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED;
+
 public class NGlobalDictHDFSStore implements NGlobalDictStore {
 
     protected static final String VERSION_PREFIX = "version_";
@@ -381,17 +383,17 @@ public class NGlobalDictHDFSStore implements 
NGlobalDictStore {
     }
 
     @Override
-    public void commit(String workingDir, int maxVersions, long versionTTL) 
throws IOException {
+    public void commit(String workingDir, int maxVersions, long versionTTL, 
long buildVersion) throws IOException {
         Path workingPath = new Path(workingDir);
-        // copy working dir to newVersion dir
-        Path newVersionPath = new Path(basePath, VERSION_PREFIX + 
System.currentTimeMillis());
+        final long commitVersion = buildVersion == NO_VERSION_SPECIFIED ? 
System.currentTimeMillis() : buildVersion;
+        Path newVersionPath = new Path(basePath, VERSION_PREFIX + 
commitVersion);
         fileSystem.rename(workingPath, newVersionPath);
         logger.info("Commit from {} to {}", workingPath, newVersionPath);
         cleanUp(maxVersions, versionTTL);
     }
 
     @Override
-    public String getWorkingDir() {
+    public String getWorkingDir(long buildVersion) {
         return baseDir + WORKING_DIR;
     }
 
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java
index 01b42b9699..24e6d046e6 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java
@@ -71,21 +71,25 @@ public class NGlobalDictS3Store extends 
NGlobalDictHDFSStore {
     }
 
     @Override
-    public void commit(String workingDir, int maxVersions, long versionTTL) 
throws IOException {
+    public void commit(String workingDir, int maxVersions, long versionTTL, 
long buildVersion) throws IOException {
         cleanWorkingFlagPath(basePath);
         logger.info("Commit {}", workingDir);
         cleanUp(maxVersions, versionTTL);
     }
 
     @Override
-    public String getWorkingDir() {
-        Path path = getWorkingFlagPath(basePath);
+    public String getWorkingDir(long buildVersion) {
+        Path path = getWorkingFlagPath(basePath, buildVersion);
         long timestamp = 
Long.parseLong(path.getName().substring(WORKING_PREFIX.length()));
         return baseDir + VERSION_PREFIX + timestamp;
     }
 
     private Path getWorkingFlagPath(Path basePath) {
-        long timestamp = pathToVersion.getOrDefault(basePath.toString(), 
System.currentTimeMillis());
+        return getWorkingFlagPath(basePath, System.currentTimeMillis());
+    }
+
+    private Path getWorkingFlagPath(Path basePath, long buildVersion) {
+        long timestamp = pathToVersion.getOrDefault(basePath.toString(), 
buildVersion);
         pathToVersion.putIfAbsent(basePath.toString(), timestamp);
         try {
             FileSystem fileSystem = HadoopUtil.getFileSystem(basePath);
@@ -100,10 +104,10 @@ public class NGlobalDictS3Store extends 
NGlobalDictHDFSStore {
         } catch (IOException ioe) {
             logger.error("Get exception when get version", ioe);
         }
-        return getWorkingFlagPath(basePath, timestamp);
+        return generateWorkingFlagPath(basePath, timestamp);
     }
 
-    private Path getWorkingFlagPath(Path basePath, long version) {
+    private Path generateWorkingFlagPath(Path basePath, long version) {
         return new Path(basePath + "/" + WORKING_PREFIX + version);
     }
 
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java
index 3a31d6dfb7..fd45d4c72b 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java
@@ -48,7 +48,7 @@ public interface NGlobalDictStore {
 
     void writeMetaInfo(int bucketSize, String workingDir) throws IOException;
 
-    void commit(String workingDir, int maxVersions, long versionTTL) throws 
IOException;
+    void commit(String workingDir, int maxVersions, long versionTTL, long 
buildVersion) throws IOException;
 
-    String getWorkingDir();
+    String getWorkingDir(long buildVersion);
 }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
index 8f37e43db2..769ca389e4 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
@@ -19,6 +19,9 @@ package org.apache.spark.dict;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
+
+import com.alibaba.nacos.common.JustForTest;
 
 import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
@@ -29,6 +32,10 @@ import lombok.val;
 public class NGlobalDictionaryV2 implements Serializable {
 
     public static final String SEPARATOR = "_0_DOT_0_";
+
+    // The latest dictionary file is read by default if no version is specified
+    public static final long NO_VERSION_SPECIFIED = -1L;
+
     protected static final Logger logger = 
LoggerFactory.getLogger(NGlobalDictionaryV2.class);
     private NGlobalDictMetaInfo metadata;
 
@@ -36,27 +43,41 @@ public class NGlobalDictionaryV2 implements Serializable {
     private String project;
     private String sourceTable;
     private String sourceColumn;
+
+    // Default is -1L, which means that no version is specified.
+    // Dict data is read from the latest version each time
+    private final long buildVersion;
     private boolean isFirst = true;
 
-    public NGlobalDictionaryV2(String project, String sourceTable, String 
sourceColumn, String baseDir)
-            throws IOException {
-        this.project = project;
-        this.sourceTable = sourceTable;
-        this.sourceColumn = sourceColumn;
+    public NGlobalDictionaryV2(String dictParams, long buildVersion) throws 
IOException {
+        val dictInfo = dictParams.split(SEPARATOR);
+        this.project = dictInfo[0];
+        this.sourceTable = dictInfo[1];
+        this.sourceColumn = dictInfo[2];
+        this.baseDir = dictInfo[3];
         this.baseDir = baseDir + getResourceDir();
+        this.buildVersion = buildVersion;
         this.metadata = getMetaInfo();
         if (metadata != null) {
             isFirst = false;
         }
     }
 
-    public NGlobalDictionaryV2(String dictParams) throws IOException {
-        val dictInfo = dictParams.split(SEPARATOR);
-        this.project = dictInfo[0];
-        this.sourceTable = dictInfo[1];
-        this.sourceColumn = dictInfo[2];
-        this.baseDir = dictInfo[3];
+    // Note: You do not need to specify a build version
+    //       if you do not need to encode flat tables or
+    //       if you only need to read the latest dictionary data
+    public NGlobalDictionaryV2(String project, String sourceTable, String 
sourceColumn, String baseDir)
+            throws IOException {
+        this(project,sourceTable,sourceColumn,baseDir, NO_VERSION_SPECIFIED);
+    }
+
+    public NGlobalDictionaryV2(String project, String sourceTable, String 
sourceColumn, String baseDir, long buildVersion)
+            throws IOException {
+        this.project = project;
+        this.sourceTable = sourceTable;
+        this.sourceColumn = sourceColumn;
         this.baseDir = baseDir + getResourceDir();
+        this.buildVersion = buildVersion;
         this.metadata = getMetaInfo();
         if (metadata != null) {
             isFirst = false;
@@ -68,18 +89,31 @@ public class NGlobalDictionaryV2 implements Serializable {
     }
 
     private String getWorkingDir() {
-        return getResourceStore(baseDir).getWorkingDir();
+        return getResourceStore(baseDir).getWorkingDir(this.buildVersion);
     }
 
     public NBucketDictionary loadBucketDictionary(int bucketId) throws 
IOException {
         return loadBucketDictionary(bucketId, false);
     }
 
+    // Only for S3 test !
+    @JustForTest
+    public NBucketDictionary loadBucketDictionaryForTestS3(int bucketId) 
throws IOException {
+        if (null == metadata) {
+            metadata = getMetaInfoForTestS3();
+        }
+        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, 
metadata, false, this.buildVersion, true);
+    }
+
     public NBucketDictionary loadBucketDictionary(int bucketId, boolean 
isForColumnEncoding) throws IOException {
+        return loadBucketDictionary(bucketId, isForColumnEncoding, 
this.buildVersion);
+    }
+
+    public NBucketDictionary loadBucketDictionary(int bucketId, boolean 
isForColumnEncoding, long buildVersion) throws IOException {
         if (null == metadata) {
             metadata = getMetaInfo();
         }
-        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, 
metadata, isForColumnEncoding);
+        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, 
metadata, isForColumnEncoding, buildVersion);
     }
 
     public NBucketDictionary createNewBucketDictionary() {
@@ -91,25 +125,56 @@ public class NGlobalDictionaryV2 implements Serializable {
         globalDictStore.prepareForWrite(getWorkingDir());
     }
 
+    // Only for S3 test !
+    @JustForTest
+    public void prepareWriteS3() throws IOException {
+        NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir);
+        
globalDictStore.prepareForWrite(globalDictStore.getWorkingDir(this.buildVersion));
+    }
+
+    // Only for S3 test !
+    @JustForTest
+    public void writeMetaDictForTestS3(int bucketSize, int maxVersions, long 
versionTTL) throws IOException {
+        NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir);
+        String workingDir = globalDictStore.getWorkingDir(this.buildVersion);
+        globalDictStore.writeMetaInfo(bucketSize, workingDir);
+        globalDictStore.commit(workingDir, maxVersions, versionTTL, 
this.buildVersion);
+    }
+
     public void writeMetaDict(int bucketSize, int maxVersions, long 
versionTTL) throws IOException {
         NGlobalDictStore globalDictStore = getResourceStore(baseDir);
         globalDictStore.writeMetaInfo(bucketSize, getWorkingDir());
         commit(maxVersions, versionTTL);
     }
 
-    public NGlobalDictMetaInfo getMetaInfo() throws IOException {
-        NGlobalDictStore globalDictStore = getResourceStore(baseDir);
-        NGlobalDictMetaInfo metadata;
+
+    private NGlobalDictMetaInfo getMetaInfo(NGlobalDictStore globalDictStore) 
throws IOException {
+        NGlobalDictMetaInfo nGlobalDictMetaInfo;
         Long[] versions = globalDictStore.listAllVersions();
         logger.info("getMetaInfo versions.length is {}", versions.length);
-
         if (versions.length == 0) {
             return null;
+        } else if (this.buildVersion == NO_VERSION_SPECIFIED || 
!Arrays.asList(versions).contains(buildVersion)) {
+            logger.info("Initializes dict metainfo with the latest 
version:{}", versions[versions.length - 1]);
+            nGlobalDictMetaInfo = 
globalDictStore.getMetaInfo(versions[versions.length - 1]);
         } else {
-            metadata = globalDictStore.getMetaInfo(versions[versions.length - 
1]);
+            logger.info("Initializes dict metainfo with the specified 
version:{}", this.buildVersion);
+            nGlobalDictMetaInfo = 
globalDictStore.getMetaInfo(this.buildVersion);
         }
-        logger.info("getMetaInfo metadata is null : [{}]", metadata == null);
-        return metadata;
+        logger.info("getMetaInfo metadata is null : [{}]", nGlobalDictMetaInfo 
== null);
+        return nGlobalDictMetaInfo;
+    }
+
+    // Only for S3 test !
+    @JustForTest
+    public NGlobalDictMetaInfo getMetaInfoForTestS3() throws IOException {
+        NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir);
+        return getMetaInfo(globalDictStore);
+    }
+
+    public NGlobalDictMetaInfo getMetaInfo() throws IOException {
+        NGlobalDictStore globalDictStore = getResourceStore(baseDir);
+        return getMetaInfo(globalDictStore);
     }
 
     public int getBucketSizeOrDefault(int defaultSize) {
@@ -133,7 +198,7 @@ public class NGlobalDictionaryV2 implements Serializable {
 
     private void commit(int maxVersions, long versionTTL) throws IOException {
         NGlobalDictStore globalDictStore = getResourceStore(baseDir);
-        globalDictStore.commit(getWorkingDir(), maxVersions, versionTTL);
+        globalDictStore.commit(getWorkingDir(), maxVersions, versionTTL, 
this.buildVersion);
     }
 
     private NGlobalDictStore getResourceStore(String baseDir) {
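
For reference, the version-selection rule now shared by NBucketDictionary.initDictMap and
NGlobalDictionaryV2.getMetaInfo can be summarised as follows (a sketch distilled from the
diff above; chooseVersion is an illustrative name, and `versions` is the array returned by
NGlobalDictStore.listAllVersions(), assumed ascending since the code treats its last entry
as the latest version):

    // Sketch of the fallback this patch introduces; NO_VERSION_SPECIFIED mirrors
    // NGlobalDictionaryV2.NO_VERSION_SPECIFIED (-1L).
    val NO_VERSION_SPECIFIED = -1L

    def chooseVersion(versions: Array[Long], buildVersion: Long): Option[Long] =
      if (versions.isEmpty) None                                     // nothing committed yet
      else if (buildVersion == NO_VERSION_SPECIFIED ||
               !versions.contains(buildVersion)) Some(versions.last) // fall back to latest
      else Some(buildVersion)                                        // pinned version exists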
