This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 221db828cf Let applyAnd to be applied using different window sizes (#10372) 221db828cf is described below commit 221db828cf535eb751267124bca35c9a3b1f9c52 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Thu Mar 9 19:45:33 2023 +0100 Let applyAnd to be applied using different window sizes (#10372) --- .../core/operator/BitmapDocIdSetOperator.java | 5 +++ .../ExpressionScanDocIdIterator.java | 16 +++++++ .../dociditerators/MVScanDocIdIterator.java | 20 ++++++--- .../dociditerators/SVScanDocIdIterator.java | 50 ++++++++++++++-------- .../dociditerators/ScanBasedDocIdIterator.java | 11 ++++- .../core/operator/docidsets/SVScanDocIdSet.java | 4 +- .../operator/filter/ScanBasedFilterOperator.java | 13 +++++- 7 files changed, 89 insertions(+), 30 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java index b251c552d8..4f203599c1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java @@ -52,6 +52,11 @@ public class BitmapDocIdSetOperator extends BaseOperator<DocIdSetBlock> { _docIdBuffer = new int[Math.min(numDocs, DocIdSetPlanNode.MAX_DOC_PER_CALL)]; } + public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) { + _intIterator = intIterator; + _docIdBuffer = docIdBuffer; + } + public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[] docIdBuffer) { _intIterator = bitmap.getIntIterator(); _docIdBuffer = docIdBuffer; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java index 64987293bc..b5b2273644 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.dociditerators; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.OptionalInt; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.BitmapDocIdSetOperator; @@ -33,7 +34,9 @@ import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.spi.Constants; import org.apache.pinot.segment.spi.datasource.DataSource; +import org.roaringbitmap.BatchIterator; import org.roaringbitmap.BitmapDataProvider; +import org.roaringbitmap.IntIterator; import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @@ -110,6 +113,19 @@ public final class ExpressionScanDocIdIterator implements ScanBasedDocIdIterator return next(); } + @Override + public MutableRoaringBitmap applyAnd(BatchIterator batchIterator, OptionalInt firstDoc, OptionalInt lastDoc) { + IntIterator intIterator = batchIterator.asIntIterator(new int[OPTIMAL_ITERATOR_BATCH_SIZE]); + ProjectionOperator projectionOperator = + new ProjectionOperator(_dataSourceMap, new BitmapDocIdSetOperator(intIterator, _docIdBuffer)); + MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap(); + ProjectionBlock projectionBlock; + while ((projectionBlock = projectionOperator.nextBlock()) != null) { + processProjectionBlock(projectionBlock, matchingDocIds); + } + return matchingDocIds; + } + @Override public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) { ProjectionOperator projectionOperator = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java index 558c4003d9..1c794e81c0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.dociditerators; +import java.util.OptionalInt; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.segment.spi.Constants; import org.apache.pinot.segment.spi.datasource.DataSource; @@ -26,7 +27,6 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.spi.utils.CommonConstants.Query.OptimizationConstants; import org.roaringbitmap.BatchIterator; import org.roaringbitmap.RoaringBitmapWriter; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -79,13 +79,21 @@ public final class MVScanDocIdIterator implements ScanBasedDocIdIterator { } @Override - public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) { - if (docIds.isEmpty()) { + public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator, OptionalInt firstDoc, OptionalInt lastDoc) { + if (!docIdIterator.hasNext()) { return new MutableRoaringBitmap(); } - RoaringBitmapWriter<MutableRoaringBitmap> result = RoaringBitmapWriter.bufferWriter() - .expectedRange(docIds.first(), docIds.last()).runCompress(false).get(); - BatchIterator docIdIterator = docIds.getBatchIterator(); + RoaringBitmapWriter<MutableRoaringBitmap> result; + if (firstDoc.isPresent() && lastDoc.isPresent()) { + result = RoaringBitmapWriter.bufferWriter() + .expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt()) + .runCompress(false) + .get(); + } else { + result = RoaringBitmapWriter.bufferWriter() + .runCompress(false) + .get(); + } int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; while (docIdIterator.hasNext()) { int limit = docIdIterator.nextBatch(buffer); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java index 1a4f9c2c54..baf07c16aa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.core.operator.dociditerators; +import java.util.OptionalInt; import javax.annotation.Nullable; +import org.apache.pinot.core.common.BlockDocIdIterator; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.segment.spi.Constants; import org.apache.pinot.segment.spi.datasource.DataSource; @@ -44,7 +46,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private final ForwardIndexReaderContext _readerContext; private final int _numDocs; private final ValueMatcher _valueMatcher; - private final int[] _batch = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final int[] _batch; private int _firstMismatch; private int _cursor; private final int _cardinality; @@ -53,7 +55,8 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private long _numEntriesScanned = 0L; public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs, - @Nullable NullValueVectorReader nullValueReader) { + @Nullable NullValueVectorReader nullValueReader, int batchSize) { + _batch = new int[batchSize]; _predicateEvaluator = predicateEvaluator; _reader = dataSource.getForwardIndex(); _readerContext = _reader.createContext(); @@ -69,6 +72,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { // for testing public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, ForwardIndexReader reader, int numDocs, @Nullable NullValueVectorReader nullValueReader) { + _batch = new int[BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE]; _predicateEvaluator = predicateEvaluator; _reader = reader; _readerContext = reader.createContext(); @@ -87,7 +91,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { int limit; int batchSize = 0; do { - limit = Math.min(_numDocs - _nextDocId, OPTIMAL_ITERATOR_BATCH_SIZE); + limit = Math.min(_numDocs - _nextDocId, _batch.length); if (limit > 0) { for (int i = 0; i < limit; i++) { _batch[i] = _nextDocId + i; @@ -121,14 +125,22 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { } @Override - public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) { - if (docIds.isEmpty()) { + public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator, OptionalInt firstDoc, OptionalInt lastDoc) { + if (!docIdIterator.hasNext()) { return new MutableRoaringBitmap(); } - RoaringBitmapWriter<MutableRoaringBitmap> result = RoaringBitmapWriter.bufferWriter() - .expectedRange(docIds.first(), docIds.last()).runCompress(false).get(); - BatchIterator docIdIterator = docIds.getBatchIterator(); - int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; + RoaringBitmapWriter<MutableRoaringBitmap> result; + if (firstDoc.isPresent() && lastDoc.isPresent()) { + result = RoaringBitmapWriter.bufferWriter() + .expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt()) + .runCompress(false) + .get(); + } else { + result = RoaringBitmapWriter.bufferWriter() + .runCompress(false) + .get(); + } + int[] buffer = new int[_batch.length]; while (docIdIterator.hasNext()) { int limit = docIdIterator.nextBatch(buffer); if (limit > 0) { @@ -264,7 +276,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class DictIdMatcher implements ValueMatcher { - private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final int[] _buffer = new int[_batch.length]; @Override public boolean doesValueMatch(int docId) { @@ -280,7 +292,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class DictIdMatcherAndNullHandler implements ValueMatcher { - private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final int[] _buffer = new int[_batch.length]; private final ImmutableRoaringBitmap _nullBitmap; public DictIdMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) { @@ -308,7 +320,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class IntMatcher implements ValueMatcher { - private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final int[] _buffer = new int[_batch.length]; @Override public boolean doesValueMatch(int docId) { @@ -325,7 +337,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class IntMatcherAndNullHandler implements ValueMatcher { private final ImmutableRoaringBitmap _nullBitmap; - private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final int[] _buffer = new int[_batch.length]; public IntMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) { _nullBitmap = nullBitmap; @@ -349,7 +361,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class LongMatcher implements ValueMatcher { - private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final long[] _buffer = new long[_batch.length]; @Override public boolean doesValueMatch(int docId) { @@ -366,7 +378,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class LongMatcherAndNullHandler implements ValueMatcher { private final ImmutableRoaringBitmap _nullBitmap; - private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final long[] _buffer = new long[_batch.length]; public LongMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) { _nullBitmap = nullBitmap; @@ -390,7 +402,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class FloatMatcher implements ValueMatcher { - private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final float[] _buffer = new float[_batch.length]; @Override public boolean doesValueMatch(int docId) { @@ -407,7 +419,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class FloatMatcherAndNullHandler implements ValueMatcher { private final ImmutableRoaringBitmap _nullBitmap; - private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final float[] _buffer = new float[_batch.length]; public FloatMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) { _nullBitmap = nullBitmap; @@ -431,7 +443,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class DoubleMatcher implements ValueMatcher { - private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final double[] _buffer = new double[_batch.length]; @Override public boolean doesValueMatch(int docId) { @@ -448,7 +460,7 @@ public final class SVScanDocIdIterator implements ScanBasedDocIdIterator { private class DoubleMatcherAndNullHandler implements ValueMatcher { private final ImmutableRoaringBitmap _nullBitmap; - private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE]; + private final double[] _buffer = new double[_batch.length]; public DoubleMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) { _nullBitmap = nullBitmap; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java index 1ed890cc83..1caa526444 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.core.operator.dociditerators; +import java.util.OptionalInt; import org.apache.pinot.core.common.BlockDocIdIterator; +import org.roaringbitmap.BatchIterator; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -34,10 +36,17 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap; */ public interface ScanBasedDocIdIterator extends BlockDocIdIterator { + MutableRoaringBitmap applyAnd(BatchIterator batchIterator, OptionalInt firstDoc, OptionalInt lastDoc); + /** * Applies AND operation to the given bitmap of document ids, returns a bitmap of the matching document ids. */ - MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds); + default MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) { + if (docIds.isEmpty()) { + return new MutableRoaringBitmap(); + } + return applyAnd(docIds.getBatchIterator(), OptionalInt.of(docIds.first()), OptionalInt.of(docIds.last())); + } /** * Returns the number of entries (SV value contains one entry, MV value contains multiple entries) scanned during the diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java index 85ca55bacc..40fc00a621 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java @@ -29,9 +29,9 @@ public final class SVScanDocIdSet implements BlockDocIdSet { private final SVScanDocIdIterator _docIdIterator; public SVScanDocIdSet(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs, - boolean nullHandlingEnabled) { + boolean nullHandlingEnabled, int batchSize) { NullValueVectorReader nullValueVector = nullHandlingEnabled ? dataSource.getNullValueVector() : null; - _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource, numDocs, nullValueVector); + _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource, numDocs, nullValueVector, batchSize); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java index 89305d24fb..b2241cfc6f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.filter; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; +import org.apache.pinot.core.common.BlockDocIdIterator; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.MVScanDocIdSet; @@ -37,9 +38,15 @@ public class ScanBasedFilterOperator extends BaseFilterOperator { private final DataSource _dataSource; private final int _numDocs; private final boolean _nullHandlingEnabled; + private final int _batchSize; - ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs, + public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs, boolean nullHandlingEnabled) { + this(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled, BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE); + } + + public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs, + boolean nullHandlingEnabled, int batchSize) { _predicateEvaluator = predicateEvaluator; _dataSource = dataSource; _numDocs = numDocs; @@ -47,13 +54,15 @@ public class ScanBasedFilterOperator extends BaseFilterOperator { Preconditions.checkState(_dataSource.getForwardIndex() != null, "Forward index disabled for column: %s, scan based filtering not supported!", _dataSource.getDataSourceMetadata().getFieldSpec().getName()); + _batchSize = batchSize; } @Override protected FilterBlock getNextBlock() { DataSourceMetadata dataSourceMetadata = _dataSource.getDataSourceMetadata(); if (dataSourceMetadata.isSingleValue()) { - return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator, _dataSource, _numDocs, _nullHandlingEnabled)); + return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator, _dataSource, _numDocs, _nullHandlingEnabled, + _batchSize)); } else { return new FilterBlock(new MVScanDocIdSet(_predicateEvaluator, _dataSource, _numDocs)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org