This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
     new 23365b6  KYLIN-5011 HotFix for scatter skew data in dict-encoding step
23365b6 is described below

commit 23365b6cae8939e6bbefb328d1c80d4236afda34
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Thu Aug 12 16:36:53 2021 +0800

    KYLIN-5011 HotFix for scatter skew data in dict-encoding step
---
 .../org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index c9cea1f..cf5d07c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -53,7 +53,7 @@ object CubeTableEncoder extends Logging {
     val bucketThreshold = seg.kylinconf.getGlobalDictV2ThresholdBucketSize
     val minBucketSize: Long = sourceCnt / bucketThreshold
 
-    var repartitionSizeAfterEncode = 0;
+    var repartitionSizeAfterEncode = 0
     cols.asScala.foreach(
       ref => {
         val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
@@ -72,11 +72,12 @@
         var encodeCol = dict_encode(col(encodeColRef).cast(StringType), lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
         val columns = partitionedDs.schema.map(ty => col(ty.name))
 
+        var scatterSkewedData = false
         if (seg.kylinconf.detectDataSkewInDictEncodingEnabled()) {
           //find skewed data in dict-encoding step
           val castEncodeColRef = col(encodeColRef).cast(StringType)
           val sampleData = ds.select(castEncodeColRef).sample(seg.kylinconf.sampleRateInEncodingSkewDetection()).cache()
-          val totalCount = sampleData.count();
+          val totalCount = sampleData.count()
           val skewDictStorage = new Path(seg.kylinconf.getJobTmpDir(seg.project) + "/" + jobId +
             "/skewed_data/" + ref.identity)
           val skewedDict = new Object2LongOpenHashMap[String]()
@@ -90,6 +91,7 @@
 
           //save skewed data dict
           if (skewedDict.size() > 0) {
+            scatterSkewedData = true
             val kryo = new Kryo()
             val fs = skewDictStorage.getFileSystem(new Configuration())
             if (fs.exists(skewDictStorage)) {
@@ -112,7 +114,8 @@
               .repartition(enlargedBucketSize, col("scatter_skew_data_" + ref.columnName))
               .select(columns ++ Seq(encodeCol): _*)
           }
-        } else {
+        }
+        if (!scatterSkewedData) {
           partitionedDs = partitionedDs
             .repartition(enlargedBucketSize, col(encodeColRef).cast(StringType))
             .select(columns ++ Seq(encodeCol): _*)
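
The behavioral change in this patch is small but easy to miss: the default repartition used to sit in the `else` of the skew-detection `if`, so a build with skew detection enabled but no skewed values found never repartitioned by the encoded column at all. The sketch below is a minimal, self-contained illustration of that control flow, not the Kylin code itself; the object name ScatterSkewControlFlow, the method chooseRepartition, and its parameters are hypothetical stand-ins for the real encoder logic.

// Illustrative sketch only: models the flag-based control flow introduced by this commit.
// All names here are hypothetical; only the scatterSkewedData flag mirrors the patch.
object ScatterSkewControlFlow {

  def chooseRepartition(detectSkewEnabled: Boolean, skewedValues: Set[String]): String = {
    var scatterSkewedData = false
    var action = "none"
    if (detectSkewEnabled && skewedValues.nonEmpty) {
      // Skewed keys were found: scatter them via a dedicated repartition column,
      // mirroring the "scatter_skew_data_<column>" branch in the real encoder.
      scatterSkewedData = true
      action = "skew-aware repartition"
    }
    // The fix: run the default repartition whenever no skewed data was scattered,
    // regardless of whether skew detection was enabled.
    if (!scatterSkewedData) {
      action = "default repartition"
    }
    action
  }

  def main(args: Array[String]): Unit = {
    // Detection on, no skew found: the case the old `else` branch silently skipped.
    println(chooseRepartition(detectSkewEnabled = true, skewedValues = Set.empty))
    // Detection on, skew found.
    println(chooseRepartition(detectSkewEnabled = true, skewedValues = Set("hot_key")))
    // Detection off.
    println(chooseRepartition(detectSkewEnabled = false, skewedValues = Set.empty))
  }
}

With the pre-patch `else`, the first case above would take neither branch; after the patch it falls through to the default repartition, which is the intent of the hotfix.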