walterddr commented on code in PR #10332: URL: https://github.com/apache/pinot/pull/10332#discussion_r1119050545
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java: ########## @@ -220,23 +252,94 @@ public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Tr public void shutdown() { for (DispatchClient dispatchClient : _dispatchClientMap.values()) { - dispatchClient._managedChannel.shutdown(); + dispatchClient._channel.shutdown(); } _dispatchClientMap.clear(); } + @VisibleForTesting + DispatchClient getOrCreateDispatchClient(String host, int port) { + String key = String.format("%s_%d", host, port); + return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port)); + } + public static class DispatchClient { - private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub _blockingStub; - private final ManagedChannel _managedChannel; + private ManagedChannel _channel; + private PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub; public DispatchClient(String host, int port) { - ManagedChannelBuilder managedChannelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext(); - _managedChannel = managedChannelBuilder.build(); - _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(_managedChannel); + _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); Review Comment: nit: let's move `DispatchClient` and `DispatchObserver` into their own files under `org.apache.pinot.query.service.dispatch`; it makes the code cleaner. 
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java: ########## @@ -99,28 +116,43 @@ public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, Map<Strin String host = serverInstance.getHostname(); int servicePort = serverInstance.getQueryServicePort(); DispatchClient client = getOrCreateDispatchClient(host, servicePort); - Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder().setStagePlan( - QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) - .putAllMetadata(queryOptions).build()); - - if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) { - throw new RuntimeException( - String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId, - serverInstance, response)); + dispatchCalls++; + _executorService.submit(() -> { + client.submit(Worker.QueryRequest.newBuilder().setStagePlan( + QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) + .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) + .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) + .putAllMetadata(queryOptions).build(), stageId, serverInstance, deadline, callbacks::offer); + }); + } + } + } + for (int i = 0; i < dispatchCalls; i++) { + AsyncResponse resp; + while (!deadline.isExpired()) { + resp = callbacks.poll(DEFAULT_DISPATCHER_CALLBACK_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (resp != null) { + if (resp.getThrowable() != null) { + throw new RuntimeException(String.format("Error dispatching query to server=%s stage=%s", + resp._virtualServer, resp._stageId), resp.getThrowable()); + } else { + Worker.QueryResponse response = resp.getQueryResponse(); + 
if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) { + throw new RuntimeException( + String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", resp.getStageId(), + resp.getVirtualServer(), response)); + } + break; Review Comment: not sure why we need a separate deadline-expiry check plus two nested loops. Can't we just do: ``` while (!deadline.isExpired() && successfulDispatchCount < dispatchCalls) { resp = callbacks.poll(TIMEOUT); if (...) { .... // all the error conditions. } else { successfulDispatchCount++; } } ``` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java: ########## @@ -99,28 +116,43 @@ public int submit(long requestId, QueryPlan queryPlan, long timeoutMs, Map<Strin String host = serverInstance.getHostname(); int servicePort = serverInstance.getQueryServicePort(); DispatchClient client = getOrCreateDispatchClient(host, servicePort); - Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder().setStagePlan( - QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) - .putAllMetadata(queryOptions).build()); - - if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) { - throw new RuntimeException( - String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId, - serverInstance, response)); + dispatchCalls++; + _executorService.submit(() -> { + client.submit(Worker.QueryRequest.newBuilder().setStagePlan( + QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance))) + .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) + .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) + 
.putAllMetadata(queryOptions).build(), stageId, serverInstance, deadline, callbacks::offer); + }); + } + } + } + for (int i = 0; i < dispatchCalls; i++) { + AsyncResponse resp; + while (!deadline.isExpired()) { + resp = callbacks.poll(DEFAULT_DISPATCHER_CALLBACK_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (resp != null) { + if (resp.getThrowable() != null) { + throw new RuntimeException(String.format("Error dispatching query to server=%s stage=%s", + resp._virtualServer, resp._stageId), resp.getThrowable()); + } else { + Worker.QueryResponse response = resp.getQueryResponse(); + if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) { + throw new RuntimeException( + String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", resp.getStageId(), + resp.getVirtualServer(), response)); + } + break; } } } } + if (deadline.isExpired()) { + throw new RuntimeException("Timed out waiting for response of async query-dispatch"); + } Review Comment: add a comment: `TODO: cancel all ongoing dispatched requests` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org