This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e9271f6dd7 Storage Quota imposition on Realtime tables (#13584) e9271f6dd7 is described below commit e9271f6dd73cb1162e9743b14e61fd420dbf3e27 Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Wed Sep 18 17:52:46 2024 +0530 Storage Quota imposition on Realtime tables (#13584) * Storage quota imposition on realtime tables * fix test mock * Handle IS update failure case * refactor * tests * move quota checker to PinotLLCRealtimeSegmentManager * test fix * Add tableStorageQuotaExceeded gauge on controller * refactor to use PauseState * revert TABLE_STORAGE_QUOTA_EXCEEDED metric * cosmetic * refactors * check consuming segments only when table is not paused * refactor --- .../pinot/controller/BaseControllerStarter.java | 4 +- .../PinotSegmentUploadDownloadRestletResource.java | 2 +- .../api/upload/SegmentValidationUtils.java | 9 +--- .../realtime/PinotLLCRealtimeSegmentManager.java | 3 +- .../RealtimeSegmentValidationManager.java | 47 ++++++++++++++-- .../controller/validation/StorageQuotaChecker.java | 34 ++++++++++-- .../PinotLLCRealtimeSegmentManagerTest.java | 2 +- .../helix/core/realtime/SegmentCompletionTest.java | 6 +++ .../validation/StorageQuotaCheckerTest.java | 63 +++++++++++++++++----- .../apache/pinot/spi/config/table/PauseState.java | 2 +- 10 files changed, 138 insertions(+), 34 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 5e4ff8751f..44c8e96f36 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -493,7 +493,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager, _leadControllerManager); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager, - _helixResourceManager); + _helixResourceManager, _config); // Setting up periodic tasks List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks(); @@ -852,7 +852,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { periodicTasks.add(_offlineSegmentIntervalChecker); _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager, - _pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics); + _pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker); periodicTasks.add(_realtimeSegmentValidationManager); _brokerResourceValidationManager = new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 156a3e9095..5b7cbed00e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -355,7 +355,7 @@ public class PinotSegmentUploadDownloadRestletResource { untarredSegmentSizeInBytes = FileUtils.sizeOfDirectory(tempSegmentDir); } SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig, - _controllerConf, _storageQuotaChecker); + _storageQuotaChecker); // Encrypt segment String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java index dff3dd3d11..ee6219876f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.api.upload; import javax.ws.rs.core.Response; -import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.validation.StorageQuotaChecker; import org.apache.pinot.segment.spi.SegmentMetadata; @@ -59,14 +58,10 @@ public class SegmentValidationUtils { } public static void checkStorageQuota(String segmentName, long segmentSizeInBytes, TableConfig tableConfig, - ControllerConf controllerConf, StorageQuotaChecker quotaChecker) { - if (!controllerConf.getEnableStorageQuotaCheck()) { - return; - } + StorageQuotaChecker quotaChecker) { StorageQuotaChecker.QuotaCheckerResponse response; try { - response = quotaChecker.isSegmentStorageWithinQuota(tableConfig, segmentName, segmentSizeInBytes, - controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + response = quotaChecker.isSegmentStorageWithinQuota(tableConfig, segmentName, segmentSizeInBytes); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, String.format("Caught exception while checking the storage quota for segment: %s of table: %s", segmentName, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 76bef151a0..d799000ed3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -426,8 +426,7 @@ public class PinotLLCRealtimeSegmentManager { } } - @VisibleForTesting - IdealState getIdealState(String realtimeTableName) { + public IdealState getIdealState(String realtimeTableName) { try { IdealState idealState = HelixHelper.getTableIdealState(_helixManager, realtimeTableName); Preconditions.checkState(idealState != null, "Failed to find IdealState for table: " + realtimeTableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 42ed189bbf..856d88c226 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -29,9 +29,11 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ValidationMetrics; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.controller.api.resources.PauseStatusDetails; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; +import org.apache.pinot.spi.config.table.PauseState; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; @@ -51,6 +53,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager; private final ValidationMetrics _validationMetrics; private final ControllerMetrics _controllerMetrics; + private final StorageQuotaChecker _storageQuotaChecker; private final int _segmentLevelValidationIntervalInSeconds; private long _lastSegmentLevelValidationRunTimeMs = 0L; @@ -60,13 +63,14 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, - ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) { + ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics, StorageQuotaChecker quotaChecker) { super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(), config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; _validationMetrics = validationMetrics; _controllerMetrics = controllerMetrics; + _storageQuotaChecker = quotaChecker; _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds(); Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0); @@ -108,8 +112,45 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea if (context._runSegmentLevelValidation) { runSegmentLevelValidation(tableConfig, streamConfig); } - _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, - context._recreateDeletedConsumingSegment, context._offsetCriteria); + + if (shouldEnsureConsuming(tableNameWithType, context)) { + _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig, + context._recreateDeletedConsumingSegment, context._offsetCriteria); + } + } + + private boolean shouldEnsureConsuming(String tableNameWithType, Context context) { + // Keeps the table paused/unpaused based pause validations. + // Skips updating the pause state if table is paused by admin + PauseState pauseState = computePauseState(tableNameWithType); + if (!pauseState.isPaused()) { + boolean unPausedUponStorageWithinQuota = + pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED); + if (unPausedUponStorageWithinQuota) { + // recreate consuming segments if table is resumed upon the table storage getting within quota limit + context._recreateDeletedConsumingSegment = true; + } + } + return !pauseState.isPaused(); + } + + private PauseState computePauseState(String tableNameWithType) { + PauseStatusDetails pauseStatus = _llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType); + boolean isTablePaused = pauseStatus.getPauseFlag(); + // if table is paused by admin then don't compute + if (!isTablePaused || pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED)) { + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + boolean isQuotaExceeded = _storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig); + // if quota breach and pause flag is not in sync, update the IS + if (isQuotaExceeded != isTablePaused) { + String storageQuota = tableConfig.getQuotaConfig() != null ? tableConfig.getQuotaConfig().getStorage() : "NA"; + pauseStatus = _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType, + PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, + isQuotaExceeded ? "Storage quota of " + storageQuota + " exceeded." : "Table storage within quota limits"); + } + } + return new PauseState(pauseStatus.getPauseFlag(), pauseStatus.getReasonCode(), pauseStatus.getComment(), + pauseStatus.getTimestamp()); } private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig streamConfig) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java index ec7d34bd64..2ae652bf78 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; @@ -43,14 +44,19 @@ public class StorageQuotaChecker { private final ControllerMetrics _controllerMetrics; private final LeadControllerManager _leadControllerManager; private final PinotHelixResourceManager _pinotHelixResourceManager; + private final boolean _isEnabled; + private final int _timeoutMs; public StorageQuotaChecker(TableSizeReader tableSizeReader, ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, - PinotHelixResourceManager pinotHelixResourceManager) { + PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf) { _tableSizeReader = tableSizeReader; _controllerMetrics = controllerMetrics; _leadControllerManager = leadControllerManager; _pinotHelixResourceManager = pinotHelixResourceManager; + _isEnabled = controllerConf.getEnableStorageQuotaCheck(); + _timeoutMs = controllerConf.getServerAdminRequestTimeoutSeconds() * 1000; + Preconditions.checkArgument(_timeoutMs > 0, "Timeout value must be > 0, input: %s", _timeoutMs); } public static class QuotaCheckerResponse { @@ -75,9 +81,11 @@ public class StorageQuotaChecker { * Returns whether the new added segment is within the storage quota. */ public QuotaCheckerResponse isSegmentStorageWithinQuota(TableConfig tableConfig, String segmentName, - long segmentSizeInBytes, int timeoutMs) + long segmentSizeInBytes) throws InvalidConfigException { - Preconditions.checkArgument(timeoutMs > 0, "Timeout value must be > 0, input: %s", timeoutMs); + if (!_isEnabled) { + return success("Storage quota check is disabled, skipping the check"); + } // 1. Read table config // 2. read table size from all the servers @@ -102,7 +110,7 @@ public class StorageQuotaChecker { // read table size TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize; try { - tableSubtypeSize = _tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMs); + tableSubtypeSize = _tableSizeReader.getTableSubtypeSize(tableNameWithType, _timeoutMs); } catch (InvalidConfigException e) { LOGGER.error("Failed to get table size for table {}", tableNameWithType, e); throw e; @@ -113,7 +121,10 @@ public class StorageQuotaChecker { return success("Missing size reports from all servers. Bypassing storage quota check for " + tableNameWithType); } - if (tableSubtypeSize._missingSegments > 0) { + // The logic inside this if block is applicable for missing segments as well as + // when we are checking the quota for only existing segments (segmentSizeInBytes == 0) + // as in both cases quota is checked across existing segments estimated size alone + if (segmentSizeInBytes == 0 || tableSubtypeSize._missingSegments > 0) { if (tableSubtypeSize._estimatedSizeInBytes > allowedStorageBytes) { return failure("Table " + tableNameWithType + " already over quota. Estimated size for all replicas is " + DataSizeUtils.fromBytes(tableSubtypeSize._estimatedSizeInBytes) + ". Configured size for " + numReplicas @@ -205,4 +216,17 @@ public class StorageQuotaChecker { return failure(message); } } + + /** + * Checks whether the table is within the storage quota. + * @return true if storage quota is exceeded by the table, else false. + */ + public boolean isTableStorageQuotaExceeded(TableConfig tableConfig) { + try { + return !isSegmentStorageWithinQuota(tableConfig, null, 0)._isSegmentWithinQuota; + } catch (InvalidConfigException e) { + // skip the check upon exception + return false; + } + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index b1f8520aad..7127294fee 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1197,7 +1197,7 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Override - protected IdealState getIdealState(String realtimeTableName) { + public IdealState getIdealState(String realtimeTableName) { return _idealState; } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index 4d85223b41..19814928a4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -30,6 +30,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.LongMsgOffsetFactory; @@ -1395,6 +1396,11 @@ public class SegmentCompletionTest { _stoppedSegmentName = llcSegmentName; _stoppedInstance = instanceName; } + + @Override + public TableConfig getTableConfig(String realtimeTableName) { + return mock(TableConfig.class); + } } public static class MockSegmentCompletionManager extends SegmentCompletionManager { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java index 689fd622a1..3e42be2f50 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java @@ -23,6 +23,7 @@ import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.MetricValueUtils; +import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; @@ -31,6 +32,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.eq; @@ -43,12 +45,21 @@ import static org.testng.Assert.assertTrue; public class StorageQuotaCheckerTest { private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE"; + private static final String REALTIME_TABLE_NAME = "testTable_REALTIME"; private static final String SEGMENT_NAME = "testSegment"; private static final long SEGMENT_SIZE_IN_BYTES = 1024; private static final int NUM_REPLICAS = 2; private TableSizeReader _tableSizeReader; private StorageQuotaChecker _storageQuotaChecker; + private ControllerConf _controllerConf; + + @BeforeClass + public void init() { + _controllerConf = mock(ControllerConf.class); + when(_controllerConf.getServerAdminRequestTimeoutSeconds()).thenReturn(1); + when(_controllerConf.getEnableStorageQuotaCheck()).thenReturn(true); + } @Test public void testNoQuota() @@ -58,7 +69,7 @@ public class StorageQuotaCheckerTest { _tableSizeReader = mock(TableSizeReader.class); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, controllerMetrics, - mock(LeadControllerManager.class), mock(PinotHelixResourceManager.class)); + mock(LeadControllerManager.class), mock(PinotHelixResourceManager.class), _controllerConf); tableConfig.setQuotaConfig(null); assertTrue(isSegmentWithinQuota(tableConfig)); } @@ -72,11 +83,39 @@ public class StorageQuotaCheckerTest { _tableSizeReader = mock(TableSizeReader.class); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, controllerMetrics, - mock(LeadControllerManager.class), mock(PinotHelixResourceManager.class)); + mock(LeadControllerManager.class), mock(PinotHelixResourceManager.class), _controllerConf); tableConfig.setQuotaConfig(null); assertTrue(isSegmentWithinQuota(tableConfig)); } + @Test + public void testRealtimeTable() + throws InvalidConfigException { + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(REALTIME_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS).build(); + _tableSizeReader = mock(TableSizeReader.class); + ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); + PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); + when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS); + _storageQuotaChecker = + new StorageQuotaChecker(_tableSizeReader, controllerMetrics, mock(LeadControllerManager.class), + pinotHelixResourceManager, _controllerConf); + + tableConfig.setQuotaConfig(new QuotaConfig(null, null)); + assertFalse(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)); + + tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null)); + + // Within quota but with missing segments, should pass without updating metrics + mockTableSizeResult(REALTIME_TABLE_NAME, 4 * 1024, 1); + assertFalse(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)); + + + // Exceed quota and with missing segments, should fail without updating metrics + mockTableSizeResult(REALTIME_TABLE_NAME, 8 * 1024, 1); + assertTrue(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig)); + } + @Test public void testNoStorageQuotaConfig() throws InvalidConfigException { @@ -85,7 +124,7 @@ public class StorageQuotaCheckerTest { _tableSizeReader = mock(TableSizeReader.class); ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, controllerMetrics, - mock(LeadControllerManager.class), mock(PinotHelixResourceManager.class)); + mock(LeadControllerManager.class), mock(PinotHelixResourceManager.class), _controllerConf); tableConfig.setQuotaConfig(new QuotaConfig(null, null)); assertTrue(isSegmentWithinQuota(tableConfig)); } @@ -101,18 +140,18 @@ public class StorageQuotaCheckerTest { when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, controllerMetrics, mock(LeadControllerManager.class), - pinotHelixResourceManager); + pinotHelixResourceManager, _controllerConf); tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null)); // No response from server, should pass without updating metrics - mockTableSizeResult(-1, 0); + mockTableSizeResult(OFFLINE_TABLE_NAME, -1, 0); assertTrue(isSegmentWithinQuota(tableConfig)); assertFalse( MetricValueUtils.tableGaugeExists(controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE)); // Within quota but with missing segments, should pass without updating metrics - mockTableSizeResult(4 * 1024, 1); + mockTableSizeResult(OFFLINE_TABLE_NAME, 4 * 1024, 1); assertTrue(isSegmentWithinQuota(tableConfig)); assertFalse( MetricValueUtils.tableGaugeExists(controllerMetrics, OFFLINE_TABLE_NAME, @@ -120,21 +159,21 @@ public class StorageQuotaCheckerTest { // Exceed quota and with missing segments, should fail without updating metrics - mockTableSizeResult(8 * 1024, 1); + mockTableSizeResult(OFFLINE_TABLE_NAME, 8 * 1024, 1); assertFalse(isSegmentWithinQuota(tableConfig)); assertFalse( MetricValueUtils.tableGaugeExists(controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE)); // Within quota without missing segments, should pass and update metrics - mockTableSizeResult(3 * 1024, 0); + mockTableSizeResult(OFFLINE_TABLE_NAME, 3 * 1024, 0); assertTrue(isSegmentWithinQuota(tableConfig)); assertEquals( MetricValueUtils.getTableGaugeValue(controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 3 * 1024); // Exceed quota without missing segments, should fail and update metrics - mockTableSizeResult(4 * 1024, 0); + mockTableSizeResult(OFFLINE_TABLE_NAME, 4 * 1024, 0); assertFalse(isSegmentWithinQuota(tableConfig)); assertEquals( MetricValueUtils.getTableGaugeValue(controllerMetrics, OFFLINE_TABLE_NAME, @@ -144,15 +183,15 @@ public class StorageQuotaCheckerTest { private boolean isSegmentWithinQuota(TableConfig tableConfig) throws InvalidConfigException { return _storageQuotaChecker - .isSegmentStorageWithinQuota(tableConfig, SEGMENT_NAME, SEGMENT_SIZE_IN_BYTES, 1000)._isSegmentWithinQuota; + .isSegmentStorageWithinQuota(tableConfig, SEGMENT_NAME, SEGMENT_SIZE_IN_BYTES)._isSegmentWithinQuota; } - public void mockTableSizeResult(long tableSizeInBytes, int numMissingSegments) + public void mockTableSizeResult(String tableName, long tableSizeInBytes, int numMissingSegments) throws InvalidConfigException { TableSizeReader.TableSubTypeSizeDetails tableSizeResult = new TableSizeReader.TableSubTypeSizeDetails(); tableSizeResult._estimatedSizeInBytes = tableSizeInBytes; tableSizeResult._segments = Collections.emptyMap(); tableSizeResult._missingSegments = numMissingSegments; - when(_tableSizeReader.getTableSubtypeSize(OFFLINE_TABLE_NAME, 1000)).thenReturn(tableSizeResult); + when(_tableSizeReader.getTableSubtypeSize(tableName, 1000)).thenReturn(tableSizeResult); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java index 2a71d480fe..6a0f8c63e9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java @@ -70,6 +70,6 @@ public class PauseState extends BaseJsonConfig { } public enum ReasonCode { - ADMINISTRATIVE + ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org