This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e9271f6dd7 Storage Quota imposition on Realtime tables (#13584)
e9271f6dd7 is described below

commit e9271f6dd73cb1162e9743b14e61fd420dbf3e27
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Wed Sep 18 17:52:46 2024 +0530

    Storage Quota imposition on Realtime tables (#13584)
    
    * Storage quota imposition on realtime tables
    
    * fix test mock
    
    * Handle IS update failure case
    
    * refactor
    
    * tests
    
    * move quota checker to PinotLLCRealtimeSegmentManager
    
    * test fix
    
    * Add tableStorageQuotaExceeded gauge on controller
    
    * refactor to use PauseState
    
    * revert TABLE_STORAGE_QUOTA_EXCEEDED metric
    
    * cosmetic
    
    * refactors
    
    * check consuming segments only when table is not paused
    
    * refactor
---
 .../pinot/controller/BaseControllerStarter.java    |  4 +-
 .../PinotSegmentUploadDownloadRestletResource.java |  2 +-
 .../api/upload/SegmentValidationUtils.java         |  9 +---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  3 +-
 .../RealtimeSegmentValidationManager.java          | 47 ++++++++++++++--
 .../controller/validation/StorageQuotaChecker.java | 34 ++++++++++--
 .../PinotLLCRealtimeSegmentManagerTest.java        |  2 +-
 .../helix/core/realtime/SegmentCompletionTest.java |  6 +++
 .../validation/StorageQuotaCheckerTest.java        | 63 +++++++++++++++++-----
 .../apache/pinot/spi/config/table/PauseState.java  |  2 +-
 10 files changed, 138 insertions(+), 34 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 5e4ff8751f..44c8e96f36 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -493,7 +493,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new TableSizeReader(_executorService, _connectionManager, 
_controllerMetrics, _helixResourceManager,
             _leadControllerManager);
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
_controllerMetrics, _leadControllerManager,
-        _helixResourceManager);
+        _helixResourceManager, _config);
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
@@ -852,7 +852,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
         new RealtimeSegmentValidationManager(_config, _helixResourceManager, 
_leadControllerManager,
-            _pinotLLCRealtimeSegmentManager, _validationMetrics, 
_controllerMetrics);
+            _pinotLLCRealtimeSegmentManager, _validationMetrics, 
_controllerMetrics, _storageQuotaChecker);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
         new BrokerResourceValidationManager(_config, _helixResourceManager, 
_leadControllerManager, _controllerMetrics);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 156a3e9095..5b7cbed00e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -355,7 +355,7 @@ public class PinotSegmentUploadDownloadRestletResource {
         untarredSegmentSizeInBytes = FileUtils.sizeOfDirectory(tempSegmentDir);
       }
       SegmentValidationUtils.checkStorageQuota(segmentName, 
untarredSegmentSizeInBytes, tableConfig,
-          _controllerConf, _storageQuotaChecker);
+          _storageQuotaChecker);
 
       // Encrypt segment
       String crypterNameInTableConfig = 
tableConfig.getValidationConfig().getCrypterClassName();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
index dff3dd3d11..ee6219876f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.api.upload;
 
 import javax.ws.rs.core.Response;
