walterddr commented on code in PR #10401: URL: https://github.com/apache/pinot/pull/10401#discussion_r1132678193
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java: ########## @@ -81,21 +84,38 @@ public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan, MailboxService<TransferableBlock> mailboxService, long timeoutMs, Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator) throws Exception { - // submit all the distributed stages. - int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions); - // run reduce stage and return result. - MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); - MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService, - queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId, - reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), - new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs); - List<DataBlock> resultDataBlocks = - reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, executionStatsAggregator, queryPlan); - return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), - queryPlan.getQueryStageMap().get(0).getDataSchema()); + try { + // submit all the distributed stages. + int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions); + // run reduce stage and return result. 
+ return runReducer(requestId, queryPlan, reduceStageId, timeoutMs, mailboxService, executionStatsAggregator); + } catch (Exception e) { + cancel(requestId, queryPlan); + throw new RuntimeException("Error executing query: " + ExplainPlanStageVisitor.explain(queryPlan), e); + } } - public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, Map<String, String> queryOptions) + private void cancel(long requestId, QueryPlan queryPlan) { + Set<DispatchClient> dispatchClientSet = new HashSet<>(); + for (Map.Entry<Integer, StageMetadata> stage : queryPlan.getStageMetadataMap().entrySet()) { + int stageId = stage.getKey(); + // stage rooting at a mailbox receive node means reduce stage. + if (!(queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode)) { + List<VirtualServer> serverInstances = stage.getValue().getServerInstances(); + for (VirtualServer serverInstance : serverInstances) { + String host = serverInstance.getHostname(); + int servicePort = serverInstance.getQueryServicePort(); + dispatchClientSet.add(getOrCreateDispatchClient(host, servicePort)); Review Comment: Because the same dispatchClient can serve two virtual addresses, we only want to issue one request per server. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org