This is an automated email from the ASF dual-hosted git repository.
pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 1b3d5d56ce KYLIN-6025 sort data to avoid too many data folders for internal tables
1b3d5d56ce is described below
commit 1b3d5d56ce8612e228c0807536033ffaad60fa16
Author: Zhiting Guo <[email protected]>
AuthorDate: Thu Nov 14 18:48:32 2024 +0800
KYLIN-6025 sort data to avoid too many data folders for internal tables
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +++
.../kylin/metadata/table/InternalTableDesc.java | 10 +++++++
.../metadata/table/InternalTableDescTest.java | 32 ++++++++++++++++++++++
.../engine/spark/builder/InternalTableLoader.scala | 7 ++++-
4 files changed, 52 insertions(+), 1 deletion(-)
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 86edbb79fd..ef388db120 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -520,6 +520,10 @@ public abstract class KylinConfigBase implements
Serializable {
return preloadCacheEnabled && isInternalTableEnabled() &&
queryUseGlutenEnabled();
}
+ /**
+ * Project-level default for sorting internal-table source data by its
+ * partition columns before it is saved. Reads
+ * {@code kylin.internal-table.sort-by-partition.enabled} and falls back to
+ * {@code TRUE} when the key is unset. A per-table {@code sortByPartition}
+ * property, when present, takes precedence over this setting (see
+ * {@code InternalTableDesc#isSortByPartitionEnabled}).
+ *
+ * @return {@code true} if sort-by-partition is enabled at the config level
+ */
+ public boolean isInternalTableSortByPartitionEnabled() {
+ return
Boolean.parseBoolean(getOptional("kylin.internal-table.sort-by-partition.enabled",
TRUE));
+ }
+
public int getQueryConcurrentRunningThresholdForProject() {
// by default there's no limitation
return
Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold",
"0"));
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
index 6a5e3a5d9f..17899196bf 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/table/InternalTableDesc.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -51,6 +52,7 @@ public class InternalTableDesc extends ATable implements
Serializable {
private static final String PRELOADED_CACHE = "preloadedCache";
private static final String PRIMARY_KEY = "primaryKey";
private static final String SORT_BY_KEY = "sortByKey";
+ private static final String SORT_BY_PARTITION_BEFORE_SAVE =
"sortByPartition";
public static final int INIT_SIZE = 0;
@Getter
@@ -174,6 +176,14 @@ public class InternalTableDesc extends ATable implements
Serializable {
return
Boolean.parseBoolean(tblProperties.getOrDefault(PRELOADED_CACHE,
KylinConfig.FALSE));
}
+ /**
+ * Whether source data should be sorted by the table's partition columns
+ * before it is saved.
+ *
+ * <p>An explicit {@code sortByPartition} table property wins when present.
+ * {@link Boolean#parseBoolean} treats any value other than "true"
+ * (case-insensitive), including an empty or null value, as {@code false}.
+ * When the property is absent, the project-level setting
+ * {@code kylin.internal-table.sort-by-partition.enabled} for this table's
+ * project is used.
+ *
+ * @return {@code true} if data should be sorted by partition before saving
+ */
+ public boolean isSortByPartitionEnabled() {
+ if (tblProperties.containsKey(SORT_BY_PARTITION_BEFORE_SAVE)) {
+ // Table-level override takes precedence over project/global config.
+ return
Boolean.parseBoolean(tblProperties.get(SORT_BY_PARTITION_BEFORE_SAVE));
+ } else {
+ // No table-level override: defer to the project's config.
+ return
NProjectManager.getProjectConfig(project).isInternalTableSortByPartitionEnabled();
+ }
+ }
+
public String generateInternalTableLocation() {
String workingDir =
KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
return workingDir + project + "/Internal/" + getDatabase() + "/" +
getName();
diff --git
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
index 3fc6b96aff..b4237f94a2 100644
---
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
+++
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/table/InternalTableDescTest.java
@@ -67,4 +67,36 @@ class InternalTableDescTest {
Assertions.assertEquals(dateFormat, table.getDatePartitionFormat());
Assertions.assertEquals(3, table.getTblProperties().size());
}
+
+ /**
+ * Checks the precedence of the sort-by-partition settings. An explicit
+ * {@code sortByPartition} table property overrides the config value.
+ * {@code kylin.internal-table.sort-by-partition.enabled} applies only when
+ * that property is absent.
+ */
+ @Test
+ void testSortByPartitionConf() {
+ NTableMetadataManager tableMetadataManager =
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(),
+ "default");
+ TableDesc originTable =
tableMetadataManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+ InternalTableDesc table = new InternalTableDesc(originTable);
+
+ // The same map instance is reused and re-set for every scenario below.
+ HashMap<String, String> tblProperties = new HashMap<>();
+
+ // No table property: expects the config fallback, enabled by default.
+ table.setTblProperties(tblProperties);
+ table.optimizeTblProperties();
+ Assertions.assertTrue(table.isSortByPartitionEnabled());
+
+ // Table property "false" overrides the (enabled) config value.
+ tblProperties.put("sortByPartition", "false");
+ table.setTblProperties(tblProperties);
+ table.optimizeTblProperties();
+ Assertions.assertFalse(table.isSortByPartitionEnabled());
+
+ // disable sort-by-partition in kylinConfig
+ // NOTE(review): this mutates the shared env KylinConfig and is never
+ // restored. Unless the test harness resets config between tests (not
+ // visible here), later tests may observe sort-by-partition disabled.
+ // Consider restoring the property in a finally block.
+
KylinConfig.getInstanceFromEnv().setProperty("kylin.internal-table.sort-by-partition.enabled",
"false");
+
+ // No table property: expects the now-disabled config value.
+ tblProperties.clear();
+ table.setTblProperties(tblProperties);
+ table.optimizeTblProperties();
+ Assertions.assertFalse(table.isSortByPartitionEnabled());
+
+ // Table property "true" overrides the disabled config value.
+ tblProperties.put("sortByPartition", "true");
+ table.setTblProperties(tblProperties);
+ table.optimizeTblProperties();
+ Assertions.assertTrue(table.isSortByPartitionEnabled());
+ }
}
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 497ee08b56..73c2e32493 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -66,13 +66,18 @@ class InternalTableLoader extends Logging {
storagePolicy: String,
incremental: Boolean): Unit = {
val location = table.generateInternalTableLocation
- val sourceData = getSourceData(ss, table, startDate, endDate, incremental)
+ var sourceData = getSourceData(ss, table, startDate, endDate, incremental)
val tablePartition = table.getTablePartition
val bucketColumn = table.getBucketColumn
val bucketNum = table.getBucketNumber
val primaryKey = table.getTblProperties.get(NBatchConstants.P_PRIMARY_KEY)
val orderByKey = table.getTblProperties.get(NBatchConstants.P_ORDER_BY_KEY)
val outPutMode = OVERWRITE
+
+ if (tablePartition != null && table.isSortByPartitionEnabled) {
+ val partitionColumn = tablePartition.getPartitionColumns
+ sourceData = sourceData.sort(partitionColumn(0),
partitionColumn.slice(1, partitionColumn.length): _*)
+ }
var writer = sourceData.write.option(STORAGE_POLICY, storagePolicy)
if (tablePartition != null) {