This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd673482b [multistage] [feature] Add a query option to pass some v1 limit (#9957)
fbd673482b is described below

commit fbd673482b909e656c924274927ab3f58d6dd6cf
Author: Yao Liu <y...@startree.ai>
AuthorDate: Thu Jan 19 14:54:57 2023 -0800

    [multistage] [feature] Add a query option to pass some v1 limit (#9957)
---
 .../MultiStageBrokerRequestHandler.java            | 10 ++--
 .../common/utils/config/QueryOptionsUtils.java     | 24 +++++++++
 .../core/plan/maker/InstancePlanMakerImplV2.java   | 34 ++++++++----
 .../tests/ClusterIntegrationTestUtils.java         | 30 +++++++++--
 .../tests/MultiStageEngineIntegrationTest.java     |  9 ++++
 .../runtime/plan/ServerRequestPlanVisitor.java     | 60 +++++++++++++---------
 .../pinot/query/service/QueryDispatcher.java       | 10 ++--
 .../pinot/query/service/QueryDispatcherTest.java   |  2 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  5 ++
 9 files changed, 137 insertions(+), 47 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index c9c604d9db..f5295bad52 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -98,7 +98,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
     // it is OK to ignore the onDataAvailable callback because the broker 
top-level operators
     // always run in-line (they don't have any scheduler)
-    _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname, 
_reducerPort, config, ignored -> { });
+    _mailboxService = MultiplexingMailboxService.newInstance(_reducerHostname, 
_reducerPort, config, ignored -> {
+    });
 
     // TODO: move this to a startUp() function.
     _mailboxService.start();
@@ -165,7 +166,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
     ResultTable queryResults;
     try {
-      queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, 
_mailboxService, queryTimeoutMs);
+      queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, 
_mailboxService, queryTimeoutMs,
+          sqlNodeAndOptions.getOptions());
     } catch (Exception e) {
       LOGGER.info("query execution failed", e);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
@@ -175,8 +177,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     long executionEndTimeNs = System.nanoTime();
 
     // Set total query processing time
-    long totalTimeMs = 
TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs()
-        + (executionEndTimeNs - compilationStartTimeNs));
+    long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
+        sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - 
compilationStartTimeNs));
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
     requestContext.setQueryProcessingTime(totalTimeMs);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 0fa3c82b33..420924af0a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -166,4 +166,28 @@ public class QueryOptionsUtils {
   public static String getOrderByAlgorithm(Map<String, String> queryOptions) {
     return queryOptions.get(QueryOptionKey.ORDER_BY_ALGORITHM);
   }
+
+  @Nullable
+  public static Integer getMultiStageLeafLimit(Map<String, String> 
queryOptions) {
+    String maxLeafLimitStr = 
queryOptions.get(QueryOptionKey.MULTI_STAGE_LEAF_LIMIT);
+    return maxLeafLimitStr != null ? Integer.parseInt(maxLeafLimitStr) : null;
+  }
+
+  @Nullable
+  public static Integer getNumGroupsLimit(Map<String, String> queryOptions) {
+    String maxNumGroupLimit = 
queryOptions.get(QueryOptionKey.NUM_GROUPS_LIMIT);
+    return maxNumGroupLimit != null ? Integer.parseInt(maxNumGroupLimit) : 
null;
+  }
+
+  @Nullable
+  public static Integer getMaxInitialResultHolderCapacity(Map<String, String> 
queryOptions) {
+    String maxInitResultCap = 
queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
+    return maxInitResultCap != null ? Integer.parseInt(maxInitResultCap) : 
null;
+  }
+
+  @Nullable
+  public static Integer getGroupTrimThreshold(Map<String, String> 
queryOptions) {
+    String groupByTrimThreshold = 
queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
+    return groupByTrimThreshold != null ? 
Integer.parseInt(groupByTrimThreshold) : null;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 391a8797f0..4510e3e9fa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -209,27 +209,39 @@ public class InstancePlanMakerImplV2 implements PlanMaker 
{
 
     // Set group-by query options
     if (QueryContextUtils.isAggregationQuery(queryContext) && 
queryContext.getGroupByExpressions() != null) {
-
       // Set maxInitialResultHolderCapacity
-      
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
-
+      Integer initResultCap = 
QueryOptionsUtils.getMaxInitialResultHolderCapacity(queryOptions);
+      if (initResultCap != null) {
+        queryContext.setMaxInitialResultHolderCapacity(initResultCap);
+      } else {
+        
queryContext.setMaxInitialResultHolderCapacity(_maxInitialResultHolderCapacity);
+      }
       // Set numGroupsLimit
-      queryContext.setNumGroupsLimit(_numGroupsLimit);
-
+      Integer numGroupsLimit = 
QueryOptionsUtils.getNumGroupsLimit(queryOptions);
+      if (numGroupsLimit != null) {
+        queryContext.setNumGroupsLimit(numGroupsLimit);
+      } else {
+        queryContext.setNumGroupsLimit(_numGroupsLimit);
+      }
       // Set minSegmentGroupTrimSize
       Integer minSegmentGroupTrimSizeFromQuery = 
QueryOptionsUtils.getMinSegmentGroupTrimSize(queryOptions);
-      int minSegmentGroupTrimSize =
-          minSegmentGroupTrimSizeFromQuery != null ? 
minSegmentGroupTrimSizeFromQuery : _minSegmentGroupTrimSize;
-      queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSize);
-
+      if (minSegmentGroupTrimSizeFromQuery != null) {
+        
queryContext.setMinSegmentGroupTrimSize(minSegmentGroupTrimSizeFromQuery);
+      } else {
+        queryContext.setMinSegmentGroupTrimSize(_minSegmentGroupTrimSize);
+      }
       // Set minServerGroupTrimSize
       Integer minServerGroupTrimSizeFromQuery = 
QueryOptionsUtils.getMinServerGroupTrimSize(queryOptions);
       int minServerGroupTrimSize =
           minServerGroupTrimSizeFromQuery != null ? 
minServerGroupTrimSizeFromQuery : _minServerGroupTrimSize;
       queryContext.setMinServerGroupTrimSize(minServerGroupTrimSize);
-
       // Set groupTrimThreshold
-      queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
+      Integer groupTrimThreshold = 
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+      if (groupTrimThreshold != null) {
+        queryContext.setGroupTrimThreshold(groupTrimThreshold);
+      } else {
+        queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
+      }
     }
   }
 
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 3a3e867348..67a1ea78f1 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -560,11 +560,28 @@ public class ClusterIntegrationTestUtils {
     testQuery(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, 
headers, null);
   }
 
+  /**
+   *  Compares only the number of rows returned by Pinot and H2; succeeds if the row counts match.
+   *  Note that this only applies to non-aggregation queries.
+   */
+  static void testQueryWithMatchingRowCount(String pinotQuery, String 
brokerUrl,
+      org.apache.pinot.client.Connection pinotConnection, String h2Query, 
Connection h2Connection,
+      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties)
+      throws Exception {
+    try {
+      testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query, 
h2Connection, headers, extraJsonProperties,
+          true);
+    } catch (Exception e) {
+      failure(pinotQuery, h2Query, "Caught exception while testing query!", e);
+    }
+  }
+
   static void testQuery(String pinotQuery, String brokerUrl, 
org.apache.pinot.client.Connection pinotConnection,
       String h2Query, Connection h2Connection, @Nullable Map<String, String> 
headers,
       @Nullable Map<String, String> extraJsonProperties) {
     try {
-      testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query, 
h2Connection, headers, extraJsonProperties);
+      testQueryInternal(pinotQuery, brokerUrl, pinotConnection, h2Query, 
h2Connection, headers, extraJsonProperties,
+          false);
     } catch (Exception e) {
       failure(pinotQuery, h2Query, "Caught exception while testing query!", e);
     }
