yashmayya commented on code in PR #15445: URL: https://github.com/apache/pinot/pull/15445#discussion_r2044463716
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -329,13 +364,12 @@ private <W> void submitStage(
    * applying the submitFunction to each worker and the consumer to the list of results.

Review Comment:
   The Javadoc needs to be updated here - it currently says `Submits each stage in the request to the workers and waits for all workers to complete` which is now incorrect.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -898,10 +898,33 @@ public static class Server {
     public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
     // TODO: Merge this with "mse"
+    /**
+     * The ExecutorServiceProvider to use for execution threads, which are the ones that execute
+     * MultiStageOperators (and SSE operators in the leaf stages).
+     *
+     * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or
+     * parent operators may consume all threads.
+     * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet,
+     * it is trivial to implement that plugin.
+     *
+     * See QueryRunner
+     */
     public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
     public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
     public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+    /**
+     * The ExecutorServiceProvider to be used for submission threads, which are the ones
+     * that receive requests in protobuf and transform them into MultiStageOperators.
+     *
+     * It is recommended to use a fixed thread pool here, although defaults to cached for historical
+     * reasons.

Review Comment:
   Why aren't we changing this to `fixed` with your changes in this patch? Is it in order to be sure that we don't introduce a regression?
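For context on the fixed-versus-cached trade-off that the Javadoc above describes for execution threads ("parent operators may consume all threads"), here is a minimal sketch using plain `java.util.concurrent`. It does not use Pinot's `ExecutorServiceProvider`; the class name `ExecutorStarvationSketch`, the method `parentCompletes` and the 500 ms wait are illustrative assumptions only. A parent task submits a child to the same pool and blocks on it, which is roughly the shape of a parent operator waiting on its children.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; none of these names exist in Pinot.
final class ExecutorStarvationSketch {

  /**
   * Submits a parent task that submits a child task to the same pool and blocks on it.
   * Returns true if the parent finishes within 500 ms, false if it is still waiting.
   */
  static boolean parentCompletes(ExecutorService pool) throws Exception {
    try {
      Future<String> parent = pool.submit(() -> {
        // The child needs a second thread while the parent still occupies the first one.
        Future<String> child = pool.submit(() -> "child-done");
        return child.get();
      });
      try {
        parent.get(500, TimeUnit.MILLISECONDS);
        return true;
      } catch (TimeoutException e) {
        // The parent holds the only thread, so the child never leaves the queue.
        return false;
      }
    } finally {
      // Interrupts the blocked parent (if any) so the pool threads can exit.
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("fixed(1): " + parentCompletes(Executors.newFixedThreadPool(1)));
    System.out.println("cached:   " + parentCompletes(Executors.newCachedThreadPool()));
  }
}
```

With a single-thread fixed pool the parent starves its own child, while the cached pool simply adds a thread. Submission tasks, as described in the second Javadoc block, do not wait on each other in this way, which is presumably why a bounded pool is the recommended choice there.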

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
       return;
     }
-    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
-        QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
+    try (QueryThreadContext.CloseableContext qClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+        QueryThreadContext.CloseableContext mseCloseable = MseWorkerThreadContext.open()) {
+
       long requestId = QueryThreadContext.getRequestId();
       QueryThreadContext.setQueryEngine("mse");
-      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
-      ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> {
+      // Submit the stage for each worker
+      List<CompletableFuture<List<Object>>> futures = forEachStageAndWorker(request,
+          (stagePlan, workerMetadata) -> {
+            Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
+            ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+
+            try {
               _queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, parentContext);
-              return null;
-            },
-            (ignored) -> {
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
-        LOGGER.error("Caught exception while submitting request: {}", requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + e.getMessage();
-        responseObserver.onNext(Worker.QueryResponse.newBuilder()
-            .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
-            .build());
-        responseObserver.onCompleted();
-        return;
-      } finally {
-        Tracing.ThreadAccountantOps.clear();
-      }
-      responseObserver.onNext(
-          Worker.QueryResponse.newBuilder()
-              .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+            } finally {
+              Tracing.ThreadAccountantOps.clear();
+            }
+            return null;
+          });
+
+      // A completable future that will finish when all submit task finish or on timoeut

Review Comment:
   ```suggestion
         // A completable future that will finish when all submit task finish or on timeout
   ```
   nit

##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -898,10 +898,33 @@ public static class Server {
     public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
     // TODO: Merge this with "mse"
+    /**
+     * The ExecutorServiceProvider to use for execution threads, which are the ones that execute
+     * MultiStageOperators (and SSE operators in the leaf stages).
+     *
+     * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or
+     * parent operators may consume all threads.
+     * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet,
+     * it is trivial to implement that plugin.
+     *
+     * See QueryRunner
+     */
     public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
     public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
     public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+    /**
+     * The ExecutorServiceProvider to be used for submission threads, which are the ones
+     * that receive requests in protobuf and transform them into MultiStageOperators.
+     *
+     * It is recommended to use a fixed thread pool here, although defaults to cached for historical
+     * reasons.
+     *
+     * See QueryServer
+     */
+    public static final String MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX =
+        QUERY_EXECUTOR_CONFIG_PREFIX + "multistage.submission";

Review Comment:
   This needs a `.` separator after `QUERY_EXECUTOR_CONFIG_PREFIX`

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
       return;
     }
-    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
-        QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
+    try (QueryThreadContext.CloseableContext qClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+        QueryThreadContext.CloseableContext mseCloseable = MseWorkerThreadContext.open()) {
+
       long requestId = QueryThreadContext.getRequestId();
       QueryThreadContext.setQueryEngine("mse");
-      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
-      ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> {
+      // Submit the stage for each worker
+      List<CompletableFuture<List<Object>>> futures = forEachStageAndWorker(request,
+          (stagePlan, workerMetadata) -> {
+            Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
+            ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+
+            try {
               _queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, parentContext);
-              return null;
-            },
-            (ignored) -> {
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
-        LOGGER.error("Caught exception while submitting request: {}", requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + e.getMessage();
-        responseObserver.onNext(Worker.QueryResponse.newBuilder()
-            .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
-            .build());
-        responseObserver.onCompleted();
-        return;
-      } finally {
-        Tracing.ThreadAccountantOps.clear();
-      }
-      responseObserver.onNext(
-          Worker.QueryResponse.newBuilder()
-              .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+            } finally {
+              Tracing.ThreadAccountantOps.clear();
+            }
+            return null;
+          });
+
+      // A completable future that will finish when all submit task finish or on timoeut
+      CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+          .orTimeout(QueryThreadContext.getDeadlineMs(), TimeUnit.MILLISECONDS);
+      // When this future completes, notify the broker.
+      allCompleted.handle((result, error) -> {

Review Comment:
   Why not `whenComplete` instead?

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -208,43 +225,61 @@ public void explain(Worker.QueryRequest request, StreamObserver<Worker.ExplainRe
       return;
     }
-    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+    try (QueryThreadContext.CloseableContext qTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
         QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata),
-            (plans) -> {
-              Worker.ExplainResponse.Builder builder = Worker.ExplainResponse.newBuilder();
-              for (StagePlan plan : plans) {
-                ByteString rootAsBytes = PlanNodeSerializer.process(plan.getRootNode()).toByteString();
+      // Explain the stage for each worker
+      List<CompletableFuture<List<StagePlan>>> futures = forEachStageAndWorker(request,
+          (stagePlan, workerMetadata) -> _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata));
+      CompletableFuture<?>[] responseFutures = futures.stream()
+          .map(plansFuture ->
+              plansFuture.thenApply(plans -> {
+                Worker.ExplainResponse.Builder builder = Worker.ExplainResponse.newBuilder();
+                for (StagePlan plan : plans) {
+                  ByteString rootAsBytes = PlanNodeSerializer.process(plan.getRootNode()).toByteString();
+
+                  StageMetadata metadata = plan.getStageMetadata();
+                  List<Worker.WorkerMetadata> protoWorkerMetadataList =
+                      QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
-                StageMetadata metadata = plan.getStageMetadata();
-                List<Worker.WorkerMetadata> protoWorkerMetadataList =
-                    QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
+                  builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
+                      Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
+                          .addAllWorkerMetadata(protoWorkerMetadataList)
+                          .setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
+                }
+                builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, "");
+                synchronized (responseObserver) {
+                  responseObserver.onNext(builder.build());
+                }
+                return null;
+              })
+          ).toArray(CompletableFuture[]::new);
-                builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
-                    Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
-                        .addAllWorkerMetadata(protoWorkerMetadataList)
-                        .setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
-              }
-              builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, "");
-              responseObserver.onNext(builder.build());
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
-        long requestId = QueryThreadContext.getRequestId();
-        LOGGER.error("Caught exception while submitting request: {}", requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + e.getMessage();
-        responseObserver.onNext(Worker.ExplainResponse.newBuilder()
-            .putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
+      // A completable future that will finish when all submit task finish or on timoeut
+      CompletableFuture<Void> allCompleted = CompletableFuture.allOf(responseFutures)
+          .orTimeout(QueryThreadContext.getDeadlineMs(), TimeUnit.MILLISECONDS);
+      // When this future completes, notify the broker.
+      allCompleted.handle((result, error) -> {
+        if (error != null) {
+          long requestId = QueryThreadContext.getRequestId();
+          LOGGER.error("Caught exception while submitting request: {}", requestId, error);
+          String errorMsg = "Caught exception while submitting request: " + error.getMessage();
+          synchronized (responseObserver) {

Review Comment:
   Nice catch, I wasn't aware that `StreamObserver` doesn't guarantee thread safety.

##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -130,6 +124,15 @@ public void start() {
     }
   }
+  private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T> builder) {
+    return builder
+      // By using directExecutor, GRPC doesn't need to manage its own thread pool

Review Comment:
   IIUC, this means that the request processing callbacks will happen directly on the transport thread - i.e., the Netty thread? And this is only recommended when the request processing is non-blocking - however, we still do proto deser on the request processing thread. I assume that that's okay because it should have minimal overhead and we don't really risk blocking Netty threads for handling other connections and requests?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
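
On the `handle` versus `whenComplete` question raised above: `handle` maps the outcome to a new value (and so replaces an exceptional outcome with whatever the function returns), whereas `whenComplete` only runs a side effect and passes the original result or exception through. A minimal sketch of the notify-on-completion pattern with `whenComplete` could look as follows. `ResponseSink` is a hypothetical stand-in for gRPC's `StreamObserver` so that the example runs without gRPC on the classpath; `notifyWhenDone`, `demo` and the status strings are likewise illustrative, and `timeoutMs` is assumed to be a relative duration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names exist in Pinot or gRPC.
final class WhenCompleteSketch {

  /** Stand-in for a response observer that is not safe for concurrent calls. */
  interface ResponseSink {
    void onNext(String status);
    void onCompleted();
  }

  /** Notifies the sink exactly once, when every future finishes or the timeout fires. */
  static CompletableFuture<Void> notifyWhenDone(
      List<CompletableFuture<?>> futures, long timeoutMs, ResponseSink sink) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
        // Side effect only: the returned future keeps the original result or exception.
        .whenComplete((ignored, error) -> {
          // Other callbacks may write to the same sink, so serialize access to it.
          synchronized (sink) {
            sink.onNext(error == null ? "OK" : "ERROR");
            sink.onCompleted();
          }
        });
  }

  /** Runs one task to success or failure and returns what the sink recorded. */
  static List<String> demo(boolean fail) {
    List<String> events = new ArrayList<>();
    ResponseSink sink = new ResponseSink() {
      @Override public void onNext(String status) { events.add(status); }
      @Override public void onCompleted() { events.add("completed"); }
    };
    CompletableFuture<String> task = new CompletableFuture<>();
    List<CompletableFuture<?>> futures = new ArrayList<>();
    futures.add(task);
    CompletableFuture<Void> done = notifyWhenDone(futures, 1_000, sink);
    if (fail) {
      task.completeExceptionally(new IllegalStateException("boom"));
    } else {
      task.complete("x");
    }
    // whenComplete propagates the failure, so absorb it before joining.
    done.exceptionally(t -> null).join();
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo(false));
    System.out.println(demo(true));
  }
}
```

Since the callback here only notifies the observer and returns nothing meaningful, `whenComplete` states that intent more directly; `handle` remains the better fit when the callback is expected to recover from the error and produce a value.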