This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fbd673482b [multistage] [feature] Add a query option to pass some v1 limit (#9957) fbd673482b is described below commit fbd673482b909e656c924274927ab3f58d6dd6cf Author: Yao Liu <y...@startree.ai> AuthorDate: Thu Jan 19 14:54:57 2023 -0800 [multistage] [feature] Add a query option to pass some v1 limit (#9957) --- .../MultiStageBrokerRequestHandler.java | 10 ++-- .../common/utils/config/QueryOptionsUtils.java | 24 +++++++++ .../core/plan/maker/InstancePlanMakerImplV2.java | 34 ++++++++---- .../tests/ClusterIntegrationTestUtils.java | 30 +++++++++-- .../tests/MultiStageEngineIntegrationTest.java | 9 ++++ .../runtime/plan/ServerRequestPlanVisitor.java | 60 +++++++++++++--------- .../pinot/query/service/QueryDispatcher.java | 10 ++-- .../pinot/query/service/QueryDispatcherTest.java | 2 +- .../apache/pinot/spi/utils/CommonConstants.java | 5 ++ 9 files changed, 137 insertions(+), 47 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index c9c604d9db..f5295bad52 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -98,7 +98,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { // it is OK to ignore the onDataAvailable callback because the broker top-level operators // always run in-line (they don't have any scheduler) - _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname, _reducerPort, config, ignored -> { }); + _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname, _reducerPort, config, ignored -> { + }); // TODO: move this to a startUp() function. _mailboxService.start(); @@ -165,7 +166,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { ResultTable queryResults; try { - queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs); + queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs, + sqlNodeAndOptions.getOptions()); } catch (Exception e) { LOGGER.info("query execution failed", e); return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); @@ -175,8 +177,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { long executionEndTimeNs = System.nanoTime(); // Set total query processing time - long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs() - + (executionEndTimeNs - compilationStartTimeNs)); + long totalTimeMs = TimeUnit.NANOSECONDS.toMillis( + sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs)); brokerResponse.setTimeUsedMs(totalTimeMs); brokerResponse.setResultTable(queryResults); requestContext.setQueryProcessingTime(totalTimeMs); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 0fa3c82b33..420924af0a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -166,4 +166,28 @@ public class QueryOptionsUtils { public static String getOrderByAlgorithm(Map<String, String> queryOptions) { return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM); } + + @Nullable + public static Integer getMultiStageLeafLimit(Map<String, String> queryOptions) { + String maxLeafLimitStr = queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT); + return maxLeafLimitStr != null ? Integer.parseInt(maxLeafLimitStr) : null; + } + + @Nullable + public static Integer getNumGroupsLimit(Map<String, String> queryOptions) { + String maxNumGroupLimit = queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT); + return maxNumGroupLimit != null ? Integer.parseInt(maxNumGroupLimit) : null; + } + + @Nullable + public static Integer getMaxInitialResultHolderCapacity(Map<String, String> queryOptions) { + String maxInitResultCap = queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY); + return maxInitResultCap != null ? Integer.parseInt(maxInitResultCap) : null; + } + + @Nullable + public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) { + String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD); + return groupByTrimThreshold != null ? Integer.parseInt(groupByTrimThreshold) : null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 391a8797f0..4510e3e9fa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -209,27 +209,39 @@ public class InstancePlanMakerImplV2 implements PlanMaker { // Set group-by query options if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() != null) { - // Set maxInitialResultHolderCapacity - queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity); - + Integer initResultCap = QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions); + if (initResultCap != null) { + queryContext.setMaxInitialResultHolderCapacity(initResultCap); + } else { + queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity); + } // Set numGroupsLimit - queryContext.setNumGroupsLimit(_numGroupsLimit); - + Integer numGroupsLimit = QueryOptionsUtils.getNumGroupsLimit(queryOptions); + if (numGroupsLimit != null) { + queryContext.setNumGroupsLimit(numGroupsLimit); + } else { + queryContext.setNumGroupsLimit(_numGroupsLimit); + } // Set minSegmentGroupTrimSize Integer minSegmentGroupTrimSizeFromQuery = QueryOptionsUtils.getMinSegmentGroupTrimSize(queryOptions); - int minSegmentGroupTrimSize = - minSegmentGroupTrimSizeFromQuery != null ? minSegmentGroupTrimSizeFromQuery : _minSegmentGroupTrimSize; - queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSize); - + if (minSegmentGroupTrimSizeFromQuery != null) { + queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSizeFromQuery); + } else { + queryContext.setMinSegmentGroupTrimSize(_minSegmentGroupTrimSize); + } // Set minServerGroupTrimSize Integer minServerGroupTrimSizeFromQuery = QueryOptionsUtils.getMinServerGroupTrimSize(queryOptions); int minServerGroupTrimSize = minServerGroupTrimSizeFromQuery != null ? minServerGroupTrimSizeFromQuery : _minServerGroupTrimSize; queryContext.setMinServerGroupTrimSize(minServerGroupTrimSize); - // Set groupTrimThreshold - queryContext.setGroupTrimThreshold(_groupByTrimThreshold); + Integer groupTrimThreshold = QueryOptionsUtils.getGroupTrimThreshold(queryOptions); + if (groupTrimThreshold != null) { + queryContext.setGroupTrimThreshold(groupTrimThreshold); + } else { + queryContext.setGroupTrimThreshold(_groupByTrimThreshold); + } } } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 3a3e867348..67a1ea78f1 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -560,11 +560,28 @@ public class ClusterIntegrationTestUtils { testQuery(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, headers, null); } + /** + * Compare # of rows in pinot and H2 only. Succeed if # of rows matches. Note this only applies to non-aggregation + * query. + */ + static void testQueryWithMatchingRowCount(String pinotQuery, String brokerUrl, + org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection, + @Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties) + throws Exception { + try { + testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, headers, extraJsonProperties, + true); + } catch (Exception e) { + failure(pinotQuery, h2Query, "Caught exception while testing query!", e); + } + } + static void testQuery(String pinotQuery, String brokerUrl, org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection, @Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties) { try { - testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, headers, extraJsonProperties); + testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, headers, extraJsonProperties, + false); } catch (Exception e) { failure(pinotQuery, h2Query, "Caught exception while testing query!", e); } @@ -572,7 +589,8 @@ public class ClusterIntegrationTestUtils { private static void testQueryInternal(String pinotQuery, String brokerUrl, org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection, - @Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties) + @Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties, + boolean matchingRowCount) throws Exception { // broker response JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl, headers, extraJsonProperties); @@ -609,7 +627,13 @@ public class ClusterIntegrationTestUtils { List<String> expectedOrderByValues = new ArrayList<>(); int h2NumRows = getH2ExpectedValues(expectedValues, expectedOrderByValues, h2ResultSet, h2ResultSet.getMetaData(), orderByColumns); - + if (matchingRowCount) { + if (numRows != h2NumRows) { + throw new RuntimeException("Pinot # of rows " + numRows + " doesn't match h2 # of rows " + h2NumRows); + } else { + return; + } + } comparePinotResultsWithExpectedValues(expectedValues, expectedOrderByValues, resultTableResultSet, orderByColumns, pinotQuery, h2Query, h2NumRows, pinotNumRecordsSelected); } else { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 79a1c9c786..651a8da1bd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -99,6 +99,15 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestS super.testGeneratedQueries(false, true); } + @Test + public void testQueryOptions() + throws Exception { + String pinotQuery = "SET multistageLeafLimit = 1; SELECT * FROM mytable;"; + String h2Query = "SELECT * FROM mytable limit 1"; + ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, _brokerBaseApiUrl, getPinotConnection(), + h2Query, getH2Connection(), null, ImmutableMap.of("queryOptions", "useMultistageEngine=true")); + } + @Override protected Connection getPinotConnection() { Properties properties = new Properties(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java index d7ef580824..071a9325bd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java @@ -31,6 +31,7 @@ import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.TimeBoundaryInfo; @@ -60,6 +61,8 @@ import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQuer import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter; import org.apache.pinot.sql.parsers.rewriter.QueryRewriter; import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -73,13 +76,12 @@ import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; */ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPlanRequestContext> { private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000; + private static final Logger LOGGER = LoggerFactory.getLogger(ServerRequestPlanVisitor.class); private static final List<String> QUERY_REWRITERS_CLASS_NAMES = - ImmutableList.of( - PredicateComparisonRewriter.class.getName(), - NonAggregationGroupByToDistinctQueryRewriter.class.getName() - ); - private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>( - QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); + ImmutableList.of(PredicateComparisonRewriter.class.getName(), + NonAggregationGroupByToDistinctQueryRewriter.class.getName()); + private static final List<QueryRewriter> QUERY_REWRITERS = + new ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer(); private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor(); @@ -92,11 +94,18 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)); PinotQuery pinotQuery = new PinotQuery(); - pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); + Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap); + if (leafNodeLimit != null) { + pinotQuery.setLimit(leafNodeLimit); + } else { + pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); + } + LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit); pinotQuery.setExplain(false); - ServerPlanRequestContext context = new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), - timeoutMs, stagePlan.getServerInstance().getHostname(), stagePlan.getServerInstance().getPort(), - stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo); + ServerPlanRequestContext context = + new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, + stagePlan.getServerInstance().getHostname(), stagePlan.getServerInstance().getPort(), + stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo); // visit the plan and create query physical plan. ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context); @@ -112,8 +121,8 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema); // 2. set pinot query options according to requestMetadataMap - pinotQuery.setQueryOptions(ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, - String.valueOf(timeoutMs))); + pinotQuery.setQueryOptions( + ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs))); // 3. wrapped around in broker request BrokerRequest brokerRequest = new BrokerRequest(); @@ -145,19 +154,20 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) { visitChildren(node, context); // set group-by list - context.getPinotQuery().setGroupByList(CalciteRexExpressionParser.convertGroupByList( - node.getGroupSet(), context.getPinotQuery())); + context.getPinotQuery() + .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery())); // set agg list - context.getPinotQuery().setSelectList(CalciteRexExpressionParser.addSelectList( - context.getPinotQuery().getGroupByList(), node.getAggCalls(), context.getPinotQuery())); + context.getPinotQuery().setSelectList( + CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(), + context.getPinotQuery())); return _aVoid; } @Override public Void visitFilter(FilterNode node, ServerPlanRequestContext context) { visitChildren(node, context); - context.getPinotQuery().setFilterExpression(CalciteRexExpressionParser.toExpression( - node.getCondition(), context.getPinotQuery())); + context.getPinotQuery() + .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery())); return _aVoid; } @@ -182,8 +192,8 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl @Override public Void visitProject(ProjectNode node, ServerPlanRequestContext context) { visitChildren(node, context); - context.getPinotQuery().setSelectList(CalciteRexExpressionParser.overwriteSelectList( - node.getProjects(), context.getPinotQuery())); + context.getPinotQuery() + .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery())); return _aVoid; } @@ -191,8 +201,9 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl public Void visitSort(SortNode node, ServerPlanRequestContext context) { visitChildren(node, context); if (node.getCollationKeys().size() > 0) { - context.getPinotQuery().setOrderByList(CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(), - node.getCollationDirections(), context.getPinotQuery())); + context.getPinotQuery().setOrderByList( + CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(), node.getCollationDirections(), + context.getPinotQuery())); } if (node.getFetch() > 0) { context.getPinotQuery().setLimit(node.getFetch()); @@ -210,8 +221,8 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName())); dataSource.setTableName(tableNameWithType); context.getPinotQuery().setDataSource(dataSource); - context.getPinotQuery().setSelectList(node.getTableScanColumns().stream() - .map(RequestUtils::getIdentifierExpression).collect(Collectors.toList())); + context.getPinotQuery().setSelectList( + node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList())); return _aVoid; } @@ -226,6 +237,7 @@ public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPl child.visit(this, context); } } + /** * Helper method to attach the time boundary to the given PinotQuery. */ diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index 4cbd4aa624..f0eb564327 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -62,10 +62,10 @@ public class QueryDispatcher { } public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan, - MailboxService<TransferableBlock> mailboxService, long timeoutMs) + MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions) throws Exception { // submit all the distributed stages. - int reduceStageId = submit(requestId, queryPlan, timeoutMs); + int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions); // run reduce stage and return result. MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService, @@ -83,7 +83,7 @@ public class QueryDispatcher { return resultTable; } - public int submit(long requestId, QueryPlan queryPlan, long timeoutMs) + public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, Map<String, String> queryOptions) throws Exception { int reduceStageId = -1; for (Map.Entry<Integer, StageMetadata> stage : queryPlan.getStageMetadataMap().entrySet()) { @@ -100,7 +100,9 @@ public class QueryDispatcher { Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder().setStagePlan( QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)).build()); + .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) + .putAllMetadata(queryOptions).build()); + if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) { throw new RuntimeException( String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId, diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java index cff9a26d37..b0f2dc0a5e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java @@ -78,7 +78,7 @@ public class QueryDispatcherTest extends QueryTestSet { throws Exception { QueryPlan queryPlan = _queryEnvironment.planQuery(sql); QueryDispatcher dispatcher = new QueryDispatcher(); - int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan, 10_000L); + int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan, 10_000L, new HashMap<>()); Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId)); dispatcher.shutdown(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3e499b30ce..0abc692fbb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -295,6 +295,11 @@ public class CommonConstants { public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm"; + public static final String MULTI_STAGE_LEAF_LIMIT = "multiStageLeafLimit"; + public static final String NUM_GROUPS_LIMIT = "numGroupsLimit"; + public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity"; + public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold"; + // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0 @Deprecated public static final String PRESERVE_TYPE = "preserveType"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org