This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b327509  Rewrite non-aggregate group by query to distinct query (#5671)
b327509 is described below

commit b32750951f0fb24923a6280f066a6cd037e1f0cb
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Thu Jul 9 20:44:13 2020 -0700

    Rewrite non-aggregate group by query to distinct query (#5671)
    
    * Rewrite non-aggregate group by query to distinct query
    
    * Adding tests to DistinctQueriesTest
    
    * Update DistinctQueriesTest.java
---
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  74 +++++++-
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  |  69 +++++++
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  25 ++-
 .../apache/pinot/queries/DistinctQueriesTest.java  | 208 ++++++++++++++++-----
 .../apache/pinot/queries/FastHllQueriesTest.java   |   6 +-
 ...nerSegmentAggregationMultiValueQueriesTest.java |  24 +--
 ...erSegmentAggregationSingleValueQueriesTest.java |  24 +--
 ...InnerSegmentSelectionMultiValueQueriesTest.java |  16 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |  24 +--
 .../queries/PercentileTDigestQueriesTest.java      |   4 +-
 .../RangePredicateWithSortedInvertedIndexTest.java |   2 +-
 .../pinot/queries/SerializedBytesQueriesTest.java  |   6 +-
 .../pinot/queries/TextSearchQueriesTest.java       |   4 +-
 .../apache/pinot/queries/TransformQueriesTest.java |   4 +-
 .../tests/ClusterIntegrationTestUtils.java         |  18 +-
 .../tests/OfflineClusterIntegrationTest.java       |  41 ++++
 16 files changed, 437 insertions(+), 112 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index de14a51..b56698b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.sql.parsers;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -196,13 +198,25 @@ public class CalciteSqlParser {
     return false;
   }
 
-  public static Set<String> extractIdentifiers(List<Expression> expressions) {
+  /**
+   * Extract all the identifiers from given expressions.
+   *
+   * @param expressions the expressions to extract identifiers from.
+   * @param excludeAs if true, ignores the right-side (alias) identifier of an AS 
function.
+   * @return all the identifier names.
+   */
+  public static Set<String> extractIdentifiers(List<Expression> expressions, 
boolean excludeAs) {
     Set<String> identifiers = new HashSet<>();
     for (Expression expression : expressions) {
       if (expression.getIdentifier() != null) {
         identifiers.add(expression.getIdentifier().getName());
       } else if (expression.getFunctionCall() != null) {
-        
identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands()));
+        if (excludeAs && 
expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) {
+          
identifiers.addAll(extractIdentifiers(Arrays.asList(expression.getFunctionCall().getOperands().get(0)),
 true));
+          continue;
+        } else {
+          
identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands(),
 excludeAs));
+        }
       }
     }
     return identifiers;
@@ -331,12 +345,68 @@ public class CalciteSqlParser {
       pinotQuery.setFilterExpression(updatedFilterExpression);
     }
 
+    // Rewrite GroupBy to Distinct
+    rewriteNonAggregationGroupByToDistinct(pinotQuery);
+
     // Update alias
     Map<Identifier, Expression> aliasMap = 
extractAlias(pinotQuery.getSelectList());
     applyAlias(aliasMap, pinotQuery);
     validate(aliasMap, pinotQuery);
   }
 
