yashmayya commented on code in PR #15445:
URL: https://github.com/apache/pinot/pull/15445#discussion_r2044463716


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -329,13 +364,12 @@ private <W> void submitStage(
    * applying the submitFunction to each worker and the consumer to the list of results.

Review Comment:
   The Javadoc needs to be updated here - it currently says `Submits each stage in the request to the workers and waits for all workers to complete`, which is now incorrect.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -898,10 +898,33 @@ public static class Server {
     public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
 
     // TODO: Merge this with "mse"
+    /**
+     * The ExecutorServiceProvider to use for execution threads, which are the ones that execute
+     * MultiStageOperators (and SSE operators in the leaf stages).
+     *
+     * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or
+     * parent operators may consume all threads.
+     * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet,
+     * it is trivial to implement that plugin.
+     *
+     * See QueryRunner
+     */
     public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
     public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
     public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+    /**
+     * The ExecutorServiceProvider to be used for submission threads, which are the ones
+     * that receive requests in protobuf and transform them into MultiStageOperators.
+     *
+     * It is recommended to use a fixed thread pool here, although defaults to cached for historical
+     * reasons.

Review Comment:
   Why aren't we changing this to `fixed` with your changes in this patch? Is it in order to be sure that we don't introduce a regression?
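
   For readers less familiar with the tradeoff the Javadoc describes, the cached/fixed/virtual-thread choice boils down to the standard JDK factories below. This is a minimal standalone sketch for illustration only, not Pinot's `ExecutorServiceProvider` SPI:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class ExecutorFlavors {
     // Cached: threads are created on demand, so a parent operator that blocks
     // waiting on its children cannot starve them of threads.
     static ExecutorService cached() {
       return Executors.newCachedThreadPool();
     }

     // Fixed: bounded, but if parent operators block while children still need
     // threads from the same pool, the query can stall unless the pool is sized
     // generously.
     static ExecutorService fixed(int numThreads) {
       return Executors.newFixedThreadPool(numThreads);
     }

     // Virtual threads (Java 21+): one cheap thread per task, which sidesteps
     // the sizing problem entirely.
     static ExecutorService virtualThreads() {
       return Executors.newVirtualThreadPerTaskExecutor();
     }
   }
   ```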



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
       return;
     }
 
-    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
-        QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
+    try (QueryThreadContext.CloseableContext qClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+        QueryThreadContext.CloseableContext mseCloseable = MseWorkerThreadContext.open()) {
+
       long requestId = QueryThreadContext.getRequestId();
       QueryThreadContext.setQueryEngine("mse");
 
-      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
-      ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> {
+      // Submit the stage for each worker
+      List<CompletableFuture<List<Object>>> futures = forEachStageAndWorker(request,
+          (stagePlan, workerMetadata) -> {
+            Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
+            ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+
+            try {
               _queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, parentContext);
-              return null;
-            },
-            (ignored) -> {
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
-        LOGGER.error("Caught exception while submitting request: {}", requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + e.getMessage();
-        responseObserver.onNext(Worker.QueryResponse.newBuilder()
-            .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
-                .build());
-        responseObserver.onCompleted();
-        return;
-      } finally {
-        Tracing.ThreadAccountantOps.clear();
-      }
-      responseObserver.onNext(
-          Worker.QueryResponse.newBuilder()
-              .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+            } finally {
+              Tracing.ThreadAccountantOps.clear();
+            }
+            return null;
+          });
+
+      // A completable future that will finish when all submit task finish or on timoeut

Review Comment:
   ```suggestion
         // A completable future that will finish when all submit task finish or on timeout
   ```
   nit



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -898,10 +898,33 @@ public static class Server {
     public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
 
     // TODO: Merge this with "mse"
+    /**
+     * The ExecutorServiceProvider to use for execution threads, which are the ones that execute
+     * MultiStageOperators (and SSE operators in the leaf stages).
+     *
+     * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or
+     * parent operators may consume all threads.
+     * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet,
+     * it is trivial to implement that plugin.
+     *
+     * See QueryRunner
+     */
     public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
     public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
     public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+    /**
+     * The ExecutorServiceProvider to be used for submission threads, which are the ones
+     * that receive requests in protobuf and transform them into MultiStageOperators.
+     *
+     * It is recommended to use a fixed thread pool here, although defaults to cached for historical
+     * reasons.
+     *
+     * See QueryServer
+     */
+    public static final String MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX =
+        QUERY_EXECUTOR_CONFIG_PREFIX + "multistage.submission";

Review Comment:
   This needs a `.` separator after `QUERY_EXECUTOR_CONFIG_PREFIX`
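
   To make the missing separator concrete, a small sketch; the prefix value below is hypothetical and only stands in for the real constant:

   ```java
   public class ConfigPrefixSketch {
     // Hypothetical value for illustration; the real QUERY_EXECUTOR_CONFIG_PREFIX may differ.
     static final String QUERY_EXECUTOR_CONFIG_PREFIX = "pinot.server.query.executor";

     public static void main(String[] args) {
       // Without the separator the two segments run together:
       String withoutDot = QUERY_EXECUTOR_CONFIG_PREFIX + "multistage.submission";
       System.out.println(withoutDot); // pinot.server.query.executormultistage.submission

       // With the "." separator, mirroring MULTISTAGE_EXECUTOR_CONFIG_PREFIX above:
       String withDot = QUERY_EXECUTOR_CONFIG_PREFIX + "." + "multistage.submission";
       System.out.println(withDot); // pinot.server.query.executor.multistage.submission
     }
   }
   ```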



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
       return;
     }
 
-    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
-        QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
+    try (QueryThreadContext.CloseableContext qClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+        QueryThreadContext.CloseableContext mseCloseable = MseWorkerThreadContext.open()) {
+
       long requestId = QueryThreadContext.getRequestId();
       QueryThreadContext.setQueryEngine("mse");
 
-      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
-      ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> {
+      // Submit the stage for each worker
+      List<CompletableFuture<List<Object>>> futures = forEachStageAndWorker(request,
+          (stagePlan, workerMetadata) -> {
+            Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
+            ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+
+            try {
               _queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, parentContext);
-              return null;
-            },
-            (ignored) -> {
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
-        LOGGER.error("Caught exception while submitting request: {}", requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + e.getMessage();
-        responseObserver.onNext(Worker.QueryResponse.newBuilder()
-            .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
-                .build());
-        responseObserver.onCompleted();
-        return;
-      } finally {
-        Tracing.ThreadAccountantOps.clear();
-      }
-      responseObserver.onNext(
-          Worker.QueryResponse.newBuilder()
-              .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+            } finally {
+              Tracing.ThreadAccountantOps.clear();
+            }
+            return null;
+          });
+
+      // A completable future that will finish when all submit task finish or on timoeut
+      CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+          .orTimeout(QueryThreadContext.getDeadlineMs(), TimeUnit.MILLISECONDS);
+      // When this future completes, notify the broker.
+      allCompleted.handle((result, error) -> {

Review Comment:
   Why not `whenComplete` instead?
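
   For comparison, a minimal self-contained sketch of the two callback styles; the future below is a stand-in, not the patch's `allCompleted`. `handle` takes a `BiFunction` and has to return a value (hence the dangling `return null`), while `whenComplete` takes a `BiConsumer` and simply observes the outcome, which fits a notify-only step:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;

   public class WhenCompleteVsHandle {
     public static void main(String[] args) {
       CompletableFuture<Void> all =
           CompletableFuture.allOf(CompletableFuture.completedFuture(null))
               .orTimeout(1_000, TimeUnit.MILLISECONDS);

       // handle: the body must return something and produces a new stage whose
       // value is that return.
       all.handle((result, error) -> {
         if (error != null) {
           System.err.println("failed: " + error.getMessage());
         }
         return null;
       });

       // whenComplete: no return value needed; the resulting stage carries the
       // original outcome through, which reads cleaner for pure side effects.
       all.whenComplete((result, error) -> {
         if (error != null) {
           System.err.println("failed: " + error.getMessage());
         }
       });
     }
   }
   ```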



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -208,43 +225,61 @@ public void explain(Worker.QueryRequest request, StreamObserver<Worker.ExplainRe
       return;
     }
 
-    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+    try (QueryThreadContext.CloseableContext qTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
         QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata),
-            (plans) -> {
-              Worker.ExplainResponse.Builder builder = Worker.ExplainResponse.newBuilder();
-              for (StagePlan plan : plans) {
-                ByteString rootAsBytes = PlanNodeSerializer.process(plan.getRootNode()).toByteString();
+      // Explain the stage for each worker
+      List<CompletableFuture<List<StagePlan>>> futures = forEachStageAndWorker(request,
+          (stagePlan, workerMetadata) -> _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata));
+      CompletableFuture<?>[] responseFutures = futures.stream()
+          .map(plansFuture ->
+              plansFuture.thenApply(plans -> {
+                Worker.ExplainResponse.Builder builder = Worker.ExplainResponse.newBuilder();
+                for (StagePlan plan : plans) {
+                  ByteString rootAsBytes = PlanNodeSerializer.process(plan.getRootNode()).toByteString();
+
+                  StageMetadata metadata = plan.getStageMetadata();
+                  List<Worker.WorkerMetadata> protoWorkerMetadataList =
+                      QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
 
-                StageMetadata metadata = plan.getStageMetadata();
-                List<Worker.WorkerMetadata> protoWorkerMetadataList =
-                    QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
+                  builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
+                      Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
+                          .addAllWorkerMetadata(protoWorkerMetadataList)
+                          .setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
+                }
+                builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, "");
+                synchronized (responseObserver) {
+                  responseObserver.onNext(builder.build());
+                }
+                return null;
+              })
+          ).toArray(CompletableFuture[]::new);
 
-                builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
-                    Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
-                        .addAllWorkerMetadata(protoWorkerMetadataList)
-                        .setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
-              }
-              builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, "");
-              responseObserver.onNext(builder.build());
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
-        long requestId = QueryThreadContext.getRequestId();
-        LOGGER.error("Caught exception while submitting request: {}", requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + e.getMessage();
-        responseObserver.onNext(Worker.ExplainResponse.newBuilder()
-            .putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
+      // A completable future that will finish when all submit task finish or on timoeut
+      CompletableFuture<Void> allCompleted = CompletableFuture.allOf(responseFutures)
+          .orTimeout(QueryThreadContext.getDeadlineMs(), TimeUnit.MILLISECONDS);
+      // When this future completes, notify the broker.
+      allCompleted.handle((result, error) -> {
+        if (error != null) {
+          long requestId = QueryThreadContext.getRequestId();
+          LOGGER.error("Caught exception while submitting request: {}", requestId, error);
+          String errorMsg = "Caught exception while submitting request: " + error.getMessage();
+          synchronized (responseObserver) {
+          synchronized (responseObserver) {

Review Comment:
   Nice catch, I wasn't aware that `StreamObserver` doesn't guarantee thread safety.
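
   For reference, besides the external `synchronized` blocks used in the patch, another common way to serialize concurrent producers is a small wrapper around the observer. This is an illustrative sketch only, not code from the PR:

   ```java
   import io.grpc.stub.StreamObserver;

   /** Wraps a StreamObserver so concurrent producers cannot interleave calls. */
   public final class SynchronizedStreamObserver<T> implements StreamObserver<T> {
     private final StreamObserver<T> _delegate;

     public SynchronizedStreamObserver(StreamObserver<T> delegate) {
       _delegate = delegate;
     }

     @Override
     public synchronized void onNext(T value) {
       _delegate.onNext(value);
     }

     @Override
     public synchronized void onError(Throwable t) {
       _delegate.onError(t);
     }

     @Override
     public synchronized void onCompleted() {
       _delegate.onCompleted();
     }
   }
   ```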



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -130,6 +124,15 @@ public void start() {
     }
   }
 
+  private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T> builder) {
+    return builder
+         // By using directExecutor, GRPC doesn't need to manage its own thread pool

Review Comment:
   IIUC, this means that the request processing callbacks will happen directly on the transport thread - i.e., the Netty thread? And this is only recommended when the request processing is non-blocking - however, we still do proto deser on the request processing thread. I assume that's okay because it should have minimal overhead and we don't really risk blocking Netty threads from handling other connections and requests?
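
   As a reference point, a minimal sketch of the two wiring options (the port, service, and pool below are placeholders): `directExecutor()` runs handler callbacks, including proto deserialization, on the transport thread, while `executor(...)` hands them off to an application-owned pool so blocking work never ties up the Netty event loops:

   ```java
   import io.grpc.BindableService;
   import io.grpc.Server;
   import io.grpc.ServerBuilder;
   import java.util.concurrent.ExecutorService;

   public class GrpcExecutorWiring {
     // Callbacks run on the transport thread; cheapest option, appropriate when
     // handlers never block.
     static Server direct(int port, BindableService service) {
       return ServerBuilder.forPort(port)
           .directExecutor()
           .addService(service)
           .build();
     }

     // Callbacks are handed off to an application pool, so the transport thread
     // is released immediately even if a handler blocks.
     static Server offloaded(int port, BindableService service, ExecutorService pool) {
       return ServerBuilder.forPort(port)
           .executor(pool)
           .addService(service)
           .build();
     }
   }
   ```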



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