-import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.validation.StorageQuotaChecker;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -59,14 +58,10 @@ public class SegmentValidationUtils {
   }
 
   public static void checkStorageQuota(String segmentName, long 
segmentSizeInBytes, TableConfig tableConfig,
-      ControllerConf controllerConf, StorageQuotaChecker quotaChecker) {
-    if (!controllerConf.getEnableStorageQuotaCheck()) {
-      return;
-    }
+      StorageQuotaChecker quotaChecker) {
     StorageQuotaChecker.QuotaCheckerResponse response;
     try {
-      response = quotaChecker.isSegmentStorageWithinQuota(tableConfig, 
segmentName, segmentSizeInBytes,
-          controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      response = quotaChecker.isSegmentStorageWithinQuota(tableConfig, 
segmentName, segmentSizeInBytes);
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER,
           String.format("Caught exception while checking the storage quota for 
segment: %s of table: %s", segmentName,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 76bef151a0..d799000ed3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -426,8 +426,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
-  @VisibleForTesting
-  IdealState getIdealState(String realtimeTableName) {
+  public IdealState getIdealState(String realtimeTableName) {
     try {
       IdealState idealState = HelixHelper.getTableIdealState(_helixManager, 
realtimeTableName);
       Preconditions.checkState(idealState != null, "Failed to find IdealState 
for table: " + realtimeTableName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 42ed189bbf..856d88c226 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -29,9 +29,11 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -51,6 +53,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
   private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
   private final ValidationMetrics _validationMetrics;
   private final ControllerMetrics _controllerMetrics;
+  private final StorageQuotaChecker _storageQuotaChecker;
 
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
@@ -60,13 +63,14 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
-      ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics) {
+      ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics, StorageQuotaChecker quotaChecker) {
     super("RealtimeSegmentValidationManager", 
config.getRealtimeSegmentValidationFrequencyInSeconds(),
         config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), 
pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
     _controllerMetrics = controllerMetrics;
+    _storageQuotaChecker = quotaChecker;
 
     _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
@@ -108,8 +112,45 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     if (context._runSegmentLevelValidation) {
       runSegmentLevelValidation(tableConfig, streamConfig);
     }
-    _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig,
-        context._recreateDeletedConsumingSegment, context._offsetCriteria);
+
+    if (shouldEnsureConsuming(tableNameWithType, context)) {
+      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig,
+          context._recreateDeletedConsumingSegment, context._offsetCriteria);
+    }
+  }
+
+  private boolean shouldEnsureConsuming(String tableNameWithType, Context 
context) {
+    // Keeps the table paused/unpaused based on pause validations.
+    // Skips updating the pause state if table is paused by admin
+    PauseState pauseState = computePauseState(tableNameWithType);
+    if (!pauseState.isPaused()) {
+      boolean unPausedUponStorageWithinQuota =
+        
pauseState.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED);
+      if (unPausedUponStorageWithinQuota) {
+        // recreate consuming segments if table is resumed upon the table 
storage getting within quota limit
+        context._recreateDeletedConsumingSegment = true;
+      }
+    }
+    return !pauseState.isPaused();
+  }
+
+  private PauseState computePauseState(String tableNameWithType) {
+    PauseStatusDetails pauseStatus = 
_llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
+    boolean isTablePaused = pauseStatus.getPauseFlag();
+    // if table is paused by admin then don't compute
+    if (!isTablePaused || 
pauseStatus.getReasonCode().equals(PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED))
 {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      boolean isQuotaExceeded = 
_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
+      // if quota breach and pause flag are not in sync, update the IdealState (IS)
+      if (isQuotaExceeded != isTablePaused) {
+        String storageQuota = tableConfig.getQuotaConfig() != null ? 
tableConfig.getQuotaConfig().getStorage() : "NA";
+        pauseStatus = 
_llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
+            PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED,
+            isQuotaExceeded ? "Storage quota of " + storageQuota + " 
exceeded." : "Table storage within quota limits");
+      }
+    }
+    return new PauseState(pauseStatus.getPauseFlag(), 
pauseStatus.getReasonCode(), pauseStatus.getComment(),
+        pauseStatus.getTimestamp());
   }
 
   private void runSegmentLevelValidation(TableConfig tableConfig, StreamConfig 
streamConfig) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index ec7d34bd64..2ae652bf78 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -43,14 +44,19 @@ public class StorageQuotaChecker {
   private final ControllerMetrics _controllerMetrics;
   private final LeadControllerManager _leadControllerManager;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final boolean _isEnabled;
+  private final int _timeoutMs;
 
   public StorageQuotaChecker(TableSizeReader tableSizeReader,
       ControllerMetrics controllerMetrics, LeadControllerManager 
leadControllerManager,
-      PinotHelixResourceManager pinotHelixResourceManager) {
+      PinotHelixResourceManager pinotHelixResourceManager, ControllerConf 
controllerConf) {
     _tableSizeReader = tableSizeReader;
     _controllerMetrics = controllerMetrics;
     _leadControllerManager = leadControllerManager;
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _isEnabled = controllerConf.getEnableStorageQuotaCheck();
+    _timeoutMs = controllerConf.getServerAdminRequestTimeoutSeconds() * 1000;
+    Preconditions.checkArgument(_timeoutMs > 0, "Timeout value must be > 0, 
input: %s", _timeoutMs);
   }
 
   public static class QuotaCheckerResponse {
@@ -75,9 +81,11 @@ public class StorageQuotaChecker {
    * Returns whether the new added segment is within the storage quota.
    */
   public QuotaCheckerResponse isSegmentStorageWithinQuota(TableConfig 
tableConfig, String segmentName,
-      long segmentSizeInBytes, int timeoutMs)
+      long segmentSizeInBytes)
       throws InvalidConfigException {
-    Preconditions.checkArgument(timeoutMs > 0, "Timeout value must be > 0, 
input: %s", timeoutMs);
+    if (!_isEnabled) {
+      return success("Storage quota check is disabled, skipping the check");
+    }
 
     // 1. Read table config
     // 2. read table size from all the servers
@@ -102,7 +110,7 @@ public class StorageQuotaChecker {
     // read table size
     TableSizeReader.TableSubTypeSizeDetails tableSubtypeSize;
     try {
-      tableSubtypeSize = 
_tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMs);
+      tableSubtypeSize = 
_tableSizeReader.getTableSubtypeSize(tableNameWithType, _timeoutMs);
     } catch (InvalidConfigException e) {
       LOGGER.error("Failed to get table size for table {}", tableNameWithType, 
e);
       throw e;
@@ -113,7 +121,10 @@ public class StorageQuotaChecker {
       return success("Missing size reports from all servers. Bypassing storage 
quota check for " + tableNameWithType);
     }
 
-    if (tableSubtypeSize._missingSegments > 0) {
+    // The logic inside this if block is applicable for missing segments as 
well as
+    // when we are checking the quota for only existing segments 
(segmentSizeInBytes == 0)
+    // as in both cases quota is checked across existing segments estimated 
size alone
+    if (segmentSizeInBytes == 0 || tableSubtypeSize._missingSegments > 0) {
       if (tableSubtypeSize._estimatedSizeInBytes > allowedStorageBytes) {
         return failure("Table " + tableNameWithType + " already over quota. 
Estimated size for all replicas is "
             + DataSizeUtils.fromBytes(tableSubtypeSize._estimatedSizeInBytes) 
+ ". Configured size for " + numReplicas
@@ -205,4 +216,17 @@ public class StorageQuotaChecker {
       return failure(message);
     }
   }
+
+  /**
+   * Checks whether the table has exceeded its storage quota.
+   * @return true if storage quota is exceeded by the table, else false.
+   */
+  public boolean isTableStorageQuotaExceeded(TableConfig tableConfig) {
+    try {
+      return !isSegmentStorageWithinQuota(tableConfig, null, 
0)._isSegmentWithinQuota;
+    } catch (InvalidConfigException e) {
+      // skip the check upon exception
+      return false;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index b1f8520aad..7127294fee 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1197,7 +1197,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    protected IdealState getIdealState(String realtimeTableName) {
+    public IdealState getIdealState(String realtimeTableName) {
       return _idealState;
     }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 4d85223b41..19814928a4 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
@@ -1395,6 +1396,11 @@ public class SegmentCompletionTest {
       _stoppedSegmentName = llcSegmentName;
       _stoppedInstance = instanceName;
     }
+
+    @Override
+    public TableConfig getTableConfig(String realtimeTableName) {
+      return mock(TableConfig.class);
+    }
   }
 
   public static class MockSegmentCompletionManager extends 
SegmentCompletionManager {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index 689fd622a1..3e42be2f50 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.MetricValueUtils;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -31,6 +32,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.eq;
@@ -43,12 +45,21 @@ import static org.testng.Assert.assertTrue;
 
 public class StorageQuotaCheckerTest {
   private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
+  private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment";
   private static final long SEGMENT_SIZE_IN_BYTES = 1024;
   private static final int NUM_REPLICAS = 2;
 
   private TableSizeReader _tableSizeReader;
   private StorageQuotaChecker _storageQuotaChecker;
+  private ControllerConf _controllerConf;
+
+  @BeforeClass
+  public void init() {
+    _controllerConf = mock(ControllerConf.class);
+    when(_controllerConf.getServerAdminRequestTimeoutSeconds()).thenReturn(1);
+    when(_controllerConf.getEnableStorageQuotaCheck()).thenReturn(true);
+  }
 
   @Test
   public void testNoQuota()
@@ -58,7 +69,7 @@ public class StorageQuotaCheckerTest {
     _tableSizeReader = mock(TableSizeReader.class);
     ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
controllerMetrics,
-        mock(LeadControllerManager.class), 
mock(PinotHelixResourceManager.class));
+        mock(LeadControllerManager.class), 
mock(PinotHelixResourceManager.class), _controllerConf);
     tableConfig.setQuotaConfig(null);
     assertTrue(isSegmentWithinQuota(tableConfig));
   }
@@ -72,11 +83,39 @@ public class StorageQuotaCheckerTest {
     _tableSizeReader = mock(TableSizeReader.class);
     ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
controllerMetrics,
-        mock(LeadControllerManager.class), 
mock(PinotHelixResourceManager.class));
+        mock(LeadControllerManager.class), 
mock(PinotHelixResourceManager.class), _controllerConf);
     tableConfig.setQuotaConfig(null);
     assertTrue(isSegmentWithinQuota(tableConfig));
   }
 
+  @Test
+  public void testRealtimeTable()
+      throws InvalidConfigException {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(REALTIME_TABLE_NAME)
+        .setNumReplicas(NUM_REPLICAS).build();
+    _tableSizeReader = mock(TableSizeReader.class);
+    ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    
when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS);
+    _storageQuotaChecker =
+        new StorageQuotaChecker(_tableSizeReader, controllerMetrics, 
mock(LeadControllerManager.class),
+            pinotHelixResourceManager, _controllerConf);
+
+    tableConfig.setQuotaConfig(new QuotaConfig(null, null));
+    assertFalse(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig));
+
+    tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null));
+
+    // Within quota but with missing segments, should pass without updating 
metrics
+    mockTableSizeResult(REALTIME_TABLE_NAME, 4 * 1024, 1);
+    assertFalse(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig));
+
+
+    // Exceed quota and with missing segments, should fail without updating 
metrics
+    mockTableSizeResult(REALTIME_TABLE_NAME, 8 * 1024, 1);
+    assertTrue(_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig));
+  }
+
   @Test
   public void testNoStorageQuotaConfig()
       throws InvalidConfigException {
@@ -85,7 +124,7 @@ public class StorageQuotaCheckerTest {
     _tableSizeReader = mock(TableSizeReader.class);
     ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
controllerMetrics,
-        mock(LeadControllerManager.class), 
mock(PinotHelixResourceManager.class));
+        mock(LeadControllerManager.class), 
mock(PinotHelixResourceManager.class), _controllerConf);
     tableConfig.setQuotaConfig(new QuotaConfig(null, null));
     assertTrue(isSegmentWithinQuota(tableConfig));
   }
@@ -101,18 +140,18 @@ public class StorageQuotaCheckerTest {
     
when(pinotHelixResourceManager.getNumReplicas(eq(tableConfig))).thenReturn(NUM_REPLICAS);
     _storageQuotaChecker =
         new StorageQuotaChecker(_tableSizeReader, controllerMetrics, 
mock(LeadControllerManager.class),
-            pinotHelixResourceManager);
+            pinotHelixResourceManager, _controllerConf);
     tableConfig.setQuotaConfig(new QuotaConfig("2.8K", null));
 
     // No response from server, should pass without updating metrics
-    mockTableSizeResult(-1, 0);
+    mockTableSizeResult(OFFLINE_TABLE_NAME, -1, 0);
     assertTrue(isSegmentWithinQuota(tableConfig));
     assertFalse(
         MetricValueUtils.tableGaugeExists(controllerMetrics, 
OFFLINE_TABLE_NAME,
             ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE));
 
     // Within quota but with missing segments, should pass without updating 
metrics
-    mockTableSizeResult(4 * 1024, 1);
+    mockTableSizeResult(OFFLINE_TABLE_NAME, 4 * 1024, 1);
     assertTrue(isSegmentWithinQuota(tableConfig));
     assertFalse(
         MetricValueUtils.tableGaugeExists(controllerMetrics, 
OFFLINE_TABLE_NAME,
@@ -120,21 +159,21 @@ public class StorageQuotaCheckerTest {
 
 
     // Exceed quota and with missing segments, should fail without updating 
metrics
-    mockTableSizeResult(8 * 1024, 1);
+    mockTableSizeResult(OFFLINE_TABLE_NAME, 8 * 1024, 1);
     assertFalse(isSegmentWithinQuota(tableConfig));
     assertFalse(
         MetricValueUtils.tableGaugeExists(controllerMetrics, 
OFFLINE_TABLE_NAME,
             ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE));
 
     // Within quota without missing segments, should pass and update metrics
-    mockTableSizeResult(3 * 1024, 0);
+    mockTableSizeResult(OFFLINE_TABLE_NAME, 3 * 1024, 0);
     assertTrue(isSegmentWithinQuota(tableConfig));
     assertEquals(
         MetricValueUtils.getTableGaugeValue(controllerMetrics, 
OFFLINE_TABLE_NAME,
             ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 3 * 1024);
 
     // Exceed quota without missing segments, should fail and update metrics
-    mockTableSizeResult(4 * 1024, 0);
+    mockTableSizeResult(OFFLINE_TABLE_NAME, 4 * 1024, 0);
     assertFalse(isSegmentWithinQuota(tableConfig));
     assertEquals(
         MetricValueUtils.getTableGaugeValue(controllerMetrics, 
OFFLINE_TABLE_NAME,
@@ -144,15 +183,15 @@ public class StorageQuotaCheckerTest {
   private boolean isSegmentWithinQuota(TableConfig tableConfig)
       throws InvalidConfigException {
     return _storageQuotaChecker
-        .isSegmentStorageWithinQuota(tableConfig, SEGMENT_NAME, 
SEGMENT_SIZE_IN_BYTES, 1000)._isSegmentWithinQuota;
+        .isSegmentStorageWithinQuota(tableConfig, SEGMENT_NAME, 
SEGMENT_SIZE_IN_BYTES)._isSegmentWithinQuota;
   }
 
-  public void mockTableSizeResult(long tableSizeInBytes, int 
numMissingSegments)
+  public void mockTableSizeResult(String tableName, long tableSizeInBytes, int 
numMissingSegments)
       throws InvalidConfigException {
     TableSizeReader.TableSubTypeSizeDetails tableSizeResult = new 
TableSizeReader.TableSubTypeSizeDetails();
     tableSizeResult._estimatedSizeInBytes = tableSizeInBytes;
     tableSizeResult._segments = Collections.emptyMap();
     tableSizeResult._missingSegments = numMissingSegments;
-    when(_tableSizeReader.getTableSubtypeSize(OFFLINE_TABLE_NAME, 
1000)).thenReturn(tableSizeResult);
+    when(_tableSizeReader.getTableSubtypeSize(tableName, 
1000)).thenReturn(tableSizeResult);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
index 2a71d480fe..6a0f8c63e9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
@@ -70,6 +70,6 @@ public class PauseState extends BaseJsonConfig {
   }
 
   public enum ReasonCode {
-    ADMINISTRATIVE
+    ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to