+  /**
+   * Rewrite non-aggregate group by query to distinct query.
+   * E.g.
+   * ```
+   *   SELECT col1+col2*5 FROM foo GROUP BY col1, col2 => SELECT distinct 
col1+col2*5 FROM foo
+   *   SELECT col1, col2 FROM foo GROUP BY col1, col2 => SELECT distinct col1, 
col2 FROM foo
+   * ```
+   * @param pinotQuery the query to rewrite in place (its select list and group-by list are replaced).
+   */
+  private static void rewriteNonAggregationGroupByToDistinct(PinotQuery 
pinotQuery) {
+    boolean hasAggregation = false;
+    for (Expression select : pinotQuery.getSelectList()) {
+      if (isAggregateExpression(select)) {
+        hasAggregation = true;
+      }
+    }
+    if (pinotQuery.getOrderByList() != null) {
+      for (Expression orderBy : pinotQuery.getOrderByList()) {
+        if (isAggregateExpression(orderBy)) {
+          hasAggregation = true;
+        }
+      }
+    }
+    if (!hasAggregation && pinotQuery.getGroupByListSize() > 0) {
+      Set<String> selectIdentifiers = 
extractIdentifiers(pinotQuery.getSelectList(), true);
+      Set<String> groupByIdentifiers = 
extractIdentifiers(pinotQuery.getGroupByList(), true);
+      if (groupByIdentifiers.containsAll(selectIdentifiers)) {
+        Expression distinctExpression = 
RequestUtils.getFunctionExpression("DISTINCT");
+        for (Expression select : pinotQuery.getSelectList()) {
+          if (isAsFunction(select)) {
+            Function asFunc = select.getFunctionCall();
+            
distinctExpression.getFunctionCall().addToOperands(asFunc.getOperands().get(0));
+          } else {
+            distinctExpression.getFunctionCall().addToOperands(select);
+          }
+        }
+        pinotQuery.setSelectList(Arrays.asList(distinctExpression));
+        pinotQuery.setGroupByList(Collections.emptyList());
+      } else {
+        selectIdentifiers.removeAll(groupByIdentifiers);
+        throw new SqlCompilationException(String.format("For non-aggregation 
group by query, all the identifiers in select clause should be in groupBys. 
Found identifier: %s",
+            Arrays.toString(selectIdentifiers.toArray(new String[0]))));
+      }
+    }
+  }
+
+  private static boolean isAsFunction(Expression expression) {
+    if (expression.getFunctionCall() != null && 
expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) {
+      return true;
+    }
+    return false;
+  }
+
   private static void invokeCompileTimeFunctions(PinotQuery pinotQuery) {
     for (int i = 0; i < pinotQuery.getSelectListSize(); i++) {
       Expression expression = 
invokeCompileTimeFunctionExpression(pinotQuery.getSelectList().get(i));
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 826741c..810e384 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -1724,4 +1724,73 @@ public class CalciteSqlCompilerTest {
     Assert.assertEquals(brokerRequest.getFilterQuery().getOperator(), 
FilterOperator.IS_NULL);
     Assert.assertEquals(brokerRequest.getFilterQuery().getColumn(), "col");
   }
+
+  @Test
+  public void testNonAggregationGroupByQuery() {
+    PinotQuery2BrokerRequestConverter converter = new 
PinotQuery2BrokerRequestConverter();
+    String query = "SELECT col1 FROM foo GROUP BY col1";
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    BrokerRequest brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(),
 "col1");
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"),
 "col1");
+
+    query = "SELECT col1, col2 FROM foo GROUP BY col1, col2";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(),
 "col1");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(1).getIdentifier().getName(),
 "col2");
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"),
 "col1:col2");
+
+    query = "SELECT col1+col2*5 FROM foo GROUP BY col1, col2";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperator(),
 "PLUS");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(),
 "col1");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperator(),
 "TIMES");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(0).getIdentifier().getName(),
 "col2");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(1).getLiteral().getLongValue(),
 5L);
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"),
 "plus(col1,times(col2,'5'))");
+
+    query = "SELECT col1+col2*5 AS col3 FROM foo GROUP BY col1, col2";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperator(),
 "PLUS");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(),
 "col1");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperator(),
 "TIMES");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(0).getIdentifier().getName(),
 "col2");
+    
Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(1).getLiteral().getLongValue(),
 5L);
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(),
 "DISTINCT");
+    
Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"),
 "plus(col1,times(col2,'5'))");
