This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch offheap_bloom_filter in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 979828bfbaaeb1a9d4707b53541782fee0bb4eec Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com> AuthorDate: Wed Oct 7 21:24:55 2020 -0700 Implement off-heap bloom filter reader --- .../org/apache/pinot/core/bloom/BloomFilter.java | 74 ------------- .../apache/pinot/core/bloom/BloomFilterUtil.java | 57 ----------- .../pinot/core/bloom/GuavaOnHeapBloomFilter.java | 76 -------------- .../core/bloom/SegmentBloomFilterFactory.java | 51 --------- .../indexsegment/mutable/MutableSegmentImpl.java | 7 ++ .../query/pruner/ColumnValueSegmentPruner.java | 2 +- .../creator/BloomFilterCreator.java} | 42 +++----- .../creator/impl/bloom/BloomFilterCreator.java | 68 ------------ .../impl/bloom/OnHeapGuavaBloomFilterCreator.java | 69 +++++++++++++ .../index/column/PhysicalColumnIndexContainer.java | 16 +-- .../segment/index/loader/IndexLoadingConfig.java | 19 ++-- .../loader/bloomfilter/BloomFilterHandler.java | 58 +++++------ .../segment/index/readers/BloomFilterReader.java | 43 +++----- .../readers/bloom/BloomFilterReaderFactory.java} | 39 ++----- .../bloom/GuavaBloomFilterReaderUtils.java} | 41 +++----- .../bloom/OffHeapGuavaBloomFilterReader.java | 81 +++++++++++++++ .../apache/pinot/core/util/TableConfigUtils.java | 18 ++-- .../index/creator/BloomFilterCreatorTest.java | 114 +++++---------------- .../pinot/spi/config/table/BloomFilterConfig.java | 40 +++----- .../pinot/spi/config/table/IndexingConfig.java | 10 ++ 20 files changed, 320 insertions(+), 605 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java deleted file mode 100644 index b4aab57..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilter.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.bloom; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - - -/** - * Interface for bloom filter - */ -public interface BloomFilter { - - /** - * Get the version of bloom filter implementation - * @return a version - */ - int getVersion(); - - /** - * Get the type of the bloom filter - * @return a bloom filter type - */ - BloomFilterType getBloomFilterType(); - - /** - * Add element to bloom filter - * - * @param input input object - */ - void add(Object input); - - /** - * Check if the input element may exist or not - * - * @param input input object for testing - * @return true if the input may exist, false if it does not exist - */ - boolean mightContain(Object input); - - /** - * Serialize bloom filter to output stream. - * - * @param out output stream - * @throws IOException - */ - void writeTo(OutputStream out) - throws IOException; - - /** - * Deserialize the bloom filter from input stream. 
- * @param in input stream - * @throws IOException - */ - void readFrom(InputStream in) - throws IOException; -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java deleted file mode 100644 index fc83b01..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.bloom; - -/** - * Util class for bloom filter - */ -public class BloomFilterUtil { - - private BloomFilterUtil() { - } - - public static long computeNumBits(long cardinality, double maxFalsePosProbability) { - return (long) (Math - .ceil((cardinality * Math.log(maxFalsePosProbability)) / Math.log(1.0 / Math.pow(2.0, Math.log(2.0))))); - } - - public static int computeNumberOfHashFunctions(long cardinality, long numBits) { - return (int) Math.max(1.0, Math.round(((double) numBits / cardinality) * Math.log(2.0))); - } - - public static double computeMaxFalsePosProbability(long cardinality, int numHashFunction, long numBits) { - return Math.pow(1.0 - Math.exp(-1.0 * numHashFunction / ((double) numBits / cardinality)), numHashFunction); - } - - public static double computeMaxFalsePositiveProbabilityForNumBits(long cardinality, long maxNumBits, - double defaultMaxFalsePosProbability) { - // Get the number of bits required for achieving default false positive probability - long numBitsRequired = BloomFilterUtil.computeNumBits(cardinality, defaultMaxFalsePosProbability); - - // If the size of bloom filter is smaller than 1MB, use default max false positive probability - if (numBitsRequired <= maxNumBits) { - return defaultMaxFalsePosProbability; - } - - // If the size of bloom filter is larger than 1MB, compute the maximum false positive probability within - // storage limit - int numHashFunction = BloomFilterUtil.computeNumberOfHashFunctions(cardinality, maxNumBits); - return BloomFilterUtil.computeMaxFalsePosProbability(cardinality, numHashFunction, maxNumBits); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java deleted file mode 100644 index 1339516..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/GuavaOnHeapBloomFilter.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.bloom; - -import com.google.common.hash.Funnels; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.Charset; - - -/** - * Bloom filter implementation with guava library - */ -public class GuavaOnHeapBloomFilter implements BloomFilter { - // Increment the version when the bloom filter implementation becomes backward incompatible - private static final int VERSION = 1; - - private com.google.common.hash.BloomFilter _bloomFilter; - - public GuavaOnHeapBloomFilter() { - } - - public GuavaOnHeapBloomFilter(int cardinality, double maxFalsePosProbability) { - _bloomFilter = com.google.common.hash.BloomFilter - .create(Funnels.stringFunnel(Charset.forName("UTF-8")), cardinality, maxFalsePosProbability); - } - - @Override - public int getVersion() { - return VERSION; - } - - @Override - public BloomFilterType getBloomFilterType() { - return BloomFilterType.GUAVA_ON_HEAP; - } - - @Override - public void add(Object input) { - _bloomFilter.put(input.toString()); - } - - @Override - public boolean mightContain(Object input) { - return _bloomFilter.mightContain(input.toString()); - } - - @Override - public 
void writeTo(OutputStream out) - throws IOException { - _bloomFilter.writeTo(out); - } - - @Override - public void readFrom(InputStream in) - throws IOException { - _bloomFilter = com.google.common.hash.BloomFilter.readFrom(in, Funnels.stringFunnel(Charset.forName("UTF-8"))); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java deleted file mode 100644 index 4d6dbd9..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/SegmentBloomFilterFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.bloom; - -/** - * Factory for bloom filter - */ -public class SegmentBloomFilterFactory { - - /** - * Factory used when creating a new bloom filter - * - * @param cardinality cardinality of column - * @param maxFalsePosProbability maximum false positive probability - * @return a bloom filter - */ - public static BloomFilter createSegmentBloomFilter(int cardinality, double maxFalsePosProbability) { - // TODO: when we add more types of bloom filter, we will need to add a new config and wire in here - return new GuavaOnHeapBloomFilter(cardinality, maxFalsePosProbability); - } - - /** - * Factory used when deserializing a bloom filter - * - * @param type a bloom filter type - * @return a bloom filter based on the given type - */ - public static BloomFilter createSegmentBloomFilter(BloomFilterType type) { - switch (type) { - case GUAVA_ON_HEAP: - return new GuavaOnHeapBloomFilter(); - } - throw new RuntimeException("Invalid bloom filter type: " + type.toString()); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java index 57534cf..38e061f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java @@ -1067,6 +1067,13 @@ public class MutableSegmentImpl implements MutableSegment { _logger.error("Caught exception while closing text index for column: {}, continuing with error", column, e); } } + if (_bloomFilter != null) { + try { + _bloomFilter.close(); + } catch (Exception e) { + _logger.error("Caught exception while closing bloom filter for column: {}, continuing with error", column, e); + } + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java 
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java index 2704dc6..98eb36e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java @@ -155,7 +155,7 @@ public class ColumnValueSegmentPruner implements SegmentPruner { // Check bloom filter BloomFilterReader bloomFilter = dataSource.getBloomFilter(); if (bloomFilter != null) { - if (!bloomFilter.mightContain(value)) { + if (!bloomFilter.mightContain(value.toString())) { return true; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java similarity index 50% copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java index ac6efd8..7647b96 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/BloomFilterCreator.java @@ -16,38 +16,22 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.core.bloom; +package org.apache.pinot.core.segment.creator; -import java.util.HashMap; -import java.util.Map; +import java.io.Closeable; +import java.io.IOException; -/** - * Enum for bloom filter type - */ -public enum BloomFilterType { - // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value - // when serializing/deserializing a bloom filter - GUAVA_ON_HEAP(1); - - private int _value; - private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>(); - - BloomFilterType(int value) { - _value = value; - } - - static { - for (BloomFilterType pageType : BloomFilterType.values()) { - _bloomFilterTypeMap.put(pageType._value, pageType); - } - } +public interface BloomFilterCreator extends Closeable { - public static BloomFilterType valueOf(int pageType) { - return _bloomFilterTypeMap.get(pageType); - } + /** + * Adds a value to the bloom filter. + */ + void add(String value); - public int getValue() { - return _value; - } + /** + * Seals the index and flushes it to disk. + */ + void seal() + throws IOException; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java deleted file mode 100644 index 7c1a6c4..0000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/BloomFilterCreator.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.segment.creator.impl.bloom; - -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import org.apache.pinot.core.bloom.BloomFilter; -import org.apache.pinot.core.bloom.BloomFilterUtil; -import org.apache.pinot.core.bloom.SegmentBloomFilterFactory; -import org.apache.pinot.core.segment.creator.impl.V1Constants; - - -/** - * Bloom filter creator - * - * Note: - * 1. Currently, we limit the filter size to 1MB to avoid the heap overhead. We can remove it once we have the offheap - * implementation of the bloom filter. - * 2. When capping the bloom filter to 1MB, max false pos steeply grows from 1 million cardinality. If the column has - * larger than "5 million" cardinality, it is not recommended to use bloom filter since maxFalsePosProb is already - * 0.45 when the filter size is 1MB. 
- */ -public class BloomFilterCreator implements AutoCloseable { - private static double DEFAULT_MAX_FALSE_POS_PROBABILITY = 0.05; - private static int MB_IN_BITS = 8388608; - - private BloomFilter _bloomFilter; - private File _bloomFilterFile; - - public BloomFilterCreator(File indexDir, String columnName, int cardinality) { - _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); - double maxFalsePosProbability = BloomFilterUtil - .computeMaxFalsePositiveProbabilityForNumBits(cardinality, MB_IN_BITS, DEFAULT_MAX_FALSE_POS_PROBABILITY); - _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(cardinality, maxFalsePosProbability); - } - - @Override - public void close() - throws IOException { - try (DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) { - outputStream.writeInt(_bloomFilter.getBloomFilterType().getValue()); - outputStream.writeInt(_bloomFilter.getVersion()); - _bloomFilter.writeTo(outputStream); - } - } - - public void add(Object input) { - _bloomFilter.add(input.toString()); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java new file mode 100644 index 0000000..fec9a76 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/bloom/OnHeapGuavaBloomFilterCreator.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.creator.impl.bloom; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.pinot.core.segment.creator.BloomFilterCreator; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.spi.config.table.BloomFilterConfig; + + +/** + * On-heap creator for guava bloom filter. + */ +@SuppressWarnings("UnstableApiUsage") +public class OnHeapGuavaBloomFilterCreator implements BloomFilterCreator { + public static final int TYPE_VALUE = 1; + public static final int VERSION = 1; + + private final File _bloomFilterFile; + private final BloomFilter<String> _bloomFilter; + + public OnHeapGuavaBloomFilterCreator(File indexDir, String columnName, int cardinality, + BloomFilterConfig bloomFilterConfig) { + _bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); + _bloomFilter = + BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), cardinality, bloomFilterConfig.getFpp()); + } + + @Override + public void add(String value) { + _bloomFilter.put(value); + } + + @Override + public void seal() + throws IOException { + try (DataOutputStream out = new DataOutputStream(new FileOutputStream(_bloomFilterFile))) { + out.writeInt(TYPE_VALUE); + out.writeInt(VERSION); + _bloomFilter.writeTo(out); + } + } + + @Override + public void close() { + 
} +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java index 8405cf3..0573afe 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java @@ -44,6 +44,7 @@ import org.apache.pinot.core.segment.index.readers.RangeIndexReader; import org.apache.pinot.core.segment.index.readers.SortedIndexReader; import org.apache.pinot.core.segment.index.readers.StringDictionary; import org.apache.pinot.core.segment.index.readers.TextIndexReader; +import org.apache.pinot.core.segment.index.readers.bloom.BloomFilterReaderFactory; import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader; import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader; import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; @@ -66,7 +67,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer private final InvertedIndexReader<?> _rangeIndex; private final TextIndexReader _textIndex; private final BaseImmutableDictionary _dictionary; - private final BloomFilterReader _bloomFilterReader; + private final BloomFilterReader _bloomFilter; private final NullValueVectorReaderImpl _nullValueVectorReader; public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata, @@ -82,7 +83,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName); loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName); loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName); - 
loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName); + loadBloomFilter = indexLoadingConfig.getBloomFilterConfigs().containsKey(columnName); loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName); } @@ -108,9 +109,9 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer //bloom filter if (loadBloomFilter) { PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER); - _bloomFilterReader = new BloomFilterReader(bloomFilterBuffer); + _bloomFilter = BloomFilterReaderFactory.getBloomFilterReader(bloomFilterBuffer); } else { - _bloomFilterReader = null; + _bloomFilter = null; } // Dictionary-based index _dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata, @@ -150,7 +151,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer // Raw index _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType()); _dictionary = null; - _bloomFilterReader = null; + _bloomFilter = null; _rangeIndex = null; _invertedIndex = null; } @@ -183,7 +184,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer @Override public BloomFilterReader getBloomFilter() { - return _bloomFilterReader; + return _bloomFilter; } @Override @@ -265,5 +266,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer if (_textIndex != null) { _textIndex.close(); } + if (_bloomFilter != null) { + _bloomFilter.close(); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index e296da2..798ce61 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java 
@@ -30,6 +30,7 @@ import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode; +import org.apache.pinot.spi.config.table.BloomFilterConfig; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; @@ -52,7 +53,7 @@ public class IndexLoadingConfig { private Map<String, String> _noDictionaryConfig = new HashMap<>(); private Set<String> _varLengthDictionaryColumns = new HashSet<>(); private Set<String> _onHeapDictionaryColumns = new HashSet<>(); - private Set<String> _bloomFilterColumns = new HashSet<>(); + private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>(); private boolean _enableDynamicStarTreeCreation; private List<StarTreeIndexConfig> _starTreeIndexConfigs; private boolean _enableDefaultStarTree; @@ -98,7 +99,13 @@ public class IndexLoadingConfig { List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns(); if (bloomFilterColumns != null) { - _bloomFilterColumns.addAll(bloomFilterColumns); + for (String bloomFilterColumn : bloomFilterColumns) { + _bloomFilterConfigs.put(bloomFilterColumn, new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP)); + } + } + Map<String, BloomFilterConfig> bloomFilterConfigs = indexingConfig.getBloomFilterConfigs(); + if (bloomFilterConfigs != null) { + _bloomFilterConfigs.putAll(bloomFilterConfigs); } List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns(); @@ -265,8 +272,8 @@ public class IndexLoadingConfig { } @VisibleForTesting - public void setBloomFilterColumns(Set<String> bloomFilterColumns) { - _bloomFilterColumns = bloomFilterColumns; + public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConfigs) { + 
_bloomFilterConfigs = bloomFilterConfigs; } @VisibleForTesting @@ -290,8 +297,8 @@ public class IndexLoadingConfig { return _onHeapDictionaryColumns; } - public Set<String> getBloomFilterColumns() { - return _bloomFilterColumns; + public Map<String, BloomFilterConfig> getBloomFilterConfigs() { + return _bloomFilterConfigs; } public boolean isEnableDynamicStarTreeCreation() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java index 6c6ec81..1e15e79 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/bloomfilter/BloomFilterHandler.java @@ -21,17 +21,20 @@ package org.apache.pinot.core.segment.index.loader.bloomfilter; import java.io.File; import java.io.IOException; import java.util.HashSet; +import java.util.Map; import java.util.Set; -import javax.annotation.Nonnull; import org.apache.commons.io.FileUtils; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; +import org.apache.pinot.core.segment.creator.BloomFilterCreator; import org.apache.pinot.core.segment.creator.impl.V1Constants; -import org.apache.pinot.core.segment.creator.impl.bloom.BloomFilterCreator; +import org.apache.pinot.core.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator; import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.core.segment.index.loader.LoaderUtils; import org.apache.pinot.core.segment.index.metadata.ColumnMetadata; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary; +import org.apache.pinot.core.segment.index.readers.BytesDictionary; +import org.apache.pinot.core.segment.index.readers.Dictionary; import 
org.apache.pinot.core.segment.index.readers.DoubleDictionary; import org.apache.pinot.core.segment.index.readers.FloatDictionary; import org.apache.pinot.core.segment.index.readers.IntDictionary; @@ -40,6 +43,7 @@ import org.apache.pinot.core.segment.index.readers.StringDictionary; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.apache.pinot.core.segment.store.ColumnIndexType; import org.apache.pinot.core.segment.store.SegmentDirectory; +import org.apache.pinot.spi.config.table.BloomFilterConfig; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,16 +56,18 @@ public class BloomFilterHandler { private final SegmentDirectory.Writer _segmentWriter; private final String _segmentName; private final SegmentVersion _segmentVersion; + private final Map<String, BloomFilterConfig> _bloomFilterConfigs; private final Set<ColumnMetadata> _bloomFilterColumns = new HashSet<>(); - public BloomFilterHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata, - @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) { + public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig, + SegmentDirectory.Writer segmentWriter) { _indexDir = indexDir; _segmentWriter = segmentWriter; _segmentName = segmentMetadata.getName(); _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion()); + _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs(); - for (String column : indexLoadingConfig.getBloomFilterColumns()) { + for (String column : _bloomFilterConfigs.keySet()) { ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); if (columnMetadata != null) { _bloomFilterColumns.add(columnMetadata); @@ -75,6 +81,7 @@ public class BloomFilterHandler { if (columnMetadata.hasDictionary()) { createBloomFilterForColumn(columnMetadata); } + // TODO: Support raw index 
} } @@ -102,19 +109,17 @@ public class BloomFilterHandler { } // Create new bloom filter for the column. - LOGGER.info("Creating new bloom filter for segment: {}, column: {}", _segmentName, columnName); - try (BloomFilterCreator creator = new BloomFilterCreator(_indexDir, columnName, columnMetadata.getCardinality())) { - if (columnMetadata.hasDictionary()) { - // Read dictionary - try (BaseImmutableDictionary dictionaryReader = getDictionaryReader(columnMetadata, _segmentWriter)) { - for (int i = 0; i < dictionaryReader.length(); i++) { - creator.add(dictionaryReader.get(i)); - } - } - } else { - // Read the forward index - throw new UnsupportedOperationException("Bloom filters not supported for no dictionary columns"); + BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName); + LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", _segmentName, columnName, + bloomFilterConfig); + try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(_indexDir, columnName, + columnMetadata.getCardinality(), bloomFilterConfig); + Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) { + int length = dictionary.length(); + for (int i = 0; i < length; i++) { + bloomFilterCreator.add(dictionary.getStringValue(i)); } + bloomFilterCreator.seal(); } // For v3, write the generated bloom filter file into the single file and remove it. 
@@ -133,29 +138,24 @@ public class BloomFilterHandler { PinotDataBuffer dictionaryBuffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY); int cardinality = columnMetadata.getCardinality(); - BaseImmutableDictionary dictionaryReader; DataType dataType = columnMetadata.getDataType(); switch (dataType) { case INT: - dictionaryReader = new IntDictionary(dictionaryBuffer, cardinality); - break; + return new IntDictionary(dictionaryBuffer, cardinality); case LONG: - dictionaryReader = new LongDictionary(dictionaryBuffer, cardinality); - break; + return new LongDictionary(dictionaryBuffer, cardinality); case FLOAT: - dictionaryReader = new FloatDictionary(dictionaryBuffer, cardinality); - break; + return new FloatDictionary(dictionaryBuffer, cardinality); case DOUBLE: - dictionaryReader = new DoubleDictionary(dictionaryBuffer, cardinality); - break; + return new DoubleDictionary(dictionaryBuffer, cardinality); case STRING: - dictionaryReader = new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(), + return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength(), (byte) columnMetadata.getPaddingCharacter()); - break; + case BYTES: + return new BytesDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); default: throw new IllegalStateException( "Unsupported data type: " + dataType + " for column: " + columnMetadata.getColumnName()); } - return dictionaryReader; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java index 63bebde..09a1dcf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BloomFilterReader.java @@ -18,40 +18,23 @@ */ package 
org.apache.pinot.core.segment.index.readers; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import org.apache.pinot.core.bloom.BloomFilter; -import org.apache.pinot.core.bloom.BloomFilterType; -import org.apache.pinot.core.bloom.SegmentBloomFilterFactory; -import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import java.io.Closeable; /** - * Bloom filter reader + * Interface for bloom filter reader. */ -public class BloomFilterReader { +public interface BloomFilterReader extends Closeable { - private BloomFilter _bloomFilter; + /** + * Returns {@code true} if the given value might have been put in this bloom filter, {@code false} otherwise. + */ + boolean mightContain(String value); - public BloomFilterReader(PinotDataBuffer bloomFilterBuffer) - throws IOException { - byte[] buffer = new byte[(int) bloomFilterBuffer.size()]; - bloomFilterBuffer.copyTo(0, buffer); - - try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer))) { - BloomFilterType bloomFilterType = BloomFilterType.valueOf(in.readInt()); - int version = in.readInt(); - _bloomFilter = SegmentBloomFilterFactory.createSegmentBloomFilter(bloomFilterType); - if (version != _bloomFilter.getVersion()) { - throw new IOException( - "Unexpected bloom filter version (type: " + bloomFilterType.toString() + ", version: " + version); - } - _bloomFilter.readFrom(in); - } - } - - public boolean mightContain(Object key) { - return _bloomFilter.mightContain(key.toString()); - } + /** + * Returns {@code true} if the value with the given hash might have been put in this bloom filter, {@code false} + * otherwise. + * <p>This method is provided to prevent hashing the same value multiple times. 
+ */ + boolean mightContain(byte[] hash); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java similarity index 50% copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java index ac6efd8..e507b3d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/BloomFilterReaderFactory.java @@ -16,38 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.bloom; +package org.apache.pinot.core.segment.index.readers.bloom; -import java.util.HashMap; -import java.util.Map; +import com.google.common.base.Preconditions; +import org.apache.pinot.core.segment.index.readers.BloomFilterReader; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; -/** - * Enum for bloom filter type - */ -public enum BloomFilterType { - // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value - // when serializing/deserializing a bloom filter - GUAVA_ON_HEAP(1); - - private int _value; - private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>(); - - BloomFilterType(int value) { - _value = value; - } - - static { - for (BloomFilterType pageType : BloomFilterType.values()) { - _bloomFilterTypeMap.put(pageType._value, pageType); - } - } - - public static BloomFilterType valueOf(int pageType) { - return _bloomFilterTypeMap.get(pageType); +public class BloomFilterReaderFactory { + private BloomFilterReaderFactory() { } - public int getValue() { - return _value; + public static BloomFilterReader getBloomFilterReader(PinotDataBuffer dataBuffer) { + int typeValue 
= dataBuffer.getInt(0); + int version = dataBuffer.getInt(4); + Preconditions.checkState(typeValue == 1 && version == 1); + return new OffHeapGuavaBloomFilterReader(dataBuffer.view(8, dataBuffer.size())); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java similarity index 50% copy from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java index ac6efd8..648fdff 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/GuavaBloomFilterReaderUtils.java @@ -16,38 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.bloom; +package org.apache.pinot.core.segment.index.readers.bloom; -import java.util.HashMap; -import java.util.Map; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.pinot.spi.utils.StringUtils; -/** - * Enum for bloom filter type - */ -public enum BloomFilterType { - // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value - // when serializing/deserializing a bloom filter - GUAVA_ON_HEAP(1); - - private int _value; - private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>(); - - BloomFilterType(int value) { - _value = value; +@SuppressWarnings("UnstableApiUsage") +public class GuavaBloomFilterReaderUtils { + private GuavaBloomFilterReaderUtils() { } - static { - for (BloomFilterType pageType : BloomFilterType.values()) { - _bloomFilterTypeMap.put(pageType._value, pageType); - } - } - - public static BloomFilterType valueOf(int pageType) { - return 
_bloomFilterTypeMap.get(pageType); - } + // DO NOT change the hash function. It has to be aligned with the bloom filter creator. + private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(); - public int getValue() { - return _value; + /** + * Returns the hash of the given value as a byte array. + */ + public static byte[] hash(String value) { + return HASH_FUNCTION.hashBytes(StringUtils.encodeUtf8(value)).asBytes(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java new file mode 100644 index 0000000..19a632f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/bloom/OffHeapGuavaBloomFilterReader.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.index.readers.bloom; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.pinot.core.segment.index.readers.BloomFilterReader; +import org.apache.pinot.core.segment.memory.PinotDataBuffer; + + +/** + * Off-heap reader for guava bloom filter. + * <p>The behavior should be aligned with {@link com.google.common.hash.BloomFilter}. + */ +@SuppressWarnings("UnstableApiUsage") +public class OffHeapGuavaBloomFilterReader implements BloomFilterReader { + private final int _numHashFunctions; + private final long _numBits; + private final PinotDataBuffer _valueBuffer; + + /** + * Format of the data buffer header: + * - Strategy ordinal: 1 byte + * - Number of hash functions: 1 byte + * - Number of long values: 4 bytes + */ + public OffHeapGuavaBloomFilterReader(PinotDataBuffer dataBuffer) { + byte strategyOrdinal = dataBuffer.getByte(0); + Preconditions.checkState(strategyOrdinal == 1, "Invalid strategy ordinal: %s", strategyOrdinal); + _numHashFunctions = dataBuffer.getByte(1) & 0xFF; + _numBits = (long) dataBuffer.getInt(2) * Long.SIZE; + _valueBuffer = dataBuffer.view(6, dataBuffer.size()); + } + + @Override + public boolean mightContain(String value) { + return mightContain(GuavaBloomFilterReaderUtils.hash(value)); + } + + @Override + public boolean mightContain(byte[] hash) { + long hash1 = Longs.fromBytes(hash[7], hash[6], hash[5], hash[4], hash[3], hash[2], hash[1], hash[0]); + long hash2 = Longs.fromBytes(hash[15], hash[14], hash[13], hash[12], hash[11], hash[10], hash[9], hash[8]); + long combinedHash = hash1; + for (int i = 0; i < _numHashFunctions; i++) { + long bitIndex = (combinedHash & Long.MAX_VALUE) % _numBits; + // NOTE: Guava bloom filter stores bits in a long array. Inside each long value, the bits are stored in the + // reverse order (the first bit is stored as the right most bit of the long). 
+ int longIndex = (int) (bitIndex >>> 6); + int bitIndexInLong = (int) (bitIndex & 0x3F); + int byteIndex = (longIndex << 3) | (7 - (bitIndexInLong >>> 3)); + if ((_valueBuffer.getByte(byteIndex) & (1 << (bitIndexInLong & 7))) == 0) { + return false; + } + combinedHash += hash2; + } + return true; + } + + @Override + public void close() { + // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The + // caller is responsible for closing the PinotDataBuffer. + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java index d793f34..18267f8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java @@ -40,7 +40,6 @@ import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.config.table.ingestion.FilterConfig; import org.apache.pinot.spi.config.table.ingestion.TransformConfig; @@ -294,14 +293,19 @@ public final class TableConfigUtils { noDictionaryColumnsSet.add(columnName); } } + Set<String> bloomFilterColumns = new HashSet<>(); if (indexingConfig.getBloomFilterColumns() != null) { - for (String columnName : indexingConfig.getBloomFilterColumns()) { - if (noDictionaryColumnsSet.contains(columnName)) { - throw new IllegalStateException( - "Cannot create a Bloom Filter on column " + columnName + " specified in the noDictionaryColumns config"); - } - columnNameToConfigMap.put(columnName, "Bloom Filter Config"); + bloomFilterColumns.addAll(indexingConfig.getBloomFilterColumns()); + } 
+ if (indexingConfig.getBloomFilterConfigs() != null) { + bloomFilterColumns.addAll(indexingConfig.getBloomFilterConfigs().keySet()); + } + for (String bloomFilterColumn : bloomFilterColumns) { + if (noDictionaryColumnsSet.contains(bloomFilterColumn)) { + throw new IllegalStateException("Cannot create a Bloom Filter on column " + bloomFilterColumn + + " specified in the noDictionaryColumns config"); } + columnNameToConfigMap.put(bloomFilterColumn, "Bloom Filter Config"); } if (indexingConfig.getInvertedIndexColumns() != null) { for (String columnName : indexingConfig.getInvertedIndexColumns()) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java index e89b36d..4ecc586 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/BloomFilterCreatorTest.java @@ -18,120 +18,54 @@ */ package org.apache.pinot.core.segment.index.creator; -import com.google.common.base.Preconditions; -import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.pinot.core.bloom.BloomFilterType; -import org.apache.pinot.core.bloom.BloomFilterUtil; -import org.apache.pinot.core.bloom.GuavaOnHeapBloomFilter; +import org.apache.pinot.core.segment.creator.BloomFilterCreator; import org.apache.pinot.core.segment.creator.impl.V1Constants; -import org.apache.pinot.core.segment.creator.impl.bloom.BloomFilterCreator; +import org.apache.pinot.core.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator; +import org.apache.pinot.core.segment.index.readers.BloomFilterReader; +import org.apache.pinot.core.segment.index.readers.bloom.BloomFilterReaderFactory; +import 
org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.spi.config.table.BloomFilterConfig; +import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; public class BloomFilterCreatorTest { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BloomFilterCreatorTest"); - private static int MB_IN_BYTES = 1024 * 1024; + @BeforeClass public void setUp() throws Exception { - if (TEMP_DIR.exists()) { - FileUtils.deleteQuietly(TEMP_DIR); - } - TEMP_DIR.mkdir(); - } - - @Test - public void testBloomFilterUtil() { - // Test against the known results - Assert.assertEquals(BloomFilterUtil.computeNumBits(1000000, 0.03), 7298441); - Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.03), 72984409); - Assert.assertEquals(BloomFilterUtil.computeNumBits(10000000, 0.1), 47925292); - - Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(1000000, 7298441), 5); - Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 72984409), 5); - Assert.assertEquals(BloomFilterUtil.computeNumberOfHashFunctions(10000000, 47925292), 3); - - double threshold = 0.001; - Assert - .assertTrue(compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(1000000, 5, 7298441), 0.03, threshold)); - Assert.assertTrue( - compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 5, 72984409), 0.03, threshold)); - Assert.assertTrue( - compareDouble(BloomFilterUtil.computeMaxFalsePosProbability(10000000, 3, 47925292), 0.1, threshold)); - } - - private boolean compareDouble(double a, double b, double threshold) { - if (Math.abs(a - b) < threshold) { - return true; - } - return false; + TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); } @Test public void testBloomFilterCreator() throws Exception { - // Create bloom filter directory - File bloomFilterDir = new File(TEMP_DIR, 
"bloomFilterDir"); - bloomFilterDir.mkdirs(); - - // Create a bloom filter and serialize it to a file + // Create the bloom filter int cardinality = 10000; String columnName = "testColumn"; - BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(bloomFilterDir, columnName, cardinality); - for (int i = 0; i < 5; i++) { - bloomFilterCreator.add(Integer.toString(i)); - } - bloomFilterCreator.close(); - - // Deserialize the bloom filter and validate - File bloomFilterFile = new File(bloomFilterDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); - - try (DataInputStream in = new DataInputStream(new FileInputStream(bloomFilterFile))) { - BloomFilterType type = BloomFilterType.valueOf(in.readInt()); - int version = in.readInt(); - GuavaOnHeapBloomFilter bloomFilter = new GuavaOnHeapBloomFilter(); - - Assert.assertEquals(type, bloomFilter.getBloomFilterType()); - Assert.assertEquals(version, bloomFilter.getVersion()); - - bloomFilter.readFrom(in); + try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(TEMP_DIR, columnName, cardinality, + new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP))) { for (int i = 0; i < 5; i++) { - Assert.assertTrue(bloomFilter.mightContain(Integer.toString(i))); - } - for (int j = 5; j < 10; j++) { - Assert.assertFalse(bloomFilter.mightContain(Integer.toString(j))); + bloomFilterCreator.add(Integer.toString(i)); } + bloomFilterCreator.seal(); } - } - - @Test - public void testBloomFilterSize() - throws Exception { - int cardinalityArray[] = new int[]{10, 100, 1000, 100000, 100000, 1000000, 5000000, 10000000}; - for (int cardinality : cardinalityArray) { - FileUtils.deleteQuietly(TEMP_DIR); - File indexDir = new File(TEMP_DIR, "testBloomFilterSize"); - Preconditions.checkState(indexDir.mkdirs()); - - String columnName = "testSize"; - BloomFilterCreator bloomFilterCreator = new BloomFilterCreator(indexDir, columnName, cardinality); - bloomFilterCreator.close(); - File bloomFilterFile = new 
File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); - - try (InputStream inputStream = new FileInputStream(bloomFilterFile)) { - byte[] bloomFilterBytes = IOUtils.toByteArray(inputStream); - long actualBloomFilterSize = bloomFilterBytes.length; - // Check if the size of bloom filter does not go beyond 1MB. Note that guava bloom filter has 11-12 bytes of - // overhead - Assert.assertTrue(actualBloomFilterSize < MB_IN_BYTES + 12); + // Read the bloom filter + File bloomFilterFile = new File(TEMP_DIR, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION); + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(bloomFilterFile); + BloomFilterReader bloomFilterReader = BloomFilterReaderFactory.getBloomFilterReader(dataBuffer)) { + for (int i = 0; i < 5; i++) { + Assert.assertTrue(bloomFilterReader.mightContain(Integer.toString(i))); + } + for (int i = 5; i < 10; i++) { + Assert.assertFalse(bloomFilterReader.mightContain(Integer.toString(i))); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java similarity index 50% rename from pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java index ac6efd8..d488ece 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/bloom/BloomFilterType.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/BloomFilterConfig.java @@ -16,38 +16,26 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.core.bloom; +package org.apache.pinot.spi.config.table; -import java.util.HashMap; -import java.util.Map; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.pinot.spi.config.BaseJsonConfig; -/** - * Enum for bloom filter type - */ -public enum BloomFilterType { - // NOTE: Do not change the value of bloom filter type when adding a new type since we are writing/checking type value - // when serializing/deserializing a bloom filter - GUAVA_ON_HEAP(1); - - private int _value; - private static Map<Integer, BloomFilterType> _bloomFilterTypeMap = new HashMap<>(); +public class BloomFilterConfig extends BaseJsonConfig { + public static final double DEFAULT_FPP = 0.05; - BloomFilterType(int value) { - _value = value; - } - - static { - for (BloomFilterType pageType : BloomFilterType.values()) { - _bloomFilterTypeMap.put(pageType._value, pageType); - } - } + private final double _fpp; - public static BloomFilterType valueOf(int pageType) { - return _bloomFilterTypeMap.get(pageType); + @JsonCreator + public BloomFilterConfig(@JsonProperty(value = "fpp", required = true) double fpp) { + Preconditions.checkArgument(fpp > 0.0 && fpp < 1.0, "Invalid fpp (false positive probability): %s", fpp); + _fpp = fpp; } - public int getValue() { - return _value; + public double getFpp() { + return _fpp; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java index 3081e28..3dd137b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java @@ -31,6 +31,7 @@ public class IndexingConfig extends BaseJsonConfig { private boolean _createInvertedIndexDuringSegmentGeneration; private List<String> _sortedColumn; private 
List<String> _bloomFilterColumns; + private Map<String, BloomFilterConfig> _bloomFilterConfigs; private String _loadMode; private Map<String, String> _streamConfigs; private String _segmentFormatVersion; @@ -105,6 +106,15 @@ public class IndexingConfig extends BaseJsonConfig { } @Nullable + public Map<String, BloomFilterConfig> getBloomFilterConfigs() { + return _bloomFilterConfigs; + } + + public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConfigs) { + _bloomFilterConfigs = bloomFilterConfigs; + } + + @Nullable public String getLoadMode() { return _loadMode; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org