richardstartin commented on a change in pull request #8272:
URL: https://github.com/apache/pinot/pull/8272#discussion_r817701130



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
##########
@@ -106,15 +104,18 @@ public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest,
     return brokerResponseNative;
   }
 
-  private static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
+  @VisibleForTesting
+  static void processIterativeServerResponse(StreamingReducer reducer,
       Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
-      ExecutionStatsAggregator aggregator) throws Exception {
+      ExecutionStatsAggregator aggregator)
+      throws Exception {
     int cnt = 0;
-    Future[] futures = new Future[serverResponseMap.size()];
-    CountDownLatch countDownLatch = new CountDownLatch(serverResponseMap.size());
-
-    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry: serverResponseMap.entrySet()) {
-      futures[cnt++] = executorService.submit(() -> {
+    CompletableFuture<Void>[] futures = new CompletableFuture[serverResponseMap.size()];
+    // based on ideas from on https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future
+    // and https://stackoverflow.com/questions/23301598/transform-java-future-into-a-completablefuture
+    // Future created via ExecutorService.submit() can be created by CompletableFuture.supplyAsync()
+    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry : serverResponseMap.entrySet()) {
+      futures[cnt++] = CompletableFuture.supplyAsync(() -> {

Review comment:
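       Called without an executor argument, CompletableFuture.supplyAsync() runs the task in the JVM-wide common pool (ForkJoinPool.commonPool()), which isn't the right behaviour here. You can supply an executor as the second argument to make it execute in the allotted thread pool.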
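
       To illustrate the difference, here is a minimal, self-contained sketch rather than the actual Pinot code. The class name, the `newNamedPool` helper, and the "reduce-worker" thread name are hypothetical and exist only for this example. It compares the one-argument `supplyAsync(Supplier)` overload with the two-argument `supplyAsync(Supplier, Executor)` overload by reporting which thread ran the task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SupplyAsyncExecutorSketch {

  // Hypothetical helper: a small fixed pool whose threads all carry the given name.
  static ExecutorService newNamedPool(String threadName) {
    return Executors.newFixedThreadPool(2, runnable -> {
      Thread thread = new Thread(runnable, threadName);
      thread.setDaemon(true); // do not keep the JVM alive if shutdown() is missed
      return thread;
    });
  }

  // One-argument overload: the task runs on the JVM-wide default async pool.
  static String threadNameWithDefaultPool() {
    return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()).join();
  }

  // Two-argument overload: the task runs on the executor that was passed in.
  static String threadNameWithExecutor(ExecutorService executor) {
    return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor).join();
  }

  public static void main(String[] args) {
    ExecutorService reducePool = newNamedPool("reduce-worker");
    try {
      System.out.println("default pool thread:  " + threadNameWithDefaultPool());
      System.out.println("supplied pool thread: " + threadNameWithExecutor(reducePool));
    } finally {
      reducePool.shutdown();
    }
  }
}
```

       In this sketch the second call always reports the pool's own thread name, while the first reports whatever thread the default async pool provides. Applied to this change, that would presumably mean keeping an executor parameter on the method and passing it to supplyAsync().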




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