+  }
+
+  @Test(expectedExceptions = SqlCompilationException.class)
+  public void testInvalidNonAggregationGroupBy() {
+    // A selected identifier (col2) that is missing from GROUP BY must be rejected.
+    try {
+      CalciteSqlParser.compileToPinotQuery("SELECT col1+col2 FROM foo GROUP BY 
col1");
+    } catch (SqlCompilationException e) {
+      Assert.assertEquals(e.getMessage(),
+          "For non-aggregation group by query, all the identifiers in select 
clause should be in groupBys. Found identifier: [col2]");
+      throw e;
+    }
+  }
 }
\ No newline at end of file
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 51ad264..619302c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -64,7 +64,7 @@ public abstract class BaseQueriesTest {
    * <p>Use this to test a single operator.
    */
   @SuppressWarnings({"rawtypes", "unchecked"})
-  protected <T extends Operator> T getOperatorForQuery(String pqlQuery) {
+  protected <T extends Operator> T getOperatorForPqlQuery(String pqlQuery) {
     QueryContext queryContext = 
QueryContextConverterUtils.getQueryContextFromPQL(pqlQuery);
     return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), 
queryContext).run();
   }
@@ -74,8 +74,27 @@ public abstract class BaseQueriesTest {
    * <p>Use this to test a single operator.
    */
   @SuppressWarnings("rawtypes")
-  protected <T extends Operator> T getOperatorForQueryWithFilter(String 
pqlQuery) {
-    return getOperatorForQuery(pqlQuery + getFilter());
+  protected <T extends Operator> T getOperatorForPqlQueryWithFilter(String 
pqlQuery) {
+    return getOperatorForPqlQuery(pqlQuery + getFilter());
+  }
+
+  /**
+   * Run SQL query on single index segment.
+   * <p>Use this to test a single operator.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected <T extends Operator> T getOperatorForSqlQuery(String sqlQuery) {
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContextFromSQL(sqlQuery);
+    return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), 
queryContext).run();
+  }
+
+  /**
+   * Run SQL query with hard-coded filter on single index segment.
+   * <p>Use this to test a single operator.
+   */
+  @SuppressWarnings("rawtypes")
+  protected <T extends Operator> T getOperatorForSqlQueryWithFilter(String 
sqlQuery) {
+    return getOperatorForSqlQuery(sqlQuery + getFilter());
   }
 
   /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index d0be665..19afd73 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -179,18 +179,15 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   <li>Selecting some columns with filter that does not match any 
record</li>
    * </ul>
    */
-  @Test
-  public void testDistinctInnerSegment()
+  private void testDistinctInnerSegmentHelper(String[] queries, boolean isPql)
       throws Exception {
     _indexSegment = createSegment(0, generateRecords(0));
     try {
       {
         // Test selecting all columns
-        String query =
-            "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn) FROM testTable LIMIT 10000";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[0], 
isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(),
             new String[]{"intColumn", "longColumn", "floatColumn", 
"doubleColumn", "stringColumn", "bytesColumn"});
@@ -220,11 +217,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter
-        String query =
-            "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM 
testTable WHERE intColumn >= 60 LIMIT 10000";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[1], 
isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", 
"bytesColumn", "floatColumn"});
         assertEquals(dataSchema.getColumnDataTypes(),
@@ -250,10 +245,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns order by BYTES column
-        String query = "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable 
ORDER BY bytesColumn LIMIT 5";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[2], 
isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", 
"bytesColumn"});
         assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
@@ -274,11 +268,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       {
         // Test selecting some columns with transform, filter, order-by and 
limit. Spaces in 'add' are intentional
         // to ensure that AggregationFunction arguments are standardized (to 
remove spaces).
-        String query =
-            "SELECT DISTINCT(ADD ( intColumn,  floatColumn  ), stringColumn) 
FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[3], 
isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new 
String[]{"add(intColumn,floatColumn)", "stringColumn"});
         assertEquals(dataSchema.getColumnDataTypes(),
@@ -297,11 +289,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter that does not match any 
record
-        String query =
-            "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10";
 
         // Check data schema, where data type should be STRING for all columns
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[4], 
isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", 
"longColumn"});
         assertEquals(dataSchema.getColumnDataTypes(),
@@ -315,11 +305,63 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     }
   }
 
+
+  /**
+   * Test DISTINCT query within a single segment.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any 
record</li>
+   * </ul>
+   */
+  @Test
+  public void testDistinctInnerSegment()
+      throws Exception {
+    testDistinctInnerSegmentHelper(new String[]{
+        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn) FROM testTable LIMIT 10000",
+        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM 
testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY 
bytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD ( intColumn,  floatColumn  ), stringColumn) FROM 
testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10"
+    }, true);
+  }
+
+  /**
+   * Test Non-Aggregation GroupBy query rewrite to Distinct query within a 
single segment.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any 
record</li>
+   * </ul>
+   */
+  @Test
+  public void testNonAggGroupByRewriteToDistinctInnerSegment()
+      throws Exception {
+    testDistinctInnerSegmentHelper(new String[]{
+        "SELECT intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn FROM testTable GROUP BY intColumn, longColumn, 
floatColumn, doubleColumn, stringColumn, bytesColumn LIMIT 10000",
+        "SELECT stringColumn, bytesColumn, floatColumn FROM testTable WHERE 
intColumn >= 60 GROUP BY stringColumn, bytesColumn, floatColumn LIMIT 10000",
+        "SELECT intColumn, bytesColumn FROM testTable GROUP BY intColumn, 
bytesColumn ORDER BY bytesColumn LIMIT 5",
+        "SELECT ADD ( intColumn,  floatColumn  ), stringColumn FROM testTable 
WHERE longColumn < 60 GROUP BY ADD ( intColumn,  floatColumn  ), stringColumn 
ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT floatColumn, longColumn FROM testTable WHERE stringColumn = 
'a' GROUP BY floatColumn, longColumn ORDER BY longColumn LIMIT 10"
+    }, false);
+  }
+
   /**
    * Helper method to get the DistinctTable result for one single segment for 
the given query.
    */