@@ -572,7 +589,8 @@ public class ClusterIntegrationTestUtils {
 
   private static void testQueryInternal(String pinotQuery, String brokerUrl,
       org.apache.pinot.client.Connection pinotConnection, String h2Query, 
Connection h2Connection,
-      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties)
+      @Nullable Map<String, String> headers, @Nullable Map<String, String> 
extraJsonProperties,
+      boolean matchingRowCount)
       throws Exception {
     // broker response
     JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl, 
headers, extraJsonProperties);
@@ -609,7 +627,13 @@ public class ClusterIntegrationTestUtils {
       List<String> expectedOrderByValues = new ArrayList<>();
       int h2NumRows = getH2ExpectedValues(expectedValues, 
expectedOrderByValues, h2ResultSet, h2ResultSet.getMetaData(),
           orderByColumns);
-
+      if (matchingRowCount) {
+        if (numRows != h2NumRows) {
+          throw new RuntimeException("Pinot # of rows " + numRows + " doesn't 
match h2 # of rows " + h2NumRows);
+        } else {
+          return;
+        }
+      }
       comparePinotResultsWithExpectedValues(expectedValues, 
expectedOrderByValues, resultTableResultSet, orderByColumns,
           pinotQuery, h2Query, h2NumRows, pinotNumRecordsSelected);
     } else {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 79a1c9c786..651a8da1bd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -99,6 +99,15 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
     super.testGeneratedQueries(false, true);
   }
 
+  @Test
+  public void testQueryOptions()
+      throws Exception {
+    String pinotQuery = "SET multistageLeafLimit = 1; SELECT * FROM mytable;";
+    String h2Query = "SELECT * FROM mytable limit 1";
+    ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, 
_brokerBaseApiUrl, getPinotConnection(),
+        h2Query, getH2Connection(), null, ImmutableMap.of("queryOptions", 
"useMultistageEngine=true"));
+  }
+
   @Override
   protected Connection getPinotConnection() {
     Properties properties = new Properties();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index d7ef580824..071a9325bd 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -60,6 +61,8 @@ import 
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQuer
 import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
 import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -73,13 +76,12 @@ import 
org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
  */
 public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, 
ServerPlanRequestContext> {
   private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerRequestPlanVisitor.class);
   private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
-      ImmutableList.of(
-          PredicateComparisonRewriter.class.getName(),
-          NonAggregationGroupByToDistinctQueryRewriter.class.getName()
-      );
-  private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
-      QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
+      ImmutableList.of(PredicateComparisonRewriter.class.getName(),
+          NonAggregationGroupByToDistinctQueryRewriter.class.getName());
+  private static final List<QueryRewriter> QUERY_REWRITERS =
+      new 
ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
   private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
 
   private static final ServerRequestPlanVisitor INSTANCE = new 
ServerRequestPlanVisitor();
@@ -92,11 +94,18 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
     long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     PinotQuery pinotQuery = new PinotQuery();
-    pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+    Integer leafNodeLimit = 
QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
+    if (leafNodeLimit != null) {
+      pinotQuery.setLimit(leafNodeLimit);
+    } else {
+      pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+    }
+    LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
     pinotQuery.setExplain(false);
-    ServerPlanRequestContext context = new 
ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(),
-        timeoutMs, stagePlan.getServerInstance().getHostname(), 
stagePlan.getServerInstance().getPort(),
-        stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo);
+    ServerPlanRequestContext context =
+        new ServerPlanRequestContext(mailboxService, requestId, 
stagePlan.getStageId(), timeoutMs,
+            stagePlan.getServerInstance().getHostname(), 
stagePlan.getServerInstance().getPort(),
+            stagePlan.getMetadataMap(), pinotQuery, tableType, 
timeBoundaryInfo);
 
     // visit the plan and create query physical plan.
     ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
