This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7239d34117 Added support to pause and resume ingestion based on resource utilization (#15008)
7239d34117 is described below

commit 7239d3411719f89370de8645979061647a843354
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Sun Feb 16 21:04:34 2025 -0800

    Added support to pause and resume ingestion based on resource utilization (#15008)
---
 .../pinot/common/metrics/ControllerGauge.java      |   5 +-
 .../common/restlet/resources/DiskUsageInfo.java    |  74 +++++++++
 .../pinot/controller/BaseControllerStarter.java    |  19 ++-
 .../apache/pinot/controller/ControllerConf.java    |  38 +++++
 .../helix/core/minion/PinotTaskManager.java        |  25 ++-
 .../validation/DiskUtilizationChecker.java         | 136 ++++++++++++++++
 .../RealtimeSegmentValidationManager.java          |  38 ++++-
 .../validation/ResourceUtilizationChecker.java     |  86 ++++++++++
 .../validation/ResourceUtilizationInfo.java        |  45 ++++++
 .../validation/ResourceUtilizationManager.java     |  49 ++++++
 ...ControllerPeriodicTaskStarterStatelessTest.java |   2 +-
 .../validation/DiskUtilizationCheckerTest.java     | 178 +++++++++++++++++++++
 .../RealtimeSegmentValidationManagerTest.java      | 116 ++++++++++++++
 .../validation/ResourceUtilizationInfoTest.java    |  64 ++++++++
 .../validation/ResourceUtilizationManagerTest.java |  85 ++++++++++
 .../server/api/resources/DiskUtilization.java      |  66 ++++++++
 .../server/api/resources/InstanceResource.java     |  24 +++
 .../pinot/server/api/DiskUtilizationTest.java      |  52 ++++++
 .../apache/pinot/spi/config/table/PauseState.java  |   2 +-
 19 files changed, 1094 insertions(+), 10 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 66bb2be961..40e2d09b4f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -187,7 +187,10 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false),
 
   // Number of reingested segments getting uploaded
-  REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", 
true);
+  REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", 
true),
+
+  // Resource utilization is within limits or not for a table
+  RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", 
false);
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java
new file mode 100644
index 0000000000..7f5d6ed138
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * A simple container class to hold disk usage information.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DiskUsageInfo {
+  private final String _instanceId;
+  private final String _path;
+  private final long _totalSpaceBytes;
+  private final long _usedSpaceBytes;
+  private final long _lastUpdatedTimeInEpochMs;
+
+  @JsonCreator
+  public DiskUsageInfo(@JsonProperty("instanceId") String instanceId, 
@JsonProperty("path") String path,
+      @JsonProperty("totalSpaceBytes") long totalSpaceBytes,
+      @JsonProperty("usedSpaceBytes") long usedSpaceBytes,
+      @JsonProperty("lastUpdatedTimeInEpochMs") long lastUpdatedTimeInEpochMs) 
{
+    _instanceId = instanceId;
+    _path = path;
+    _totalSpaceBytes = totalSpaceBytes;
+    _usedSpaceBytes = usedSpaceBytes;
+    _lastUpdatedTimeInEpochMs = lastUpdatedTimeInEpochMs;
+  }
+
+  public String getInstanceId() {
+    return _instanceId;
+  }
+
+  public String getPath() {
+    return _path;
+  }
+
+  public long getTotalSpaceBytes() {
+    return _totalSpaceBytes;
+  }
+
+  public long getUsedSpaceBytes() {
+    return _usedSpaceBytes;
+  }
+
+  public long getLastUpdatedTimeInEpochMs() {
+    return _lastUpdatedTimeInEpochMs;
+  }
+
+  public String toString() {
+    return "DiskUsageInfo{" + "_instanceId='" + _instanceId + '\'' + ", 
_path='" + _path + '\'' + ", _totalSpaceBytes="
+        + _totalSpaceBytes + ", _usedSpaceBytes=" + _usedSpaceBytes + ", 
_lastUpdatedTimeInEpochMs="
+        + _lastUpdatedTimeInEpochMs + '}';
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index e95e0f5d87..5761cb27e5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -113,8 +113,11 @@ import 
org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.controller.tuner.TableConfigTunerRegistry;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
+import org.apache.pinot.controller.validation.DiskUtilizationChecker;
 import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
+import org.apache.pinot.controller.validation.ResourceUtilizationChecker;
+import org.apache.pinot.controller.validation.ResourceUtilizationManager;
 import org.apache.pinot.controller.validation.StorageQuotaChecker;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -204,6 +207,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected ExecutorService _tenantRebalanceExecutorService;
   protected TableSizeReader _tableSizeReader;
   protected StorageQuotaChecker _storageQuotaChecker;
+  protected DiskUtilizationChecker _diskUtilizationChecker;
+  protected ResourceUtilizationManager _resourceUtilizationManager;
 
   @Override
   public void init(PinotConfiguration pinotConfiguration)
@@ -510,6 +515,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
_controllerMetrics, _leadControllerManager,
         _helixResourceManager, _config);
 
+    _diskUtilizationChecker = new 
DiskUtilizationChecker(_helixResourceManager, _config);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_config, 
_diskUtilizationChecker);
+
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
     LOGGER.info("Init controller periodic tasks scheduler");
@@ -561,6 +569,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         bind(_tenantRebalancer).to(TenantRebalancer.class);
         bind(_tableSizeReader).to(TableSizeReader.class);
         bind(_storageQuotaChecker).to(StorageQuotaChecker.class);
+        bind(_diskUtilizationChecker).to(DiskUtilizationChecker.class);
+        bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
         
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
         String loggerRootDir = 
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
         if (loggerRootDir != null) {
@@ -862,7 +872,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _taskManagerStatusCache = getTaskManagerStatusCache();
     _taskManager =
         new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, 
_leadControllerManager, _config,
-            _controllerMetrics, _taskManagerStatusCache, _executorService, 
_connectionManager);
+            _controllerMetrics, _taskManagerStatusCache, _executorService, 
_connectionManager,
+            _resourceUtilizationManager);
     periodicTasks.add(_taskManager);
     _retentionManager =
         new RetentionManager(_helixResourceManager, _leadControllerManager, 
_config, _controllerMetrics);
@@ -873,7 +884,8 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
         new RealtimeSegmentValidationManager(_config, _helixResourceManager, 
_leadControllerManager,
-            _pinotLLCRealtimeSegmentManager, _validationMetrics, 
_controllerMetrics, _storageQuotaChecker);
+            _pinotLLCRealtimeSegmentManager, _validationMetrics, 
_controllerMetrics, _storageQuotaChecker,
+            _resourceUtilizationManager);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
         new BrokerResourceValidationManager(_config, _helixResourceManager, 
_leadControllerManager, _controllerMetrics);
@@ -902,6 +914,9 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     PeriodicTask responseStoreCleaner = new ResponseStoreCleaner(_config, 
_helixResourceManager, _leadControllerManager,
         _controllerMetrics, _executorService, _connectionManager);
     periodicTasks.add(responseStoreCleaner);
