This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9321eabe78 Add round-robin logic during downloadSegmentFromPeer 
(#12353)
9321eabe78 is described below

commit 9321eabe784f036102804c6090be5842cb164a69
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Wed Feb 14 02:53:55 2024 +0530

    Add round-robin logic during downloadSegmentFromPeer (#12353)
---
 .../pinot/common/utils/RoundRobinURIProvider.java  | 58 +++++++++++++++-------
 .../common/utils/fetcher/BaseSegmentFetcher.java   |  6 +--
 .../common/utils/fetcher/HttpSegmentFetcher.java   |  4 +-
 .../common/utils/RoundRobinURIProviderTest.java    |  2 +-
 .../minion/tasks/SegmentConversionUtils.java       |  2 +-
 5 files changed, 47 insertions(+), 25 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
index 39fe142ea2..e6ba13c764 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java
@@ -23,44 +23,66 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import org.apache.http.client.utils.URIBuilder;
 
 
 /**
- * RoundRobinURIProvider accept a URI, try to resolve it into multiple URIs 
with IP address, and return a IP address URI
- * in a Round Robin way.
+ * RoundRobinURIProvider accept a list of URIs and whether to resolve them 
into multiple URIs with IP address.
+ * If resolveHost = true, it returns a IP address URI in a Round Robin way.
+ * If resolveHost = false, then it returns a URI in a Round Robin way.
  */
 public class RoundRobinURIProvider {
 
-  private final URI[] _uris;
+  private final List<URI> _uris;
   private int _index;
 
-  public RoundRobinURIProvider(URI originalUri)
+  public RoundRobinURIProvider(List<URI> originalUris, boolean resolveHost)
       throws UnknownHostException, URISyntaxException {
+    if (resolveHost) {
+      _uris = resolveHostsToIPAddresses(originalUris);
+    } else {
+      _uris = List.copyOf(originalUris);
+    }
+    _index = new Random().nextInt(_uris.size());
+  }
+
+  public int numAddresses() {
+    return _uris.size();
+  }
+
+  public URI next() {
+    URI result = _uris.get(_index);
+    _index = (_index + 1) % _uris.size();
+    return result;
+  }
+
+  private List<URI> resolveHostToIPAddresses(URI originalUri)
+      throws UnknownHostException, URISyntaxException {
+    List<URI> resolvedUris = new ArrayList<>();
     String hostName = originalUri.getHost();
     if (InetAddresses.isInetAddress(hostName)) {
-      _uris = new URI[]{originalUri};
+      resolvedUris.add(originalUri);
     } else {
       // Resolve host name to IP addresses via DNS
       InetAddress[] addresses = InetAddress.getAllByName(hostName);
-      _uris = new URI[addresses.length];
       URIBuilder uriBuilder = new URIBuilder(originalUri);
-      for (int i = 0; i < addresses.length; i++) {
-        String ip = addresses[i].getHostAddress();
-        _uris[i] = uriBuilder.setHost(ip).build();
+      for (InetAddress address : addresses) {
+        String ip = address.getHostAddress();
+        resolvedUris.add(uriBuilder.setHost(ip).build());
       }
     }
-    _index = new Random().nextInt(_uris.length);
-  }
-
-  public int numAddresses() {
-    return _uris.length;
+    return resolvedUris;
   }
 
-  public URI next() {
-    URI result = _uris[_index];
-    _index = (_index + 1) % _uris.length;
-    return result;
+  private List<URI> resolveHostsToIPAddresses(List<URI> originalUri)
+      throws UnknownHostException, URISyntaxException {
+    List<URI> resolvedUris = new ArrayList<>();
+    for (URI uri : originalUri) {
+      resolvedUris.addAll(resolveHostToIPAddresses(uri));
+    }
+    return resolvedUris;
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index 9deca98343..d33c7ead43 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -21,10 +21,10 @@ package org.apache.pinot.common.utils.fetcher;
 import java.io.File;
 import java.net.URI;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.utils.RoundRobinURIProvider;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -90,9 +90,9 @@ public abstract class BaseSegmentFetcher implements 
SegmentFetcher {
     if (uris == null || uris.isEmpty()) {
       throw new IllegalArgumentException("The input uri list is null or 
empty");
     }
-    Random r = new Random();
+    RoundRobinURIProvider roundRobinURIProvider = new 
RoundRobinURIProvider(uris, false);
     RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, 
_retryDelayScaleFactor).attempt(() -> {
-      URI uri = uris.get(r.nextInt(uris.size()));
+      URI uri = roundRobinURIProvider.next();
       try {
         fetchSegmentToLocalWithoutRetry(uri, dest);
         _logger.info("Fetched segment from: {} to: {} of size: {}", uri, dest, 
dest.length());
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 2b986259b5..170327dc5b 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -68,7 +68,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
       throws Exception {
     // Create a RoundRobinURIProvider to round robin IP addresses when retry 
uploading. Otherwise may always try to
     // download from a same broken host as: 1) DNS may not RR the IP addresses 
2) OS cache the DNS resolution result.
-    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+    RoundRobinURIProvider uriProvider = new 
RoundRobinURIProvider(List.of(downloadURI), true);
 
     int retryCount = getRetryCount(uriProvider);
 
@@ -124,7 +124,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
       throws Exception {
     // Create a RoundRobinURIProvider to round robin IP addresses when retry 
uploading. Otherwise, may always try to
     // download from a same broken host as: 1) DNS may not RR the IP addresses 
2) OS cache the DNS resolution result.
-    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI);
+    RoundRobinURIProvider uriProvider = new 
RoundRobinURIProvider(List.of(downloadURI), true);
 
     int retryCount = getRetryCount(uriProvider);
 
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
index 3ef82a8e21..05dfff8a6f 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java
@@ -162,7 +162,7 @@ public class RoundRobinURIProviderTest {
 
     for (TestCase testCase : testCases) {
       String uri = testCase._originalUri;
-      RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new 
URI(uri));
+      RoundRobinURIProvider uriProvider = new 
RoundRobinURIProvider(List.of(new URI(uri)), true);
       int n = testCase._expectedUris.size();
       int previousIndex = -1;
       int currentIndex;
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 7bd2e434c4..76824abe0d 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -117,7 +117,7 @@ public class SegmentConversionUtils {
       throws Exception {
     // Create a RoundRobinURIProvider to round-robin IP addresses when retry 
uploading. Otherwise, it may always try to
     // upload to a same broken host as: 1) DNS may not RR the IP addresses 2) 
OS cache the DNS resolution result.
-    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new 
URI(uploadURL));
+    RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(new 
URI(uploadURL)), true);
     // Generate retry policy based on the config
     String maxNumAttemptsConfigStr = 
configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY);
     int maxNumAttemptsFromConfig =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to