This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ec452a49f3 Refine PeerServerSegmentFinder (#12933) ec452a49f3 is described below commit ec452a49f3c885308613bc45dfa44b48a16076ba Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Apr 15 17:02:18 2024 -0700 Refine PeerServerSegmentFinder (#12933) --- .../common/utils/fetcher/BaseSegmentFetcher.java | 9 +- .../common/utils/fetcher/HttpSegmentFetcher.java | 28 ++-- .../pinot/core/util/PeerServerSegmentFinder.java | 101 ++++++-------- .../utils/fetcher/HttpSegmentFetcherTest.java | 152 +++++++-------------- .../realtime/PinotLLCRealtimeSegmentManager.java | 3 +- .../PinotLLCRealtimeSegmentManagerTest.java | 91 +++++------- .../core/data/manager/BaseTableDataManager.java | 9 +- .../manager/realtime/RealtimeTableDataManager.java | 28 ++-- .../data/manager/BaseTableDataManagerTest.java | 4 +- .../core/util/PeerServerSegmentFinderTest.java | 128 ++++++++--------- .../utils/retry/ExponentialBackoffRetryPolicy.java | 6 +- 11 files changed, 220 insertions(+), 339 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java index d33c7ead43..5fb82388f2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/BaseSegmentFetcher.java @@ -42,13 +42,13 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor"; public static final int DEFAULT_RETRY_COUNT = 3; public static final int DEFAULT_RETRY_WAIT_MS = 100; - public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5; + public static final double DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5; protected final Logger _logger = LoggerFactory.getLogger(getClass().getSimpleName()); protected int _retryCount; protected int _retryWaitMs; - protected int _retryDelayScaleFactor; + protected double _retryDelayScaleFactor; protected AuthProvider _authProvider; @Override @@ -58,9 +58,8 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher { _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR); _authProvider = AuthProviderUtils.extractAuthProvider(config, CommonConstants.KEY_OF_AUTH); doInit(config); - _logger - .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs, - _retryDelayScaleFactor); + _logger.info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, + _retryWaitMs, _retryDelayScaleFactor); } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java index 170327dc5b..6872ac7714 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java @@ -44,23 +44,16 @@ import org.apache.pinot.spi.utils.retry.RetryPolicies; public class HttpSegmentFetcher extends BaseSegmentFetcher { protected FileUploadDownloadClient _httpClient; - @Override - protected void doInit(PinotConfiguration config) { - _httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build()); - } - - public HttpSegmentFetcher() { - } - @VisibleForTesting - protected HttpSegmentFetcher(FileUploadDownloadClient httpClient, PinotConfiguration config) { + void setHttpClient(FileUploadDownloadClient httpClient) { _httpClient = httpClient; - _retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT); - _retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS); - _retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR); - _logger - .info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs, - _retryDelayScaleFactor); + } + + @Override + protected void doInit(PinotConfiguration config) { + if (_httpClient == null) { + _httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build()); + } } @Override @@ -87,9 +80,8 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + port)); } int statusCode = _httpClient.downloadFile(uri, dest, _authProvider, httpHeaders); - _logger - .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), - statusCode); + _logger.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, + dest.length(), statusCode); return true; } catch (HttpErrorStatusException e) { int statusCode = e.getStatusCode(); diff --git a/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java index e2c9d509f6..7f26d75935 100644 --- a/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java +++ b/pinot-common/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java @@ -19,21 +19,19 @@ package org.apache.pinot.core.util; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.commons.collections.ListUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; -import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.helix.HelixHelper; -import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.CommonConstants.Server; import org.apache.pinot.spi.utils.StringUtil; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,93 +45,74 @@ public class PeerServerSegmentFinder { private PeerServerSegmentFinder() { } - private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFinder.class); private static final int MAX_NUM_ATTEMPTS = 5; private static final int INITIAL_DELAY_MS = 500; private static final double DELAY_SCALE_FACTOR = 2; /** - * - * @param segmentName - * @param downloadScheme Can be either http or https. - * @param helixManager - * @return a list of uri strings of the form http(s)://hostname:port/segments/tablenameWithType/segmentName - * for the servers hosting ONLINE segments; empty list if no such server found. + * Returns a list of URIs of the form 'http(s)://hostname:port/segments/tableNameWithType/segmentName' for the servers + * hosting ONLINE segments; empty list if no such server found. The download scheme can be either 'http' or 'https'. */ - public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme, HelixManager helixManager) { - LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); - String tableNameWithType = - TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName()); - return getPeerServerURIs(segmentName, downloadScheme, helixManager, tableNameWithType); - } - - public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme, - HelixManager helixManager, String tableNameWithType) { + public static List<URI> getPeerServerURIs(HelixManager helixManager, String tableNameWithType, String segmentName, + String downloadScheme) { HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); String clusterName = helixManager.getClusterName(); - if (clusterName == null) { - _logger.error("ClusterName not found"); - return ListUtils.EMPTY_LIST; - } - final List<URI> onlineServerURIs = new ArrayList<>(); + List<URI> onlineServerURIs = new ArrayList<>(); try { RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS, INITIAL_DELAY_MS, DELAY_SCALE_FACTOR) .attempt(() -> { - getOnlineServersFromExternalView(segmentName, downloadScheme, tableNameWithType, helixAdmin, clusterName, + getOnlineServersFromExternalView(helixAdmin, clusterName, tableNameWithType, segmentName, downloadScheme, onlineServerURIs); return !onlineServerURIs.isEmpty(); }); + } catch (AttemptsExceededException e) { + LOGGER.error("Failed to find ONLINE servers for segment: {} in table: {} after {} attempts", segmentName, + tableNameWithType, MAX_NUM_ATTEMPTS); } catch (Exception e) { - _logger.error("Failure in getting online servers for segment {}", segmentName, e); + LOGGER.error("Caught exception while getting peer server URIs for segment: {} in table: {}", segmentName, + tableNameWithType, e); } return onlineServerURIs; } - private static void getOnlineServersFromExternalView(String segmentName, String downloadScheme, - String tableNameWithType, HelixAdmin helixAdmin, String clusterName, List<URI> onlineServerURIs) { - ExternalView externalViewForResource = - HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType); - if (externalViewForResource == null) { - _logger.warn("External View not found for table {}", tableNameWithType); + private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName, + String tableNameWithType, String segmentName, String downloadScheme, List<URI> onlineServerURIs) + throws Exception { + ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType); + if (externalView == null) { + LOGGER.warn("Failed to find external view for table: {}", tableNameWithType); return; } // Find out the ONLINE servers serving the segment. - Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName); - for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) { - if ("ONLINE".equals(instanceState.getValue())) { + Map<String, String> instanceStateMap = externalView.getStateMap(segmentName); + if (instanceStateMap == null) { + LOGGER.warn("Failed to find segment: {} in table: {}", segmentName, tableNameWithType); + return; + } + for (Map.Entry<String, String> instanceState : instanceStateMap.entrySet()) { + if (SegmentStateModel.ONLINE.equals(instanceState.getValue())) { String instanceId = instanceState.getKey(); - _logger.info("Found ONLINE server {} for segment {}.", instanceId, segmentName); + LOGGER.info("Found ONLINE server: {} for segment: {} in table: {}", instanceId, segmentName, tableNameWithType); InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, instanceId); String hostName = instanceConfig.getHostName(); - int port = getServerAdminPort(helixAdmin, clusterName, instanceId, downloadScheme); - try { - onlineServerURIs.add(new URI(StringUtil - .join("/", downloadScheme + "://" + hostName + ":" + port, "segments", tableNameWithType, segmentName))); - } catch (URISyntaxException e) { - _logger.warn("Error in uri syntax: ", e); - } + String adminPortKey = getAdminPortKey(downloadScheme); + int port = instanceConfig.getRecord().getIntField(adminPortKey, Server.DEFAULT_ADMIN_API_PORT); + onlineServerURIs.add(new URI( + StringUtil.join("/", downloadScheme + "://" + hostName + ":" + port, "segments", tableNameWithType, + segmentName))); } } } - private static int getServerAdminPort(HelixAdmin helixAdmin, String clusterName, String instanceId, - String downloadScheme) { - try { - return Integer.parseInt(HelixHelper.getInstanceConfigsMapFor(instanceId, clusterName, helixAdmin) - .get(getServerAdminPortKey(downloadScheme))); - } catch (Exception e) { - _logger.warn("Failed to retrieve ADMIN PORT for instanceId {} in the cluster {} ", instanceId, clusterName, e); - return CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT; - } - } - - private static String getServerAdminPortKey(String downloadScheme) { + private static String getAdminPortKey(String downloadScheme) { switch (downloadScheme) { - case CommonConstants.HTTPS_PROTOCOL: - return CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY; case CommonConstants.HTTP_PROTOCOL: + return Instance.ADMIN_PORT_KEY; + case CommonConstants.HTTPS_PROTOCOL: + return Instance.ADMIN_HTTPS_PORT_KEY; default: - return CommonConstants.Helix.Instance.ADMIN_PORT_KEY; + throw new IllegalArgumentException("Unsupported download scheme: " + downloadScheme); } } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java index 3159168dab..1a567901b9 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcherTest.java @@ -19,153 +19,97 @@ package org.apache.pinot.common.utils.fetcher; import java.io.File; -import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.List; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.exception.HttpErrorStatusException; +import java.util.function.Supplier; +import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; -import org.apache.pinot.core.util.PeerServerSegmentFinder; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; -import org.mockito.MockedStatic; -import org.testng.Assert; -import org.testng.annotations.BeforeSuite; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; public class HttpSegmentFetcherTest { - private MockedStatic<PeerServerSegmentFinder> _peerServerSegmentFinder = mockStatic(PeerServerSegmentFinder.class); + private static final String SEGMENT_NAME = "testSegment"; + private static final File SEGMENT_FILE = new File(FileUtils.getTempDirectory(), SEGMENT_NAME); + private PinotConfiguration _fetcherConfig; - @BeforeSuite - public void initTest() { + @BeforeClass + public void setUp() { _fetcherConfig = new PinotConfiguration(); _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_COUNT_CONFIG_KEY, 3); + _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_WAIT_MS_CONFIG_KEY, 10); + _fetcherConfig.setProperty(BaseSegmentFetcher.RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, 1.1); + } + + private HttpSegmentFetcher getSegmentFetcher(FileUploadDownloadClient client) { + HttpSegmentFetcher segmentFetcher = new HttpSegmentFetcher(); + segmentFetcher.setHttpClient(client); + segmentFetcher.init(_fetcherConfig); + return segmentFetcher; } @Test public void testFetchSegmentToLocalSucceedAtFirstAttempt() - throws URISyntaxException, IOException, HttpErrorStatusException { + throws Exception { FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); when(client.downloadFile(any(), any(), any())).thenReturn(200); - HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); - HelixManager helixManager = mock(HelixManager.class); - - List<URI> uris = new ArrayList<>(); - uris.add(new URI("http://h1:8080")); - uris.add(new URI("http://h2:8080")); - _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) - .thenReturn(uris); - try { - httpSegmentFetcher.fetchSegmentToLocal("seg", - () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); - } catch (Exception e) { - // If we reach here, the download fails. - Assert.assertTrue(false, "Download segment failed"); - Assert.assertTrue(e instanceof AttemptsExceededException); - } - _peerServerSegmentFinder.reset(); + HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client); + List<URI> uris = List.of(new URI("http://h1:8080"), new URI("http://h2:8080")); + segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE); } - @Test + @Test(expectedExceptions = AttemptsExceededException.class) public void testFetchSegmentToLocalAllDownloadAttemptsFailed() - throws URISyntaxException, IOException, HttpErrorStatusException { + throws Exception { FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); - // All three attempts fails. - when(client.downloadFile(any(), any(), any())).thenReturn(300).thenReturn(300).thenReturn(300); - HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); - HelixManager helixManager = mock(HelixManager.class); - List<URI> uris = new ArrayList<>(); - uris.add(new URI("http://h1:8080")); - uris.add(new URI("http://h2:8080")); - - _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) - .thenReturn(uris); - try { - httpSegmentFetcher.fetchSegmentToLocal("seg", - () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); - // The test should not reach here because the fetch will throw exception. - Assert.assertTrue(false, "Download segment failed"); - } catch (Exception e) { - // If we reach here, the download fails. - Assert.assertTrue(true, "Download segment failed"); - } + // All attempts failed + when(client.downloadFile(any(), any(), any())).thenReturn(300); + HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client); + List<URI> uris = List.of(new URI("http://h1:8080"), new URI("http://h2:8080")); + segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE); } @Test public void testFetchSegmentToLocalSuccessAfterRetry() - throws URISyntaxException, IOException, HttpErrorStatusException { + throws Exception { FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); - // the first two attempts failed until the last attempt succeeds + // The first two attempts failed and the last attempt succeeded when(client.downloadFile(any(), any(), any())).thenReturn(300).thenReturn(300).thenReturn(200); - HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); - HelixManager helixManager = mock(HelixManager.class); - List<URI> uris = new ArrayList<>(); - uris.add(new URI("http://h1:8080")); - uris.add(new URI("http://h2:8080")); - - _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) - .thenReturn(uris); - try { - httpSegmentFetcher.fetchSegmentToLocal("seg", - () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); - } catch (Exception e) { - // If we reach here, the download fails. - Assert.assertTrue(false, "Download segment failed"); - } + HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client); + List<URI> uris = List.of(new URI("http://h1:8080"), new URI("http://h2:8080")); + segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE); } @Test public void testFetchSegmentToLocalSuccessAfterFirstTwoAttemptsFoundNoPeerServers() - throws URISyntaxException, IOException, HttpErrorStatusException { + throws Exception { FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); - // The download always succeeds. + // The download always succeeds when(client.downloadFile(any(), any(), any())).thenReturn(200); - HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); - HelixManager helixManager = mock(HelixManager.class); - List<URI> uris = new ArrayList<>(); - uris.add(new URI("http://h1:8080")); - uris.add(new URI("http://h2:8080")); - - // The first two attempts find NO peers hosting the segment but the last one found two servers. - _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) - .thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris); - try { - httpSegmentFetcher.fetchSegmentToLocal("seg", - () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); - } catch (Exception e) { - // If we reach here, the download fails. - Assert.assertTrue(false, "Download segment failed"); - } + HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client); + List<URI> uris = List.of(new URI("http://h1:8080"), new URI("http://h2:8080")); + // The first two attempts found NO peers hosting the segment, and the last one found two servers + //noinspection unchecked + Supplier<List<URI>> uriSupplier = mock(Supplier.class); + when(uriSupplier.get()).thenReturn(List.of()).thenReturn(List.of()).thenReturn(uris); + segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, uriSupplier, SEGMENT_FILE); } - @Test + @Test(expectedExceptions = AttemptsExceededException.class) public void testFetchSegmentToLocalFailureWithNoPeerServers() - throws IOException, HttpErrorStatusException { + throws Exception { FileUploadDownloadClient client = mock(FileUploadDownloadClient.class); - // the download always succeeds. + // The download always succeeds when(client.downloadFile(any(), any(), any())).thenReturn(200); - HttpSegmentFetcher httpSegmentFetcher = new HttpSegmentFetcher(client, _fetcherConfig); - HelixManager helixManager = mock(HelixManager.class); - - _peerServerSegmentFinder.when(() -> PeerServerSegmentFinder.getPeerServerURIs(any(), any(), any())) - .thenReturn(List.of()).thenReturn(List.of()).thenReturn(List.of()); - try { - httpSegmentFetcher.fetchSegmentToLocal("seg", - () -> PeerServerSegmentFinder.getPeerServerURIs("seg", "http", helixManager), new File("/file")); - // The test should not reach here because the fetch will throw exception. - Assert.assertTrue(false, "Download segment failed"); - } catch (Exception e) { - Assert.assertTrue(true, "Download segment failed"); - Assert.assertTrue(e instanceof AttemptsExceededException); - } + HttpSegmentFetcher segmentFetcher = getSegmentFetcher(client); + List<URI> uris = List.of(); + segmentFetcher.fetchSegmentToLocal(SEGMENT_NAME, () -> uris, SEGMENT_FILE); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 25e40084ab..838a03a268 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1483,7 +1483,8 @@ public class PinotLLCRealtimeSegmentManager { LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName); // Find servers which have online replica List<URI> peerSegmentURIs = - PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager); + PeerServerSegmentFinder.getPeerServerURIs(_helixManager, realtimeTableName, segmentName, + CommonConstants.HTTP_PROTOCOL); if (peerSegmentURIs.isEmpty()) { throw new IllegalStateException( String.format("Failed to upload segment %s to deep store because no online replica is found", diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 60b83ba24a..f0496a8ee7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -43,7 +43,6 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; -import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -75,10 +74,10 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; +import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; -import org.apache.pinot.spi.utils.StringUtil; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; @@ -91,8 +90,6 @@ import org.testng.annotations.Test; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.ENABLE_TMP_SEGMENT_ASYNC_DELETION; import static org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.TMP_SEGMENT_RETENTION_IN_SECONDS; import static org.apache.pinot.spi.utils.CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.*; @@ -101,6 +98,7 @@ import static org.testng.Assert.*; public class PinotLLCRealtimeSegmentManagerTest { private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "PinotLLCRealtimeSegmentManagerTest"); private static final String SCHEME = "file:"; + private static final String CLUSTER_NAME = "testCluster"; private static final String RAW_TABLE_NAME = "testTable"; private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); @@ -927,13 +925,13 @@ public class PinotLLCRealtimeSegmentManagerTest { (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class); when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager); when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); - when(helixManager.getClusterName()).thenReturn("cluster_name"); + when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME); when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore); // init fake PinotLLCRealtimeSegmentManager ControllerConf controllerConfig = new ControllerConf(); - controllerConfig.setProperty( - ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true); + controllerConfig.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, + true); controllerConfig.setDataDir(TEMP_DIR.toString()); FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig); @@ -946,19 +944,12 @@ public class PinotLLCRealtimeSegmentManagerTest { segmentsValidationAndRetentionConfig.setRetentionTimeUnit(TimeUnit.DAYS.toString()); segmentsValidationAndRetentionConfig.setRetentionTimeValue("3"); segmentManager._tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig); - List<SegmentZKMetadata> segmentsZKMetadata = - new ArrayList<>(segmentManager._segmentZKMetadataMap.values()); + List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(segmentManager._segmentZKMetadataMap.values()); Assert.assertEquals(segmentsZKMetadata.size(), 5); // Set up external view for this table ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); - when(helixAdmin.getResourceExternalView("cluster_name", REALTIME_TABLE_NAME)) - .thenReturn(externalView); - when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new ArrayList<>()); - String adminPort = "2077"; - Map<String, String> instanceConfigMap = new HashMap<>(); - instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, adminPort); - when(helixAdmin.getConfig(any(HelixConfigScope.class), any(List.class))).thenReturn(instanceConfigMap); + when(helixAdmin.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(externalView); // Change 1st segment status to be DONE, but with default peer download url. // Verify later the download url is fixed after upload success. @@ -966,28 +957,26 @@ public class PinotLLCRealtimeSegmentManagerTest { segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD); // set up the external view for 1st segment String instance0 = "instance0"; + int adminPort = 2077; externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), instance0, "ONLINE"); InstanceConfig instanceConfig0 = new InstanceConfig(instance0); instanceConfig0.setHostName(instance0); - when(helixAdmin.getInstanceConfig(any(String.class), eq(instance0))).thenReturn(instanceConfig0); + instanceConfig0.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort); + when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0); // mock the request/response for 1st segment upload - String serverUploadRequestUrl0 = StringUtil - .join("/", - CommonConstants.HTTP_PROTOCOL + "://" + instance0 + ":" + adminPort, - "segments", - REALTIME_TABLE_NAME, - segmentsZKMetadata.get(0).getSegmentName(), - "upload") + "?uploadTimeoutMs=-1"; + String serverUploadRequestUrl0 = + String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance0, adminPort, + REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName()); // tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends // with a random UUID File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID()); FileUtils.write(tempSegmentFileLocation, "test"); // After the deep-store retry task gets the segment location returned by Pinot server, it will move the segment to // its final location. This is the expected segment location. - String expectedSegmentLocation = segmentManager.createSegmentPath(RAW_TABLE_NAME, - segmentsZKMetadata.get(0).getSegmentName()).toString(); - when(segmentManager._mockedFileUploadDownloadClient - .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(tempSegmentFileLocation.getPath()); + String expectedSegmentLocation = + segmentManager.createSegmentPath(RAW_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName()).toString(); + when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn( + tempSegmentFileLocation.getPath()); // Change 2nd segment status to be DONE, but with default peer download url. // Verify later the download url isn't fixed after upload failure. @@ -998,25 +987,20 @@ public class PinotLLCRealtimeSegmentManagerTest { externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), instance1, "ONLINE"); InstanceConfig instanceConfig1 = new InstanceConfig(instance1); instanceConfig1.setHostName(instance1); - when(helixAdmin.getInstanceConfig(any(String.class), eq(instance1))).thenReturn(instanceConfig1); + instanceConfig1.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort); + when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance1)).thenReturn(instanceConfig1); // mock the request/response for 2nd segment upload - String serverUploadRequestUrl1 = StringUtil - .join("/", - CommonConstants.HTTP_PROTOCOL + "://" + instance1 + ":" + adminPort, - "segments", - REALTIME_TABLE_NAME, - segmentsZKMetadata.get(1).getSegmentName(), - "upload") + "?uploadTimeoutMs=-1"; - when(segmentManager._mockedFileUploadDownloadClient - .uploadToSegmentStore(serverUploadRequestUrl1)) - .thenThrow(new HttpErrorStatusException( - "failed to upload segment", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())); + String serverUploadRequestUrl1 = + String.format("http://%s:%d/segments/%s/%s/upload?uploadTimeoutMs=-1", instance1, adminPort, + REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName()); + when(segmentManager._mockedFileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl1)).thenThrow( + new HttpErrorStatusException("failed to upload segment", + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())); // Change 3rd segment status to be DONE, but with default peer download url. // Verify later the download url isn't fixed because no ONLINE replica found in any server. segmentsZKMetadata.get(2).setStatus(Status.DONE); - segmentsZKMetadata.get(2).setDownloadUrl( - METADATA_URI_FOR_PEER_DOWNLOAD); + segmentsZKMetadata.get(2).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD); // set up the external view for 3rd segment String instance2 = "instance2"; externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), instance2, "OFFLINE"); @@ -1029,11 +1013,9 @@ public class PinotLLCRealtimeSegmentManagerTest { // Keep 5th segment status as IN_PROGRESS. - List<String> segmentNames = segmentsZKMetadata.stream() - .map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList()); - when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)) - .thenReturn(segmentManager._tableConfig); - + List<String> segmentNames = + segmentsZKMetadata.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList()); + when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(segmentManager._tableConfig); // Verify the result segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata); @@ -1042,23 +1024,18 @@ public class PinotLLCRealtimeSegmentManagerTest { TestUtils.waitForCondition(aVoid -> segmentManager.deepStoreUploadExecutorPendingSegmentsIsEmpty(), 30_000L, "Timed out waiting for upload retry tasks to finish"); - assertEquals( - segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(), + assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(), expectedSegmentLocation); assertFalse(tempSegmentFileLocation.exists(), "Deep-store retry task should move the file from temp location to permanent location"); - assertEquals( - segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(), + assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(), METADATA_URI_FOR_PEER_DOWNLOAD); - assertEquals( - segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(2), null).getDownloadUrl(), + assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(2), null).getDownloadUrl(), METADATA_URI_FOR_PEER_DOWNLOAD); - assertEquals( - segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(3), null).getDownloadUrl(), + assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(3), null).getDownloadUrl(), defaultDownloadUrl); - assertNull( - segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl()); + assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl()); } @Test diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index c46a85690d..1237db547a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -153,6 +153,13 @@ public abstract class BaseTableDataManager implements TableDataManager { if (_peerDownloadScheme == null) { _peerDownloadScheme = instanceDataManagerConfig.getSegmentPeerDownloadScheme(); } + if (_peerDownloadScheme != null) { + _peerDownloadScheme = _peerDownloadScheme.toLowerCase(); + Preconditions.checkState( + CommonConstants.HTTP_PROTOCOL.equals(_peerDownloadScheme) || CommonConstants.HTTPS_PROTOCOL.equals( + _peerDownloadScheme), "Unsupported peer download scheme: %s for table: %s", _peerDownloadScheme, + _tableNameWithType); + } _streamSegmentDownloadUntarRateLimitBytesPerSec = instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit(); @@ -691,7 +698,7 @@ public abstract class BaseTableDataManager implements TableDataManager { throws Exception { Preconditions.checkState(_peerDownloadScheme != null, "Download peers require non null peer download scheme"); List<URI> peerSegmentURIs = - PeerServerSegmentFinder.getPeerServerURIs(segmentName, _peerDownloadScheme, _helixManager, _tableNameWithType); + PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, _peerDownloadScheme); if (peerSegmentURIs.isEmpty()) { String msg = String.format("segment %s doesn't have any peers", segmentName); LOGGER.warn(msg); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 8e50049028..b120867d6b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -635,17 +635,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } catch (Exception e) { _logger.warn("Download segment {} from deepstore uri {} failed.", segmentName, uri, e); // Download from deep store failed; try to download from peer if peer download is setup for the table. - if (isPeerSegmentDownloadEnabled(tableConfig)) { - downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), - indexLoadingConfig); + if (_peerDownloadScheme != null) { + downloadSegmentFromPeer(segmentName, indexLoadingConfig); } else { throw e; } } } else { - if (isPeerSegmentDownloadEnabled(tableConfig)) { - downloadSegmentFromPeer(segmentName, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), - indexLoadingConfig); + if (_peerDownloadScheme != null) { + downloadSegmentFromPeer(segmentName, indexLoadingConfig); } else { throw new RuntimeException("Peer segment download not enabled for segment " + segmentName); } @@ -687,23 +685,16 @@ public class RealtimeTableDataManager extends BaseTableDataManager { replaceLLSegment(segmentName, indexLoadingConfig); } - private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) { - return - CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()) - || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase( - tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()); - } - - private void downloadSegmentFromPeer(String segmentName, String downloadScheme, - IndexLoadingConfig indexLoadingConfig) { + private void downloadSegmentFromPeer(String segmentName, IndexLoadingConfig indexLoadingConfig) { File tempRootDir = null; try { tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "." + System.currentTimeMillis()); File segmentTarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); // Next download the segment from a randomly chosen server using configured download scheme (http or https). - SegmentFetcherFactory.getSegmentFetcher(downloadScheme).fetchSegmentToLocal(segmentName, () -> { + SegmentFetcherFactory.getSegmentFetcher(_peerDownloadScheme).fetchSegmentToLocal(segmentName, () -> { List<URI> peerServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(segmentName, downloadScheme, _helixManager); + PeerServerSegmentFinder.getPeerServerURIs(_helixManager, _tableNameWithType, segmentName, + _peerDownloadScheme); Collections.shuffle(peerServerURIs); return peerServerURIs; }, segmentTarFile); @@ -711,7 +702,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { segmentTarFile.length()); untarAndMoveSegment(segmentName, indexLoadingConfig, segmentTarFile, tempRootDir); } catch (Exception e) { - _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, downloadScheme, e); + _logger.warn("Download and move segment {} from peer with scheme {} failed.", segmentName, _peerDownloadScheme, + e); throw new RuntimeException(e); } finally { FileUtils.deleteQuietly(tempRootDir); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index d4c5f4fc29..261fe0f238 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -660,8 +660,8 @@ public class BaseTableDataManagerTest { File destFile = new File(tempRootDir, "seg01" + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder = mockStatic(PeerServerSegmentFinder.class)) { mockPeerSegFinder.when( - () -> PeerServerSegmentFinder.getPeerServerURIs("seg01", "http", helixManager, TABLE_NAME_WITH_TYPE)) - .thenReturn(Collections.singletonList(uri)); + () -> PeerServerSegmentFinder.getPeerServerURIs(helixManager, TABLE_NAME_WITH_TYPE, "seg01", + CommonConstants.HTTP_PROTOCOL)).thenReturn(List.of(uri)); tmgr.downloadFromPeersWithoutStreaming("seg01", mock(SegmentZKMetadata.class), destFile); } assertEquals(FileUtils.readFileToString(destFile), "this is from somewhere remote"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java index 4b6c6fb910..2af972695f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/util/PeerServerSegmentFinderTest.java @@ -19,103 +19,93 @@ package org.apache.pinot.core.util; import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; -import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.StringUtil; -import org.testng.Assert; +import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class PeerServerSegmentFinderTest { - private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME"; - private static final String SEGMENT_1 = "testTable__0__0__t11"; - private static final String SEGMENT_2 = "testTable__0__1__t11"; - private static final String CLUSTER_NAME = "dummyCluster"; - private static final String INSTANCE_ID1 = "Server_localhost_1000"; - private static final String INSTANCE_ID2 = "Server_localhost_1001"; - private static final String INSTANCE_ID3 = "Server_localhost_1003"; - public static final String ADMIN_PORT = "1008"; - public static final String HOST_1_NAME = "s1"; - public static final String HOST_2_NAME = "s2"; - public static final String HOST_3_NAME = "s3"; + private static final String CLUSTER_NAME = "testCluster"; + private static final String REALTIME_TABLE_NAME = "testTable_REALTIME"; + private static final String SEGMENT_1 = "testSegment1"; + private static final String SEGMENT_2 = "testSegment2"; + private static final String INSTANCE_ID_1 = "Server_s1_1007"; + private static final String INSTANCE_ID_2 = "Server_s2_1007"; + private static final String INSTANCE_ID_3 = "Server_s3_1007"; + private static final String HOSTNAME_1 = "s1"; + private static final String HOSTNAME_2 = "s2"; + private static final String HOSTNAME_3 = "s3"; + private static final int HELIX_PORT = 1007; + private static final int HTTP_ADMIN_PORT = 1008; + private static final int HTTPS_ADMIN_PORT = 1009; + private HelixManager _helixManager; @BeforeClass - public void initSegmentFetcherFactoryWithPeerServerSegmentFetcher() - throws Exception { - HelixAdmin helixAdmin; - { - ExternalView ev = new ExternalView(TABLE_NAME_WITH_TYPE); - ev.setState(SEGMENT_1, INSTANCE_ID1, "ONLINE"); - ev.setState(SEGMENT_1, INSTANCE_ID2, "OFFLINE"); - ev.setState(SEGMENT_1, INSTANCE_ID3, "ONLINE"); - ev.setState(SEGMENT_2, INSTANCE_ID1, "OFFLINE"); - ev.setState(SEGMENT_2, INSTANCE_ID2, "OFFLINE"); - _helixManager = mock(HelixManager.class); - helixAdmin = mock(HelixAdmin.class); - when(_helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); - when(_helixManager.getClusterName()).thenReturn(CLUSTER_NAME); - when(helixAdmin.getResourceExternalView(CLUSTER_NAME, TABLE_NAME_WITH_TYPE)).thenReturn(ev); - when(helixAdmin.getConfigKeys(any(HelixConfigScope.class))).thenReturn(new ArrayList<>()); - Map<String, String> instanceConfigMap = new HashMap<>(); - instanceConfigMap.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, ADMIN_PORT); - when(helixAdmin.getConfig(any(HelixConfigScope.class), any(List.class))).thenReturn(instanceConfigMap); - InstanceConfig instanceConfig1 = new InstanceConfig(INSTANCE_ID1); - instanceConfig1.setHostName(HOST_1_NAME); - instanceConfig1.setPort("1000"); - when(helixAdmin.getInstanceConfig(any(String.class), eq(INSTANCE_ID1))).thenReturn(instanceConfig1); + public void initSegmentFetcherFactoryWithPeerServerSegmentFetcher() { + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(SEGMENT_1, INSTANCE_ID_1, "ONLINE"); + externalView.setState(SEGMENT_1, INSTANCE_ID_2, "OFFLINE"); + externalView.setState(SEGMENT_1, INSTANCE_ID_3, "ONLINE"); + externalView.setState(SEGMENT_2, INSTANCE_ID_1, "OFFLINE"); + externalView.setState(SEGMENT_2, INSTANCE_ID_2, "OFFLINE"); - InstanceConfig instanceConfig2 = new InstanceConfig(INSTANCE_ID2); - instanceConfig2.setHostName(HOST_2_NAME); - instanceConfig2.setPort("1000"); - when(helixAdmin.getInstanceConfig(any(String.class), eq(INSTANCE_ID2))).thenReturn(instanceConfig2); + _helixManager = mock(HelixManager.class); + HelixAdmin helixAdmin = mock(HelixAdmin.class); + when(_helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); + when(_helixManager.getClusterName()).thenReturn(CLUSTER_NAME); + when(helixAdmin.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(externalView); + when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_1)).thenReturn( + getInstanceConfig(INSTANCE_ID_1, HOSTNAME_1)); + when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_2)).thenReturn( + getInstanceConfig(INSTANCE_ID_2, HOSTNAME_2)); + when(helixAdmin.getInstanceConfig(CLUSTER_NAME, INSTANCE_ID_3)).thenReturn( + getInstanceConfig(INSTANCE_ID_3, HOSTNAME_3)); + } - InstanceConfig instanceConfig3 = new InstanceConfig(INSTANCE_ID3); - instanceConfig3.setHostName(HOST_3_NAME); - instanceConfig3.setPort("1000"); - when(helixAdmin.getInstanceConfig(any(String.class), eq(INSTANCE_ID3))).thenReturn(instanceConfig3); - } + private static InstanceConfig getInstanceConfig(String instanceId, String hostName) { + InstanceConfig instanceConfig = new InstanceConfig(instanceId); + instanceConfig.setHostName(hostName); + instanceConfig.setPort(Integer.toString(HELIX_PORT)); + instanceConfig.getRecord().setIntField(Instance.ADMIN_PORT_KEY, HTTP_ADMIN_PORT); + instanceConfig.getRecord().setIntField(Instance.ADMIN_HTTPS_PORT_KEY, HTTPS_ADMIN_PORT); + return instanceConfig; } @Test public void testSegmentFoundSuccessfully() throws Exception { // SEGMENT_1 has only 2 online replicas. - List<URI> httpServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_1, CommonConstants.HTTP_PROTOCOL, _helixManager); - assertEquals(2, httpServerURIs.size()); - httpServerURIs.contains(new URI( - StringUtil.join("/", "http://" + HOST_1_NAME + ":" + ADMIN_PORT, "segments", TABLE_NAME_WITH_TYPE, SEGMENT_1))); - httpServerURIs.contains(new URI( - StringUtil.join("/", "http://" + HOST_3_NAME + ":" + ADMIN_PORT, "segments", TABLE_NAME_WITH_TYPE, SEGMENT_1))); - List<URI> httpsServerURIs = - PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_1, CommonConstants.HTTPS_PROTOCOL, _helixManager); - assertEquals(2, httpsServerURIs.size()); - httpServerURIs.contains(new URI(StringUtil - .join("/", "https://" + HOST_1_NAME + ":" + ADMIN_PORT, "segments", TABLE_NAME_WITH_TYPE, SEGMENT_1))); - httpServerURIs.contains(new URI(StringUtil - .join("/", "https://" + HOST_3_NAME + ":" + ADMIN_PORT, "segments", TABLE_NAME_WITH_TYPE, SEGMENT_1))); + List<URI> httpServerURIs = PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME, SEGMENT_1, + CommonConstants.HTTP_PROTOCOL); + assertEquals(httpServerURIs.size(), 2); + assertTrue(httpServerURIs.contains(new URI( + String.format("http://%s:%d/segments/%s/%s", HOSTNAME_1, HTTP_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1)))); + assertTrue(httpServerURIs.contains(new URI( + String.format("http://%s:%d/segments/%s/%s", HOSTNAME_3, HTTP_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1)))); + List<URI> httpsServerURIs = PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME, SEGMENT_1, + CommonConstants.HTTPS_PROTOCOL); + assertEquals(httpsServerURIs.size(), 2); + assertTrue(httpsServerURIs.contains(new URI( + String.format("https://%s:%d/segments/%s/%s", HOSTNAME_1, HTTPS_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1)))); + assertTrue(httpsServerURIs.contains(new URI( + String.format("https://%s:%d/segments/%s/%s", HOSTNAME_3, HTTPS_ADMIN_PORT, REALTIME_TABLE_NAME, SEGMENT_1)))); } @Test - public void testSegmentNotFound() - throws Exception { - Assert.assertEquals(0, - PeerServerSegmentFinder.getPeerServerURIs(SEGMENT_2, CommonConstants.HTTP_PROTOCOL, _helixManager).size()); + public void testSegmentNotFound() { + assertTrue(PeerServerSegmentFinder.getPeerServerURIs(_helixManager, REALTIME_TABLE_NAME, SEGMENT_2, + CommonConstants.HTTP_PROTOCOL).isEmpty()); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java index 6151aab06f..e5b9b7dc1a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/retry/ExponentialBackoffRetryPolicy.java @@ -39,8 +39,8 @@ public class ExponentialBackoffRetryPolicy extends BaseRetryPolicy { @Override protected long getDelayMs(int currentAttempt) { - double minDelayMs = _initialDelayMs * Math.pow(_delayScaleFactor, currentAttempt); - double maxDelayMs = minDelayMs * _delayScaleFactor; - return _random.nextLong((long) minDelayMs, (long) maxDelayMs); + long minDelayMs = (long) (_initialDelayMs * Math.pow(_delayScaleFactor, currentAttempt)); + long maxDelayMs = (long) (minDelayMs * _delayScaleFactor); + return minDelayMs < maxDelayMs ? _random.nextLong(minDelayMs, maxDelayMs) : minDelayMs; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org