Jackie-Jiang commented on code in PR #15445: URL: https://github.com/apache/pinot/pull/15445#discussion_r2027511719
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java: ########## @@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo return; } - try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata); + try (QueryThreadContext.CloseableContext qTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata); Review Comment: (minor) The naming is quite confusing. What does `tl` stand for? ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -898,10 +898,33 @@ public static class Server { public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000; // TODO: Merge this with "mse" + /** + * The ExecutorServiceProvider to use for execution threads, which are the ones that execute + * MultiStageOperators (and SSE operators in the leaf stages). + * + * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or + * parent operators may consume all threads. + * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet, + * it is trivial to implement that plugin. + * + * See QueryRunner + */ public static final String MULTISTAGE_EXECUTOR = "multistage.executor"; public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX = QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR; public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached"; + /** + * The ExecutorServiceProvider to be used for submission threads, which are the ones + * that receive requests in protobuf and transform them into MultiStageOperators. + * + * It is recommended to use a fixed thread pool here, although defaults to cached for historical + * reasons. + * + * See QueryServer + */ + public static final String MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX = + QUERY_EXECUTOR_CONFIG_PREFIX + "."
+ MULTISTAGE_EXECUTOR; Review Comment: This is the same as `MULTISTAGE_EXECUTOR_CONFIG_PREFIX` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java: ########## @@ -257,54 +258,57 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata); // run pre-stage execution for all pipeline breakers - PipelineBreakerResult pipelineBreakerResult = + CompletableFuture<PipelineBreakerResult> pipelineBreakerResultFuture = CompletableFuture.supplyAsync(() -> PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan, - opChainMetadata, requestId, deadlineMs, parentContext, _sendStats.getAsBoolean()); - - // Send error block to all the receivers if pipeline breaker fails - if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { - TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); - int stageId = stageMetadata.getStageId(); - LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, - stageId, errorBlock.getExceptions()); - MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode(); - List<RoutingInfo> routingInfos = new ArrayList<>(); - for (Integer receiverStageId : rootNode.getReceiverStageIds()) { - List<MailboxInfo> receiverMailboxInfos = - workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); - List<RoutingInfo> stageRoutingInfos = - MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, - receiverMailboxInfos); - routingInfos.addAll(stageRoutingInfos); - } - for (RoutingInfo routingInfo : routingInfos) { - try { - StatMap<MailboxSendOperator.StatKey> statMap = new StatMap<>(MailboxSendOperator.StatKey.class); - _mailboxService.getSendingMailbox(routingInfo.getHostname(), 
routingInfo.getPort(), - routingInfo.getMailboxId(), deadlineMs, statMap).send(errorBlock); - } catch (TimeoutException e) { - LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", - routingInfo.getMailboxId(), requestId, stageId, e); - } catch (Exception e) { - LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", - routingInfo.getMailboxId(), requestId, stageId, e); + opChainMetadata, requestId, deadlineMs, parentContext, _sendStats.getAsBoolean()), + _executorService); + + pipelineBreakerResultFuture.thenAcceptAsync(pipelineBreakerResult -> { Review Comment: Why do we need to split it into 2 steps? Does it involve extra overhead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org