This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 588e08b  KYLIN-4877 Use all dimension columns as sort columns when saving cuboid data
588e08b is described below

commit 588e08b465115a39810819102074c80161e6c790
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Mon Jan 18 17:27:29 2021 +0800

    KYLIN-4877 Use all dimension columns as sort columns when saving cuboid data
---
 .../main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java    | 4 ++--
 .../main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java    | 4 ++--
 .../main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)
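
All three changes are the same substitution: a sort on only the first dimension/row-key column becomes a sort on the full column set before cuboid data is written out. As a minimal standalone sketch of the difference in the Spark Java API (the SparkSession setup, data, and column names below are illustrative, not taken from the Kylin code base):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.col;

    public class SortWithinPartitionsSketch {
        public static void main(String[] args) {
            SparkSession ss = SparkSession.builder().master("local[2]").getOrCreate();

            // Hypothetical cuboid data; "1" and "2" stand in for Kylin's
            // integer-named dimension columns.
            Dataset<Row> cuboid = ss.range(0, 100).selectExpr("id % 7 as `1`", "id % 3 as `2`");

            // Old behaviour: each partition is ordered by the first dimension only;
            // rows with equal values in column "1" land in arbitrary order.
            Dataset<Row> byFirst = cuboid.sortWithinPartitions(col("1"));

            // New behaviour (KYLIN-4877): ordered by every dimension column, so rows
            // sharing a leading dimension value are also clustered by trailing ones.
            Dataset<Row> byAll = cuboid.sortWithinPartitions(col("1"), col("2"));

            byAll.write().mode("overwrite").parquet("/tmp/cuboid-sketch");
            ss.stop();
        }
    }

A plausible motivation: fully sorted rows give Parquet much tighter per-row-group min/max statistics on the trailing dimension columns, so predicate pushdown on those columns can skip more data at query time.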

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 51f9f2c..b463dad 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -421,7 +421,7 @@ public class CubeBuildJob extends SparkApplication {
             ss.sparkContext().setJobDescription("build " + layoutEntity.getId() + " from parent " + parentName);
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
-                    .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(orderedDims));
+                    .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
             saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
@@ -432,7 +432,7 @@ public class CubeBuildJob extends SparkApplication {
 
             Dataset<Row> afterSort = afterAgg
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
-                    .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(rowKeys));
+                    .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
 
             saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         }
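
The getFirstColumn/getColumns helpers are not part of this diff. The following is a sketch of how such index-to-Column conversion could look; the bodies, and the assumption that the Set preserves dimension order, are mine rather than copied from NSparkCubingUtil:

    import java.util.Set;
    import org.apache.spark.sql.Column;
    import static org.apache.spark.sql.functions.col;

    final class ColumnHelpersSketch {
        // Assumed: cuboid columns are named by their integer index, and the Set
        // is order-preserving (e.g. a LinkedHashSet) so iteration follows the
        // row-key/dimension order.
        static Column[] getColumns(Set<Integer> indices) {
            return indices.stream()
                    .map(i -> col(String.valueOf(i)))
                    .toArray(Column[]::new);
        }

        static Column getFirstColumn(Set<Integer> indices) {
            return col(String.valueOf(indices.iterator().next()));
        }
    }

Because Dataset.sortWithinPartitions(Column...) takes varargs, the Column[] from getColumns(...) drops in exactly where getFirstColumn(...) used to supply a single Column, which is why each hunk is a one-call change.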
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index a716d6f..12e939d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -99,13 +99,13 @@ public class CubeMergeJob extends SparkApplication {
             Dataset<Row> afterSort;
             if (layout.isTableIndex()) {
                 afterSort =
-                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet()));
+                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()));
             } else {
                 Set<Integer> dimColumns = layout.getOrderedDimensions().keySet();
                 Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, dimColumns,
                         layout.getOrderedMeasures(), spanningTree, false);
                 afterSort = afterAgg.sortWithinPartitions(
-                        NSparkCubingUtil.getFirstColumn(dimColumns));
+                        NSparkCubingUtil.getColumns(dimColumns));
             }
             buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                 @Override
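
Worth noting for the merge path: sortWithinPartitions is a narrow transformation, so widening the sort key from one column to the full dimension set reorders rows inside each existing partition without adding a shuffle to the merge plan. A quick illustrative check (column names hypothetical; exact plan text varies by Spark version):

    // afterMerge stands in for the merged dataset in CubeMergeJob.
    afterMerge.sortWithinPartitions(col("1"), col("2")).explain();
    // -> Sort [1 ASC NULLS FIRST, 2 ASC NULLS FIRST], false, 0
    //    ("false" marks a non-global sort; no Exchange node appears)

    afterMerge.orderBy(col("1"), col("2")).explain();
    // -> Sort [...], true, 0 above an Exchange rangepartitioning(...)
    //    (a global sort would shuffle, which the merge job avoids)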
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index e40ed64..01cc9f1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -157,12 +157,12 @@ public class Repartitioner {
                 //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
                 data = storage.getFrom(tempPath, ss).repartition(repartitionNum,
                         NSparkCubingUtil.getColumns(getShardByColumns()))
-                        .sortWithinPartitions(sortCols[0]);
+                        .sortWithinPartitions(sortCols);
             } else {
                 // repartition for single file size is too small
                 logger.info("Cuboid[{}] repartition to {}", cuboid, repartitionNum);
                 data = storage.getFrom(tempPath, ss).repartition(repartitionNum)
-                        .sortWithinPartitions(sortCols[0]);
+                        .sortWithinPartitions(sortCols);
             }
 
             storage.saveTo(path, data, ss);
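
The Repartitioner change compiles because sortCols is a Column[] and Java lets an array stand in for a Column... varargs parameter, so passing sortCols sorts by every entry instead of only sortCols[0]. Schematically (values hypothetical):

    Column[] sortCols = { col("1"), col("2"), col("3") };

    // Before: only the first sort column was used.
    Dataset<Row> before = data.sortWithinPartitions(sortCols[0]);

    // After: the whole array expands into the varargs call, equivalent to
    // sortWithinPartitions(col("1"), col("2"), col("3")).
    Dataset<Row> after = data.sortWithinPartitions(sortCols);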
