This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ec452a49f3 Refine PeerServerSegmentFinder (#12933)
ec452a49f3 is described below

commit ec452a49f3c885308613bc45dfa44b48a16076ba
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Apr 15 17:02:18 2024 -0700

    Refine PeerServerSegmentFinder (#12933)
---
 .../common/utils/fetcher/BaseSegmentFetcher.java   |   9 +-
 .../common/utils/fetcher/HttpSegmentFetcher.java   |  28 ++--
 .../pinot/core/util/PeerServerSegmentFinder.java   | 101 ++++++--------
 .../utils/fetcher/HttpSegmentFetcherTest.java      | 152 +++++++--------------
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   3 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  91 +++++-------
 .../core/data/manager/BaseTableDataManager.java    |   9 +-
 .../manager/realtime/RealtimeTableDataManager.java |  28 ++--
 .../data/manager/BaseTableDataManagerTest.java     |   4 +-
 .../core/util/PeerServerSegmentFinderTest.java     | 128 ++++++++---------
 .../utils/retry/ExponentialBackoffRetryPolicy.java |   6 +-
 11 files changed, 220 insertions(+), 339 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
index d33c7ead43..5fb82388f2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java
@@ -42,13 +42,13 @@ public abstract class BaseSegmentFetcher implements 
SegmentFetcher {
   public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = 
"retry.delay.scale.factor";
   public static final int DEFAULT_RETRY_COUNT = 3;
   public static final int DEFAULT_RETRY_WAIT_MS = 100;
-  public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
+  public static final double DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
 
   protected final Logger _logger = 
LoggerFactory.getLogger(getClass().getSimpleName());
 
   protected int _retryCount;
   protected int _retryWaitMs;
-  protected int _retryDelayScaleFactor;
+  protected double _retryDelayScaleFactor;
   protected AuthProvider _authProvider;
 
   @Override
@@ -58,9 +58,8 @@ public abstract class BaseSegmentFetcher implements 
SegmentFetcher {
     _retryDelayScaleFactor = 
config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 
DEFAULT_RETRY_DELAY_SCALE_FACTOR);
     _authProvider = AuthProviderUtils.extractAuthProvider(config, 
CommonConstants.KEY_OF_AUTH);
     doInit(config);
-    _logger
-        .info("Initialized with retryCount: {}, retryWaitMs: {}, 
retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
-            _retryDelayScaleFactor);
+    _logger.info("Initialized with retryCount: {}, retryWaitMs: {}, 
retryDelayScaleFactor: {}", _retryCount,
+        _retryWaitMs, _retryDelayScaleFactor);
   }
 
   /**
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
index 170327dc5b..6872ac7714 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java
@@ -44,23 +44,16 @@ import org.apache.pinot.spi.utils.retry.RetryPolicies;
 public class HttpSegmentFetcher extends BaseSegmentFetcher {
   protected FileUploadDownloadClient _httpClient;
 
-  @Override
-  protected void doInit(PinotConfiguration config) {
-    _httpClient = new 
FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
-  }
-
-  public HttpSegmentFetcher() {
-  }
-
   @VisibleForTesting
-  protected HttpSegmentFetcher(FileUploadDownloadClient httpClient, 
PinotConfiguration config) {
+  void setHttpClient(FileUploadDownloadClient httpClient) {
     _httpClient = httpClient;
-    _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, 
DEFAULT_RETRY_COUNT);
-    _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, 
DEFAULT_RETRY_WAIT_MS);
-    _retryDelayScaleFactor = 
config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 
DEFAULT_RETRY_DELAY_SCALE_FACTOR);
-    _logger
-        .info("Initialized with retryCount: {}, retryWaitMs: {}, 
retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
-            _retryDelayScaleFactor);
+  }
+
+  @Override
+  protected void doInit(PinotConfiguration config) {
+    if (_httpClient == null) {
+      _httpClient = new 
FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
+    }
   }
 
   @Override
@@ -87,9 +80,8 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher {
           httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + 
port));
         }
         int statusCode = _httpClient.downloadFile(uri, dest, _authProvider, 
httpHeaders);
-        _logger
-            .info("Downloaded segment from: {} to: {} of size: {}; Response 
status code: {}", uri, dest, dest.length(),
-                statusCode);
+        _logger.info("Downloaded segment from: {} to: {} of size: {}; Response 
status code: {}", uri, dest,
+            dest.length(), statusCode);
         return true;
       } catch (HttpErrorStatusException e) {
         int statusCode = e.getStatusCode();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
 
b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
index e2c9d509f6..7f26d75935 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java
@@ -19,21 +19,19 @@
 package org.apache.pinot.core.util;
 
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.collections.ListUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
+import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.StringUtil;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,93 +45,74 @@ public class PeerServerSegmentFinder {
   private PeerServerSegmentFinder() {
   }
 
-  private static final Logger _logger = 
LoggerFactory.getLogger(PeerServerSegmentFinder.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PeerServerSegmentFinder.class);
   private static final int MAX_NUM_ATTEMPTS = 5;
   private static final int INITIAL_DELAY_MS = 500;
   private static final double DELAY_SCALE_FACTOR = 2;
 
   /**
-   *
-   * @param segmentName
-   * @param downloadScheme Can be either http or https.
-   * @param helixManager
-   * @return a list of uri strings of the form 
http(s)://hostname:port/segments/tablenameWithType/segmentName
-   * for the servers hosting ONLINE segments; empty list if no such server 
found.
+   * Returns a list of URIs of the form 
'http(s)://hostname:port/segments/tableNameWithType/segmentName' for the servers
+   * hosting ONLINE segments; empty list if no such server found. The download 
scheme can be either 'http' or 'https'.
    */
-  public static List<URI> getPeerServerURIs(String segmentName, String 
downloadScheme, HelixManager helixManager) {
-    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-    String tableNameWithType =
-        
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName());
-    return getPeerServerURIs(segmentName, downloadScheme, helixManager, 
tableNameWithType);
-  }
-
-  public static List<URI> getPeerServerURIs(String segmentName, String 
downloadScheme,
-      HelixManager helixManager, String tableNameWithType) {
+  public static List<URI> getPeerServerURIs(HelixManager helixManager, String 
tableNameWithType, String segmentName,
+      String downloadScheme) {
     HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
     String clusterName = helixManager.getClusterName();
-    if (clusterName == null) {
-      _logger.error("ClusterName not found");
-      return ListUtils.EMPTY_LIST;
-    }
-    final List<URI> onlineServerURIs = new ArrayList<>();
+    List<URI> onlineServerURIs = new ArrayList<>();
     try {
       RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS, 
INITIAL_DELAY_MS, DELAY_SCALE_FACTOR)
           .attempt(() -> {
-            getOnlineServersFromExternalView(segmentName, downloadScheme, 
tableNameWithType, helixAdmin, clusterName,
+            getOnlineServersFromExternalView(helixAdmin, clusterName, 
tableNameWithType, segmentName, downloadScheme,
                 onlineServerURIs);
             return !onlineServerURIs.isEmpty();
           });
+    } catch (AttemptsExceededException e) {
+      LOGGER.error("Failed to find ONLINE servers for segment: {} in table: {} 
after {} attempts", segmentName,
+          tableNameWithType, MAX_NUM_ATTEMPTS);
     } catch (Exception e) {
-      _logger.error("Failure in getting online servers for segment {}", 
segmentName, e);
+      LOGGER.error("Caught exception while getting peer server URIs for 
segment: {} in table: {}", segmentName,
+          tableNameWithType, e);
     }
     return onlineServerURIs;
   }
 
-  private static void getOnlineServersFromExternalView(String segmentName, 
String downloadScheme,
-      String tableNameWithType, HelixAdmin helixAdmin, String clusterName, 
List<URI> onlineServerURIs) {
-    ExternalView externalViewForResource =
-        HelixHelper.getExternalViewForResource(helixAdmin, clusterName, 
tableNameWithType);
-    if (externalViewForResource == null) {
-      _logger.warn("External View not found for table {}", tableNameWithType);
+  private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, 
String clusterName,
+      String tableNameWithType, String segmentName, String downloadScheme, 
List<URI> onlineServerURIs)
+      throws Exception {
+    ExternalView externalView = 
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
+    if (externalView == null) {
+      LOGGER.warn("Failed to find external view for table: {}", 
tableNameWithType);
       return;
     }
     // Find out the ONLINE servers serving the segment.
-    Map<String, String> instanceToStateMap = 
externalViewForResource.getStateMap(segmentName);
-    for (Map.Entry<String, String> instanceState : 
instanceToStateMap.entrySet()) {
-      if ("ONLINE".equals(instanceState.getValue())) {
+    Map<String, String> instanceStateMap = 
externalView.getStateMap(segmentName);
+    if (instanceStateMap == null) {
+      LOGGER.warn("Failed to find segment: {} in table: {}", segmentName, 
tableNameWithType);
+      return;
+    }
+    for (Map.Entry<String, String> instanceState : 
instanceStateMap.entrySet()) {
+      if (SegmentStateModel.ONLINE.equals(instanceState.getValue())) {
         String instanceId = instanceState.getKey();
-        _logger.info("Found ONLINE server {} for segment {}.", instanceId, 
segmentName);
+        LOGGER.info("Found ONLINE server: {} for segment: {} in table: {}", 
instanceId, segmentName, tableNameWithType);
         InstanceConfig instanceConfig = 
helixAdmin.getInstanceConfig(clusterName, instanceId);
         String hostName = instanceConfig.getHostName();
-        int port = getServerAdminPort(helixAdmin, clusterName, instanceId, 
downloadScheme);
-        try {
-          onlineServerURIs.add(new URI(StringUtil
-              .join("/", downloadScheme + "://" + hostName + ":" + port, 
"segments", tableNameWithType, segmentName)));
-        } catch (URISyntaxException e) {
-          _logger.warn("Error in uri syntax: ", e);
-        }
+        String adminPortKey = getAdminPortKey(downloadScheme);
+        int port = instanceConfig.getRecord().getIntField(adminPortKey, 
Server.DEFAULT_ADMIN_API_PORT);
+        onlineServerURIs.add(new URI(
+            StringUtil.join("/", downloadScheme + "://" + hostName + ":" + 
port, "segments", tableNameWithType,
+                segmentName)));
       }
     }
   }
 
-  private static int getServerAdminPort(HelixAdmin helixAdmin, String 
clusterName, String instanceId,
-                                        String downloadScheme) {
-    try {
-      return Integer.parseInt(HelixHelper.getInstanceConfigsMapFor(instanceId, 
clusterName, helixAdmin)
-          .get(getServerAdminPortKey(downloadScheme)));
-    } catch (Exception e) {
-      _logger.warn("Failed to retrieve ADMIN PORT for instanceId {} in the 
cluster {} ", instanceId, clusterName, e);
-      return CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
-    }
-  }
-
-  private static String getServerAdminPortKey(String downloadScheme) {
+  private static String getAdminPortKey(String downloadScheme) {
     switch (downloadScheme) {
-      case CommonConstants.HTTPS_PROTOCOL:
-        return CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY;
       case CommonConstants.HTTP_PROTOCOL:
+        return Instance.ADMIN_PORT_KEY;
+      case CommonConstants.HTTPS_PROTOCOL:
+        return Instance.ADMIN_HTTPS_PORT_KEY;
       default:
-        return CommonConstants.Helix.Instance.ADMIN_PORT_KEY;
+        throw new IllegalArgumentException("Unsupported download scheme: " + 
downloadScheme);
     }
   }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
index 3159168dab..1a567901b9 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java
@@ -19,153 +19,97 @@
 package org.apache.pinot.common.utils.fetcher;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.List;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.exception.HttpErrorStatusException;
+import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
-import org.mockito.MockedStatic;
-import org.testng.Assert;
-import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 
 public class HttpSegmentFetcherTest {
-  private MockedStatic<PeerServerSegmentFinder> _peerServerSegmentFinder = 
mockStatic(PeerServerSegmentFinder.class);
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final File SEGMENT_FILE = new 
File(FileUtils.getTempDirectory(), SEGMENT_NAME);
+
   private PinotConfiguration _fetcherConfig;
 
-  @BeforeSuite
-  public void initTest() {
+  @BeforeClass
+  public void setUp() {
     _fetcherConfig = new PinotConfiguration();
     _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3);
+    _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY, 
10);
+    
_fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY,
 1.1);
+  }
+
+  private HttpSegmentFetcher getSegmentFetcher(FileUploadDownloadClient 
client) {
+    HttpSegmentFetcher segmentFetcher = new HttpSegmentFetcher();
+    segmentFetcher.setHttpClient(client);
+    segmentFetcher.init(_fetcherConfig);
+    return segmentFetcher;
   }
 
   @Test
   public void testFetchSegmentToLocalSucceedAtFirstAttempt()
-      throws URISyntaxException, IOException, HttpErrorStatusException {
+      throws Exception {
     FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
     when(client.downloadFile(any(), any(), any())).thenReturn(200);
-    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, 
_fetcherConfig);
-    HelixManager helixManager = mock(HelixManager.class);
-
-    List<URI> uris = new ArrayList<>();
-    uris.add(new URI("http://h1:8080"));
-    uris.add(new URI("http://h2:8080"));
-    _peerServerSegmentFinder.when(() -> 
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
-        .thenReturn(uris);
-    try {
-      httpSegmentFetcher.fetchSegmentToLocal("seg",
-          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", 
helixManager), new File("/file"));
-    } catch (Exception e) {
-      // If we reach here, the download fails.
-      Assert.assertTrue(false, "Download segment failed");
-      Assert.assertTrue(e instanceof AttemptsExceededException);
-    }
-    _peerServerSegmentFinder.reset();
+    HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+    List<URI> uris = List.of(new URI("http://h1:8080"), new 
URI("http://h2:8080"));
+    segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
   }
 
-  @Test
+  @Test(expectedExceptions = AttemptsExceededException.class)
   public void testFetchSegmentToLocalAllDownloadAttemptsFailed()
-      throws URISyntaxException, IOException, HttpErrorStatusException {
+      throws Exception {
     FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
-    // All three attempts fails.
-    when(client.downloadFile(any(), any(), 
any())).thenReturn(300).thenReturn(300).thenReturn(300);
-    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, 
_fetcherConfig);
-    HelixManager helixManager = mock(HelixManager.class);
-    List<URI> uris = new ArrayList<>();
-    uris.add(new URI("http://h1:8080"));
-    uris.add(new URI("http://h2:8080"));
-
-    _peerServerSegmentFinder.when(() -> 
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
-        .thenReturn(uris);
-    try {
-      httpSegmentFetcher.fetchSegmentToLocal("seg",
-          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", 
helixManager), new File("/file"));
-      // The test should not reach here because the fetch will throw exception.
-      Assert.assertTrue(false, "Download segment failed");
-    } catch (Exception e) {
-      // If we reach here, the download fails.
-      Assert.assertTrue(true, "Download segment failed");
-    }
+    // All attempts failed
+    when(client.downloadFile(any(), any(), any())).thenReturn(300);
+    HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+    List<URI> uris = List.of(new URI("http://h1:8080"), new 
URI("http://h2:8080"));
+    segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
   }
 
   @Test
   public void testFetchSegmentToLocalSuccessAfterRetry()
-      throws URISyntaxException, IOException, HttpErrorStatusException {
+      throws Exception {
     FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
-    // the first two attempts failed until the last attempt succeeds
+    // The first two attempts failed and the last attempt succeeded
     when(client.downloadFile(any(), any(), 
any())).thenReturn(300).thenReturn(300).thenReturn(200);
-    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, 
_fetcherConfig);
-    HelixManager helixManager = mock(HelixManager.class);
-    List<URI> uris = new ArrayList<>();
-    uris.add(new URI("http://h1:8080"));
-    uris.add(new URI("http://h2:8080"));
-
-    _peerServerSegmentFinder.when(() -> 
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
-        .thenReturn(uris);
-    try {
-      httpSegmentFetcher.fetchSegmentToLocal("seg",
-          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", 
helixManager), new File("/file"));
-    } catch (Exception e) {
-      // If we reach here, the download fails.
-      Assert.assertTrue(false, "Download segment failed");
-    }
+    HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+    List<URI> uris = List.of(new URI("http://h1:8080"), new 
URI("http://h2:8080"));
+    segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
   }
 
   @Test
   public void 
testFetchSegmentToLocalSuccessAfterFirstTwoAttemptsFoundNoPeerServers()
-      throws URISyntaxException, IOException, HttpErrorStatusException {
+      throws Exception {
     FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
-    //  The download always succeeds.
+    // The download always succeeds
     when(client.downloadFile(any(), any(), any())).thenReturn(200);
-    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, 
_fetcherConfig);
-    HelixManager helixManager = mock(HelixManager.class);
-    List<URI> uris = new ArrayList<>();
-    uris.add(new URI("http://h1:8080"));
-    uris.add(new URI("http://h2:8080"));
-
-    // The first two attempts find NO peers hosting the segment but the last 
one found two servers.
-    _peerServerSegmentFinder.when(() -> 
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
-        .thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris);
-    try {
-      httpSegmentFetcher.fetchSegmentToLocal("seg",
-          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", 
helixManager), new File("/file"));
-    } catch (Exception e) {
-      // If we reach here, the download fails.
-      Assert.assertTrue(false, "Download segment failed");
-    }
+    HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+    List<URI> uris = List.of(new URI("http://h1:8080"), new 
URI("http://h2:8080"));
+    // The first two attempts found NO peers hosting the segment, and the last 
one found two servers
+    //noinspection unchecked
+    Supplier<List<URI>> uriSupplier = mock(Supplier.class);
+    
when(uriSupplier.get()).thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris);
+    segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, uriSupplier, 
SEGMENT_FILE);
   }
 
-  @Test
+  @Test(expectedExceptions = AttemptsExceededException.class)
   public void testFetchSegmentToLocalFailureWithNoPeerServers()
-      throws IOException, HttpErrorStatusException {
+      throws Exception {
     FileUploadDownloadClient client = mock(FileUploadDownloadClient.class);
-    // the download always succeeds.
+    // The download always succeeds
     when(client.downloadFile(any(), any(), any())).thenReturn(200);
-    HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, 
_fetcherConfig);
-    HelixManager helixManager = mock(HelixManager.class);
-
-    _peerServerSegmentFinder.when(() -> 
PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any()))
-        .thenReturn(List.of()).thenReturn(List.of()).thenReturn(List.of());
-    try {
-      httpSegmentFetcher.fetchSegmentToLocal("seg",
-          () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", 
helixManager), new File("/file"));
-      // The test should not reach here because the fetch will throw exception.
-      Assert.assertTrue(false, "Download segment failed");
-    } catch (Exception e) {
-      Assert.assertTrue(true, "Download segment failed");
-      Assert.assertTrue(e instanceof AttemptsExceededException);
-    }
+    HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client);
+    List<URI> uris = List.of();
+    segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 25e40084ab..838a03a268 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1483,7 +1483,8 @@ public class PinotLLCRealtimeSegmentManager {
           LOGGER.info("Fixing LLC segment {} whose deep store copy is 
unavailable", segmentName);
           // Find servers which have online replica
           List<URI> peerSegmentURIs =
-              PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
CommonConstants.HTTP_PROTOCOL, _helixManager);
+              PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
realtimeTableName, segmentName,
+                  CommonConstants.HTTP_PROTOCOL);
           if (peerSegmentURIs.isEmpty()) {
             throw new IllegalStateException(
                 String.format("Failed to upload segment %s to deep store 
because no online replica is found",
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 60b83ba24a..f0496a8ee7 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -43,7 +43,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -75,10 +74,10 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.apache.pinot.spi.utils.StringUtil;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -91,8 +90,6 @@ import org.testng.annotations.Test;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION;
 import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
@@ -101,6 +98,7 @@ import static org.testng.Assert.*;
 public class PinotLLCRealtimeSegmentManagerTest {
   private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), 
"PinotLLCRealtimeSegmentManagerTest");
   private static final String SCHEME = "file:";
+  private static final String CLUSTER_NAME = "testCluster";
   private static final String RAW_TABLE_NAME = "testTable";
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
 
@@ -927,13 +925,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
         (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
     
when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
     when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
-    when(helixManager.getClusterName()).thenReturn("cluster_name");
+    when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
     
when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);
 
     // init fake PinotLLCRealtimeSegmentManager
     ControllerConf controllerConfig = new ControllerConf();
-    controllerConfig.setProperty(
-        
ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
 true);
+    
controllerConfig.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
+        true);
     controllerConfig.setDataDir(TEMP_DIR.toString());
     FakePinotLLCRealtimeSegmentManager segmentManager =
         new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, 
controllerConfig);
@@ -946,19 +944,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
     
segmentsValidationAndRetentionConfig.setRetentionTimeUnit(TimeUnit.DAYS.toString());
     segmentsValidationAndRetentionConfig.setRetentionTimeValue("3");
     
segmentManager._tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
-    List<SegmentZKMetadata> segmentsZKMetadata =
-        new ArrayList<>(segmentManager._segmentZKMetadataMap.values());
+    List<SegmentZKMetadata> segmentsZKMetadata = new 
ArrayList<>(segmentManager._segmentZKMetadataMap.values());
     Assert.assertEquals(segmentsZKMetadata.size(), 5);
 
     // Set up external view for this table
     ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
-    when(helixAdmin.getResourceExternalView("cluster_name", 
REALTIME_TABLE_NAME))
-        .thenReturn(externalView);
-    when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new 
ArrayList<>());
-    String adminPort = "2077";
-    Map<String, String> instanceConfigMap = new HashMap<>();
-    instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, 
adminPort);
-    when(helixAdmin.getConfig(any(HelixConfigScope.class), 
any(List.class))).thenReturn(instanceConfigMap);
+    when(helixAdmin.getResourceExternalView(CLUSTER_NAME, 
REALTIME_TABLE_NAME)).thenReturn(externalView);
 
     // Change 1st segment status to be DONE, but with default peer download 
