This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e8dcba123a Fix query option validation for group-by queries (#14618)
e8dcba123a is described below

commit e8dcba123aad95ba2848f88a4b8c7b4db2e1ff26
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Dec 8 17:35:26 2024 -0800

    Fix query option validation for group-by queries (#14618)
---
 .../BaseSingleStageBrokerRequestHandler.java       |  16 +-
 .../common/utils/config/QueryOptionsUtils.java     | 160 +++++----
 .../common/utils/config/QueryOptionsUtilsTest.java | 137 +++++---
 .../operator/combine/GroupByCombineOperator.java   |  26 +-
 .../streaming/StreamingGroupByCombineOperator.java |  23 +-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |   2 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  16 +-
 .../aggregation/function/ArrayAggFunctionTest.java | 374 +++++++++------------
 .../pinot/queries/WithOptionQueriesTest.java       | 190 -----------
 .../query/runtime/queries/QueryRunnerTest.java     | 120 +++----
 10 files changed, 438 insertions(+), 626 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index ed6c58ad0f..1364919592 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -1847,21 +1847,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
       throw new IllegalStateException(
           "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + queryResponseLimit);
     }
-
-    Map<String, String> queryOptions = pinotQuery.getQueryOptions();
-    try {
-      // throw errors if options is less than 1 or invalid
-      Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions);
-      if (numReplicaGroupsToQuery != null) {
-        Preconditions.checkState(numReplicaGroupsToQuery > 0, "numReplicaGroups must be " + "positive number, got: %d",
-            numReplicaGroupsToQuery);
-      }
-    } catch (NumberFormatException e) {
-      String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
-      throw new IllegalStateException(
-          String.format("numReplicaGroups must be a positive number, got: %s", numReplicaGroupsToQuery));
-    }
-
+    QueryOptionsUtils.getNumReplicaGroupsToQuery(pinotQuery.getQueryOptions());
     if (pinotQuery.getDataSource().getSubquery() != null) {
       validateRequest(pinotQuery.getDataSource().getSubquery(), queryResponseLimit);
     }
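
For reference, the positive-integer check that used to live inline in the broker now fails inside QueryOptionsUtils with an IllegalArgumentException. A minimal self-contained sketch of that behavior (the class and helper names here are hypothetical; the option key string and message format follow the new helpers added below):

import java.util.Map;

// Hypothetical standalone sketch; parsePositiveInt mirrors the
// checkedParseIntPositive(...) helper added to QueryOptionsUtils.
public class NumReplicaGroupsCheckSketch {
  public static void main(String[] args) {
    Map<String, String> queryOptions = Map.of("numReplicaGroupsToQuery", "0");
    try {
      parsePositiveInt("numReplicaGroupsToQuery", queryOptions.get("numReplicaGroupsToQuery"));
    } catch (IllegalArgumentException e) {
      // Prints: numReplicaGroupsToQuery must be a number between 1 and 2^31-1, got: 0
      System.out.println(e.getMessage());
    }
  }

  private static Integer parsePositiveInt(String optionName, String optionValue) {
    if (optionValue == null) {
      return null; // option not set
    }
    int value;
    try {
      value = Integer.parseInt(optionValue);
    } catch (NumberFormatException nfe) {
      throw rangeError(optionName, optionValue);
    }
    if (value < 1) {
      throw rangeError(optionName, optionValue);
    }
    return value;
  }

  private static IllegalArgumentException rangeError(String optionName, String optionValue) {
    return new IllegalArgumentException(
        String.format("%s must be a number between 1 and 2^31-1, got: %s", optionName, optionValue));
  }
}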
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index bcc82efbf5..1ac9e6fab8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -98,19 +98,19 @@ public class QueryOptionsUtils {
   @Nullable
   public static Long getTimeoutMs(Map<String, String> queryOptions) {
     String timeoutMsString = queryOptions.get(QueryOptionKey.TIMEOUT_MS);
-    return checkedParseLong(QueryOptionKey.TIMEOUT_MS, timeoutMsString, 1);
+    return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS, timeoutMsString);
   }
 
   @Nullable
   public static Long getMaxServerResponseSizeBytes(Map<String, String> queryOptions) {
     String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
-    return checkedParseLong(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, responseSize, 1);
+    return checkedParseLongPositive(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, responseSize);
   }
 
   @Nullable
   public static Long getMaxQueryResponseSizeBytes(Map<String, String> queryOptions) {
     String responseSize = queryOptions.get(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES);
-    return checkedParseLong(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES, responseSize, 1);
+    return checkedParseLongPositive(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES, responseSize);
   }
 
   public static boolean isAndScanReorderingEnabled(Map<String, String> queryOptions) {
@@ -179,7 +179,7 @@ public class QueryOptionsUtils {
   @Nullable
   public static Integer getNumReplicaGroupsToQuery(Map<String, String> queryOptions) {
     String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
-    return checkedParseInt(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY, numReplicaGroupsToQuery);
+    return checkedParseIntPositive(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY, numReplicaGroupsToQuery);
   }
 
   public static boolean isExplainPlanVerbose(Map<String, String> queryOptions) {
@@ -201,25 +201,35 @@ public class QueryOptionsUtils {
   @Nullable
   public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
     String maxExecutionThreadsString = queryOptions.get(QueryOptionKey.MAX_EXECUTION_THREADS);
-    return checkedParseInt(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
+    return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
   }
 
   @Nullable
   public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
     String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
-    return checkedParseInt(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, minSegmentGroupTrimSizeString);
+    // NOTE: Non-positive value means turning off the segment level trim
+    return uncheckedParseInt(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, minSegmentGroupTrimSizeString);
   }
 
   @Nullable
   public static Integer getMinServerGroupTrimSize(Map<String, String> queryOptions) {
     String minServerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
-    return checkedParseInt(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, minServerGroupTrimSizeString);
+    // NOTE: Non-positive value means turning off the server level trim
+    return uncheckedParseInt(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, minServerGroupTrimSizeString);
   }
 
   @Nullable
   public static Integer getMinBrokerGroupTrimSize(Map<String, String> queryOptions) {
     String minBrokerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE);
-    return checkedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, minBrokerGroupTrimSizeString);
+    // NOTE: Non-positive value means turning off the broker level trim
+    return uncheckedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, minBrokerGroupTrimSizeString);
+  }
+
+  @Nullable
+  public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
+    String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
+    // NOTE: Non-positive value means turning off the on-the-fly trim before all groups are added
+    return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold);
   }
 
   public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
@@ -246,73 +256,25 @@ public class QueryOptionsUtils {
   @Nullable
   public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) {
     String maxLeafLimitStr = queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT);
-    return checkedParseInt(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
+    return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr);
   }
 
   @Nullable
   public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
     String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
-    return checkedParseInt(QueryOptionKey.NUM_GROUPS_LIMIT, maxNumGroupLimit);
+    return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_LIMIT, maxNumGroupLimit);
   }
 
   @Nullable
   public static Integer getMaxInitialResultHolderCapacity(Map<String, String> queryOptions) {
-    String maxInitResultCap = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
-    return checkedParseInt(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitResultCap);
+    String maxInitialResultHolderCapacity = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+    return checkedParseIntPositive(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitialResultHolderCapacity);
   }
 
   public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
   }
 
-  @Nullable
-  public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
-    String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
-    return checkedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold);
-  }
-
-  private static Long checkedParseLong(String optionName, String optionValue, int minValue) {
-    try {
-      if (optionValue != null) {
-        Long value = Long.parseLong(optionValue);
-        if (value < minValue) {
-          throw longParseException(optionName, optionValue, minValue);
-        }
-        return value;
-      } else {
-        return null;
-      }
-    } catch (NumberFormatException nfe) {
-      throw longParseException(optionName, optionValue, minValue);
-    }
-  }
-
-  private static IllegalArgumentException longParseException(String optionName, String optionValue, int minValue) {
-    return new IllegalArgumentException(
-        String.format("%s must be a number between %d and 2^63-1, got: %s", optionName, minValue, optionValue));
-  }
-
-  private static Integer checkedParseInt(String optionName, String optionValue) {
-    try {
-      if (optionValue != null) {
-        int value = Integer.parseInt(optionValue);
-        if (value < 0) {
-          throw intParseException(optionName, optionValue);
-        }
-        return value;
-      } else {
-        return null;
-      }
-    } catch (NumberFormatException nfe) {
-      throw intParseException(optionName, optionValue);
-    }
-  }
-
-  private static IllegalArgumentException intParseException(String optionName, String optionValue) {
-    return new IllegalArgumentException(
-        String.format("%s must be a number between 0 and 2^31-1, got: %s", optionName, optionValue));
-  }
-
   public static boolean shouldDropResults(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS));
   }
@@ -320,13 +282,13 @@ public class QueryOptionsUtils {
   @Nullable
   public static Integer getMaxStreamingPendingBlocks(Map<String, String> queryOptions) {
     String maxStreamingPendingBlocks = queryOptions.get(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS);
-    return checkedParseInt(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS, maxStreamingPendingBlocks);
+    return checkedParseIntPositive(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS, maxStreamingPendingBlocks);
   }
 
   @Nullable
   public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
     String maxRowsInJoin = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN);
-    return checkedParseInt(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin);
+    return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin);
   }
 
   @Nullable
@@ -338,7 +300,7 @@ public class QueryOptionsUtils {
   @Nullable
   public static Integer getMaxRowsInWindow(Map<String, String> queryOptions) {
     String maxRowsInWindow = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_WINDOW);
-    return checkedParseInt(QueryOptionKey.MAX_ROWS_IN_WINDOW, maxRowsInWindow);
+    return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_WINDOW, maxRowsInWindow);
   }
 
   @Nullable
@@ -354,4 +316,76 @@ public class QueryOptionsUtils {
   public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
   }
+
+  @Nullable
+  private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
+    if (optionValue == null) {
+      return null;
+    }
+    try {
+      return Integer.parseInt(optionValue);
+    } catch (NumberFormatException nfe) {
+      throw new IllegalArgumentException(String.format("%s must be an integer, got: %s", optionName, optionValue));
+    }
+  }
+
+  @Nullable
+  private static Integer checkedParseIntPositive(String optionName, @Nullable String optionValue) {
+    return checkedParseInt(optionName, optionValue, 1);
+  }
+
+  @Nullable
+  private static Integer checkedParseIntNonNegative(String optionName, @Nullable String optionValue) {
+    return checkedParseInt(optionName, optionValue, 0);
+  }
+
+  @Nullable
+  private static Integer checkedParseInt(String optionName, @Nullable String optionValue, int minValue) {
+    if (optionValue == null) {
+      return null;
+    }
+    int value;
+    try {
+      value = Integer.parseInt(optionValue);
+    } catch (NumberFormatException nfe) {
+      throw intParseException(optionName, optionValue, minValue);
+    }
+    if (value < minValue) {
+      throw intParseException(optionName, optionValue, minValue);
+    }
+    return value;
+  }
+
+  private static IllegalArgumentException intParseException(String optionName, String optionValue, int minValue) {
+    return new IllegalArgumentException(
+        String.format("%s must be a number between %d and 2^31-1, got: %s", optionName, minValue, optionValue));
+  }
+
+  @Nullable
+  private static Long checkedParseLongPositive(String optionName, @Nullable String optionValue) {
+    return checkedParseLong(optionName, optionValue, 1);
+  }
+
+  @Nullable
+  private static Long checkedParseLong(String optionName, @Nullable String optionValue, long minValue) {
+    if (optionValue == null) {
+      return null;
+    }
+    long value;
+    try {
+      value = Long.parseLong(optionValue);
+    } catch (NumberFormatException nfe) {
+      throw longParseException(optionName, optionValue, minValue);
+    }
+    if (value < minValue) {
+      throw longParseException(optionName, optionValue, minValue);
+    }
+    return value;
+  }
+
+  private static IllegalArgumentException longParseException(String optionName, @Nullable String optionValue,
+      long minValue) {
+    return new IllegalArgumentException(
+        String.format("%s must be a number between %d and 2^63-1, got: %s", optionName, minValue, optionValue));
+  }
 }
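
Taken together, the helpers above define three validation tiers: positive, non-negative, and unbounded (parse-only). A hedged usage sketch, assuming pinot-common is on the classpath and that the QueryOptionKey constants resolve to the camelCase strings written literally here:

import java.util.Map;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;

public class OptionTiersSketch {
  public static void main(String[] args) {
    // Positive int: zero is now rejected with a descriptive message.
    try {
      QueryOptionsUtils.getNumGroupsLimit(Map.of("numGroupsLimit", "0"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // numGroupsLimit must be a number between 1 and 2^31-1, got: 0
    }
    // Non-negative int: zero is a legal leaf limit.
    System.out.println(QueryOptionsUtils.getMultiStageLeafLimit(Map.of("multiStageLeafLimit", "0"))); // 0
    // Unbounded int: a non-positive trim size parses fine and disables that trim level.
    System.out.println(QueryOptionsUtils.getMinSegmentGroupTrimSize(Map.of("minSegmentGroupTrimSize", "-1"))); // -1
  }
}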
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index 6f0f469c5b..b2ca6573b6 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -19,110 +19,127 @@
 
 package org.apache.pinot.common.utils.config;
 
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 
 
 public class QueryOptionsUtilsTest {
+  private static final List<String> POSITIVE_INT_KEYS =
+      List.of(NUM_REPLICA_GROUPS_TO_QUERY, MAX_EXECUTION_THREADS, NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY,
+          MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_ROWS_IN_WINDOW);
+  private static final List<String> NON_NEGATIVE_INT_KEYS = List.of(MULTI_STAGE_LEAF_LIMIT);
+  private static final List<String> UNBOUNDED_INT_KEYS =
+      List.of(MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE, MIN_BROKER_GROUP_TRIM_SIZE,
+          GROUP_TRIM_THRESHOLD);
+  private static final List<String> INT_KEYS = new ArrayList<>() {{
+    addAll(POSITIVE_INT_KEYS);
+    addAll(NON_NEGATIVE_INT_KEYS);
+    addAll(UNBOUNDED_INT_KEYS);
+  }};
+  private static final List<String> POSITIVE_LONG_KEYS =
+      List.of(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES);
 
   @Test
   public void shouldConvertCaseInsensitiveMapToUseCorrectValues() {
     // Given:
-    Map<String, String> configs = ImmutableMap.of(
-        "ENABLENullHandling", "true",
-        "useMULTISTAGEEngine", "false"
-    );
+    Map<String, String> configs = Map.of("ENABLENullHandling", "true", "useMULTISTAGEEngine", "false");
 
     // When:
     Map<String, String> resolved = QueryOptionsUtils.resolveCaseInsensitiveOptions(configs);
 
     // Then:
-    Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING), "true");
-    Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE), "false");
+    assertEquals(resolved.get(ENABLE_NULL_HANDLING), "true");
+    assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false");
   }
 
   @Test
   public void testSkipIndexesParsing() {
     String skipIndexesStr = "col1=inverted,range&col2=sorted";
-    Map<String, String> queryOptions =
-        Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES, skipIndexesStr);
+    Map<String, String> queryOptions = Map.of(SKIP_INDEXES, skipIndexesStr);
     Map<String, Set<FieldConfig.IndexType>> skipIndexes = QueryOptionsUtils.getSkipIndexes(queryOptions);
-    Assert.assertEquals(skipIndexes.get("col1"),
-        Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED));
-    Assert.assertEquals(skipIndexes.get("col2"),
-        Set.of(FieldConfig.IndexType.SORTED));
+    assertEquals(skipIndexes.get("col1"), Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED));
+    assertEquals(skipIndexes.get("col2"), Set.of(FieldConfig.IndexType.SORTED));
   }
 
   @Test(expectedExceptions = RuntimeException.class)
   public void testSkipIndexesParsingInvalid() {
     String skipIndexesStr = "col1=inverted,range&col2";
-    Map<String, String> queryOptions =
-        Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES, skipIndexesStr);
-     QueryOptionsUtils.getSkipIndexes(queryOptions);
+    Map<String, String> queryOptions = Map.of(SKIP_INDEXES, skipIndexesStr);
+    QueryOptionsUtils.getSkipIndexes(queryOptions);
   }
 
   @Test
   public void testIntegerSettingParseSuccess() {
     HashMap<String, String> map = new HashMap<>();
 
-    for (String setting : Arrays.asList(NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, MULTI_STAGE_LEAF_LIMIT,
-        GROUP_TRIM_THRESHOLD, MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_STREAMING_PENDING_BLOCKS,
-        MAX_EXECUTION_THREADS, MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE)) {
-      map.clear();
-      for (Integer val : new Integer[]{null, 1, 10, Integer.MAX_VALUE}) {
-        map.put(setting, val != null ? String.valueOf(val) : null);
-        Assert.assertEquals(getValue(map, setting), val);
+    for (String key : INT_KEYS) {
+      for (Integer value : new Integer[]{null, 1, 10, Integer.MAX_VALUE}) {
+        map.put(key, value != null ? String.valueOf(value) : null);
+        assertEquals(getValue(map, key), value);
       }
     }
 
-    for (String setting : Arrays.asList(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
-      map.clear();
-      for (Long val : new Long[]{null, 1L, 10L, Long.MAX_VALUE}) {
-        map.put(setting, val != null ? String.valueOf(val) : null);
-        Assert.assertEquals(getValue(map, setting), val);
+    for (String key : POSITIVE_LONG_KEYS) {
+      for (Long value : new Long[]{null, 1L, 10L, Long.MAX_VALUE}) {
+        map.put(key, value != null ? String.valueOf(value) : null);
+        assertEquals(getValue(map, key), value);
       }
     }
   }
 
   @Test
   public void testIntegerSettingParseErrors() {
-    HashMap<String, String> map = new HashMap<>();
+    for (String key : POSITIVE_INT_KEYS) {
+      for (String value : new String[]{"-10000000000", "-2147483648", "-1", "0", "2147483648", "10000000000"}) {
+        try {
+          getValue(Map.of(key, value), key);
+          fail(key);
+        } catch (IllegalArgumentException ise) {
+          assertEquals(ise.getMessage(), key + " must be a number between 1 and 2^31-1, got: " + value);
+        }
+      }
+    }
+
+    for (String key : NON_NEGATIVE_INT_KEYS) {
+      for (String value : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
+        try {
+          getValue(Map.of(key, value), key);
+          fail();
+        } catch (IllegalArgumentException ise) {
+          assertEquals(ise.getMessage(), key + " must be a number between 0 and 2^31-1, got: " + value);
+        }
+      }
+    }
 
-    for (String setting : Arrays.asList(NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, MULTI_STAGE_LEAF_LIMIT,
-        GROUP_TRIM_THRESHOLD, MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_STREAMING_PENDING_BLOCKS,
-        MAX_EXECUTION_THREADS, MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE)) {
-      for (String val : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
-        map.clear();
-        map.put(setting, val);
+    for (String key : UNBOUNDED_INT_KEYS) {
+      for (String value : new String[]{"-10000000000", "2147483648", "10000000000"}) {
         try {
-          getValue(map, setting);
-          Assert.fail();
+          getValue(Map.of(key, value), key);
+          fail();
         } catch (IllegalArgumentException ise) {
-          Assert.assertEquals(ise.getMessage(), setting + " must be a number between 0 and 2^31-1, got: " + val);
+          assertEquals(ise.getMessage(), key + " must be an integer, got: " + value);
         }
       }
     }
 
-    for (String setting : Arrays.asList(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
-      for (String val : new String[]{
+    for (String key : POSITIVE_LONG_KEYS) {
+      for (String value : new String[]{
           "-100000000000000000000", "-9223372036854775809", "-1", "0", "9223372036854775808", "100000000000000000000"
       }) {
-        map.clear();
-        map.put(setting, val);
         try {
-          getValue(map, setting);
-          Assert.fail();
+          getValue(Map.of(key, value), key);
+          fail();
         } catch (IllegalArgumentException ise) {
-          Assert.assertEquals(ise.getMessage(), setting + " must be a number between 1 and 2^63-1, got: " + val);
+          assertEquals(ise.getMessage(), key + " must be a number between 1 and 2^63-1, got: " + value);
        }
      }
    }
@@ -130,26 +147,34 @@ public class QueryOptionsUtilsTest {
 
   private static Object getValue(Map<String, String> map, String key) {
     switch (key) {
-      //ints
+      // Positive ints
+      case NUM_REPLICA_GROUPS_TO_QUERY:
+        return QueryOptionsUtils.getNumReplicaGroupsToQuery(map);
+      case MAX_EXECUTION_THREADS:
+        return QueryOptionsUtils.getMaxExecutionThreads(map);
       case NUM_GROUPS_LIMIT:
         return QueryOptionsUtils.getNumGroupsLimit(map);
       case MAX_INITIAL_RESULT_HOLDER_CAPACITY:
         return QueryOptionsUtils.getMaxInitialResultHolderCapacity(map);
-      case MULTI_STAGE_LEAF_LIMIT:
-        return QueryOptionsUtils.getMultiStageLeafLimit(map);
-      case GROUP_TRIM_THRESHOLD:
-        return QueryOptionsUtils.getGroupTrimThreshold(map);
       case MAX_STREAMING_PENDING_BLOCKS:
         return QueryOptionsUtils.getMaxStreamingPendingBlocks(map);
       case MAX_ROWS_IN_JOIN:
         return QueryOptionsUtils.getMaxRowsInJoin(map);
-      case MAX_EXECUTION_THREADS:
-        return QueryOptionsUtils.getMaxExecutionThreads(map);
+      case MAX_ROWS_IN_WINDOW:
+        return QueryOptionsUtils.getMaxRowsInWindow(map);
+      // Non-negative ints
+      case MULTI_STAGE_LEAF_LIMIT:
+        return QueryOptionsUtils.getMultiStageLeafLimit(map);
+      // Unbounded ints
       case MIN_SEGMENT_GROUP_TRIM_SIZE:
         return QueryOptionsUtils.getMinSegmentGroupTrimSize(map);
       case MIN_SERVER_GROUP_TRIM_SIZE:
         return QueryOptionsUtils.getMinServerGroupTrimSize(map);
-      //longs
+      case MIN_BROKER_GROUP_TRIM_SIZE:
+        return QueryOptionsUtils.getMinBrokerGroupTrimSize(map);
+      case GROUP_TRIM_THRESHOLD:
+        return QueryOptionsUtils.getGroupTrimThreshold(map);
+      // Positive longs
       case TIMEOUT_MS:
         return QueryOptionsUtils.getTimeoutMs(map);
       case MAX_SERVER_RESPONSE_SIZE_BYTES:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
index 0b928b9a17..ecb0a56cbf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -86,7 +87,8 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
         //       without ordering. Consider ordering on group-by columns if no ordering is specified.
         _trimSize = limit;
       }
-      _trimThreshold = queryContext.getGroupTrimThreshold();
+      int trimThreshold = queryContext.getGroupTrimThreshold();
+      _trimThreshold = trimThreshold > 0 ? trimThreshold : Integer.MAX_VALUE;
     } else {
       // Server trim is disabled
       _trimSize = Integer.MAX_VALUE;
@@ -135,16 +137,20 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group
           synchronized (this) {
             if (_indexedTable == null) {
               DataSchema dataSchema = resultsBlock.getDataSchema();
-              // NOTE: Use trimSize as resultSize on server size.
-              if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-                // special case of trim threshold where it is set to max value.
-                // there won't be any trimming during upsert in this case.
-                // thus we can avoid the overhead of read-lock and write-lock
-                // in the upsert method.
-                _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+              // NOTE: Use trimSize as resultSize on server side.
+              if (_numTasks == 1) {
+                _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
               } else {
-                _indexedTable =
-                    new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+                if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+                  // special case of trim threshold where it is set to max value.
+                  // there won't be any trimming during upsert in this case.
+                  // thus we can avoid the overhead of read-lock and write-lock
+                  // in the upsert method.
+                  _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+                } else {
+                  _indexedTable =
+                      new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+                }
               }
             }
           }
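
The hunk above adds a single-task fast path: with one combine task there are no concurrent upserts, so the lock-free SimpleIndexedTable can be used instead of the concurrent variants. A sketch of the selection logic, extracted for illustration (the wrapper class is hypothetical; constructor argument order follows the diff):

import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.SimpleIndexedTable;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.query.request.context.QueryContext;

final class IndexedTableSelectionSketch {
  static IndexedTable select(DataSchema schema, QueryContext ctx, int numTasks, int trimSize, int trimThreshold,
      int maxTrimThreshold) {
    if (numTasks == 1) {
      // Single combine task: no concurrent upserts, so the non-thread-safe table suffices.
      return new SimpleIndexedTable(schema, ctx, trimSize, trimSize, trimThreshold);
    }
    if (trimThreshold >= maxTrimThreshold) {
      // Trimming never kicks in; skip the read/write-lock overhead of the trimming table.
      return new UnboundedConcurrentIndexedTable(schema, ctx, trimSize);
    }
    return new ConcurrentIndexedTable(schema, ctx, trimSize, trimSize, trimThreshold);
  }
}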
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
index 943ed169a1..1e8c88e9ce 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -163,16 +164,20 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato
           synchronized (this) {
             if (_indexedTable == null) {
               DataSchema dataSchema = resultsBlock.getDataSchema();
-              // NOTE: Use trimSize as resultSize on server size.
-              if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-                // special case of trim threshold where it is set to max value.
-                // there won't be any trimming during upsert in this case.
-                // thus we can avoid the overhead of read-lock and write-lock
-                // in the upsert method.
-                _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+              // NOTE: Use trimSize as resultSize on server side.
+              if (_numTasks == 1) {
+                _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
               } else {
-                _indexedTable =
-                    new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+                if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+                  // special case of trim threshold where it is set to max value.
+                  // there won't be any trimming during upsert in this case.
+                  // thus we can avoid the overhead of read-lock and write-lock
+                  // in the upsert method.
+                  _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+                } else {
+                  _indexedTable =
+                      new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+                }
               }
             }
           }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 6912dd2586..e76a649886 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -181,7 +181,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     // Set maxExecutionThreads
     int maxExecutionThreads;
     Integer maxExecutionThreadsFromQuery = QueryOptionsUtils.getMaxExecutionThreads(queryOptions);
-    if (maxExecutionThreadsFromQuery != null && maxExecutionThreadsFromQuery > 0) {
+    if (maxExecutionThreadsFromQuery != null) {
       // Do not allow query to override the execution threads over the instance-level limit
       if (_maxExecutionThreads > 0) {
         maxExecutionThreads = Math.min(_maxExecutionThreads, maxExecutionThreadsFromQuery);
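
Because QueryOptionsUtils now rejects non-positive values at parse time, the extra `> 0` guard here is redundant; what remains is clamping a query-supplied value to the instance-level cap. A hedged sketch of that clamping (the fallback when no query override is present is assumed from context, not shown in the hunk):

// Hypothetical helper illustrating the clamping above: a query may lower,
// but never raise, the instance-level cap when one is configured (> 0).
static int resolveMaxExecutionThreads(int instanceCap, Integer fromQuery) {
  if (fromQuery != null) {
    return instanceCap > 0 ? Math.min(instanceCap, fromQuery) : fromQuery;
  }
  return instanceCap; // instance-level default applies when the query sets nothing
}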
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 09b4d6a156..34395febfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -240,11 +240,23 @@ public class GroupByDataTableReducer implements DataTableReducer {
     boolean hasFinalInput =
         _queryContext.isServerReturnFinalResult() || _queryContext.isServerReturnFinalResultKeyUnpartitioned();
     int limit = _queryContext.getLimit();
-    int trimSize = GroupByUtils.getTableCapacity(limit, reducerContext.getMinGroupTrimSize());
+    int minTrimSize = reducerContext.getMinGroupTrimSize();
+    int trimSize;
+    int trimThreshold;
+    if (minTrimSize > 0) {
+      trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
+      trimThreshold = reducerContext.getGroupByTrimThreshold();
+      if (trimThreshold <= 0) {
+        trimThreshold = Integer.MAX_VALUE;
+      }
+    } else {
+      // Broker trim is disabled
+      trimSize = Integer.MAX_VALUE;
+      trimThreshold = Integer.MAX_VALUE;
+    }
     // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
     // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
     int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
-    int trimThreshold = reducerContext.getGroupByTrimThreshold();
     IndexedTable indexedTable;
     if (numReduceThreadsToUse == 1) {
       indexedTable =
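
The reducer now treats a non-positive minTrimSize as "broker trim disabled" and a non-positive threshold as "no on-the-fly trim". A compact sketch of the sizing logic above (GroupByUtils.getTableCapacity is the existing helper; the record type is hypothetical):

import org.apache.pinot.core.util.GroupByUtils;

// Hypothetical record bundling the two sizes computed in the hunk above.
record BrokerTrim(int trimSize, int trimThreshold) {
  static BrokerTrim of(int limit, int minTrimSize, int configuredThreshold) {
    if (minTrimSize <= 0) {
      // Broker trim is disabled entirely.
      return new BrokerTrim(Integer.MAX_VALUE, Integer.MAX_VALUE);
    }
    int trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
    int trimThreshold = configuredThreshold > 0 ? configuredThreshold : Integer.MAX_VALUE;
    return new BrokerTrim(trimSize, trimThreshold);
  }
}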
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
index 4f062d522a..17487a7d13 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java
@@ -142,65 +142,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest {
   @Test
   void aggregationGroupBySVIntWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField")
-        .thenResultIs(new Object[]{1, new int[]{1, 1}}, new Object[]{2, new int[]{2, 2}},
-            new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE, Integer.MIN_VALUE}});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE, Integer.MIN_VALUE}},
+            new Object[]{1, new int[]{1, 1}},
+            new Object[]{2, new int[]{2, 2}}
+        );
   }
 
   @Test
   void aggregationGroupBySVIntWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField")
-        .thenResultIs(new Object[]{1, new int[]{1, 1}}, new Object[]{2, new int[]{2, 2}},
-            new Object[]{null, new int[0]});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1, new int[]{1, 1}},
+            new Object[]{2, new int[]{2, 2}},
+            new Object[]{null, new int[0]}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVIntWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField")
-        .thenResultIs(new Object[]{1, new int[]{1}}, new Object[]{2, new int[]{2}},
-            new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE}});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE}},
+            new Object[]{1, new int[]{1}},
+            new Object[]{2, new int[]{2}}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVIntWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField")
-        .thenResultIs(new Object[]{1, new int[]{1}}, new Object[]{2, new int[]{2}},
-            new Object[]{null, new int[0]});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1, new int[]{1}},
+            new Object[]{2, new int[]{2}},
+            new Object[]{null, new int[0]}
+        );
   }
 
   @Test
@@ -257,65 +245,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest {
   @Test
   void aggregationGroupBySVLongWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField")
-        .thenResultIs(new Object[]{1L, new long[]{1, 1}}, new Object[]{2L, new long[]{2, 2}},
-            new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE, Long.MIN_VALUE}});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE, Long.MIN_VALUE}},
+            new Object[]{1L, new long[]{1, 1}},
+            new Object[]{2L, new long[]{2, 2}}
+        );
   }
 
   @Test
   void aggregationGroupBySVLongWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField")
-        .thenResultIs(new Object[]{1L, new long[]{1, 1}}, new Object[]{2L, new long[]{2, 2}},
-            new Object[]{null, new long[0]});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1L, new long[]{1, 1}},
+            new Object[]{2L, new long[]{2, 2}},
+            new Object[]{null, new long[0]}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVLongWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField")
-        .thenResultIs(new Object[]{1L, new long[]{1}}, new Object[]{2L, new long[]{2}},
-            new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE}});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE}},
+            new Object[]{1L, new long[]{1}},
+            new Object[]{2L, new long[]{2}}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVLongWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).andOnSecondInstance("myField",
-            "1",
-            "2",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField")
-        .thenResultIs(new Object[]{1L, new long[]{1}}, new Object[]{2L, new long[]{2}},
-            new Object[]{null, new long[0]});
+        .onFirstInstance("myField", "1", "2", "null")
+        .andOnSecondInstance("myField", "1", "2", "null")
+        .whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1L, new long[]{1}},
+            new Object[]{2L, new long[]{2}},
+            new Object[]{null, new long[0]}
+        );
   }
 
   @Test
@@ -373,60 +349,51 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest {
   @Test
   void aggregationGroupBySVFloatWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField")
-        .thenResultIs(new Object[]{Float.NEGATIVE_INFINITY,
-                new float[]{Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY}},
-            new Object[]{1.0f, new float[]{1.0f, 1.0f}}, new Object[]{2.0f, new float[]{2.0f, 2.0f}});
+        .onFirstInstance("myField", "null", "1.0", "2.0")
+        .andOnSecondInstance("myField", "null", "1.0", "2.0")
+        .whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Float.NEGATIVE_INFINITY, new float[]{Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY}},
+            new Object[]{1.0f, new float[]{1.0f, 1.0f}},
+            new Object[]{2.0f, new float[]{2.0f, 2.0f}}
+        );
   }
 
   @Test
   void aggregationGroupBySVFloatWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "null",
-            "1.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0"
-        ).whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField")
-        .thenResultIs(new Object[]{null, new float[0]}, new Object[]{1.0f, new float[]{1.0f, 1.0f}});
+        .onFirstInstance("myField", "null", "1.0")
+        .andOnSecondInstance("myField", "null", "1.0")
+        .whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1.0f, new float[]{1.0f, 1.0f}},
+            new Object[]{null, new float[0]}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVFloatWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField")
-        .thenResultIs(new Object[]{Float.NEGATIVE_INFINITY, new float[]{Float.NEGATIVE_INFINITY}},
-            new Object[]{1.0f, new float[]{1.0f}}, new Object[]{2.0f, new float[]{2.0f}});
+        .onFirstInstance("myField", "null", "1.0", "2.0")
+        .andOnSecondInstance("myField", "null", "1.0", "2.0")
+        .whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Float.NEGATIVE_INFINITY, new float[]{Float.NEGATIVE_INFINITY}},
+            new Object[]{1.0f, new float[]{1.0f}},
+            new Object[]{2.0f, new float[]{2.0f}}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVFloatWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "null",
-            "1.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0"
-        ).whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField")
-        .thenResultIs(new Object[]{null, new float[0]}, new Object[]{1.0f, new float[]{1.0f}});
+        .onFirstInstance("myField", "null", "1.0")
+        .andOnSecondInstance("myField", "null", "1.0")
+        .whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1.0f, new float[]{1.0f}},
+            new Object[]{null, new float[0]}
+        );
   }
 
   @Test
@@ -484,66 +451,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest {
   @Test
   void aggregationGroupBySVDoubleWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField")
-        .thenResultIs(new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY,
-                Double.NEGATIVE_INFINITY}}, new Object[]{1.0, new double[]{1.0, 1.0}},
-            new Object[]{2.0, new double[]{2.0, 2.0}});
+        .onFirstInstance("myField", "null", "1.0", "2.0")
+        .andOnSecondInstance("myField", "null", "1.0", "2.0")
+        .whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY}},
+            new Object[]{1.0, new double[]{1.0, 1.0}},
+            new Object[]{2.0, new double[]{2.0, 2.0}}
+        );
   }
 
   @Test
   void aggregationGroupBySVDoubleWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField")
-        .thenResultIs(new Object[]{null, new double[0]}, new Object[]{1.0, new double[]{1.0, 1.0}},
-            new Object[]{2.0, new double[]{2.0, 2.0}});
+        .onFirstInstance("myField", "null", "1.0", "2.0")
+        .andOnSecondInstance("myField", "null", "1.0", "2.0")
+        .whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1.0, new double[]{1.0, 1.0}},
+            new Object[]{2.0, new double[]{2.0, 2.0}},
+            new Object[]{null, new double[0]}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVDoubleWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField")
-        .thenResultIs(new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY}},
-            new Object[]{1.0, new double[]{1.0}}, new Object[]{2.0, new double[]{2.0}});
+        .onFirstInstance("myField", "null", "1.0", "2.0")
+        .andOnSecondInstance("myField", "null", "1.0", "2.0")
+        .whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY}},
+            new Object[]{1.0, new double[]{1.0}},
+            new Object[]{2.0, new double[]{2.0}}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVDoubleWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).andOnSecondInstance("myField",
-            "null",
-            "1.0",
-            "2.0"
-        ).whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField")
-        .thenResultIs(new Object[]{null, new double[0]}, new Object[]{1.0, new double[]{1.0}},
-            new Object[]{2.0, new double[]{2.0}});
+        .onFirstInstance("myField", "null", "1.0", "2.0")
+        .andOnSecondInstance("myField", "null", "1.0", "2.0")
+        .whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{1.0, new double[]{1.0}},
+            new Object[]{2.0, new double[]{2.0}},
+            new Object[]{null, new double[0]}
+        );
   }
 
   @Test
@@ -600,65 +554,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest {
   @Test
   void aggregationGroupBySVStringWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).andOnSecondInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField")
-        .thenResultIs(new Object[]{"a", new String[]{"a", "a"}}, new Object[]{"b", new String[]{"b", "b"}},
-            new Object[]{"null", new String[]{"null", "null"}});
+        .onFirstInstance("myField", "a", "b", "null")
+        .andOnSecondInstance("myField", "a", "b", "null")
+        .whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{"a", new String[]{"a", "a"}},
+            new Object[]{"b", new String[]{"b", "b"}},
+            new Object[]{"null", new String[]{"null", "null"}}
+        );
   }
 
   @Test
   void aggregationGroupBySVStringWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).andOnSecondInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField")
-        .thenResultIs(new Object[]{"a", new String[]{"a", "a"}}, new Object[]{"b", new String[]{"b", "b"}},
-            new Object[]{null, new String[0]});
+        .onFirstInstance("myField", "a", "b", "null")
+        .andOnSecondInstance("myField", "a", "b", "null")
+        .whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{"a", new String[]{"a", "a"}},
+            new Object[]{"b", new String[]{"b", "b"}},
+            new Object[]{null, new String[0]}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVStringWithNullHandlingDisabled() {
     new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(false)
-        .onFirstInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).andOnSecondInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField")
-        .thenResultIs(new Object[]{"a", new String[]{"a"}}, new Object[]{"b", new String[]{"b"}},
-            new Object[]{"null", new String[]{"null"}});
+        .onFirstInstance("myField", "a", "b", "null")
+        .andOnSecondInstance("myField", "a", "b", "null")
+        .whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{"a", new String[]{"a"}},
+            new Object[]{"b", new String[]{"b"}},
+            new Object[]{"null", new String[]{"null"}}
+        );
   }
 
   @Test
   void aggregationDistinctGroupBySVStringWithNullHandlingEnabled() {
     new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true)
-        .onFirstInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).andOnSecondInstance("myField",
-            "a",
-            "b",
-            "null"
-        ).whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField")
-        .thenResultIs(new Object[]{"a", new String[]{"a"}}, new Object[]{"b", new String[]{"b"}},
-            new Object[]{null, new String[0]});
+        .onFirstInstance("myField", "a", "b", "null")
+        .andOnSecondInstance("myField", "a", "b", "null")
+        .whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField order by myField")
+        .thenResultIs(
+            new Object[]{"a", new String[]{"a"}},
+            new Object[]{"b", new String[]{"b"}},
+            new Object[]{null, new String[0]}
+        );
   }
 
   @Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java
deleted file mode 100644
index 654a0add08..0000000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES;
-import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES;
-import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS;
-
-
-public class WithOptionQueriesTest extends BaseQueriesTest {
-
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "WithOptionQueriesTest");
-  private static final String RAW_TABLE_NAME = "testTable";
-  private static final String SEGMENT_NAME = "testSegment";
-
-  private static final int NUM_RECORDS = 10;
-  private static final String X_COL = "x";
-  private static final String Y_COL = "y";
-
-  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(X_COL, FieldSpec.DataType.INT)
-      .addSingleValueDimension(Y_COL, FieldSpec.DataType.DOUBLE).build();
-
-  private static final TableConfig TABLE_CONFIG =
-      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-
-  private IndexSegment _indexSegment;
-  private List<IndexSegment> _indexSegments;
-
-  @Override
-  protected String getFilter() {
-    return "";
-  }
-
-  @Override
-  protected IndexSegment getIndexSegment() {
-    return _indexSegment;
-  }
-
-  @Override
-  protected List<IndexSegment> getIndexSegments() {
-    return _indexSegments;
-  }
-
-  private final List<Object[]> _allRecords = new ArrayList<>();
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-
-    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
-    for (int i = 0; i < NUM_RECORDS; i++) {
-      GenericRow record = new GenericRow();
-      record.putValue(X_COL, i);
-      record.putValue(Y_COL, 0.25);
-      records.add(record);
-      _allRecords.add(new Object[]{i, 0.25});
-    }
-
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
-    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
-    driver.build();
-
-    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
-    _indexSegment = immutableSegment;
-    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
-  }
-
-  @Test
-  public void testOptionParsingFailure() {
-    HashMap<String, String> options = new HashMap<>();
-
-    // int values
-    for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
-        QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, QueryOptionKey.GROUP_TRIM_THRESHOLD,
-        QueryOptionKey.MAX_EXECUTION_THREADS, QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE,
-        QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE)) {
-
-      options.clear();
-      for (String value : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
-        options.put(setting, value);
-
-        IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> {
-          getBrokerResponse("SELECT x, count(*) FROM " + RAW_TABLE_NAME + " GROUP BY x", options);
-        });
-        Assert.assertEquals(setting + " must be a number between 0 and 2^31-1, got: " + value, exception.getMessage());
-      }
-    }
-  }
-
-  @Test
-  public void testOptionParsingSuccess() {
-    HashMap<String, String> options = new HashMap<>();
-    List<Object> groupRows = new ArrayList();
-    groupRows.add(new Object[]{0d, 40L}); //four times 10 records because segment gets multiplied under the hood
-
-    // int values
-    for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
-        QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
-        QueryOptionKey.GROUP_TRIM_THRESHOLD, QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
-        QueryOptionKey.MAX_ROWS_IN_JOIN, QueryOptionKey.MAX_EXECUTION_THREADS,
-        QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE,
-        QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY)) {
-
-      options.clear();
-      for (String value : new String[]{"0", "1", "10000", "2147483647"}) {
-
-        options.put(setting, value);
-        List<Object[]> rows =
-            getBrokerResponse("SELECT mod(x,1), count(*) FROM " + RAW_TABLE_NAME + " GROUP BY mod(x,1)",
-                options).getResultTable().getRows();
-        if (QueryOptionKey.NUM_GROUPS_LIMIT == setting && "0".equals(value)) {
-          Assert.assertEquals(0, rows.size());
-        } else {
-          assertEquals(rows, groupRows);
-        }
-      }
-    }
-
-    //long values
-    for (String setting : Arrays.asList(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
-      options.clear();
-      for (String value : new String[]{"1", "10000", "9223372036854775807"}) {
-        options.put(setting, value);
-        List<Object[]> rows = getBrokerResponse("SELECT * FROM " + RAW_TABLE_NAME, options).getResultTable().getRows();
-        assertEquals(rows, _allRecords);
-      }
-    }
-  }
-
-  private void assertEquals(List actual, List expected) {
-    if (actual == expected) {
-      return;
-    }
-
-    if (actual == null || expected == null || actual.size() != expected.size()) {
-      Assert.fail("Expected \n" + expected + "\n but got \n" + actual);
-    }
-
-    for (int i = 0; i < actual.size(); i++) {
-      Assert.assertEquals(actual.get(i), expected.get(i));
-    }
-  }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 3e946d5eab..b70b39a8f5 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -39,7 +39,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
@@ -48,6 +47,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
+
 
 /**
 * all special tests that doesn't fit into {@link org.apache.pinot.query.runtime.queries.ResourceBasedQueriesTest}
@@ -69,7 +70,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
     SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
         .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
         .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS")
-        .addMetric("col3", FieldSpec.DataType.INT, 
0).setSchemaName("defaultSchemaName")
+        .addMetric("col3", FieldSpec.DataType.INT, 0)
+        .setSchemaName("defaultSchemaName")
         .setEnableColumnBasedNullHandling(true);
   }
 
@@ -293,69 +295,59 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
 
   @DataProvider(name = "testDataWithSqlExecutionExceptions")
   protected Iterator<Object[]> provideTestSqlWithExecutionException() {
-    //@formatter:off
-    List<Object[]> testCases = new ArrayList();
-    testCases.addAll(
-      Arrays.asList(
-        // Missing index
-        new Object[]{"SELECT col1 FROM a WHERE textMatch(col1, 'f') LIMIT 10", 
"without text index"},
-
-        // Query hint with dynamic broadcast pipeline breaker should return error upstream
-        new Object[]{
-            "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 
FROM a WHERE a.col1 IN "
-                + "(SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND 
a.col3 > 0",
-            "without text index"
-        },
-
-        // Timeout exception should occur with this option:
-        new Object[]{"SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = 
b.col1 JOIN c ON a.col1 = c.col1",
-            "Timeout"},
-
-        // Function with incorrect argument signature should throw runtime exception when casting string to numeric
-        new Object[]{"SELECT least(a.col2, b.col3) FROM a JOIN b ON a.col1 = b.col1", "For input string:"},
-
-        // Scalar function that doesn't have a valid use should throw an exception on the leaf stage
-        //   - predicate only functions:
-        new Object[]{"SELECT * FROM a WHERE textMatch(col1, 'f')", "without text index"},
-        new Object[]{"SELECT * FROM a WHERE text_match(col1, 'f')", "without text index"},
-        new Object[]{"SELECT * FROM a WHERE textContains(col1, 'f')", "supported only on native text index"},
-        new Object[]{"SELECT * FROM a WHERE text_contains(col1, 'f')", "supported only on native text index"},
-
-        //  - transform only functions
-        new Object[]{"SELECT jsonExtractKey(col1, 'path') FROM a", "was 
expecting (JSON String"},
-        new Object[]{"SELECT json_extract_key(col1, 'path') FROM a", "was 
expecting (JSON String"},
-
-        //  - PlaceholderScalarFunction registered will throw on intermediate stage, but works on leaf stage.
-        //    - checked "Illegal Json Path" as col1 is not actually a json string, but the call is correctly triggered.
-        new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS INT) FROM a", "Cannot resolve JSON path"},
-        //    - checked function cannot be found b/c there's no intermediate stage impl for json_extract_scalar
-        new Object[]{
-            "SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT') AS INT) FROM a JOIN b ON a.col1 = b.col1",
-            "Unsupported function: JSONEXTRACTSCALAR"
-        }));
-    //@formatter:on
-
-    // int values
-    for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
-        QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
-        QueryOptionKey.GROUP_TRIM_THRESHOLD, QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
-        QueryOptionKey.MAX_ROWS_IN_JOIN, QueryOptionKey.MAX_EXECUTION_THREADS,
-        QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE)) {
-
-      for (String val : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
-        testCases.add(new Object[]{
-            "set " + setting + " = " + val + "; SELECT col1, count(*) FROM a GROUP BY col1",
-            setting + " must be a number between 0 and 2^31-1, got: " + val
-        });
-      }
-    }
-
-    // int values; triggered for query with window clause
-    for (String setting : Arrays.asList(QueryOptionKey.MAX_ROWS_IN_WINDOW)) {
-      for (String val : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
+    List<Object[]> testCases = new ArrayList<>();
+    // Missing index
+    testCases.add(new Object[]{"SELECT col1 FROM a WHERE textMatch(col1, 'f') 
LIMIT 10", "without text index"});
+
+    // Query hint with dynamic broadcast pipeline breaker should return error upstream
+    testCases.add(new Object[]{
+        "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a WHERE a.col1 IN "
+            + "(SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND a.col3 > 0",
+        "without text index"
+    });
+
+    // Timeout exception should occur with this option:
+    testCases.add(new Object[]{
+        "SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c 
ON a.col1 = c.col1",
+        "Timeout"
+    });
+
+    // Function with incorrect argument signature should throw runtime exception when casting string to numeric
+    testCases.add(new Object[]{"SELECT least(a.col2, b.col3) FROM a JOIN b ON a.col1 = b.col1", "For input string:"});
+
+    // Scalar function that doesn't have a valid use should throw an exception on the leaf stage
+    //   - predicate only functions:
+    testCases.add(new Object[]{"SELECT * FROM a WHERE textMatch(col1, 'f')", "without text index"});
+    testCases.add(new Object[]{"SELECT * FROM a WHERE text_match(col1, 'f')", "without text index"});
+    testCases.add(new Object[]{"SELECT * FROM a WHERE textContains(col1, 'f')", "supported only on native text index"});
+    testCases.add(new Object[]{
+        "SELECT * FROM a WHERE text_contains(col1, 'f')",
+        "supported only on native text index"}
+    );
+
+    //  - transform only functions
+    testCases.add(new Object[]{"SELECT jsonExtractKey(col1, 'path') FROM a", 
"was expecting (JSON String"});
+    testCases.add(new Object[]{"SELECT json_extract_key(col1, 'path') FROM a", 
"was expecting (JSON String"});
+
+    //  - PlaceholderScalarFunction registered will throw on intermediate stage, but works on leaf stage.
+    //    - checked "Illegal Json Path" as col1 is not actually a json string, but the call is correctly triggered.
+    testCases.add(
+        new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS INT) FROM a", "Cannot resolve JSON path"});
+    //    - checked function cannot be found b/c there's no intermediate stage impl for json_extract_scalar
+    testCases.add(new Object[]{
+        "SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT') AS INT) FROM a JOIN b ON a.col1 = b.col1",
+        "Unsupported function: JSONEXTRACTSCALAR"
+    });
+
+    // Positive int keys (only included ones that will be parsed for this query)
+    for (String key : new String[]{
+        MAX_EXECUTION_THREADS, NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, MAX_STREAMING_PENDING_BLOCKS,
+        MAX_ROWS_IN_JOIN
+    }) {
+      for (String value : new String[]{"-10000000000", "-2147483648", "-1", "0", "2147483648", "10000000000"}) {
         testCases.add(new Object[]{
-            "set " + setting + " = " + val + "; SELECT ROW_NUMBER() over (PARTITION BY col1) FROM a",
-            setting + " must be a number between 0 and 2^31-1, got: " + val
+            "set " + key + " = " + value + "; SELECT col1, count(*) FROM a GROUP BY col1",
+            key + " must be a number between 1 and 2^31-1, got: " + value
         });
       }
     }
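
As context for the error message asserted above ("<key> must be a number between 1 and 2^31-1, got: <value>"), here is a minimal sketch of the positive-int option parsing pattern these tests exercise. It is illustrative only: the class and method names below are hypothetical, not the actual QueryOptionsUtils API.

import java.util.Map;

public class PositiveIntOptionSketch {
  // Parses a query option as a positive 32-bit int; returns null when the option is unset.
  // A non-numeric or overflowing value and a value < 1 raise the same error, matching the
  // message format asserted in the tests above.
  static Integer getPositiveInt(Map<String, String> options, String key) {
    String value = options.get(key);
    if (value == null) {
      return null;
    }
    try {
      int parsed = Integer.parseInt(value);
      if (parsed > 0) {
        return parsed;
      }
    } catch (NumberFormatException e) {
      // fall through to the shared error below
    }
    throw new IllegalArgumentException(key + " must be a number between 1 and 2^31-1, got: " + value);
  }

  public static void main(String[] args) {
    System.out.println(getPositiveInt(Map.of("numGroupsLimit", "10000"), "numGroupsLimit")); // 10000
    try {
      getPositiveInt(Map.of("numGroupsLimit", "0"), "numGroupsLimit");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // numGroupsLimit must be a number between 1 and 2^31-1, got: 0
    }
  }
}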


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