-  private DistinctTable getDistinctTableInnerSegment(String query) {
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+  private DistinctTable getDistinctTableInnerSegment(String query, boolean 
isPql) {
+    AggregationOperator aggregationOperator;
+    if (isPql) {
+      aggregationOperator = getOperatorForPqlQuery(query);
+    } else {
+      aggregationOperator = getOperatorForSqlQuery(query);
+    }
     List<Object> aggregationResult = 
aggregationOperator.nextBlock().getAggregationResult();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
@@ -347,8 +389,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   </li>
    * </ul>
    */
-  @Test
-  public void testDistinctInterSegment()
+  private void testDistinctInterSegmentHelper(String[] pqlQueries, String[] 
sqlQueries)
       throws Exception {
     ImmutableSegment segment0 = createSegment(0, generateRecords(0));
     ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
@@ -356,10 +397,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     try {
       {
         // Test selecting all columns
-        String pqlQuery =
-            "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn) FROM testTable LIMIT 10000";
-        String sqlQuery =
-            "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn FROM testTable LIMIT 10000";
+        String pqlQuery = pqlQueries[0];
+        String sqlQuery = sqlQueries[0];
 
         // Check data schema
         BrokerResponseNative pqlResponse = 
getBrokerResponseForPqlQuery(pqlQuery);
@@ -411,10 +450,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter
-        String pqlQuery =
-            "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM 
testTable WHERE intColumn >= 60 LIMIT 10000";
-        String sqlQuery =
-            "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM 
testTable WHERE intColumn >= 60 LIMIT 10000";
+        String pqlQuery = pqlQueries[1];
+        String sqlQuery = sqlQueries[1];
 
         // Check data schema
         BrokerResponseNative pqlResponse = 
getBrokerResponseForPqlQuery(pqlQuery);
@@ -460,8 +497,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns order by BYTES column
-        String pqlQuery = "SELECT DISTINCT(intColumn, bytesColumn) FROM 
testTable ORDER BY bytesColumn LIMIT 5";
-        String sqlQuery = "SELECT DISTINCT intColumn, bytesColumn FROM 
testTable ORDER BY bytesColumn LIMIT 5";
+        String pqlQuery = pqlQueries[2];
+        String sqlQuery = sqlQueries[2];
 
         // Check data schema
         BrokerResponseNative pqlResponse = 
getBrokerResponseForPqlQuery(pqlQuery);
@@ -498,10 +535,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with transform, filter, order-by and 
limit
-        String pqlQuery =
-            "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM 
testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10";
-        String sqlQuery =
-            "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM 
testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10";
+        String pqlQuery = pqlQueries[3];
+        String sqlQuery = sqlQueries[3];
 
         // Check data schema
         BrokerResponseNative pqlResponse = 
getBrokerResponseForPqlQuery(pqlQuery);
@@ -537,10 +572,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter that does not match any 
record
-        String pqlQuery =
-            "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10";
-        String sqlQuery =
-            "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10";
+        String pqlQuery = pqlQueries[4];
+        String sqlQuery = sqlQueries[4];
 
         // Check data schema, where data type should be STRING for all columns
         BrokerResponseNative pqlResponse = 
getBrokerResponseForPqlQuery(pqlQuery);
@@ -564,10 +597,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       {
         // Test selecting some columns with filter that does not match any 
record in one segment but matches some
         // records in the other segment
-        String pqlQuery =
-            "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 
ORDER BY intColumn ASC LIMIT 5";
-        String sqlQuery =
-            "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 
ORDER BY intColumn ASC LIMIT 5";
+        String pqlQuery = pqlQueries[5];
+        String sqlQuery = sqlQueries[5];
 
         // Check data schema
         BrokerResponseNative pqlResponse = 
getBrokerResponseForPqlQuery(pqlQuery);
@@ -599,10 +630,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       {
         // Test electing some columns with filter that does not match any 
record in one server but matches some records
         // in the other server
-        String pqlQuery =
-            "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 
200 ORDER BY longColumn DESC LIMIT 5";
-        String sqlQuery =
-            "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 
200 ORDER BY longColumn DESC LIMIT 5";
+        String pqlQuery = pqlQueries[6];
+        String sqlQuery = sqlQueries[6];
 
         QueryContext pqlQueryContext = 
QueryContextConverterUtils.getQueryContextFromPQL(pqlQuery);
         BrokerResponseNative pqlResponse = 
queryServersWithDifferentSegments(pqlQueryContext, segment0, segment1);
@@ -644,6 +673,93 @@ public class DistinctQueriesTest extends BaseQueriesTest {
   }
 
   /**
+   * Test DISTINCT query across multiple segments and servers (2 servers, each 
with 2 segments).
+   * <p>Both PQL and SQL format are tested.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any 
record</li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in 
one segment but matches some records in
+   *     the other segment
+   *   </li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in 
one server but matches some records in the
+   *     other server
+   *   </li>
+   * </ul>
+   */
+  @Test
+  public void testDistinctInterSegment()
+      throws Exception {
+    String[] pqlQueries = new String[] {
+        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn) FROM testTable LIMIT 10000",
+        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM 
testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY 
bytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM 
testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10",
+        "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 
ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 
ORDER BY longColumn DESC LIMIT 5",
+    };
+    String[] sqlQueries = new String[] {
+        "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn FROM testTable LIMIT 10000",
+        "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM testTable 
WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT intColumn, bytesColumn FROM testTable ORDER BY 
bytesColumn LIMIT 5",
+        "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM 
testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10",
+        "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 
ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 200 
ORDER BY longColumn DESC LIMIT 5",
+    };
+    testDistinctInterSegmentHelper(pqlQueries, sqlQueries);
+  }
+
+  /**
+   * Test Non-Aggregation GroupBy query rewrite to Distinct query across 
multiple segments and servers (2 servers, each with 2 segments).
+   * <p>Only the SQL format is tested.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any 
record</li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in 
one segment but matches some records in
+   *     the other segment
+   *   </li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in 
one server but matches some records in the
+   *     other server
+   *   </li>
+   * </ul>
+   */
+  @Test
+  public void testNonAggGroupByRewriteToDistinctInterSegment()
+      throws Exception {
+    String[] pqlQueries = new String[] {
+        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn) FROM testTable LIMIT 10000",
+        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM 
testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY 
bytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM 
testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, 
floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE 
stringColumn = 'a' ORDER BY longColumn LIMIT 10",
+        "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 
ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 
ORDER BY longColumn DESC LIMIT 5",
+    };
+    String[] sqlQueries = new String[] {
+        "SELECT intColumn, longColumn, floatColumn, doubleColumn, 
stringColumn, bytesColumn FROM testTable GROUP BY intColumn, longColumn, 
floatColumn, doubleColumn, stringColumn, bytesColumn LIMIT 10000",
+        "SELECT stringColumn, bytesColumn, floatColumn FROM testTable WHERE 
intColumn >= 60 GROUP BY stringColumn, bytesColumn, floatColumn LIMIT 10000",
+        "SELECT intColumn, bytesColumn FROM testTable GROUP BY intColumn, 
bytesColumn ORDER BY bytesColumn LIMIT 5",
+        "SELECT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE 
longColumn < 60 GROUP BY ADD(intColumn, floatColumn), stringColumn ORDER BY 
stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT floatColumn, longColumn FROM testTable WHERE stringColumn = 
'a' GROUP BY floatColumn, longColumn ORDER BY longColumn LIMIT 10",
+        "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY 
intColumn ORDER BY intColumn ASC LIMIT 5",
+        "SELECT longColumn FROM testTable WHERE doubleColumn < 200 GROUP BY 
longColumn ORDER BY longColumn DESC LIMIT 5",
+    };
+    testDistinctInterSegmentHelper(pqlQueries, sqlQueries);
+  }
+  /**
    * Helper method to query 2 servers with different segments. Server0 will 
have 2 copies of segment0; Server1 will have
    * 2 copies of segment1.
    */
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
index 386ae27..7429f3d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
@@ -107,7 +107,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
 
     // Test inner segment queries
     // Test base query
-    AggregationOperator aggregationOperator = getOperatorForQuery(BASE_QUERY);
+    AggregationOperator aggregationOperator = 
getOperatorForPqlQuery(BASE_QUERY);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
aggregationOperator.getExecutionStatistics();
     QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 
30000L, 0L, 60000L, 30000L);
@@ -115,7 +115,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
     Assert.assertEquals(((HyperLogLog) 
aggregationResult.get(0)).cardinality(), 21L);
     Assert.assertEquals(((HyperLogLog) 
aggregationResult.get(1)).cardinality(), 1762L);
     // Test query with filter
-    aggregationOperator = getOperatorForQueryWithFilter(BASE_QUERY);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(BASE_QUERY);
     resultsBlock = aggregationOperator.nextBlock();
     executionStatistics = aggregationOperator.getExecutionStatistics();
     QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 
6129L, 84134L, 12258L, 30000L);
@@ -123,7 +123,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
     Assert.assertEquals(((HyperLogLog) 
aggregationResult.get(0)).cardinality(), 17L);
     Assert.assertEquals(((HyperLogLog) 
aggregationResult.get(1)).cardinality(), 1197L);
     // Test query with group-by
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(BASE_QUERY + GROUP_BY);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(BASE_QUERY + GROUP_BY);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     executionStatistics = aggregationGroupByOperator.getExecutionStatistics();
     QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 
30000L, 0L, 90000L, 30000L);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java
index 16cb48e..670d6b1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java
@@ -45,7 +45,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable";
 
     // Test query without filter.
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 100000L, 0L, 400000L,
@@ -55,7 +55,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
             1182655, 83439903673981L, 100000L);
 
     // Test query with filter.
-    aggregationOperator = getOperatorForQueryWithFilter(query);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 15620L, 275416, 62480L,
@@ -70,7 +70,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     String query = "SELECT" + MULTI_VALUE_AGGREGATION + " FROM testTable";
 
     // Test query without filter.
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 100000L, 0L, 200000L,
@@ -80,7 +80,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
             201, 121081150452570L, 106688L);
 
     // Test query with filter.
