This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 67cb52c04b Refine SegmentFetcherFactory (#12936)
67cb52c04b is described below

commit 67cb52c04b5a2e81bfe26f4878e6782422f8f1c1
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Apr 16 14:09:24 2024 -0700

    Refine SegmentFetcherFactory (#12936)
---
 .../utils/fetcher/SegmentFetcherFactory.java       | 115 +++++++--------------
 .../core/data/manager/BaseTableDataManager.java    |  29 +++---
 .../data/manager/BaseTableDataManagerTest.java     |  21 ----
 3 files changed, 51 insertions(+), 114 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index 3c3f66248c..543db8c403 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -18,61 +18,45 @@
  */
 package org.apache.pinot.common.utils.fetcher;
 
-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.auth.AuthConfig;
 import org.apache.pinot.common.auth.AuthProviderUtils;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class SegmentFetcherFactory {
-  private final static SegmentFetcherFactory INSTANCE = new 
SegmentFetcherFactory();
-
-  static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
-  private static final String PROTOCOLS_KEY = "protocols";
-  private static final String ENCODED_SUFFIX = ".enc";
-  private static final String AUTH_KEY = CommonConstants.KEY_OF_AUTH;
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentFetcherFactory.class);
-  private static final Random RANDOM = new Random();
-
-  private final Map<String, SegmentFetcher> _segmentFetcherMap = new 
HashMap<>();
-  private final SegmentFetcher _httpSegmentFetcher = new HttpSegmentFetcher();
-  private final SegmentFetcher _pinotFSSegmentFetcher = new 
PinotFSSegmentFetcher();
-
   private SegmentFetcherFactory() {
-    // left blank
   }
 
-  public static SegmentFetcherFactory getInstance() {
-    return INSTANCE;
-  }
+  public static final String SEGMENT_FETCHER_CLASS_KEY_SUFFIX = ".class";
+  public static final String PROTOCOLS_KEY = "protocols";
+  public static final String ENCODED_SUFFIX = ".enc";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentFetcherFactory.class);
+  private static final Map<String, SegmentFetcher> SEGMENT_FETCHER_MAP = new 
HashMap<>();
+  private static final SegmentFetcher HTTP_SEGMENT_FETCHER = new 
HttpSegmentFetcher();
+  private static final SegmentFetcher PINOT_FS_SEGMENT_FETCHER = new 
PinotFSSegmentFetcher();
 
   /**
    * Initializes the segment fetcher factory. This method should only be 
called once.
    */
   public static void init(PinotConfiguration config)
       throws Exception {
-    getInstance().initInternal(config);
-  }
-
-  private void initInternal(PinotConfiguration config)
-      throws Exception {
-    _httpSegmentFetcher.init(config); // directly, without sub-namespace
-    _pinotFSSegmentFetcher.init(config); // directly, without sub-namespace
+    HTTP_SEGMENT_FETCHER.init(config); // directly, without sub-namespace
+    PINOT_FS_SEGMENT_FETCHER.init(config); // directly, without sub-namespace
 
     List<String> protocols = config.getProperty(PROTOCOLS_KEY, 
Collections.emptyList());
     for (String protocol : protocols) {
@@ -93,22 +77,22 @@ public class SegmentFetcherFactory {
         }
       } else {
         LOGGER.info("Creating segment fetcher for protocol: {} with class: 
{}", protocol, segmentFetcherClassName);
-        segmentFetcher = (SegmentFetcher) 
Class.forName(segmentFetcherClassName).newInstance();
+        segmentFetcher = (SegmentFetcher) 
Class.forName(segmentFetcherClassName).getConstructor().newInstance();
       }
 
-      AuthConfig authConfig = AuthProviderUtils.extractAuthConfig(config, 
AUTH_KEY);
-
       PinotConfiguration subConfig = config.subset(protocol);
-      AuthConfig subAuthConfig = 
AuthProviderUtils.extractAuthConfig(subConfig, AUTH_KEY);
+      Map<String, Object> subConfigMap = subConfig.toMap();
 
-      Map<String, Object> subConfigMap = config.subset(protocol).toMap();
+      // Put global auth properties into sub-config if sub-config does not 
have auth properties
+      AuthConfig authConfig = AuthProviderUtils.extractAuthConfig(config, 
CommonConstants.KEY_OF_AUTH);
+      AuthConfig subAuthConfig = 
AuthProviderUtils.extractAuthConfig(subConfig, CommonConstants.KEY_OF_AUTH);
       if (subAuthConfig.getProperties().isEmpty() && 
!authConfig.getProperties().isEmpty()) {
-        authConfig.getProperties().forEach((key, value) -> 
subConfigMap.put(AUTH_KEY + "." + key, value));
+        authConfig.getProperties()
+            .forEach((key, value) -> 
subConfigMap.put(CommonConstants.KEY_OF_AUTH + "." + key, value));
       }
 
       segmentFetcher.init(new PinotConfiguration(subConfigMap));
-
-      _segmentFetcherMap.put(protocol, segmentFetcher);
+      SEGMENT_FETCHER_MAP.put(protocol, segmentFetcher);
     }
   }
 
@@ -117,11 +101,7 @@ public class SegmentFetcherFactory {
    * ({@link HttpSegmentFetcher} for "http" and "https", {@link 
PinotFSSegmentFetcher} for other protocols).
    */
   public static SegmentFetcher getSegmentFetcher(String protocol) {
-    return getInstance().getSegmentFetcherInternal(protocol);
-  }
-
-  private SegmentFetcher getSegmentFetcherInternal(String protocol) {
-    SegmentFetcher segmentFetcher = _segmentFetcherMap.get(protocol);
+    SegmentFetcher segmentFetcher = SEGMENT_FETCHER_MAP.get(protocol);
     if (segmentFetcher != null) {
       return segmentFetcher;
     } else {
@@ -129,9 +109,9 @@ public class SegmentFetcherFactory {
       switch (protocol) {
         case CommonConstants.HTTP_PROTOCOL:
         case CommonConstants.HTTPS_PROTOCOL:
-          return _httpSegmentFetcher;
+          return HTTP_SEGMENT_FETCHER;
         default:
-          return _pinotFSSegmentFetcher;
+          return PINOT_FS_SEGMENT_FETCHER;
       }
     }
   }
@@ -141,7 +121,7 @@ public class SegmentFetcherFactory {
    */
   public static void fetchSegmentToLocal(URI uri, File dest)
       throws Exception {
-    getInstance().fetchSegmentToLocalInternal(uri, dest);
+    getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
   }
 
   /**
@@ -149,13 +129,7 @@ public class SegmentFetcherFactory {
    */
   public static void fetchSegmentToLocal(String uri, File dest)
       throws Exception {
-    getInstance().fetchSegmentToLocalInternal(new URI(uri), dest);
-  }
-
-  private void fetchSegmentToLocalInternal(URI uri, File dest)
-      throws Exception {
-    // caller untars
-    getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
+    fetchSegmentToLocal(new URI(uri), dest);
   }
 
   /**
@@ -167,17 +141,17 @@ public class SegmentFetcherFactory {
    * @return the untared directory
    * @throws Exception
    */
-  public static File fetchAndStreamUntarToLocal(String uri, File tempRootDir,
-      long maxStreamRateInByte, AtomicInteger attempts)
+  public static File fetchAndStreamUntarToLocal(URI uri, File tempRootDir, 
long maxStreamRateInByte,
+      AtomicInteger attempts)
       throws Exception {
-    return getInstance().fetchAndStreamUntarToLocalInternal(new URI(uri), 
tempRootDir, maxStreamRateInByte, attempts);
+    return 
getSegmentFetcher(uri.getScheme()).fetchUntarSegmentToLocalStreamed(uri, 
tempRootDir, maxStreamRateInByte,
+        attempts);
   }
 
-  private File fetchAndStreamUntarToLocalInternal(URI uri, File tempRootDir,
-      long maxStreamRateInByte, AtomicInteger attempts)
+  public static File fetchAndStreamUntarToLocal(String uri, File tempRootDir, 
long maxStreamRateInByte,
+      AtomicInteger attempts)
       throws Exception {
-    return 
getSegmentFetcher(uri.getScheme()).fetchUntarSegmentToLocalStreamed(uri, 
tempRootDir, maxStreamRateInByte,
-        attempts);
+    return fetchAndStreamUntarToLocal(new URI(uri), tempRootDir, 
maxStreamRateInByte, attempts);
   }
 
   /**
@@ -185,18 +159,7 @@ public class SegmentFetcherFactory {
    * @param uri remote segment location
    * @param dest local file
    */
-  public static void fetchAndDecryptSegmentToLocal(String uri, File dest, 
String crypterName)
-      throws Exception {
-    getInstance().fetchAndDecryptSegmentToLocalInternal(uri, dest, 
crypterName);
-  }
-
-  // uris have equal weight to be selected for segment download
-  public static void fetchAndDecryptSegmentToLocal(List<URI> uris, File dest, 
String crypterName)
-          throws Exception {
-    getInstance().fetchAndDecryptSegmentToLocalInternal(uris, dest, 
crypterName);
-  }
-
-  private void fetchAndDecryptSegmentToLocalInternal(String uri, File dest, 
String crypterName)
+  public static void fetchAndDecryptSegmentToLocal(String uri, File dest, 
@Nullable String crypterName)
       throws Exception {
     if (crypterName == null) {
       fetchSegmentToLocal(uri, dest);
@@ -211,16 +174,16 @@ public class SegmentFetcherFactory {
     }
   }
 
-  private void fetchAndDecryptSegmentToLocalInternal(@NonNull List<URI> uris, 
File dest, String crypterName)
-          throws Exception {
-    Preconditions.checkArgument(!uris.isEmpty(), "empty uris passed into the 
fetchAndDecryptSegmentToLocalInternal");
-    URI uri = uris.get(RANDOM.nextInt(uris.size()));
+  public static void fetchAndDecryptSegmentToLocal(String segmentName, String 
scheme, Supplier<List<URI>> uriSupplier,
+      File dest, @Nullable String crypterName)
+      throws Exception {
+    SegmentFetcher segmentFetcher = getSegmentFetcher(scheme);
     if (crypterName == null) {
-      fetchSegmentToLocal(uri, dest);
+      segmentFetcher.fetchSegmentToLocal(segmentName, uriSupplier, dest);
     } else {
       // download
       File tempDownloadedFile = new File(dest.getPath() + ENCODED_SUFFIX);
-      fetchSegmentToLocal(uri, tempDownloadedFile);
+      segmentFetcher.fetchSegmentToLocal(segmentName, uriSupplier, 
tempDownloadedFile);
 
       // decrypt
       PinotCrypter crypter = PinotCrypterFactory.create(crypterName);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1237db547a..73af1ed3a0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -693,27 +693,22 @@ public abstract class BaseTableDataManager implements TableDataManager {
     }
   }
 
-  // not thread safe. Caller should invoke it with safe concurrency control.
   protected void downloadFromPeersWithoutStreaming(String segmentName, 
SegmentZKMetadata zkMetadata, File destTarFile)
       throws Exception {
-    Preconditions.checkState(_peerDownloadScheme != null, "Download peers 
require non null peer download scheme");
-    List<URI> peerSegmentURIs =
-        PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
_tableNameWithType, segmentName, _peerDownloadScheme);
-    if (peerSegmentURIs.isEmpty()) {
-      String msg = String.format("segment %s doesn't have any peers", 
segmentName);
-      LOGGER.warn(msg);
-      // HelixStateTransitionHandler would catch the runtime exception and 
mark the segment state as Error
-      throw new RuntimeException(msg);
-    }
+    Preconditions.checkState(_peerDownloadScheme != null, "Peer download is 
not enabled for table: %s",
+        _tableNameWithType);
     try {
-      // Next download the segment from a randomly chosen server using 
configured scheme.
-      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(peerSegmentURIs, 
destTarFile, zkMetadata.getCrypterName());
-      LOGGER.info("Fetched segment {} from peers: {} to: {} of size: {}", 
segmentName, peerSegmentURIs, destTarFile,
+      SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, 
_peerDownloadScheme, () -> {
+        List<URI> peerServerURIs =
+            PeerServerSegmentFinder.getPeerServerURIs(_helixManager, 
_tableNameWithType, segmentName,
+                _peerDownloadScheme);
+        Collections.shuffle(peerServerURIs);
+        return peerServerURIs;
+      }, destTarFile, zkMetadata.getCrypterName());
+      _logger.info("Downloaded tarred segment: {} from peers to: {}, file 
length: {}", segmentName, destTarFile,
           destTarFile.length());
-    } catch (AttemptsExceededException e) {
-      LOGGER.error("Attempts exceeded when downloading segment: {} for table: 
{} from peers {} to: {}", segmentName,
-          _tableNameWithType, peerSegmentURIs, destTarFile);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1L);
+    } catch (Exception e) {
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES, 1);
       throw e;
     }
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 261fe0f238..ace744c999 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -38,7 +38,6 @@ import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
-import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -647,26 +646,6 @@ public class BaseTableDataManagerTest {
     verify(tmgr, times(1)).downloadFromPeersWithoutStreaming("seg01", zkmd, 
destFile);
   }
 
-  // happy case: download from peers
-  @Test
-  public void testDownloadFromPeersWithoutStreaming()
-      throws Exception {
-    URI uri = mockRemoteCopy();
-    InstanceDataManagerConfig config = 
createDefaultInstanceDataManagerConfig();
-    when(config.getSegmentPeerDownloadScheme()).thenReturn("http");
-    HelixManager helixManager = mock(HelixManager.class);
-    BaseTableDataManager tmgr = createTableManager(config, helixManager);
-    File tempRootDir = 
tmgr.getTmpSegmentDataDir("test-download-peer-without-streaming");
-    File destFile = new File(tempRootDir, "seg01" + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-    try (MockedStatic<PeerServerSegmentFinder> mockPeerSegFinder = 
mockStatic(PeerServerSegmentFinder.class)) {
-      mockPeerSegFinder.when(
-          () -> PeerServerSegmentFinder.getPeerServerURIs(helixManager, 
TABLE_NAME_WITH_TYPE, "seg01",
-              CommonConstants.HTTP_PROTOCOL)).thenReturn(List.of(uri));
-      tmgr.downloadFromPeersWithoutStreaming("seg01", 
mock(SegmentZKMetadata.class), destFile);
-    }
-    assertEquals(FileUtils.readFileToString(destFile), "this is from somewhere 
remote");
-  }
-
   @Test
   public void testUntarAndMoveSegment()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to