walterddr commented on code in PR #10401:
URL: https://github.com/apache/pinot/pull/10401#discussion_r1132678193


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -81,21 +84,38 @@ public ResultTable submitAndReduce(long requestId, 
QueryPlan queryPlan,
       MailboxService<TransferableBlock> mailboxService, long timeoutMs, 
Map<String, String> queryOptions,
       Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
       throws Exception {
-    // submit all the distributed stages.
-    int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
-    // run reduce stage and return result.
-    MailboxReceiveNode reduceNode = (MailboxReceiveNode) 
queryPlan.getQueryStageMap().get(reduceStageId);
-    MailboxReceiveOperator mailboxReceiveOperator = 
createReduceStageOperator(mailboxService,
-        
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
 requestId,
-        reduceNode.getSenderStageId(), reduceStageId, 
reduceNode.getDataSchema(),
-        new VirtualServerAddress(mailboxService.getHostname(), 
mailboxService.getMailboxPort(), 0), timeoutMs);
-    List<DataBlock> resultDataBlocks =
-        reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, 
executionStatsAggregator, queryPlan);
-    return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
-        queryPlan.getQueryStageMap().get(0).getDataSchema());
+    try {
+      // submit all the distributed stages.
+      int reduceStageId = submit(requestId, queryPlan, timeoutMs, 
queryOptions);
+      // run reduce stage and return result.
+      return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, 
mailboxService, executionStatsAggregator);
+    } catch (Exception e) {
+      cancel(requestId, queryPlan);
+      throw new RuntimeException("Error executing query: " + 
ExplainPlanStageVisitor.explain(queryPlan), e);
+    }
   }
 
-  public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, 
Map<String, String> queryOptions)
+  private void cancel(long requestId, QueryPlan queryPlan) {
+    Set<DispatchClient> dispatchClientSet = new HashSet<>();
+    for (Map.Entry<Integer, StageMetadata> stage : 
queryPlan.getStageMetadataMap().entrySet()) {
+      int stageId = stage.getKey();
+      // stage rooting at a mailbox receive node means reduce stage.
+      if (!(queryPlan.getQueryStageMap().get(stageId) instanceof 
MailboxReceiveNode)) {
+        List<VirtualServer> serverInstances = 
stage.getValue().getServerInstances();
+        for (VirtualServer serverInstance : serverInstances) {
+          String host = serverInstance.getHostname();
+          int servicePort = serverInstance.getQueryServicePort();
+          dispatchClientSet.add(getOrCreateDispatchClient(host, servicePort));

Review Comment:
   because dispatchClient can be the same for 2 virtual address. we only want 
to issue 1 request for each server



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to