walterddr commented on code in PR #12363:
URL: https://github.com/apache/pinot/pull/12363#discussion_r1477143458


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -97,42 +97,60 @@ public void shutdown() {
 
   @Override
   public void submit(Worker.QueryRequest request, 
StreamObserver<Worker.QueryResponse> responseObserver) {
-    // Deserialize the request
-    List<DistributedStagePlan> distributedStagePlans;
-    Map<String, String> requestMetadata;
-    requestMetadata = Collections.unmodifiableMap(request.getMetadataMap());
+    Map<String, String> requestMetadata = request.getMetadataMap();
     long requestId = 
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
     long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
-    // 1. Deserialized request
-    try {
-      distributedStagePlans = 
QueryPlanSerDeUtils.deserializeStagePlan(request);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while deserializing the request: {}", 
requestId, e);
-      responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad 
request").withCause(e).asException());
-      return;
-    }
-    // 2. Submit distributed stage plans, await response successful or any 
failure which cancels all other tasks.
-    int numSubmission = distributedStagePlans.size();
-    CompletableFuture<?>[] submissionStubs = new 
CompletableFuture[numSubmission];
-    for (int i = 0; i < numSubmission; i++) {
-      DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
-      submissionStubs[i] =
-          CompletableFuture.runAsync(() -> 
_queryRunner.processQuery(distributedStagePlan, requestMetadata),
-              _querySubmissionExecutorService);
+
+    List<Worker.StagePlan> stagePlans = request.getStagePlanList();
+    int numStages = stagePlans.size();
+    CompletableFuture<?>[] stageSubmissionStubs = new 
CompletableFuture[numStages];
+    List<CompletableFuture<Void>> queryExecutionStubs = 
Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < numStages; i++) {
+      Worker.StagePlan stagePlan = stagePlans.get(i);
+      stageSubmissionStubs[i] = CompletableFuture.runAsync(() -> {
+        List<DistributedStagePlan> workerPlans;
+        try {
+          workerPlans = QueryPlanSerDeUtils.deserializeStagePlan(stagePlan);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              String.format("Caught exception while deserializing stage plan 
for request: %d, stage id: %d", requestId,
+                  stagePlan.getStageId()), e);
+        }
+        int numWorkers = workerPlans.size();
+        CompletableFuture<?>[] workerSubmissionStubs = new 
CompletableFuture[numWorkers];
+        for (DistributedStagePlan workerPlan : workerPlans) {
+          queryExecutionStubs.add(
+              CompletableFuture.runAsync(() -> 
_queryRunner.processQuery(workerPlan, requestMetadata),
+                  _querySubmissionExecutorService));
+        }
+        try {
+          CompletableFuture.allOf(workerSubmissionStubs)
+              .get(deadlineMs - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          throw new RuntimeException(
+              String.format("Caught exception while submitting request: %d, 
stage id: %d", requestId,
+                  stagePlan.getStageId()), e);
+        } finally {
+          for (CompletableFuture<?> future : workerSubmissionStubs) {
+            if (!future.isDone()) {
+              future.cancel(true);
+            }
+          }
+        }
+      }, _querySubmissionExecutorService);
     }
     try {
-      CompletableFuture.allOf(submissionStubs).get(deadlineMs - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      CompletableFuture.allOf(stageSubmissionStubs).get(deadlineMs - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);

Review Comment:
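   It looks like there is an NPE issue with the submission stubs:
   `workerSubmissionStubs` is allocated with `numWorkers` slots but never
   populated (the worker futures are added to `queryExecutionStubs` instead),
   so both `CompletableFuture.allOf(workerSubmissionStubs)` and the
   `future.isDone()` check in the `finally` block operate on null elements.
   Please take a look.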



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to