url.
     // Verify later the download url is fixed after upload success.
@@ -966,28 +957,26 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
     // set up the external view for 1st segment
     String instance0 = "instance0";
+    int adminPort = 2077;
     externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), 
instance0, "ONLINE");
     InstanceConfig instanceConfig0 = new InstanceConfig(instance0);
     instanceConfig0.setHostName(instance0);
-    when(helixAdmin.getInstanceConfig(any(String.class), 
eq(instance0))).thenReturn(instanceConfig0);
+    instanceConfig0.getRecord().setIntField(Instance.ADMIN_PORT_KEY, 
adminPort);
+    when(helixAdmin.getInstanceConfig(CLUSTER_NAME, 
instance0)).thenReturn(instanceConfig0);
     // mock the request/response for 1st segment upload
-    String serverUploadRequestUrl0 = StringUtil
-        .join("/",
-            CommonConstants.HTTP_PROTOCOL + "://" + instance0 + ":" + 
adminPort,
-            "segments",
-            REALTIME_TABLE_NAME,
-            segmentsZKMetadata.get(0).getSegmentName(),
-            "upload") + "?uploadTimeoutMs=-1";
+    String serverUploadRequestUrl0 =
+        String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", 
instance0, adminPort,
+            REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
     // tempSegmentFileLocation is the location where the segment uploader will 
