This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 221db828cf Let applyAnd be applied using different window sizes (#10372)
221db828cf is described below

commit 221db828cf535eb751267124bca35c9a3b1f9c52
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Thu Mar 9 19:45:33 2023 +0100

    Let applyAnd be applied using different window sizes (#10372)
---
 .../core/operator/BitmapDocIdSetOperator.java      |  5 +++
 .../ExpressionScanDocIdIterator.java               | 16 +++++++
 .../dociditerators/MVScanDocIdIterator.java        | 20 ++++++---
 .../dociditerators/SVScanDocIdIterator.java        | 50 ++++++++++++++--------
 .../dociditerators/ScanBasedDocIdIterator.java     | 11 ++++-
 .../core/operator/docidsets/SVScanDocIdSet.java    |  4 +-
 .../operator/filter/ScanBasedFilterOperator.java   | 13 +++++-
 7 files changed, 89 insertions(+), 30 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
index b251c552d8..4f203599c1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/BitmapDocIdSetOperator.java
@@ -52,6 +52,11 @@ public class BitmapDocIdSetOperator extends 
BaseOperator<DocIdSetBlock> {
     _docIdBuffer = new int[Math.min(numDocs, 
DocIdSetPlanNode.MAX_DOC_PER_CALL)];
   }
 
+  public BitmapDocIdSetOperator(IntIterator intIterator, int[] docIdBuffer) {
+    _intIterator = intIterator;
+    _docIdBuffer = docIdBuffer;
+  }
+
   public BitmapDocIdSetOperator(ImmutableBitmapDataProvider bitmap, int[] 
docIdBuffer) {
     _intIterator = bitmap.getIntIterator();
     _docIdBuffer = docIdBuffer;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
index 64987293bc..b5b2273644 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ExpressionScanDocIdIterator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.dociditerators;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
@@ -33,7 +34,9 @@ import 
org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.roaringbitmap.BatchIterator;
 import org.roaringbitmap.BitmapDataProvider;
+import org.roaringbitmap.IntIterator;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -110,6 +113,19 @@ public final class ExpressionScanDocIdIterator implements 
ScanBasedDocIdIterator
     return next();
   }
 
+  @Override
+  public MutableRoaringBitmap applyAnd(BatchIterator batchIterator, 
OptionalInt firstDoc, OptionalInt lastDoc) {
+    IntIterator intIterator = batchIterator.asIntIterator(new 
int[OPTIMAL_ITERATOR_BATCH_SIZE]);
+    ProjectionOperator projectionOperator =
+        new ProjectionOperator(_dataSourceMap, new 
BitmapDocIdSetOperator(intIterator, _docIdBuffer));
+    MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap();
+    ProjectionBlock projectionBlock;
+    while ((projectionBlock = projectionOperator.nextBlock()) != null) {
+      processProjectionBlock(projectionBlock, matchingDocIds);
+    }
+    return matchingDocIds;
+  }
+
   @Override
   public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
     ProjectionOperator projectionOperator =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
index 558c4003d9..1c794e81c0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/MVScanDocIdIterator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.dociditerators;
 
+import java.util.OptionalInt;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -26,7 +27,6 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.spi.utils.CommonConstants.Query.OptimizationConstants;
 import org.roaringbitmap.BatchIterator;
 import org.roaringbitmap.RoaringBitmapWriter;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -79,13 +79,21 @@ public final class MVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   }
 
   @Override
-  public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
-    if (docIds.isEmpty()) {
+  public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator, 
OptionalInt firstDoc, OptionalInt lastDoc) {
+    if (!docIdIterator.hasNext()) {
       return new MutableRoaringBitmap();
     }
-    RoaringBitmapWriter<MutableRoaringBitmap> result = 
RoaringBitmapWriter.bufferWriter()
-        .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
-    BatchIterator docIdIterator = docIds.getBatchIterator();
+    RoaringBitmapWriter<MutableRoaringBitmap> result;
+    if (firstDoc.isPresent() && lastDoc.isPresent()) {
+      result = RoaringBitmapWriter.bufferWriter()
+          .expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt())
+          .runCompress(false)
+          .get();
+    } else {
+      result = RoaringBitmapWriter.bufferWriter()
+          .runCompress(false)
+          .get();
+    }
     int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
     while (docIdIterator.hasNext()) {
       int limit = docIdIterator.nextBatch(buffer);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
index 1a4f9c2c54..baf07c16aa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/SVScanDocIdIterator.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.operator.dociditerators;
 
+import java.util.OptionalInt;
 import javax.annotation.Nullable;
+import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -44,7 +46,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   private final ForwardIndexReaderContext _readerContext;
   private final int _numDocs;
   private final ValueMatcher _valueMatcher;
-  private final int[] _batch = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+  private final int[] _batch;
   private int _firstMismatch;
   private int _cursor;
   private final int _cardinality;
@@ -53,7 +55,8 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   private long _numEntriesScanned = 0L;
 
   public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, DataSource 
dataSource, int numDocs,
-      @Nullable NullValueVectorReader nullValueReader) {
+      @Nullable NullValueVectorReader nullValueReader, int batchSize) {
+    _batch = new int[batchSize];
     _predicateEvaluator = predicateEvaluator;
     _reader = dataSource.getForwardIndex();
     _readerContext = _reader.createContext();
@@ -69,6 +72,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   // for testing
   public SVScanDocIdIterator(PredicateEvaluator predicateEvaluator, 
ForwardIndexReader reader, int numDocs,
       @Nullable NullValueVectorReader nullValueReader) {
+    _batch = new int[BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE];
     _predicateEvaluator = predicateEvaluator;
     _reader = reader;
     _readerContext = reader.createContext();
@@ -87,7 +91,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
       int limit;
       int batchSize = 0;
       do {
-        limit = Math.min(_numDocs - _nextDocId, OPTIMAL_ITERATOR_BATCH_SIZE);
+        limit = Math.min(_numDocs - _nextDocId, _batch.length);
         if (limit > 0) {
           for (int i = 0; i < limit; i++) {
             _batch[i] = _nextDocId + i;
@@ -121,14 +125,22 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   }
 
   @Override
-  public MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
-    if (docIds.isEmpty()) {
+  public MutableRoaringBitmap applyAnd(BatchIterator docIdIterator, 
OptionalInt firstDoc, OptionalInt lastDoc) {
+    if (!docIdIterator.hasNext()) {
       return new MutableRoaringBitmap();
     }
-    RoaringBitmapWriter<MutableRoaringBitmap> result = 
RoaringBitmapWriter.bufferWriter()
-        .expectedRange(docIds.first(), docIds.last()).runCompress(false).get();
-    BatchIterator docIdIterator = docIds.getBatchIterator();
-    int[] buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    RoaringBitmapWriter<MutableRoaringBitmap> result;
+    if (firstDoc.isPresent() && lastDoc.isPresent()) {
+      result = RoaringBitmapWriter.bufferWriter()
+          .expectedRange(firstDoc.getAsInt(), lastDoc.getAsInt())
+          .runCompress(false)
+          .get();
+    } else {
+      result = RoaringBitmapWriter.bufferWriter()
+          .runCompress(false)
+          .get();
+    }
+    int[] buffer = new int[_batch.length];
     while (docIdIterator.hasNext()) {
       int limit = docIdIterator.nextBatch(buffer);
       if (limit > 0) {
@@ -264,7 +276,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
 
   private class DictIdMatcher implements ValueMatcher {
 
-    private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final int[] _buffer = new int[_batch.length];
 
     @Override
     public boolean doesValueMatch(int docId) {
@@ -280,7 +292,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
 
   private class DictIdMatcherAndNullHandler implements ValueMatcher {
 
-    private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final int[] _buffer = new int[_batch.length];
     private final ImmutableRoaringBitmap _nullBitmap;
 
     public DictIdMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
@@ -308,7 +320,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
 
   private class IntMatcher implements ValueMatcher {
 
-    private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final int[] _buffer = new int[_batch.length];
 
     @Override
     public boolean doesValueMatch(int docId) {
@@ -325,7 +337,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   private class IntMatcherAndNullHandler implements ValueMatcher {
 
     private final ImmutableRoaringBitmap _nullBitmap;
-    private final int[] _buffer = new int[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final int[] _buffer = new int[_batch.length];
 
     public IntMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
       _nullBitmap = nullBitmap;
@@ -349,7 +361,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
 
   private class LongMatcher implements ValueMatcher {
 
-    private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final long[] _buffer = new long[_batch.length];
 
     @Override
     public boolean doesValueMatch(int docId) {
@@ -366,7 +378,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   private class LongMatcherAndNullHandler implements ValueMatcher {
 
     private final ImmutableRoaringBitmap _nullBitmap;
-    private final long[] _buffer = new long[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final long[] _buffer = new long[_batch.length];
 
     public LongMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
       _nullBitmap = nullBitmap;
@@ -390,7 +402,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
 
   private class FloatMatcher implements ValueMatcher {
 
-    private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final float[] _buffer = new float[_batch.length];
 
     @Override
     public boolean doesValueMatch(int docId) {
@@ -407,7 +419,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   private class FloatMatcherAndNullHandler implements ValueMatcher {
 
     private final ImmutableRoaringBitmap _nullBitmap;
-    private final float[] _buffer = new float[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final float[] _buffer = new float[_batch.length];
 
     public FloatMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
       _nullBitmap = nullBitmap;
@@ -431,7 +443,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
 
   private class DoubleMatcher implements ValueMatcher {
 
-    private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final double[] _buffer = new double[_batch.length];
 
     @Override
     public boolean doesValueMatch(int docId) {
@@ -448,7 +460,7 @@ public final class SVScanDocIdIterator implements 
ScanBasedDocIdIterator {
   private class DoubleMatcherAndNullHandler implements ValueMatcher {
 
     private final ImmutableRoaringBitmap _nullBitmap;
-    private final double[] _buffer = new double[OPTIMAL_ITERATOR_BATCH_SIZE];
+    private final double[] _buffer = new double[_batch.length];
 
     public DoubleMatcherAndNullHandler(ImmutableRoaringBitmap nullBitmap) {
       _nullBitmap = nullBitmap;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
index 1ed890cc83..1caa526444 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/dociditerators/ScanBasedDocIdIterator.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.operator.dociditerators;
 
+import java.util.OptionalInt;
 import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.roaringbitmap.BatchIterator;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -34,10 +36,17 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  */
 public interface ScanBasedDocIdIterator extends BlockDocIdIterator {
 
+  MutableRoaringBitmap applyAnd(BatchIterator batchIterator, OptionalInt 
firstDoc, OptionalInt lastDoc);
+
   /**
    * Applies AND operation to the given bitmap of document ids, returns a 
bitmap of the matching document ids.
    */
-  MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds);
+  default MutableRoaringBitmap applyAnd(ImmutableRoaringBitmap docIds) {
+    if (docIds.isEmpty()) {
+      return new MutableRoaringBitmap();
+    }
+    return applyAnd(docIds.getBatchIterator(), OptionalInt.of(docIds.first()), 
OptionalInt.of(docIds.last()));
+  }
 
   /**
    * Returns the number of entries (SV value contains one entry, MV value 
contains multiple entries) scanned during the
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
index 85ca55bacc..40fc00a621 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/docidsets/SVScanDocIdSet.java
@@ -29,9 +29,9 @@ public final class SVScanDocIdSet implements BlockDocIdSet {
   private final SVScanDocIdIterator _docIdIterator;
 
   public SVScanDocIdSet(PredicateEvaluator predicateEvaluator, DataSource 
dataSource, int numDocs,
-      boolean nullHandlingEnabled) {
+      boolean nullHandlingEnabled, int batchSize) {
     NullValueVectorReader nullValueVector = nullHandlingEnabled ? 
dataSource.getNullValueVector() : null;
-    _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource, 
numDocs, nullValueVector);
+    _docIdIterator = new SVScanDocIdIterator(predicateEvaluator, dataSource, 
numDocs, nullValueVector, batchSize);
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
index 89305d24fb..b2241cfc6f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator.filter;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.core.common.BlockDocIdIterator;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.docidsets.MVScanDocIdSet;
@@ -37,9 +38,15 @@ public class ScanBasedFilterOperator extends 
BaseFilterOperator {
   private final DataSource _dataSource;
   private final int _numDocs;
   private final boolean _nullHandlingEnabled;
+  private final int _batchSize;
 
-  ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource 
dataSource, int numDocs,
+  public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, 
DataSource dataSource, int numDocs,
       boolean nullHandlingEnabled) {
+    this(predicateEvaluator, dataSource, numDocs, nullHandlingEnabled, 
BlockDocIdIterator.OPTIMAL_ITERATOR_BATCH_SIZE);
+  }
+
+  public ScanBasedFilterOperator(PredicateEvaluator predicateEvaluator, 
DataSource dataSource, int numDocs,
+      boolean nullHandlingEnabled, int batchSize) {
     _predicateEvaluator = predicateEvaluator;
     _dataSource = dataSource;
     _numDocs = numDocs;
@@ -47,13 +54,15 @@ public class ScanBasedFilterOperator extends 
BaseFilterOperator {
     Preconditions.checkState(_dataSource.getForwardIndex() != null,
         "Forward index disabled for column: %s, scan based filtering not 
supported!",
         _dataSource.getDataSourceMetadata().getFieldSpec().getName());
+    _batchSize = batchSize;
   }
 
   @Override
   protected FilterBlock getNextBlock() {
     DataSourceMetadata dataSourceMetadata = 
_dataSource.getDataSourceMetadata();
     if (dataSourceMetadata.isSingleValue()) {
-      return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator, 
_dataSource, _numDocs, _nullHandlingEnabled));
+      return new FilterBlock(new SVScanDocIdSet(_predicateEvaluator, 
_dataSource, _numDocs, _nullHandlingEnabled,
+          _batchSize));
     } else {
       return new FilterBlock(new MVScanDocIdSet(_predicateEvaluator, 
_dataSource, _numDocs));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to