This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d2ca07e  Combine Metadata and Dictionary based plans into single 
operator (#8408)
d2ca07e is described below

commit d2ca07ee57edb8833ef83e4c2bf40915f1e4879e
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Tue Mar 29 22:35:23 2022 +0100

    Combine Metadata and Dictionary based plans into single operator (#8408)
    
    * satisfy queries using datasource metadata when convenient
    
    * only use metadata if min and max values are non-null
    
    * merge Metadata and Dictionary based plans into DataSource based plan
    
    * rename operator + review comments
    
    * update test comments
    
    * fix exception message
---
 .../query/MetadataBasedAggregationOperator.java    | 94 ----------------------
 ...r.java => NonScanBasedAggregationOperator.java} | 85 ++++++++++++-------
 .../pinot/core/plan/AggregationPlanNode.java       | 73 ++++++++---------
 ...adataAndDictionaryAggregationPlanMakerTest.java | 18 ++---
 .../pinot/queries/DistinctCountQueriesTest.java    | 22 ++---
 .../pinot/queries/ExplainPlanQueriesTest.java      |  4 +-
 ...SegmentPartitionedDistinctCountQueriesTest.java |  6 +-
 ...onaryAggregationPlanClusterIntegrationTest.java | 29 ++++---
 .../org/apache/pinot/perf/BenchmarkQueries.java    |  7 +-
 9 files changed, 136 insertions(+), 202 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
deleted file mode 100644
index ea40149..0000000
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.query;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-
-
-/**
- * Aggregation operator that utilizes metadata for serving aggregation queries.
- */
-@SuppressWarnings("rawtypes")
-public class MetadataBasedAggregationOperator extends 
BaseOperator<IntermediateResultsBlock> {
-  private static final String OPERATOR_NAME = 
"MetadataBasedAggregationOperator";
-  private static final String EXPLAIN_NAME = "AGGREGATE_METADATA";
-
-  private final AggregationFunction[] _aggregationFunctions;
-  private final SegmentMetadata _segmentMetadata;
-  private final Map<String, DataSource> _dataSourceMap;
-
-  public MetadataBasedAggregationOperator(AggregationFunction[] 
aggregationFunctions, SegmentMetadata segmentMetadata,
-      Map<String, DataSource> dataSourceMap) {
-    _aggregationFunctions = aggregationFunctions;
-    _segmentMetadata = segmentMetadata;
-
-    // Datasource is currently not used, but will start getting used as we add 
support for aggregation
-    // functions other than count(*).
-    _dataSourceMap = dataSourceMap;
-  }
-
-  @Override
-  protected IntermediateResultsBlock getNextBlock() {
-    int numAggregationFunctions = _aggregationFunctions.length;
-    List<Object> aggregationResults = new ArrayList<>(numAggregationFunctions);
-    long numTotalDocs = _segmentMetadata.getTotalDocs();
-    for (AggregationFunction aggregationFunction : _aggregationFunctions) {
-      Preconditions.checkState(aggregationFunction.getType() == 
AggregationFunctionType.COUNT,
-          "Metadata based aggregation operator does not support function type: 
" + aggregationFunction.getType());
-      aggregationResults.add(numTotalDocs);
-    }
-
-    // Build intermediate result block based on aggregation result from the 
executor.
-    return new IntermediateResultsBlock(_aggregationFunctions, 
aggregationResults, false);
-  }
-
-  @Override
-  public String getOperatorName() {
-    return OPERATOR_NAME;
-  }
-
-  @Override
-  public String toExplainString() {
-    return EXPLAIN_NAME;
-  }
-
-  @Override
-  public List<Operator> getChildOperators() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public ExecutionStatistics getExecutionStatistics() {
-    // NOTE: Set numDocsScanned to numTotalDocs for backward compatibility.
-    int numTotalDocs = _segmentMetadata.getTotalDocs();
-    return new ExecutionStatistics(numTotalDocs, 0, 0, numTotalDocs);
-  }
-}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
similarity index 69%
rename from 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
index 3f0dd87..6408a01 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/NonScanBasedAggregationOperator.java
@@ -27,9 +27,8 @@ import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
-import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
@@ -40,80 +39,90 @@ import 
org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregat
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
+import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
- * Aggregation operator that utilizes dictionary for serving aggregation 
queries.
- * The dictionary operator is selected in the plan maker, if the query is of 
aggregation type min, max, minmaxrange
- * and the column has a dictionary.
+ * Aggregation operator that utilizes dictionary or column metadata for 
serving aggregation queries to avoid scanning.
+ * The scanless operator is selected in the plan maker, if the query is of 
aggregation type min, max, minmaxrange,
+ * distinctcount, distinctcounthll, distinctcountrawhll, 
segmentpartitioneddistinctcount, distinctcountsmarthll,
+ * and the column has a dictionary, or has column metadata with min and max 
value defined. It also supports count(*) if
+ * the query has no filter.
  * We don't use this operator if the segment has star tree,
- * as the dictionary will have aggregated values for the metrics, and 
dimensions will have star node value
+ * as the dictionary will have aggregated values for the metrics, and 
dimensions will have star node value.
  *
- * For min value, we use the first value from the dictionary
- * For max value we use the last value from dictionary
+ * For min value, we use the first value from the dictionary, falling back to 
the column metadata min value if there
+ * is no dictionary.
+ * For max value we use the last value from dictionary, falling back to the 
column metadata max value if there
+ * is no dictionary.
  */
 @SuppressWarnings("rawtypes")
-public class DictionaryBasedAggregationOperator extends 
BaseOperator<IntermediateResultsBlock> {
-  private static final String OPERATOR_NAME = 
"DictionaryBasedAggregationOperator";
-  private static final String EXPLAIN_NAME = "AGGREGATE_DICTIONARY";
+public class NonScanBasedAggregationOperator extends 
BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = 
NonScanBasedAggregationOperator.class.getSimpleName();
+  private static final String EXPLAIN_NAME = "AGGREGATE_NO_SCAN";
 
   private final AggregationFunction[] _aggregationFunctions;
-  private final Map<String, Dictionary> _dictionaryMap;
+  private final DataSource[] _dataSources;
   private final int _numTotalDocs;
 
-  public DictionaryBasedAggregationOperator(AggregationFunction[] 
aggregationFunctions,
-      Map<String, Dictionary> dictionaryMap, int numTotalDocs) {
+  public NonScanBasedAggregationOperator(AggregationFunction[] 
aggregationFunctions,
+      DataSource[] dataSources, int numTotalDocs) {
     _aggregationFunctions = aggregationFunctions;
-    _dictionaryMap = dictionaryMap;
+    _dataSources = dataSources;
     _numTotalDocs = numTotalDocs;
   }
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
     List<Object> aggregationResults = new 
ArrayList<>(_aggregationFunctions.length);
-    for (AggregationFunction aggregationFunction : _aggregationFunctions) {
-      String column = ((ExpressionContext) 
aggregationFunction.getInputExpressions().get(0)).getIdentifier();
-      Dictionary dictionary = _dictionaryMap.get(column);
+    for (int i = 0; i < _aggregationFunctions.length; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      // note that dataSource will be null for COUNT, so do not interact with 
it until it's known this isn't a COUNT
+      DataSource dataSource = _dataSources[i];
       Object result;
       switch (aggregationFunction.getType()) {
+        case COUNT:
+          result = (long) _numTotalDocs;
+          break;
         case MIN:
         case MINMV:
-          result = toDouble(dictionary.getMinVal());
+          result = getMinValue(dataSource);
           break;
         case MAX:
         case MAXMV:
-          result = toDouble(dictionary.getMaxVal());
+          result = getMaxValue(dataSource);
           break;
         case MINMAXRANGE:
         case MINMAXRANGEMV:
-          result = new MinMaxRangePair(toDouble(dictionary.getMinVal()), 
toDouble(dictionary.getMaxVal()));
+          result = new MinMaxRangePair(getMinValue(dataSource), 
getMaxValue(dataSource));
           break;
         case DISTINCTCOUNT:
         case DISTINCTCOUNTMV:
-          result = getDistinctValueSet(dictionary);
+          result = 
getDistinctValueSet(Objects.requireNonNull(dataSource.getDictionary()));
           break;
         case DISTINCTCOUNTHLL:
         case DISTINCTCOUNTHLLMV:
-          result = getDistinctCountHLLResult(dictionary, 
(DistinctCountHLLAggregationFunction) aggregationFunction);
+          result = 
getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
+              (DistinctCountHLLAggregationFunction) aggregationFunction);
           break;
         case DISTINCTCOUNTRAWHLL:
         case DISTINCTCOUNTRAWHLLMV:
-          result = getDistinctCountHLLResult(dictionary,
+          result = 
getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
               ((DistinctCountRawHLLAggregationFunction) 
aggregationFunction).getDistinctCountHLLAggregationFunction());
           break;
         case SEGMENTPARTITIONEDDISTINCTCOUNT:
-          result = (long) dictionary.length();
+          result = (long) 
Objects.requireNonNull(dataSource.getDictionary()).length();
           break;
         case DISTINCTCOUNTSMARTHLL:
-          result = getDistinctCountSmartHLLResult(dictionary,
+          result = 
getDistinctCountSmartHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
               (DistinctCountSmartHLLAggregationFunction) aggregationFunction);
           break;
         default:
           throw new IllegalStateException(
-              "Dictionary based aggregation operator does not support function 
type: " + aggregationFunction.getType());
+              "Non-scan based aggregation operator does not support function 
type: " + aggregationFunction.getType());
       }
       aggregationResults.add(result);
     }
@@ -122,8 +131,26 @@ public class DictionaryBasedAggregationOperator extends 
BaseOperator<Intermediat
     return new IntermediateResultsBlock(_aggregationFunctions, 
aggregationResults, false);
   }
 
-  private double toDouble(Comparable value) {
-    if (value instanceof Number) {
+  private static Double getMinValue(DataSource dataSource) {
+    Dictionary dictionary = dataSource.getDictionary();
+    if (dictionary != null) {
+      return toDouble(dictionary.getMinVal());
+    }
+    return toDouble(dataSource.getDataSourceMetadata().getMinValue());
+  }
+
+  private static Double getMaxValue(DataSource dataSource) {
+    Dictionary dictionary = dataSource.getDictionary();
+    if (dictionary != null) {
+      return toDouble(dictionary.getMaxVal());
+    }
+    return toDouble(dataSource.getDataSourceMetadata().getMaxValue());
+  }
+
+  private static Double toDouble(Comparable<?> value) {
+    if (value instanceof Double) {
+      return (Double) value;
+    } else if (value instanceof Number) {
       return ((Number) value).doubleValue();
     } else {
       return Double.parseDouble(value.toString());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
index 8a4deed..4ff6080 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.plan;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -34,9 +33,8 @@ import 
org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.filter.CombinedFilterOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator;
 import org.apache.pinot.core.operator.query.FilteredAggregationOperator;
-import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator;
+import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
@@ -46,7 +44,7 @@ import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.datasource.DataSource;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 
@@ -60,9 +58,13 @@ import static 
org.apache.pinot.segment.spi.AggregationFunctionType.*;
 @SuppressWarnings("rawtypes")
 public class AggregationPlanNode implements PlanNode {
   private static final EnumSet<AggregationFunctionType> 
DICTIONARY_BASED_FUNCTIONS =
-      EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, 
DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
-          DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, 
SEGMENTPARTITIONEDDISTINCTCOUNT,
-          DISTINCTCOUNTSMARTHLL);
+      EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, 
DISTINCTCOUNT, DISTINCTCOUNTMV,
+          DISTINCTCOUNTHLL, DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, 
DISTINCTCOUNTRAWHLLMV,
+          SEGMENTPARTITIONEDDISTINCTCOUNT, DISTINCTCOUNTSMARTHLL);
+
+  // DISTINCTCOUNT excluded because consuming segment metadata contains 
unknown cardinality when there is no dictionary
+  private static final EnumSet<AggregationFunctionType> 
METADATA_BASED_FUNCTIONS =
+      EnumSet.of(COUNT, MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV);
 
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
@@ -176,18 +178,17 @@ public class AggregationPlanNode implements PlanNode {
     BaseFilterOperator filterOperator = filterPlanNode.run();
 
     // Use metadata/dictionary to solve the query if possible
-    // TODO: Use the same operator for both of them so that COUNT(*), MAX(col) 
can be optimized
     if (filterOperator.isResultMatchingAll()) {
-      if (isFitForMetadataBasedPlan(aggregationFunctions)) {
-        return new MetadataBasedAggregationOperator(aggregationFunctions, 
_indexSegment.getSegmentMetadata(),
-            Collections.emptyMap());
-      } else if (isFitForDictionaryBasedPlan(aggregationFunctions, 
_indexSegment)) {
-        Map<String, Dictionary> dictionaryMap = new HashMap<>();
-        for (AggregationFunction aggregationFunction : aggregationFunctions) {
-          String column = ((ExpressionContext) 
aggregationFunction.getInputExpressions().get(0)).getIdentifier();
-          dictionaryMap.computeIfAbsent(column, k -> 
_indexSegment.getDataSource(k).getDictionary());
+      if (isFitForNonScanBasedPlan(aggregationFunctions, _indexSegment)) {
+        DataSource[] dataSources = new DataSource[aggregationFunctions.length];
+        for (int i = 0; i < aggregationFunctions.length; i++) {
+          List<?> inputExpressions = 
aggregationFunctions[i].getInputExpressions();
+          if (!inputExpressions.isEmpty()) {
+            String column = ((ExpressionContext) 
inputExpressions.get(0)).getIdentifier();
+            dataSources[i] = _indexSegment.getDataSource(column);
+          }
         }
-        return new DictionaryBasedAggregationOperator(aggregationFunctions, 
dictionaryMap, numTotalDocs);
+        return new NonScanBasedAggregationOperator(aggregationFunctions, 
dataSources, numTotalDocs);
       }
     }
 
@@ -223,36 +224,32 @@ public class AggregationPlanNode implements PlanNode {
   }
 
   /**
-   * Returns {@code true} if the given aggregations can be solved with segment 
metadata, {@code false} otherwise.
-   * <p>Aggregations supported: COUNT
-   */
-  private static boolean isFitForMetadataBasedPlan(AggregationFunction[] 
aggregationFunctions) {
-    for (AggregationFunction aggregationFunction : aggregationFunctions) {
-      if (aggregationFunction.getType() != COUNT) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Returns {@code true} if the given aggregations can be solved with 
dictionary, {@code false} otherwise.
+   * Returns {@code true} if the given aggregations can be solved with 
dictionary or column metadata, {@code false}
+   * otherwise.
    */
-  private static boolean isFitForDictionaryBasedPlan(AggregationFunction[] 
aggregationFunctions,
+  private static boolean isFitForNonScanBasedPlan(AggregationFunction[] 
aggregationFunctions,
       IndexSegment indexSegment) {
     for (AggregationFunction aggregationFunction : aggregationFunctions) {
-      if (!DICTIONARY_BASED_FUNCTIONS.contains(aggregationFunction.getType())) 
{
-        return false;
+      if (aggregationFunction.getType() == COUNT) {
+        continue;
       }
       ExpressionContext argument = (ExpressionContext) 
aggregationFunction.getInputExpressions().get(0);
       if (argument.getType() != ExpressionContext.Type.IDENTIFIER) {
         return false;
       }
-      String column = argument.getIdentifier();
-      Dictionary dictionary = 
indexSegment.getDataSource(column).getDictionary();
-      if (dictionary == null) {
-        return false;
+      DataSource dataSource = 
indexSegment.getDataSource(argument.getIdentifier());
+      if (DICTIONARY_BASED_FUNCTIONS.contains(aggregationFunction.getType())) {
+        if (dataSource.getDictionary() != null) {
+          continue;
+        }
+      }
+      if (METADATA_BASED_FUNCTIONS.contains(aggregationFunction.getType())) {
+        if (dataSource.getDataSourceMetadata().getMaxValue() != null
+            && dataSource.getDataSourceMetadata().getMinValue() != null) {
+          continue;
+        }
       }
+      return false;
     }
     return true;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index e18f7f3..2bf354a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -29,8 +29,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator;
-import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator;
+import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -165,32 +164,32 @@ public class 
MetadataAndDictionaryAggregationPlanMakerTest {
     });
     // COUNT from metadata
     entries.add(new Object[]{
-        "select count(*) from testTable", 
MetadataBasedAggregationOperator.class, AggregationOperator.class
+        "select count(*) from testTable", 
NonScanBasedAggregationOperator.class, AggregationOperator.class
     });
     // COUNT from metadata with match all filter
     entries.add(new Object[]{
-        "select count(*) from testTable where column1 > 10", 
MetadataBasedAggregationOperator.class,
+        "select count(*) from testTable where column1 > 10", 
NonScanBasedAggregationOperator.class,
         AggregationOperator.class
     });
     // MIN/MAX from dictionary
     entries.add(new Object[]{
-        "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", 
DictionaryBasedAggregationOperator.class,
+        "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable", 
NonScanBasedAggregationOperator.class,
         AggregationOperator.class
     });
     // MIN/MAX from dictionary with match all filter
     entries.add(new Object[]{
         "select max(daysSinceEpoch),min(daysSinceEpoch) from testTable where 
column1 > 10",
-        DictionaryBasedAggregationOperator.class, AggregationOperator.class
+        NonScanBasedAggregationOperator.class, AggregationOperator.class
     });
     // MINMAXRANGE from dictionary
     entries.add(new Object[]{
-        "select minmaxrange(daysSinceEpoch) from testTable", 
DictionaryBasedAggregationOperator.class,
+        "select minmaxrange(daysSinceEpoch) from testTable", 
NonScanBasedAggregationOperator.class,
         AggregationOperator.class
     });
     // MINMAXRANGE from dictionary with match all filter
     entries.add(new Object[]{
         "select minmaxrange(daysSinceEpoch) from testTable where column1 > 10",
-        DictionaryBasedAggregationOperator.class, AggregationOperator.class
+        NonScanBasedAggregationOperator.class, AggregationOperator.class
     });
     // Aggregation
     entries.add(new Object[]{
@@ -203,7 +202,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     });
     // COUNT from metadata, MIN from dictionary
     entries.add(new Object[]{
-        "select count(*),min(column17) from testTable", 
AggregationOperator.class, AggregationOperator.class
+        "select count(*),min(column17) from testTable", 
NonScanBasedAggregationOperator.class,
+        AggregationOperator.class
     });
     // Aggregation group-by
     entries.add(new Object[]{
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index 513b806..f86c6a6 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -38,7 +38,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator;
+import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
@@ -193,8 +193,8 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
 
     // Inner segment
     for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), 
getOperatorForSqlQueryWithFilter(query))) {
-      assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-      IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
+      assertTrue(operator instanceof NonScanBasedAggregationOperator);
+      IntermediateResultsBlock resultsBlock = 
((NonScanBasedAggregationOperator) operator).nextBlock();
       QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
           0, 0, NUM_RECORDS);
       List<Object> aggregationResult = resultsBlock.getAggregationResult();
@@ -307,8 +307,8 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     // Inner segment
     String[] interSegmentsExpectedResults = new String[5];
     for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), 
getOperatorForSqlQueryWithFilter(query))) {
-      assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-      IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
+      assertTrue(operator instanceof NonScanBasedAggregationOperator);
+      IntermediateResultsBlock resultsBlock = 
((NonScanBasedAggregationOperator) operator).nextBlock();
       QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
           0, 0, NUM_RECORDS);
       List<Object> aggregationResult = resultsBlock.getAggregationResult();
@@ -378,8 +378,8 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     // Change log2m
     query = "SELECT DISTINCTCOUNTHLL(intColumn, 12) FROM testTable";
     operator = getOperatorForSqlQuery(query);
-    assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-    aggregationResult = ((DictionaryBasedAggregationOperator) 
operator).nextBlock().getAggregationResult();
+    assertTrue(operator instanceof NonScanBasedAggregationOperator);
+    aggregationResult = ((NonScanBasedAggregationOperator) 
operator).nextBlock().getAggregationResult();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
     assertTrue(aggregationResult.get(0) instanceof HyperLogLog);
@@ -401,8 +401,8 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     // Inner segment
     String[] interSegmentsExpectedResults = new String[6];
     for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), 
getOperatorForSqlQueryWithFilter(query))) {
-      assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-      IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
+      assertTrue(operator instanceof NonScanBasedAggregationOperator);
+      IntermediateResultsBlock resultsBlock = 
((NonScanBasedAggregationOperator) operator).nextBlock();
       QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
           0, 0, NUM_RECORDS);
       List<Object> aggregationResult = resultsBlock.getAggregationResult();
@@ -471,8 +471,8 @@ public class DistinctCountQueriesTest extends 
BaseQueriesTest {
     // Change log2m
     query = "SELECT DISTINCTCOUNTSMARTHLL(intColumn, 
'hllLog2m=8;hllConversionThreshold=10') FROM testTable";
     operator = getOperatorForSqlQuery(query);
-    assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-    aggregationResult = ((DictionaryBasedAggregationOperator) 
operator).nextBlock().getAggregationResult();
+    assertTrue(operator instanceof NonScanBasedAggregationOperator);
+    aggregationResult = ((NonScanBasedAggregationOperator) 
operator).nextBlock().getAggregationResult();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
     assertTrue(aggregationResult.get(0) instanceof HyperLogLog);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index 033d17c..617ffd7 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -502,14 +502,14 @@ public class ExplainPlanQueriesTest extends 
BaseQueriesTest {
     List<Object[]> result1 = new ArrayList<>();
     result1.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
     result1.add(new Object[]{"COMBINE_AGGREGATE", 1, 0});
-    result1.add(new Object[]{"AGGREGATE_METADATA", 2, 1});
+    result1.add(new Object[]{"AGGREGATE_NO_SCAN", 2, 1});
     check(query1, new ResultTable(DATA_SCHEMA, result1));
 
     String query2 = "EXPLAIN PLAN FOR SELECT min(invertedIndexCol1) FROM 
testTable";
     List<Object[]> result2 = new ArrayList<>();
     result2.add(new Object[]{"BROKER_REDUCE(limit:10)", 0, -1});
     result2.add(new Object[]{"COMBINE_AGGREGATE", 1, 0});
-    result2.add(new Object[]{"AGGREGATE_DICTIONARY", 2, 1});
+    result2.add(new Object[]{"AGGREGATE_NO_SCAN", 2, 1});
     check(query2, new ResultTable(DATA_SCHEMA, result2));
 
     String query3 =
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
index 0520086..06ee41e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentPartitionedDistinctCountQueriesTest.java
@@ -37,7 +37,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator;
+import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
@@ -159,8 +159,8 @@ public class SegmentPartitionedDistinctCountQueriesTest 
extends BaseQueriesTest
 
     // Inner segment
     for (Object operator : Arrays.asList(getOperatorForSqlQuery(query), 
getOperatorForSqlQueryWithFilter(query))) {
-      assertTrue(operator instanceof DictionaryBasedAggregationOperator);
-      IntermediateResultsBlock resultsBlock = 
((DictionaryBasedAggregationOperator) operator).nextBlock();
+      assertTrue(operator instanceof NonScanBasedAggregationOperator);
+      IntermediateResultsBlock resultsBlock = 
((NonScanBasedAggregationOperator) operator).nextBlock();
       QueriesTestUtils.testInnerSegmentExecutionStatistics(((Operator) 
operator).getExecutionStatistics(), NUM_RECORDS,
           0, 0, NUM_RECORDS);
       List<Object> aggregationResult = resultsBlock.getAggregationResult();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
index 5b3d1e1..42e3dfd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
@@ -36,8 +36,7 @@ import static org.testng.Assert.assertTrue;
 
 
 /**
- * Integration test to check aggregation functions which use {@code 
DictionaryBasedAggregationPlanNode} and
- * {@code MetadataBasedAggregationPlanNode}.
+ * Integration test to check aggregation functions which use {@code 
NonScanBasedAggregationOperator}.
  */
 // TODO: remove this integration test and add unit test for metadata and 
dictionary based aggregation operator
 public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest 
extends BaseClusterIntegrationTest {
@@ -235,44 +234,44 @@ public class 
MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
     // Check execution stats
     JsonNode response;
 
-    // Dictionary column: answered by DictionaryBasedAggregationOperator
+    // Dictionary column: answered by NonScanBasedAggregationOperator
     pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
-    // Non dictionary column: not answered by 
DictionaryBasedAggregationOperator
+    // Non dictionary column: answered by NonScanBasedAggregationOperator
     pqlQuery = "SELECT MAX(DepDelay) FROM " + tableName;
     response = postQuery(pqlQuery);
-    assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 
response.get("numDocsScanned").asLong());
+    assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
     // multiple dictionary based aggregation functions, dictionary columns: 
answered by
-    // DictionaryBasedAggregationOperator
+    // NonScanBasedAggregationOperator
     pqlQuery = "SELECT MAX(ArrTime),MIN(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
-    // multiple aggregation functions, mix of dictionary based and non 
dictionary based: not answered by
-    // DictionaryBasedAggregationOperator
+    // multiple aggregation functions, mix of dictionary based and non 
dictionary based: answered by
+    // NonScanBasedAggregationOperator
     pqlQuery = "SELECT MAX(ArrTime),COUNT(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
-    assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 
response.get("numDocsScanned").asLong());
+    assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
-    // group by in query : not answered by DictionaryBasedAggregationOperator
+    // group by in query : not answered by NonScanBasedAggregationOperator
     pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + "  group by 
DaysSinceEpoch";
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
-    // filter in query: not answered by DictionaryBasedAggregationOperator
+    // filter in query: not answered by NonScanBasedAggregationOperator
     pqlQuery = "SELECT MAX(ArrTime) FROM " + tableName + " where 
DaysSinceEpoch > 16100";
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
@@ -305,23 +304,23 @@ public class 
MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
-    // group by present in query: not answered by 
MetadataBasedAggregationOperator
+    // group by present in query: not answered by 
NonScanBasedAggregationOperator
     pqlQuery = "SELECT COUNT(*) FROM " + tableName + " GROUP BY 
DaysSinceEpoch";
     response = postQuery(pqlQuery);
     assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
 
-    // filter present in query: not answered by 
MetadataBasedAggregationOperator
+    // filter present in query: not answered by NonScanBasedAggregationOperator
     pqlQuery = "SELECT COUNT(*) FROM " + tableName + " WHERE DaysSinceEpoch > 
16100";
     response = postQuery(pqlQuery);
     assertEquals(response.get("numEntriesScannedPostFilter").asLong(), 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
 
-    // mixed aggregation functions in query: not answered by 
MetadataBasedAggregationOperator
+    // mixed aggregation functions in query: answered by 
NonScanBasedAggregationOperator
     pqlQuery = "SELECT COUNT(*),MAX(ArrTime) FROM " + tableName;
     response = postQuery(pqlQuery);
-    assertTrue(response.get("numEntriesScannedPostFilter").asLong() > 0);
+    assertTrue(response.get("numEntriesScannedPostFilter").asLong() == 0);
     assertEquals(response.get("numEntriesScannedInFilter").asLong(), 0);
     assertEquals(response.get("totalDocs").asLong(), 
response.get("numDocsScanned").asLong());
   }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
index 21a899a..0d13c6f 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueries.java
@@ -130,13 +130,18 @@ public class BenchmarkQueries extends BaseQueriesTest {
       + "year(INT_COL) as y, month(INT_COL) as m "
       + "from MyTable group by y, m";
 
+  public static final String RAW_COLUMN_SUMMARY_STATS = "SELECT "
+      + "MIN(RAW_INT_COL), MAX(RAW_INT_COL), COUNT(*) "
+      + "FROM MyTable";
+
   @Param("1500000")
   private int _numRows;
   @Param({"EXP(0.001)", "EXP(0.5)", "EXP(0.999)"})
   String _scenario;
   @Param({
       MULTI_GROUP_BY_WITH_RAW_QUERY, MULTI_GROUP_BY_WITH_RAW_QUERY_2, 
FILTERED_QUERY, NON_FILTERED_QUERY,
-      SUM_QUERY, NO_INDEX_LIKE_QUERY, MULTI_GROUP_BY_ORDER_BY, 
MULTI_GROUP_BY_ORDER_BY_LOW_HIGH, TIME_GROUP_BY
+      SUM_QUERY, NO_INDEX_LIKE_QUERY, MULTI_GROUP_BY_ORDER_BY, 
MULTI_GROUP_BY_ORDER_BY_LOW_HIGH, TIME_GROUP_BY,
+      RAW_COLUMN_SUMMARY_STATS
   })
   String _query;
   private IndexSegment _indexSegment;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to