@@ -112,8 +121,8 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
     QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
 
     // 2. set pinot query options according to requestMetadataMap
-    
pinotQuery.setQueryOptions(ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
-        String.valueOf(timeoutMs)));
+    pinotQuery.setQueryOptions(
+        
ImmutableMap.of(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, 
String.valueOf(timeoutMs)));
 
     // 3. wrapped around in broker request
     BrokerRequest brokerRequest = new BrokerRequest();
@@ -145,19 +154,20 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
   public Void visitAggregate(AggregateNode node, ServerPlanRequestContext 
context) {
     visitChildren(node, context);
     // set group-by list
-    
context.getPinotQuery().setGroupByList(CalciteRexExpressionParser.convertGroupByList(
-        node.getGroupSet(), context.getPinotQuery()));
+    context.getPinotQuery()
+        
.setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(),
 context.getPinotQuery()));
     // set agg list
-    
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.addSelectList(
-        context.getPinotQuery().getGroupByList(), node.getAggCalls(), 
context.getPinotQuery()));
+    context.getPinotQuery().setSelectList(
+        
CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(),
 node.getAggCalls(),
+            context.getPinotQuery()));
     return _aVoid;
   }
 
   @Override
   public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
     visitChildren(node, context);
-    
context.getPinotQuery().setFilterExpression(CalciteRexExpressionParser.toExpression(
-        node.getCondition(), context.getPinotQuery()));
+    context.getPinotQuery()
+        
.setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(),
 context.getPinotQuery()));
     return _aVoid;
   }
 
