This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9321eabe78 Add round-robin logic during downloadSegmentFromPeer (#12353) 9321eabe78 is described below commit 9321eabe784f036102804c6090be5842cb164a69 Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Wed Feb 14 02:53:55 2024 +0530 Add round-robin logic during downloadSegmentFromPeer (#12353) --- .../pinot/common/utils/RoundRobinURIProvider.java | 58 +++++++++++++++------- .../common/utils/fetcher/BaseSegmentFetcher.java | 6 +-- .../common/utils/fetcher/HttpSegmentFetcher.java | 4 +- .../common/utils/RoundRobinURIProviderTest.java | 2 +- .../minion/tasks/SegmentConversionUtils.java | 2 +- 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java index 39fe142ea2..e6ba13c764 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RoundRobinURIProvider.java @@ -23,44 +23,66 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import org.apache.http.client.utils.URIBuilder; /** - * RoundRobinURIProvider accept a URI, try to resolve it into multiple URIs with IP address, and return a IP address URI - * in a Round Robin way. + * RoundRobinURIProvider accept a list of URIs and whether to resolve them into multiple URIs with IP address. + * If resolveHost = true, it returns a IP address URI in a Round Robin way. + * If resolveHost = false, then it returns a URI in a Round Robin way. 
*/ public class RoundRobinURIProvider { - private final URI[] _uris; + private final List<URI> _uris; private int _index; - public RoundRobinURIProvider(URI originalUri) + public RoundRobinURIProvider(List<URI> originalUris, boolean resolveHost) throws UnknownHostException, URISyntaxException { + if (resolveHost) { + _uris = resolveHostsToIPAddresses(originalUris); + } else { + _uris = List.copyOf(originalUris); + } + _index = new Random().nextInt(_uris.size()); + } + + public int numAddresses() { + return _uris.size(); + } + + public URI next() { + URI result = _uris.get(_index); + _index = (_index + 1) % _uris.size(); + return result; + } + + private List<URI> resolveHostToIPAddresses(URI originalUri) + throws UnknownHostException, URISyntaxException { + List<URI> resolvedUris = new ArrayList<>(); String hostName = originalUri.getHost(); if (InetAddresses.isInetAddress(hostName)) { - _uris = new URI[]{originalUri}; + resolvedUris.add(originalUri); } else { // Resolve host name to IP addresses via DNS InetAddress[] addresses = InetAddress.getAllByName(hostName); - _uris = new URI[addresses.length]; URIBuilder uriBuilder = new URIBuilder(originalUri); - for (int i = 0; i < addresses.length; i++) { - String ip = addresses[i].getHostAddress(); - _uris[i] = uriBuilder.setHost(ip).build(); + for (InetAddress address : addresses) { + String ip = address.getHostAddress(); + resolvedUris.add(uriBuilder.setHost(ip).build()); } } - _index = new Random().nextInt(_uris.length); - } - - public int numAddresses() { - return _uris.length; + return resolvedUris; } - public URI next() { - URI result = _uris[_index]; - _index = (_index + 1) % _uris.length; - return result; + private List<URI> resolveHostsToIPAddresses(List<URI> originalUri) + throws UnknownHostException, URISyntaxException { + List<URI> resolvedUris = new ArrayList<>(); + for (URI uri : originalUri) { + resolvedUris.addAll(resolveHostToIPAddresses(uri)); + } + return resolvedUris; } } diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java index 9deca98343..d33c7ead43 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java @@ -21,10 +21,10 @@ package org.apache.pinot.common.utils.fetcher; import java.io.File; import java.net.URI; import java.util.List; -import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.pinot.common.auth.AuthProviderUtils; +import org.apache.pinot.common.utils.RoundRobinURIProvider; import org.apache.pinot.spi.auth.AuthProvider; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; @@ -90,9 +90,9 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { if (uris == null || uris.isEmpty()) { throw new IllegalArgumentException("The input uri list is null or empty"); } - Random r = new Random(); + RoundRobinURIProvider roundRobinURIProvider = new RoundRobinURIProvider(uris, false); RetryPolicies.exponentialBackoffRetryPolicy(_retryCount, _retryWaitMs, _retryDelayScaleFactor).attempt(() -> { - URI uri = uris.get(r.nextInt(uris.size())); + URI uri = roundRobinURIProvider.next(); try { fetchSegmentToLocalWithoutRetry(uri, dest); _logger.info("Fetched segment from: {} to: {} of size: {}", uri, dest, dest.length()); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java index 2b986259b5..170327dc5b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java @@ -68,7 
+68,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { throws Exception { // Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise may always try to // download from a same broken host as: 1) DNS may not RR the IP addresses 2) OS cache the DNS resolution result. - RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI); + RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(downloadURI), true); int retryCount = getRetryCount(uriProvider); @@ -124,7 +124,7 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { throws Exception { // Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise, may always try to // download from a same broken host as: 1) DNS may not RR the IP addresses 2) OS cache the DNS resolution result. - RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(downloadURI); + RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(downloadURI), true); int retryCount = getRetryCount(uriProvider); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java index 3ef82a8e21..05dfff8a6f 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RoundRobinURIProviderTest.java @@ -162,7 +162,7 @@ public class RoundRobinURIProviderTest { for (TestCase testCase : testCases) { String uri = testCase._originalUri; - RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new URI(uri)); + RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(new URI(uri)), true); int n = testCase._expectedUris.size(); int previousIndex = -1; int currentIndex; diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java index 7bd2e434c4..76824abe0d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java @@ -117,7 +117,7 @@ public class SegmentConversionUtils { throws Exception { // Create a RoundRobinURIProvider to round-robin IP addresses when retry uploading. Otherwise, it may always try to // upload to a same broken host as: 1) DNS may not RR the IP addresses 2) OS cache the DNS resolution result. - RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(new URI(uploadURL)); + RoundRobinURIProvider uriProvider = new RoundRobinURIProvider(List.of(new URI(uploadURL)), true); // Generate retry policy based on the config String maxNumAttemptsConfigStr = configs.get(MinionConstants.MAX_NUM_ATTEMPTS_KEY); int maxNumAttemptsFromConfig = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org