This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3dc5bfd19c347441efea29b630972f3e950ec20d
Author: ChenliangLu <31469905+yab...@users.noreply.github.com>
AuthorDate: Sun Apr 23 14:45:29 2023 +0800

    KYLIN-5640 Support building dynamic bloom filter that adapts to data
---
 pom.xml                                                    |  2 +-
 .../java/org/apache/kylin/common/KylinConfigBase.java      |  4 ++++
 .../java/org/apache/kylin/newten/BloomFilterTest.java      |  4 ++--
 .../kylin/engine/spark/filter/ParquetBloomFilter.java      | 16 +++++++++++++---
 4 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3f0f3922f2..46f61ffa70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,7 +315,7 @@
         <zkclient.version>0.8</zkclient.version>
         <grpc.version>1.0.2</grpc.version>
         <fastPFOR.version>0.0.13</fastPFOR.version>
-        <parquet.version>1.12.2-kylin-r3</parquet.version>
+        <parquet.version>1.12.2-kylin-r4</parquet.version>
         <quartz.version>2.1.1</quartz.version>
         <janino.version>3.0.9</janino.version>

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 753d4e00bd..20cfa46a6c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3942,6 +3942,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.bloom.build.column", "");
     }
 
+    public String getBloomBuildColumnIds() {
+        return getOptional("kylin.bloom.build.column-ids", "");
+    }
+
     public int getBloomBuildColumnNvd() {
         return Integer.parseInt(getOptional("kylin.bloom.build.column.nvd", "200000"));
     }

diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
index 3241711864..9dcebdebe8 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
@@ -121,7 +121,7 @@ public class BloomFilterTest extends NLocalWithSparkSessionTest implements Adapt
         Assert.assertNotNull(layout);
         populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession());
-        overwriteSystemProp("kylin.bloom.build.column", "0#10000#1#10000");
+        overwriteSystemProp("kylin.bloom.build.column-ids", "0#1");
         indexDataConstructor.buildIndex(dfID, SegmentRange.TimePartitionedSegmentRange.createInfinite(), Sets.newHashSet(
                 dataflow.getIndexPlan().getLayoutEntity(20000000001L)), true);
@@ -174,7 +174,7 @@ public class BloomFilterTest extends NLocalWithSparkSessionTest implements Adapt
             return true;
         });
-        overwriteSystemProp("kylin.bloom.build.column", "");
+        overwriteSystemProp("kylin.bloom.build.column-ids", "");
         overwriteSystemProp("kylin.bloom.build.column.max-size", "1");
         ParquetBloomFilter.resetParquetBloomFilter();
         indexDataConstructor.buildIndex(dfID, SegmentRange.TimePartitionedSegmentRange.createInfinite(),

diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
index d0be382682..b24f368a0a 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
@@ -106,6 +106,7 @@ public class ParquetBloomFilter {
             return;
         }
         String manualColumn = config.getBloomBuildColumn();
+        String bloomColumnIds = config.getBloomBuildColumnIds();
         if (StringUtils.isNotBlank(manualColumn)) {
             String[] blooms = manualColumn.split("#");
             for (int i = 0; i < blooms.length; i += 2) {
@@ -117,6 +118,17 @@ public class ParquetBloomFilter {
             }
             return;
         }
+        dataWriter.option("parquet.bloom.filter.dynamic.enabled", "true");
+        dataWriter.option("parquet.bloom.filter.candidate.size", "10");
+        if (StringUtils.isNotBlank(bloomColumnIds)) {
+            String[] blooms = bloomColumnIds.split("#");
+            for (int i = 0; i < blooms.length; i++) {
+                dataWriter.option("parquet.bloom.filter.enabled#" + blooms[i], "true");
+                LOGGER.info("build dynamic BloomFilter info: columnIds is {}", blooms[i]);
+                buildBloomColumns.add(blooms[i]);
+            }
+            return;
+        }
         Set<String> columns = Arrays.stream(data.columns()).collect(Collectors.toSet());
         Set<ColumnFilter> dataColumns = columnFilters.stream()
                 .filter(column -> columns.contains(column.columnId)).collect(Collectors.toSet());
@@ -126,10 +138,8 @@ public class ParquetBloomFilter {
                 break;
             }
             dataWriter.option("parquet.bloom.filter.enabled#" + columnFilter.columnId, "true");
-            dataWriter.option("parquet.bloom.filter.expected.ndv#" + columnFilter.columnId, config.getBloomBuildColumnNvd());
             buildBloomColumns.add(columnFilter.columnId);
-            LOGGER.info("building BloomFilter : columnId is {}; nvd is {}",
-                    columnFilter.columnId, config.getBloomBuildColumnNvd());
+            LOGGER.info("building BloomFilter : columnId is {}", columnFilter.columnId);
             count++;
         }
     }
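
For reference, the patch switches from a fixed "parquet.bloom.filter.expected.ndv#<col>" hint to the dynamic bloom filter supported by the patched parquet-mr build bumped above (1.12.2-kylin-r4), and the columns to cover can now be pinned with kylin.bloom.build.column-ids, a "#"-separated list of column ids (e.g. 0#1, as in the test change). The sketch below is illustrative only and not part of the commit: it shows how the same writer options could be exercised directly through Spark's DataFrameWriter; the input/output paths and the column name SELLER_ID are made-up, and the "parquet.bloom.filter.dynamic.*" keys are assumed to be understood only by the Kylin-patched Parquet dependency.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DynamicBloomFilterSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("dynamic-bloom-filter-sketch")
                    .master("local[*]")
                    .getOrCreate();

            // Hypothetical input path; any Parquet source works for the sketch.
            Dataset<Row> df = spark.read().parquet("/tmp/input");

            df.write()
                    // Let the writer size the bloom filter from the data it sees,
                    // instead of a fixed expected-NDV hint per column.
                    .option("parquet.bloom.filter.dynamic.enabled", "true")
                    // Number of candidate filters kept while adapting (the patch uses 10).
                    .option("parquet.bloom.filter.candidate.size", "10")
                    // Enable the filter for one column; Kylin keys this by column id,
                    // a plain Spark job keys it by the Parquet column name.
                    .option("parquet.bloom.filter.enabled#SELLER_ID", "true")
                    .parquet("/tmp/output");   // hypothetical output path

            spark.stop();
        }
    }

Inside Kylin itself the same effect is reached without touching the writer: setting kylin.bloom.build.column-ids drives ParquetBloomFilter to add these options per listed column id, and when the property is blank the existing ColumnFilter ranking picks the columns, now without the expected.ndv option.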