This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f65b401dc7 Set max number of parallel segment downloads per table in pinot-server (#8694)
f65b401dc7 is described below

commit f65b401dc7f29c66687b14ba24b04bca45cfa91a
Author: Jialiang Li <j...@linkedin.com>
AuthorDate: Fri May 13 10:20:39 2022 -0700

    Set max number of parallel segment downloads per table in pinot-server (#8694)
    
    Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
---
 .../core/data/manager/BaseTableDataManager.java    | 25 +++++++++++++++++++++-
 .../manager/offline/TableDataManagerProvider.java  |  5 ++++-
 .../BaseTableDataManagerAcquireSegmentTest.java    |  2 +-
 .../data/manager/BaseTableDataManagerTest.java     |  2 +-
 .../offline/DimensionTableDataManagerTest.java     |  2 +-
 .../local/data/manager/TableDataManager.java       |  2 +-
 .../helix/HelixInstanceDataManagerConfig.java      | 13 +++++++++++
 .../apache/pinot/server/api/BaseResourceTest.java  |  2 +-
 .../config/instance/InstanceDataManagerConfig.java |  2 ++
 9 files changed, 48 insertions(+), 7 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 79d1741fb6..9abd4164f9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -72,6 +73,8 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseTableDataManager.class);
 
   protected final ConcurrentHashMap<String, SegmentDataManager> 
_segmentDataManagerMap = new ConcurrentHashMap<>();
+  // Semaphore that limits the maximum number of parallel segment downloads for this table.
+  private Semaphore _segmentDownloadSemaphore;
 
   protected TableDataManagerConfig _tableDataManagerConfig;
   protected String _instanceId;
@@ -92,7 +95,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   @Override
   public void init(TableDataManagerConfig tableDataManagerConfig, String 
instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics 
serverMetrics, HelixManager helixManager,
-      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache) {
+      @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache, int maxParallelSegmentDownloads) {
     LOGGER.info("Initializing table data manager for table: {}", 
tableDataManagerConfig.getTableName());
 
     _tableDataManagerConfig = tableDataManagerConfig;
@@ -119,6 +122,14 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
           _resourceTmpDir);
     }
     _errorCache = errorCache;
+    if (maxParallelSegmentDownloads > 0) {
+      LOGGER.info(
+          "Construct segment download semaphore for Table: {}. Maximum number 
of parallel segment downloads: {}",
+          _tableNameWithType, maxParallelSegmentDownloads);
+      _segmentDownloadSemaphore = new Semaphore(maxParallelSegmentDownloads, 
true);
+    } else {
+      _segmentDownloadSemaphore = null;
+    }
     _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
getClass().getSimpleName());
 
     doInit();
@@ -403,6 +414,14 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     File tarFile = new File(tempRootDir, segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
     String uri = zkMetadata.getDownloadUrl();
     try {
+      if (_segmentDownloadSemaphore != null) {
+        long startTime = System.currentTimeMillis();
+        LOGGER.info("Trying to acquire segment download semaphore for: {}. 
queue-length: {} ", segmentName,
+            _segmentDownloadSemaphore.getQueueLength());
+        _segmentDownloadSemaphore.acquire();
+        LOGGER.info("Acquired segment download semaphore for: {} 
(lock-time={}ms, queue-length={}).", segmentName,
+            System.currentTimeMillis() - startTime, 
_segmentDownloadSemaphore.getQueueLength());
+      }
       SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, 
zkMetadata.getCrypterName());
       LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: 
{}, file length: {}", segmentName,
           _tableNameWithType, uri, tarFile, tarFile.length());
@@ -412,6 +431,10 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
           _tableNameWithType, uri, tarFile);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L);
       throw e;
+    } finally {
+      if (_segmentDownloadSemaphore != null) {
+        _segmentDownloadSemaphore.release();
+      }
     }
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 42994edcfd..e673c9618d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.utils.Pair;
  */
 public class TableDataManagerProvider {
   private static Semaphore _segmentBuildSemaphore;
+  private static int _maxParallelSegmentDownloads;
 
   private TableDataManagerProvider() {
   }
@@ -47,6 +48,7 @@ public class TableDataManagerProvider {
     if (maxParallelBuilds > 0) {
       _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
     }
+    _maxParallelSegmentDownloads = 
instanceDataManagerConfig.getMaxParallelSegmentDownloads();
   }
 
   public static TableDataManager getTableDataManager(TableDataManagerConfig 
tableDataManagerConfig, String instanceId,
@@ -67,7 +69,8 @@ public class TableDataManagerProvider {
       default:
         throw new IllegalStateException();
     }
-    tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, 
serverMetrics, helixManager, errorCache);
+    tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, 
serverMetrics, helixManager, errorCache,
+        _maxParallelSegmentDownloads);
     return tableDataManager;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 86299bfb92..7adc732b61 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -119,7 +119,7 @@ public class BaseTableDataManagerAcquireSegmentTest {
       when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
     }
     tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, 0);
     tableDataManager.start();
     Field segsMapField = 
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index fac8977c5f..97362fc48c 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -529,7 +529,7 @@ public class BaseTableDataManagerTest {
 
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(config, "dummyInstance", 
mock(ZkHelixPropertyStore.class),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
mock(HelixManager.class), null, 0);
     tableDataManager.start();
     return tableDataManager;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 7eb74e5fc7..922ce7b49d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -124,7 +124,7 @@ public class DimensionTableDataManagerTest {
       when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     }
     tableDataManager.init(config, "dummyInstance", mockPropertyStore(),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
helixManager, null);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
helixManager, null, 0);
     tableDataManager.start();
 
     return tableDataManager;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 3ae95a6452..e6efe563c8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -49,7 +49,7 @@ public interface TableDataManager {
    */
   void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
       ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics 
serverMetrics, HelixManager helixManager,
-      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache);
+      LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int 
maxParallelSegmentDownloads);
 
   /**
    * Starts the table data manager. Should be called only once after table 
data manager gets initialized but before
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index 809a86642d..b9ea877404 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -74,6 +74,13 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   private static final String MAX_PARALLEL_SEGMENT_BUILDS = 
"realtime.max.parallel.segment.builds";
   private static final int DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS = 4;
 
+  // Key for the maximum number of parallel segment downloads allowed per table.
+  // A value <= 0 means unlimited.
+  // Unlimited parallel downloads can send a high burst of download requests to the Pinot controllers,
+  // making the controllers unavailable for that period of time.
+  private static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = 
"table.level.max.parallel.segment.downloads";
+  private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = -1;
+
   // Key of whether to enable split commit
   private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit";
   // Key of whether to enable split commit end with segment metadata files.
@@ -211,6 +218,12 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
         .getProperty(MAX_PARALLEL_SEGMENT_BUILDS, 
DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS);
   }
 
+  @Override
+  public int getMaxParallelSegmentDownloads() {
+    return 
_instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS,
+        DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS);
+  }
+
   @Override
   public String getAuthToken() {
     return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN);
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index 2708373a08..9b38dfd63b 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -193,7 +193,7 @@ public abstract class BaseResourceTest {
     TableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager
         .init(tableDataManagerConfig, "testInstance", 
mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class),
-            mock(HelixManager.class), null);
+            mock(HelixManager.class), null, 0);
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index 7718930657..baa29861ca 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -53,6 +53,8 @@ public interface InstanceDataManagerConfig {
 
   int getMaxParallelSegmentBuilds();
 
+  int getMaxParallelSegmentDownloads();
+
   String getAuthToken();
 
   String getSegmentDirectoryLoader();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to