This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2279e71  accelerate counts over bitmap based filters (#8411)
2279e71 is described below

commit 2279e71416aeae4182257e7b682757f0222ca1a8
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Wed Mar 30 18:34:35 2022 +0100

    accelerate counts over bitmap based filters (#8411)
---
 .../core/operator/filter/AndFilterOperator.java    |  24 ++
 .../core/operator/filter/BaseFilterOperator.java   |  28 ++
 .../operator/filter/BitmapBasedFilterOperator.java |  70 ++++-
 .../core/operator/filter/BitmapCollection.java     | 129 +++++++++
 .../core/operator/filter/EmptyFilterOperator.java  |  10 +
 .../operator/filter/JsonMatchFilterOperator.java   |  20 ++
 .../operator/filter/MatchAllFilterOperator.java    |  10 +
 .../core/operator/filter/NotFilterOperator.java    |  20 ++
 .../core/operator/filter/OrFilterOperator.java     |  23 ++
 .../filter/SortedIndexBasedFilterOperator.java     |  87 ++++++
 .../operator/filter/TextMatchFilterOperator.java   |  20 ++
 .../operator/query/FastFilteredCountOperator.java  |  82 ++++++
 .../pinot/core/plan/AggregationPlanNode.java       |  12 +-
 .../core/operator/filter/BitmapCollectionTest.java | 237 ++++++++++++++++
 ...adataAndDictionaryAggregationPlanMakerTest.java |  10 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |  60 +++--
 .../pinot/queries/FastFilteredCountTest.java       | 300 +++++++++++++++++++++
 .../pinot/queries/NotOperatorQueriesTest.java      |   5 +-
 .../pinot/queries/TextSearchQueriesTest.java       |   4 +-
 .../org/apache/pinot/perf/BenchmarkQueries.java    |  25 +-
 20 files changed, 1136 insertions(+), 40 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java
index b964819..dae8e14 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java
@@ -24,6 +24,8 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.docidsets.AndDocIdSet;
 import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.roaringbitmap.buffer.BufferFastAggregation;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 
 public class AndFilterOperator extends BaseFilterOperator {
@@ -46,6 +48,28 @@ public class AndFilterOperator extends BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    boolean allChildrenCanProduceBitmaps = true;
+    for (BaseFilterOperator child : _filterOperators) {
+      allChildrenCanProduceBitmaps &= child.canProduceBitmaps();
+    }
+    return allChildrenCanProduceBitmaps;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    if (_filterOperators.size() == 2) {
+      return 
_filterOperators.get(0).getBitmaps().andCardinality(_filterOperators.get(1).getBitmaps());
+    }
+    ImmutableRoaringBitmap[] bitmaps = new 
ImmutableRoaringBitmap[_filterOperators.size()];
+    int i = 0;
+    for (BaseFilterOperator child : _filterOperators) {
+      bitmaps[i++] = child.getBitmaps().reduce();
+    }
+    return BufferFastAggregation.and(bitmaps).getCardinality();
+  }
+
+  @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BaseFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BaseFilterOperator.java
index 3d8e1b3..eb70c89 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BaseFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BaseFilterOperator.java
@@ -40,4 +40,32 @@ public abstract class BaseFilterOperator extends 
BaseOperator<FilterBlock> {
   public boolean isResultMatchingAll() {
     return false;
   }
+
+  /**
+   * Returns {@code true} if the filter has an optimized count implementation.
+   */
+  public boolean canOptimizeCount() {
+    return false;
+  }
+
+  /**
+   * @return the number of matching docs, or throws if it cannot produce this 
count.
+   */
+  public int getNumMatchingDocs() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return true if the filter operator can produce a bitmap of docIds
+   */
+  public boolean canProduceBitmaps() {
+    return false;
+  }
+
+  /**
+   * @return bitmaps of matching docIds
+   */
+  public BitmapCollection getBitmaps() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
index 2e619a0..4ce8410 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
@@ -38,14 +38,15 @@ public class BitmapBasedFilterOperator extends 
BaseFilterOperator {
   private static final String EXPLAIN_NAME = "FILTER_INVERTED_INDEX";
 
   private final PredicateEvaluator _predicateEvaluator;
-  private final InvertedIndexReader _invertedIndexReader;
+  private final InvertedIndexReader<ImmutableRoaringBitmap> 
_invertedIndexReader;
   private final ImmutableRoaringBitmap _docIds;
   private final boolean _exclusive;
   private final int _numDocs;
 
+  @SuppressWarnings("unchecked")
   BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource 
dataSource, int numDocs) {
     _predicateEvaluator = predicateEvaluator;
-    _invertedIndexReader = dataSource.getInvertedIndex();
+    _invertedIndexReader = (InvertedIndexReader<ImmutableRoaringBitmap>) 
dataSource.getInvertedIndex();
     _docIds = null;
     _exclusive = predicateEvaluator.isExclusive();
     _numDocs = numDocs;
@@ -75,7 +76,7 @@ public class BitmapBasedFilterOperator extends 
BaseFilterOperator {
       return EmptyFilterBlock.getInstance();
     }
     if (numDictIds == 1) {
-      ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) 
_invertedIndexReader.getDocIds(dictIds[0]);
+      ImmutableRoaringBitmap docIds = 
_invertedIndexReader.getDocIds(dictIds[0]);
       if (_exclusive) {
         if (docIds instanceof MutableRoaringBitmap) {
           MutableRoaringBitmap mutableRoaringBitmap = (MutableRoaringBitmap) 
docIds;
@@ -90,7 +91,7 @@ public class BitmapBasedFilterOperator extends 
BaseFilterOperator {
     } else {
       ImmutableRoaringBitmap[] bitmaps = new 
ImmutableRoaringBitmap[numDictIds];
       for (int i = 0; i < numDictIds; i++) {
-        bitmaps[i] = (ImmutableRoaringBitmap) 
_invertedIndexReader.getDocIds(dictIds[i]);
+        bitmaps[i] = _invertedIndexReader.getDocIds(dictIds[i]);
       }
       MutableRoaringBitmap docIds = ImmutableRoaringBitmap.or(bitmaps);
       if (_exclusive) {
@@ -101,6 +102,67 @@ public class BitmapBasedFilterOperator extends 
BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    int count = 0;
+    if (_docIds == null) {
+      int[] dictIds = _exclusive
+          ? _predicateEvaluator.getNonMatchingDictIds()
+          : _predicateEvaluator.getMatchingDictIds();
+      switch (dictIds.length) {
+        case 0:
+          break;
+        case 1: {
+          count = _invertedIndexReader.getDocIds(dictIds[0]).getCardinality();
+          break;
+        }
+        case 2: {
+          count = 
ImmutableRoaringBitmap.orCardinality(_invertedIndexReader.getDocIds(dictIds[0]),
+              _invertedIndexReader.getDocIds(dictIds[1]));
+          break;
+        }
+        default: {
+          // this could be optimised if the bitmaps are known to be disjoint 
(as in a single value bitmap index)
+          MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+          for (int dictId : dictIds) {
+            bitmap.or(_invertedIndexReader.getDocIds(dictId));
+          }
+          count = bitmap.getCardinality();
+          break;
+        }
+      }
+    } else {
+      count = _docIds.getCardinality();
+    }
+    return _exclusive ? _numDocs - count : count;
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    return true;
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    if (_docIds == null) {
+      int[] dictIds = _exclusive
+          ? _predicateEvaluator.getNonMatchingDictIds()
+          : _predicateEvaluator.getMatchingDictIds();
+      ImmutableRoaringBitmap[] bitmaps = new 
ImmutableRoaringBitmap[dictIds.length];
+      for (int i = 0; i < dictIds.length; i++) {
+        bitmaps[i] = _invertedIndexReader.getDocIds(dictIds[i]);
+      }
+      return new BitmapCollection(_numDocs, _exclusive, bitmaps);
+    } else {
+      return new BitmapCollection(_numDocs, _exclusive, _docIds);
+    }
+  }
+
+  @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapCollection.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapCollection.java
new file mode 100644
index 0000000..b3f40ed
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapCollection.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import org.roaringbitmap.buffer.BufferFastAggregation;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Encapsulates a collection of bitmaps, and allows inversion without 
modifying the bitmaps.
+ * Provides simplified access to efficient cardinality calculation which work 
regardless of
+ * inversion status without computing the complement of the union of the 
bitmaps.
+ */
+public class BitmapCollection {
+  private final int _numDocs;
+  private boolean _inverted;
+  private final ImmutableRoaringBitmap[] _bitmaps;
+
+  public BitmapCollection(int numDocs, boolean inverted, 
ImmutableRoaringBitmap... bitmaps) {
+    _numDocs = numDocs;
+    _inverted = inverted;
+    _bitmaps = bitmaps;
+  }
+
+  /**
+   * Inverts the bitmaps in constant time and space.
+   * @return this bitmap collection inverted.
+   */
+  public BitmapCollection invert() {
+    _inverted = !_inverted;
+    return this;
+  }
+
+  /**
+   * Computes the size of the intersection of the bitmaps efficiently 
regardless of negation, without
+   * needing to invert inputs or materialize an intermediate bitmap.
+   *
+   * @param bitmaps to intersect with
+   * @return the size of the intersection of the bitmaps in this collection 
and in the other collection
+   */
+  public int andCardinality(BitmapCollection bitmaps) {
+    ImmutableRoaringBitmap left = reduceInternal();
+    ImmutableRoaringBitmap right = bitmaps.reduceInternal();
+    if (!_inverted) {
+      if (!bitmaps._inverted) {
+        return ImmutableRoaringBitmap.andCardinality(left, right);
+      }
+      return ImmutableRoaringBitmap.andNotCardinality(left, right);
+    } else {
+      if (!bitmaps._inverted) {
+        return ImmutableRoaringBitmap.andNotCardinality(right, left);
+      }
+      return _numDocs - ImmutableRoaringBitmap.orCardinality(left, right);
+    }
+  }
+
+  /**
+   * Computes the size of the union of the bitmaps efficiently regardless of 
negation, without
+   * needing to invert inputs or materialize an intermediate bitmap. If either 
this collection
+   * or the other collection has more than one bitmap, the union will be 
materialized.
+   *
+   * @param bitmaps to intersect with
+   * @return the size of the union of the bitmaps in this collection and in 
the other collection
+   */
+  public int orCardinality(BitmapCollection bitmaps) {
+    ImmutableRoaringBitmap left = reduceInternal();
+    ImmutableRoaringBitmap right = bitmaps.reduceInternal();
+    if (!_inverted) {
+      if (!bitmaps._inverted) {
+        return ImmutableRoaringBitmap.orCardinality(left, right);
+      }
+      return _numDocs - right.getCardinality() - 
ImmutableRoaringBitmap.andCardinality(left, right);
+    } else {
+      if (!bitmaps._inverted) {
+        return _numDocs - left.getCardinality()
+            + ImmutableRoaringBitmap.andCardinality(right, left);
+      }
+      return _numDocs - ImmutableRoaringBitmap.andCardinality(left, right);
+    }
+  }
+
+  private ImmutableRoaringBitmap reduceInternal() {
+    if (_bitmaps.length == 1) {
+      return _bitmaps[0];
+    }
+    return BufferFastAggregation.or(_bitmaps);
+  }
+
+  /**
+   * Reduces the bitmaps to a single bitmap. In common cases, when the 
collection
+   * is not inverted and only has one bitmap, this operation is cheap. However,
+   * this may be a costly operation: a new bitmap may be allocated, one or many
+   * bitmaps may need to be inverted. Prefer {@see andCardinality} or {@see 
orCardinality}
+   * when appropriate.
+   * @return a bitmap
+   */
+  public ImmutableRoaringBitmap reduce() {
+    if (!_inverted) {
+      return reduceInternal();
+    }
+    return invertedOr();
+  }
+
+  private MutableRoaringBitmap invertedOr() {
+    MutableRoaringBitmap complement = new MutableRoaringBitmap();
+    complement.add(0L, _numDocs);
+    for (ImmutableRoaringBitmap bitmap : _bitmaps) {
+      complement.andNot(bitmap);
+    }
+    return complement;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/EmptyFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/EmptyFilterOperator.java
index ff148e8..18dbd95 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/EmptyFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/EmptyFilterOperator.java
@@ -47,6 +47,16 @@ public final class EmptyFilterOperator extends 
BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    return 0;
+  }
+
+  @Override
   protected FilterBlock getNextBlock() {
     return EmptyFilterBlock.getInstance();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
index 791e84e..efac5de 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java
@@ -51,6 +51,26 @@ public class JsonMatchFilterOperator extends 
BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    return 
_jsonIndex.getMatchingDocIds(_predicate.getValue()).getCardinality();
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    return true;
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    return new BitmapCollection(_numDocs, false, 
_jsonIndex.getMatchingDocIds(_predicate.getValue()));
+  }
+
+  @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MatchAllFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MatchAllFilterOperator.java
index b629703..7694f0b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MatchAllFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MatchAllFilterOperator.java
@@ -55,6 +55,16 @@ public class MatchAllFilterOperator extends 
BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    return _numDocs;
+  }
+
+  @Override
   public String toExplainString() {
     return new 
StringBuilder(EXPLAIN_NAME).append("(docs:").append(_numDocs).append(')').toString();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/NotFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/NotFilterOperator.java
index f1aeb50..044d2f6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/NotFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/NotFilterOperator.java
@@ -58,6 +58,26 @@ public class NotFilterOperator extends BaseFilterOperator {
     return new FilterBlock(new 
NotDocIdSet(_filterOperator.nextBlock().getBlockDocIdSet(), _numDocs));
   }
 
+  @Override
+  public boolean canOptimizeCount() {
+    return _filterOperator.canOptimizeCount();
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    return _numDocs - _filterOperator.getNumMatchingDocs();
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    return _filterOperator.canProduceBitmaps();
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    return _filterOperator.getBitmaps().invert();
+  }
+
   public BaseFilterOperator getChildFilterOperator() {
     return _filterOperator;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java
index 591c8ed..2e32d8f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java
@@ -24,6 +24,8 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
 import org.apache.pinot.core.operator.docidsets.OrDocIdSet;
+import org.roaringbitmap.buffer.BufferFastAggregation;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 
 public class OrFilterOperator extends BaseFilterOperator {
@@ -60,4 +62,25 @@ public class OrFilterOperator extends BaseFilterOperator {
   public List<Operator> getChildOperators() {
     return new ArrayList<>(_filterOperators);
   }
+
+  @Override
+  public boolean canOptimizeCount() {
+    boolean allChildrenProduceBitmaps = true;
+    for (BaseFilterOperator child : _filterOperators) {
+      allChildrenProduceBitmaps &= child.canProduceBitmaps();
+    }
+    return allChildrenProduceBitmaps;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    if (_filterOperators.size() == 2) {
+      return 
_filterOperators.get(0).getBitmaps().orCardinality(_filterOperators.get(1).getBitmaps());
+    }
+    ImmutableRoaringBitmap[] bitmaps = new 
ImmutableRoaringBitmap[_filterOperators.size()];
+    for (int i = 0; i < _filterOperators.size(); i++) {
+      bitmaps[i] = _filterOperators.get(i).getBitmaps().reduce();
+    }
+    return BufferFastAggregation.or(bitmaps).getCardinality();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java
index 861fda0..291f385 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/SortedIndexBasedFilterOperator.java
@@ -31,6 +31,7 @@ import 
org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFa
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.SortedIndexReader;
 import org.apache.pinot.spi.utils.Pairs.IntPair;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 public class SortedIndexBasedFilterOperator extends BaseFilterOperator {
@@ -133,6 +134,92 @@ public class SortedIndexBasedFilterOperator extends 
BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    int count = 0;
+    boolean exclusive = _predicateEvaluator.isExclusive();
+    if (_predicateEvaluator instanceof 
SortedDictionaryBasedRangePredicateEvaluator) {
+      // For RANGE predicate, use start/end document id to construct a new 
document id range
+      SortedDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator =
+          (SortedDictionaryBasedRangePredicateEvaluator) _predicateEvaluator;
+      int startDocId = 
_sortedIndexReader.getDocIds(rangePredicateEvaluator.getStartDictId()).getLeft();
+      // NOTE: End dictionary id is exclusive in 
OfflineDictionaryBasedRangePredicateEvaluator.
+      int endDocId = 
_sortedIndexReader.getDocIds(rangePredicateEvaluator.getEndDictId() - 
1).getRight();
+      count = endDocId - startDocId + 1;
+    } else {
+      int[] dictIds =
+          exclusive ? _predicateEvaluator.getNonMatchingDictIds() : 
_predicateEvaluator.getMatchingDictIds();
+      int numDictIds = dictIds.length;
+      // NOTE: PredicateEvaluator without matching/non-matching dictionary ids 
should not reach here.
+      Preconditions.checkState(numDictIds > 0);
+      if (numDictIds == 1) {
+        IntPair docIdRange = _sortedIndexReader.getDocIds(dictIds[0]);
+        count = docIdRange.getRight() - docIdRange.getLeft() + 1;
+      } else {
+        IntPair lastDocIdRange = _sortedIndexReader.getDocIds(dictIds[0]);
+        for (int i = 1; i < numDictIds; i++) {
+          IntPair docIdRange = _sortedIndexReader.getDocIds(dictIds[i]);
+          if (docIdRange.getLeft() == lastDocIdRange.getRight() + 1) {
+            lastDocIdRange.setRight(docIdRange.getRight());
+          } else {
+            count += lastDocIdRange.getRight() - lastDocIdRange.getLeft() + 1;
+            lastDocIdRange = docIdRange;
+          }
+        }
+        count += lastDocIdRange.getRight() - lastDocIdRange.getLeft() + 1;
+      }
+    }
+    return exclusive ? _numDocs - count : count;
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    return true;
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    boolean exclusive = _predicateEvaluator.isExclusive();
+    if (_predicateEvaluator instanceof 
SortedDictionaryBasedRangePredicateEvaluator) {
+      // For RANGE predicate, use start/end document id to construct a new 
document id range
+      SortedDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator =
+          (SortedDictionaryBasedRangePredicateEvaluator) _predicateEvaluator;
+      int startDocId = 
_sortedIndexReader.getDocIds(rangePredicateEvaluator.getStartDictId()).getLeft();
+      // NOTE: End dictionary id is exclusive in 
OfflineDictionaryBasedRangePredicateEvaluator.
+      int endDocId = 
_sortedIndexReader.getDocIds(rangePredicateEvaluator.getEndDictId() - 
1).getRight();
+      bitmap.add(startDocId, endDocId + 1L);
+    } else {
+      int[] dictIds =
+          exclusive ? _predicateEvaluator.getNonMatchingDictIds() : 
_predicateEvaluator.getMatchingDictIds();
+      int numDictIds = dictIds.length;
+      // NOTE: PredicateEvaluator without matching/non-matching dictionary ids 
should not reach here.
+      Preconditions.checkState(numDictIds > 0);
+      if (numDictIds == 1) {
+        IntPair docIdRange = _sortedIndexReader.getDocIds(dictIds[0]);
+        bitmap.add(docIdRange.getLeft(), docIdRange.getRight() + 1L);
+      } else {
+        IntPair lastDocIdRange = _sortedIndexReader.getDocIds(dictIds[0]);
+        for (int i = 1; i < numDictIds; i++) {
+          IntPair docIdRange = _sortedIndexReader.getDocIds(dictIds[i]);
+          if (docIdRange.getLeft() == lastDocIdRange.getRight() + 1) {
+            lastDocIdRange.setRight(docIdRange.getRight());
+          } else {
+            bitmap.add(lastDocIdRange.getLeft(), lastDocIdRange.getRight() + 
1L);
+            lastDocIdRange = docIdRange;
+          }
+        }
+        bitmap.add(lastDocIdRange.getLeft(), lastDocIdRange.getRight() + 1L);
+      }
+    }
+    return new BitmapCollection(_numDocs, exclusive, bitmap);
+  }
+
+  @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java
index 09421dc..45b74be 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java
@@ -51,6 +51,26 @@ public class TextMatchFilterOperator extends 
BaseFilterOperator {
   }
 
   @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  public int getNumMatchingDocs() {
+    return _textIndexReader.getDocIds(_predicate.getValue()).getCardinality();
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    return true;
+  }
+
+  @Override
+  public BitmapCollection getBitmaps() {
+    return new BitmapCollection(_numDocs, false, 
_textIndexReader.getDocIds(_predicate.getValue()));
+  }
+
+  @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FastFilteredCountOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FastFilteredCountOperator.java
new file mode 100644
index 0000000..33a1845
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/FastFilteredCountOperator.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+
+
+@SuppressWarnings("rawtypes")
+public class FastFilteredCountOperator extends 
BaseOperator<IntermediateResultsBlock> {
+
+  private static final String EXPLAIN_NAME = "FAST_FILTERED_COUNT";
+
+  private final BaseFilterOperator _filterOperator;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final SegmentMetadata _segmentMetadata;
+
+  private long _docsCounted;
+
+  public FastFilteredCountOperator(AggregationFunction[] aggregationFunctions, 
BaseFilterOperator filterOperator,
+      SegmentMetadata segmentMetadata) {
+    _filterOperator = filterOperator;
+    _segmentMetadata = segmentMetadata;
+    _aggregationFunctions = aggregationFunctions;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return getClass().getSimpleName();
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.singletonList(_filterOperator);
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    long count = _filterOperator.getNumMatchingDocs();
+    List<Object> aggregates = new ArrayList<>(1);
+    aggregates.add(count);
+    _docsCounted += count;
+    return new IntermediateResultsBlock(_aggregationFunctions, aggregates, 
false);
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    // fabricate the number of docs scanned to keep compatibility tests happy 
for now, but this should be set to zero
+    return new ExecutionStatistics(_docsCounted, 0, 0,
+        _segmentMetadata.getTotalDocs());
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 4ff6080..7ee09c3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -33,6 +33,7 @@ import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.filter.CombinedFilterOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.FastFilteredCountOperator;
 import org.apache.pinot.core.operator.query.FilteredAggregationOperator;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
@@ -177,7 +178,10 @@ public class AggregationPlanNode implements PlanNode {
     FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, 
_queryContext);
     BaseFilterOperator filterOperator = filterPlanNode.run();
 
-    // Use metadata/dictionary to solve the query if possible
+    if (canOptimizeFilteredCount(filterOperator, aggregationFunctions)) {
+      return new FastFilteredCountOperator(aggregationFunctions, 
filterOperator, _indexSegment.getSegmentMetadata());
+    }
+
     if (filterOperator.isResultMatchingAll()) {
       if (isFitForNonScanBasedPlan(aggregationFunctions, _indexSegment)) {
         DataSource[] dataSources = new DataSource[aggregationFunctions.length];
@@ -253,4 +257,10 @@ public class AggregationPlanNode implements PlanNode {
     }
     return true;
   }
+
+  private static boolean canOptimizeFilteredCount(BaseFilterOperator 
filterOperator,
+      AggregationFunction[] aggregationFunctions) {
+    return (aggregationFunctions.length == 1 && 
aggregationFunctions[0].getType() == COUNT)
+        && filterOperator.canOptimizeCount();
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/BitmapCollectionTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/BitmapCollectionTest.java
new file mode 100644
index 0000000..e7985a0
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/BitmapCollectionTest.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class BitmapCollectionTest {
+
+  @DataProvider
+  public static Object[][] andCardinalityTestCases() {
+    return new Object[][]{
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), false, 1
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), false, 1
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), false, 2
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), false, 2
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), true, 1
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), true, 2
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(), true, 2
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(), true, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), true, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), true, 7
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), true, 6
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(), true, 8
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), true, 8
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(), true, 10
+        },
+    };
+  }
+
+  @Test(dataProvider = "andCardinalityTestCases")
+  public void testAndCardinality(int numDocs, ImmutableRoaringBitmap left, 
boolean leftInverted,
+      ImmutableRoaringBitmap right, boolean rightInverted, int expected) {
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
left).andCardinality(
+        new BitmapCollection(numDocs, rightInverted, right)), expected);
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
split(left)).andCardinality(
+        new BitmapCollection(numDocs, rightInverted, right)), expected);
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
left).andCardinality(
+        new BitmapCollection(numDocs, rightInverted, split(right))), expected);
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
split(left)).andCardinality(
+        new BitmapCollection(numDocs, rightInverted, split(right))), expected);
+  }
+
+  @DataProvider
+  public static Object[][] orCardinalityTestCases() {
+    return new Object[][]{
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), false, 3
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), false, 4
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(), false, 2
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), false, 2
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(), false, 0
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), false, 9
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), false, 8
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(), false, 8
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), false, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(), false, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), true, 7
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), true, 8
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), false,
+            ImmutableRoaringBitmap.bitmapOf(), true, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), true, 8
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), false,
+            ImmutableRoaringBitmap.bitmapOf(), true, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 4), true, 9
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(1, 4), true, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(0, 5), true,
+            ImmutableRoaringBitmap.bitmapOf(), true, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(0, 5), true, 10
+        },
+        {
+            10, ImmutableRoaringBitmap.bitmapOf(), true,
+            ImmutableRoaringBitmap.bitmapOf(), true, 10
+        },
+    };
+  }
+
+  @Test(dataProvider = "orCardinalityTestCases")
+  public void testOrCardinality(int numDocs, ImmutableRoaringBitmap left, 
boolean leftInverted,
+      ImmutableRoaringBitmap right, boolean rightInverted, int expected) {
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
left).orCardinality(
+        new BitmapCollection(numDocs, rightInverted, right)), expected);
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
split(left)).orCardinality(
+        new BitmapCollection(numDocs, rightInverted, right)), expected);
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
left).orCardinality(
+        new BitmapCollection(numDocs, rightInverted, split(right))), expected);
+    assertEquals(new BitmapCollection(numDocs, leftInverted, 
split(left)).orCardinality(
+        new BitmapCollection(numDocs, rightInverted, split(right))), expected);
+  }
+
+  private ImmutableRoaringBitmap[] split(ImmutableRoaringBitmap bitmap) {
+    if (bitmap.isEmpty()) {
+      return new ImmutableRoaringBitmap[]{bitmap};
+    }
+    ImmutableRoaringBitmap[] split = new ImmutableRoaringBitmap[2];
+    split[0] = bitmap;
+    split[1] = ImmutableRoaringBitmap.bitmapOf(bitmap.last());
+    return split;
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 2bf354a..a422273 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.FastFilteredCountOperator;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -162,14 +163,15 @@ public class 
MetadataAndDictionaryAggregationPlanMakerTest {
     entries.add(new Object[]{
         "select * from testTable where daysSinceEpoch > 100", 
SelectionOnlyOperator.class, SelectionOnlyOperator.class
     });
-    // COUNT from metadata
+    // COUNT from metadata (via fast filtered count which knows the number of 
docs in the segment)
     entries.add(new Object[]{
-        "select count(*) from testTable", 
NonScanBasedAggregationOperator.class, AggregationOperator.class
+        "select count(*) from testTable", FastFilteredCountOperator.class,
+        FastFilteredCountOperator.class
     });
     // COUNT from metadata with match all filter
     entries.add(new Object[]{
-        "select count(*) from testTable where column1 > 10", 
NonScanBasedAggregationOperator.class,
-        AggregationOperator.class
+        "select count(*) from testTable where column1 > 10", 
FastFilteredCountOperator.class,
+        FastFilteredCountOperator.class
     });
     // MIN/MAX from dictionary
     entries.add(new Object[]{
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index 617ffd7..b5b85ed 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -502,7 +502,8 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest 
{
     List<Object[]> result1 = new ArrayList<>();
     result1.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
     result1.add(new Object[]{"COMBINE_AGGREGATE", 1, 0});
-    result1.add(new Object[]{"AGGREGATE_NO_SCAN", 2, 1});
+    result1.add(new Object[]{"FAST_FILTERED_COUNT", 2, 1});
+    result1.add(new Object[]{"FILTER_MATCH_ENTIRE_SEGMENT(docs:3)", 3, 2});
     check(query1, new ResultTable(DATA_SCHEMA, result1));
 
     String query2 = "EXPLAIN PLAN FOR SELECT min(invertedIndexCol1) FROM 
testTable";
@@ -560,43 +561,52 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     List<Object[]> result1 = new ArrayList<>();
     result1.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
     result1.add(new Object[]{"COMBINE_AGGREGATE", 1, 0});
-    result1.add(new Object[]{"AGGREGATE(aggregations:count(*))", 2, 1});
-    result1.add(new Object[]{"TRANSFORM_PASSTHROUGH()", 3, 2});
-    result1.add(new Object[]{"PROJECT()", 4, 3});
-    result1.add(new Object[]{
-        
"FILTER_SORTED_INDEX(indexLookUp:sorted_index,operator:EQ,predicate:invertedIndexCol3
 = " + "'mickey')", 5, 4});
+    result1.add(new Object[]{"FAST_FILTERED_COUNT", 2, 1});
+    result1.add(new 
Object[]{"FILTER_SORTED_INDEX(indexLookUp:sorted_index,operator:EQ,predicate:invertedIndexCol3
 "
+        + "= 'mickey')", 3, 2});
     check(query1, new ResultTable(DATA_SCHEMA, result1));
 
-    String query2 =
-        "EXPLAIN PLAN FOR SELECT count(*), max(noIndexCol1), sum(noIndexCol2), 
avg(noIndexCol3) FROM testTable WHERE "
-            + "invertedIndexCol1 = 1.1 OR noIndexCol1 = 20";
+    String query2 = "EXPLAIN PLAN FOR SELECT sum(noIndexCol2) FROM testTable 
WHERE invertedIndexCol3 = 'mickey'";
     List<Object[]> result2 = new ArrayList<>();
     result2.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
     result2.add(new Object[]{"COMBINE_AGGREGATE", 1, 0});
-    result2.add(
-        new Object[]{"AGGREGATE(aggregations:count(*), max(noIndexCol1), 
sum(noIndexCol2), avg(noIndexCol3))", 2, 1});
-    result2.add(new Object[]{"TRANSFORM_PASSTHROUGH(noIndexCol1, noIndexCol2, 
noIndexCol3)", 3, 2});
-    result2.add(new Object[]{"PROJECT(noIndexCol3, noIndexCol2, noIndexCol1)", 
4, 3});
+    result2.add(new Object[]{"AGGREGATE(aggregations:sum(noIndexCol2))", 2, 
1});
+    result2.add(new Object[]{"TRANSFORM_PASSTHROUGH(noIndexCol2)", 3, 2});
+    result2.add(new Object[]{"PROJECT(noIndexCol2)", 4, 3});
     result2.add(new Object[]{
-        
"FILTER_INVERTED_INDEX(indexLookUp:inverted_index,operator:EQ,predicate:invertedIndexCol1
 = '1"
-            + ".1')", 5, 4});
+        
"FILTER_SORTED_INDEX(indexLookUp:sorted_index,operator:EQ,predicate:invertedIndexCol3
 = " + "'mickey')", 5, 4});
     check(query2, new ResultTable(DATA_SCHEMA, result2));
 
-    // Use a Transform function in filter on an indexed column.
-    String query3 = "EXPLAIN PLAN FOR SELECT invertedIndexCol3 FROM testTable 
WHERE concat (invertedIndexCol3, 'test',"
-        + "'-') = 'mickey-test' OR invertedIndexCol1 = 1.1";
+    String query3 =
+        "EXPLAIN PLAN FOR SELECT count(*), max(noIndexCol1), sum(noIndexCol2), 
avg(noIndexCol3) FROM testTable WHERE "
+            + "invertedIndexCol1 = 1.1 OR noIndexCol1 = 20";
     List<Object[]> result3 = new ArrayList<>();
     result3.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
-    result3.add(new Object[]{"COMBINE_SELECT", 1, 0});
-    result3.add(new Object[]{"SELECT(selectList:invertedIndexCol3)", 2, 1});
-    result3.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol3)", 3, 
2});
-    result3.add(new Object[]{"PROJECT(invertedIndexCol3)", 4, 3});
-    result3.add(new Object[]{"FILTER_OR", 5, 4});
+    result3.add(new Object[]{"COMBINE_AGGREGATE", 1, 0});
+    result3.add(
+        new Object[]{"AGGREGATE(aggregations:count(*), max(noIndexCol1), 
sum(noIndexCol2), avg(noIndexCol3))", 2, 1});
+    result3.add(new Object[]{"TRANSFORM_PASSTHROUGH(noIndexCol1, noIndexCol2, 
noIndexCol3)", 3, 2});
+    result3.add(new Object[]{"PROJECT(noIndexCol3, noIndexCol2, noIndexCol1)", 
4, 3});
     result3.add(new Object[]{
+        
"FILTER_INVERTED_INDEX(indexLookUp:inverted_index,operator:EQ,predicate:invertedIndexCol1
 = '1"
+            + ".1')", 5, 4});
+    check(query3, new ResultTable(DATA_SCHEMA, result3));
+
+    // Use a Transform function in filter on an indexed column.
+    String query4 = "EXPLAIN PLAN FOR SELECT invertedIndexCol3 FROM testTable 
WHERE concat (invertedIndexCol3, 'test',"
+        + "'-') = 'mickey-test' OR invertedIndexCol1 = 1.1";
+    List<Object[]> result4 = new ArrayList<>();
+    result4.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
+    result4.add(new Object[]{"COMBINE_SELECT", 1, 0});
+    result4.add(new Object[]{"SELECT(selectList:invertedIndexCol3)", 2, 1});
+    result4.add(new Object[]{"TRANSFORM_PASSTHROUGH(invertedIndexCol3)", 3, 
2});
+    result4.add(new Object[]{"PROJECT(invertedIndexCol3)", 4, 3});
+    result4.add(new Object[]{"FILTER_OR", 5, 4});
+    result4.add(new Object[]{
         
"FILTER_EXPRESSION(operator:EQ,predicate:concat(invertedIndexCol3,'test','-') = 
" + "'mickey-test')", 6, 5});
-    result3.add(new 
Object[]{"FILTER_INVERTED_INDEX(indexLookUp:inverted_index,operator:EQ,"
+    result4.add(new 
Object[]{"FILTER_INVERTED_INDEX(indexLookUp:inverted_index,operator:EQ,"
         + "predicate:invertedIndexCol1 = '1.1')", 7, 5});
-    check(query3, new ResultTable(DATA_SCHEMA, result3));
+    check(query4, new ResultTable(DATA_SCHEMA, result4));
   }
 
   @Test
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/FastFilteredCountTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/FastFilteredCountTest.java
new file mode 100644
index 0000000..7771476
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/FastFilteredCountTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.query.FastFilteredCountOperator;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class FastFilteredCountTest extends BaseQueriesTest {
+
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"FastFilteredCountTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_RECORDS = 1000;
+  private static final int BUCKET_SIZE = 8;
+  private static final String SORTED_COLUMN = "sorted";
+  private static final String CLASSIFICATION_COLUMN = "class";
+  private static final String TEXT_COLUMN = "textCol";
+  private static final String JSON_COLUMN = "jsonCol";
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(SORTED_COLUMN, FieldSpec.DataType.INT)
+      .addSingleValueDimension(CLASSIFICATION_COLUMN, FieldSpec.DataType.INT)
+      .addSingleValueDimension(TEXT_COLUMN, FieldSpec.DataType.STRING)
+      .addSingleValueDimension(JSON_COLUMN, FieldSpec.DataType.JSON)
+      .build();
+
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE)
+          .setTableName(RAW_TABLE_NAME)
+          .setSortedColumn(SORTED_COLUMN)
+          .build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      record.putValue(CLASSIFICATION_COLUMN, i % BUCKET_SIZE);
+      record.putValue(SORTED_COLUMN, i);
+      record.putValue(TEXT_COLUMN, "text" + (i % BUCKET_SIZE));
+      record.putValue(JSON_COLUMN, "{\"field\":" + (i % BUCKET_SIZE) + "}");
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setInvertedIndexColumns(new 
HashSet<>(Arrays.asList(CLASSIFICATION_COLUMN, SORTED_COLUMN)));
+    indexLoadingConfig.setTextIndexColumns(Collections.singleton(TEXT_COLUMN));
+    indexLoadingConfig.setJsonIndexColumns(Collections.singleton(JSON_COLUMN));
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME),
+        indexLoadingConfig);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @DataProvider
+  public Object[][] testCases() {
+    int bucketCount = NUM_RECORDS / BUCKET_SIZE;
+    int bucketCountComplement = NUM_RECORDS - NUM_RECORDS / BUCKET_SIZE;
+    int min = 20;
+    int max = NUM_RECORDS - 20;
+    String allBuckets = Arrays.toString(IntStream.range(0, 
BUCKET_SIZE).toArray())
+        .replace('[', '(').replace(']', ')');
+    String twoBuckets = Arrays.toString(new int[] {0, 7})
+        .replace('[', '(').replace(']', ')');
+    return new Object[][] {
+        {"select count(*) from " + RAW_TABLE_NAME, NUM_RECORDS},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + CLASSIFICATION_COLUMN + " = 1", bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')", 
bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where NOT JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')", 
bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ", 'text1')", bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where NOT TEXT_MATCH(" + TEXT_COLUMN + ", 'text1')", 
bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME + " where " + SORTED_COLUMN 
+ " = 1", 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " between " + min + " and " + max, 
max - min + 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " not between " + min + " and " + 
max, NUM_RECORDS - (max - min + 1)},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " in " + allBuckets, BUCKET_SIZE},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " in " + allBuckets
+            + " and " + CLASSIFICATION_COLUMN + " in " + allBuckets, 
BUCKET_SIZE},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + CLASSIFICATION_COLUMN + " <> 1", 
bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + CLASSIFICATION_COLUMN + " in " + twoBuckets, 2 * 
bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + CLASSIFICATION_COLUMN + " not in " + twoBuckets, 
NUM_RECORDS - 2 * bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + CLASSIFICATION_COLUMN + " in " + twoBuckets
+            + " and " + SORTED_COLUMN + " < " + (NUM_RECORDS / 2), 
bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " = 1"
+            + " and " + CLASSIFICATION_COLUMN + " = 1", 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " = 1"
+            + " and " + CLASSIFICATION_COLUMN + " <> 1", 0},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " = 1"
+            + " and " + CLASSIFICATION_COLUMN + " <> 0", 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " and " + CLASSIFICATION_COLUMN + " <> 1", bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 1", bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or " + CLASSIFICATION_COLUMN + " = 1", 2 * bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or " + CLASSIFICATION_COLUMN + " = 1", bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')"
+            + " or " + CLASSIFICATION_COLUMN + " = 2", 3 * bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or not JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=0')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 0", bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 2", bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or not JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 2", NUM_RECORDS},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 2", NUM_RECORDS},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=1')"
+            + " or " + CLASSIFICATION_COLUMN + " = 0", NUM_RECORDS},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " <> 1"
+            + " and " + CLASSIFICATION_COLUMN + " = 1", bucketCount - 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= 0"
+            + " and " + CLASSIFICATION_COLUMN + " = 1", bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " > 1"
+            + " and " + CLASSIFICATION_COLUMN + " = 1", bucketCount - 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= 0"
+            + " and " + CLASSIFICATION_COLUMN + " <> 1", 
bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " and " + CLASSIFICATION_COLUMN + " <> 1", NUM_RECORDS - 2 * 
bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 1", NUM_RECORDS},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')"
+            + " or " + CLASSIFICATION_COLUMN + " <> 0", bucketCountComplement},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= 0"
+            + " or " + CLASSIFICATION_COLUMN + " <> 0", NUM_RECORDS},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ",  'text0')"
+            + " and " + SORTED_COLUMN + " <> 1", bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ",  'text1')"
+            + " and " + SORTED_COLUMN + " <> 1", bucketCount - 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where TEXT_MATCH(" + TEXT_COLUMN + ",  'text0')"
+            + " and " + CLASSIFICATION_COLUMN + " <> 1", bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= 500"
+            + " and " + CLASSIFICATION_COLUMN + " <> 0"
+            + " and not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')", 
bucketCountComplement / 2 + 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= 500"
+            + " and " + CLASSIFICATION_COLUMN + " <> 0"
+            + " and TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')", 0},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " < " + bucketCount
+            + " and " + CLASSIFICATION_COLUMN + " <> 0", bucketCount - 
bucketCount / BUCKET_SIZE - 1},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= " + bucketCount
+            + " and " + CLASSIFICATION_COLUMN + " <> 0", bucketCountComplement 
- bucketCountComplement / BUCKET_SIZE},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " < " + (BUCKET_SIZE - 1)
+            + " and " + CLASSIFICATION_COLUMN + " = " + (BUCKET_SIZE - 1), 0},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= " + (BUCKET_SIZE - 2)
+            + " and " + CLASSIFICATION_COLUMN + " = " + (BUCKET_SIZE - 2), 
bucketCount},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= " + min
+            + " and " + SORTED_COLUMN + " < " + max
+            + " and " + CLASSIFICATION_COLUMN + " = 0", bucketCount - (min + 
NUM_RECORDS - max) / BUCKET_SIZE},
+        {"select count(*) from " + RAW_TABLE_NAME
+            + " where " + SORTED_COLUMN + " >= 500"
+            + " and " + CLASSIFICATION_COLUMN + " <> 0"
+            + " and not JSON_MATCH(" + JSON_COLUMN + ", '\"$.field\"=0')"
+            + " and not TEXT_MATCH(" + TEXT_COLUMN + ", 'text0')", 
bucketCountComplement / 2 + 1}
+    };
+  }
+
+  @Test(dataProvider = "testCases")
+  public void test(String query, int expectedCount) {
+    Operator<?> operator = getOperatorForSqlQuery(query);
+    assertTrue(operator instanceof FastFilteredCountOperator);
+    List<Object> aggregationResult = ((FastFilteredCountOperator) 
operator).nextBlock().getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+    assertEquals(((Number) aggregationResult.get(0)).intValue(), 
expectedCount, query);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/NotOperatorQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/NotOperatorQueriesTest.java
index ccb9626..3bc6c6c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/NotOperatorQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/NotOperatorQueriesTest.java
@@ -25,7 +25,8 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -136,7 +137,7 @@ public class NotOperatorQueriesTest extends BaseQueriesTest 
{
 
   private void testNotOperator(String filter, long expectedSegmentResult) {
     String query = "SELECT COUNT(*) FROM testTable WHERE " + filter;
-    AggregationOperator aggregationOperator = getOperatorForSqlQuery(query);
+    BaseOperator<IntermediateResultsBlock> aggregationOperator = 
getOperatorForSqlQuery(query);
     List<Object> segmentResults = 
aggregationOperator.nextBlock().getAggregationResult();
     assertNotNull(segmentResults);
     assertEquals((long) segmentResults.get(0), expectedSegmentResult);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index 46b8878..b970555 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -56,8 +56,8 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
@@ -1693,7 +1693,7 @@ public class TextSearchQueriesTest extends 
BaseQueriesTest {
   }
 
   private void testTextSearchAggregationQueryHelper(String query, int 
expectedCount) {
-    AggregationOperator operator = getOperatorForPqlQuery(query);
+    BaseOperator<IntermediateResultsBlock> operator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock operatorResult = operator.nextBlock();
     long count = (Long) operatorResult.getAggregationResult().get(0);
     Assert.assertEquals(expectedCount, count);
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
index 0d13c6f..6f61b19 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
@@ -85,6 +85,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
   private static final String FIRST_SEGMENT_NAME = "firstTestSegment";
   private static final String SECOND_SEGMENT_NAME = "secondTestSegment";
   private static final String INT_COL_NAME = "INT_COL";
+  private static final String SORTED_COL_NAME = "SORTED_COL";
   private static final String RAW_INT_COL_NAME = "RAW_INT_COL";
   private static final String RAW_STRING_COL_NAME = "RAW_STRING_COL";
   private static final String NO_INDEX_INT_COL_NAME = "NO_INDEX_INT_COL";
@@ -130,6 +131,21 @@ public class BenchmarkQueries extends BaseQueriesTest {
       + "year(INT_COL) as y, month(INT_COL) as m "
       + "from MyTable group by y, m";
 
+  public static final String COUNT_OVER_BITMAP_INDEX_IN = "SELECT COUNT(*) 
FROM MyTable "
+      + "WHERE INT_COL IN (0, 1, 2, 3, 4, 5, 7, 9, 10)";
+
+  public static final String COUNT_OVER_BITMAP_INDEX_EQUALS = "SELECT COUNT(*) 
FROM MyTable "
+      + "WHERE LOW_CARDINALITY_STRING_COL = 'value1'";
+
+  public static final String COUNT_OVER_BITMAP_INDEXES = "SELECT COUNT(*) FROM 
MyTable "
+      + "WHERE INT_COL IN (0, 1, 2, 3, 4, 5, 7, 9, 10) "
+      + "AND LOW_CARDINALITY_STRING_COL = 'value1' ";
+
+  public static final String COUNT_OVER_BITMAP_AND_SORTED_INDEXES = "SELECT 
COUNT(*) FROM MyTable "
+      + "WHERE INT_COL IN (0, 1, 2, 3, 4, 5, 7, 9, 10) "
+      + "AND LOW_CARDINALITY_STRING_COL = 'value1' "
+      + "AND SORTED_COL BETWEEN 10 and 50";
+
   public static final String RAW_COLUMN_SUMMARY_STATS = "SELECT "
       + "MIN(RAW_INT_COL), MAX(RAW_INT_COL), COUNT(*) "
       + "FROM MyTable";
@@ -141,7 +157,8 @@ public class BenchmarkQueries extends BaseQueriesTest {
   @Param({
       MULTI_GROUP_BY_WITH_RAW_QUERY, MULTI_GROUP_BY_WITH_RAW_QUERY_2, 
FILTERED_QUERY, NON_FILTERED_QUERY,
       SUM_QUERY, NO_INDEX_LIKE_QUERY, MULTI_GROUP_BY_ORDER_BY, 
MULTI_GROUP_BY_ORDER_BY_LOW_HIGH, TIME_GROUP_BY,
-      RAW_COLUMN_SUMMARY_STATS
+      RAW_COLUMN_SUMMARY_STATS, COUNT_OVER_BITMAP_INDEX_IN, 
COUNT_OVER_BITMAP_INDEXES,
+      COUNT_OVER_BITMAP_AND_SORTED_INDEXES, COUNT_OVER_BITMAP_INDEX_EQUALS
   })
   String _query;
   private IndexSegment _indexSegment;
@@ -160,6 +177,7 @@ public class BenchmarkQueries extends BaseQueriesTest {
 
     Set<String> invertedIndexCols = new HashSet<>();
     invertedIndexCols.add(INT_COL_NAME);
+    invertedIndexCols.add(LOW_CARDINALITY_STRING_COL);
 
     indexLoadingConfig.setRangeIndexColumns(invertedIndexCols);
     indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
@@ -185,10 +203,11 @@ public class BenchmarkQueries extends BaseQueriesTest {
   private List<GenericRow> createTestData(int numRows) {
     Map<Integer, String> strings = new HashMap<>();
     List<GenericRow> rows = new ArrayList<>();
-    String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> 
UUID.randomUUID().toString())
+    String[] lowCardinalityValues = IntStream.range(0, 10).mapToObj(i -> 
"value" + i)
         .toArray(String[]::new);
     for (int i = 0; i < numRows; i++) {
       GenericRow row = new GenericRow();
+      row.putValue(SORTED_COL_NAME, numRows - i);
       row.putValue(INT_COL_NAME, (int) _supplier.getAsLong());
       row.putValue(NO_INDEX_INT_COL_NAME, (int) _supplier.getAsLong());
       row.putValue(RAW_INT_COL_NAME, (int) _supplier.getAsLong());
@@ -210,8 +229,10 @@ public class BenchmarkQueries extends BaseQueriesTest {
         .setInvertedIndexColumns(Collections.singletonList(INT_COL_NAME))
         .setFieldConfigList(fieldConfigs)
         .setNoDictionaryColumns(Arrays.asList(RAW_INT_COL_NAME, 
RAW_STRING_COL_NAME))
+        .setSortedColumn(SORTED_COL_NAME)
         .build();
     Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(SORTED_COL_NAME, FieldSpec.DataType.INT)
         .addSingleValueDimension(NO_INDEX_INT_COL_NAME, FieldSpec.DataType.INT)
         .addSingleValueDimension(RAW_INT_COL_NAME, FieldSpec.DataType.INT)
         .addSingleValueDimension(INT_COL_NAME, FieldSpec.DataType.INT)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to