+    PeriodicTask resourceUtilizationChecker = new 
ResourceUtilizationChecker(_config, _connectionManager,
+        _controllerMetrics, _diskUtilizationChecker, _executorService, 
_helixResourceManager);
+    periodicTasks.add(resourceUtilizationChecker);
 
     return periodicTasks;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 5ea1fd20cf..d3396cd3a7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -300,6 +300,14 @@ public class ControllerConf extends PinotConfiguration {
   private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
       "controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = 
"controller.enable.storage.quota.check";
+  private static final String DISK_UTILIZATION_THRESHOLD = 
"controller.disk.utilization.threshold"; // 0 < threshold < 1
+  private static final String DISK_UTILIZATION_CHECK_TIMEOUT_MS = 
"controller.disk.utilization.check.timeoutMs";
+  private static final String DISK_UTILIZATION_PATH = 
"controller.disk.utilization.path";
+  private static final String ENABLE_RESOURCE_UTILIZATION_CHECK = 
"controller.enable.resource.utilization.check";
+  private static final String RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY =
+      "controller.resource.utilization.checker.initial.delay";
+  private static final String RESOURCE_UTILIZATION_CHECKER_FREQUENCY =
+      "controller.resource.utilization.checker.frequency";
   private static final String ENABLE_BATCH_MESSAGE_MODE = 
"controller.enable.batch.message.mode";
   public static final String DIM_TABLE_MAX_SIZE = 
"controller.dimTable.maxSize";
 
@@ -323,6 +331,12 @@ public class ControllerConf extends PinotConfiguration {
   private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION 
= -1;
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 
64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
+  private static final double DEFAULT_DISK_UTILIZATION_THRESHOLD = 0.95;
+  private static final int DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS = 30_000;
+  private static final String DEFAULT_DISK_UTILIZATION_PATH = 
"/home/pinot/data";
+  private static final boolean DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK = 
false;
+  private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY 
= 300L; // 5 minutes
+  private static final long DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY = 
300L; // 5 minutes
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
   // Disallow any high level consumer (HLC) table
   private static final boolean DEFAULT_ALLOW_HLC_TABLES = false;
@@ -987,6 +1001,30 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(ENABLE_STORAGE_QUOTA_CHECK, 
DEFAULT_ENABLE_STORAGE_QUOTA_CHECK);
   }
 
+  public String getDiskUtilizationPath() {
+    return getProperty(DISK_UTILIZATION_PATH, DEFAULT_DISK_UTILIZATION_PATH);
+  }
+
+  public double getDiskUtilizationThreshold() {
+    return getProperty(DISK_UTILIZATION_THRESHOLD, 
DEFAULT_DISK_UTILIZATION_THRESHOLD);
+  }
+
+  public int getDiskUtilizationCheckTimeoutMs() {
+    return getProperty(DISK_UTILIZATION_CHECK_TIMEOUT_MS, 
DEFAULT_DISK_UTILIZATION_CHECK_TIMEOUT_MS);
+  }
+
+  public long getResourceUtilizationCheckerInitialDelay() {
+    return getProperty(RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY, 
DEFAULT_RESOURCE_UTILIZATION_CHECKER_INITIAL_DELAY);
+  }
+
+  public long getResourceUtilizationCheckerFrequency() {
+    return getProperty(RESOURCE_UTILIZATION_CHECKER_FREQUENCY, 
DEFAULT_RESOURCE_UTILIZATION_CHECKER_FREQUENCY);
+  }
+
+  public boolean isResourceUtilizationCheckEnabled() {
+    return getProperty(ENABLE_RESOURCE_UTILIZATION_CHECK, 
DEFAULT_ENABLE_RESOURCE_UTILIZATION_CHECK);
+  }
+
   public boolean getEnableBatchMessageMode() {
     return getProperty(ENABLE_BATCH_MESSAGE_MODE, 
DEFAULT_ENABLE_BATCH_MESSAGE_MODE);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 9f1f938187..c62d7a1035 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -55,6 +55,7 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.validation.ResourceUtilizationManager;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -100,6 +101,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoAccessor _clusterInfoAccessor;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
+  private final ResourceUtilizationManager _resourceUtilizationManager;
 
   // For cron-based scheduling
   private final Scheduler _scheduler;
@@ -120,11 +122,12 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       PinotHelixResourceManager helixResourceManager, LeadControllerManager 
leadControllerManager,
       ControllerConf controllerConf, ControllerMetrics controllerMetrics,
       TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> 
taskManagerStatusCache, Executor executor,
-      PoolingHttpClientConnectionManager connectionManager) {
+      PoolingHttpClientConnectionManager connectionManager, 
ResourceUtilizationManager resourceUtilizationManager) {
     super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(),
         controllerConf.getPinotTaskManagerInitialDelaySeconds(), 
helixResourceManager, leadControllerManager,
         controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
+    _resourceUtilizationManager = resourceUtilizationManager;
     _taskManagerStatusCache = taskManagerStatusCache;
     _clusterInfoAccessor =
         new ClusterInfoAccessor(helixResourceManager, 
helixTaskResourceManager, controllerConf, controllerMetrics,
@@ -205,6 +208,16 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     for (String tableNameWithType : tableNameWithTypes) {
       TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
       LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, 
tableNameWithType);
+      try {
+        if 
(!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType))
 {
+          LOGGER.warn("Resource utilization is above threshold, skipping task 
creation for table: {}", tableName);
+          _controllerMetrics.setOrUpdateTableGauge(tableName, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
+          continue;
+        }
+        _controllerMetrics.setOrUpdateTableGauge(tableName, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 0L);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while checking resource utilization for 
table: {}", tableName, e);
+      }
       List<PinotTaskConfig> pinotTaskConfigs = 
taskGenerator.generateTasks(tableConfig, taskConfigs);
       if (pinotTaskConfigs.isEmpty()) {
         LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
@@ -695,6 +708,16 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     for (TableConfig tableConfig : enabledTableConfigs) {
       String tableName = tableConfig.getTableName();
       try {
+        if 
(!_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableName)) {
+          String message = String.format("Skipping tasks generation as 
resource utilization is not within limits for "
+              + "table: %s. Disk utilization for one or more servers hosting 
this table has exceeded the threshold. "
+              + "Tasks won't be generated until the issue is mitigated.", 
tableName);
+          LOGGER.warn(message);
+          response.addSchedulingError(message);
+          _controllerMetrics.setOrUpdateTableGauge(tableName, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 1L);
+          continue;
+        }
+        _controllerMetrics.setOrUpdateTableGauge(tableName, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 0L);
         String minionInstanceTag = minionInstanceTagForTask != null ? 
minionInstanceTagForTask
             : taskGenerator.getMinionInstanceTag(tableConfig);
         List<PinotTaskConfig> presentTaskConfig =
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
new file mode 100644
index 0000000000..8edd8b6cd1
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/DiskUtilizationChecker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DiskUtilizationChecker {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DiskUtilizationChecker.class);
+  private final int _resourceUtilizationCheckTimeoutMs;
+  private final long _resourceUtilizationCheckerFrequencyMs;
+  private final double _diskUtilizationThreshold;
+  private final String _diskUtilizationPath;
+  public static final String DISK_UTILIZATION_API_PATH = 
"/instance/diskUtilization";
+
+  private final PinotHelixResourceManager _helixResourceManager;
+
+  public DiskUtilizationChecker(PinotHelixResourceManager 
helixResourceManager, ControllerConf controllerConf) {
+    _helixResourceManager = helixResourceManager;
+    _diskUtilizationPath = controllerConf.getDiskUtilizationPath();
+    _diskUtilizationThreshold = controllerConf.getDiskUtilizationThreshold();
+    _resourceUtilizationCheckTimeoutMs = 
controllerConf.getDiskUtilizationCheckTimeoutMs();
+    _resourceUtilizationCheckerFrequencyMs = 
controllerConf.getResourceUtilizationCheckerFrequency() * 1000;
+  }
+
+  /**
+   * Check if disk utilization for the requested table is within the 
configured limits.
+   */
+  public boolean isDiskUtilizationWithinLimits(String tableNameWithType) {
+    if (StringUtils.isEmpty(tableNameWithType)) {
+      throw new IllegalArgumentException("Table name found to be null or empty 
while computing disk utilization.");
+    }
+    TableConfig tableConfig = 
_helixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Table config for table: {} is null", tableNameWithType);
+      return true; // table does not exist
+    }
+    List<String> instances;
+    if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+      instances = 
_helixResourceManager.getServerInstancesForTable(tableNameWithType, 
TableType.OFFLINE);
+    } else {
+      instances = 
_helixResourceManager.getServerInstancesForTable(tableNameWithType, 
TableType.REALTIME);
+    }
+    return isDiskUtilizationWithinLimits(instances);
+  }
+
+  private boolean isDiskUtilizationWithinLimits(List<String> instances) {
+    for (String instance : instances) {
+      DiskUsageInfo diskUsageInfo = 
ResourceUtilizationInfo.getDiskUsageInfo(instance);
+      if (diskUsageInfo == null) {
+        LOGGER.warn("Disk utilization info for server: {} is null", instance);
+        continue;
+      }
+      // Ignore if the disk utilization info is stale. The info is considered 
stale if it is older than the
+      // ResourceUtilizationChecker tasks frequency.
+      if (diskUsageInfo.getLastUpdatedTimeInEpochMs()
+          < System.currentTimeMillis() - 
_resourceUtilizationCheckerFrequencyMs) {
+        LOGGER.warn("Disk utilization info for server: {} is stale", instance);
+        continue;
+      }
+      if (diskUsageInfo.getUsedSpaceBytes() > 
diskUsageInfo.getTotalSpaceBytes() * _diskUtilizationThreshold) {
+        LOGGER.warn("Disk utilization for server: {} is above threshold: {}%. 
UsedBytes: {}, TotalBytes: {}",
+            instance, diskUsageInfo.getUsedSpaceBytes() * 100 / 
diskUsageInfo.getTotalSpaceBytes(), diskUsageInfo
+                .getUsedSpaceBytes(), diskUsageInfo.getTotalSpaceBytes());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Compute disk utilization for the requested instances using the 
<code>CompletionServiceHelper</code>.
+   */
+  public void computeDiskUtilization(BiMap<String, String> 
endpointsToInstances,
+      CompletionServiceHelper completionServiceHelper) {
+    List<String> diskUtilizationUris = new 
ArrayList<>(endpointsToInstances.size());
+    for (String endpoint : endpointsToInstances.keySet()) {
+      String diskUtilizationUri = endpoint + DISK_UTILIZATION_API_PATH;
+      diskUtilizationUris.add(diskUtilizationUri);
+    }
+    Map<String, String> reqHeaders = new HashMap<>();
+    reqHeaders.put("diskUtilizationPath", _diskUtilizationPath);
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(diskUtilizationUris, 
"no-op", false, reqHeaders,
+            _resourceUtilizationCheckTimeoutMs, "get disk utilization info 
from servers");
+    LOGGER.info("Service response: {}", serviceResponse._httpResponses);
+    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        DiskUsageInfo diskUsageInfo = 
JsonUtils.stringToObject(streamResponse.getValue(), DiskUsageInfo.class);
+        if (diskUsageInfo != null && 
StringUtils.isNotEmpty(diskUsageInfo.getInstanceId())) {
+          LOGGER.debug("Disk utilization for instance: {} is {}", 
diskUsageInfo.getInstanceId(), diskUsageInfo);
+          diskUsageInfoMap.put(diskUsageInfo.getInstanceId(), diskUsageInfo);
+        } else {
+          LOGGER.warn("Disk utilization info for server {} is null or empty", 
streamResponse.getKey());
+        }
+      } catch (Exception e) {
+        LOGGER.warn("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
+      }
+    }
+    if (!diskUsageInfoMap.isEmpty()) {
+      ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 398e06c272..f25f27bda0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
@@ -58,6 +59,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
   private final ValidationMetrics _validationMetrics;
   private final ControllerMetrics _controllerMetrics;
   private final StorageQuotaChecker _storageQuotaChecker;
+  private final ResourceUtilizationManager _resourceUtilizationManager;
 
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
@@ -67,7 +69,8 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
-      ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics, StorageQuotaChecker quotaChecker) {
+      ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics, StorageQuotaChecker quotaChecker,
+      ResourceUtilizationManager resourceUtilizationManager) {
     super("RealtimeSegmentValidationManager", 
config.getRealtimeSegmentValidationFrequencyInSeconds(),
         config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), 
pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
@@ -75,6 +78,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     _validationMetrics = validationMetrics;
     _controllerMetrics = controllerMetrics;
     _storageQuotaChecker = quotaChecker;
+    _resourceUtilizationManager = resourceUtilizationManager;
 
     _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
     _segmentAutoResetOnErrorAtValidation = 
config.isAutoResetErrorSegmentsOnValidationEnabled();
@@ -140,13 +144,39 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
    * Skips updating the pause state if table is paused by admin.
    * Returns true if table is not paused
    */
-  private boolean shouldEnsureConsuming(String tableNameWithType) {
+  @VisibleForTesting
+  boolean shouldEnsureConsuming(String tableNameWithType) {
     PauseStatusDetails pauseStatus = 
_llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
     boolean isTablePaused = pauseStatus.getPauseFlag();
-    // if table is paused by admin then don't compute
     if (isTablePaused && 
pauseStatus.getReasonCode().equals(PauseState.ReasonCode.ADMINISTRATIVE)) {
-      return false;
+      return false; // if table is paused by admin, then skip subsequent checks
+    }
+    // Perform resource utilization checks.
+    boolean isResourceUtilizationWithinLimits =
+        
_resourceUtilizationManager.isResourceUtilizationWithinLimits(tableNameWithType);
+    if (!isResourceUtilizationWithinLimits) {
+      LOGGER.warn("Resource utilization limit exceeded for table: {}", 
tableNameWithType);
+      _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+          1L);
+      // update IS when table is not paused or paused with another reason code
+      if (!isTablePaused || !pauseStatus.getReasonCode()
+          .equals(PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED)) {
+        _llcRealtimeSegmentManager.pauseConsumption(tableNameWithType,
+            PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, 
"Resource utilization limit exceeded.");
+      }
+      return false; // if resource utilization check failed, then skip 
subsequent checks
+      // within limits and table previously paused by resource utilization --> 
unpause
+    } else if (isTablePaused && pauseStatus.getReasonCode()
+        .equals(PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED)) {
+      // unset the pause state and allow consuming segment recreation.
+      
_llcRealtimeSegmentManager.updatePauseStateInIdealState(tableNameWithType, 
false,
+          PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, "Resource 
utilization within limits");
+      pauseStatus = 
_llcRealtimeSegmentManager.getPauseStatusDetails(tableNameWithType);
+      isTablePaused = pauseStatus.getPauseFlag();
     }
+    _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
+        0L);
+    // Perform storage quota checks.
     TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
     boolean isQuotaExceeded = 
_storageQuotaChecker.isTableStorageQuotaExceeded(tableConfig);
     if (isQuotaExceeded == isTablePaused) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
new file mode 100644
index 0000000000..60990f5151
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationChecker.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.collect.BiMap;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task that refreshes resource utilization information for Pinot instances. For now only the disk
+ * utilization of server instances is collected. The computed disk utilization is stored in the class
+ * <code>org.apache.pinot.controller.validation.ResourceUtilizationInfo</code>.
+ */
+public class ResourceUtilizationChecker extends BasePeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ResourceUtilizationChecker.class);
+  private final static String TASK_NAME = ResourceUtilizationChecker.class.getSimpleName();
+
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final DiskUtilizationChecker _diskUtilizationChecker;
+  private final Executor _executor;
+  private final PinotHelixResourceManager _helixResourceManager;
+
+  /**
+   * Creates the task; the run frequency and the initial delay are both read from the controller config.
+   */
+  public ResourceUtilizationChecker(ControllerConf config, PoolingHttpClientConnectionManager connectionManager,
+      ControllerMetrics controllerMetrics, DiskUtilizationChecker diskUtilizationChecker, Executor executor,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    super(TASK_NAME, config.getResourceUtilizationCheckerFrequency(),
+        config.getResourceUtilizationCheckerInitialDelay());
+    _connectionManager = connectionManager;
+    _controllerMetrics = controllerMetrics;
+    _diskUtilizationChecker = diskUtilizationChecker;
+    _executor = executor;
+    _helixResourceManager = pinotHelixResourceManager;
+  }
+
+  /**
+   * Collects the instances of every server tenant, resolves their admin endpoints and asks the disk utilization
+   * checker to compute the utilization for them. Any exception is logged and counted as a periodic task error.
+   */
+  @Override
+  protected final void runTask(Properties periodicTaskProperties) {
+    _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
+    try {
+      // Union of the instances of all server tenants; the set removes instances shared between tenants.
+      Set<String> serverInstances = new HashSet<>();
+      for (String tenantName : _helixResourceManager.getAllServerTenantNames()) {
+        serverInstances.addAll(_helixResourceManager.getAllInstancesForServerTenant(tenantName));
+      }
+      // The helper and the checker are keyed by endpoint, hence the inverse view of instance -> endpoint.
+      BiMap<String, String> endpointsToInstances =
+          _helixResourceManager.getDataInstanceAdminEndpoints(serverInstances).inverse();
+      _diskUtilizationChecker.computeDiskUtilization(endpointsToInstances,
+          new CompletionServiceHelper(_executor, _connectionManager, endpointsToInstances));
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while running task: {}", _taskName, e);
+      _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationInfo.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationInfo.java
new file mode 100644
index 0000000000..d781311ce0
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+
+
+/**
+ * This class is used to capture resource utilization information for all instances. The periodic task,
+ * <code>ResourceUtilizationChecker</code>, will update this information.
+ *
+ * <p>The information is kept in a single static map that is replaced as a whole on every update.
+ */
+public class ResourceUtilizationInfo {
+
+  // Assumption – instanceId is unique across all tenants.
+  // volatile so that a map published by one thread is visible to readers running on other threads.
+  private static volatile Map<String, DiskUsageInfo> _instanceDiskUsageInfo = new HashMap<>();
+
+  private ResourceUtilizationInfo() {
+  }
+
+  /**
+   * Returns the disk usage recorded for the given instance.
+   *
+   * @param instanceId id of the instance to look up
+   * @return the recorded disk usage, or {@code null} when nothing is recorded for the instance
+   */
+  public static DiskUsageInfo getDiskUsageInfo(String instanceId) {
+    return _instanceDiskUsageInfo.get(instanceId);
+  }
+
+  /**
+   * Replaces the recorded disk usage of all instances with the given map.
+   *
+   * @param newDiskUsageInfo disk usage keyed by instance id; {@code null} is treated as "no information" so that
+   *                         later lookups return {@code null} instead of failing with a NullPointerException
+   */
+  public static void setDiskUsageInfo(Map<String, DiskUsageInfo> newDiskUsageInfo) {
+    _instanceDiskUsageInfo = newDiskUsageInfo != null ? newDiskUsageInfo : new HashMap<>();
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
new file mode 100644
index 0000000000..c53507913b
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/ResourceUtilizationManager.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.controller.ControllerConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Answers whether the resource utilization for a table is within limits. The check can be switched off through the
+ * controller config; when it is enabled the answer is delegated to the {@link DiskUtilizationChecker}.
+ */
+public class ResourceUtilizationManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ResourceUtilizationManager.class);
+
+  private final boolean _isResourceUtilizationCheckEnabled;
+  private final DiskUtilizationChecker _diskUtilizationChecker;
+
+  public ResourceUtilizationManager(ControllerConf controllerConf, DiskUtilizationChecker diskUtilizationChecker) {
+    _diskUtilizationChecker = diskUtilizationChecker;
+    _isResourceUtilizationCheckEnabled = controllerConf.isResourceUtilizationCheckEnabled();
+    String checkState;
+    if (_isResourceUtilizationCheckEnabled) {
+      checkState = "enabled";
+    } else {
+      checkState = "disabled";
+    }
+    LOGGER.info("Resource utilization check is: {}", checkState);
+  }
+
+  /**
+   * Checks the resource utilization for the given table.
+   *
+   * @param tableNameWithType table name including the type suffix
+   * @return {@code true} when the check is disabled or the disk utilization checker reports utilization within limits
+   * @throws IllegalArgumentException when the check is enabled and the table name is null or empty
+   */
+  public boolean isResourceUtilizationWithinLimits(String tableNameWithType) {
+    if (_isResourceUtilizationCheckEnabled) {
+      if (StringUtils.isEmpty(tableNameWithType)) {
+        throw new IllegalArgumentException("Table name found to be null or empty while checking resource utilization.");
+      }
+      LOGGER.info("Checking resource utilization for table: {}", tableNameWithType);
+      return _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableNameWithType);
+    }
+    // Check disabled: the table name is deliberately not validated on this path.
+    return true;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index 305c0a26a0..413d23c383 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest 
extends ControllerTest {
   }
 
   private class MockControllerStarter extends ControllerStarter {
-    private static final int NUM_PERIODIC_TASKS = 12;
+    private static final int NUM_PERIODIC_TASKS = 13;
 
     public MockControllerStarter() {
       super();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
new file mode 100644
index 0000000000..ea0b5aa2ba
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/DiskUtilizationCheckerTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link DiskUtilizationChecker}. The helix resource manager and the controller config are mocked;
+ * the config is set up with a disk utilization threshold of 0.8.
+ */
+public class DiskUtilizationCheckerTest {
+
+  private static final String DISK_UTILIZATION_PATH = "/disk/utilization/path";
+  private PinotHelixResourceManager _helixResourceManager;
+  private ControllerConf _controllerConf;
+  private DiskUtilizationChecker _diskUtilizationChecker;
+
+  @BeforeMethod
+  public void setUp() {
+    _helixResourceManager = mock(PinotHelixResourceManager.class);
+    _controllerConf = mock(ControllerConf.class);
+
+    when(_controllerConf.getDiskUtilizationPath()).thenReturn(DISK_UTILIZATION_PATH);
+    when(_controllerConf.getDiskUtilizationThreshold()).thenReturn(0.8);
+    when(_controllerConf.getDiskUtilizationCheckTimeoutMs()).thenReturn(5000);
+    when(_controllerConf.getResourceUtilizationCheckerFrequency()).thenReturn(120L);
+
+    _diskUtilizationChecker = new DiskUtilizationChecker(_helixResourceManager, _controllerConf);
+  }
+
+  // A null or empty table name must be rejected.
+  @Test
+  public void testIsDiskUtilizationWithinLimitsNullOrEmptyTableName() {
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits(null));
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> _diskUtilizationChecker.isDiskUtilizationWithinLimits(""));
+  }
+
+  // A table without a table config is reported as within limits.
+  @Test
+  public void testIsDiskUtilizationWithinLimitsNonExistentOfflineTable() {
+    String tableName = "test_OFFLINE";
+    when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null);
+
+    boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    Assert.assertTrue(result);
+  }
+
+  // NOTE(review): this stubs getOfflineTableConfig although the table name is a REALTIME one. An unstubbed Mockito
+  // mock returns null anyway, so the test does not pin which lookup the checker performs - confirm the intent.
+  @Test
+  public void testIsDiskUtilizationWithinLimitsNonExistentRealtimeTable() {
+    String tableName = "test_REALTIME";
+    when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(null);
+
+    boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    Assert.assertTrue(result);
+  }
+
+  // NOTE(review): the table config is stubbed through getOfflineTableConfig here but through getTableConfig in
+  // testIsDiskUtilizationWithinLimitsAboveThreshold - verify against DiskUtilizationChecker which one is used,
+  // otherwise this test may pass without ever looking at the disk usage below.
+  @Test
+  public void testIsDiskUtilizationWithinLimitsValidOfflineTable() {
+    String tableName = "test_OFFLINE";
+
+    TableConfig mockTableConfig = mock(TableConfig.class);
+    when(_helixResourceManager.getOfflineTableConfig(tableName)).thenReturn(mockTableConfig);
+
+    List<String> mockInstances = Arrays.asList("server1", "server2");
+    when(_helixResourceManager.getServerInstancesForTable(tableName, TableType.OFFLINE)).thenReturn(mockInstances);
+
+    // Mock disk usage: both servers are at 50% (500/1000 and 1000/2000)
+    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+    DiskUsageInfo diskUsageInfo1 =
+        new DiskUsageInfo("server1", DISK_UTILIZATION_PATH, 1000L, 500L, System.currentTimeMillis());
+    diskUsageInfoMap.put("server1", diskUsageInfo1);
+
+    DiskUsageInfo diskUsageInfo2 =
+        new DiskUsageInfo("server2", DISK_UTILIZATION_PATH, 2000L, 1000L, System.currentTimeMillis());
+    diskUsageInfoMap.put("server2", diskUsageInfo2);
+    ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+    boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void testIsDiskUtilizationWithinLimitsAboveThreshold() {
+    String tableName = "test_OFFLINE";
+
+    TableConfig mockTableConfig = mock(TableConfig.class);
+    when(_helixResourceManager.getTableConfig(tableName)).thenReturn(mockTableConfig);
+
+    List<String> mockInstances = Arrays.asList("server1", "server2");
+    when(_helixResourceManager.getServerInstancesForTable(tableName, TableType.OFFLINE)).thenReturn(mockInstances);
+
+    // Mock disk usage with high utilization
+    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+    DiskUsageInfo diskUsageInfo1 = new DiskUsageInfo("server1", DISK_UTILIZATION_PATH, 1000L, 900L,
+        System.currentTimeMillis()); // Above threshold (90%)
+    diskUsageInfoMap.put("server1", diskUsageInfo1);
+
+    DiskUsageInfo diskUsageInfo2 = new DiskUsageInfo("server2", DISK_UTILIZATION_PATH, 2000L, 1900L,
+        System.currentTimeMillis()); // Also above threshold (95%)
+    diskUsageInfoMap.put("server2", diskUsageInfo2);
+    ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+    boolean result = _diskUtilizationChecker.isDiskUtilizationWithinLimits(tableName);
+    Assert.assertFalse(result);
+  }
+
+  // The responses returned by the (mocked) multi-get must end up in ResourceUtilizationInfo, keyed by instance id.
+  @Test
+  public void testComputeDiskUtilizationValidInstances()
+      throws InvalidConfigException {
+    Set<String> instances = new HashSet<>(Arrays.asList("server1", "server2"));
+
+    // Mock admin endpoints
+    BiMap<String, String> instanceAdminEndpoints = HashBiMap.create();
+    instanceAdminEndpoints.put("server1", "http://server1");
+    instanceAdminEndpoints.put("server2", "http://server2");
+    when(_helixResourceManager.getDataInstanceAdminEndpoints(instances)).thenReturn(instanceAdminEndpoints);
+
+    // Mock responses
+    Map<String, String> responseMap = new HashMap<>();
+    responseMap.put("http://server1" + DiskUtilizationChecker.DISK_UTILIZATION_API_PATH,
+        "{ \"instanceId\": \"server1\", \"totalSpaceBytes\": 1000, \"usedSpaceBytes\": 500 }");
+    responseMap.put("http://server2" + DiskUtilizationChecker.DISK_UTILIZATION_API_PATH,
+        "{ \"instanceId\": \"server2\", \"totalSpaceBytes\": 2000, \"usedSpaceBytes\": 1500 }");
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        new CompletionServiceHelper.CompletionServiceResponse();
+    serviceResponse._httpResponses = responseMap;
+
+    CompletionServiceHelper completionServiceHelper = mock(CompletionServiceHelper.class);
+    when(completionServiceHelper.doMultiGetRequest(anyList(), anyString(), anyBoolean(), anyMap(), anyInt(),
+        anyString())).thenReturn(serviceResponse);
+
+    _diskUtilizationChecker.computeDiskUtilization(instanceAdminEndpoints.inverse(), completionServiceHelper);
+
+    DiskUsageInfo diskUsageInfo1 = ResourceUtilizationInfo.getDiskUsageInfo("server1");
+    DiskUsageInfo diskUsageInfo2 = ResourceUtilizationInfo.getDiskUsageInfo("server2");
+
+    Assert.assertNotNull(diskUsageInfo1);
+    Assert.assertEquals(diskUsageInfo1.getTotalSpaceBytes(), 1000L);
+    Assert.assertEquals(diskUsageInfo1.getUsedSpaceBytes(), 500L);
+
+    Assert.assertNotNull(diskUsageInfo2);
+    Assert.assertEquals(diskUsageInfo2.getTotalSpaceBytes(), 2000L);
+    Assert.assertEquals(diskUsageInfo2.getUsedSpaceBytes(), 1500L);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
new file mode 100644
index 0000000000..7ca553f2b3
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.resources.PauseStatusDetails;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.PauseState;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Data-driven test for {@code RealtimeSegmentValidationManager#shouldEnsureConsuming} covering combinations of pause
+ * state, pause reason, resource utilization and storage quota. All collaborators are Mockito mocks.
+ */
+public class RealtimeSegmentValidationManagerTest {
+  @Mock
+  private PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+
+  @Mock
+  private ResourceUtilizationManager _resourceUtilizationManager;
+
+  @Mock
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Mock
+  private StorageQuotaChecker _storageQuotaChecker;
+
+  @Mock
+  private ControllerMetrics _controllerMetrics;
+
+  private AutoCloseable _mocks;
+  private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+
+  @BeforeMethod
+  public void setup() {
+    ControllerConf defaultConf = new ControllerConf();
+    // The mocks must be opened before they are handed to the manager under test.
+    _mocks = MockitoAnnotations.openMocks(this);
+    _realtimeSegmentValidationManager =
+        new RealtimeSegmentValidationManager(defaultConf, _pinotHelixResourceManager, null,
+            _llcRealtimeSegmentManager, null, _controllerMetrics, _storageQuotaChecker, _resourceUtilizationManager);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @DataProvider(name = "testCases")
+  public Object[][] testCases() {
+    // Columns: isTablePaused, reasonCode, isResourceUtilizationExceeded, isQuotaExceeded, expectedResult
+
+    // Table is paused due to admin intervention, should return false
+    Object[] pausedByAdmin = {true, PauseState.ReasonCode.ADMINISTRATIVE, false, false, false};
+
+    // Resource utilization exceeded and pause state is updated, should return false
+    Object[] utilizationExceeded = {false, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, true, false, false};
+
+    // Resource utilization is within limits but was previously paused due to resource utilization,
+    // should return true
+    Object[] utilizationRecovered = {true, PauseState.ReasonCode.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, false, false, true};
+
+    // Resource utilization is within limits but was previously paused due to storage quota exceeded,
+    // should return false
+    Object[] quotaStillExceeded = {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, false, true, false};
+
+    // Storage quota exceeded, should return false
+    Object[] quotaNewlyExceeded = {false, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, false, true, false};
+
+    // Storage quota within limits but was previously paused due to storage quota exceeded, should return true
+    Object[] quotaRecovered = {true, PauseState.ReasonCode.STORAGE_QUOTA_EXCEEDED, false, false, true};
+
+    return new Object[][]{
+        pausedByAdmin, utilizationExceeded, utilizationRecovered, quotaStillExceeded, quotaNewlyExceeded,
+        quotaRecovered
+    };
+  }
+
+  @Test(dataProvider = "testCases")
+  public void testShouldEnsureConsuming(boolean isTablePaused, PauseState.ReasonCode reasonCode,
+      boolean isResourceUtilizationExceeded, boolean isQuotaExceeded, boolean expectedResult) {
+    String realtimeTable = "testTable_REALTIME";
+
+    // Pause state reported for the table
+    PauseStatusDetails pauseDetails = mock(PauseStatusDetails.class);
+    when(pauseDetails.getPauseFlag()).thenReturn(isTablePaused);
+    when(pauseDetails.getReasonCode()).thenReturn(reasonCode);
+    when(_llcRealtimeSegmentManager.getPauseStatusDetails(realtimeTable)).thenReturn(pauseDetails);
+
+    // Resource utilization verdict
+    boolean withinLimits = !isResourceUtilizationExceeded;
+    when(_resourceUtilizationManager.isResourceUtilizationWithinLimits(realtimeTable)).thenReturn(withinLimits);
+
+    // Storage quota verdict
+    TableConfig mockedConfig = mock(TableConfig.class);
+    when(_pinotHelixResourceManager.getTableConfig(realtimeTable)).thenReturn(mockedConfig);
+    when(_storageQuotaChecker.isTableStorageQuotaExceeded(mockedConfig)).thenReturn(isQuotaExceeded);
+
+    Assert.assertEquals(_realtimeSegmentValidationManager.shouldEnsureConsuming(realtimeTable), expectedResult);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationInfoTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationInfoTest.java
new file mode 100644
index 0000000000..11f3d77b1f
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationInfoTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Verifies that disk usage stored through {@link ResourceUtilizationInfo#setDiskUsageInfo} can be read back per
+ * instance through {@link ResourceUtilizationInfo#getDiskUsageInfo}.
+ */
+public class ResourceUtilizationInfoTest {
+
+  private static final String INSTANCE_ID_1 = "server-1";
+  private static final String INSTANCE_ID_2 = "server-2";
+
+  private DiskUsageInfo _diskUsageInfo1;
+  private DiskUsageInfo _diskUsageInfo2;
+
+  @BeforeMethod
+  public void setUp() {
+    // Sample usage: 500 of 1000 bytes used on instance 1, 1500 of 2000 bytes used on instance 2
+    long now1 = System.currentTimeMillis();
+    _diskUsageInfo1 = new DiskUsageInfo(INSTANCE_ID_1, "/path/to/disk1", 1000L, 500L, now1);
+    long now2 = System.currentTimeMillis();
+    _diskUsageInfo2 = new DiskUsageInfo(INSTANCE_ID_2, "/path/to/disk2", 2000L, 1500L, now2);
+  }
+
+  @Test
+  public void testDiskUsageInfo() {
+    // Publish the usage of both instances in one go
+    Map<String, DiskUsageInfo> usageByInstance = new HashMap<>();
+    usageByInstance.put(INSTANCE_ID_1, _diskUsageInfo1);
+    usageByInstance.put(INSTANCE_ID_2, _diskUsageInfo2);
+    ResourceUtilizationInfo.setDiskUsageInfo(usageByInstance);
+
+    assertRecordedUsage(INSTANCE_ID_1, 1000L, 500L);
+    assertRecordedUsage(INSTANCE_ID_2, 2000L, 1500L);
+  }
+
+  // Asserts that usage is recorded for the instance and carries the expected total and used bytes.
+  private static void assertRecordedUsage(String instanceId, long expectedTotalBytes, long expectedUsedBytes) {
+    DiskUsageInfo recorded = ResourceUtilizationInfo.getDiskUsageInfo(instanceId);
+    Assert.assertNotNull(recorded);
+    Assert.assertEquals(recorded.getTotalSpaceBytes(), expectedTotalBytes);
+    Assert.assertEquals(recorded.getUsedSpaceBytes(), expectedUsedBytes);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
new file mode 100644
index 0000000000..bb61d6e083
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ResourceUtilizationManagerTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.validation;
+
+import org.apache.pinot.controller.ControllerConf;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link ResourceUtilizationManager}: behaviour with the check disabled, rejection of null or empty
+ * table names, and pass-through of the disk utilization checker's verdict. Collaborators are Mockito mocks.
+ */
+public class ResourceUtilizationManagerTest {
+
+  private DiskUtilizationChecker _diskUtilizationChecker;
+  private ControllerConf _controllerConf;
+  private ResourceUtilizationManager _resourceUtilizationManager;
+  private final String _testTable = "myTable_OFFLINE";
+
+  @BeforeMethod
+  public void setUp() {
+    _diskUtilizationChecker = Mockito.mock(DiskUtilizationChecker.class);
+    _controllerConf = Mockito.mock(ControllerConf.class);
+  }
+
+  @Test
+  public void testIsResourceUtilizationWithinLimitsWhenCheckIsDisabled() {
+    createManager(false);
+
+    Assert.assertTrue(_resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable),
+        "Resource utilization should be within limits when the check is disabled");
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testIsResourceUtilizationWithinLimitsWithNullTableName() {
+    createManager(true);
+
+    _resourceUtilizationManager.isResourceUtilizationWithinLimits(null);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testIsResourceUtilizationWithinLimitsWithEmptyTableName() {
+    createManager(true);
+
+    _resourceUtilizationManager.isResourceUtilizationWithinLimits("");
+  }
+
+  @Test
+  public void testIsResourceUtilizationWithinLimitsWhenCheckIsEnabled() {
+    Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(true);
+    createManager(true);
+
+    Assert.assertTrue(_resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable),
+        "Resource utilization should be within limits when disk check returns true");
+  }
+
+  @Test
+  public void testIsResourceUtilizationWithinLimitsWhenCheckFails() {
+    Mockito.when(_diskUtilizationChecker.isDiskUtilizationWithinLimits(_testTable)).thenReturn(false);
+    createManager(true);
+
+    Assert.assertFalse(_resourceUtilizationManager.isResourceUtilizationWithinLimits(_testTable),
+        "Resource utilization should not be within limits when disk check returns false");
+  }
+
+  // Stubs the enabled flag on the mocked config and builds the manager under test from the two mocks.
+  private void createManager(boolean checkEnabled) {
+    Mockito.when(_controllerConf.isResourceUtilizationCheckEnabled()).thenReturn(checkEnabled);
+    _resourceUtilizationManager = new ResourceUtilizationManager(_controllerConf, _diskUtilizationChecker);
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DiskUtilization.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DiskUtilization.java
new file mode 100644
index 0000000000..b2822e0e20
--- /dev/null
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DiskUtilization.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DiskUtilization {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DiskUtilization.class);
+
+  private DiskUtilization() {
+  }
+
+  /**
+   * Measures, via NIO, the file store that holds {@code pathStr} and reports its total and used bytes.
+   * Used bytes are derived as total space minus {@link FileStore#getUnallocatedSpace()}.
+   *
+   * @param instanceId id passed through to the returned {@link DiskUsageInfo}; must not be null or empty
+   * @param pathStr path whose file store is measured; must not be null or empty and must exist
+   * @return usage info built from the instance id, the path, total bytes, used bytes and the current time in millis
+   * @throws IllegalArgumentException if either argument is null/empty or the path does not exist
+   * @throws IOException if the file store cannot be resolved or queried
+   */
+  public static DiskUsageInfo computeDiskUsage(String instanceId, String pathStr) throws IOException {
+    requireNonEmpty(instanceId, "InstanceID cannot be null or empty while computing disk utilization.");
+    requireNonEmpty(pathStr, "Path cannot be null or empty while computing disk utilization.");
+
+    Path location = Paths.get(pathStr);
+    boolean present = Files.exists(location);
+    if (!present) {
+      throw new IllegalArgumentException("The path " + pathStr + " does not exist.");
+    }
+
+    // All figures come from the file store backing the path, not from the directory contents.
+    FileStore fileStore = Files.getFileStore(location);
+    long capacityBytes = fileStore.getTotalSpace();
+    // Unallocated bytes are treated as the "free space" on disk; everything else counts as used.
+    long usedBytes = capacityBytes - fileStore.getUnallocatedSpace();
+
+    LOGGER.info("Disk usage for instance: {} at path: {} totalSpace: {}, usedSpace: {}", instanceId, pathStr,
+        capacityBytes, usedBytes);
+    return new DiskUsageInfo(instanceId, location.toString(), capacityBytes, usedBytes, System.currentTimeMillis());
+  }
+
+  /** Throws {@link IllegalArgumentException} carrying {@code message} when {@code value} is null or empty. */
+  private static void requireNonEmpty(String value, String message) {
+    if (StringUtils.isEmpty(value)) {
+      throw new IllegalArgumentException(message);
+    }
+  }
+}
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
index 8c0618f64d..d2a7a0c4f3 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/InstanceResource.java
@@ -27,6 +27,7 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,10 +36,15 @@ import javax.inject.Named;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.apache.pinot.common.restlet.resources.ResourceUtils;
 import org.apache.pinot.common.utils.config.InstanceUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.server.api.AdminApiApplication;
@@ -97,4 +103,22 @@ public class InstanceResource {
     Map<String, String> pools = 
instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY);
     return pools == null ? Collections.emptyMap() : pools;
   }
+
+  /**
+   * Returns, as JSON, the disk capacity and usage in bytes for the file store backing the path supplied in the
+   * {@code diskUtilizationPath} request header.
+   *
+   * @param headers request headers; must carry a non-empty {@code diskUtilizationPath} value
+   * @return JSON string produced from the computed {@link DiskUsageInfo}
+   * @throws WebApplicationException with status 400 when the header is missing/empty or the path is rejected
+   * @throws IOException if the disk usage computation fails with an I/O error
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/diskUtilization")
+  @ApiOperation(value = "Show disk utilization", notes = "Disk capacity and usage shown in bytes")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 400, message = "Bad Request – Invalid disk utilization path in header")
+  })
+  public String getDiskUsageInfo(@Context HttpHeaders headers)
+      throws WebApplicationException, IOException {
+    String pathStr = headers.getHeaderString("diskUtilizationPath");
+    if (StringUtils.isEmpty(pathStr)) {
+      throw new WebApplicationException("Invalid disk utilization path in header", 400);
+    }
+    DiskUsageInfo diskUsageInfo;
+    try {
+      diskUsageInfo = DiskUtilization.computeDiskUsage(_instanceId, pathStr);
+    } catch (IllegalArgumentException e) {
+      // computeDiskUsage signals a rejected argument (e.g. a path that does not exist) with
+      // IllegalArgumentException; translate it into the documented 400 and keep the cause for diagnostics.
+      throw new WebApplicationException(e.getMessage(), e, 400);
+    }
+    return ResourceUtils.convertToJsonString(diskUsageInfo);
+  }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/DiskUtilizationTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/api/DiskUtilizationTest.java
new file mode 100644
index 0000000000..f3648b48dc
--- /dev/null
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/DiskUtilizationTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api;
+
+import java.io.IOException;
+import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
+import org.apache.pinot.server.api.resources.DiskUtilization;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link DiskUtilization#computeDiskUsage(String, String)}.
+ */
+public class DiskUtilizationTest {
+
+  // Taken from the JVM rather than hard-coding "/tmp", which does not exist on every platform.
+  private static final String TEST_PATH = System.getProperty("java.io.tmpdir");
+
+  /** An existing directory yields a positive total and a used figure between zero and the total. */
+  @Test
+  public void testComputeDiskUsageForExistingPath()
+      throws IOException {
+    DiskUsageInfo usage = DiskUtilization.computeDiskUsage("myInstanceId", TEST_PATH);
+
+    // Assert that total space is positive.
+    Assert.assertTrue(usage.getTotalSpaceBytes() > 0, "Total space should be positive.");
+
+    // Assert that used space is non-negative and not more than the total space.
+    Assert.assertTrue(usage.getUsedSpaceBytes() >= 0, "Used space should be non-negative.");
+    Assert.assertTrue(usage.getUsedSpaceBytes() <= usage.getTotalSpaceBytes(),
+        "Used space should be less than or equal to total space.");
+  }
+
+  /** A path that does not exist is rejected with {@link IllegalArgumentException}. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testComputeDiskUsageForNonExistingPath()
+      throws IOException {
+    String nonExistentPath = "non/existent/path/for/testing";
+    DiskUtilization.computeDiskUsage("myInstanceId", nonExistentPath);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
index 6a0f8c63e9..4781bfbdac 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/PauseState.java
@@ -70,6 +70,6 @@ public class PauseState extends BaseJsonConfig {
   }
 
   public enum ReasonCode {
-    ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED
+    ADMINISTRATIVE, STORAGE_QUOTA_EXCEEDED, RESOURCE_UTILIZATION_LIMIT_EXCEEDED
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to