-    aggregationOperator = getOperatorForQueryWithFilter(query);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 15620L, 275416L, 31240L,
@@ -95,7 +95,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + SMALL_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 100000L, 0L, 500000L,
@@ -105,7 +105,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
             2100941020, 117939666, 23061775005L, 26L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 15620L, 275416L,
@@ -120,7 +120,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + 
MEDIUM_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 100000L, 0L, 700000L,
@@ -130,7 +130,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
             1095214422L, 1547156787, 528554902, 52058876L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 15620L, 275416L,
@@ -145,7 +145,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 100000L, 0L, 700000L,
@@ -154,7 +154,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
         "240129976\tL\t2147483647\t2147483647", 1L, 240129976L, 1649812746, 
2077178039, 1952924139L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 15620L, 275416L,
@@ -168,7 +168,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + 
VERY_LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 100000L, 0L, 700000L,
@@ -178,7 +178,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest 
extends BaseMultiValue
         675163196L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 15620L, 275416L,
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 27dde46..9d5408c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -49,7 +49,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable";
 
     // Test query without filter.
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 30000L, 0L, 120000L, 30000L);
@@ -58,7 +58,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
             1689277, 28175373944314L, 30000L);
 
     // Test query with filter.
-    aggregationOperator = getOperatorForQueryWithFilter(query);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(),
 6129L, 84134L, 24516L,
