This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3dc5bfd19c347441efea29b630972f3e950ec20d
Author: ChenliangLu <31469905+yab...@users.noreply.github.com>
AuthorDate: Sun Apr 23 14:45:29 2023 +0800

    KYLIN-5640 Support building dynamic bloom filter that adapts to data
---
 pom.xml                                                  |  2 +-
 .../java/org/apache/kylin/common/KylinConfigBase.java    |  4 ++++
 .../java/org/apache/kylin/newten/BloomFilterTest.java    |  4 ++--
 .../kylin/engine/spark/filter/ParquetBloomFilter.java    | 16 +++++++++++++---
 4 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3f0f3922f2..46f61ffa70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,7 +315,7 @@
         <zkclient.version>0.8</zkclient.version>
         <grpc.version>1.0.2</grpc.version>
         <fastPFOR.version>0.0.13</fastPFOR.version>
-        <parquet.version>1.12.2-kylin-r3</parquet.version>
+        <parquet.version>1.12.2-kylin-r4</parquet.version>
         <quartz.version>2.1.1</quartz.version>
         <janino.version>3.0.9</janino.version>
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 753d4e00bd..20cfa46a6c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3942,6 +3942,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.bloom.build.column", "");
     }
 
+    public String getBloomBuildColumnIds() {
+        return getOptional("kylin.bloom.build.column-ids", "");
+    }
+
     public int getBloomBuildColumnNvd() {
         return Integer.parseInt(getOptional("kylin.bloom.build.column.nvd", "200000"));
     }
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
index 3241711864..9dcebdebe8 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
@@ -121,7 +121,7 @@ public class BloomFilterTest extends NLocalWithSparkSessionTest implements Adapt
         Assert.assertNotNull(layout);
         populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());
 
-        overwriteSystemProp("kylin.bloom.build.column", "0#10000#1#10000");
+        overwriteSystemProp("kylin.bloom.build.column-ids", "0#1");
         indexDataConstructor.buildIndex(dfID, SegmentRange.TimePartitionedSegmentRange.createInfinite(),
                 Sets.newHashSet(
                         dataflow.getIndexPlan().getLayoutEntity(20000000001L)), true);
@@ -174,7 +174,7 @@ public class BloomFilterTest extends NLocalWithSparkSessionTest implements Adapt
             return true;
         });
 
-        overwriteSystemProp("kylin.bloom.build.column", "");
+        overwriteSystemProp("kylin.bloom.build.column-ids", "");
         overwriteSystemProp("kylin.bloom.build.column.max-size", "1");
         ParquetBloomFilter.resetParquetBloomFilter();
         indexDataConstructor.buildIndex(dfID, SegmentRange.TimePartitionedSegmentRange.createInfinite(),
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
index d0be382682..b24f368a0a 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
@@ -106,6 +106,7 @@ public class ParquetBloomFilter {
             return;
         }
         String manualColumn = config.getBloomBuildColumn();
+        String bloomColumnIds= config.getBloomBuildColumnIds();
         if (StringUtils.isNotBlank(manualColumn)) {
             String[] blooms = manualColumn.split("#");
             for (int i = 0; i < blooms.length; i += 2) {
@@ -117,6 +118,17 @@ public class ParquetBloomFilter {
             }
             return;
         }
+        dataWriter.option("parquet.bloom.filter.dynamic.enabled", "true");
+        dataWriter.option("parquet.bloom.filter.candidate.size", "10");
+        if (StringUtils.isNotBlank(bloomColumnIds)) {
+            String[] blooms = bloomColumnIds.split("#");
+            for (int i = 0; i < blooms.length; i++) {
+                dataWriter.option("parquet.bloom.filter.enabled#" + blooms[i], "true");
+                LOGGER.info("build dynamic BloomFilter info: columnIds is {}", blooms[i]);
+                buildBloomColumns.add(blooms[i]);
+            }
+            return;
+        }
         Set<String> columns = Arrays.stream(data.columns()).collect(Collectors.toSet());
         Set<ColumnFilter> dataColumns = columnFilters.stream()
                 .filter(column -> columns.contains(column.columnId)).collect(Collectors.toSet());
@@ -126,10 +138,8 @@ public class ParquetBloomFilter {
                 break;
             }
             dataWriter.option("parquet.bloom.filter.enabled#" + columnFilter.columnId, "true");
-            dataWriter.option("parquet.bloom.filter.expected.ndv#" + columnFilter.columnId, config.getBloomBuildColumnNvd());
             buildBloomColumns.add(columnFilter.columnId);
-            LOGGER.info("building BloomFilter : columnId is {}; nvd is {}",
-                    columnFilter.columnId, config.getBloomBuildColumnNvd());
+            LOGGER.info("building BloomFilter : columnId is {}", 
columnFilter.columnId);
             count++;
         }
     }

Reply via email to