This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new 23365b6  KYLIN-5011 HotFix for scatter skew data in dict-encoding step
23365b6 is described below

commit 23365b6cae8939e6bbefb328d1c80d4236afda34
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Thu Aug 12 16:36:53 2021 +0800

    KYLIN-5011 HotFix for scatter skew data in dict-encoding step
---
 .../org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index c9cea1f..cf5d07c 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -53,7 +53,7 @@ object CubeTableEncoder extends Logging {
     val bucketThreshold = seg.kylinconf.getGlobalDictV2ThresholdBucketSize
     val minBucketSize: Long = sourceCnt / bucketThreshold
 
-    var repartitionSizeAfterEncode = 0;
+    var repartitionSizeAfterEncode = 0
     cols.asScala.foreach(
       ref => {
         val globalDict = new NGlobalDictionary(seg.project, 
ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
@@ -72,11 +72,12 @@ object CubeTableEncoder extends Logging {
         var encodeCol = dict_encode(col(encodeColRef).cast(StringType), 
lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
         val columns = partitionedDs.schema.map(ty => col(ty.name))
 
+        var scatterSkewedData = false
         if (seg.kylinconf.detectDataSkewInDictEncodingEnabled()) {
           //find skewed data in dict-encoding step
           val castEncodeColRef = col(encodeColRef).cast(StringType)
           val sampleData = 
ds.select(castEncodeColRef).sample(seg.kylinconf.sampleRateInEncodingSkewDetection()).cache()
-          val totalCount = sampleData.count();
+          val totalCount = sampleData.count()
           val skewDictStorage = new 
Path(seg.kylinconf.getJobTmpDir(seg.project) +
             "/" + jobId + "/skewed_data/" + ref.identity)
           val skewedDict = new Object2LongOpenHashMap[String]()
@@ -90,6 +91,7 @@ object CubeTableEncoder extends Logging {
 
           //save skewed data dict
           if (skewedDict.size() > 0) {
+            scatterSkewedData = true
             val kryo = new Kryo()
             val fs = skewDictStorage.getFileSystem(new Configuration())
             if (fs.exists(skewDictStorage)) {
@@ -112,7 +114,8 @@ object CubeTableEncoder extends Logging {
               .repartition(enlargedBucketSize, col("scatter_skew_data_" + 
ref.columnName))
               .select(columns ++ Seq(encodeCol): _*)
           }
-        } else {
+        }
+        if (!scatterSkewedData) {
           partitionedDs = partitionedDs
             .repartition(enlargedBucketSize, 
col(encodeColRef).cast(StringType))
             .select(columns ++ Seq(encodeCol): _*)

Reply via email to