This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new e8dcba123a  Fix query option validation for group-by queries (#14618)
e8dcba123a is described below

commit e8dcba123aad95ba2848f88a4b8c7b4db2e1ff26
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Dec 8 17:35:26 2024 -0800

    Fix query option validation for group-by queries (#14618)
---
 .../BaseSingleStageBrokerRequestHandler.java       |  16 +-
 .../common/utils/config/QueryOptionsUtils.java     | 160 +++++----
 .../common/utils/config/QueryOptionsUtilsTest.java | 137 +++++---
 .../operator/combine/GroupByCombineOperator.java   |  26 +-
 .../streaming/StreamingGroupByCombineOperator.java |  23 +-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |   2 +-
 .../core/query/reduce/GroupByDataTableReducer.java |  16 +-
 .../aggregation/function/ArrayAggFunctionTest.java | 374 +++++++++------------
 .../pinot/queries/WithOptionQueriesTest.java       | 190 -----------
 .../query/runtime/queries/QueryRunnerTest.java     | 120 +++----
 10 files changed, 438 insertions(+), 626 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index ed6c58ad0f..1364919592 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -1847,21 +1847,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ throw new IllegalStateException( "Value for 'LIMIT' (" + limit + ") exceeds maximum allowed value of " + queryResponseLimit); } - - Map<String, String> queryOptions = pinotQuery.getQueryOptions(); - try { - // throw errors if options is less than 1 or invalid - Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions); - if (numReplicaGroupsToQuery != null) { - Preconditions.checkState(numReplicaGroupsToQuery > 0, "numReplicaGroups must be " + "positive number, got: %d", - numReplicaGroupsToQuery); - } - } catch (NumberFormatException e) { - String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY); - throw new IllegalStateException( - String.format("numReplicaGroups must be a positive number, got: %s", numReplicaGroupsToQuery)); - } - + QueryOptionsUtils.getNumReplicaGroupsToQuery(pinotQuery.getQueryOptions()); if (pinotQuery.getDataSource().getSubquery() != null) { validateRequest(pinotQuery.getDataSource().getSubquery(), queryResponseLimit); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index bcc82efbf5..1ac9e6fab8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -98,19 +98,19 @@ public class QueryOptionsUtils { @Nullable public static Long getTimeoutMs(Map<String, String> queryOptions) { String timeoutMsString = queryOptions.get(QueryOptionKey.TIMEOUT_MS); - return checkedParseLong(QueryOptionKey.TIMEOUT_MS, timeoutMsString, 1); + return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS, timeoutMsString); } @Nullable public static Long getMaxServerResponseSizeBytes(Map<String, String>
queryOptions) { String responseSize = queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES); - return checkedParseLong(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, responseSize, 1); + return checkedParseLongPositive(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES, responseSize); } @Nullable public static Long getMaxQueryResponseSizeBytes(Map<String, String> queryOptions) { String responseSize = queryOptions.get(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES); - return checkedParseLong(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES, responseSize, 1); + return checkedParseLongPositive(QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES, responseSize); } public static boolean isAndScanReorderingEnabled(Map<String, String> queryOptions) { @@ -179,7 +179,7 @@ public class QueryOptionsUtils { @Nullable public static Integer getNumReplicaGroupsToQuery(Map<String, String> queryOptions) { String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY); - return checkedParseInt(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY, numReplicaGroupsToQuery); + return checkedParseIntPositive(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY, numReplicaGroupsToQuery); } public static boolean isExplainPlanVerbose(Map<String, String> queryOptions) { @@ -201,25 +201,35 @@ public class QueryOptionsUtils { @Nullable public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) { String maxExecutionThreadsString = queryOptions.get(QueryOptionKey.MAX_EXECUTION_THREADS); - return checkedParseInt(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString); + return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString); } @Nullable public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) { String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE); - return checkedParseInt(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, minSegmentGroupTrimSizeString); + // NOTE: Non-positive value means turning off the segment level trim + return uncheckedParseInt(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, minSegmentGroupTrimSizeString); } @Nullable public static Integer getMinServerGroupTrimSize(Map<String, String> queryOptions) { String minServerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE); - return checkedParseInt(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, minServerGroupTrimSizeString); + // NOTE: Non-positive value means turning off the server level trim + return uncheckedParseInt(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, minServerGroupTrimSizeString); } @Nullable public static Integer getMinBrokerGroupTrimSize(Map<String, String> queryOptions) { String minBrokerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE); - return checkedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, minBrokerGroupTrimSizeString); + // NOTE: Non-positive value means turning off the broker level trim + return uncheckedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, minBrokerGroupTrimSizeString); + } + + @Nullable + public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) { + String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD); + // NOTE: Non-positive value means turning off the on-the-fly trim before all groups are added + return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold); } public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) { @@
-246,73 +256,25 @@ public class QueryOptionsUtils { @Nullable public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) { String maxLeafLimitStr = queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT); - return checkedParseInt(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr); + return checkedParseIntNonNegative(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, maxLeafLimitStr); } @Nullable public static Integer getNumGroupsLimit(Map<String, String> queryOptions) { String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT); - return checkedParseInt(QueryOptionKey.NUM_GROUPS_LIMIT, maxNumGroupLimit); + return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_LIMIT, maxNumGroupLimit); } @Nullable public static Integer getMaxInitialResultHolderCapacity(Map<String, String> queryOptions) { - String maxInitResultCap = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY); - return checkedParseInt(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitResultCap); + String maxInitialResultHolderCapacity = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY); + return checkedParseIntPositive(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitialResultHolderCapacity); } public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String, String> queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY)); } - @Nullable - public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) { - String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD); - return checkedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold); - } - - private static Long checkedParseLong(String optionName, String optionValue, int minValue) { - try { - if (optionValue != null) { - Long value = Long.parseLong(optionValue); - if (value < minValue) { - throw longParseException(optionName, optionValue, minValue); - } - return value; - } else { - return null; - } - } catch (NumberFormatException nfe) { - throw longParseException(optionName, optionValue, minValue); - } - } - - private static IllegalArgumentException longParseException(String optionName, String optionValue, int minValue) { - return new IllegalArgumentException( - String.format("%s must be a number between %d and 2^63-1, got: %s", optionName, minValue, optionValue)); - } - - private static Integer checkedParseInt(String optionName, String optionValue) { - try { - if (optionValue != null) { - int value = Integer.parseInt(optionValue); - if (value < 0) { - throw intParseException(optionName, optionValue); - } - return value; - } else { - return null; - } - } catch (NumberFormatException nfe) { - throw intParseException(optionName, optionValue); - } - } - - private static IllegalArgumentException intParseException(String optionName, String optionValue) { - return new IllegalArgumentException( - String.format("%s must be a number between 0 and 2^31-1, got: %s", optionName, optionValue)); - } - public static boolean shouldDropResults(Map<String, String> queryOptions) { return Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS)); } @@ -320,13 +282,13 @@ public class QueryOptionsUtils { @Nullable public static Integer getMaxStreamingPendingBlocks(Map<String, String> queryOptions) { String maxStreamingPendingBlocks = queryOptions.get(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS); - return 
checkedParseInt(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS, maxStreamingPendingBlocks); + return checkedParseIntPositive(QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS, maxStreamingPendingBlocks); } @Nullable public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) { String maxRowsInJoin = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN); - return checkedParseInt(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin); + return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_JOIN, maxRowsInJoin); } @Nullable @@ -338,7 +300,7 @@ public class QueryOptionsUtils { @Nullable public static Integer getMaxRowsInWindow(Map<String, String> queryOptions) { String maxRowsInWindow = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_WINDOW); - return checkedParseInt(QueryOptionKey.MAX_ROWS_IN_WINDOW, maxRowsInWindow); + return checkedParseIntPositive(QueryOptionKey.MAX_ROWS_IN_WINDOW, maxRowsInWindow); } @Nullable @@ -354,4 +316,76 @@ public class QueryOptionsUtils { public static boolean isSecondaryWorkload(Map<String, String> queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD)); } + + @Nullable + private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) { + if (optionValue == null) { + return null; + } + try { + return Integer.parseInt(optionValue); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException(String.format("%s must be an integer, got: %s", optionName, optionValue)); + } + } + + @Nullable + private static Integer checkedParseIntPositive(String optionName, @Nullable String optionValue) { + return checkedParseInt(optionName, optionValue, 1); + } + + @Nullable + private static Integer checkedParseIntNonNegative(String optionName, @Nullable String optionValue) { + return checkedParseInt(optionName, optionValue, 0); + } + + @Nullable + private static Integer checkedParseInt(String optionName, @Nullable String optionValue, int minValue) { + if (optionValue == null) { + return null; + } + int value; + try { + value = Integer.parseInt(optionValue); + } catch (NumberFormatException nfe) { + throw intParseException(optionName, optionValue, minValue); + } + if (value < minValue) { + throw intParseException(optionName, optionValue, minValue); + } + return value; + } + + private static IllegalArgumentException intParseException(String optionName, String optionValue, int minValue) { + return new IllegalArgumentException( + String.format("%s must be a number between %d and 2^31-1, got: %s", optionName, minValue, optionValue)); + } + + @Nullable + private static Long checkedParseLongPositive(String optionName, @Nullable String optionValue) { + return checkedParseLong(optionName, optionValue, 1); + } + + @Nullable + private static Long checkedParseLong(String optionName, @Nullable String optionValue, long minValue) { + if (optionValue == null) { + return null; + } + long value; + try { + value = Long.parseLong(optionValue); + } catch (NumberFormatException nfe) { + throw longParseException(optionName, optionValue, minValue); + } + if (value < minValue) { + throw longParseException(optionName, optionValue, minValue); + } + return value; + } + + private static IllegalArgumentException longParseException(String optionName, @Nullable String optionValue, + long minValue) { + return new IllegalArgumentException( + String.format("%s must be a number between %d and 2^63-1, got: %s", optionName, minValue, optionValue)); + } } diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java index 6f0f469c5b..b2ca6573b6 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java @@ -19,110 +19,127 @@ package org.apache.pinot.common.utils.config; -import com.google.common.collect.ImmutableMap; -import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.pinot.spi.config.table.FieldConfig; -import org.apache.pinot.spi.utils.CommonConstants; -import org.testng.Assert; import org.testng.annotations.Test; import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; public class QueryOptionsUtilsTest { + private static final List<String> POSITIVE_INT_KEYS = + List.of(NUM_REPLICA_GROUPS_TO_QUERY, MAX_EXECUTION_THREADS, NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, + MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_ROWS_IN_WINDOW); + private static final List<String> NON_NEGATIVE_INT_KEYS = List.of(MULTI_STAGE_LEAF_LIMIT); + private static final List<String> UNBOUNDED_INT_KEYS = + List.of(MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE, MIN_BROKER_GROUP_TRIM_SIZE, + GROUP_TRIM_THRESHOLD); + private static final List<String> INT_KEYS = new ArrayList<>() {{ + addAll(POSITIVE_INT_KEYS); + addAll(NON_NEGATIVE_INT_KEYS); + addAll(UNBOUNDED_INT_KEYS); + }}; + private static final List<String> POSITIVE_LONG_KEYS = + List.of(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES); @Test public void shouldConvertCaseInsensitiveMapToUseCorrectValues() { // Given: - Map<String, String> configs = ImmutableMap.of( - "ENABLENullHandling", "true", - "useMULTISTAGEEngine", "false" - ); + Map<String, String> configs = Map.of("ENABLENullHandling", "true", "useMULTISTAGEEngine", "false"); // When: Map<String, String> resolved = QueryOptionsUtils.resolveCaseInsensitiveOptions(configs); // Then: - Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING), "true"); - Assert.assertEquals(resolved.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE), "false"); + assertEquals(resolved.get(ENABLE_NULL_HANDLING), "true"); + assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false"); } @Test public void testSkipIndexesParsing() { String skipIndexesStr = "col1=inverted,range&col2=sorted"; - Map<String, String> queryOptions = - Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES, skipIndexesStr); + Map<String, String> queryOptions = Map.of(SKIP_INDEXES, skipIndexesStr); Map<String, Set<FieldConfig.IndexType>> skipIndexes = QueryOptionsUtils.getSkipIndexes(queryOptions); - Assert.assertEquals(skipIndexes.get("col1"), - Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED)); - Assert.assertEquals(skipIndexes.get("col2"), - Set.of(FieldConfig.IndexType.SORTED)); + assertEquals(skipIndexes.get("col1"), Set.of(FieldConfig.IndexType.RANGE, FieldConfig.IndexType.INVERTED)); + assertEquals(skipIndexes.get("col2"), Set.of(FieldConfig.IndexType.SORTED)); } @Test(expectedExceptions = RuntimeException.class) public void testSkipIndexesParsingInvalid() { String 
skipIndexesStr = "col1=inverted,range&col2"; - Map<String, String> queryOptions = - Map.of(CommonConstants.Broker.Request.QueryOptionKey.SKIP_INDEXES, skipIndexesStr); - QueryOptionsUtils.getSkipIndexes(queryOptions); + Map<String, String> queryOptions = Map.of(SKIP_INDEXES, skipIndexesStr); + QueryOptionsUtils.getSkipIndexes(queryOptions); } @Test public void testIntegerSettingParseSuccess() { HashMap<String, String> map = new HashMap<>(); - for (String setting : Arrays.asList(NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, MULTI_STAGE_LEAF_LIMIT, - GROUP_TRIM_THRESHOLD, MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_STREAMING_PENDING_BLOCKS, - MAX_EXECUTION_THREADS, MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE)) { - map.clear(); - for (Integer val : new Integer[]{null, 1, 10, Integer.MAX_VALUE}) { - map.put(setting, val != null ? String.valueOf(val) : null); - Assert.assertEquals(getValue(map, setting), val); + for (String key : INT_KEYS) { + for (Integer value : new Integer[]{null, 1, 10, Integer.MAX_VALUE}) { + map.put(key, value != null ? String.valueOf(value) : null); + assertEquals(getValue(map, key), value); } } - for (String setting : Arrays.asList(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) { - map.clear(); - for (Long val : new Long[]{null, 1L, 10L, Long.MAX_VALUE}) { - map.put(setting, val != null ? String.valueOf(val) : null); - Assert.assertEquals(getValue(map, setting), val); + for (String key : POSITIVE_LONG_KEYS) { + for (Long value : new Long[]{null, 1L, 10L, Long.MAX_VALUE}) { + map.put(key, value != null ? String.valueOf(value) : null); + assertEquals(getValue(map, key), value); } } } @Test public void testIntegerSettingParseErrors() { - HashMap<String, String> map = new HashMap<>(); + for (String key : POSITIVE_INT_KEYS) { + for (String value : new String[]{"-10000000000", "-2147483648", "-1", "0", "2147483648", "10000000000"}) { + try { + getValue(Map.of(key, value), key); + fail(key); + } catch (IllegalArgumentException ise) { + assertEquals(ise.getMessage(), key + " must be a number between 1 and 2^31-1, got: " + value); + } + } + } + + for (String key : NON_NEGATIVE_INT_KEYS) { + for (String value : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) { + try { + getValue(Map.of(key, value), key); + fail(); + } catch (IllegalArgumentException ise) { + assertEquals(ise.getMessage(), key + " must be a number between 0 and 2^31-1, got: " + value); + } + } + } - for (String setting : Arrays.asList(NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, MULTI_STAGE_LEAF_LIMIT, - GROUP_TRIM_THRESHOLD, MAX_STREAMING_PENDING_BLOCKS, MAX_ROWS_IN_JOIN, MAX_STREAMING_PENDING_BLOCKS, - MAX_EXECUTION_THREADS, MIN_SEGMENT_GROUP_TRIM_SIZE, MIN_SERVER_GROUP_TRIM_SIZE)) { - for (String val : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) { - map.clear(); - map.put(setting, val); + for (String key : UNBOUNDED_INT_KEYS) { + for (String value : new String[]{"-10000000000", "2147483648", "10000000000"}) { try { - getValue(map, setting); - Assert.fail(); + getValue(Map.of(key, value), key); + fail(); } catch (IllegalArgumentException ise) { - Assert.assertEquals(ise.getMessage(), setting + " must be a number between 0 and 2^31-1, got: " + val); + assertEquals(ise.getMessage(), key + " must be an integer, got: " + value); } } } - for (String setting : Arrays.asList(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) { - for (String val : new String[]{ + 
for (String key : POSITIVE_LONG_KEYS) { + for (String value : new String[]{ "-100000000000000000000", "-9223372036854775809", "-1", "0", "9223372036854775808", "100000000000000000000" }) { - map.clear(); - map.put(setting, val); try { - getValue(map, setting); - Assert.fail(); + getValue(Map.of(key, value), key); + fail(); } catch (IllegalArgumentException ise) { - Assert.assertEquals(ise.getMessage(), setting + " must be a number between 1 and 2^63-1, got: " + val); + assertEquals(ise.getMessage(), key + " must be a number between 1 and 2^63-1, got: " + value); } } } @@ -130,26 +147,34 @@ public class QueryOptionsUtilsTest { private static Object getValue(Map<String, String> map, String key) { switch (key) { - //ints + // Positive ints + case NUM_REPLICA_GROUPS_TO_QUERY: + return QueryOptionsUtils.getNumReplicaGroupsToQuery(map); + case MAX_EXECUTION_THREADS: + return QueryOptionsUtils.getMaxExecutionThreads(map); case NUM_GROUPS_LIMIT: return QueryOptionsUtils.getNumGroupsLimit(map); case MAX_INITIAL_RESULT_HOLDER_CAPACITY: return QueryOptionsUtils.getMaxInitialResultHolderCapacity(map); - case MULTI_STAGE_LEAF_LIMIT: - return QueryOptionsUtils.getMultiStageLeafLimit(map); - case GROUP_TRIM_THRESHOLD: - return QueryOptionsUtils.getGroupTrimThreshold(map); case MAX_STREAMING_PENDING_BLOCKS: return QueryOptionsUtils.getMaxStreamingPendingBlocks(map); case MAX_ROWS_IN_JOIN: return QueryOptionsUtils.getMaxRowsInJoin(map); - case MAX_EXECUTION_THREADS: - return QueryOptionsUtils.getMaxExecutionThreads(map); + case MAX_ROWS_IN_WINDOW: + return QueryOptionsUtils.getMaxRowsInWindow(map); + // Non-negative ints + case MULTI_STAGE_LEAF_LIMIT: + return QueryOptionsUtils.getMultiStageLeafLimit(map); + // Unbounded ints case MIN_SEGMENT_GROUP_TRIM_SIZE: return QueryOptionsUtils.getMinSegmentGroupTrimSize(map); case MIN_SERVER_GROUP_TRIM_SIZE: return QueryOptionsUtils.getMinServerGroupTrimSize(map); - //longs + case MIN_BROKER_GROUP_TRIM_SIZE: + return QueryOptionsUtils.getMinBrokerGroupTrimSize(map); + case GROUP_TRIM_THRESHOLD: + return QueryOptionsUtils.getGroupTrimThreshold(map); + // Positive longs case TIMEOUT_MS: return QueryOptionsUtils.getTimeoutMs(map); case MAX_SERVER_RESPONSE_SIZE_BYTES: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index 0b928b9a17..ecb0a56cbf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -33,6 +33,7 @@ import org.apache.pinot.core.data.table.IndexedTable; import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SimpleIndexedTable; import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -86,7 +87,8 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group // without ordering. Consider ordering on group-by columns if no ordering is specified. _trimSize = limit; } - _trimThreshold = queryContext.getGroupTrimThreshold(); + int trimThreshold = queryContext.getGroupTrimThreshold(); + _trimThreshold = trimThreshold > 0 ? 
trimThreshold : Integer.MAX_VALUE; } else { // Server trim is disabled _trimSize = Integer.MAX_VALUE; @@ -135,16 +137,20 @@ public class GroupByCombineOperator extends BaseSingleBlockCombineOperator<Group synchronized (this) { if (_indexedTable == null) { DataSchema dataSchema = resultsBlock.getDataSchema(); - // NOTE: Use trimSize as resultSize on server size. - if (_trimThreshold >= MAX_TRIM_THRESHOLD) { - // special case of trim threshold where it is set to max value. - // there won't be any trimming during upsert in this case. - // thus we can avoid the overhead of read-lock and write-lock - // in the upsert method. - _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + // NOTE: Use trimSize as resultSize on server side. + if (_numTasks == 1) { + _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); } else { - _indexedTable = - new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + if (_trimThreshold >= MAX_TRIM_THRESHOLD) { + // special case of trim threshold where it is set to max value. + // there won't be any trimming during upsert in this case. + // thus we can avoid the overhead of read-lock and write-lock + // in the upsert method. + _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + } else { + _indexedTable = + new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java index 943ed169a1..1e8c88e9ce 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingGroupByCombineOperator.java @@ -34,6 +34,7 @@ import org.apache.pinot.core.data.table.IndexedTable; import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SimpleIndexedTable; import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; @@ -163,16 +164,20 @@ public class StreamingGroupByCombineOperator extends BaseStreamingCombineOperato synchronized (this) { if (_indexedTable == null) { DataSchema dataSchema = resultsBlock.getDataSchema(); - // NOTE: Use trimSize as resultSize on server size. - if (_trimThreshold >= MAX_TRIM_THRESHOLD) { - // special case of trim threshold where it is set to max value. - // there won't be any trimming during upsert in this case. - // thus we can avoid the overhead of read-lock and write-lock - // in the upsert method. - _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + // NOTE: Use trimSize as resultSize on server side. + if (_numTasks == 1) { + _indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); } else { - _indexedTable = - new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + if (_trimThreshold >= MAX_TRIM_THRESHOLD) { + // special case of trim threshold where it is set to max value. 
+ // there won't be any trimming during upsert in this case. + // thus we can avoid the overhead of read-lock and write-lock + // in the upsert method. + _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + } else { + _indexedTable = + new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + } } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 6912dd2586..e76a649886 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -181,7 +181,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { // Set maxExecutionThreads int maxExecutionThreads; Integer maxExecutionThreadsFromQuery = QueryOptionsUtils.getMaxExecutionThreads(queryOptions); - if (maxExecutionThreadsFromQuery != null && maxExecutionThreadsFromQuery > 0) { + if (maxExecutionThreadsFromQuery != null) { // Do not allow query to override the execution threads over the instance-level limit if (_maxExecutionThreads > 0) { maxExecutionThreads = Math.min(_maxExecutionThreads, maxExecutionThreadsFromQuery); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 09b4d6a156..34395febfb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -240,11 +240,23 @@ public class GroupByDataTableReducer implements DataTableReducer { boolean hasFinalInput = _queryContext.isServerReturnFinalResult() || _queryContext.isServerReturnFinalResultKeyUnpartitioned(); int limit = _queryContext.getLimit(); - int trimSize = GroupByUtils.getTableCapacity(limit, reducerContext.getMinGroupTrimSize()); + int minTrimSize = reducerContext.getMinGroupTrimSize(); + int trimSize; + int trimThreshold; + if (minTrimSize > 0) { + trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize); + trimThreshold = reducerContext.getGroupByTrimThreshold(); + if (trimThreshold <= 0) { + trimThreshold = Integer.MAX_VALUE; + } + } else { + // Broker trim is disabled + trimSize = Integer.MAX_VALUE; + trimThreshold = Integer.MAX_VALUE; + } // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy. // TODO: Resolve the HAVING clause within the IndexedTable before returning the result int resultSize = _queryContext.getHavingFilter() != null ? 
trimSize : limit; - int trimThreshold = reducerContext.getGroupByTrimThreshold(); IndexedTable indexedTable; if (numReduceThreadsToUse == 1) { indexedTable = diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java index 4f062d522a..17487a7d13 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/ArrayAggFunctionTest.java @@ -142,65 +142,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest { @Test void aggregationGroupBySVIntWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(false) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField") - .thenResultIs(new Object[]{1, new int[]{1, 1}}, new Object[]{2, new int[]{2, 2}}, - new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE, Integer.MIN_VALUE}}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE, Integer.MIN_VALUE}}, + new Object[]{1, new int[]{1, 1}}, + new Object[]{2, new int[]{2, 2}} + ); } @Test void aggregationGroupBySVIntWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField") - .thenResultIs(new Object[]{1, new int[]{1, 1}}, new Object[]{2, new int[]{2, 2}}, - new Object[]{null, new int[0]}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'INT') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1, new int[]{1, 1}}, + new Object[]{2, new int[]{2, 2}}, + new Object[]{null, new int[0]} + ); } @Test void aggregationDistinctGroupBySVIntWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(false) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField") - .thenResultIs(new Object[]{1, new int[]{1}}, new Object[]{2, new int[]{2}}, - new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE}}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Integer.MIN_VALUE, new int[]{Integer.MIN_VALUE}}, + new Object[]{1, new int[]{1}}, + new Object[]{2, new int[]{2}} + ); } @Test void aggregationDistinctGroupBySVIntWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.INT).getDeclaringTable(true) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 
'INT', true) from testTable group by myField") - .thenResultIs(new Object[]{1, new int[]{1}}, new Object[]{2, new int[]{2}}, - new Object[]{null, new int[0]}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'INT', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1, new int[]{1}}, + new Object[]{2, new int[]{2}}, + new Object[]{null, new int[0]} + ); } @Test @@ -257,65 +245,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest { @Test void aggregationGroupBySVLongWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(false) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField") - .thenResultIs(new Object[]{1L, new long[]{1, 1}}, new Object[]{2L, new long[]{2, 2}}, - new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE, Long.MIN_VALUE}}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE, Long.MIN_VALUE}}, + new Object[]{1L, new long[]{1, 1}}, + new Object[]{2L, new long[]{2, 2}} + ); } @Test void aggregationGroupBySVLongWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField") - .thenResultIs(new Object[]{1L, new long[]{1, 1}}, new Object[]{2L, new long[]{2, 2}}, - new Object[]{null, new long[0]}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'LONG') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1L, new long[]{1, 1}}, + new Object[]{2L, new long[]{2, 2}}, + new Object[]{null, new long[0]} + ); } @Test void aggregationDistinctGroupBySVLongWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(false) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField") - .thenResultIs(new Object[]{1L, new long[]{1}}, new Object[]{2L, new long[]{2}}, - new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE}}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Long.MIN_VALUE, new long[]{Long.MIN_VALUE}}, + new Object[]{1L, new long[]{1}}, + new Object[]{2L, new long[]{2}} + ); } @Test void aggregationDistinctGroupBySVLongWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.LONG).getDeclaringTable(true) - .onFirstInstance("myField", - "1", - "2", - "null" - ).andOnSecondInstance("myField", - "1", - "2", - "null" - ).whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField") - .thenResultIs(new Object[]{1L, new long[]{1}}, new 
Object[]{2L, new long[]{2}}, - new Object[]{null, new long[0]}); + .onFirstInstance("myField", "1", "2", "null") + .andOnSecondInstance("myField", "1", "2", "null") + .whenQuery("select myField, arrayagg(myField, 'LONG', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1L, new long[]{1}}, + new Object[]{2L, new long[]{2}}, + new Object[]{null, new long[0]} + ); } @Test @@ -373,60 +349,51 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest { @Test void aggregationGroupBySVFloatWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(false) - .onFirstInstance("myField", - "null", - "1.0", - "2.0" - ).andOnSecondInstance("myField", - "null", - "1.0", - "2.0" - ).whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField") - .thenResultIs(new Object[]{Float.NEGATIVE_INFINITY, - new float[]{Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY}}, - new Object[]{1.0f, new float[]{1.0f, 1.0f}}, new Object[]{2.0f, new float[]{2.0f, 2.0f}}); + .onFirstInstance("myField", "null", "1.0", "2.0") + .andOnSecondInstance("myField", "null", "1.0", "2.0") + .whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Float.NEGATIVE_INFINITY, new float[]{Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY}}, + new Object[]{1.0f, new float[]{1.0f, 1.0f}}, + new Object[]{2.0f, new float[]{2.0f, 2.0f}} + ); } @Test void aggregationGroupBySVFloatWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true) - .onFirstInstance("myField", - "null", - "1.0" - ).andOnSecondInstance("myField", - "null", - "1.0" - ).whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField") - .thenResultIs(new Object[]{null, new float[0]}, new Object[]{1.0f, new float[]{1.0f, 1.0f}}); + .onFirstInstance("myField", "null", "1.0") + .andOnSecondInstance("myField", "null", "1.0") + .whenQuery("select myField, arrayagg(myField, 'FLOAT') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1.0f, new float[]{1.0f, 1.0f}}, + new Object[]{null, new float[0]} + ); } @Test void aggregationDistinctGroupBySVFloatWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(false) - .onFirstInstance("myField", - "null", - "1.0", - "2.0" - ).andOnSecondInstance("myField", - "null", - "1.0", - "2.0" - ).whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField") - .thenResultIs(new Object[]{Float.NEGATIVE_INFINITY, new float[]{Float.NEGATIVE_INFINITY}}, - new Object[]{1.0f, new float[]{1.0f}}, new Object[]{2.0f, new float[]{2.0f}}); + .onFirstInstance("myField", "null", "1.0", "2.0") + .andOnSecondInstance("myField", "null", "1.0", "2.0") + .whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Float.NEGATIVE_INFINITY, new float[]{Float.NEGATIVE_INFINITY}}, + new Object[]{1.0f, new float[]{1.0f}}, + new Object[]{2.0f, new float[]{2.0f}} + ); } @Test void aggregationDistinctGroupBySVFloatWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.FLOAT).getDeclaringTable(true) - .onFirstInstance("myField", - "null", - "1.0" - ).andOnSecondInstance("myField", - "null", - "1.0" - ).whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField") - .thenResultIs(new 
Object[]{null, new float[0]}, new Object[]{1.0f, new float[]{1.0f}}); + .onFirstInstance("myField", "null", "1.0") + .andOnSecondInstance("myField", "null", "1.0") + .whenQuery("select myField, arrayagg(myField, 'FLOAT', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1.0f, new float[]{1.0f}}, + new Object[]{null, new float[0]} + ); } @Test @@ -484,66 +451,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest { @Test void aggregationGroupBySVDoubleWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(false) - .onFirstInstance("myField", - "null", - "1.0", - "2.0" - ).andOnSecondInstance("myField", - "null", - "1.0", - "2.0" - ).whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField") - .thenResultIs(new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY, - Double.NEGATIVE_INFINITY}}, new Object[]{1.0, new double[]{1.0, 1.0}}, - new Object[]{2.0, new double[]{2.0, 2.0}}); + .onFirstInstance("myField", "null", "1.0", "2.0") + .andOnSecondInstance("myField", "null", "1.0", "2.0") + .whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY}}, + new Object[]{1.0, new double[]{1.0, 1.0}}, + new Object[]{2.0, new double[]{2.0, 2.0}} + ); } @Test void aggregationGroupBySVDoubleWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true) - .onFirstInstance("myField", - "null", - "1.0", - "2.0" - ).andOnSecondInstance("myField", - "null", - "1.0", - "2.0" - ).whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField") - .thenResultIs(new Object[]{null, new double[0]}, new Object[]{1.0, new double[]{1.0, 1.0}}, - new Object[]{2.0, new double[]{2.0, 2.0}}); + .onFirstInstance("myField", "null", "1.0", "2.0") + .andOnSecondInstance("myField", "null", "1.0", "2.0") + .whenQuery("select myField, arrayagg(myField, 'DOUBLE') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1.0, new double[]{1.0, 1.0}}, + new Object[]{2.0, new double[]{2.0, 2.0}}, + new Object[]{null, new double[0]} + ); } @Test void aggregationDistinctGroupBySVDoubleWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(false) - .onFirstInstance("myField", - "null", - "1.0", - "2.0" - ).andOnSecondInstance("myField", - "null", - "1.0", - "2.0" - ).whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField") - .thenResultIs(new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY}}, - new Object[]{1.0, new double[]{1.0}}, new Object[]{2.0, new double[]{2.0}}); + .onFirstInstance("myField", "null", "1.0", "2.0") + .andOnSecondInstance("myField", "null", "1.0", "2.0") + .whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{Double.NEGATIVE_INFINITY, new double[]{Double.NEGATIVE_INFINITY}}, + new Object[]{1.0, new double[]{1.0}}, + new Object[]{2.0, new double[]{2.0}} + ); } @Test void aggregationDistinctGroupBySVDoubleWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.DOUBLE).getDeclaringTable(true) - .onFirstInstance("myField", - "null", - "1.0", - "2.0" - ).andOnSecondInstance("myField", - "null", - "1.0", - "2.0" - 
).whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField") - .thenResultIs(new Object[]{null, new double[0]}, new Object[]{1.0, new double[]{1.0}}, - new Object[]{2.0, new double[]{2.0}}); + .onFirstInstance("myField", "null", "1.0", "2.0") + .andOnSecondInstance("myField", "null", "1.0", "2.0") + .whenQuery("select myField, arrayagg(myField, 'DOUBLE', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{1.0, new double[]{1.0}}, + new Object[]{2.0, new double[]{2.0}}, + new Object[]{null, new double[0]} + ); } @Test @@ -600,65 +554,53 @@ public class ArrayAggFunctionTest extends AbstractAggregationFunctionTest { @Test void aggregationGroupBySVStringWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(false) - .onFirstInstance("myField", - "a", - "b", - "null" - ).andOnSecondInstance("myField", - "a", - "b", - "null" - ).whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField") - .thenResultIs(new Object[]{"a", new String[]{"a", "a"}}, new Object[]{"b", new String[]{"b", "b"}}, - new Object[]{"null", new String[]{"null", "null"}}); + .onFirstInstance("myField", "a", "b", "null") + .andOnSecondInstance("myField", "a", "b", "null") + .whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{"a", new String[]{"a", "a"}}, + new Object[]{"b", new String[]{"b", "b"}}, + new Object[]{"null", new String[]{"null", "null"}} + ); } @Test void aggregationGroupBySVStringWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true) - .onFirstInstance("myField", - "a", - "b", - "null" - ).andOnSecondInstance("myField", - "a", - "b", - "null" - ).whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField") - .thenResultIs(new Object[]{"a", new String[]{"a", "a"}}, new Object[]{"b", new String[]{"b", "b"}}, - new Object[]{null, new String[0]}); + .onFirstInstance("myField", "a", "b", "null") + .andOnSecondInstance("myField", "a", "b", "null") + .whenQuery("select myField, arrayagg(myField, 'STRING') from testTable group by myField order by myField") + .thenResultIs( + new Object[]{"a", new String[]{"a", "a"}}, + new Object[]{"b", new String[]{"b", "b"}}, + new Object[]{null, new String[0]} + ); } @Test void aggregationDistinctGroupBySVStringWithNullHandlingDisabled() { new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(false) - .onFirstInstance("myField", - "a", - "b", - "null" - ).andOnSecondInstance("myField", - "a", - "b", - "null" - ).whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField") - .thenResultIs(new Object[]{"a", new String[]{"a"}}, new Object[]{"b", new String[]{"b"}}, - new Object[]{"null", new String[]{"null"}}); + .onFirstInstance("myField", "a", "b", "null") + .andOnSecondInstance("myField", "a", "b", "null") + .whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{"a", new String[]{"a"}}, + new Object[]{"b", new String[]{"b"}}, + new Object[]{"null", new String[]{"null"}} + ); } @Test void aggregationDistinctGroupBySVStringWithNullHandlingEnabled() { new DataTypeScenario(FieldSpec.DataType.STRING).getDeclaringTable(true) - .onFirstInstance("myField", - "a", - "b", - "null" - ).andOnSecondInstance("myField", - "a", - "b", - "null" - ).whenQuery("select 
myField, arrayagg(myField, 'STRING', true) from testTable group by myField") - .thenResultIs(new Object[]{"a", new String[]{"a"}}, new Object[]{"b", new String[]{"b"}}, - new Object[]{null, new String[0]}); + .onFirstInstance("myField", "a", "b", "null") + .andOnSecondInstance("myField", "a", "b", "null") + .whenQuery("select myField, arrayagg(myField, 'STRING', true) from testTable group by myField order by myField") + .thenResultIs( + new Object[]{"a", new String[]{"a"}}, + new Object[]{"b", new String[]{"b"}}, + new Object[]{null, new String[0]} + ); } @Test diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java deleted file mode 100644 index 654a0add08..0000000000 --- a/pinot-core/src/test/java/org/apache/pinot/queries/WithOptionQueriesTest.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pinot.queries; - -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; -import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; -import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; -import org.apache.pinot.segment.spi.ImmutableSegment; -import org.apache.pinot.segment.spi.IndexSegment; -import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; -import org.apache.pinot.spi.utils.ReadMode; -import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.MAX_QUERY_RESPONSE_SIZE_BYTES; -import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES; -import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS; - - -public class WithOptionQueriesTest extends BaseQueriesTest { - - private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "WithOptionQueriesTest"); - private static final String RAW_TABLE_NAME = "testTable"; - private static final String SEGMENT_NAME = "testSegment"; - - 
private static final int NUM_RECORDS = 10; - private static final String X_COL = "x"; - private static final String Y_COL = "y"; - - private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(X_COL, FieldSpec.DataType.INT) - .addSingleValueDimension(Y_COL, FieldSpec.DataType.DOUBLE).build(); - - private static final TableConfig TABLE_CONFIG = - new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); - - private IndexSegment _indexSegment; - private List<IndexSegment> _indexSegments; - - @Override - protected String getFilter() { - return ""; - } - - @Override - protected IndexSegment getIndexSegment() { - return _indexSegment; - } - - @Override - protected List<IndexSegment> getIndexSegments() { - return _indexSegments; - } - - private final List<Object[]> _allRecords = new ArrayList<>(); - - @BeforeClass - public void setUp() - throws Exception { - FileUtils.deleteQuietly(INDEX_DIR); - - List<GenericRow> records = new ArrayList<>(NUM_RECORDS); - for (int i = 0; i < NUM_RECORDS; i++) { - GenericRow record = new GenericRow(); - record.putValue(X_COL, i); - record.putValue(Y_COL, 0.25); - records.add(record); - _allRecords.add(new Object[]{i, 0.25}); - } - - SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); - segmentGeneratorConfig.setTableName(RAW_TABLE_NAME); - segmentGeneratorConfig.setSegmentName(SEGMENT_NAME); - segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath()); - - SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); - driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records)); - driver.build(); - - ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap); - _indexSegment = immutableSegment; - _indexSegments = Arrays.asList(immutableSegment, immutableSegment); - } - - @Test - public void testOptionParsingFailure() { - HashMap<String, String> options = new HashMap<>(); - - // int values - for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT, - QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, QueryOptionKey.GROUP_TRIM_THRESHOLD, - QueryOptionKey.MAX_EXECUTION_THREADS, QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, - QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE)) { - - options.clear(); - for (String value : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) { - options.put(setting, value); - - IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> { - getBrokerResponse("SELECT x, count(*) FROM " + RAW_TABLE_NAME + " GROUP BY x", options); - }); - Assert.assertEquals(setting + " must be a number between 0 and 2^31-1, got: " + value, exception.getMessage()); - } - } - } - - @Test - public void testOptionParsingSuccess() { - HashMap<String, String> options = new HashMap<>(); - List<Object> groupRows = new ArrayList(); - groupRows.add(new Object[]{0d, 40L}); //four times 10 records because segment gets multiplied under the hood - - // int values - for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT, - QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, QueryOptionKey.MULTI_STAGE_LEAF_LIMIT, - QueryOptionKey.GROUP_TRIM_THRESHOLD, QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS, - QueryOptionKey.MAX_ROWS_IN_JOIN, QueryOptionKey.MAX_EXECUTION_THREADS, - QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE, - 
QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY)) {
-
-      options.clear();
-      for (String value : new String[]{"0", "1", "10000", "2147483647"}) {
-
-        options.put(setting, value);
-        List<Object[]> rows =
-            getBrokerResponse("SELECT mod(x,1), count(*) FROM " + RAW_TABLE_NAME + " GROUP BY mod(x,1)",
-                options).getResultTable().getRows();
-        if (QueryOptionKey.NUM_GROUPS_LIMIT == setting && "0".equals(value)) {
-          Assert.assertEquals(0, rows.size());
-        } else {
-          assertEquals(rows, groupRows);
-        }
-      }
-    }
-
-    //long values
-    for (String setting : Arrays.asList(TIMEOUT_MS, MAX_SERVER_RESPONSE_SIZE_BYTES, MAX_QUERY_RESPONSE_SIZE_BYTES)) {
-      options.clear();
-      for (String value : new String[]{"1", "10000", "9223372036854775807"}) {
-        options.put(setting, value);
-        List<Object[]> rows = getBrokerResponse("SELECT * FROM " + RAW_TABLE_NAME, options).getResultTable().getRows();
-        assertEquals(rows, _allRecords);
-      }
-    }
-  }
-
-  private void assertEquals(List actual, List expected) {
-    if (actual == expected) {
-      return;
-    }
-
-    if (actual == null || expected == null || actual.size() != expected.size()) {
-      Assert.fail("Expected \n" + expected + "\n but got \n" + actual);
-    }
-
-    for (int i = 0; i < actual.size(); i++) {
-      Assert.assertEquals(actual.get(i), expected.get(i));
-    }
-  }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 3e946d5eab..b70b39a8f5 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -39,7 +39,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
@@ -48,6 +47,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey.*;
+
 
 /**
  * all special tests that doesn't fit into {@link org.apache.pinot.query.runtime.queries.ResourceBasedQueriesTest}
@@ -69,7 +70,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
     SCHEMA_BUILDER = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING, "")
         .addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
         .addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
-        .addMetric("col3", FieldSpec.DataType.INT, 0).setSchemaName("defaultSchemaName")
+        .addMetric("col3", FieldSpec.DataType.INT, 0)
+        .setSchemaName("defaultSchemaName")
         .setEnableColumnBasedNullHandling(true);
   }
 
@@ -293,69 +295,59 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
 
   @DataProvider(name = "testDataWithSqlExecutionExceptions")
   protected Iterator<Object[]> provideTestSqlWithExecutionException() {
-    //@formatter:off
-    List<Object[]> testCases = new ArrayList();
-    testCases.addAll(
-        Arrays.asList(
-            // Missing index
-            new Object[]{"SELECT col1 FROM a WHERE textMatch(col1, 'f') LIMIT 10", "without text index"},
-
-            // Query hint with dynamic broadcast pipeline breaker should return error upstream
-            new Object[]{
-                "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a WHERE a.col1 IN "
-                    + "(SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND a.col3 > 0",
-                "without text index"
-            },
-
-            // Timeout exception should occur with this option:
-            new Object[]{"SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col1 = c.col1",
-                "Timeout"},
-
-            // Function with incorrect argument signature should throw runtime exception when casting string to numeric
-            new Object[]{"SELECT least(a.col2, b.col3) FROM a JOIN b ON a.col1 = b.col1", "For input string:"},
-
-            // Scalar function that doesn't have a valid use should throw an exception on the leaf stage
-            // - predicate only functions:
-            new Object[]{"SELECT * FROM a WHERE textMatch(col1, 'f')", "without text index"},
-            new Object[]{"SELECT * FROM a WHERE text_match(col1, 'f')", "without text index"},
-            new Object[]{"SELECT * FROM a WHERE textContains(col1, 'f')", "supported only on native text index"},
-            new Object[]{"SELECT * FROM a WHERE text_contains(col1, 'f')", "supported only on native text index"},
-
-            // - transform only functions
-            new Object[]{"SELECT jsonExtractKey(col1, 'path') FROM a", "was expecting (JSON String"},
-            new Object[]{"SELECT json_extract_key(col1, 'path') FROM a", "was expecting (JSON String"},
-
-            // - PlaceholderScalarFunction registered will throw on intermediate stage, but works on leaf stage.
-            // - checked "Illegal Json Path" as col1 is not actually a json string, but the call is correctly triggered.
-            new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS INT) FROM a", "Cannot resolve JSON path"},
-            // - checked function cannot be found b/c there's no intermediate stage impl for json_extract_scalar
-            new Object[]{
-                "SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT') AS INT) FROM a JOIN b ON a.col1 = b.col1",
-                "Unsupported function: JSONEXTRACTSCALAR"
-            }));
-    //@formatter:on
-
-    // int values
-    for (String setting : Arrays.asList(QueryOptionKey.NUM_GROUPS_LIMIT,
-        QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY, QueryOptionKey.MULTI_STAGE_LEAF_LIMIT,
-        QueryOptionKey.GROUP_TRIM_THRESHOLD, QueryOptionKey.MAX_STREAMING_PENDING_BLOCKS,
-        QueryOptionKey.MAX_ROWS_IN_JOIN, QueryOptionKey.MAX_EXECUTION_THREADS,
-        QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE, QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE)) {
-
-      for (String val : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
-        testCases.add(new Object[]{
-            "set " + setting + " = " + val + "; SELECT col1, count(*) FROM a GROUP BY col1",
-            setting + " must be a number between 0 and 2^31-1, got: " + val
-        });
-      }
-    }
-
-    // int values; triggered for query with window clause
-    for (String setting : Arrays.asList(QueryOptionKey.MAX_ROWS_IN_WINDOW)) {
-      for (String val : new String[]{"-10000000000", "-2147483648", "-1", "2147483648", "10000000000"}) {
+    List<Object[]> testCases = new ArrayList<>();
+    // Missing index
+    testCases.add(new Object[]{"SELECT col1 FROM a WHERE textMatch(col1, 'f') LIMIT 10", "without text index"});
+
+    // Query hint with dynamic broadcast pipeline breaker should return error upstream
+    testCases.add(new Object[]{
+        "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a WHERE a.col1 IN "
+            + "(SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND a.col3 > 0",
+        "without text index"
+    });
+
+    // Timeout exception should occur with this option:
+    testCases.add(new Object[]{
+        "SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col1 = c.col1",
+        "Timeout"
+    });
+
+    // Function with incorrect argument signature should throw runtime exception when casting string to numeric
+    testCases.add(new Object[]{"SELECT least(a.col2, b.col3) FROM a JOIN b ON a.col1 = b.col1", "For input string:"});
+
+    // Scalar function that doesn't have a valid use should throw an exception on the leaf stage
+    // - predicate only functions:
+    testCases.add(new Object[]{"SELECT * FROM a WHERE textMatch(col1, 'f')", "without text index"});
+    testCases.add(new Object[]{"SELECT * FROM a WHERE text_match(col1, 'f')", "without text index"});
+    testCases.add(new Object[]{"SELECT * FROM a WHERE textContains(col1, 'f')", "supported only on native text index"});
+    testCases.add(new Object[]{
+        "SELECT * FROM a WHERE text_contains(col1, 'f')",
+        "supported only on native text index"}
+    );
+
+    // - transform only functions
+    testCases.add(new Object[]{"SELECT jsonExtractKey(col1, 'path') FROM a", "was expecting (JSON String"});
+    testCases.add(new Object[]{"SELECT json_extract_key(col1, 'path') FROM a", "was expecting (JSON String"});
+
+    // - PlaceholderScalarFunction registered will throw on intermediate stage, but works on leaf stage.
+    // - checked "Illegal Json Path" as col1 is not actually a json string, but the call is correctly triggered.
+    testCases.add(
+        new Object[]{"SELECT CAST(jsonExtractScalar(col1, 'path', 'INT') AS INT) FROM a", "Cannot resolve JSON path"});
+    // - checked function cannot be found b/c there's no intermediate stage impl for json_extract_scalar
+    testCases.add(new Object[]{
+        "SELECT CAST(json_extract_scalar(a.col1, b.col2, 'INT') AS INT) FROM a JOIN b ON a.col1 = b.col1",
+        "Unsupported function: JSONEXTRACTSCALAR"
+    });
+
+    // Positive int keys (only included ones that will be parsed for this query)
+    for (String key : new String[]{
+        MAX_EXECUTION_THREADS, NUM_GROUPS_LIMIT, MAX_INITIAL_RESULT_HOLDER_CAPACITY, MAX_STREAMING_PENDING_BLOCKS,
+        MAX_ROWS_IN_JOIN
+    }) {
+      for (String value : new String[]{"-10000000000", "-2147483648", "-1", "0", "2147483648", "10000000000"}) {
         testCases.add(new Object[]{
-            "set " + setting + " = " + val + "; SELECT ROW_NUMBER() over (PARTITION BY col1) FROM a",
-            setting + " must be a number between 0 and 2^31-1, got: " + val
+            "set " + key + " = " + value + "; SELECT col1, count(*) FROM a GROUP BY col1",
+            key + " must be a number between 1 and 2^31-1, got: " + value
         });
       }
     }
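
For readers tracing the behavior change: the rewritten loop above asserts that every positive-int query option must parse as a 32-bit integer of at least 1, and that each of the six probe values fails with "<key> must be a number between 1 and 2^31-1, got: <value>". Below is a minimal sketch of a validator with that contract; the class name QueryOptionValidationSketch and the helper checkedParseIntPositive are hypothetical stand-ins for illustration, not the helper this commit actually adds.

import java.util.Map;

public class QueryOptionValidationSketch {

  // Hypothetical validator mirroring the contract asserted by the test cases above:
  // an option, when present, must parse as an int in [1, 2^31 - 1].
  static Integer checkedParseIntPositive(String key, Map<String, String> options) {
    String value = options.get(key);
    if (value == null) {
      return null; // option not set, nothing to validate
    }
    int parsed;
    try {
      parsed = Integer.parseInt(value);
    } catch (NumberFormatException e) {
      // Covers non-numeric input and int overflow, e.g. "2147483648", "10000000000", "-10000000000"
      throw new IllegalArgumentException(errorMessage(key, value));
    }
    if (parsed <= 0) {
      // Covers "-2147483648", "-1" and the newly rejected "0"
      throw new IllegalArgumentException(errorMessage(key, value));
    }
    return parsed;
  }

  private static String errorMessage(String key, String value) {
    return key + " must be a number between 1 and 2^31-1, got: " + value;
  }

  public static void main(String[] args) {
    // In range: prints 10000
    System.out.println(checkedParseIntPositive("maxRowsInJoin", Map.of("maxRowsInJoin", "10000")));
    try {
      checkedParseIntPositive("maxRowsInJoin", Map.of("maxRowsInJoin", "2147483648"));
    } catch (IllegalArgumentException e) {
      // Prints: maxRowsInJoin must be a number between 1 and 2^31-1, got: 2147483648
      System.out.println(e.getMessage());
    }
  }
}

The two failure paths (NumberFormatException on overflow or garbage input, and the explicit <= 0 check) funnel into one shared message, which is why a single expected string covers all six probe values in the data provider.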