This is an automated email from the ASF dual-hosted git repository.

pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 1b3d5d56ce KYLIN-6025 sort data to avoid too much data folder for internal table
1b3d5d56ce is described below

commit 1b3d5d56ce8612e228c0807536033ffaad60fa16
Author: Zhiting Guo <[email protected]>
AuthorDate: Thu Nov 14 18:48:32 2024 +0800

    KYLIN-6025 sort data to avoid too much data folder for internal table
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../kylin/metadata/table/InternalTableDesc.java    | 10 +++++++
 .../metadata/table/InternalTableDescTest.java      | 32 ++++++++++++++++++++++
 .../engine/spark/builder/InternalTableLoader.scala |  7 ++++-
 4 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 86edbb79fd..ef388db120 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -520,6 +520,10 @@ public abstract class KylinConfigBase implements Serializable {
         return preloadCacheEnabled && isInternalTableEnabled() && queryUseGlutenEnabled();
     }
 
+    public boolean isInternalTableSortByPartitionEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.internal-table.sort-by-partition.enabled", TRUE));
+    }
+
     public int getQueryConcurrentRunningThresholdForProject() {
         // by default there's no limitation
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
index 6a5e3a5d9f..17899196bf 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -51,6 +52,7 @@ public class InternalTableDesc extends ATable implements Serializable {
     private static final String PRELOADED_CACHE = "preloadedCache";
     private static final String PRIMARY_KEY = "primaryKey";
     private static final String SORT_BY_KEY = "sortByKey";
+    private static final String SORT_BY_PARTITION_BEFORE_SAVE = "sortByPartition";
     public static final int INIT_SIZE = 0;
 
     @Getter
@@ -174,6 +176,14 @@ public class InternalTableDesc extends ATable implements Serializable {
         return Boolean.parseBoolean(tblProperties.getOrDefault(PRELOADED_CACHE, KylinConfig.FALSE));
     }
 
+    public boolean isSortByPartitionEnabled() {
+        if (tblProperties.containsKey(SORT_BY_PARTITION_BEFORE_SAVE)) {
+            return Boolean.parseBoolean(tblProperties.get(SORT_BY_PARTITION_BEFORE_SAVE));
+        } else {
+            return NProjectManager.getProjectConfig(project).isInternalTableSortByPartitionEnabled();
+        }
+    }
+
     public String generateInternalTableLocation() {
         String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
         return workingDir + project + "/Internal/" + getDatabase() + "/" + getName();
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
index 3fc6b96aff..b4237f94a2 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
@@ -67,4 +67,36 @@ class InternalTableDescTest {
         Assertions.assertEquals(dateFormat, table.getDatePartitionFormat());
         Assertions.assertEquals(3, table.getTblProperties().size());
     }
+
+    @Test
+    void testSortByPartitionConf() {
+        NTableMetadataManager tableMetadataManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(),
+                "default");
+        TableDesc originTable = tableMetadataManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+        InternalTableDesc table = new InternalTableDesc(originTable);
+
+        HashMap<String, String> tblProperties = new HashMap<>();
+
+        table.setTblProperties(tblProperties);
+        table.optimizeTblProperties();
+        Assertions.assertTrue(table.isSortByPartitionEnabled());
+
+        tblProperties.put("sortByPartition", "false");
+        table.setTblProperties(tblProperties);
+        table.optimizeTblProperties();
+        Assertions.assertFalse(table.isSortByPartitionEnabled());
+
+        // disable sort-by-partition in kylinConfig
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.internal-table.sort-by-partition.enabled", "false");
+
+        tblProperties.clear();
+        table.setTblProperties(tblProperties);
+        table.optimizeTblProperties();
+        Assertions.assertFalse(table.isSortByPartitionEnabled());
+
+        tblProperties.put("sortByPartition", "true");
+        table.setTblProperties(tblProperties);
+        table.optimizeTblProperties();
+        Assertions.assertTrue(table.isSortByPartitionEnabled());
+    }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 497ee08b56..73c2e32493 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -66,13 +66,18 @@ class InternalTableLoader extends Logging {
                         storagePolicy: String,
                         incremental: Boolean): Unit = {
     val location = table.generateInternalTableLocation
-    val sourceData = getSourceData(ss, table, startDate, endDate, incremental)
+    var sourceData = getSourceData(ss, table, startDate, endDate, incremental)
     val tablePartition = table.getTablePartition
     val bucketColumn = table.getBucketColumn
     val bucketNum = table.getBucketNumber
     val primaryKey = table.getTblProperties.get(NBatchConstants.P_PRIMARY_KEY)
     val orderByKey = table.getTblProperties.get(NBatchConstants.P_ORDER_BY_KEY)
     val outPutMode = OVERWRITE
+
+    if (tablePartition != null && table.isSortByPartitionEnabled) {
+      val partitionColumn = tablePartition.getPartitionColumns
+      sourceData = sourceData.sort(partitionColumn(0), partitionColumn.slice(1, partitionColumn.length): _*)
+    }
     var writer = sourceData.write.option(STORAGE_POLICY, storagePolicy)
 
     if (tablePartition != null) {

Reply via email to