upload the segment. This usually ends
     // with a random UUID
     File tempSegmentFileLocation = new File(TEMP_DIR, 
segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
     FileUtils.write(tempSegmentFileLocation, "test");
     // After the deep-store retry task gets the segment location returned by 
Pinot server, it will move the segment to
     // its final location. This is the expected segment location.
-    String expectedSegmentLocation = 
segmentManager.createSegmentPath(RAW_TABLE_NAME,
-        segmentsZKMetadata.get(0).getSegmentName()).toString();
-    when(segmentManager._mockedFileUploadDownloadClient
-        
.uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(tempSegmentFileLocation.getPath());
+    String expectedSegmentLocation =
+        segmentManager.createSegmentPath(RAW_TABLE_NAME, 
segmentsZKMetadata.get(0).getSegmentName()).toString();
+    
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(
+        tempSegmentFileLocation.getPath());
 
     // Change 2nd segment status to be DONE, but with default peer download 
url.
     // Verify later the download url isn't fixed after upload failure.
@@ -998,25 +987,20 @@ public class PinotLLCRealtimeSegmentManagerTest {
     externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), 
instance1, "ONLINE");
     InstanceConfig instanceConfig1 = new InstanceConfig(instance1);
     instanceConfig1.setHostName(instance1);
-    when(helixAdmin.getInstanceConfig(any(String.class), 
eq(instance1))).thenReturn(instanceConfig1);
+    instanceConfig1.getRecord().setIntField(Instance.ADMIN_PORT_KEY, 
adminPort);
+    when(helixAdmin.getInstanceConfig(CLUSTER_NAME, 
instance1)).thenReturn(instanceConfig1);
     // mock the request/response for 2nd segment upload
-    String serverUploadRequestUrl1 = StringUtil
-        .join("/",
-            CommonConstants.HTTP_PROTOCOL + "://" + instance1 + ":" + 
adminPort,
-            "segments",
-            REALTIME_TABLE_NAME,
-            segmentsZKMetadata.get(1).getSegmentName(),
-            "upload") + "?uploadTimeoutMs=-1";
-    when(segmentManager._mockedFileUploadDownloadClient
-        .uploadToSegmentStore(serverUploadRequestUrl1))
-        .thenThrow(new HttpErrorStatusException(
-            "failed to upload segment", 
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));
+    String serverUploadRequestUrl1 =
+        String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", 
instance1, adminPort,
+            REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
+    
when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl1)).thenThrow(
+        new HttpErrorStatusException("failed to upload segment",
+            Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));
 
     // Change 3rd segment status to be DONE, but with default peer download 
