dsmiley commented on a change in pull request #1770:
URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r486065813



##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new 
NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new 
HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, 
isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, 
isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, 
apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, 
shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: 
responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? 
extends LBSolrClient.Req> routes,
+                                                        
NamedList<NamedList<?>> shardResponses,
+                                                        boolean 
isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = 
new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : 
routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new 
CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = 
getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {
+            if (!reqFuture.isCompletedExceptionally()) {
+              future.complete(result.getResponse());
+            } else {
+              future.completeExceptionally(error);
+            }
+          });
+        } else {

Review comment:
       In my experience, everyone has and wants an ExecutorService.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to