This is an automated email from the ASF dual-hosted git repository.

arafat2198 pushed a commit to branch HDDS-13177
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-13177 by this push:
     new e91543b7aae HDDS-13465. Adding endpoint at Recon to get consolidated storage distribution report (#8995)
e91543b7aae is described below

commit e91543b7aaec0c2568c66a052cb5f79458d84386
Author: Priyesh Karatha <[email protected]>
AuthorDate: Tue Sep 30 14:04:48 2025 +0530

    HDDS-13465. Adding endpoint at Recon to get consolidated storage distribution report (#8995)
---
 hadoop-ozone/recon/pom.xml                         |  17 +
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  | 101 +++++
 .../ozone/recon/api/ClusterStateEndpoint.java      |  12 +-
 .../hadoop/ozone/recon/api/NodeEndpoint.java       |  13 +-
 .../ozone/recon/api/OMDBInsightEndpoint.java       |   8 +-
 .../recon/api/StorageDistributionEndpoint.java     | 341 ++++++++++++++++
 .../recon/api/types/DatanodeStorageReport.java     | 134 +++++-
 ...eport.java => DeletionPendingBytesByStage.java} |  43 +-
 ...orageReport.java => GlobalNamespaceReport.java} |  46 +--
 .../ozone/recon/api/types/GlobalStorageReport.java |  58 +++
 .../types/StorageCapacityDistributionResponse.java | 130 ++++++
 .../ozone/recon/api/types/UsedSpaceBreakDown.java  |  82 ++++
 .../recon/api/TestStorageDistributionEndpoint.java | 452 +++++++++++++++++++++
 13 files changed, 1371 insertions(+), 66 deletions(-)
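
For reviewers who want to exercise the new endpoint, here is a minimal client sketch. It assumes a Recon server on localhost with the default HTTP port 9888 and assumes the JAX-RS path below is mounted under Recon's usual /api/v1 prefix; neither detail is stated in this patch, so adjust to your deployment.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class StorageDistributionClient {
      public static void main(String[] args) throws Exception {
        // Assumed URL; host, port, and the /api/v1 prefix are not part of this patch.
        URL url = new URL("http://localhost:9888/api/v1/storageDistribution");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream()))) {
          // Print the consolidated storage distribution JSON.
          reader.lines().forEach(System.out::println);
        } finally {
          conn.disconnect();
        }
      }
    }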

diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml
index 079022c2f28..b24b251614c 100644
--- a/hadoop-ozone/recon/pom.xml
+++ b/hadoop-ozone/recon/pom.xml
@@ -126,6 +126,23 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>${httpclient.version}</version>
+      <exclusions>
+        <exclusion>
+          <!-- depend on jcl-over-slf4j instead -->
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>${httpcore.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-common</artifactId>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 2758dfb34cc..4898c2a7e94 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -27,6 +27,8 @@
 import static org.jooq.impl.DSL.select;
 import static org.jooq.impl.DSL.using;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
@@ -49,6 +51,7 @@
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
@@ -56,6 +59,7 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
@@ -80,6 +84,13 @@
 import org.apache.hadoop.ozone.recon.scm.ReconContainerReportQueue;
 import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
 import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
 import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
 import org.jooq.Configuration;
@@ -94,6 +105,13 @@ public class ReconUtils {
 
   private static Logger log = LoggerFactory.getLogger(
       ReconUtils.class);
+  
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static CloseableHttpClient httpClient;
+  private static final int MAX_CONNECTIONS_PER_ROUTE = 20;
+  private static final int MAX_TOTAL_CONNECTIONS = 100;
+  private static final int CONNECTION_TIMEOUT_MS = 5000;
+  private static final int SOCKET_TIMEOUT_MS = 10000;
 
   public ReconUtils() {
   }
@@ -108,6 +126,40 @@ public static org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState get
     return org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.getRebuildState();
   }
 
+  static {
+    initializeHttpClient();
+  }
+
+  private static void initializeHttpClient() {
+    PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
+    connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
+    connectionManager.setDefaultMaxPerRoute(MAX_CONNECTIONS_PER_ROUTE);
+    connectionManager.setValidateAfterInactivity(30000); // 30 seconds
+
+    RequestConfig requestConfig = RequestConfig.custom()
+        .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
+        .setConnectTimeout(CONNECTION_TIMEOUT_MS)
+        .setSocketTimeout(SOCKET_TIMEOUT_MS)
+        .build();
+
+    httpClient = HttpClientBuilder.create()
+        .setConnectionManager(connectionManager)
+        .setDefaultRequestConfig(requestConfig)
+        .setConnectionTimeToLive(60, TimeUnit.SECONDS)
+        .evictIdleConnections(30, TimeUnit.SECONDS)
+        .build();
+
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      try {
+        if (httpClient != null) {
+          httpClient.close();
+        }
+      } catch (IOException e) {
+        log.warn("Error closing HTTP client", e);
+      }
+    }));
+  }
+
   public static File getReconScmDbDir(ConfigurationSource conf) {
     return new ReconUtils().getReconDbDir(conf, OZONE_RECON_SCM_DB_DIR);
   }
@@ -850,4 +902,53 @@ public static String constructObjectPathWithPrefix(long... ids) {
     }
     return pathBuilder.toString();
   }
+
+  public static long getMetricsFromDatanode(DatanodeDetails datanode, String service, String name, String keyName)
+      throws IOException {
+    // Construct metrics URL for DataNode JMX endpoint
+    String metricsUrl = String.format("http://%s:%d/jmx?qry=Hadoop:service=%s,name=%s",
+        datanode.getIpAddress(),
+        datanode.getPort(DatanodeDetails.Port.Name.HTTP).getValue(),
+        service,
+        name);
+    HttpGet request = new HttpGet(metricsUrl);
+    try (CloseableHttpResponse response = httpClient.execute(request)) {
+      if (response.getStatusLine().getStatusCode() != 200) {
+        throw new IOException("HTTP request failed with status: " +
+            response.getStatusLine().getStatusCode());
+      }
+
+      String jsonResponse = EntityUtils.toString(response.getEntity());
+      return parseMetrics(jsonResponse, name, keyName);
+    } catch (IOException e) {
+      log.error("Error getting metrics from datanode: {}", datanode.getIpAddress(), e);
+    }
+    return 0;
+  }
+
+  private static long parseMetrics(String jsonResponse, String serviceName, String keyName) {
+    if (jsonResponse == null || jsonResponse.isEmpty()) {
+      return -1L;
+    }
+    try {
+      JsonNode root = OBJECT_MAPPER.readTree(jsonResponse);
+      JsonNode beans = root.get("beans");
+      if (beans != null && beans.isArray()) {
+        for (JsonNode bean : beans) {
+          String name = bean.path("name").asText("");
+          if (name.contains(serviceName)) {
+            return extractMetrics(bean, keyName);
+          }
+        }
+      }
+    } catch (Exception e) {
+      log.warn("Failed to parse block deletion metrics JSON: {}", e.toString());
+    }
+    return 0L;
+  }
+
+  /** Extract block deletion metrics from JMX bean node. */
+  private static long extractMetrics(JsonNode beanNode, String keyName) {
+    return beanNode.path(keyName).asLong(0L);
+  }
 }
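
The getMetricsFromDatanode/parseMetrics pair above walks the standard Hadoop /jmx payload: a top-level "beans" array whose entries carry a "name" attribute plus flat metric fields. A self-contained sketch of the same traversal against a hand-written sample payload follows; the bean name and metric key mirror the ones this patch queries, but the value is made up.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JmxParseSketch {
      public static void main(String[] args) throws Exception {
        // Hand-written sample mimicking a datanode /jmx response.
        String json = "{\"beans\":[{\"name\":"
            + "\"Hadoop:service=HddsDatanode,name=BlockDeletingService\","
            + "\"TotalPendingBlockBytes\":4096}]}";
        JsonNode beans = new ObjectMapper().readTree(json).get("beans");
        long value = 0L;
        for (JsonNode bean : beans) {
          // Match on the bean name, then read the metric field, as parseMetrics does.
          if (bean.path("name").asText("").contains("BlockDeletingService")) {
            value = bean.path("TotalPendingBlockBytes").asLong(0L);
          }
        }
        System.out.println("TotalPendingBlockBytes = " + value); // prints 4096
      }
    }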
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 8be9ef47469..056ac9e21a0 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -119,10 +119,14 @@ public Response getClusterState() {
             nodeManager.getNodeCount(NodeStatus.inServiceHealthyReadOnly());
 
     SCMNodeStat stats = nodeManager.getStats();
-    DatanodeStorageReport storageReport =
-        new DatanodeStorageReport(stats.getCapacity().get(),
-            stats.getScmUsed().get(), stats.getRemaining().get(),
-            stats.getCommitted().get());
+
+    DatanodeStorageReport storageReport = DatanodeStorageReport.newBuilder()
+        .setCapacity(stats.getCapacity().get())
+        .setCommitted(stats.getCommitted().get())
+        .setUsed(stats.getScmUsed().get())
+        .setMinimumFreeSpace(stats.getFreeSpaceToSpare().get())
+        .setRemaining(stats.getRemaining().get())
+        .build();
 
     ClusterStateResponse.Builder builder = ClusterStateResponse.newBuilder();
     GlobalStats volumeRecord = globalStatsDao.findById(
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java
index cb7693054a3..58e72c1a915 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java
@@ -173,11 +173,14 @@ public Response getDatanodes() {
   private DatanodeStorageReport getStorageReport(DatanodeDetails datanode) {
     SCMNodeStat nodeStat =
         nodeManager.getNodeStat(datanode).get();
-    long capacity = nodeStat.getCapacity().get();
-    long used = nodeStat.getScmUsed().get();
-    long remaining = nodeStat.getRemaining().get();
-    long committed = nodeStat.getCommitted().get();
-    return new DatanodeStorageReport(capacity, used, remaining, committed);
+    DatanodeStorageReport storageReport = DatanodeStorageReport.newBuilder()
+        .setCapacity(nodeStat.getCapacity().get())
+        .setUsed(nodeStat.getScmUsed().get())
+        .setRemaining(nodeStat.getRemaining().get())
+        .setCommitted(nodeStat.getCommitted().get())
+        .setMinimumFreeSpace(nodeStat.getFreeSpaceToSpare().get())
+        .build();
+    return storageReport;
   }
 
   /**
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index 2dcfc83d1ae..a94dc72bc73 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -340,7 +340,7 @@ public Response getOpenKeySummary() {
    *
    * @param keysSummary A map to store the keys summary information.
    */
-  private void createKeysSummaryForOpenKey(
+  public void createKeysSummaryForOpenKey(
       Map<String, Long> keysSummary) {
     Long replicatedSizeOpenKey = getValueFromId(globalStatsDao.findById(
         OmTableInsightTask.getReplicatedSizeKeyFromTable(OPEN_KEY_TABLE)));
@@ -402,7 +402,7 @@ public Response getOpenMPUKeySummary() {
    *
    * @param keysSummary A map to store the keys summary information.
    */
-  private void createKeysSummaryForOpenMPUKey(Map<String, Long> keysSummary) {
+  public void createKeysSummaryForOpenMPUKey(Map<String, Long> keysSummary) {
     Long replicatedSizeOpenMPUKey = getValueFromId(globalStatsDao.findById(
         OmTableInsightTask.getReplicatedSizeKeyFromTable(MULTIPART_INFO_TABLE)));
     Long unreplicatedSizeOpenMPUKey = getValueFromId(globalStatsDao.findById(
@@ -589,7 +589,7 @@ private boolean getPendingForDeletionKeyInfo(
    *
    * @param keysSummary A map to store the keys summary information.
    */
-  private void createKeysSummaryForDeletedKey(Map<String, Long> keysSummary) {
+  public void createKeysSummaryForDeletedKey(Map<String, Long> keysSummary) {
     // Fetch the necessary metrics for deleted keys
     Long replicatedSizeDeleted = getValueFromId(globalStatsDao.findById(
         OmTableInsightTask.getReplicatedSizeKeyFromTable(DELETED_TABLE)));
@@ -674,7 +674,7 @@ private void getPendingForDeletionDirInfo(
     }
   }
 
-  private void calculateTotalPendingDeletedDirSizes(Map<String, Long> dirSummary) {
+  public void calculateTotalPendingDeletedDirSizes(Map<String, Long> dirSummary) {
     long totalDataSize = 0L;
     long totalReplicatedDataSize = 0L;
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
new file mode 100644
index 00000000000..5365884a0d8
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.types.DUResponse;
+import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;
+import org.apache.hadoop.ozone.recon.api.types.DeletionPendingBytesByStage;
+import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
+import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
+import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
+import org.apache.hadoop.ozone.recon.api.types.UsedSpaceBreakDown;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
+import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
+import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This endpoint handles requests related to storage distribution across
+ * different datanodes in a Recon instance. It provides detailed reports
+ * on storage capacity, utilization, and associated metrics.
+ * <p>
+ * The data is aggregated from multiple sources, including node manager
+ * statistics, and is used to construct responses with information
+ * about global storage and namespace usage, storage usage breakdown,
+ * and deletion operations in progress.
+ * <p>
+ * An instance of {@link ReconNodeManager} is used to fetch detailed
+ * node-specific statistics required for generating the report.
+ */
+@Path("/storageDistribution")
+@Produces("application/json")
+@AdminOnly
+public class StorageDistributionEndpoint {
+  private final ReconNodeManager nodeManager;
+  private final OMDBInsightEndpoint omdbInsightEndpoint;
+  private final NSSummaryEndpoint nsSummaryEndpoint;
+  private final StorageContainerLocationProtocol scmClient;
+  private static Logger log = LoggerFactory.getLogger(StorageDistributionEndpoint.class);
+  private Map<DatanodeDetails, Long> blockDeletionMetricsMap = new ConcurrentHashMap<>();
+  private GlobalStatsDao globalStatsDao;
+
+  @Inject
+  public StorageDistributionEndpoint(OzoneStorageContainerManager reconSCM,
+                                     OMDBInsightEndpoint omDbInsightEndpoint,
+                                     NSSummaryEndpoint nsSummaryEndpoint,
+                                     GlobalStatsDao globalStatsDao,
+                                     StorageContainerLocationProtocol scmClient) {
+    this.nodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
+    this.omdbInsightEndpoint = omDbInsightEndpoint;
+    this.nsSummaryEndpoint = nsSummaryEndpoint;
+    this.scmClient = scmClient;
+    this.globalStatsDao = globalStatsDao;
+  }
+
+  @GET
+  public Response getStorageDistribution() {
+    try {
+      initializeBlockDeletionMetricsMap();
+      List<DatanodeStorageReport> nodeStorageReports = collectDatanodeReports();
+      GlobalStorageReport globalStorageReport = calculateGlobalStorageReport();
+
+      Map<String, Long> namespaceMetrics = new HashMap<>();
+      try {
+        namespaceMetrics = calculateNamespaceMetrics();
+      } catch (Exception e) {
+        log.error("Error calculating namespace metrics", e);
+        // Initialize with default values
+        namespaceMetrics.put("totalUsedNamespace", 0L);
+        namespaceMetrics.put("totalOpenKeySize", 0L);
+        namespaceMetrics.put("totalCommittedSize", 0L);
+        namespaceMetrics.put("pendingDirectorySize", 0L);
+        namespaceMetrics.put("pendingKeySize", 0L);
+        namespaceMetrics.put("totalKeys", 0L);
+      }
+
+      StorageCapacityDistributionResponse response = buildStorageDistributionResponse(
+              nodeStorageReports, globalStorageReport, namespaceMetrics);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      log.error("Error getting storage distribution", e);
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+              .entity("Error retrieving storage distribution: " + e.getMessage())
+              .build();
+    }
+  }
+
+  private GlobalStorageReport calculateGlobalStorageReport() {
+    try {
+      SCMNodeStat stats = nodeManager.getStats();
+      if (stats == null) {
+        log.warn("Node manager stats are null, returning default values");
+        return new GlobalStorageReport(0L, 0L, 0L);
+      }
+
+      long scmUsed = stats.getScmUsed() != null ? stats.getScmUsed().get() : 0L;
+      long remaining = stats.getRemaining() != null ? stats.getRemaining().get() : 0L;
+      long capacity = stats.getCapacity() != null ? stats.getCapacity().get() : 0L;
+
+      return new GlobalStorageReport(scmUsed, remaining, capacity);
+    } catch (Exception e) {
+      log.error("Error calculating global storage report", e);
+      return new GlobalStorageReport(0L, 0L, 0L);
+    }
+  }
+
+  private Map<String, Long> calculateNamespaceMetrics() {
+    Map<String, Long> metrics = new HashMap<>();
+    Map<String, Long> totalPendingAtOmSide = calculatePendingSizes();
+    long totalOpenKeySize = calculateOpenKeySizes();
+    long totalCommittedSize = calculateCommittedSize();
+    long pendingDirectorySize = totalPendingAtOmSide.getOrDefault("pendingDirectorySize", 0L);
+    long pendingKeySize = totalPendingAtOmSide.getOrDefault("pendingKeySize", 0L);
+    long totalUsedNamespace = pendingDirectorySize + pendingKeySize + totalOpenKeySize + totalCommittedSize;
+
+    long totalKeys = 0L;
+    // Keys from OBJECT_STORE buckets.
+    GlobalStats keyRecord = globalStatsDao.findById(
+            OmTableInsightTask.getTableCountKeyFromTable(KEY_TABLE));
+    // Keys from FILE_SYSTEM_OPTIMIZED buckets
+    GlobalStats fileRecord = globalStatsDao.findById(
+            OmTableInsightTask.getTableCountKeyFromTable(FILE_TABLE));
+    if (keyRecord != null) {
+      totalKeys += keyRecord.getValue();
+    }
+    if (fileRecord != null) {
+      totalKeys += fileRecord.getValue();
+    }
+
+    metrics.put("pendingDirectorySize", pendingDirectorySize);
+    metrics.put("pendingKeySize", pendingKeySize);
+    metrics.put("totalOpenKeySize", totalOpenKeySize);
+    metrics.put("totalCommittedSize", totalCommittedSize);
+    metrics.put("totalUsedNamespace", totalUsedNamespace);
+    metrics.put("totalKeys", totalKeys);
+    return metrics;
+  }
+
+  private StorageCapacityDistributionResponse buildStorageDistributionResponse(
+          List<DatanodeStorageReport> nodeStorageReports,
+          GlobalStorageReport storageMetrics,
+          Map<String, Long> namespaceMetrics) {
+    DeletedBlocksTransactionSummary scmSummary = null;
+    try {
+      scmSummary = scmClient.getDeletedBlockSummary();
+    } catch (IOException e) {
+      log.error("Failed to get deleted block summary from SCM", e);
+    }
+
+    long totalPendingAtDnSide = 0L;
+    try {
+      totalPendingAtDnSide = blockDeletionMetricsMap.values().stream().reduce(0L, Long::sum);
+    } catch (Exception e) {
+      log.error("Error calculating pending deletion metrics", e);
+    }
+
+    DeletionPendingBytesByStage deletionPendingBytesByStage =
+            createDeletionPendingBytesByStage(
+                    namespaceMetrics.getOrDefault("pendingDirectorySize", 0L),
+                    namespaceMetrics.getOrDefault("pendingKeySize", 0L),
+                    scmSummary != null ? scmSummary.getTotalBlockReplicatedSize() : 0L,
+                    totalPendingAtDnSide);
+
+    // Safely get values from namespaceMetrics with null checks
+    Long totalUsedNamespace = namespaceMetrics.get("totalUsedNamespace");
+    Long totalOpenKeySize = namespaceMetrics.get("totalOpenKeySize");
+    Long totalCommittedSize = namespaceMetrics.get("totalCommittedSize");
+    Long totalKeys = namespaceMetrics.get("totalKeys");
+    Long totalContainerPreAllocated = nodeStorageReports.stream()
+        .map(report -> report.getCommitted())
+        .reduce(0L, Long::sum);
+
+    return StorageCapacityDistributionResponse.newBuilder()
+            .setDataNodeUsage(nodeStorageReports)
+            .setGlobalStorage(storageMetrics)
+            .setGlobalNamespace(new GlobalNamespaceReport(
+                    totalUsedNamespace != null ? totalUsedNamespace : 0L,
+                    totalKeys != null ? totalKeys : 0L))
+            .setUsedSpaceBreakDown(new UsedSpaceBreakDown(
+                    totalOpenKeySize != null ? totalOpenKeySize : 0L,
+                    totalCommittedSize != null ? totalCommittedSize : 0L,
+                    totalContainerPreAllocated != null ? totalContainerPreAllocated : 0L,
+                    deletionPendingBytesByStage))
+            .build();
+  }
+
+  private List<DatanodeStorageReport> collectDatanodeReports() {
+    return nodeManager.getAllNodes().stream()
+        .map(this::getStorageReport)
+        .filter(report -> report != null) // Filter out null reports
+        .collect(Collectors.toList());
+  }
+
+  private Map<String, Long> calculatePendingSizes() {
+    Map<String, Long> result = new HashMap<>();
+    Map<String, Long> pendingDeletedDirSizes = new HashMap<>();
+    omdbInsightEndpoint.calculateTotalPendingDeletedDirSizes(pendingDeletedDirSizes);
+    Map<String, Long> pendingKeySize = new HashMap<>();
+    omdbInsightEndpoint.createKeysSummaryForDeletedKey(pendingKeySize);
+    result.put("pendingDirectorySize", pendingDeletedDirSizes.getOrDefault("totalReplicatedDataSize", 0L));
+    result.put("pendingKeySize", pendingKeySize.getOrDefault("totalReplicatedDataSize", 0L));
+    return result;
+  }
+
+  private long calculateOpenKeySizes() {
+    Map<String, Long> openKeySummary = new HashMap<>();
+    omdbInsightEndpoint.createKeysSummaryForOpenKey(openKeySummary);
+    Map<String, Long> openKeyMPUSummary = new HashMap<>();
+    omdbInsightEndpoint.createKeysSummaryForOpenMPUKey(openKeyMPUSummary);
+    long openKeyDataSize = openKeySummary.getOrDefault("totalReplicatedDataSize", 0L);
+    long totalMPUKeySize = openKeyMPUSummary.getOrDefault("totalReplicatedDataSize", 0L);
+    return openKeyDataSize + totalMPUKeySize;
+  }
+
+  private long calculateCommittedSize() {
+    try {
+      Response rootResponse = nsSummaryEndpoint.getDiskUsage("/", false, true, false);
+      if (rootResponse.getStatus() != Response.Status.OK.getStatusCode()) {
+        log.warn("Failed to get disk usage, status: {}", rootResponse.getStatus());
+        return 0L;
+      }
+      DUResponse duRootRes = (DUResponse) rootResponse.getEntity();
+      return duRootRes != null ? duRootRes.getSizeWithReplica() : 0L;
+    } catch (IOException e) {
+      log.error("IOException while calculating committed size", e);
+      return 0L;
+    }
+  }
+
+  private DeletionPendingBytesByStage createDeletionPendingBytesByStage(long pendingDirectorySize,
+                                                                        long pendingKeySize,
+                                                                        long totalPendingAtScmSide,
+                                                                        long totalPendingAtDnSide) {
+    long totalPending = pendingDirectorySize + pendingKeySize + totalPendingAtScmSide + totalPendingAtDnSide;
+    Map<String, Map<String, Long>> stageItems = new HashMap<>();
+    Map<String, Long> omMap = new HashMap<>();
+    omMap.put("totalBytes", pendingDirectorySize + pendingKeySize);
+    omMap.put("pendingDirectoryBytes", pendingDirectorySize);
+    omMap.put("pendingKeyBytes", pendingKeySize);
+    Map<String, Long> scmMap = new HashMap<>();
+    scmMap.put("pendingBytes", totalPendingAtScmSide);
+    Map<String, Long> dnMap = new HashMap<>();
+    dnMap.put("pendingBytes", totalPendingAtDnSide);
+    stageItems.put("OM", omMap);
+    stageItems.put("SCM", scmMap);
+    stageItems.put("DN", dnMap);
+    return new DeletionPendingBytesByStage(totalPending, stageItems);
+  }
+
+  private void initializeBlockDeletionMetricsMap() {
+    nodeManager.getNodeStats().keySet().parallelStream().forEach(nodeId -> {
+      try {
+        long dnPending = ReconUtils.getMetricsFromDatanode(nodeId,
+                "HddsDatanode",
+                "BlockDeletingService",
+                "TotalPendingBlockBytes");
+        blockDeletionMetricsMap.put(nodeId, dnPending);
+      } catch (IOException e) {
+        blockDeletionMetricsMap.put(nodeId, -1L);
+      }
+    });
+  }
+
+  private DatanodeStorageReport getStorageReport(DatanodeDetails datanode) {
+    try {
+      SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanode);
+      if (nodeMetric == null) {
+        log.warn("Node statistics not available for datanode: {}", datanode);
+        return null; // Return null for unavailable nodes
+      }
+
+      SCMNodeStat nodeStat = nodeMetric.get();
+      if (nodeStat == null) {
+        log.warn("Node stat is null for datanode: {}", datanode);
+        return null; // Return null for unavailable stats
+      }
+
+      long capacity = nodeStat.getCapacity() != null ? nodeStat.getCapacity().get() : 0L;
+      long used = nodeStat.getScmUsed() != null ? nodeStat.getScmUsed().get() : 0L;
+      long remaining = nodeStat.getRemaining() != null ? nodeStat.getRemaining().get() : 0L;
+      long committed = nodeStat.getCommitted() != null ? nodeStat.getCommitted().get() : 0L;
+      long pendingDeletion = blockDeletionMetricsMap.getOrDefault(datanode, 0L);
+      long minFreeSpace  = nodeStat.getFreeSpaceToSpare() != null ? nodeStat.getFreeSpaceToSpare().get() : 0L;
+      if (pendingDeletion < 0) {
+        log.warn("Block deletion metrics unavailable for datanode: {}", datanode);
+        pendingDeletion = 0L;
+      }
+      DatanodeStorageReport storageReport = DatanodeStorageReport.newBuilder()
+          .setCapacity(capacity)
+          .setUsed(used)
+          .setRemaining(remaining)
+          .setCommitted(committed)
+          .setPendingDeletion(pendingDeletion)
+          .setMinimumFreeSpace(minFreeSpace)
+          .setDatanodeUuid(datanode.getUuidString())
+          .setHostName(datanode.getHostName())
+          .build();
+      return storageReport;
+    } catch (Exception e) {
+      log.error("Error getting storage report for datanode: {}", datanode, e);
+      return null; // Return null on any error
+    }
+  }
+}
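
createDeletionPendingBytesByStage above arranges pending-deletion bytes as a two-level map keyed by stage (OM, SCM, DN). A standalone sketch of the same arrangement, with made-up byte counts, shows the shape clients should expect under the deletionPendingBytes field:

    import java.util.HashMap;
    import java.util.Map;

    public class DeletionStagesSketch {
      public static void main(String[] args) {
        // Made-up byte counts, for shape only.
        long pendingDirBytes = 50L;
        long pendingKeyBytes = 25L;
        long scmBytes = 75L;
        long dnBytes = 10L;
        Map<String, Map<String, Long>> byStage = new HashMap<>();
        Map<String, Long> om = new HashMap<>();
        om.put("totalBytes", pendingDirBytes + pendingKeyBytes);
        om.put("pendingDirectoryBytes", pendingDirBytes);
        om.put("pendingKeyBytes", pendingKeyBytes);
        // SCM and DN each report a single pendingBytes figure.
        Map<String, Long> scm = new HashMap<>();
        scm.put("pendingBytes", scmBytes);
        Map<String, Long> dn = new HashMap<>();
        dn.put("pendingBytes", dnBytes);
        byStage.put("OM", om);
        byStage.put("SCM", scm);
        byStage.put("DN", dn);
        long total = pendingDirBytes + pendingKeyBytes + scmBytes + dnBytes;
        System.out.println("total=" + total + " byStage=" + byStage);
      }
    }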
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
index d677554705f..87126c58051 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
@@ -17,21 +17,40 @@
 
 package org.apache.hadoop.ozone.recon.api.types;
 
+import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Metadata object that contains storage report of a Datanode.
  */
-public class DatanodeStorageReport {
+public final class DatanodeStorageReport {
+  private String datanodeUuid;
+  private String hostName;
   private long capacity;
   private long used;
   private long remaining;
   private long committed;
+  private long pendingDeletion;
+  private long minimumFreeSpace;
+
+  private DatanodeStorageReport(Builder builder) {
+    this.datanodeUuid = builder.datanodeUuid;
+    this.hostName = builder.hostName;
+    this.capacity = builder.capacity;
+    this.used = builder.used;
+    this.remaining = builder.remaining;
+    this.committed = builder.committed;
+    this.pendingDeletion = builder.pendingDeletion;
+    this.minimumFreeSpace = builder.minimumFreeSpace;
+  }
 
-  public DatanodeStorageReport(long capacity, long used, long remaining,
-                               long committed) {
-    this.capacity = capacity;
-    this.used = used;
-    this.remaining = remaining;
-    this.committed = committed;
+  public String getDatanodeUuid() {
+    return datanodeUuid;
+  }
+
+  public String getHostName() {
+    return hostName;
   }
 
   public long getCapacity() {
@@ -49,4 +68,105 @@ public long getRemaining() {
   public long getCommitted() {
     return committed;
   }
+
+  public long getPendingDeletion() {
+    return pendingDeletion;
+  }
+
+  public long getMinimumFreeSpace() {
+    return minimumFreeSpace;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for DataNodeStorage Report.
+   */
+  public static final class Builder {
+    private String datanodeUuid = "";
+    private String hostName = "";
+    private long capacity = 0;
+    private long used = 0;
+    private long remaining = 0;
+    private long committed = 0;
+    private long pendingDeletion = 0;
+    private long minimumFreeSpace = 0;
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(Builder.class);
+
+    private Builder() {
+    }
+
+    public Builder setDatanodeUuid(String datanodeUuid) {
+      this.datanodeUuid = datanodeUuid;
+      return this;
+    }
+
+    public Builder setHostName(String hostName) {
+      this.hostName = hostName;
+      return this;
+    }
+
+    public Builder setCapacity(long capacity) {
+      this.capacity = capacity;
+      return this;
+    }
+
+    public Builder setUsed(long used) {
+      this.used = used;
+      return this;
+    }
+
+    public Builder setRemaining(long remaining) {
+      this.remaining = remaining;
+      return this;
+    }
+
+    public Builder setCommitted(long committed) {
+      this.committed = committed;
+      return this;
+    }
+
+    public Builder setPendingDeletion(long pendingDeletion) {
+      this.pendingDeletion = pendingDeletion;
+      return this;
+    }
+
+    public Builder setMinimumFreeSpace(long minimumFreeSpace) {
+      this.minimumFreeSpace = minimumFreeSpace;
+      return this;
+    }
+
+    public void validate() {
+      Objects.requireNonNull(hostName, "hostName cannot be null");
+
+      if (capacity < 0) {
+        throw new IllegalArgumentException("capacity cannot be negative");
+      }
+      if (used < 0) {
+        throw new IllegalArgumentException("used cannot be negative");
+      }
+      if (remaining < 0) {
+        throw new IllegalArgumentException("remaining cannot be negative");
+      }
+      if (committed < 0) {
+        throw new IllegalArgumentException("committed cannot be negative");
+      }
+      if (pendingDeletion < 0) {
+        throw new IllegalArgumentException("pendingDeletion cannot be negative");
+      }
+      // Logical consistency checks
+      if (used + remaining > capacity) {
+        LOG.warn("Inconsistent storage report for {}: used({}) + remaining({}) > capacity({})",
+            hostName, used, remaining, capacity);
+      }
+    }
+
+    public DatanodeStorageReport build() {
+      return new DatanodeStorageReport(this);
+    }
+  }
 }
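
With the constructor replaced by a builder, call sites compose reports field by field. A usage sketch with arbitrary sizes follows; note that build() as written does not invoke validate(), so the sketch calls it explicitly.

    import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;

    public class ReportBuilderSketch {
      public static void main(String[] args) {
        // Arbitrary byte values; the host name is hypothetical.
        DatanodeStorageReport.Builder builder = DatanodeStorageReport.newBuilder()
            .setHostName("dn1.example.com")
            .setCapacity(5000L)
            .setUsed(2000L)
            .setRemaining(3000L)
            .setCommitted(100L)
            .setPendingDeletion(10L)
            .setMinimumFreeSpace(500L);
        // validate() runs the negative-value and consistency checks shown above.
        builder.validate();
        DatanodeStorageReport report = builder.build();
        System.out.println(report.getHostName() + " capacity=" + report.getCapacity());
      }
    }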
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DeletionPendingBytesByStage.java
similarity index 53%
copy from hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
copy to hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DeletionPendingBytesByStage.java
index d677554705f..024464f5549 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DeletionPendingBytesByStage.java
@@ -17,36 +17,33 @@
 
 package org.apache.hadoop.ozone.recon.api.types;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
 /**
- * Metadata object that contains storage report of a Datanode.
+ * This class represents the metadata related to deletion stages and the
+ * corresponding bytes pending to be deleted. It encapsulates the total
+ * pending deletion byte count and provides structured data to represent
+ * pending bytes categorized by different stages.
  */
-public class DatanodeStorageReport {
-  private long capacity;
-  private long used;
-  private long remaining;
-  private long committed;
-
-  public DatanodeStorageReport(long capacity, long used, long remaining,
-                               long committed) {
-    this.capacity = capacity;
-    this.used = used;
-    this.remaining = remaining;
-    this.committed = committed;
-  }
+public class DeletionPendingBytesByStage {
 
-  public long getCapacity() {
-    return capacity;
-  }
+  @JsonProperty("total")
+  private long total;
+
+  @JsonProperty("byStage")
+  private Map<String, Map<String, Long>> byStage;
 
-  public long getUsed() {
-    return used;
+  public DeletionPendingBytesByStage(long total, Map<String, Map<String, Long>> byStage) {
+    this.total = total;
+    this.byStage = byStage;
   }
 
-  public long getRemaining() {
-    return remaining;
+  public long getTotal() {
+    return total;
   }
 
-  public long getCommitted() {
-    return committed;
+  public Map<String, Map<String, Long>> getByStage() {
+    return byStage;
   }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalNamespaceReport.java
similarity index 52%
copy from hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
copy to hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalNamespaceReport.java
index d677554705f..7adeaaf7c0b 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalNamespaceReport.java
@@ -17,36 +17,36 @@
 
 package org.apache.hadoop.ozone.recon.api.types;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
- * Metadata object that contains storage report of a Datanode.
+ * The GlobalNamespaceReport class serves as a representation of
+ * the global namespace metadata summary for a storage system.
+ * This class encapsulates statistical information related
+ * to the state of the global namespace.
+ *
+ * The metadata includes:
+ * - Total space utilized.
+ * - Total number of keys present in the namespace.
  */
-public class DatanodeStorageReport {
-  private long capacity;
-  private long used;
-  private long remaining;
-  private long committed;
-
-  public DatanodeStorageReport(long capacity, long used, long remaining,
-                               long committed) {
-    this.capacity = capacity;
-    this.used = used;
-    this.remaining = remaining;
-    this.committed = committed;
-  }
+public class GlobalNamespaceReport {
 
-  public long getCapacity() {
-    return capacity;
-  }
+  @JsonProperty("totalUsedSpace")
+  private long totalUsedSpace;
+
+  @JsonProperty("totalKeys")
+  private long totalKeys;
 
-  public long getUsed() {
-    return used;
+  public GlobalNamespaceReport(long totalUsedSpace, long totalKeys) {
+    this.totalUsedSpace = totalUsedSpace;
+    this.totalKeys = totalKeys;
   }
 
-  public long getRemaining() {
-    return remaining;
+  public long getTotalUsedSpace() {
+    return totalUsedSpace;
   }
 
-  public long getCommitted() {
-    return committed;
+  public long getTotalKeys() {
+    return totalKeys;
   }
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalStorageReport.java
new file mode 100644
index 00000000000..650203187f3
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalStorageReport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents a report detailing global storage usage metrics.
+ *
+ * This class provides information about the total used space,
+ * total free space, and the total storage capacity. It is used
+ * to encapsulate and transport these metrics, which can assist
+ * in monitoring and analyzing storage allocation and usage.
+ */
+public class GlobalStorageReport {
+
+  @JsonProperty("totalUsedSpace")
+  private long totalUsedSpace;
+
+  @JsonProperty("totalFreeSpace")
+  private long totalFreeSpace;
+
+  @JsonProperty("totalCapacity")
+  private long totalCapacity;
+
+  public GlobalStorageReport(long totalUsedSpace, long totalFreeSpace, long totalCapacity) {
+    this.totalUsedSpace = totalUsedSpace;
+    this.totalFreeSpace = totalFreeSpace;
+    this.totalCapacity = totalCapacity;
+  }
+
+  public long getTotalUsedSpace() {
+    return totalUsedSpace;
+  }
+
+  public long getTotalFreeSpace() {
+    return totalFreeSpace;
+  }
+
+  public long getTotalCapacity() {
+    return totalCapacity;
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/StorageCapacityDistributionResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/StorageCapacityDistributionResponse.java
new file mode 100644
index 00000000000..65d141e24e4
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/StorageCapacityDistributionResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+/**
+ * Represents the response structure for storage capacity distribution in the system.
+ * Provides aggregated information about global storage, namespace, space usage breakdown,
+ * and individual data node storage reports.
+ *
+ * The response contains the following key components:
+ *
+ * - Global storage statistics for the entire cluster.
+ * - Namespace report providing high-level metadata of the namespace.
+ * - Detailed breakdown of used storage space by category.
+ * - A list of metadata reports pertaining to storage usage on individual data nodes.
+ */
+public class StorageCapacityDistributionResponse {
+
+  @JsonProperty("globalStorage")
+  private GlobalStorageReport globalStorage;
+
+  @JsonProperty("globalNamespace")
+  private GlobalNamespaceReport globalNamespace;
+
+  @JsonProperty("usedSpaceBreakdown")
+  private UsedSpaceBreakDown usedSpaceBreakDown;
+
+  @JsonProperty("dataNodeUsage")
+  private List<DatanodeStorageReport> dataNodeUsage;
+
+  public StorageCapacityDistributionResponse(Builder builder) {
+    this.globalStorage = builder.globalStorage;
+    this.globalNamespace = builder.globalNamespace;
+    this.usedSpaceBreakDown = builder.usedSpaceBreakDown;
+    this.dataNodeUsage = builder.dataNodeUsage;
+  }
+
+  public GlobalStorageReport getGlobalStorage() {
+    return globalStorage;
+  }
+
+  public GlobalNamespaceReport getGlobalNamespace() {
+    return globalNamespace;
+  }
+
+  public UsedSpaceBreakDown getUsedSpaceBreakDown() {
+    return usedSpaceBreakDown;
+  }
+
+  public List<DatanodeStorageReport> getDataNodeUsage() {
+    return dataNodeUsage;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for constructing instances of {@link StorageCapacityDistributionResponse}.
+   *
+   * This class follows the builder pattern, allowing for a flexible and readable
+   * construction of a StorageCapacityDistributionResponse object. It provides
+   * methods to set individual components of the response before building
+   * the final object.
+   *
+   * The following components can be set using this builder:
+   * - Global storage report, represented by {@link GlobalStorageReport}.
+   * - Global namespace report, represented by {@link GlobalNamespaceReport}.
+   * - Breakdown of used storage space by category, represented by {@link UsedSpaceBreakDown}.
+   * - A list of data node storage usage reports, represented by {@link DatanodeStorageReport}.
+   *
+   * The build method generates a StorageCapacityDistributionResponse instance using
+   * the values set in this builder. Unset values will remain null.
+   */
+  public static final class Builder {
+    private GlobalStorageReport globalStorage;
+    private GlobalNamespaceReport globalNamespace;
+    private UsedSpaceBreakDown usedSpaceBreakDown;
+    private List<DatanodeStorageReport> dataNodeUsage;
+
+    public Builder() {
+      this.globalStorage = null;
+      this.globalNamespace = null;
+      this.usedSpaceBreakDown = null;
+      this.dataNodeUsage = null;
+    }
+
+    public Builder setGlobalStorage(GlobalStorageReport globalStorage) {
+      this.globalStorage = globalStorage;
+      return this;
+    }
+
+    public Builder setGlobalNamespace(GlobalNamespaceReport globalNamespace) {
+      this.globalNamespace = globalNamespace;
+      return this;
+    }
+
+    public Builder setDataNodeUsage(List<DatanodeStorageReport> dataNodeUsage) {
+      this.dataNodeUsage = dataNodeUsage;
+      return this;
+    }
+
+    public Builder setUsedSpaceBreakDown(UsedSpaceBreakDown usedSpaceBreakDown) {
+      this.usedSpaceBreakDown = usedSpaceBreakDown;
+      return this;
+    }
+
+    public StorageCapacityDistributionResponse build() {
+      return new StorageCapacityDistributionResponse(this);
+    }
+  }
+}
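
The response builder above leaves unset components null, so assemblers must supply each section explicitly. A sketch wiring the pieces together with placeholder values:

    import java.util.Collections;
    import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
    import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
    import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;

    public class ResponseAssemblySketch {
      public static void main(String[] args) {
        // Placeholder metrics; a full response also carries the used-space
        // breakdown, which is omitted (left null) here.
        StorageCapacityDistributionResponse response =
            StorageCapacityDistributionResponse.newBuilder()
                .setGlobalStorage(new GlobalStorageReport(2000L, 3000L, 5000L))
                .setGlobalNamespace(new GlobalNamespaceReport(500L, 42L))
                .setDataNodeUsage(Collections.emptyList())
                .build();
        System.out.println(response.getGlobalStorage().getTotalCapacity()); // 5000
      }
    }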
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UsedSpaceBreakDown.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UsedSpaceBreakDown.java
new file mode 100644
index 00000000000..e1dfd318e1c
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UsedSpaceBreakDown.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents a breakdown of storage space usage in a system by categorizing
+ * the used space into open keys, committed bytes, and deletion-pending bytes.
+ *
+ * This class serves as a container for information about the allocation of
+ * storage resources across different states of usage. It provides a detailed
+ * view of three primary categories of used space:
+ *
+ * 1. Open keys: The total space occupied by keys that are in an open,
+ *    uncommitted state.
+ *
+ * 2. Committed bytes: Information on space occupied by keys that have been
+ *    committed.
+ *
+ * 3. Deletion-pending bytes: Information on the space occupied by keys that
+ *    are marked for deletion but are still occupying storage. This is organized
+ *    by deletion stages and is encapsulated within the
+ *    {@link DeletionPendingBytesByStage} object.
+ *
+ * This class is typically used in scenarios where it is necessary to analyze
+ * or monitor the storage utilization in a system to gain insights or plan
+ * optimizations.
+ */
+public class UsedSpaceBreakDown {
+
+  @JsonProperty("openKeysBytes")
+  private long openKeysBytes;
+
+  @JsonProperty("committedBytes")
+  private long committedBytes;
+
+  @JsonProperty("containerPreAllocated")
+  private long containerPreAllocated;
+
+  @JsonProperty("deletionPendingBytes")
+  private DeletionPendingBytesByStage deletionPendingBytesByStage;
+
+  public UsedSpaceBreakDown(long openKeysBytes, long committedBytes, long containerPreAllocated,
+      DeletionPendingBytesByStage deletionPendingBytesByStage) {
+    this.openKeysBytes = openKeysBytes;
+    this.committedBytes = committedBytes;
+    this.containerPreAllocated = containerPreAllocated;
+    this.deletionPendingBytesByStage = deletionPendingBytesByStage;
+  }
+
+  public long getOpenKeysBytes() {
+    return openKeysBytes;
+  }
+
+  public long getCommittedBytes() {
+    return committedBytes;
+  }
+
+  public long getContainerPreAllocated() {
+    return containerPreAllocated;
+  }
+
+  public DeletionPendingBytesByStage getDeletionPendingBytesByStage() {
+    return deletionPendingBytesByStage;
+  }
+}
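
Since the response POJOs in this patch expose their wire format through @JsonProperty, a quick serialization sketch shows the JSON a client sees; the byte values are placeholders and Jackson is assumed on the classpath.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Collections;
    import org.apache.hadoop.ozone.recon.api.types.DeletionPendingBytesByStage;
    import org.apache.hadoop.ozone.recon.api.types.UsedSpaceBreakDown;

    public class UsedSpaceJsonSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder figures; the nested map mirrors createDeletionPendingBytesByStage.
        DeletionPendingBytesByStage stages = new DeletionPendingBytesByStage(160L,
            Collections.singletonMap("SCM",
                Collections.singletonMap("pendingBytes", 75L)));
        UsedSpaceBreakDown breakdown = new UsedSpaceBreakDown(150L, 300L, 100L, stages);
        // Expect keys openKeysBytes, committedBytes, containerPreAllocated and
        // deletionPendingBytes, per the @JsonProperty annotations above.
        System.out.println(new ObjectMapper().writeValueAsString(breakdown));
      }
    }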
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java
new file mode 100644
index 00000000000..d46e13a1335
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.recon.api.types.DUResponse;
+import org.apache.hadoop.ozone.recon.api.types.DeletionPendingBytesByStage;
+import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
+import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
+import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
+import org.apache.hadoop.ozone.recon.api.types.UsedSpaceBreakDown;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
+import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for StorageDistributionEndpoint, responsible for testing
+ * the behavior and responses of the storage distribution endpoint in
+ * different scenarios, including successful responses and exception cases.
+ */
+class StorageDistributionEndpointTest {
+
+  private ReconNodeManager mockNodeManager;
+  private OMDBInsightEndpoint mockOmdbInsightEndpoint;
+  private NSSummaryEndpoint mockNsSummaryEndpoint;
+  private StorageContainerLocationProtocol mockScmClient;
+  private GlobalStatsDao globalStatsDao;
+  private OzoneStorageContainerManager mockReconScm;
+  private DatanodeInfo datanodeDetails;
+  private SCMNodeStat globalStats;
+
+  private static final long NODE_CAPACITY = 5000L;
+  private static final long NODE_USED = 2000L;
+  private static final long NODE_FREE = 3000L;
+  private static final long NAMESPACE_USED = 500L;
+  private static final long OPEN_KEYS_SIZE = 150L;
+  private static final long COMMITTED_SIZE = 300L;
+  private static final long OM_PENDING_TOTAL = 50L;
+  private static final long SCM_PENDING = 75L;
+
+  @Test
+  void testGetStorageDistributionSuccessfulResponse() throws IOException {
+    setupMockDependencies();
+    setupSuccessfulScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse responsePayload = (StorageCapacityDistributionResponse) response.getEntity();
+    assertNotNull(responsePayload);
+
+    GlobalStorageReport globalStorage = responsePayload.getGlobalStorage();
+    assertEquals(NODE_USED, globalStorage.getTotalUsedSpace());
+    assertEquals(NODE_FREE, globalStorage.getTotalFreeSpace());
+    assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+
+    GlobalNamespaceReport namespaceReport = responsePayload.getGlobalNamespace();
+    assertEquals(NAMESPACE_USED, namespaceReport.getTotalUsedSpace());
+
+    UsedSpaceBreakDown usedSpaceBreakDown = responsePayload.getUsedSpaceBreakDown();
+    assertEquals(OPEN_KEYS_SIZE, usedSpaceBreakDown.getOpenKeysBytes());
+    assertEquals(COMMITTED_SIZE, usedSpaceBreakDown.getCommittedBytes());
+
+    DeletionPendingBytesByStage deletionBreakdown = usedSpaceBreakDown.getDeletionPendingBytesByStage();
+    assertEquals(OM_PENDING_TOTAL, deletionBreakdown.getByStage().get("OM").get("totalBytes"));
+    assertEquals(SCM_PENDING, deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+    assertEquals(0L, deletionBreakdown.getByStage().get("DN").get("pendingBytes"));
+  }
+
+  @Test
+  void testGetStorageDistributionWithSCMExceptionResponse() throws IOException {
+    setupMockDependencies();
+    setupScmExceptionScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse responsePayload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertNotNull(responsePayload);
+    GlobalStorageReport globalStorage = responsePayload.getGlobalStorage();
+    assertEquals(NODE_USED, globalStorage.getTotalUsedSpace());
+    assertEquals(NODE_FREE, globalStorage.getTotalFreeSpace());
+    assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+
+    GlobalNamespaceReport namespaceReport = responsePayload.getGlobalNamespace();
+    assertEquals(NAMESPACE_USED, namespaceReport.getTotalUsedSpace());
+
+    UsedSpaceBreakDown usedSpaceBreakDown = responsePayload.getUsedSpaceBreakDown();
+    assertEquals(OPEN_KEYS_SIZE, usedSpaceBreakDown.getOpenKeysBytes());
+    assertEquals(COMMITTED_SIZE, usedSpaceBreakDown.getCommittedBytes());
+
+    DeletionPendingBytesByStage deletionBreakdown =
+        usedSpaceBreakDown.getDeletionPendingBytesByStage();
+    assertEquals(OM_PENDING_TOTAL,
+        deletionBreakdown.getByStage().get("OM").get("totalBytes"));
+    assertEquals(0L,
+        deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+  }
+
+  @Test
+  void testGetStorageDistributionWithEmptyNodeList() throws IOException {
+    setupMockDependencies();
+    when(mockNodeManager.getAllNodes()).thenReturn(Collections.emptyList());
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertEquals(NODE_CAPACITY, payload.getGlobalStorage().getTotalCapacity());
+  }
+
+  @Test
+  void testGetStorageDistributionWithNullNodeStats() throws IOException {
+    setupMockDependencies();
+    when(mockNodeManager.getNodeStat(datanodeDetails)).thenReturn(null);
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertEquals(NODE_CAPACITY, payload.getGlobalStorage().getTotalCapacity());
+  }
+
+  @Test
+  void testGetStorageDistributionWithNullGlobalStats() throws IOException {
+    setupMockDependencies();
+    when(mockNodeManager.getStats()).thenReturn(null);
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertEquals(0L, payload.getGlobalStorage().getTotalCapacity());
+  }
+
+  @Test
+  void testGetStorageDistributionWithUnreachableNodes() throws IOException {
+    setupMockDependencies();
+    setupUnreachableNodesScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    long reachableCapacity = NODE_CAPACITY / 2;
+    assertEquals(reachableCapacity,
+        payload.getGlobalStorage().getTotalCapacity());
+  }
+
+  @Test
+  void testGetStorageDistributionWithJmxMetricsFailure() throws IOException {
+    setupMockDependencies();
+    setupJmxMetricsFailureScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertEquals(0L, payload.getUsedSpaceBreakDown().getOpenKeysBytes());
+  }
+
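+  /**
+   * Shared fixture: one healthy datanode registered with a mocked
+   * ReconNodeManager, cluster-wide SCMNodeStat totals, and OM insight /
+   * namespace-summary endpoints stubbed with the constants above.
+   * Individual tests override single stubs to model failures.
+   */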
+  private void setupMockDependencies() throws IOException {
+    DUResponse mockDuResponse = mock(DUResponse.class);
+    SCMNodeStat mockNodeStat = mock(SCMNodeStat.class);
+    mockNodeManager = mock(ReconNodeManager.class);
+    mockOmdbInsightEndpoint = mock(OMDBInsightEndpoint.class);
+    mockNsSummaryEndpoint = mock(NSSummaryEndpoint.class);
+    mockScmClient = mock(StorageContainerLocationProtocol.class);
+    mockReconScm = mock(OzoneStorageContainerManager.class);
+    when(mockReconScm.getScmNodeManager()).thenReturn(mockNodeManager);
+    datanodeDetails = mock(DatanodeInfo.class);
+    globalStatsDao = mock(GlobalStatsDao.class);
+    when(globalStatsDao.findById(any()))
+        .thenReturn(new GlobalStats("test", 0L, new Timestamp(10000L)));
+    when(mockNodeManager.getAllNodes())
+        .thenReturn(Collections.singletonList(datanodeDetails));
+    when(mockNodeManager.getNodeStat(datanodeDetails))
+        .thenReturn(new SCMNodeMetric(mockNodeStat));
+    when(mockNodeStat.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+    when(mockNodeStat.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+    when(mockNodeStat.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+    when(mockNodeStat.getCommitted()).thenReturn(new LongMetric(COMMITTED_SIZE));
+
+    globalStats = mock(SCMNodeStat.class);
+    when(mockNodeManager.getStats()).thenReturn(globalStats);
+    when(globalStats.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+    when(globalStats.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+    when(globalStats.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+
+    Map<String, Long> pendingDeletedDirSizes = new HashMap<>();
+    pendingDeletedDirSizes.put("totalReplicatedDataSize", OM_PENDING_TOTAL);
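+    // The insight endpoint fills a caller-supplied map in place, so the
+    // stub mutates the captured argument via doAnswer instead of
+    // returning a value.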
+    doAnswer(invocation -> {
+      ((Map<String, Long>) invocation.getArgument(0))
+          .putAll(pendingDeletedDirSizes);
+      return null;
+    }).when(mockOmdbInsightEndpoint)
+        .calculateTotalPendingDeletedDirSizes(anyMap());
+
+    Map<String, Long> openKeySummary = new HashMap<>();
+    openKeySummary.put("totalReplicatedDataSize", OPEN_KEYS_SIZE);
+    doAnswer(invocation -> {
+      ((Map<String, Long>) invocation.getArgument(0)).putAll(openKeySummary);
+      return null;
+    }).when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
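+    // Stub the root ("/") disk-usage query; its replicated size lines up
+    // with the committedBytes assertions in the tests.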
+    when(mockDuResponse.getSizeWithReplica()).thenReturn(COMMITTED_SIZE);
+    when(mockNsSummaryEndpoint.getDiskUsage(
+        eq("/"), eq(false), eq(true), eq(false)))
+        .thenReturn(Response.ok(mockDuResponse).build());
+  }
+
+  @Test
+  void testStorageDistributionWithJMXTimeouts() throws IOException {
+    // Setup mock dependencies
+    setupMockDependencies();
+
+    // Create multiple datanodes - some will timeout, some will succeed
+    DatanodeInfo timeoutNode1 = mock(DatanodeInfo.class);
+    DatanodeInfo timeoutNode2 = mock(DatanodeInfo.class);
+    DatanodeInfo successNode = mock(DatanodeInfo.class);
+
+    List<DatanodeInfo> allNodes = new ArrayList<>();
+    allNodes.add(datanodeDetails); // original node that works
+    allNodes.add(timeoutNode1);
+    allNodes.add(timeoutNode2);
+    allNodes.add(successNode);
+
+    when(mockNodeManager.getAllNodes()).thenReturn(allNodes);
+
+    // Configure timeout nodes to throw timeout exceptions
+    when(mockNodeManager.getNodeStat(timeoutNode1))
+        .thenThrow(new RuntimeException("JMX call timeout: Connection timed out"));
+    when(mockNodeManager.getNodeStat(timeoutNode2))
+        .thenThrow(new RuntimeException("JMX timeout: Read timed out"));
+
+    // Configure success node to return valid metrics
+    SCMNodeStat successNodeStat = mock(SCMNodeStat.class);
+    when(mockNodeManager.getNodeStat(successNode))
+        .thenReturn(new SCMNodeMetric(successNodeStat));
+    when(successNodeStat.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+    when(successNodeStat.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+    when(successNodeStat.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+    when(successNodeStat.getCommitted()).thenReturn(new LongMetric(COMMITTED_SIZE));
+
+    // Simulate partial JMX timeout in namespace metrics collection
+    doAnswer(invocation -> {
+      Map<String, Long> metricsMap = invocation.getArgument(0);
+      // Simulate that only partial metrics are available due to timeouts
+      metricsMap.put("totalReplicatedDataSize", OPEN_KEYS_SIZE / 2); // reduced due to timeout
+      return null;
+    }).when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+
+    // Setup SCM client to work normally
+    DeletedBlocksTransactionSummary scmSummary =
+        mock(DeletedBlocksTransactionSummary.class);
+    when(scmSummary.getTotalBlockReplicatedSize()).thenReturn(SCM_PENDING);
+    when(scmSummary.getTotalBlockSize()).thenReturn(SCM_PENDING / 3);
+    when(mockScmClient.getDeletedBlockSummary()).thenReturn(scmSummary);
+
+    // Execute the test
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+
+    // Verify response is successful despite timeouts
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertNotNull(payload);
+
+    // Verify that partial results are returned
+    GlobalStorageReport globalStorage = payload.getGlobalStorage();
+    // Only stats from nodes that didn't time out (2 of 4) should contribute.
+    assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity()); // from global stats
+    assertEquals(NODE_USED, globalStorage.getTotalUsedSpace());
+    assertEquals(NODE_FREE, globalStorage.getTotalFreeSpace());
+
+    // Verify namespace metrics are partially available
+    UsedSpaceBreakDown usedSpaceBreakDown = payload.getUsedSpaceBreakDown();
+    // Should be reduced due to timeout affecting some nodes
+    assertEquals(OPEN_KEYS_SIZE / 2, usedSpaceBreakDown.getOpenKeysBytes());
+    assertEquals(COMMITTED_SIZE, usedSpaceBreakDown.getCommittedBytes());
+
+    // Verify deletion metrics are still available (SCM didn't timeout)
+    DeletionPendingBytesByStage deletionBreakdown =
+        usedSpaceBreakDown.getDeletionPendingBytesByStage();
+    assertEquals(SCM_PENDING,
+        deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+  }
+
+  @Test
+  void testStorageDistributionWithAllJMXTimeouts() throws IOException {
+    // Test scenario where all JMX calls timeout
+    setupMockDependencies();
+
+    // Configure all datanodes to timeout
+    DatanodeInfo timeoutNode1 = mock(DatanodeInfo.class);
+    DatanodeInfo timeoutNode2 = mock(DatanodeInfo.class);
+
+    List<DatanodeInfo> allNodes = new ArrayList<>();
+    allNodes.add(timeoutNode1);
+    allNodes.add(timeoutNode2);
+
+    when(mockNodeManager.getAllNodes()).thenReturn(allNodes);
+
+    // All nodes timeout
+    when(mockNodeManager.getNodeStat(timeoutNode1))
+        .thenThrow(new RuntimeException("JMX timeout"));
+    when(mockNodeManager.getNodeStat(timeoutNode2))
+        .thenThrow(new RuntimeException("JMX timeout"));
+
+    // Namespace metrics also timeout
+    doThrow(new RuntimeException("JMX namespace timeout"))
+        .when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+    doThrow(new RuntimeException("JMX directory timeout"))
+        .when(mockOmdbInsightEndpoint).calculateTotalPendingDeletedDirSizes(anyMap());
+
+    // SCM also times out
+    when(mockScmClient.getDeletedBlockSummary())
+        .thenThrow(new IOException("JMX SCM timeout"));
+
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+
+    // Should still return OK with default/fallback values
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertNotNull(payload);
+
+    // Should fall back to global stats
+    GlobalStorageReport globalStorage = payload.getGlobalStorage();
+    assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+
+    // Namespace and breakdown metrics should have default values
+    UsedSpaceBreakDown usedSpaceBreakDown = payload.getUsedSpaceBreakDown();
+    assertEquals(0L, usedSpaceBreakDown.getOpenKeysBytes());
+
+    DeletionPendingBytesByStage deletionBreakdown =
+        usedSpaceBreakDown.getDeletionPendingBytesByStage();
+    assertEquals(0L,
+        deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+  }
+
+  @Test
+  void testStorageDistributionWithJMXTimeoutConfiguration() throws IOException {
+    // Test that timeout behavior respects configuration
+    setupMockDependencies();
+
+    // Simulate a slow JMX call that would fail under a short timeout but
+    // succeed with a longer one.
+    DatanodeInfo slowNode = mock(DatanodeInfo.class);
+    when(mockNodeManager.getAllNodes())
+        .thenReturn(Collections.singletonList(slowNode));
+
+    // Mock a slow response that eventually succeeds
+    SCMNodeStat slowNodeStat = mock(SCMNodeStat.class);
+    when(mockNodeManager.getNodeStat(slowNode)).thenAnswer(invocation -> {
+      Thread.sleep(100); // short delay to simulate a slow JMX response
+      return new SCMNodeMetric(slowNodeStat);
+    });
+
+    when(slowNodeStat.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+    when(slowNodeStat.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+    when(slowNodeStat.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+    when(slowNodeStat.getCommitted()).thenReturn(new LongMetric(COMMITTED_SIZE));
+
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+        mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+        globalStatsDao, mockScmClient);
+
+    Response response = endpoint.getStorageDistribution();
+
+    // Should succeed if timeout is configured appropriately
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+    StorageCapacityDistributionResponse payload =
+        (StorageCapacityDistributionResponse) response.getEntity();
+    assertNotNull(payload);
+
+    // Verify that the slow node's metrics are included
+    GlobalStorageReport globalStorage = payload.getGlobalStorage();
+    assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+  }
+
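+  /**
+   * Happy-path SCM stub: the deleted-block summary reports SCM_PENDING
+   * replicated bytes, which the tests expect under "SCM"/"pendingBytes".
+   */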
+  private void setupSuccessfulScenario() throws IOException {
+    DeletedBlocksTransactionSummary scmSummary =
+        mock(DeletedBlocksTransactionSummary.class);
+    when(scmSummary.getTotalBlockReplicatedSize()).thenReturn(SCM_PENDING);
+    when(scmSummary.getTotalBlockSize()).thenReturn(SCM_PENDING / 3);
+    when(mockScmClient.getDeletedBlockSummary()).thenReturn(scmSummary);
+  }
+
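+  /** SCM summary lookup fails; tests expect SCM pending bytes to fall back to zero. */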
+  private void setupScmExceptionScenario() throws IOException {
+    when(mockScmClient.getDeletedBlockSummary())
+        .thenThrow(new IOException("Test Exception"));
+  }
+
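+  /**
+   * One node with stats plus one returning null stats; global capacity is
+   * halved so tests can check that only the reachable node is counted.
+   */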
+  private void setupUnreachableNodesScenario() {
+    DatanodeInfo unreachableNode = mock(DatanodeInfo.class);
+    List<DatanodeInfo> allNodes = new ArrayList<>();
+    allNodes.add(datanodeDetails);
+    allNodes.add(unreachableNode);
+    when(mockNodeManager.getAllNodes()).thenReturn(allNodes);
+    when(mockNodeManager.getNodeStat(unreachableNode)).thenReturn(null);
+    when(globalStats.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY / 2));
+  }
+
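+  /** Open-key summary collection throws; tests expect openKeysBytes to degrade to 0. */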
+  private void setupJmxMetricsFailureScenario() {
+    doThrow(new RuntimeException("JMX Metrics Failure"))
+        .when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