@@ -182,8 +192,8 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
   @Override
   public Void visitProject(ProjectNode node, ServerPlanRequestContext context) 
{
     visitChildren(node, context);
-    
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.overwriteSelectList(
-        node.getProjects(), context.getPinotQuery()));
+    context.getPinotQuery()
+        
.setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(),
 context.getPinotQuery()));
     return _aVoid;
   }
 
@@ -191,8 +201,9 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
   public Void visitSort(SortNode node, ServerPlanRequestContext context) {
     visitChildren(node, context);
     if (node.getCollationKeys().size() > 0) {
-      
context.getPinotQuery().setOrderByList(CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(),
-          node.getCollationDirections(), context.getPinotQuery()));
+      context.getPinotQuery().setOrderByList(
+          
CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(), 
node.getCollationDirections(),
+              context.getPinotQuery()));
     }
     if (node.getFetch() > 0) {
       context.getPinotQuery().setLimit(node.getFetch());
@@ -210,8 +221,8 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
         
.tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
     dataSource.setTableName(tableNameWithType);
     context.getPinotQuery().setDataSource(dataSource);
-    context.getPinotQuery().setSelectList(node.getTableScanColumns().stream()
-        
.map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+    context.getPinotQuery().setSelectList(
+        
node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
     return _aVoid;
   }
 
@@ -226,6 +237,7 @@ public class ServerRequestPlanVisitor implements 
StageNodeVisitor<Void, ServerPl
       child.visit(this, context);
     }
   }
+
   /**
    * Helper method to attach the time boundary to the given PinotQuery.
    */
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 4cbd4aa624..f0eb564327 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -62,10 +62,10 @@ public class QueryDispatcher {
   }
 
   public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
-      MailboxService<TransferableBlock> mailboxService, long timeoutMs)
+      MailboxService<TransferableBlock> mailboxService, long timeoutMs, 
Map<String, String> queryOptions)
       throws Exception {
     // submit all the distributed stages.
-    int reduceStageId = submit(requestId, queryPlan, timeoutMs);
+    int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
     // run reduce stage and return result.
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(reduceStageId);
     MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(mailboxService,
@@ -83,7 +83,7 @@ public class QueryDispatcher {
     return resultTable;
   }
 
-  public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
+  public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, 
Map<String, String> queryOptions)
       throws Exception {
     int reduceStageId = -1;
     for (Map.Entry<Integer, StageMetadata> stage : 
queryPlan.getStageMetadataMap().entrySet()) {
@@ -100,7 +100,9 @@ public class QueryDispatcher {
           Worker.QueryResponse response = 
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
                   
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))
               .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(requestId))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs)).build());
+              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs))
+              .putAllMetadata(queryOptions).build());
+
           if 
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
             throw new RuntimeException(
                 String.format("Unable to execute query plan at stage %s on 
server %s: ERROR: %s", stageId,
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index cff9a26d37..b0f2dc0a5e 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -78,7 +78,7 @@ public class QueryDispatcherTest extends QueryTestSet {
       throws Exception {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     QueryDispatcher dispatcher = new QueryDispatcher();
-    int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), 
queryPlan, 10_000L);
+    int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), 
queryPlan, 10_000L, new HashMap<>());
     Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId));
     dispatcher.shutdown();
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3e499b30ce..0abc692fbb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -295,6 +295,11 @@ public class CommonConstants {
 
         public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
 
+        public static final String MULTI_STAGE_LEAF_LIMIT = 
"multiStageLeafLimit";
+        public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
+        public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = 
"maxInitialResultHolderCapacity";
+        public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
+
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated
         public static final String PRESERVE_TYPE = "preserveType";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to