@@ -73,7 +73,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + SMALL_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 30000L, 0L, 150000L,
@@ -83,7 +83,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
             1215316262, 1328642550, 788414092L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 6129L, 84134L, 30645L,
@@ -98,7 +98,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + 
MEDIUM_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 30000L, 0L, 210000L,
@@ -108,7 +108,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
             4L, 2062187196L, 1988589001, 394608493, 4782388964L, 4L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 6129L, 84134L, 42903L,
@@ -122,7 +122,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 30000L, 0L, 210000L,
@@ -131,7 +131,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
         "484569489\t16200443\t1159557463\tP\tMaztCmmxxgguBUxPti", 2L, 
969138978L, 995355481, 16200443, 2222394270L, 2L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 6129L, 84134L, 42903L,
@@ -145,7 +145,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + 
VERY_LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 30000L, 0L, 270000L,
@@ -155,7 +155,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
         204243323, 628170461, 1985159279L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         
.testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(),
 6129L, 84134L, 55161L,
@@ -174,7 +174,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
   @Test
   public void testSingleColumnDistinct() {
     String query = "SELECT DISTINCT(column1) FROM testTable LIMIT 1000000";
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> operatorResult = resultsBlock.getAggregationResult();
 
@@ -206,7 +206,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest 
extends BaseSingleVal
   @Test
   public void testMultiColumnDistinct() {
     String query = "SELECT DISTINCT(column1, column3) FROM testTable LIMIT 
1000000";
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> operatorResult = resultsBlock.getAggregationResult();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 8f11ee9..c264a85 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -41,7 +41,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     String query = "SELECT * FROM testTable LIMIT 0";
 
     // Test query without filter
-    EmptySelectionOperator emptySelectionOperator = getOperatorForQuery(query);
+    EmptySelectionOperator emptySelectionOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = emptySelectionOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -61,7 +61,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
 
     // Test query with filter
-    emptySelectionOperator = getOperatorForQueryWithFilter(query);
+    emptySelectionOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = emptySelectionOperator.nextBlock();
     executionStatistics = emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -85,7 +85,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     String query = "SELECT * FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -110,7 +110,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new 
int[]{2147483647});
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -140,7 +140,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     String query = "SELECT" + SELECTION + " FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -165,7 +165,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new 
int[]{2147483647});
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -195,7 +195,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 100000L);
@@ -221,7 +221,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest 
extends BaseMultiValueQu
     Assert.assertEquals(lastRow[columnIndexMap.get("column6")], new 
int[]{1252});
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 15620L);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 76e22dd..607857e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -41,7 +41,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT * FROM testTable LIMIT 0";
 
     // Test query without filter
-    EmptySelectionOperator emptySelectionOperator = getOperatorForQuery(query);
+    EmptySelectionOperator emptySelectionOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = emptySelectionOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -60,7 +60,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
 
     // Test query with filter
-    emptySelectionOperator = getOperatorForQueryWithFilter(query);
+    emptySelectionOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = emptySelectionOperator.nextBlock();
     executionStatistics = emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -85,7 +85,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT * FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -110,7 +110,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], 
