richardstartin commented on a change in pull request #8272: URL: https://github.com/apache/pinot/pull/8272#discussion_r817701130
########## File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java ########## @@ -106,15 +104,18 @@ public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest, return brokerResponseNative; } - private static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService, + @VisibleForTesting + static void processIterativeServerResponse(StreamingReducer reducer, Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs, - ExecutionStatsAggregator aggregator) throws Exception { + ExecutionStatsAggregator aggregator) + throws Exception { int cnt = 0; - Future[] futures = new Future[serverResponseMap.size()]; - CountDownLatch countDownLatch = new CountDownLatch(serverResponseMap.size()); - - for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry: serverResponseMap.entrySet()) { - futures[cnt++] = executorService.submit(() -> { + CompletableFuture<Void>[] futures = new CompletableFuture[serverResponseMap.size()]; + // based on ideas from on https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future + // and https://stackoverflow.com/questions/23301598/transform-java-future-into-a-completablefuture + // Future created via ExecutorService.submit() can be created by CompletableFuture.supplyAsync() + for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry : serverResponseMap.entrySet()) { + futures[cnt++] = CompletableFuture.supplyAsync(() -> { Review comment: This executes in the global common pool (ForkJoinPool.commonPool()), which isn't the right behaviour. You can supply an executor as the second argument to CompletableFuture.supplyAsync() to make it execute in the allotted thread pool. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org