url.
     // Verify later the download url isn't fixed because no ONLINE replica 
found in any server.
     segmentsZKMetadata.get(2).setStatus(Status.DONE);
-    segmentsZKMetadata.get(2).setDownloadUrl(
-        METADATA_URI_FOR_PEER_DOWNLOAD);
+    segmentsZKMetadata.get(2).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
     // set up the external view for 3rd segment
     String instance2 = "instance2";
     externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), 
instance2, "OFFLINE");
@@ -1029,11 +1013,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     // Keep 5th segment status as IN_PROGRESS.
 
-    List<String> segmentNames = segmentsZKMetadata.stream()
-        .map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
-    when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
-        .thenReturn(segmentManager._tableConfig);
-
+    List<String> segmentNames =
+        
segmentsZKMetadata.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
+    
when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(segmentManager._tableConfig);
 
     // Verify the result
     segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, 
segmentsZKMetadata);
@@ -1042,23 +1024,18 @@ public class PinotLLCRealtimeSegmentManagerTest {
     TestUtils.waitForCondition(aVoid -> 
segmentManager.deepStoreUploadExecutorPendingSegmentsIsEmpty(), 30_000L,
         "Timed out waiting for upload retry tasks to finish");
 
-    assertEquals(
-        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(0), null).getDownloadUrl(),
+    assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(0), null).getDownloadUrl(),
         expectedSegmentLocation);
     assertFalse(tempSegmentFileLocation.exists(),
         "Deep-store retry task should move the file from temp location to 
permanent location");
 
-    assertEquals(
-        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(1), null).getDownloadUrl(),
+    assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(1), null).getDownloadUrl(),
         METADATA_URI_FOR_PEER_DOWNLOAD);