"P");
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -140,7 +140,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT" + SELECTION + " FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -165,7 +165,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertEquals((String) firstRow[2], "P");
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -194,7 +194,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -220,7 +220,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertEquals(((Integer) 
lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
@@ -251,7 +251,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT * " + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -278,7 +278,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertEquals(((Integer) 
lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
@@ -310,7 +310,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     String query = "SELECT * " + " FROM testTable" + ORDER_BY + " LIMIT 5000, 
7000";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = 
selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -337,7 +337,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest 
extends BaseSingleValue
     Assert.assertEquals((int) lastRow[columnIndexMap.get("column1")], 
1715964282);
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index d1e7023..bb17808 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -164,7 +164,7 @@ public class PercentileTDigestQueriesTest extends 
BaseQueriesTest {
   @Test
   public void testInnerSegmentAggregation() {
     // For inner segment case, percentile does not affect the intermediate 
result
-    AggregationOperator aggregationOperator = 
getOperatorForQuery(getAggregationQuery(0));
+    AggregationOperator aggregationOperator = 
getOperatorForPqlQuery(getAggregationQuery(0));
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     Assert.assertNotNull(aggregationResult);
@@ -193,7 +193,7 @@ public class PercentileTDigestQueriesTest extends 
BaseQueriesTest {
   @Test
   public void testInnerSegmentGroupBy() {
     // For inner segment case, percentile does not affect the intermediate 
result
-    AggregationGroupByOperator groupByOperator = 
getOperatorForQuery(getGroupByQuery(0));
+    AggregationGroupByOperator groupByOperator = 
getOperatorForPqlQuery(getGroupByQuery(0));
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = 
resultsBlock.getAggregationGroupByResult();
     Assert.assertNotNull(groupByResult);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
index 99f3a90..cd97723 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
@@ -195,7 +195,7 @@ public class RangePredicateWithSortedInvertedIndexTest 
extends BaseQueriesTest {
   }
 
   private void runQuery(String query, int count, List<Pairs.IntPair> intPairs, 
int numColumns) {
-    SelectionOnlyOperator operator = getOperatorForQuery(query);
+    SelectionOnlyOperator operator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock block = operator.nextBlock();
     Collection<Object[]> rows = block.getSelectionResult();
     assertNotNull(rows, ERROR_MESSAGE);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index b1572b2..6b7bafb 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -225,7 +225,7 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   @Test
   public void testInnerSegmentAggregation()
       throws Exception {
-    AggregationOperator aggregationOperator = 
getOperatorForQuery(getAggregationQuery());
+    AggregationOperator aggregationOperator = 
getOperatorForPqlQuery(getAggregationQuery());
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     assertNotNull(aggregationResult);
@@ -386,7 +386,7 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   @Test
   public void testInnerSegmentSVGroupBy()
       throws Exception {
-    AggregationGroupByOperator groupByOperator = 
getOperatorForQuery(getSVGroupByQuery());
+    AggregationGroupByOperator groupByOperator = 
getOperatorForPqlQuery(getSVGroupByQuery());
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = 
resultsBlock.getAggregationGroupByResult();
     assertNotNull(groupByResult);
@@ -582,7 +582,7 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   @Test
   public void testInnerSegmentMVGroupBy()
       throws Exception {
-    AggregationGroupByOperator groupByOperator = 
getOperatorForQuery(getMVGroupByQuery());
+    AggregationGroupByOperator groupByOperator = 
getOperatorForPqlQuery(getMVGroupByQuery());
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = 
resultsBlock.getAggregationGroupByResult();
     assertNotNull(groupByResult);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index e8ab6d4..fb44226 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -1066,7 +1066,7 @@ public class TextSearchQueriesTest extends 
BaseQueriesTest {  private static fin
   private void testTextSearchSelectQueryHelper(String query, int 
expectedResultSize, boolean compareGrepOutput,
       List<Serializable[]> expectedResults)
       throws Exception {
-    SelectionOnlyOperator operator = getOperatorForQuery(query);
+    SelectionOnlyOperator operator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock operatorResult = operator.nextBlock();
     List<Object[]> resultset = (List<Object[]>) 
operatorResult.getSelectionResult();
     Assert.assertNotNull(resultset);
@@ -1108,7 +1108,7 @@ public class TextSearchQueriesTest extends 
BaseQueriesTest {  private static fin
   }
 
   private void testTextSearchAggregationQueryHelper(String query, int 
expectedCount) {
-    AggregationOperator operator = getOperatorForQuery(query);
+    AggregationOperator operator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock operatorResult = operator.nextBlock();
     long count = (Long) operatorResult.getAggregationResult().get(0);
     Assert.assertEquals(expectedCount, count);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 0b5c9f3..1127f93 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -185,7 +185,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
   }
 
   private void runAndVerifyInnerSegmentQuery(String query, double expectedSum, 
int expectedCount) {
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     assertNotNull(aggregationResult);
@@ -210,7 +210,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
   }
 
   private void verifyDateTruncationResult(String query, String 
expectedStringKey) {
-    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
     AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
     assertNotNull(aggregationGroupByResult);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 64b1c47..c9f5bac 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -857,7 +857,7 @@ public class ClusterIntegrationTestUtils {
     // compare results
     BrokerRequest brokerRequest =
         
PinotQueryParserFactory.get(CommonConstants.Broker.Request.SQL).compileToBrokerRequest(pinotQuery);
-    if (brokerRequest.getSelections() != null) { // selection
+    if (isSelectionQuery(brokerRequest)) { // selection
       // TODO: compare results for selection queries, w/o order by
 
       // Compare results for selection queries, with order by
@@ -867,9 +867,9 @@ public class ClusterIntegrationTestUtils {
           return;
         }
         Set<String> orderByColumns =
-            
CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getOrderByList());
+            
CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getOrderByList(),
 false);
         Set<String> selectionColumns =
-            
CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getSelectList());
+            
CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getSelectList(),
 false);
         if (!selectionColumns.containsAll(orderByColumns)) {
           // Selection columns has no overlap with order by column, don't compare.
           return;
@@ -881,7 +881,7 @@ public class ClusterIntegrationTestUtils {
               String brokerValue = brokerResponseRows.get(i).get(c).asText();
               String connectionValue = resultTableResultSet.getString(i, c);
               if (orderByColumns.containsAll(CalciteSqlParser
-                  
.extractIdentifiers(Arrays.asList(brokerRequest.getPinotQuery().getSelectList().get(c)))))
 {
+                  
.extractIdentifiers(Arrays.asList(brokerRequest.getPinotQuery().getSelectList().get(c)),
 false))) {
                 boolean error = fuzzyCompare(h2Value, brokerValue, 
connectionValue);
                 if (error) {
                   String failureMessage =
@@ -962,6 +962,16 @@ public class ClusterIntegrationTestUtils {
     }
   }
 
+  private static boolean isSelectionQuery(BrokerRequest brokerRequest) {
+    if (brokerRequest.getSelections() != null) {
+      return true;
+    }
+    if (brokerRequest.getAggregationsInfo() != null && brokerRequest.getAggregationsInfo().get(0).getAggregationType().equalsIgnoreCase("DISTINCT")) {
+      return true;
+    }
+    return false;
+  }
+
   private static boolean fuzzyCompare(String h2Value, String brokerValue, 
String connectionValue) {
     // Fuzzy compare expected value and actual value
     boolean error = false;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 3cde6b5..774f209 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1074,18 +1074,59 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     String pql = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
     String sql = "SELECT DISTINCT Carrier FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
 
     pql = "SELECT DISTINCT(Carrier, DestAirportID) FROM mytable LIMIT 1000000";
     sql = "SELECT DISTINCT Carrier, DestAirportID FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier, DestAirportID FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
 
     pql = "SELECT DISTINCT(Carrier, DestAirportID, DestStateName) FROM mytable LIMIT 1000000";
     sql = "SELECT DISTINCT Carrier, DestAirportID, DestStateName FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier, DestAirportID, DestStateName FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
 
     pql = "SELECT DISTINCT(Carrier, DestAirportID, DestCityName) FROM mytable LIMIT 1000000";
     sql = "SELECT DISTINCT Carrier, DestAirportID, DestCityName FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier, DestAirportID, DestCityName FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
+  }
+
+  @Test
+  public void testNonAggregationGroupByQuery()
+      throws Exception {
+    // by default 10 rows will be returned, so use high limit
+    String pql = "SELECT Carrier FROM mytable GROUP BY Carrier LIMIT 1000000";
+    String sql = "SELECT Carrier FROM mytable GROUP BY Carrier";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID LIMIT 1000000";
+    sql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName LIMIT 1000000";
+    sql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName LIMIT 1000000";
+    sql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT ArrTime-DepTime FROM mytable GROUP BY ArrTime, DepTime LIMIT 1000000";
+    sql = "SELECT ArrTime-DepTime FROM mytable GROUP BY ArrTime, DepTime";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT ArrTime-DepTime,ArrTime/3,DepTime*2 FROM mytable GROUP BY ArrTime, DepTime LIMIT 1000000";
+    sql = "SELECT ArrTime-DepTime,ArrTime/3,DepTime*2 FROM mytable GROUP BY ArrTime, DepTime";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT ArrTime+DepTime FROM mytable GROUP BY ArrTime + DepTime LIMIT 1000000";
+    sql = "SELECT ArrTime+DepTime FROM mytable GROUP BY ArrTime + DepTime";
+    testSqlQuery(pql, Collections.singletonList(sql));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to