This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f65b401dc7 Set max number of parallel segment downloads per table in pinot-server (#8694) f65b401dc7 is described below commit f65b401dc7f29c66687b14ba24b04bca45cfa91a Author: Jialiang Li <j...@linkedin.com> AuthorDate: Fri May 13 10:20:39 2022 -0700 Set max number of parallel segment downloads per table in pinot-server (#8694) Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> --- .../core/data/manager/BaseTableDataManager.java | 25 +++++++++++++++++++++- .../manager/offline/TableDataManagerProvider.java | 5 ++++- .../BaseTableDataManagerAcquireSegmentTest.java | 2 +- .../data/manager/BaseTableDataManagerTest.java | 2 +- .../offline/DimensionTableDataManagerTest.java | 2 +- .../local/data/manager/TableDataManager.java | 2 +- .../helix/HelixInstanceDataManagerConfig.java | 13 +++++++++++ .../apache/pinot/server/api/BaseResourceTest.java | 2 +- .../config/instance/InstanceDataManagerConfig.java | 2 ++ 9 files changed, 48 insertions(+), 7 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 79d1741fb6..9abd4164f9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -72,6 +73,8 @@ public abstract class BaseTableDataManager implements TableDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class); protected final ConcurrentHashMap<String, 
SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>(); + // Semaphore to restrict the maximum number of parallel segment downloads for a table. + private Semaphore _segmentDownloadSemaphore; protected TableDataManagerConfig _tableDataManagerConfig; protected String _instanceId; @@ -92,7 +95,7 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId, ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager, - @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) { + @Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int maxParallelSegmentDownloads) { LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName()); _tableDataManagerConfig = tableDataManagerConfig; @@ -119,6 +122,14 @@ public abstract class BaseTableDataManager implements TableDataManager { _resourceTmpDir); } _errorCache = errorCache; + if (maxParallelSegmentDownloads > 0) { + LOGGER.info( + "Construct segment download semaphore for Table: {}. Maximum number of parallel segment downloads: {}", + _tableNameWithType, maxParallelSegmentDownloads); + _segmentDownloadSemaphore = new Semaphore(maxParallelSegmentDownloads, true); + } else { + _segmentDownloadSemaphore = null; + } _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + getClass().getSimpleName()); doInit(); @@ -403,6 +414,14 @@ public abstract class BaseTableDataManager implements TableDataManager { File tarFile = new File(tempRootDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); String uri = zkMetadata.getDownloadUrl(); try { + if (_segmentDownloadSemaphore != null) { + long startTime = System.currentTimeMillis(); + LOGGER.info("Trying to acquire segment download semaphore for: {}. 
queue-length: {} ", segmentName, + _segmentDownloadSemaphore.getQueueLength()); + _segmentDownloadSemaphore.acquire(); + LOGGER.info("Acquired segment download semaphore for: {} (lock-time={}ms, queue-length={}).", segmentName, + System.currentTimeMillis() - startTime, _segmentDownloadSemaphore.getQueueLength()); + } SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(uri, tarFile, zkMetadata.getCrypterName()); LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", segmentName, _tableNameWithType, uri, tarFile, tarFile.length()); @@ -412,6 +431,10 @@ public abstract class BaseTableDataManager implements TableDataManager { _tableNameWithType, uri, tarFile); _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FAILURES, 1L); throw e; + } finally { + if (_segmentDownloadSemaphore != null) { + _segmentDownloadSemaphore.release(); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java index 42994edcfd..e673c9618d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java @@ -38,6 +38,7 @@ import org.apache.pinot.spi.utils.Pair; */ public class TableDataManagerProvider { private static Semaphore _segmentBuildSemaphore; + private static int _maxParallelSegmentDownloads; private TableDataManagerProvider() { } @@ -47,6 +48,7 @@ public class TableDataManagerProvider { if (maxParallelBuilds > 0) { _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true); } + _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads(); } public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId, @@ -67,7 +69,8 @@ 
public class TableDataManagerProvider { default: throw new IllegalStateException(); } - tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache); + tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, errorCache, + _maxParallelSegmentDownloads); return tableDataManager; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index 86299bfb92..7adc732b61 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -119,7 +119,7 @@ public class BaseTableDataManagerAcquireSegmentTest { when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath()); } tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, 0); tableDataManager.start(); Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); segsMapField.setAccessible(true); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index fac8977c5f..97362fc48c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -529,7 +529,7 @@ public class BaseTableDataManagerTest { OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); 
tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null, 0); tableDataManager.start(); return tableDataManager; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 7eb74e5fc7..922ce7b49d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -124,7 +124,7 @@ public class DimensionTableDataManagerTest { when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); } tableDataManager.init(config, "dummyInstance", mockPropertyStore(), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null); + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null, 0); tableDataManager.start(); return tableDataManager; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 3ae95a6452..e6efe563c8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -49,7 +49,7 @@ public interface TableDataManager { */ void init(TableDataManagerConfig tableDataManagerConfig, String instanceId, ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager, - LoadingCache<Pair<String, String>, SegmentErrorInfo> 
errorCache); + LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, int maxParallelSegmentDownloads); /** * Starts the table data manager. Should be called only once after table data manager gets initialized but before diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 809a86642d..b9ea877404 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -74,6 +74,13 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig private static final String MAX_PARALLEL_SEGMENT_BUILDS = "realtime.max.parallel.segment.builds"; private static final int DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS = 4; + // Key of how many parallel segment downloads can be made per table. + // A value of <= 0 indicates unlimited. + // Unlimited parallel downloads can make Pinot controllers receive high burst of download requests, + // causing controllers unavailable for that period of time. + private static final String MAX_PARALLEL_SEGMENT_DOWNLOADS = "table.level.max.parallel.segment.downloads"; + private static final int DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS = -1; + // Key of whether to enable split commit private static final String ENABLE_SPLIT_COMMIT = "enable.split.commit"; // Key of whether to enable split commit end with segment metadata files. 
@@ -211,6 +218,12 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig .getProperty(MAX_PARALLEL_SEGMENT_BUILDS, DEFAULT_MAX_PARALLEL_SEGMENT_BUILDS); } + @Override + public int getMaxParallelSegmentDownloads() { + return _instanceDataManagerConfiguration.getProperty(MAX_PARALLEL_SEGMENT_DOWNLOADS, + DEFAULT_MAX_PARALLEL_SEGMENT_DOWNLOADS); + } + @Override public String getAuthToken() { return _instanceDataManagerConfiguration.getProperty(AUTH_TOKEN); diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index 2708373a08..9b38dfd63b 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -193,7 +193,7 @@ public abstract class BaseResourceTest { TableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager .init(tableDataManagerConfig, "testInstance", mock(ZkHelixPropertyStore.class), mock(ServerMetrics.class), - mock(HelixManager.class), null); + mock(HelixManager.class), null, 0); tableDataManager.start(); _tableDataManagerMap.put(tableNameWithType, tableDataManager); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index 7718930657..baa29861ca 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -53,6 +53,8 @@ public interface InstanceDataManagerConfig { int getMaxParallelSegmentBuilds(); + int getMaxParallelSegmentDownloads(); + String getAuthToken(); String getSegmentDirectoryLoader(); --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org