-    assertEquals(
-        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(2), null).getDownloadUrl(),
+    assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(2), null).getDownloadUrl(),
         METADATA_URI_FOR_PEER_DOWNLOAD);
-    assertEquals(
-        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(3), null).getDownloadUrl(),
+    assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(3), null).getDownloadUrl(),
         defaultDownloadUrl);
-    assertNull(
-        segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(4), null).getDownloadUrl());
+    assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
segmentNames.get(4), null).getDownloadUrl());
   }
 
   @Test
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c46a85690d..1237db547a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -153,6 +153,13 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     if (_peerDownloadScheme == null) {
       _peerDownloadScheme = 
instanceDataManagerConfig.getSegmentPeerDownloadScheme();
     }
+    if (_peerDownloadScheme != null) {
+      _peerDownloadScheme = _peerDownloadScheme.toLowerCase();
+      Preconditions.checkState(
+          CommonConstants.HTTP_PROTOCOL.equals(_peerDownloadScheme) || 
CommonConstants.HTTPS_PROTOCOL.equals(
+              _peerDownloadScheme), "Unsupported peer download scheme: %s for 
table: %s", _peerDownloadScheme,
+          _tableNameWithType);
+    }
 
     _streamSegmentDownloadUntarRateLimitBytesPerSec =
         instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
