gortiz commented on code in PR #15445: URL: https://github.com/apache/pinot/pull/15445#discussion_r2028246011
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -257,54 +258,57 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata); // run pre-stage execution for all pipeline breakers - PipelineBreakerResult pipelineBreakerResult = + CompletableFuture<PipelineBreakerResult> pipelineBreakerResultFuture = CompletableFuture.supplyAsync(() -> PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan, - opChainMetadata, requestId, deadlineMs, parentContext, _sendStats.getAsBoolean()); - - // Send error block to all the receivers if pipeline breaker fails - if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { - TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); - int stageId = stageMetadata.getStageId(); - LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, - stageId, errorBlock.getExceptions()); - MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode(); - List<RoutingInfo> routingInfos = new ArrayList<>(); - for (Integer receiverStageId : rootNode.getReceiverStageIds()) { - List<MailboxInfo> receiverMailboxInfos = - workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); - List<RoutingInfo> stageRoutingInfos = - MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, - receiverMailboxInfos); - routingInfos.addAll(stageRoutingInfos); - } - for (RoutingInfo routingInfo : routingInfos) { - try { - StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class); - _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(), - routingInfo.getMailboxId(), deadlineMs, statMap).send(errorBlock); - } 
catch (TimeoutException e) { - LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", - routingInfo.getMailboxId(), requestId, stageId, e); - } catch (Exception e) { - LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", - routingInfo.getMailboxId(), requestId, stageId, e); + opChainMetadata, requestId, deadlineMs, parentContext, _sendStats.getAsBoolean()), + _executorService); + + pipelineBreakerResultFuture.thenAcceptAsync(pipelineBreakerResult -> { Review Comment: No, we don't need that. I don't know why I decided to refactor it that way. The performance impact shouldn't be important, but I've changed the code to resemble the previous version by creating a blocking internal method with the same code we had before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org