This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1d5f176fc3c940309dd9fd91f68b2bcd14f37975
Author: Zhichao Zhang <[email protected]>
AuthorDate: Sat Mar 20 15:53:02 2021 +0800

    KYLIN-4937 Verify the uniqueness of the global dictionary after building global dictionary
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../spark/dict/NGlobalDictBuilderAssist.scala      | 30 ++++++++++++++++++++++
 .../spark/builder/CubeDictionaryBuilder.scala      | 12 ++++++---
 .../engine/spark/builder/TestCreateFlatTable.scala |  2 +-
 4 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2ed1cb5..f255c1c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -699,6 +699,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Long.parseLong(getOptional("kylin.dictionary.globalV2-version-ttl", "259200000"));
     }
 
+    public boolean isCheckGlobalDictV2() {
+        return Boolean.parseBoolean(getOptional("kylin.dictionary.globalV2-check", "true"));
+    }
+
     // ============================================================================
     // CUBE
     // ============================================================================
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index a16a9da..dbac8b8 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -71,4 +71,34 @@ object NGlobalDictBuilderAssist extends Logging {
       desc.kylinconf.getGlobalDictV2MaxVersions, desc.kylinconf.getGlobalDictV2VersionTTL)
   }
 
+  /**
+   * check the global dict
+   */
+  @throws[IOException]
+  def checkGlobalDict(ref: ColumnDesc, desc: SegmentInfo, bucketPartitionSize: Int,
+                      ss: SparkSession): Unit = {
+    if (desc.kylinconf.isCheckGlobalDictV2) {
+      val globalDict = new NGlobalDictionary(desc.project, ref.tableAliasName, ref.columnName, desc.kylinconf.getHdfsWorkingDirectory)
+      val broadcastDict = ss.sparkContext.broadcast(globalDict)
+      import ss.implicits._
+      val existsDictDs = ss.createDataset(0 to bucketPartitionSize)
+        .flatMap {
+          bucketId =>
+            val gDict: NGlobalDictionary = broadcastDict.value
+            val bucketDict: NBucketDictionary = gDict.loadBucketDictionary(bucketId)
+            val tupleList = new util.ArrayList[(String, Long)](bucketDict.getAbsoluteDictMap.size)
+            bucketDict.getAbsoluteDictMap.object2LongEntrySet.asScala
+              .foreach(dictTuple => tupleList.add((dictTuple.getKey, dictTuple.getLongValue)))
+            tupleList.asScala.iterator
+        }
+      val valueCount = existsDictDs.dropDuplicates("_1").count()
+      val keyCount = existsDictDs.dropDuplicates("_2").count()
+      if (valueCount != keyCount) {
+        logError(s"Global dict build error on column ${ref.columnName}, " +
+          s"key distinct count is ${keyCount}, and value distinct count is ${valueCount}.")
+        throw new RuntimeException(s"Global dict build error on column ${ref.columnName}, " +
+          s"key distinct count is ${keyCount}, and value distinct count is ${valueCount}.")
+      }
+    }
+  }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 5c15f7d..b39486b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -19,14 +19,13 @@ package org.apache.kylin.engine.spark.builder
 
 import java.io.IOException
 import java.util
-
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.lock.DistributedLock
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.builder.CubeBuilderHelper._
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
-import org.apache.spark.dict.NGlobalDictionary
+import org.apache.spark.dict.{NGlobalDictBuilderAssist, NGlobalDictionary}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.types.StringType
@@ -54,8 +53,10 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
     logInfo(s"Start building global dictionaries V2 for seg $seg")
     val m = s"Build global dictionaries V2 for seg $seg succeeded"
     time(m, colRefSet.asScala.foreach(col => safeBuild(col)))
-    // set the original value to 'spark.sql.adaptive.enabled'
-    ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
+    if (aeOriginalValue) {
+      // set the original value to 'spark.sql.adaptive.enabled'
+      ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
+    }
   }
 
   @throws[IOException]
@@ -95,6 +96,9 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
     }
     globalDict.writeMetaDict(bucketPartitionSize, seg.kylinconf.getGlobalDictV2MaxVersions,
       seg.kylinconf.getGlobalDictV2VersionTTL)
+
+    // after writing global dict, check the uniqueness for global dict
+    NGlobalDictBuilderAssist.checkGlobalDict(ref, seg, bucketPartitionSize, ss)
   }
 
   private def getLockPath(pathName: String) = s"/${seg.project}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index fe84f82..3ecdfb5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -97,7 +97,7 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi
     afterJoin1.collect()
 
     val jobs = helper.getJobsByGroupId(groupId)
-    Assert.assertEquals(jobs.length, 13)
+    Assert.assertEquals(jobs.length, 15)
 
     DefaultScheduler.destroyInstance()
   }
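Note on the check the patch adds: after the global dictionary is written, checkGlobalDict loads every bucket, flattens the (value, id) pairs into a Dataset, and fails the build if the number of distinct values differs from the number of distinct ids; it can be switched off via kylin.dictionary.globalV2-check=false. The following is a minimal standalone sketch of that same criterion on a toy in-memory dictionary, not part of the commit; the object name GlobalDictCheckSketch and the sample data are invented for illustration.

// Minimal sketch (not part of the commit): the uniqueness criterion used by
// NGlobalDictBuilderAssist.checkGlobalDict, applied to a toy in-memory
// dictionary instead of Kylin's bucketed NGlobalDictionary.
import org.apache.spark.sql.SparkSession

object GlobalDictCheckSketch {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[*]").appName("dict-check-sketch").getOrCreate()
    import ss.implicits._

    // Hypothetical dictionary entries as (value, encoded id); ("c", 2L) reuses
    // an id on purpose, so the check below is expected to fail.
    val dictDs = ss.createDataset(Seq(("a", 1L), ("b", 2L), ("c", 2L)))

    // A correct dictionary maps each value to exactly one id and vice versa,
    // so the distinct counts of the two columns must be equal.
    val valueCount = dictDs.dropDuplicates("_1").count()
    val keyCount = dictDs.dropDuplicates("_2").count()
    if (valueCount != keyCount) {
      throw new RuntimeException(
        s"Global dict build error, key distinct count is $keyCount, and value distinct count is $valueCount.")
    }
    ss.stop()
  }
}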