@@ -691,7 +698,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       throws Exception {
     Preconditions.checkState(_peerDownloadScheme != null, "Download peers 
require non null peer download scheme");
     List<URI> peerSegmentURIs =
-        PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
_peerDownloadScheme, _helixManager, _tableNameWithType);
+        PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
_tableNameWithType, segmentName, _peerDownloadScheme);
     if (peerSegmentURIs.isEmpty()) {
       String msg = String.format("segment %s doesn't have any peers", 
segmentName);
       LOGGER.warn(msg);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8e50049028..b120867d6b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -635,17 +635,15 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       } catch (Exception e) {
         _logger.warn("Download segment {} from deepstore uri {} failed.", 
segmentName, uri, e);
         // Download from deep store failed; try to download from peer if peer 
download is setup for the table.
-        if (isPeerSegmentDownloadEnabled(tableConfig)) {
-          downloadSegmentFromPeer(segmentName, 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(),
-              indexLoadingConfig);
+        if (_peerDownloadScheme != null) {
+          downloadSegmentFromPeer(segmentName, indexLoadingConfig);
         } else {
           throw e;
         }
       }
     } else {
-      if (isPeerSegmentDownloadEnabled(tableConfig)) {
-        downloadSegmentFromPeer(segmentName, 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(),
-            indexLoadingConfig);
+      if (_peerDownloadScheme != null) {
+        downloadSegmentFromPeer(segmentName, indexLoadingConfig);
       } else {
         throw new RuntimeException("Peer segment download not enabled for 
segment " + segmentName);
       }
@@ -687,23 +685,16 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     replaceLLSegment(segmentName, indexLoadingConfig);
   }
 
-  private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
-    return
-        
CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())
-            || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(
-            tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
-  }
-
-  private void downloadSegmentFromPeer(String segmentName, String 
downloadScheme,
-      IndexLoadingConfig indexLoadingConfig) {
+  private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig 
indexLoadingConfig) {
     File tempRootDir = null;
     try {
       tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + 
System.currentTimeMillis());
       File segmentTarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       // Next download the segment from a randomly chosen server using 
configured download scheme (http or https).
-      
SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName,
 () -> {
+      
SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName,
 () -> {
         List<URI> peerServerURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
downloadScheme, _helixManager);
+            PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
_tableNameWithType, segmentName,
+                _peerDownloadScheme);
         Collections.shuffle(peerServerURIs);
         return peerServerURIs;
       }, segmentTarFile);
@@ -711,7 +702,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
           segmentTarFile.length());
       untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, 
