walterddr commented on code in PR #10332:
URL: https://github.com/apache/pinot/pull/10332#discussion_r1119050545


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -220,23 +252,94 @@ public static MailboxReceiveOperator 
createReduceStageOperator(MailboxService<Tr
 
   public void shutdown() {
     for (DispatchClient dispatchClient : _dispatchClientMap.values()) {
-      dispatchClient._managedChannel.shutdown();
+      dispatchClient._channel.shutdown();
     }
     _dispatchClientMap.clear();
   }
 
+  @VisibleForTesting
+  DispatchClient getOrCreateDispatchClient(String host, int port) {
+    String key = String.format("%s_%d", host, port);
+    return _dispatchClientMap.computeIfAbsent(key, k -> new 
DispatchClient(host, port));
+  }
+
   public static class DispatchClient {
-    private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub 
_blockingStub;
-    private final ManagedChannel _managedChannel;
+    private ManagedChannel _channel;
+    private PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
 
     public DispatchClient(String host, int port) {
-      ManagedChannelBuilder managedChannelBuilder = 
ManagedChannelBuilder.forAddress(host, port).usePlaintext();
-      _managedChannel = managedChannelBuilder.build();
-      _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(_managedChannel);
+      _channel = ManagedChannelBuilder.forAddress(host, 
port).usePlaintext().build();

Review Comment:
   nit: let's move the DispatchClient and DispatchObserver to their own file 
under `org.apache.pinot.query.service.dispatch`. makes the code cleaner.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -99,28 +116,43 @@ public int submit(long requestId, QueryPlan queryPlan, 
long timeoutMs, Map<Strin
           String host = serverInstance.getHostname();
           int servicePort = serverInstance.getQueryServicePort();
           DispatchClient client = getOrCreateDispatchClient(host, servicePort);
-          Worker.QueryResponse response = 
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-                  
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(requestId))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs))
-              .putAllMetadata(queryOptions).build());
-
-          if 
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
-            throw new RuntimeException(
-                String.format("Unable to execute query plan at stage %s on 
server %s: ERROR: %s", stageId,
-                    serverInstance, response));
+          dispatchCalls++;
+          _executorService.submit(() -> {
+            client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+                
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))
+                .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(requestId))
+                .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs))
+                .putAllMetadata(queryOptions).build(), stageId, 
serverInstance, deadline, callbacks::offer);
+          });
+        }
+      }
+    }
+    for (int i = 0; i < dispatchCalls; i++) {
+      AsyncResponse resp;
+      while (!deadline.isExpired()) {
+        resp = callbacks.poll(DEFAULT_DISPATCHER_CALLBACK_POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        if (resp != null) {
+          if (resp.getThrowable() != null) {
+            throw new RuntimeException(String.format("Error dispatching query 
to server=%s stage=%s",
+                resp._virtualServer, resp._stageId), resp.getThrowable());
+          } else {
+            Worker.QueryResponse response = resp.getQueryResponse();
+            if 
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
+              throw new RuntimeException(
+                  String.format("Unable to execute query plan at stage %s on 
server %s: ERROR: %s", resp.getStageId(),
+                      resp.getVirtualServer(), response));
+            }
+            break;

Review Comment:
   not sure why we need a separate deadline expire + 2 nested loop. 
   cant we just do
   ```
   while (!deadline.isExpired() && successfulDispatchCount < dispatchCalls) {
     resp = callbacks.poll(TIMEOUT);
     if (...) {
        .... // all the error conditions.
     } else {
       successfulDispatch ++;
     }
   }
   ```
   



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -99,28 +116,43 @@ public int submit(long requestId, QueryPlan queryPlan, 
long timeoutMs, Map<Strin
           String host = serverInstance.getHostname();
           int servicePort = serverInstance.getQueryServicePort();
           DispatchClient client = getOrCreateDispatchClient(host, servicePort);
-          Worker.QueryResponse response = 
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-                  
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(requestId))
-              .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs))
-              .putAllMetadata(queryOptions).build());
-
-          if 
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
-            throw new RuntimeException(
-                String.format("Unable to execute query plan at stage %s on 
server %s: ERROR: %s", stageId,
-                    serverInstance, response));
+          dispatchCalls++;
+          _executorService.submit(() -> {
+            client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+                
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, 
serverInstance)))
+                .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(requestId))
+                .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs))
+                .putAllMetadata(queryOptions).build(), stageId, 
serverInstance, deadline, callbacks::offer);
+          });
+        }
+      }
+    }
+    for (int i = 0; i < dispatchCalls; i++) {
+      AsyncResponse resp;
+      while (!deadline.isExpired()) {
+        resp = callbacks.poll(DEFAULT_DISPATCHER_CALLBACK_POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        if (resp != null) {
+          if (resp.getThrowable() != null) {
+            throw new RuntimeException(String.format("Error dispatching query 
to server=%s stage=%s",
+                resp._virtualServer, resp._stageId), resp.getThrowable());
+          } else {
+            Worker.QueryResponse response = resp.getQueryResponse();
+            if 
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
+              throw new RuntimeException(
+                  String.format("Unable to execute query plan at stage %s on 
server %s: ERROR: %s", resp.getStageId(),
+                      resp.getVirtualServer(), response));
+            }
+            break;
           }
         }
       }
     }
+    if (deadline.isExpired()) {
+      throw new RuntimeException("Timed out waiting for response of async 
query-dispatch");
+    }

Review Comment:
   add a comment: `TODO: cancel all ongoing dispatched requests`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to