This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
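In short: when two dict v2 build jobs ran concurrently, the flat-table encoding step could pick up whichever dictionary version happened to be the latest on storage rather than the one its own job had just built, which is the "abnormal encoding result" in the subject. The patch threads an explicit build version (epoch millis, or NO_VERSION_SPECIFIED = -1 when the segment's dictionary is already built) from DFDictionaryBuilder.buildDictSet through DFTableEncoder.encodeTable and the DictEncode expression, so every job reads exactly the dictionary version it wrote. A minimal sketch of the resulting calling pattern; seg, ss, flatTableDs, dictCols and encodeCols are assumed to be in scope, as they are in FlatTableAndDictBase:

    import scala.collection.JavaConverters._
    import org.apache.kylin.guava30.shaded.common.collect.Sets
    import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED

    // Stamp this job's dictionary version; reuse the latest committed dict when it is already built.
    val buildVersion =
      if (seg.isDictReady) NO_VERSION_SPECIFIED
      else System.currentTimeMillis()

    if (buildVersion != NO_VERSION_SPECIFIED) {
      // Writes the new dictionary under version_<buildVersion> on commit.
      val builder = new DFDictionaryBuilder(flatTableDs, seg, ss,
        Sets.newHashSet(dictCols.asJavaCollection))
      builder.buildDictSet(buildVersion)
    }

    // Encode with the same version, so a dict committed by a concurrent job is never picked up.
    val encoded = DFTableEncoder.encodeTable(flatTableDs, seg, encodeCols.asJava, buildVersion)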
commit c61dc4189968f29f8e56e98f72a16f100e9d6e2b
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Sep 22 16:13:50 2023 +0800

    KYLIN-5828 Concurrently dict v2 jobs lead to abnormal encoding result
---
 .../engine/spark/builder/CreateFlatTable.scala     |  15 +--
 .../engine/spark/builder/DFDictionaryBuilder.scala |  16 +--
 .../engine/spark/builder/DFTableEncoder.scala      |  13 ++-
 .../spark/builder/DictionaryBuilderHelper.java     |   5 +-
 .../job/stage/build/FlatTableAndDictBase.scala     |  18 ++--
 .../engine/spark/dict/NGlobalDictionaryV2Test.java |  77 +++++++++------
 .../kylin/engine/spark/builder/TestDFChooser.scala |   5 +-
 .../engine/spark/builder/TestGlobalDictBuild.scala |  58 +++++++++--
 .../v3dict/GlobalDictionaryUpdateSuite.scala       |   6 +-
 .../scala/org/apache/spark/sql/KapFunctions.scala  |   4 +-
 .../sql/catalyst/expressions/KapExpresssions.scala |  24 +++--
 .../org/apache/spark/sql/udf/DictEncodeImpl.scala  |   9 +-
 .../org/apache/spark/dict/NBucketDictionary.java   |  62 +++++++++---
 .../spark/dict/NGlobalDictBuilderAssist.scala      |   5 +-
 .../apache/spark/dict/NGlobalDictHDFSStore.java    |  10 +-
 .../org/apache/spark/dict/NGlobalDictS3Store.java  |  16 +--
 .../org/apache/spark/dict/NGlobalDictStore.java    |   4 +-
 .../org/apache/spark/dict/NGlobalDictionaryV2.java | 107 +++++++++++++++++----
 18 files changed, 326 insertions(+), 128 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
index b659b87969..654c5b9f6f 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
@@ -31,6 +31,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.cuboid.NSpanningTree
 import org.apache.kylin.metadata.cube.model.{NCubeJoinedFlatTableDesc, NDataSegment}
 import org.apache.kylin.metadata.model._
+import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
@@ -115,12 +116,14 @@ class CreateFlatTable(val flatTable: IJoinedFlatTableDesc,
                            dictCols: Set[TblColRef],
                            encodeCols: Set[TblColRef]): Dataset[Row] = {
     val ccDataset = withColumn(ds, ccCols)
+    var buildVersion = System.currentTimeMillis()
     if (seg.isDictReady) {
       logInfo(s"Skip already built dict, segment: ${seg.getId} of dataflow: ${seg.getDataflow.getId}")
+      buildVersion = NO_VERSION_SPECIFIED
     } else {
-      buildDict(ccDataset, dictCols)
+      buildDict(ccDataset, dictCols, buildVersion)
     }
-    encodeColumn(ccDataset, encodeCols)
+    encodeColumn(ccDataset, encodeCols, buildVersion)
   }
 
   private def withColumn(ds: Dataset[Row], withCols: Set[TblColRef]): Dataset[Row] = {
@@ -131,21 +134,21 @@ class CreateFlatTable(val flatTable: IJoinedFlatTableDesc,
     withDs
   }
 
-  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = {
+  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef], buildVersion: Long): Unit = {
     val matchedCols = if (seg.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) {
       filterOutIntegerFamilyType(ds, dictCols)
     } else {
       selectColumnsInTable(ds, dictCols)
     }
     val builder = new DFDictionaryBuilder(ds, seg, ss, Sets.newHashSet(matchedCols.asJavaCollection))
-    builder.buildDictSet()
+    builder.buildDictSet(buildVersion)
   }
 
-  private def encodeColumn(ds: 
Dataset[Row], encodeCols: Set[TblColRef]): Dataset[Row] = { + private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef], buildVersion: Long): Dataset[Row] = { val matchedCols = selectColumnsInTable(ds, encodeCols) var encodeDs = ds if (matchedCols.nonEmpty) { - encodeDs = DFTableEncoder.encodeTable(ds, seg, matchedCols.asJava) + encodeDs = DFTableEncoder.encodeTable(ds, seg, matchedCols.asJava, buildVersion) } encodeDs } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala index af8399ca3a..d930fd178b 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala @@ -43,8 +43,8 @@ class DFDictionaryBuilder( val colRefSet: util.Set[TblColRef]) extends LogEx with Serializable { @throws[IOException] - def buildDictSet(): Unit = { - colRefSet.asScala.foreach(col => safeBuild(col)) + def buildDictSet(buildVersion: Long): Unit = { + colRefSet.asScala.foreach(col => safeBuild(col, buildVersion)) changeAQEConfig(true) } @@ -52,7 +52,7 @@ class DFDictionaryBuilder( private val originalAQE = ss.conf.get(AQE) @throws[IOException] - private[builder] def safeBuild(ref: TblColRef): Unit = { + private[builder] def safeBuild(ref: TblColRef, buildVersion: Long): Unit = { val sourceColumn = ref.getIdentity ZKHelper.tryZKJaasConfiguration(ss) val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory @@ -64,7 +64,7 @@ class DFDictionaryBuilder( val bucketPartitionSize = logTime(s"calculating bucket size for $sourceColumn") { DictionaryBuilderHelper.calculateBucketSize(seg, ref, dictColDistinct) } - build(ref, bucketPartitionSize, dictColDistinct) + build(ref, bucketPartitionSize, dictColDistinct, buildVersion) } finally lock.unlock() } @@ -90,10 +90,12 @@ class DFDictionaryBuilder( """.stripMargin } + @throws[IOException] private[builder] def build(ref: TblColRef, bucketPartitionSize: Int, - afterDistinct: Dataset[Row]): Unit = logTime(s"building global dictionaries V2 for ${ref.getIdentity}") { - val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory) + afterDistinct: Dataset[Row], + buildVersion: Long): Unit = logTime(s"building global dictionaries V2 for ${ref.getIdentity}") { + val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory, buildVersion) globalDict.prepareWrite() val broadcastDict = ss.sparkContext.broadcast(globalDict) @@ -121,7 +123,7 @@ class DFDictionaryBuilder( if (seg.getConfig.isGlobalDictCheckEnabled) { logInfo(s"Start to check the correctness of the global dict, table: ${ref.getTableAlias}, col: ${ref.getName}") - val latestGD = new NGlobalDictionaryV2(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory) + val latestGD = new NGlobalDictionaryV2(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory, buildVersion) for (bid <- 0 until globalDict.getMetaInfo.getBucketSize) { val dMap = latestGD.loadBucketDictionary(bid).getAbsoluteDictMap val vdCount = dMap.values().stream().distinct().count() diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala index 9e3ca24734..a49c090838 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala @@ -22,6 +22,7 @@ import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.kylin.metadata.model.TblColRef import org.apache.spark.dict.NGlobalDictionaryV2 +import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED import org.apache.spark.internal.Logging import org.apache.spark.sql.KapFunctions._ import org.apache.spark.sql.functions.{col, _} @@ -34,7 +35,7 @@ import scala.collection.mutable._ object DFTableEncoder extends Logging { - def encodeTable(ds: Dataset[Row], seg: NDataSegment, cols: util.Set[TblColRef]): Dataset[Row] = { + def encodeTable(ds: Dataset[Row], seg: NDataSegment, cols: util.Set[TblColRef], buildVersion: Long): Dataset[Row] = { val structType = ds.schema var partitionedDs = ds @@ -61,17 +62,21 @@ object DFTableEncoder extends Logging { val encodingArgs = encodingCols.map { ref => - val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory) + val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, + ref.getName, seg.getConfig.getHdfsWorkingDirectory, buildVersion) val bucketSize = globalDict.getBucketSizeOrDefault(seg.getConfig.getGlobalDictV2MinHashPartitions) val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * bucketSize).toInt - + logInfo(s"[EncodeTable/${buildVersion}] bucketSize:$bucketSize") val encodeColRef = convertFromDot(ref.getBackTickIdentity) val columnIndex = structType.fieldIndex(encodeColRef) val dictParams = Array(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory) .mkString(SEPARATOR) val aliasName = structType.apply(columnIndex).name.concat(ENCODE_SUFFIX) - val encodeCol = dict_encode(col(encodeColRef).cast(StringType), lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName) + val encodeCol = dict_encode(col(encodeColRef).cast(StringType), + lit(dictParams), + lit(bucketSize).cast(StringType), + lit(buildVersion).cast(LongType)).as(aliasName) val columns = encodeCol (enlargedBucketSize, col(encodeColRef).cast(StringType), columns, aliasName, bucketSize == 1) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java index df0288164d..97ef4f433c 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java @@ -58,8 +58,9 @@ public class DictionaryBuilderHelper { */ public static int calculateBucketSize(NDataSegment seg, TblColRef col, Dataset<Row> afterDistinct) throws IOException { + long resizeVersion = System.currentTimeMillis(); NGlobalDictionaryV2 globalDict = new NGlobalDictionaryV2(seg.getProject(), col.getTable(), col.getName(), - seg.getConfig().getHdfsWorkingDirectory()); + seg.getConfig().getHdfsWorkingDirectory(), resizeVersion); int bucketPartitionSize = 
globalDict.getBucketSizeOrDefault(seg.getConfig().getGlobalDictV2MinHashPartitions()); int bucketThreshold = seg.getConfig().getGlobalDictV2ThresholdBucketSize(); int resizeBucketSize = bucketPartitionSize; @@ -107,7 +108,7 @@ public class DictionaryBuilderHelper { if (resizeBucketSize != bucketPartitionSize) { logger.info("Start building a global dictionary column for {}, need resize from {} to {} ", col.getName(), bucketPartitionSize, resizeBucketSize); - resize(col, seg, resizeBucketSize, afterDistinct.sparkSession()); + resize(col, seg, resizeBucketSize, afterDistinct.sparkSession(), resizeVersion); logger.info("End building a global dictionary column for {}, need resize from {} to {} ", col.getName(), bucketPartitionSize, resizeBucketSize); } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala index 935eaefaea..bacd24c225 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala @@ -44,6 +44,7 @@ import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBu import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.kylin.metadata.cube.planner.CostBasePlannerUtils import org.apache.kylin.metadata.model._ +import org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED import org.apache.spark.sql.KapFunctions.dict_encode_v3 import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.JoinType @@ -563,18 +564,20 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, if (dictCols.isEmpty && encodeCols.isEmpty) { return table } + var buildVersion = System.currentTimeMillis() if (dataSegment.isDictReady) { logInfo(s"Skip DICTIONARY segment $segmentId") + buildVersion = NO_VERSION_SPECIFIED } else { // ensure at least one worker was registered before dictionary lock added. waitTillWorkerRegistered() - buildDict(table, dictCols) + buildDict(table, dictCols, buildVersion) } if (config.isV3DictEnable) { buildV3DictIfNeeded(table, encodeCols) } else { - encodeColumn(table, encodeCols) + encodeColumn(table, encodeCols, buildVersion) } } @@ -632,23 +635,24 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, tableWithCcs } - private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = { + private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef], buildVersion: Long): Unit = { if (config.isV2DictEnable) { - logInfo("Build v2 dict default.") + logInfo(s"Build v2 dict default. 
" + + s"[${dataSegment.getModel.getAlias}] model dict build version is $buildVersion") var matchedCols = selectColumnsInTable(ds, dictCols) if (dataSegment.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) { matchedCols = matchedCols.filterNot(_.getType.isIntegerFamily) } val builder = new DFDictionaryBuilder(ds, dataSegment, sparkSession, Sets.newHashSet(matchedCols.asJavaCollection)) - builder.buildDictSet() + builder.buildDictSet(buildVersion) } } - private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef]): Dataset[Row] = { + private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef], buildVersion: Long): Dataset[Row] = { val matchedCols = selectColumnsInTable(ds, encodeCols) var encodeDs = ds if (matchedCols.nonEmpty) { - encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, matchedCols.asJava) + encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, matchedCols.asJava, buildVersion) } encodeDs } diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java index 8d29579e4a..6f30eb2fc3 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java @@ -35,6 +35,7 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.dict.NBucketDictionary; import org.apache.spark.dict.NGlobalDictHDFSStore; import org.apache.spark.dict.NGlobalDictMetaInfo; +import org.apache.spark.dict.NGlobalDictS3Store; import org.apache.spark.dict.NGlobalDictStore; import org.apache.spark.dict.NGlobalDictionaryV2; import org.apache.spark.sql.Dataset; @@ -70,33 +71,33 @@ public class NGlobalDictionaryV2Test extends NLocalWithSparkSessionTest { @Test public void testGlobalDictHDFSStoreRoundTest() throws IOException { - testAll(); + testAll(false); } @Test public void testGlobalDictS3StoreRoundTest() throws IOException { // global s3 dict store overwriteSystemProp("kylin.engine.global-dict.store.impl", "org.apache.spark.dict.NGlobalDictS3Store"); - testAll(); + testAll(true); } - private void testAll() throws IOException { - roundTest(5); - roundTest(50); - roundTest(500); + private void testAll(boolean isS3Store) throws IOException { + roundTest(5, isS3Store); + roundTest(50, isS3Store); + roundTest(500, isS3Store); } - private void roundTest(int size) throws IOException { + private void roundTest(int size, boolean isS3Store) throws IOException { System.out.println("NGlobalDictionaryV2Test -> roundTest -> " + System.currentTimeMillis()); KylinConfig config = KylinConfig.getInstanceFromEnv(); - NGlobalDictionaryV2 dict1 = new NGlobalDictionaryV2("t1", "a", "spark", config.getHdfsWorkingDirectory()); - NGlobalDictionaryV2 dict2 = new NGlobalDictionaryV2("t2", "a", "local", config.getHdfsWorkingDirectory()); + NGlobalDictionaryV2 dict1 = new NGlobalDictionaryV2("t1", "a", "spark", config.getHdfsWorkingDirectory(), System.currentTimeMillis()); + NGlobalDictionaryV2 dict2 = new NGlobalDictionaryV2("t2", "a", "local", config.getHdfsWorkingDirectory(), System.currentTimeMillis()); List<String> stringList = generateRandomData(size); Collections.sort(stringList); - runWithSparkBuildGlobalDict(dict1, stringList); - runWithLocalBuildGlobalDict(dict2, stringList); - compareTwoVersionDict(dict1, dict2); - compareTwoModeVersionNum(dict1, dict2); 
+ runWithSparkBuildGlobalDict(dict1, stringList, isS3Store); + runWithLocalBuildGlobalDict(dict2, stringList, isS3Store); + compareTwoVersionDict(dict1, dict2, isS3Store); + compareTwoModeVersionNum(dict1, dict2, isS3Store); } private List<String> generateRandomData(int size) { @@ -107,9 +108,14 @@ public class NGlobalDictionaryV2Test extends NLocalWithSparkSessionTest { return stringList; } - private void runWithSparkBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet) throws IOException { + private void runWithSparkBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet, boolean isS3Store) throws IOException { KylinConfig config = KylinConfig.getInstanceFromEnv(); - dict.prepareWrite(); + if (isS3Store) { + dict.prepareWriteS3(); + } else { + dict.prepareWrite(); + } + List<Row> rowList = Lists.newLinkedList(); for (String str : stringSet) { rowList.add(RowFactory.create(str)); @@ -122,7 +128,7 @@ public class NGlobalDictionaryV2Test extends NLocalWithSparkSessionTest { return new Tuple2<>(row.get(0).toString(), null); }).sortByKey().partitionBy(new HashPartitioner(BUCKET_SIZE)).mapPartitionsWithIndex( (Function2<Integer, Iterator<Tuple2<String, String>>, Iterator<Object>>) (bucketId, tuple2Iterator) -> { - NBucketDictionary bucketDict = dict.loadBucketDictionary(bucketId); + NBucketDictionary bucketDict = isS3Store? dict.loadBucketDictionaryForTestS3(bucketId) : dict.loadBucketDictionary(bucketId); while (tuple2Iterator.hasNext()) { Tuple2<String, String> tuple2 = tuple2Iterator.next(); bucketDict.addRelativeValue(tuple2._1); @@ -131,12 +137,20 @@ public class NGlobalDictionaryV2Test extends NLocalWithSparkSessionTest { return Collections.emptyIterator(); }, true).count(); - dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); + if (isS3Store) { + dict.writeMetaDictForTestS3(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); + } else { + dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); + } } - private void runWithLocalBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet) throws IOException { + private void runWithLocalBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet, boolean isS3Store) throws IOException { KylinConfig config = KylinConfig.getInstanceFromEnv(); - dict.prepareWrite(); + if (isS3Store) { + dict.prepareWriteS3(); + } else { + dict.prepareWrite(); + } HashPartitioner partitioner = new HashPartitioner(BUCKET_SIZE); Map<Integer, List<String>> vmap = new HashMap<>(); for (int i = 0; i < BUCKET_SIZE; i++) { @@ -148,31 +162,36 @@ public class NGlobalDictionaryV2Test extends NLocalWithSparkSessionTest { } for (Map.Entry<Integer, List<String>> entry : vmap.entrySet()) { - NBucketDictionary bucketDict = dict.loadBucketDictionary(entry.getKey()); + NBucketDictionary bucketDict = isS3Store ? 
dict.loadBucketDictionaryForTestS3(entry.getKey()):dict.loadBucketDictionary(entry.getKey()); for (String s : entry.getValue()) { bucketDict.addRelativeValue(s); } bucketDict.saveBucketDict(entry.getKey()); } - dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); + if (isS3Store) { + dict.writeMetaDictForTestS3(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); + } else { + dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); + } + } - private void compareTwoModeVersionNum(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2) throws IOException { - NGlobalDictStore store1 = new NGlobalDictHDFSStore(dict1.getResourceDir()); - NGlobalDictStore store2 = new NGlobalDictHDFSStore(dict2.getResourceDir()); + private void compareTwoModeVersionNum(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2, boolean isS3Store) throws IOException { + NGlobalDictStore store1 = isS3Store? new NGlobalDictS3Store(dict1.getResourceDir()) : new NGlobalDictHDFSStore(dict1.getResourceDir()); + NGlobalDictStore store2 = isS3Store? new NGlobalDictS3Store(dict2.getResourceDir()) : new NGlobalDictHDFSStore(dict2.getResourceDir()); Assert.assertEquals(store1.listAllVersions().length, store2.listAllVersions().length); } - private void compareTwoVersionDict(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2) throws IOException { - NGlobalDictMetaInfo metadata1 = dict1.getMetaInfo(); - NGlobalDictMetaInfo metadata2 = dict2.getMetaInfo(); + private void compareTwoVersionDict(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2, boolean isS3Store) throws IOException { + NGlobalDictMetaInfo metadata1 = isS3Store? dict1.getMetaInfoForTestS3():dict1.getMetaInfo(); + NGlobalDictMetaInfo metadata2 = isS3Store? dict2.getMetaInfoForTestS3():dict2.getMetaInfo(); // compare dict meta info Assert.assertEquals(metadata1, metadata2); for (int i = 0; i < metadata1.getBucketSize(); i++) { - NBucketDictionary bucket1 = dict1.loadBucketDictionary(i); - NBucketDictionary bucket2 = dict2.loadBucketDictionary(i); + NBucketDictionary bucket1 = isS3Store? dict1.loadBucketDictionaryForTestS3(i):dict1.loadBucketDictionary(i); + NBucketDictionary bucket2 = isS3Store? 
dict2.loadBucketDictionaryForTestS3(i):dict2.loadBucketDictionary(i); Object2LongMap<String> map1 = bucket1.getAbsoluteDictMap(); Object2LongMap<String> map2 = bucket2.getAbsoluteDictMap(); diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala index 9dd526d6bd..5ebedddd13 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDFChooser.scala @@ -160,8 +160,9 @@ class TestDFChooser extends SparderBaseFunSuite with SharedSparkSession with Loc val dict1 = new NGlobalDictionaryV2(seg.getProject, col.getTable, col.getName, seg.getConfig.getHdfsWorkingDirectory) val meta1 = dict1.getMetaInfo val needResizeBucketSize = dict1.getBucketSizeOrDefault(seg.getConfig.getGlobalDictV2MinHashPartitions) + 10 - NGlobalDictBuilderAssist.resize(col, seg, needResizeBucketSize, spark) - val dict2 = new NGlobalDictionaryV2(seg.getProject, col.getTable, col.getName, seg.getConfig.getHdfsWorkingDirectory) + val resizeVersion = System.currentTimeMillis() + NGlobalDictBuilderAssist.resize(col, seg, needResizeBucketSize, spark, resizeVersion) + val dict2 = new NGlobalDictionaryV2(seg.getProject, col.getTable, col.getName, seg.getConfig.getHdfsWorkingDirectory, resizeVersion) Assert.assertEquals(meta1.getDictCount, dict2.getMetaInfo.getDictCount) Assert.assertEquals(meta1.getBucketSize + 10, dict2.getMetaInfo.getBucketSize) } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala index 764229ff44..016844e7c8 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala @@ -17,10 +17,12 @@ */ package org.apache.kylin.engine.spark.builder -import org.apache.commons.lang3.RandomStringUtils +import org.apache.commons.lang3.{RandomStringUtils, StringUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.kylin.common.KylinConfig +import org.apache.kylin.engine.spark.builder.v3dict.GlobalDictionaryBuilderHelper.genRandomData +import org.apache.kylin.engine.spark.job.NSparkCubingUtil import org.apache.kylin.metadata.cube.cuboid.{AdaptiveSpanningTree, NSpanningTreeFactory} import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow, NDataflowManager} import org.apache.kylin.metadata.model.TblColRef @@ -48,6 +50,40 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi config } + test("build v2 dict and encode flattable") { + val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, DEFAULT_PROJECT) + val df: NDataflow = dsMgr.getDataflow(CUBE_NAME) + val seg = df.getLastSegment + val nSpanningTree = NSpanningTreeFactory.fromLayouts(seg.getIndexPlan.getAllLayouts, df.getUuid) + val dictColSet = DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(seg, nSpanningTree.getAllIndexEntities) + + val dictCol = dictColSet.iterator().next() + val encodeColName: String = StringUtils.split(dictCol.getTable, ".").apply(1) + NSparkCubingUtil.SEPARATOR + 
dictCol.getName + val randomDF = genRandomData(spark, encodeColName, 1000, 10) + + val dictionaryBuilder = new DFDictionaryBuilder(randomDF, seg, randomDF.sparkSession, dictColSet) + val colName = dictColSet.iterator().next() + val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, colName, randomDF) + val buildVersion = System.currentTimeMillis() + dictionaryBuilder.build(colName, bucketPartitionSize, randomDF, buildVersion) + val dict = new NGlobalDictionaryV2(seg.getProject, + colName.getTable, + colName.getName, + seg.getConfig.getHdfsWorkingDirectory, + buildVersion) + val meta1 = dict.getMetaInfo + Assert.assertEquals(1000, meta1.getDictCount) + val encode = encodeColumn(randomDF, seg, dictColSet, buildVersion) + val rowsWithZero = encode.filter(encode(encode.columns(1)) === 0) + Assert.assertEquals(true, rowsWithZero.isEmpty) + // clean all + val cleanCol = dictColSet.iterator().next() + val cleanDict = new NGlobalDictionaryV2(seg.getProject, cleanCol.getTable, cleanCol.getName, seg.getConfig.getHdfsWorkingDirectory) + val cleanDictPath = new Path(seg.getConfig.getHdfsWorkingDirectory + cleanDict.getResourceDir) + val fileSystem = cleanDictPath.getFileSystem(new Configuration()) + fileSystem.delete(cleanDictPath, true) + } + test("global dict build and checkout bucket resize strategy") { val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, DEFAULT_PROJECT) Assert.assertTrue(getTestConfig.getHdfsWorkingDirectory.startsWith("file:")) @@ -150,10 +186,11 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi val dictColumn = col(dataflow.getModel.getColumnIdByColumnName(dictCol.getIdentity).toString) val distinctDS = sampleDS.select(dictColumn).distinct() val dictBuilder = new DFDictionaryBuilder(sampleDS, segment, spark, dictColSet) - dictBuilder.build(dictCol, shufflePartitionSizeInt, distinctDS) + val buildVersion = System.currentTimeMillis() + dictBuilder.build(dictCol, shufflePartitionSizeInt, distinctDS, buildVersion) val dictStore = NGlobalDictStoreFactory.getResourceStore(segment.getConfig.getHdfsWorkingDirectory + dictV2.getResourceDir) - val versionPath = dictStore.getVersionDir(dictStore.listAllVersions()(0)) + val versionPath = dictStore.getVersionDir(buildVersion) val dictDirSize = fileSystem.listStatus(versionPath, new PathFilter { override def accept(path: Path): Boolean = { @@ -184,7 +221,7 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi seg.getConfig.setProperty("kylin.engine.global-dict-aqe-enabled", "FALSE") dictionaryBuilder.changeAQEConfig(false) Assert.assertFalse(spark.conf.get("spark.sql.adaptive.enabled").toBoolean) - dictionaryBuilder.build(col, bucketPartitionSize, ds) + dictionaryBuilder.build(col, bucketPartitionSize, ds, System.currentTimeMillis()) Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE)) // false true @@ -196,7 +233,7 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi dictionaryBuilder.changeAQEConfig(false) Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE)) - dictionaryBuilder.build(col, bucketPartitionSize, ds) + dictionaryBuilder.build(col, bucketPartitionSize, ds, System.currentTimeMillis()) Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").equals(originalAQE)) // true true @@ -210,12 +247,19 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi val col = dictColSet.iterator().next() val ds = 
randomDataSet.select("26").distinct() val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, col, ds) - dictionaryBuilder.build(col, bucketPartitionSize, ds) + val buildVersion = System.currentTimeMillis() + dictionaryBuilder.build(col, bucketPartitionSize, ds, buildVersion) val dict = new NGlobalDictionaryV2(seg.getProject, col.getTable, col.getName, - seg.getConfig.getHdfsWorkingDirectory) + seg.getConfig.getHdfsWorkingDirectory, buildVersion) dict.getMetaInfo } + def encodeColumn(ds: Dataset[Row], dataSegment: NDataSegment, + encodeCols: Set[TblColRef], buildVersion: Long): Dataset[Row] = { + val encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, encodeCols, buildVersion) + encodeDs + } + def generateOriginData(count: Int, length: Int): Dataset[Row] = { var schema = new StructType diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala index 71b49a0924..e089abcdb8 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/v3dict/GlobalDictionaryUpdateSuite.scala @@ -87,11 +87,13 @@ class GlobalDictionaryUpdateSuite extends SparderBaseFunSuite with LocalMetadata val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, seg, randomDataSet.sparkSession, dictColSet) val colName = dictColSet.iterator().next() val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, colName, randomDataSet) - dictionaryBuilder.build(colName, bucketPartitionSize, randomDataSet) + val buildVersion = System.currentTimeMillis() + dictionaryBuilder.build(colName, bucketPartitionSize, randomDataSet, buildVersion) val dict = new NGlobalDictionaryV2(seg.getProject, colName.getTable, colName.getName, - seg.getConfig.getHdfsWorkingDirectory) + seg.getConfig.getHdfsWorkingDirectory, + buildVersion) dict.getMetaInfo } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala index 77cce256e1..bf18b4aee7 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KapFunctions.scala @@ -247,8 +247,8 @@ object KapFunctions { } } - def dict_encode(column: Column, dictParams: Column, bucketSize: Column): Column = { - Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr)) + def dict_encode(column: Column, dictParams: Column, bucketSize: Column, buildVersion: Column): Column = { + Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr, buildVersion.expr)) } def dict_encode_v3(column: Column, colName: String): Column = { diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala index 74beffaae3..a27fff2ad1 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala @@ -366,7 +366,9 @@ case class DictEncodeV3(child: Expression, col: String) 
extends UnaryExpression override protected def nullSafeEval(input: Any): Any = super.nullSafeEval(input) } -case class DictEncode(left: Expression, mid: Expression, right: Expression) extends TernaryExpression with ExpectsInputTypes { +case class DictEncode(left: Expression, mid: Expression, + right: Expression, buildVersion: Expression) + extends QuaternaryExpression with ExpectsInputTypes { def maxFields: Int = SQLConf.get.maxToStringFields @@ -376,7 +378,9 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte override def third: Expression = right - override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, StringType, StringType) + override def fourth: Expression = buildVersion + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, StringType, StringType, LongType) override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { @@ -395,13 +399,14 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte val dictParamsTerm = mid.simpleString(maxFields) val bucketSizeTerm = right.simpleString(maxFields).toInt + val version = buildVersion.simpleString(maxFields).toLong val initBucketDictFuncName = ctx.addNewFunction(s"init${bucketDictTerm.replace("[", "").replace("]", "")}BucketDict", s""" | private void init${bucketDictTerm.replace("[", "").replace("]", "")}BucketDict(int idx) { | try { | int bucketId = idx % $bucketSizeTerm; - | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm"); + | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm",${version}L); | $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId, true); | } catch (Exception e) { | throw new RuntimeException(e); @@ -411,13 +416,13 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte ctx.addPartitionInitializationStatement(s"$initBucketDictFuncName(partitionIndex);"); - defineCodeGen(ctx, ev, (arg1, arg2, arg3) => { + defineCodeGen(ctx, ev, (arg1, arg2, arg3, arg4) => { s"""$bucketDictTerm.encode($arg1)""" }) } - override protected def nullSafeEval(input1: Any, input2: Any, input3: Any): Any = { - DictEncodeImpl.evaluate(input1.toString, input2.toString, input3.toString) + override protected def nullSafeEval(input1: Any, input2: Any, input3: Any, input4: Any): Any = { + DictEncodeImpl.evaluate(input1.toString, input2.toString, input3.toString, input4.toString) } override def eval(input: InternalRow): Any = { @@ -432,8 +437,11 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte override def prettyName: String = "DICTENCODE" - override protected def withNewChildrenInternal(newFirst: Expression, newSecond: Expression, newThird: Expression): Expression = { - val newChildren = Seq(newFirst, newSecond, newThird) + override protected def withNewChildrenInternal(newFirst: Expression, + newSecond: Expression, + newThird: Expression, + newFourth: Expression): Expression = { + val newChildren = Seq(newFirst, newSecond, newThird, newFourth) super.legacyWithNewChildren(newChildren) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala index a33f29d736..afd1d8fbcd 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala @@ -30,19 +30,18 @@ object 
DictEncodeImpl { override def initialValue(): util.HashMap[String, NBucketDictionary] = new util.HashMap[String, NBucketDictionary]() } - def evaluate(inputValue: String, dictParams: String, bucketSize: String): Long = { + def evaluate(inputValue: String, dictParams: String, bucketSize: String, buildVersion: String): Long = { var cachedBucketDict = DictEncodeImpl.cacheBucketDict.get().get(dictParams) if (cachedBucketDict == null) { - cachedBucketDict = initBucketDict(dictParams, bucketSize) + cachedBucketDict = initBucketDict(dictParams, bucketSize, buildVersion) } cachedBucketDict.encode(inputValue) } - private def initBucketDict(dictParams: String, bucketSize: String): NBucketDictionary = { + private def initBucketDict(dictParams: String, bucketSize: String, buildVersion: String): NBucketDictionary = { val partitionID = TaskContext.get.partitionId val encodeBucketId = partitionID % bucketSize.toInt - val globalDict = new NGlobalDictionaryV2(dictParams) - + val globalDict = new NGlobalDictionaryV2(dictParams, buildVersion.toLong) val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId, true) DictEncodeImpl.cacheBucketDict.get.put(dictParams, cachedBucketDict) TaskContext.get().addTaskCompletionListener(new TaskCompletionListener { diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java index e7fb062c1d..024b88c907 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java @@ -18,6 +18,9 @@ package org.apache.spark.dict; import java.io.IOException; +import java.util.Arrays; + +import com.alibaba.nacos.common.JustForTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +28,8 @@ import org.slf4j.LoggerFactory; import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; +import static org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED; + public class NBucketDictionary { protected static final Logger logger = LoggerFactory.getLogger(NBucketDictionary.class); @@ -33,30 +38,58 @@ public class NBucketDictionary { private int bucketId; + private long buildVersion; + private Object2LongMap<String> absoluteDictMap; // Relative dictionary needs to calculate dictionary code according to NGlobalDictMetaInfo's bucketOffsets private Object2LongMap<String> relativeDictMap; - NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo) - throws IOException { - this(baseDir, workingDir, bucketId, metainfo, false); - } - - NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding) - throws IOException { - this.workingDir = workingDir; - this.bucketId = bucketId; - final NGlobalDictStore globalDictStore = NGlobalDictStoreFactory.getResourceStore(baseDir); + private void initDictMap(int bucketId, NGlobalDictMetaInfo metainfo, + boolean isForColumnEncoding, long buildVersion, NGlobalDictStore globalDictStore) throws IOException { Long[] versions = globalDictStore.listAllVersions(); logger.debug("versions.length is {}", versions.length); if (versions.length == 0) { this.absoluteDictMap = new Object2LongOpenHashMap<>(); - } else { + } else if (buildVersion == NO_VERSION_SPECIFIED || !Arrays.asList(versions).contains(buildVersion)) { + 
logger.info("Initializes dict map with the latest version:{}",versions[versions.length - 1]); this.absoluteDictMap = globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, bucketId, isForColumnEncoding); + } else { + logger.info("Initializes dict map with the specified version:{}",buildVersion); + this.absoluteDictMap = globalDictStore.getBucketDict(buildVersion, metainfo, bucketId, isForColumnEncoding); } this.relativeDictMap = new Object2LongOpenHashMap<>(); } + private void initDictMap(String baseDir, int bucketId, NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding, + long buildVersion) throws IOException { + final NGlobalDictStore globalDictStore = NGlobalDictStoreFactory.getResourceStore(baseDir); + initDictMap(bucketId, metainfo, isForColumnEncoding, buildVersion, globalDictStore); + } + + @JustForTest + private void initDictMapForS3Test(String baseDir, int bucketId, NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding, long buildVersion) throws IOException { + final NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir); + initDictMap(bucketId, metainfo, isForColumnEncoding, buildVersion, globalDictStore); + } + + @JustForTest + NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo + , boolean isForColumnEncoding, long buildVersion, boolean justForTest) + throws IOException { + this.workingDir = workingDir; + this.bucketId = bucketId; + this.buildVersion = buildVersion; + initDictMapForS3Test(baseDir, bucketId, metainfo, isForColumnEncoding, this.buildVersion); + } + + NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding, long buildVersion) + throws IOException { + this.workingDir = workingDir; + this.bucketId = bucketId; + this.buildVersion = buildVersion; + initDictMap(baseDir, bucketId, metainfo, isForColumnEncoding, this.buildVersion); + } + NBucketDictionary(String workingDir) { this.workingDir = workingDir; this.absoluteDictMap = new Object2LongOpenHashMap<>(); @@ -78,7 +111,12 @@ public class NBucketDictionary { } public long encode(Object value) { - return absoluteDictMap.getLong(value.toString()); + long encodeValue = absoluteDictMap.getLong(value.toString()); + if (encodeValue == 0){ + throw new IllegalArgumentException(String.format("DFTable encode key:%s with error value:%s", + value.toString(), encodeValue )); + } + return encodeValue; } public void saveBucketDict(int bucketId) throws IOException { diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala index 41c4389d52..76b39cd67d 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala @@ -34,8 +34,9 @@ import scala.collection.JavaConverters._ object NGlobalDictBuilderAssist extends Logging { @throws[IOException] - def resize(ref: TblColRef, seg: NDataSegment, bucketPartitionSize: Int, ss: SparkSession): Unit = { - val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, ref.getName, seg.getConfig.getHdfsWorkingDirectory) + def resize(ref: TblColRef, seg: NDataSegment, bucketPartitionSize: Int, ss: SparkSession, resizeVersion: Long): Unit = { + val globalDict = new NGlobalDictionaryV2(seg.getProject, ref.getTable, + ref.getName, 
seg.getConfig.getHdfsWorkingDirectory, resizeVersion) val broadcastDict = ss.sparkContext.broadcast(globalDict) globalDict.prepareWrite() diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java index 6aa9ade586..49c6a30ec1 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; +import static org.apache.spark.dict.NGlobalDictionaryV2.NO_VERSION_SPECIFIED; + public class NGlobalDictHDFSStore implements NGlobalDictStore { protected static final String VERSION_PREFIX = "version_"; @@ -381,17 +383,17 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore { } @Override - public void commit(String workingDir, int maxVersions, long versionTTL) throws IOException { + public void commit(String workingDir, int maxVersions, long versionTTL, long buildVersion) throws IOException { Path workingPath = new Path(workingDir); - // copy working dir to newVersion dir - Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis()); + final long commitVersion = buildVersion == NO_VERSION_SPECIFIED ? System.currentTimeMillis() : buildVersion; + Path newVersionPath = new Path(basePath, VERSION_PREFIX + commitVersion); fileSystem.rename(workingPath, newVersionPath); logger.info("Commit from {} to {}", workingPath, newVersionPath); cleanUp(maxVersions, versionTTL); } @Override - public String getWorkingDir() { + public String getWorkingDir(long buildVersion) { return baseDir + WORKING_DIR; } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java index 01b42b9699..24e6d046e6 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictS3Store.java @@ -71,21 +71,25 @@ public class NGlobalDictS3Store extends NGlobalDictHDFSStore { } @Override - public void commit(String workingDir, int maxVersions, long versionTTL) throws IOException { + public void commit(String workingDir, int maxVersions, long versionTTL, long buildVersion) throws IOException { cleanWorkingFlagPath(basePath); logger.info("Commit {}", workingDir); cleanUp(maxVersions, versionTTL); } @Override - public String getWorkingDir() { - Path path = getWorkingFlagPath(basePath); + public String getWorkingDir(long buildVersion) { + Path path = getWorkingFlagPath(basePath, buildVersion); long timestamp = Long.parseLong(path.getName().substring(WORKING_PREFIX.length())); return baseDir + VERSION_PREFIX + timestamp; } private Path getWorkingFlagPath(Path basePath) { - long timestamp = pathToVersion.getOrDefault(basePath.toString(), System.currentTimeMillis()); + return getWorkingFlagPath(basePath, System.currentTimeMillis()); + } + + private Path getWorkingFlagPath(Path basePath, long buildVersion) { + long timestamp = pathToVersion.getOrDefault(basePath.toString(), buildVersion); pathToVersion.putIfAbsent(basePath.toString(), timestamp); try { FileSystem fileSystem = HadoopUtil.getFileSystem(basePath); @@ 
-100,10 +104,10 @@ public class NGlobalDictS3Store extends NGlobalDictHDFSStore { } catch (IOException ioe) { logger.error("Get exception when get version", ioe); } - return getWorkingFlagPath(basePath, timestamp); + return generateWorkingFlagPath(basePath, timestamp); } - private Path getWorkingFlagPath(Path basePath, long version) { + private Path generateWorkingFlagPath(Path basePath, long version) { return new Path(basePath + "/" + WORKING_PREFIX + version); } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java index 3a31d6dfb7..fd45d4c72b 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java @@ -48,7 +48,7 @@ public interface NGlobalDictStore { void writeMetaInfo(int bucketSize, String workingDir) throws IOException; - void commit(String workingDir, int maxVersions, long versionTTL) throws IOException; + void commit(String workingDir, int maxVersions, long versionTTL, long buildVersion) throws IOException; - String getWorkingDir(); + String getWorkingDir(long buildVersion); } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java index 8f37e43db2..769ca389e4 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java @@ -19,6 +19,9 @@ package org.apache.spark.dict; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; + +import com.alibaba.nacos.common.JustForTest; import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; @@ -29,6 +32,10 @@ import lombok.val; public class NGlobalDictionaryV2 implements Serializable { public static final String SEPARATOR = "_0_DOT_0_"; + + // The latest dictionary file is read by default if no version is specified + public static final long NO_VERSION_SPECIFIED = -1L; + protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionaryV2.class); private NGlobalDictMetaInfo metadata; @@ -36,27 +43,41 @@ public class NGlobalDictionaryV2 implements Serializable { private String project; private String sourceTable; private String sourceColumn; + + // Default is -1L, which means that no version is specified. 
+ // Dict data is read from the latest version each time + private final long buildVersion; private boolean isFirst = true; - public NGlobalDictionaryV2(String project, String sourceTable, String sourceColumn, String baseDir) - throws IOException { - this.project = project; - this.sourceTable = sourceTable; - this.sourceColumn = sourceColumn; + public NGlobalDictionaryV2(String dictParams, long buildVersion) throws IOException { + val dictInfo = dictParams.split(SEPARATOR); + this.project = dictInfo[0]; + this.sourceTable = dictInfo[1]; + this.sourceColumn = dictInfo[2]; + this.baseDir = dictInfo[3]; this.baseDir = baseDir + getResourceDir(); + this.buildVersion = buildVersion; this.metadata = getMetaInfo(); if (metadata != null) { isFirst = false; } } - public NGlobalDictionaryV2(String dictParams) throws IOException { - val dictInfo = dictParams.split(SEPARATOR); - this.project = dictInfo[0]; - this.sourceTable = dictInfo[1]; - this.sourceColumn = dictInfo[2]; - this.baseDir = dictInfo[3]; + // Note: You do not need to specify a build version + // if you do not need to encode flat tables or + // if you only need to read the latest dictionary data + public NGlobalDictionaryV2(String project, String sourceTable, String sourceColumn, String baseDir) + throws IOException { + this(project,sourceTable,sourceColumn,baseDir, NO_VERSION_SPECIFIED); + } + + public NGlobalDictionaryV2(String project, String sourceTable, String sourceColumn, String baseDir, long buildVersion) + throws IOException { + this.project = project; + this.sourceTable = sourceTable; + this.sourceColumn = sourceColumn; this.baseDir = baseDir + getResourceDir(); + this.buildVersion = buildVersion; this.metadata = getMetaInfo(); if (metadata != null) { isFirst = false; @@ -68,18 +89,31 @@ public class NGlobalDictionaryV2 implements Serializable { } private String getWorkingDir() { - return getResourceStore(baseDir).getWorkingDir(); + return getResourceStore(baseDir).getWorkingDir(this.buildVersion); } public NBucketDictionary loadBucketDictionary(int bucketId) throws IOException { return loadBucketDictionary(bucketId, false); } + // Only for S3 test ! + @JustForTest + public NBucketDictionary loadBucketDictionaryForTestS3(int bucketId) throws IOException { + if (null == metadata) { + metadata = getMetaInfoForTestS3(); + } + return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata, false, this.buildVersion, true); + } + public NBucketDictionary loadBucketDictionary(int bucketId, boolean isForColumnEncoding) throws IOException { + return loadBucketDictionary(bucketId, isForColumnEncoding, this.buildVersion); + } + + public NBucketDictionary loadBucketDictionary(int bucketId, boolean isForColumnEncoding, long buildVersion) throws IOException { if (null == metadata) { metadata = getMetaInfo(); } - return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata, isForColumnEncoding); + return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata, isForColumnEncoding, buildVersion); } public NBucketDictionary createNewBucketDictionary() { @@ -91,25 +125,56 @@ public class NGlobalDictionaryV2 implements Serializable { globalDictStore.prepareForWrite(getWorkingDir()); } + // Only for S3 test ! + @JustForTest + public void prepareWriteS3() throws IOException { + NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir); + globalDictStore.prepareForWrite(globalDictStore.getWorkingDir(this.buildVersion)); + } + + // Only for S3 test ! 
+ @JustForTest + public void writeMetaDictForTestS3(int bucketSize, int maxVersions, long versionTTL) throws IOException { + NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir); + String workingDir = globalDictStore.getWorkingDir(this.buildVersion); + globalDictStore.writeMetaInfo(bucketSize, workingDir); + globalDictStore.commit(workingDir, maxVersions, versionTTL, this.buildVersion); + } + public void writeMetaDict(int bucketSize, int maxVersions, long versionTTL) throws IOException { NGlobalDictStore globalDictStore = getResourceStore(baseDir); globalDictStore.writeMetaInfo(bucketSize, getWorkingDir()); commit(maxVersions, versionTTL); } - public NGlobalDictMetaInfo getMetaInfo() throws IOException { - NGlobalDictStore globalDictStore = getResourceStore(baseDir); - NGlobalDictMetaInfo metadata; + + private NGlobalDictMetaInfo getMetaInfo(NGlobalDictStore globalDictStore) throws IOException { + NGlobalDictMetaInfo nGlobalDictMetaInfo; Long[] versions = globalDictStore.listAllVersions(); logger.info("getMetaInfo versions.length is {}", versions.length); - if (versions.length == 0) { return null; + } else if (this.buildVersion == NO_VERSION_SPECIFIED || !Arrays.asList(versions).contains(buildVersion)) { + logger.info("Initializes dict metainfo with the latest version:{}", versions[versions.length - 1]); + nGlobalDictMetaInfo = globalDictStore.getMetaInfo(versions[versions.length - 1]); } else { - metadata = globalDictStore.getMetaInfo(versions[versions.length - 1]); + logger.info("Initializes dict metainfo with the specified version:{}", this.buildVersion); + nGlobalDictMetaInfo = globalDictStore.getMetaInfo(this.buildVersion); } - logger.info("getMetaInfo metadata is null : [{}]", metadata == null); - return metadata; + logger.info("getMetaInfo metadata is null : [{}]", nGlobalDictMetaInfo == null); + return nGlobalDictMetaInfo; + } + + // Only for S3 test ! + @JustForTest + public NGlobalDictMetaInfo getMetaInfoForTestS3() throws IOException { + NGlobalDictStore globalDictStore = new NGlobalDictS3Store(baseDir); + return getMetaInfo(globalDictStore); + } + + public NGlobalDictMetaInfo getMetaInfo() throws IOException { + NGlobalDictStore globalDictStore = getResourceStore(baseDir); + return getMetaInfo(globalDictStore); } public int getBucketSizeOrDefault(int defaultSize) { @@ -133,7 +198,7 @@ public class NGlobalDictionaryV2 implements Serializable { private void commit(int maxVersions, long versionTTL) throws IOException { NGlobalDictStore globalDictStore = getResourceStore(baseDir); - globalDictStore.commit(getWorkingDir(), maxVersions, versionTTL); + globalDictStore.commit(getWorkingDir(), maxVersions, versionTTL, this.buildVersion); } private NGlobalDictStore getResourceStore(String baseDir) {
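The version-selection rule the patch adds to NBucketDictionary.initDictMap and NGlobalDictionaryV2.getMetaInfo is: if the caller passed a concrete buildVersion and that version directory exists, load it; otherwise fall back to the newest committed version, which is both the pre-patch behaviour and what NO_VERSION_SPECIFIED requests. A condensed sketch of that rule; chooseVersion is an illustrative helper, not a method from the patch:

    // `versions` would come from NGlobalDictStore.listAllVersions(); NO_VERSION_SPECIFIED is -1L.
    def chooseVersion(versions: Seq[Long], buildVersion: Long): Option[Long] = {
      val NO_VERSION_SPECIFIED = -1L
      if (versions.isEmpty) None                      // first build: no dictionary committed yet
      else if (buildVersion == NO_VERSION_SPECIFIED || !versions.contains(buildVersion))
        Some(versions.max)                            // read the latest committed version
      else
        Some(buildVersion)                            // read exactly this job's version
    }

Related hardening in the same patch: NBucketDictionary.encode now throws IllegalArgumentException when a key maps to 0, so encoding against the wrong dictionary version fails fast instead of silently writing bad codes into the flat table.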