tempRootDir);
     } catch (Exception e) {
-      _logger.warn("Download and move segment {} from peer with scheme {} 
failed.", segmentName, downloadScheme, e);
+      _logger.warn("Download and move segment {} from peer with scheme {} 
failed.", segmentName, _peerDownloadScheme,
+          e);
       throw new RuntimeException(e);
     } finally {
       FileUtils.deleteQuietly(tempRootDir);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index d4c5f4fc29..261fe0f238 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -660,8 +660,8 @@ public class BaseTableDataManagerTest {
     File destFile = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
     try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder = 
mockStatic(PeerServerSegmentFinder.class)) {
       mockPeerSegFinder.when(
-              () -> PeerServerSegmentFinder.getPeerServerURIs("seg01", "http", 
helixManager, TABLE_NAME_WITH_TYPE))
-          .thenReturn(Collections.singletonList(uri));
+          () -> PeerServerSegmentFinder.getPeerServerURIs(helixManager, 
TABLE_NAME_WITH_TYPE, "seg01",
+              CommonConstants.HTTP_PROTOCOL)).thenReturn(List.of(uri));
       tmgr.downloadFromPeersWithoutStreaming("seg01", 
mock(SegmentZKMetadata.class), destFile);
     }
     assertEquals(FileUtils.readFileToString(destFile), "this is from somewhere 
remote");
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
index 4b6c6fb910..2af972695f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java
@@ -19,103 +19,93 @@
 package org.apache.pinot.core.util;
 
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.StringUtil;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 public class PeerServerSegmentFinderTest {
-  private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
-  private static final String SEGMENT_1 = "testTable__0__0__t11";
-  private static final String SEGMENT_2 = "testTable__0__1__t11";
-  private static final String CLUSTER_NAME = "dummyCluster";
-  private static final String INSTANCE_ID1 = "Server_localhost_1000";
-  private static final String INSTANCE_ID2 = "Server_localhost_1001";
-  private static final String INSTANCE_ID3 = "Server_localhost_1003";
-  public static final String ADMIN_PORT = "1008";
-  public static final String HOST_1_NAME = "s1";
-  public static final String HOST_2_NAME = "s2";
-  public static final String HOST_3_NAME = "s3";
+  private static final String CLUSTER_NAME = "testCluster";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+  private static final String SEGMENT_1 = "testSegment1";
+  private static final String SEGMENT_2 = "testSegment2";
+  private static final String INSTANCE_ID_1 = "Server_s1_1007";
+  private static final String INSTANCE_ID_2 = "Server_s2_1007";
+  private static final String INSTANCE_ID_3 = "Server_s3_1007";
+  private static final String HOSTNAME_1 = "s1";
+  private static final String HOSTNAME_2 = "s2";
+  private static final String HOSTNAME_3 = "s3";
+  private static final int HELIX_PORT = 1007;
+  private static final int HTTP_ADMIN_PORT = 1008;
+  private static final int HTTPS_ADMIN_PORT = 1009;
+
   private HelixManager _helixManager;
 
   @BeforeClass
-  public void initSegmentFetcherFactoryWithPeerServerSegmentFetcher()
-      throws Exception {
-    HelixAdmin helixAdmin;
-    {
-      ExternalView ev = new ExternalView(TABLE_NAME_WITH_TYPE);
-      ev.setState(SEGMENT_1, INSTANCE_ID1, "ONLINE");
-      ev.setState(SEGMENT_1, INSTANCE_ID2, "OFFLINE");
-      ev.setState(SEGMENT_1, INSTANCE_ID3, "ONLINE");
-      ev.setState(SEGMENT_2, INSTANCE_ID1, "OFFLINE");
-      ev.setState(SEGMENT_2, INSTANCE_ID2, "OFFLINE");
-      _helixManager = mock(HelixManager.class);
-      helixAdmin = mock(HelixAdmin.class);
-      when(_helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
-      when(_helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
-      when(helixAdmin.getResourceExternalView(CLUSTER_NAME, 
TABLE_NAME_WITH_TYPE)).thenReturn(ev);
-      
when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new 
ArrayList<>());
-      Map<String, String> instanceConfigMap = new HashMap<>();
-      instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, 
ADMIN_PORT);
-      when(helixAdmin.getConfig(any(HelixConfigScope.class), 
any(List.class))).thenReturn(instanceConfigMap);
-      InstanceConfig instanceConfig1 = new InstanceConfig(INSTANCE_ID1);
-      instanceConfig1.setHostName(HOST_1_NAME);
-      instanceConfig1.setPort("1000");
-      when(helixAdmin.getInstanceConfig(any(String.class), 
eq(INSTANCE_ID1))).thenReturn(instanceConfig1);
+  public void initSegmentFetcherFactoryWithPeerServerSegmentFetcher() {
+    ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
+    externalView.setState(SEGMENT_1, INSTANCE_ID_1, "ONLINE");
+    externalView.setState(SEGMENT_1, INSTANCE_ID_2, "OFFLINE");
+    externalView.setState(SEGMENT_1, INSTANCE_ID_3, "ONLINE");
+    externalView.setState(SEGMENT_2, INSTANCE_ID_1, "OFFLINE");
+    externalView.setState(SEGMENT_2, INSTANCE_ID_2, "OFFLINE");
 
-      InstanceConfig instanceConfig2 = new InstanceConfig(INSTANCE_ID2);
-      instanceConfig2.setHostName(HOST_2_NAME);
-      instanceConfig2.setPort("1000");
-      when(helixAdmin.getInstanceConfig(any(String.class), 
eq(INSTANCE_ID2))).thenReturn(instanceConfig2);
+    _helixManager = mock(HelixManager.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
+    when(_helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(_helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
+    when(helixAdmin.getResourceExternalView(CLUSTER_NAME, 
REALTIME_TABLE_NAME)).thenReturn(externalView);
+    when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_1)).thenReturn(
+        getInstanceConfig(INSTANCE_ID_1, HOSTNAME_1));
+    when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_2)).thenReturn(
+        getInstanceConfig(INSTANCE_ID_2, HOSTNAME_2));
+    when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_3)).thenReturn(
+        getInstanceConfig(INSTANCE_ID_3, HOSTNAME_3));
+  }
 
-      InstanceConfig instanceConfig3 = new InstanceConfig(INSTANCE_ID3);
-      instanceConfig3.setHostName(HOST_3_NAME);
-      instanceConfig3.setPort("1000");
-      when(helixAdmin.getInstanceConfig(any(String.class), 
eq(INSTANCE_ID3))).thenReturn(instanceConfig3);
-    }
+  private static InstanceConfig getInstanceConfig(String instanceId, String 
hostName) {
+    InstanceConfig instanceConfig = new InstanceConfig(instanceId);
+    instanceConfig.setHostName(hostName);
+    instanceConfig.setPort(Integer.toString(HELIX_PORT));
+    instanceConfig.getRecord().setIntField(Instance.ADMIN_PORT_KEY, 
HTTP_ADMIN_PORT);
+    instanceConfig.getRecord().setIntField(Instance.ADMIN_HTTPS_PORT_KEY, 
HTTPS_ADMIN_PORT);
+    return instanceConfig;
   }
 
   @Test
   public void testSegmentFoundSuccessfully()
       throws Exception {
     // SEGMENT_1 has only 2 online replicas.
-    List<URI> httpServerURIs =
-        PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_1, 
CommonConstants.HTTP_PROTOCOL, _helixManager);
-    assertEquals(2, httpServerURIs.size());
-    httpServerURIs.contains(new URI(
-        StringUtil.join("/", "http://" + HOST_1_NAME + ":" + ADMIN_PORT, 
"segments", TABLE_NAME_WITH_TYPE, SEGMENT_1)));
-    httpServerURIs.contains(new URI(
-        StringUtil.join("/", "http://" + HOST_3_NAME + ":" + ADMIN_PORT, 
"segments", TABLE_NAME_WITH_TYPE, SEGMENT_1)));
-    List<URI> httpsServerURIs =
-        PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_1, 
CommonConstants.HTTPS_PROTOCOL, _helixManager);
-    assertEquals(2, httpsServerURIs.size());
-    httpServerURIs.contains(new URI(StringUtil
-        .join("/", "https://" + HOST_1_NAME + ":" + ADMIN_PORT, "segments", 
TABLE_NAME_WITH_TYPE, SEGMENT_1)));
-    httpServerURIs.contains(new URI(StringUtil
-        .join("/", "https://" + HOST_3_NAME + ":" + ADMIN_PORT, "segments", 
TABLE_NAME_WITH_TYPE, SEGMENT_1)));
+    List<URI> httpServerURIs = 
PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME, 
SEGMENT_1,
+        CommonConstants.HTTP_PROTOCOL);
+    assertEquals(httpServerURIs.size(), 2);
+    assertTrue(httpServerURIs.contains(new URI(
+        String.format("http://%s:%d/segments/%s/%s", HOSTNAME_1, 
HTTP_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
+    assertTrue(httpServerURIs.contains(new URI(
+        String.format("http://%s:%d/segments/%s/%s", HOSTNAME_3, 
HTTP_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
+    List<URI> httpsServerURIs = 
PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME, 
SEGMENT_1,
+        CommonConstants.HTTPS_PROTOCOL);
+    assertEquals(httpsServerURIs.size(), 2);
+    assertTrue(httpsServerURIs.contains(new URI(
+        String.format("https://%s:%d/segments/%s/%s", HOSTNAME_1, 
HTTPS_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
+    assertTrue(httpsServerURIs.contains(new URI(
+        String.format("https://%s:%d/segments/%s/%s", HOSTNAME_3, 
HTTPS_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1))));
   }
 
   @Test
-  public void testSegmentNotFound()
-      throws Exception {
-    Assert.assertEquals(0,
-        PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_2, 
CommonConstants.HTTP_PROTOCOL, _helixManager).size());
+  public void testSegmentNotFound() {
+    assertTrue(PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
REALTIME_TABLE_NAME, SEGMENT_2,
+        CommonConstants.HTTP_PROTOCOL).isEmpty());
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
index 6151aab06f..e5b9b7dc1a 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java
@@ -39,8 +39,8 @@ public class ExponentialBackoffRetryPolicy extends 
BaseRetryPolicy {
 
   @Override
   protected long getDelayMs(int currentAttempt) {
-    double minDelayMs = _initialDelayMs * Math.pow(_delayScaleFactor, 
currentAttempt);
-    double maxDelayMs = minDelayMs * _delayScaleFactor;
-    return _random.nextLong((long) minDelayMs, (long) maxDelayMs);
+    long minDelayMs = (long) (_initialDelayMs * Math.pow(_delayScaleFactor, 
currentAttempt));
+    long maxDelayMs = (long) (minDelayMs * _delayScaleFactor);
+    return minDelayMs < maxDelayMs ? _random.nextLong(minDelayMs, maxDelayMs) 
: minDelayMs;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to