This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 588e08b  KYLIN-4877 Use all dimension columns as sort columns when saving cuboid data
588e08b is described below

commit 588e08b465115a39810819102074c80161e6c790
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Mon Jan 18 17:27:29 2021 +0800

    KYLIN-4877 Use all dimension columns as sort columns when saving cuboid data
---
 .../main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java    | 4 ++--
 .../main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java    | 4 ++--
 .../main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 51f9f2c..b463dad 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -421,7 +421,7 @@ public class CubeBuildJob extends SparkApplication {
             ss.sparkContext().setJobDescription("build " + layoutEntity.getId() + " from parent " + parentName);
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
-                    .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(orderedDims));
+                    .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
             saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
@@ -432,7 +432,7 @@ public class CubeBuildJob extends SparkApplication {
             Dataset<Row> afterSort = afterAgg
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
-                    .sortWithinPartitions(NSparkCubingUtil.getFirstColumn(rowKeys));
+                    .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
             saveAndUpdateLayout(afterSort, seg, layoutEntity, parentId);
         }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index a716d6f..12e939d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -99,13 +99,13 @@ public class CubeMergeJob extends SparkApplication {
             Dataset<Row> afterSort;
             if (layout.isTableIndex()) {
                 afterSort =
-                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet()));
+                        afterMerge.sortWithinPartitions(NSparkCubingUtil.getColumns(layout.getOrderedDimensions().keySet()));
             } else {
                 Set<Integer> dimColumns = layout.getOrderedDimensions().keySet();
                 Dataset<Row> afterAgg = CuboidAggregator.agg(ss, afterMerge, dimColumns, layout.getOrderedMeasures(),
                         spanningTree, false);
                 afterSort = afterAgg.sortWithinPartitions(
-                        NSparkCubingUtil.getFirstColumn(dimColumns));
+                        NSparkCubingUtil.getColumns(dimColumns));
             }
             buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                 @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index e40ed64..01cc9f1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -157,12 +157,12 @@ public class Repartitioner {
             //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
             data = storage.getFrom(tempPath, ss).repartition(repartitionNum,
                     NSparkCubingUtil.getColumns(getShardByColumns()))
-                    .sortWithinPartitions(sortCols[0]);
+                    .sortWithinPartitions(sortCols);
        } else {
            // repartition for single file size is too small
            logger.info("Cuboid[{}] repartition to {}", cuboid, repartitionNum);
            data = storage.getFrom(tempPath, ss).repartition(repartitionNum)
-                    .sortWithinPartitions(sortCols[0]);
+                    .sortWithinPartitions(sortCols);
        }
        storage.saveTo(path, data, ss);
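For context, a minimal standalone sketch (not Kylin code) of the Spark API behavior this change relies on: Dataset.sortWithinPartitions accepts an array of Columns, so passing the whole dimension column array orders rows inside each output partition by every dimension, whereas the old code ordered them by the first dimension only. The column names, file paths, and session setup below are hypothetical, and the rationale noted in the comments is an assumption; the commit itself only states that all dimension columns should be used as sort columns.

    // SortWithinPartitionsSketch.java -- illustrative only, not part of the Kylin code base.
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import static org.apache.spark.sql.functions.col;

    public class SortWithinPartitionsSketch {
        public static void main(String[] args) {
            SparkSession ss = SparkSession.builder()
                    .appName("sort-within-partitions-sketch")
                    .master("local[*]")
                    .getOrCreate();

            // Hypothetical cuboid data and dimension columns.
            Dataset<Row> cuboid = ss.read().parquet("/tmp/cuboid");
            Column[] sortCols = new Column[] { col("dim_0"), col("dim_1"), col("dim_2") };

            // Old behavior: each partition is sorted by the first dimension only.
            Dataset<Row> sortedByFirst = cuboid.sortWithinPartitions(sortCols[0]);

            // New behavior: each partition is sorted by all dimension columns
            // (varargs accepts the full Column[]), which presumably helps Parquet
            // min/max statistics and filtering on non-leading dimensions as well.
            Dataset<Row> sortedByAll = cuboid.sortWithinPartitions(sortCols);

            sortedByAll.write().mode("overwrite").parquet("/tmp/cuboid_sorted");
            ss.stop();
        }
    }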