This is an automated email from the ASF dual-hosted git repository.
arafat2198 pushed a commit to branch HDDS-13177
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-13177 by this push:
new e91543b7aae HDDS-13465. Adding endpoint at Recon to get consolidated storage distribution report (#8995)
e91543b7aae is described below
commit e91543b7aaec0c2568c66a052cb5f79458d84386
Author: Priyesh Karatha <[email protected]>
AuthorDate: Tue Sep 30 14:04:48 2025 +0530
HDDS-13465. Adding endpoint at Recon to get consolidated storage distribution report (#8995)
---
hadoop-ozone/recon/pom.xml | 17 +
.../org/apache/hadoop/ozone/recon/ReconUtils.java | 101 +++++
.../ozone/recon/api/ClusterStateEndpoint.java | 12 +-
.../hadoop/ozone/recon/api/NodeEndpoint.java | 13 +-
.../ozone/recon/api/OMDBInsightEndpoint.java | 8 +-
.../recon/api/StorageDistributionEndpoint.java | 341 ++++++++++++++++
.../recon/api/types/DatanodeStorageReport.java | 134 +++++-
...eport.java => DeletionPendingBytesByStage.java} | 43 +-
...orageReport.java => GlobalNamespaceReport.java} | 46 +--
.../ozone/recon/api/types/GlobalStorageReport.java | 58 +++
.../types/StorageCapacityDistributionResponse.java | 130 ++++++
.../ozone/recon/api/types/UsedSpaceBreakDown.java | 82 ++++
.../recon/api/TestStorageDistributionEndpoint.java | 452 +++++++++++++++++++++
13 files changed, 1371 insertions(+), 66 deletions(-)
diff --git a/hadoop-ozone/recon/pom.xml b/hadoop-ozone/recon/pom.xml
index 079022c2f28..b24b251614c 100644
--- a/hadoop-ozone/recon/pom.xml
+++ b/hadoop-ozone/recon/pom.xml
@@ -126,6 +126,23 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ <exclusions>
+ <exclusion>
+ <!-- depend on jcl-over-slf4j instead -->
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-common</artifactId>
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 2758dfb34cc..4898c2a7e94 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -27,6 +27,8 @@
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.using;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Singleton;
@@ -49,6 +51,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
@@ -56,6 +59,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
@@ -80,6 +84,13 @@
import org.apache.hadoop.ozone.recon.scm.ReconContainerReportQueue;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
import org.jooq.Configuration;
@@ -94,6 +105,13 @@ public class ReconUtils {
private static Logger log = LoggerFactory.getLogger(
ReconUtils.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static CloseableHttpClient httpClient;
+ private static final int MAX_CONNECTIONS_PER_ROUTE = 20;
+ private static final int MAX_TOTAL_CONNECTIONS = 100;
+ private static final int CONNECTION_TIMEOUT_MS = 5000;
+ private static final int SOCKET_TIMEOUT_MS = 10000;
public ReconUtils() {
}
@@ -108,6 +126,40 @@ public static org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState get
return org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.getRebuildState();
}
+ static {
+ initializeHttpClient();
+ }
+
+ private static void initializeHttpClient() {
+    PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
+ connectionManager.setMaxTotal(MAX_TOTAL_CONNECTIONS);
+ connectionManager.setDefaultMaxPerRoute(MAX_CONNECTIONS_PER_ROUTE);
+ connectionManager.setValidateAfterInactivity(30000); // 30 seconds
+
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
+ .setConnectTimeout(CONNECTION_TIMEOUT_MS)
+ .setSocketTimeout(SOCKET_TIMEOUT_MS)
+ .build();
+
+ httpClient = HttpClientBuilder.create()
+ .setConnectionManager(connectionManager)
+ .setDefaultRequestConfig(requestConfig)
+ .setConnectionTimeToLive(60, TimeUnit.SECONDS)
+ .evictIdleConnections(30, TimeUnit.SECONDS)
+ .build();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ } catch (IOException e) {
+ log.warn("Error closing HTTP client", e);
+ }
+ }));
+ }
+
public static File getReconScmDbDir(ConfigurationSource conf) {
return new ReconUtils().getReconDbDir(conf, OZONE_RECON_SCM_DB_DIR);
}
@@ -850,4 +902,53 @@ public static String constructObjectPathWithPrefix(long... ids) {
}
return pathBuilder.toString();
}
+
+  public static long getMetricsFromDatanode(DatanodeDetails datanode, String service, String name, String keyName)
+      throws IOException {
+    // Construct metrics URL for DataNode JMX endpoint
+    String metricsUrl = String.format("http://%s:%d/jmx?qry=Hadoop:service=%s,name=%s",
+ datanode.getIpAddress(),
+ datanode.getPort(DatanodeDetails.Port.Name.HTTP).getValue(),
+ service,
+ name);
+ HttpGet request = new HttpGet(metricsUrl);
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException("HTTP request failed with status: " +
+ response.getStatusLine().getStatusCode());
+ }
+
+ String jsonResponse = EntityUtils.toString(response.getEntity());
+ return parseMetrics(jsonResponse, name, keyName);
+ } catch (IOException e) {
+      log.error("Error getting metrics from datanode: {}", datanode.getIpAddress(), e);
+ }
+ return 0;
+ }
+
+  private static long parseMetrics(String jsonResponse, String serviceName, String keyName) {
+ if (jsonResponse == null || jsonResponse.isEmpty()) {
+ return -1L;
+ }
+ try {
+ JsonNode root = OBJECT_MAPPER.readTree(jsonResponse);
+ JsonNode beans = root.get("beans");
+ if (beans != null && beans.isArray()) {
+ for (JsonNode bean : beans) {
+ String name = bean.path("name").asText("");
+ if (name.contains(serviceName)) {
+ return extractMetrics(bean, keyName);
+ }
+ }
+ }
+ } catch (Exception e) {
+      log.warn("Failed to parse block deletion metrics JSON: {}", e.toString());
+ }
+ return 0L;
+ }
+
+ /** Extract block deletion metrics from JMX bean node. */
+ private static long extractMetrics(JsonNode beanNode, String keyName) {
+ return beanNode.path(keyName).asLong(0L);
+ }
}
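
For context, this helper is what the new StorageDistributionEndpoint (further down) uses to poll each datanode's JMX servlet. A minimal sketch of a call site, assuming a registered DatanodeDetails instance named datanode is in scope; the service, bean, and attribute names below are the ones the endpoint passes:

    // Fetch the bytes still queued for block deletion on one datanode.
    long pendingBytes = ReconUtils.getMetricsFromDatanode(
        datanode,
        "HddsDatanode",            // JMX service in the bean qualifier
        "BlockDeletingService",    // JMX bean name
        "TotalPendingBlockBytes"); // numeric attribute read from the bean

    // The datanode /jmx servlet answers with JSON of roughly this shape
    // (illustrative values):
    // {"beans":[{"name":"Hadoop:service=HddsDatanode,name=BlockDeletingService",
    //            "TotalPendingBlockBytes":12345}]}
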
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 8be9ef47469..056ac9e21a0 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -119,10 +119,14 @@ public Response getClusterState() {
nodeManager.getNodeCount(NodeStatus.inServiceHealthyReadOnly());
SCMNodeStat stats = nodeManager.getStats();
- DatanodeStorageReport storageReport =
- new DatanodeStorageReport(stats.getCapacity().get(),
- stats.getScmUsed().get(), stats.getRemaining().get(),
- stats.getCommitted().get());
+
+ DatanodeStorageReport storageReport = DatanodeStorageReport.newBuilder()
+ .setCapacity(stats.getCapacity().get())
+ .setCommitted(stats.getCommitted().get())
+ .setUsed(stats.getScmUsed().get())
+ .setMinimumFreeSpace(stats.getFreeSpaceToSpare().get())
+ .setRemaining(stats.getRemaining().get())
+ .build();
ClusterStateResponse.Builder builder = ClusterStateResponse.newBuilder();
GlobalStats volumeRecord = globalStatsDao.findById(
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java
index cb7693054a3..58e72c1a915 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/NodeEndpoint.java
@@ -173,11 +173,14 @@ public Response getDatanodes() {
private DatanodeStorageReport getStorageReport(DatanodeDetails datanode) {
SCMNodeStat nodeStat =
nodeManager.getNodeStat(datanode).get();
- long capacity = nodeStat.getCapacity().get();
- long used = nodeStat.getScmUsed().get();
- long remaining = nodeStat.getRemaining().get();
- long committed = nodeStat.getCommitted().get();
- return new DatanodeStorageReport(capacity, used, remaining, committed);
+ DatanodeStorageReport storageReport = DatanodeStorageReport.newBuilder()
+ .setCapacity(nodeStat.getCapacity().get())
+ .setUsed(nodeStat.getScmUsed().get())
+ .setRemaining(nodeStat.getRemaining().get())
+ .setCommitted(nodeStat.getCommitted().get())
+ .setMinimumFreeSpace(nodeStat.getFreeSpaceToSpare().get())
+ .build();
+ return storageReport;
}
/**
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
index 2dcfc83d1ae..a94dc72bc73 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/OMDBInsightEndpoint.java
@@ -340,7 +340,7 @@ public Response getOpenKeySummary() {
*
* @param keysSummary A map to store the keys summary information.
*/
- private void createKeysSummaryForOpenKey(
+ public void createKeysSummaryForOpenKey(
Map<String, Long> keysSummary) {
Long replicatedSizeOpenKey = getValueFromId(globalStatsDao.findById(
OmTableInsightTask.getReplicatedSizeKeyFromTable(OPEN_KEY_TABLE)));
@@ -402,7 +402,7 @@ public Response getOpenMPUKeySummary() {
*
* @param keysSummary A map to store the keys summary information.
*/
- private void createKeysSummaryForOpenMPUKey(Map<String, Long> keysSummary) {
+ public void createKeysSummaryForOpenMPUKey(Map<String, Long> keysSummary) {
Long replicatedSizeOpenMPUKey = getValueFromId(globalStatsDao.findById(
OmTableInsightTask.getReplicatedSizeKeyFromTable(MULTIPART_INFO_TABLE)));
Long unreplicatedSizeOpenMPUKey = getValueFromId(globalStatsDao.findById(
@@ -589,7 +589,7 @@ private boolean getPendingForDeletionKeyInfo(
*
* @param keysSummary A map to store the keys summary information.
*/
- private void createKeysSummaryForDeletedKey(Map<String, Long> keysSummary) {
+ public void createKeysSummaryForDeletedKey(Map<String, Long> keysSummary) {
// Fetch the necessary metrics for deleted keys
Long replicatedSizeDeleted = getValueFromId(globalStatsDao.findById(
OmTableInsightTask.getReplicatedSizeKeyFromTable(DELETED_TABLE)));
@@ -674,7 +674,7 @@ private void getPendingForDeletionDirInfo(
}
}
-  private void calculateTotalPendingDeletedDirSizes(Map<String, Long> dirSummary) {
+  public void calculateTotalPendingDeletedDirSizes(Map<String, Long> dirSummary) {
long totalDataSize = 0L;
long totalReplicatedDataSize = 0L;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
new file mode 100644
index 00000000000..5365884a0d8
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/StorageDistributionEndpoint.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.types.DUResponse;
+import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;
+import org.apache.hadoop.ozone.recon.api.types.DeletionPendingBytesByStage;
+import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
+import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
+import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
+import org.apache.hadoop.ozone.recon.api.types.UsedSpaceBreakDown;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.hadoop.ozone.recon.tasks.OmTableInsightTask;
+import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
+import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This endpoint handles requests related to storage distribution across
+ * different datanodes in a Recon instance. It provides detailed reports
+ * on storage capacity, utilization, and associated metrics.
+ * <p>
+ * The data is aggregated from multiple sources, including node manager
+ * statistics, and is used to construct responses with information
+ * about global storage and namespace usage, storage usage breakdown,
+ * and deletion operations in progress.
+ * <p>
+ * An instance of {@link ReconNodeManager} is used to fetch detailed
+ * node-specific statistics required for generating the report.
+ */
+@Path("/storageDistribution")
+@Produces("application/json")
+@AdminOnly
+public class StorageDistributionEndpoint {
+ private final ReconNodeManager nodeManager;
+ private final OMDBInsightEndpoint omdbInsightEndpoint;
+ private final NSSummaryEndpoint nsSummaryEndpoint;
+ private final StorageContainerLocationProtocol scmClient;
+  private static Logger log = LoggerFactory.getLogger(StorageDistributionEndpoint.class);
+  private Map<DatanodeDetails, Long> blockDeletionMetricsMap = new ConcurrentHashMap<>();
+ private GlobalStatsDao globalStatsDao;
+
+ @Inject
+ public StorageDistributionEndpoint(OzoneStorageContainerManager reconSCM,
+ OMDBInsightEndpoint omDbInsightEndpoint,
+ NSSummaryEndpoint nsSummaryEndpoint,
+ GlobalStatsDao globalStatsDao,
+                                     StorageContainerLocationProtocol scmClient) {
+ this.nodeManager = (ReconNodeManager) reconSCM.getScmNodeManager();
+ this.omdbInsightEndpoint = omDbInsightEndpoint;
+ this.nsSummaryEndpoint = nsSummaryEndpoint;
+ this.scmClient = scmClient;
+ this.globalStatsDao = globalStatsDao;
+ }
+
+ @GET
+ public Response getStorageDistribution() {
+ try {
+ initializeBlockDeletionMetricsMap();
+      List<DatanodeStorageReport> nodeStorageReports = collectDatanodeReports();
+ GlobalStorageReport globalStorageReport = calculateGlobalStorageReport();
+
+ Map<String, Long> namespaceMetrics = new HashMap<>();
+ try {
+ namespaceMetrics = calculateNamespaceMetrics();
+ } catch (Exception e) {
+ log.error("Error calculating namespace metrics", e);
+ // Initialize with default values
+ namespaceMetrics.put("totalUsedNamespace", 0L);
+ namespaceMetrics.put("totalOpenKeySize", 0L);
+ namespaceMetrics.put("totalCommittedSize", 0L);
+ namespaceMetrics.put("pendingDirectorySize", 0L);
+ namespaceMetrics.put("pendingKeySize", 0L);
+ namespaceMetrics.put("totalKeys", 0L);
+ }
+
+      StorageCapacityDistributionResponse response = buildStorageDistributionResponse(
+ nodeStorageReports, globalStorageReport, namespaceMetrics);
+ return Response.ok(response).build();
+ } catch (Exception e) {
+ log.error("Error getting storage distribution", e);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Error retrieving storage distribution: " + e.getMessage())
+ .build();
+ }
+ }
+
+ private GlobalStorageReport calculateGlobalStorageReport() {
+ try {
+ SCMNodeStat stats = nodeManager.getStats();
+ if (stats == null) {
+ log.warn("Node manager stats are null, returning default values");
+ return new GlobalStorageReport(0L, 0L, 0L);
+ }
+
+      long scmUsed = stats.getScmUsed() != null ? stats.getScmUsed().get() : 0L;
+      long remaining = stats.getRemaining() != null ? stats.getRemaining().get() : 0L;
+      long capacity = stats.getCapacity() != null ? stats.getCapacity().get() : 0L;
+
+ return new GlobalStorageReport(scmUsed, remaining, capacity);
+ } catch (Exception e) {
+ log.error("Error calculating global storage report", e);
+ return new GlobalStorageReport(0L, 0L, 0L);
+ }
+ }
+
+ private Map<String, Long> calculateNamespaceMetrics() {
+ Map<String, Long> metrics = new HashMap<>();
+ Map<String, Long> totalPendingAtOmSide = calculatePendingSizes();
+ long totalOpenKeySize = calculateOpenKeySizes();
+ long totalCommittedSize = calculateCommittedSize();
+    long pendingDirectorySize = totalPendingAtOmSide.getOrDefault("pendingDirectorySize", 0L);
+    long pendingKeySize = totalPendingAtOmSide.getOrDefault("pendingKeySize", 0L);
+    long totalUsedNamespace = pendingDirectorySize + pendingKeySize + totalOpenKeySize + totalCommittedSize;
+
+ long totalKeys = 0L;
+ // Keys from OBJECT_STORE buckets.
+ GlobalStats keyRecord = globalStatsDao.findById(
+ OmTableInsightTask.getTableCountKeyFromTable(KEY_TABLE));
+ // Keys from FILE_SYSTEM_OPTIMIZED buckets
+ GlobalStats fileRecord = globalStatsDao.findById(
+ OmTableInsightTask.getTableCountKeyFromTable(FILE_TABLE));
+ if (keyRecord != null) {
+ totalKeys += keyRecord.getValue();
+ }
+ if (fileRecord != null) {
+ totalKeys += fileRecord.getValue();
+ }
+
+ metrics.put("pendingDirectorySize", pendingDirectorySize);
+ metrics.put("pendingKeySize", pendingKeySize);
+ metrics.put("totalOpenKeySize", totalOpenKeySize);
+ metrics.put("totalCommittedSize", totalCommittedSize);
+ metrics.put("totalUsedNamespace", totalUsedNamespace);
+ metrics.put("totalKeys", totalKeys);
+ return metrics;
+ }
+
+ private StorageCapacityDistributionResponse buildStorageDistributionResponse(
+ List<DatanodeStorageReport> nodeStorageReports,
+ GlobalStorageReport storageMetrics,
+ Map<String, Long> namespaceMetrics) {
+ DeletedBlocksTransactionSummary scmSummary = null;
+ try {
+ scmSummary = scmClient.getDeletedBlockSummary();
+ } catch (IOException e) {
+ log.error("Failed to get deleted block summary from SCM", e);
+ }
+
+ long totalPendingAtDnSide = 0L;
+ try {
+      totalPendingAtDnSide = blockDeletionMetricsMap.values().stream().reduce(0L, Long::sum);
+ } catch (Exception e) {
+ log.error("Error calculating pending deletion metrics", e);
+ }
+
+ DeletionPendingBytesByStage deletionPendingBytesByStage =
+ createDeletionPendingBytesByStage(
+ namespaceMetrics.getOrDefault("pendingDirectorySize", 0L),
+ namespaceMetrics.getOrDefault("pendingKeySize", 0L),
+                scmSummary != null ? scmSummary.getTotalBlockReplicatedSize() : 0L,
+ totalPendingAtDnSide);
+
+ // Safely get values from namespaceMetrics with null checks
+ Long totalUsedNamespace = namespaceMetrics.get("totalUsedNamespace");
+ Long totalOpenKeySize = namespaceMetrics.get("totalOpenKeySize");
+ Long totalCommittedSize = namespaceMetrics.get("totalCommittedSize");
+ Long totalKeys = namespaceMetrics.get("totalKeys");
+ Long totalContainerPreAllocated = nodeStorageReports.stream()
+ .map(report -> report.getCommitted())
+ .reduce(0L, Long::sum);
+
+ return StorageCapacityDistributionResponse.newBuilder()
+ .setDataNodeUsage(nodeStorageReports)
+ .setGlobalStorage(storageMetrics)
+ .setGlobalNamespace(new GlobalNamespaceReport(
+ totalUsedNamespace != null ? totalUsedNamespace : 0L,
+ totalKeys != null ? totalKeys : 0L))
+ .setUsedSpaceBreakDown(new UsedSpaceBreakDown(
+ totalOpenKeySize != null ? totalOpenKeySize : 0L,
+ totalCommittedSize != null ? totalCommittedSize : 0L,
+            totalContainerPreAllocated != null ? totalContainerPreAllocated : 0L,
+ deletionPendingBytesByStage))
+ .build();
+ }
+
+ private List<DatanodeStorageReport> collectDatanodeReports() {
+ return nodeManager.getAllNodes().stream()
+ .map(this::getStorageReport)
+ .filter(report -> report != null) // Filter out null reports
+ .collect(Collectors.toList());
+ }
+
+ private Map<String, Long> calculatePendingSizes() {
+ Map<String, Long> result = new HashMap<>();
+ Map<String, Long> pendingDeletedDirSizes = new HashMap<>();
+    omdbInsightEndpoint.calculateTotalPendingDeletedDirSizes(pendingDeletedDirSizes);
+    Map<String, Long> pendingKeySize = new HashMap<>();
+    omdbInsightEndpoint.createKeysSummaryForDeletedKey(pendingKeySize);
+    result.put("pendingDirectorySize", pendingDeletedDirSizes.getOrDefault("totalReplicatedDataSize", 0L));
+    result.put("pendingKeySize", pendingKeySize.getOrDefault("totalReplicatedDataSize", 0L));
+ return result;
+ }
+
+ private long calculateOpenKeySizes() {
+ Map<String, Long> openKeySummary = new HashMap<>();
+ omdbInsightEndpoint.createKeysSummaryForOpenKey(openKeySummary);
+ Map<String, Long> openKeyMPUSummary = new HashMap<>();
+ omdbInsightEndpoint.createKeysSummaryForOpenMPUKey(openKeyMPUSummary);
+    long openKeyDataSize = openKeySummary.getOrDefault("totalReplicatedDataSize", 0L);
+    long totalMPUKeySize = openKeyMPUSummary.getOrDefault("totalReplicatedDataSize", 0L);
+ return openKeyDataSize + totalMPUKeySize;
+ }
+
+ private long calculateCommittedSize() {
+ try {
+      Response rootResponse = nsSummaryEndpoint.getDiskUsage("/", false, true, false);
+      if (rootResponse.getStatus() != Response.Status.OK.getStatusCode()) {
+        log.warn("Failed to get disk usage, status: {}", rootResponse.getStatus());
+ return 0L;
+ }
+ DUResponse duRootRes = (DUResponse) rootResponse.getEntity();
+ return duRootRes != null ? duRootRes.getSizeWithReplica() : 0L;
+ } catch (IOException e) {
+ log.error("IOException while calculating committed size", e);
+ return 0L;
+ }
+ }
+
+  private DeletionPendingBytesByStage createDeletionPendingBytesByStage(long pendingDirectorySize,
+      long pendingKeySize,
+      long totalPendingAtScmSide,
+      long totalPendingAtDnSide) {
+    long totalPending = pendingDirectorySize + pendingKeySize + totalPendingAtScmSide + totalPendingAtDnSide;
+ Map<String, Map<String, Long>> stageItems = new HashMap<>();
+ Map<String, Long> omMap = new HashMap<>();
+ omMap.put("totalBytes", pendingDirectorySize + pendingKeySize);
+ omMap.put("pendingDirectoryBytes", pendingDirectorySize);
+ omMap.put("pendingKeyBytes", pendingKeySize);
+ Map<String, Long> scmMap = new HashMap<>();
+ scmMap.put("pendingBytes", totalPendingAtScmSide);
+ Map<String, Long> dnMap = new HashMap<>();
+ dnMap.put("pendingBytes", totalPendingAtDnSide);
+ stageItems.put("OM", omMap);
+ stageItems.put("SCM", scmMap);
+ stageItems.put("DN", dnMap);
+ return new DeletionPendingBytesByStage(totalPending, stageItems);
+ }
+
+ private void initializeBlockDeletionMetricsMap() {
+ nodeManager.getNodeStats().keySet().parallelStream().forEach(nodeId -> {
+ try {
+ long dnPending = ReconUtils.getMetricsFromDatanode(nodeId,
+ "HddsDatanode",
+ "BlockDeletingService",
+ "TotalPendingBlockBytes");
+ blockDeletionMetricsMap.put(nodeId, dnPending);
+ } catch (IOException e) {
+ blockDeletionMetricsMap.put(nodeId, -1L);
+ }
+ });
+ }
+
+ private DatanodeStorageReport getStorageReport(DatanodeDetails datanode) {
+ try {
+ SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanode);
+ if (nodeMetric == null) {
+ log.warn("Node statistics not available for datanode: {}", datanode);
+ return null; // Return null for unavailable nodes
+ }
+
+ SCMNodeStat nodeStat = nodeMetric.get();
+ if (nodeStat == null) {
+ log.warn("Node stat is null for datanode: {}", datanode);
+ return null; // Return null for unavailable stats
+ }
+
+      long capacity = nodeStat.getCapacity() != null ? nodeStat.getCapacity().get() : 0L;
+      long used = nodeStat.getScmUsed() != null ? nodeStat.getScmUsed().get() : 0L;
+      long remaining = nodeStat.getRemaining() != null ? nodeStat.getRemaining().get() : 0L;
+      long committed = nodeStat.getCommitted() != null ? nodeStat.getCommitted().get() : 0L;
+      long pendingDeletion = blockDeletionMetricsMap.getOrDefault(datanode, 0L);
+      long minFreeSpace = nodeStat.getFreeSpaceToSpare() != null ? nodeStat.getFreeSpaceToSpare().get() : 0L;
+ if (pendingDeletion < 0) {
+        log.warn("Block deletion metrics unavailable for datanode: {}", datanode);
+ pendingDeletion = 0L;
+ }
+ DatanodeStorageReport storageReport = DatanodeStorageReport.newBuilder()
+ .setCapacity(capacity)
+ .setUsed(used)
+ .setRemaining(remaining)
+ .setCommitted(committed)
+ .setPendingDeletion(pendingDeletion)
+ .setMinimumFreeSpace(minFreeSpace)
+ .setDatanodeUuid(datanode.getUuidString())
+ .setHostName(datanode.getHostName())
+ .build();
+ return storageReport;
+ } catch (Exception e) {
+ log.error("Error getting storage report for datanode: {}", datanode, e);
+ return null; // Return null on any error
+ }
+ }
+}
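
Taken together, the new GET /storageDistribution endpoint serializes the response types added below into a JSON document along these lines. This is an illustrative payload with invented values; the field names follow the @JsonProperty annotations and getters in the new types:

    {
      "globalStorage": {"totalUsedSpace": 2000, "totalFreeSpace": 3000, "totalCapacity": 5000},
      "globalNamespace": {"totalUsedSpace": 500, "totalKeys": 42},
      "usedSpaceBreakdown": {
        "openKeysBytes": 150,
        "committedBytes": 300,
        "containerPreAllocated": 100,
        "deletionPendingBytes": {
          "total": 125,
          "byStage": {
            "OM": {"totalBytes": 50, "pendingDirectoryBytes": 20, "pendingKeyBytes": 30},
            "SCM": {"pendingBytes": 75},
            "DN": {"pendingBytes": 0}
          }
        }
      },
      "dataNodeUsage": [{"datanodeUuid": "...", "hostName": "dn-1.example.com",
                         "capacity": 5000, "used": 2000, "remaining": 3000,
                         "committed": 300, "pendingDeletion": 0, "minimumFreeSpace": 100}]
    }
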
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
index d677554705f..87126c58051 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
@@ -17,21 +17,40 @@
package org.apache.hadoop.ozone.recon.api.types;
+import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Metadata object that contains storage report of a Datanode.
*/
-public class DatanodeStorageReport {
+public final class DatanodeStorageReport {
+ private String datanodeUuid;
+ private String hostName;
private long capacity;
private long used;
private long remaining;
private long committed;
+ private long pendingDeletion;
+ private long minimumFreeSpace;
+
+ private DatanodeStorageReport(Builder builder) {
+ this.datanodeUuid = builder.datanodeUuid;
+ this.hostName = builder.hostName;
+ this.capacity = builder.capacity;
+ this.used = builder.used;
+ this.remaining = builder.remaining;
+ this.committed = builder.committed;
+ this.pendingDeletion = builder.pendingDeletion;
+ this.minimumFreeSpace = builder.minimumFreeSpace;
+ }
- public DatanodeStorageReport(long capacity, long used, long remaining,
- long committed) {
- this.capacity = capacity;
- this.used = used;
- this.remaining = remaining;
- this.committed = committed;
+ public String getDatanodeUuid() {
+ return datanodeUuid;
+ }
+
+ public String getHostName() {
+ return hostName;
}
public long getCapacity() {
@@ -49,4 +68,105 @@ public long getRemaining() {
public long getCommitted() {
return committed;
}
+
+ public long getPendingDeletion() {
+ return pendingDeletion;
+ }
+
+ public long getMinimumFreeSpace() {
+ return minimumFreeSpace;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+   * Builder class for {@link DatanodeStorageReport}.
+ */
+ public static final class Builder {
+ private String datanodeUuid = "";
+ private String hostName = "";
+ private long capacity = 0;
+ private long used = 0;
+ private long remaining = 0;
+ private long committed = 0;
+ private long pendingDeletion = 0;
+ private long minimumFreeSpace = 0;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(Builder.class);
+
+ private Builder() {
+ }
+
+ public Builder setDatanodeUuid(String datanodeUuid) {
+ this.datanodeUuid = datanodeUuid;
+ return this;
+ }
+
+ public Builder setHostName(String hostName) {
+ this.hostName = hostName;
+ return this;
+ }
+
+ public Builder setCapacity(long capacity) {
+ this.capacity = capacity;
+ return this;
+ }
+
+ public Builder setUsed(long used) {
+ this.used = used;
+ return this;
+ }
+
+ public Builder setRemaining(long remaining) {
+ this.remaining = remaining;
+ return this;
+ }
+
+ public Builder setCommitted(long committed) {
+ this.committed = committed;
+ return this;
+ }
+
+ public Builder setPendingDeletion(long pendingDeletion) {
+ this.pendingDeletion = pendingDeletion;
+ return this;
+ }
+
+ public Builder setMinimumFreeSpace(long minimumFreeSpace) {
+ this.minimumFreeSpace = minimumFreeSpace;
+ return this;
+ }
+
+ public void validate() {
+ Objects.requireNonNull(hostName, "hostName cannot be null");
+
+ if (capacity < 0) {
+ throw new IllegalArgumentException("capacity cannot be negative");
+ }
+ if (used < 0) {
+ throw new IllegalArgumentException("used cannot be negative");
+ }
+ if (remaining < 0) {
+ throw new IllegalArgumentException("remaining cannot be negative");
+ }
+ if (committed < 0) {
+ throw new IllegalArgumentException("committed cannot be negative");
+ }
+ if (pendingDeletion < 0) {
+        throw new IllegalArgumentException("pendingDeletion cannot be negative");
+ }
+ // Logical consistency checks
+ if (used + remaining > capacity) {
+        LOG.warn("Inconsistent storage report for {}: used({}) + remaining({}) > capacity({})",
+ hostName, used, remaining, capacity);
+ }
+ }
+
+ public DatanodeStorageReport build() {
+ return new DatanodeStorageReport(this);
+ }
+ }
}
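
For illustration, callers now assemble the expanded report through the builder rather than the old four-argument constructor. A minimal sketch with invented values; note that validate() is a separate, explicit call and is not invoked by build():

    DatanodeStorageReport.Builder builder = DatanodeStorageReport.newBuilder()
        .setDatanodeUuid("11111111-2222-3333-4444-555555555555") // hypothetical
        .setHostName("dn-1.example.com")                         // hypothetical
        .setCapacity(5000L)
        .setUsed(2000L)
        .setRemaining(3000L)
        .setCommitted(300L)
        .setPendingDeletion(0L)
        .setMinimumFreeSpace(100L);
    builder.validate(); // throws on negatives, warns if used + remaining > capacity
    DatanodeStorageReport report = builder.build();
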
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DeletionPendingBytesByStage.java
similarity index 53%
copy from hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
copy to hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DeletionPendingBytesByStage.java
index d677554705f..024464f5549 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DeletionPendingBytesByStage.java
@@ -17,36 +17,33 @@
package org.apache.hadoop.ozone.recon.api.types;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
/**
- * Metadata object that contains storage report of a Datanode.
+ * This class represents the metadata related to deletion stages and the
+ * corresponding bytes pending to be deleted. It encapsulates the total
+ * pending deletion byte count and provides structured data to represent
+ * pending bytes categorized by different stages.
*/
-public class DatanodeStorageReport {
- private long capacity;
- private long used;
- private long remaining;
- private long committed;
-
- public DatanodeStorageReport(long capacity, long used, long remaining,
- long committed) {
- this.capacity = capacity;
- this.used = used;
- this.remaining = remaining;
- this.committed = committed;
- }
+public class DeletionPendingBytesByStage {
- public long getCapacity() {
- return capacity;
- }
+ @JsonProperty("total")
+ private long total;
+
+ @JsonProperty("byStage")
+ private Map<String, Map<String, Long>> byStage;
- public long getUsed() {
- return used;
+  public DeletionPendingBytesByStage(long total, Map<String, Map<String, Long>> byStage) {
+ this.total = total;
+ this.byStage = byStage;
}
- public long getRemaining() {
- return remaining;
+ public long getTotal() {
+ return total;
}
- public long getCommitted() {
- return committed;
+ public Map<String, Map<String, Long>> getByStage() {
+ return byStage;
}
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalNamespaceReport.java
similarity index 52%
copy from hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
copy to hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalNamespaceReport.java
index d677554705f..7adeaaf7c0b 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/DatanodeStorageReport.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalNamespaceReport.java
@@ -17,36 +17,36 @@
package org.apache.hadoop.ozone.recon.api.types;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
- * Metadata object that contains storage report of a Datanode.
+ * The GlobalNamespaceReport class serves as a representation of
+ * the global namespace metadata summary for a storage system.
+ * This class encapsulates statistical information related
+ * to the state of the global namespace.
+ *
+ * The metadata includes:
+ * - Total space utilized.
+ * - Total number of keys present in the namespace.
*/
-public class DatanodeStorageReport {
- private long capacity;
- private long used;
- private long remaining;
- private long committed;
-
- public DatanodeStorageReport(long capacity, long used, long remaining,
- long committed) {
- this.capacity = capacity;
- this.used = used;
- this.remaining = remaining;
- this.committed = committed;
- }
+public class GlobalNamespaceReport {
- public long getCapacity() {
- return capacity;
- }
+ @JsonProperty("totalUsedSpace")
+ private long totalUsedSpace;
+
+ @JsonProperty("totalKeys")
+ private long totalKeys;
- public long getUsed() {
- return used;
+ public GlobalNamespaceReport(long totalUsedSpace, long totalKeys) {
+ this.totalUsedSpace = totalUsedSpace;
+ this.totalKeys = totalKeys;
}
- public long getRemaining() {
- return remaining;
+ public long getTotalUsedSpace() {
+ return totalUsedSpace;
}
- public long getCommitted() {
- return committed;
+ public long getTotalKeys() {
+ return totalKeys;
}
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalStorageReport.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalStorageReport.java
new file mode 100644
index 00000000000..650203187f3
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/GlobalStorageReport.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents a report detailing global storage usage metrics.
+ *
+ * This class provides information about the total used space,
+ * total free space, and the total storage capacity. It is used
+ * to encapsulate and transport these metrics, which can assist
+ * in monitoring and analyzing storage allocation and usage.
+ */
+public class GlobalStorageReport {
+
+ @JsonProperty("totalUsedSpace")
+ private long totalUsedSpace;
+
+ @JsonProperty("totalFreeSpace")
+ private long totalFreeSpace;
+
+ @JsonProperty("totalCapacity")
+ private long totalCapacity;
+
+  public GlobalStorageReport(long totalUsedSpace, long totalFreeSpace, long totalCapacity) {
+ this.totalUsedSpace = totalUsedSpace;
+ this.totalFreeSpace = totalFreeSpace;
+ this.totalCapacity = totalCapacity;
+ }
+
+ public long getTotalUsedSpace() {
+ return totalUsedSpace;
+ }
+
+ public long getTotalFreeSpace() {
+ return totalFreeSpace;
+ }
+
+ public long getTotalCapacity() {
+ return totalCapacity;
+ }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/StorageCapacityDistributionResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/StorageCapacityDistributionResponse.java
new file mode 100644
index 00000000000..65d141e24e4
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/StorageCapacityDistributionResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+/**
+ * Represents the response structure for storage capacity distribution in the system.
+ * Provides aggregated information about global storage, namespace, space usage breakdown,
+ * and individual data node storage reports.
+ *
+ * The response contains the following key components:
+ *
+ * - Global storage statistics for the entire cluster.
+ * - Namespace report providing high-level metadata of the namespace.
+ * - Detailed breakdown of used storage space by category.
+ * - A list of metadata reports pertaining to storage usage on individual data nodes.
+ */
+public class StorageCapacityDistributionResponse {
+
+ @JsonProperty("globalStorage")
+ private GlobalStorageReport globalStorage;
+
+ @JsonProperty("globalNamespace")
+ private GlobalNamespaceReport globalNamespace;
+
+ @JsonProperty("usedSpaceBreakdown")
+ private UsedSpaceBreakDown usedSpaceBreakDown;
+
+ @JsonProperty("dataNodeUsage")
+ private List<DatanodeStorageReport> dataNodeUsage;
+
+ public StorageCapacityDistributionResponse(Builder builder) {
+ this.globalStorage = builder.globalStorage;
+ this.globalNamespace = builder.globalNamespace;
+ this.usedSpaceBreakDown = builder.usedSpaceBreakDown;
+ this.dataNodeUsage = builder.dataNodeUsage;
+ }
+
+ public GlobalStorageReport getGlobalStorage() {
+ return globalStorage;
+ }
+
+ public GlobalNamespaceReport getGlobalNamespace() {
+ return globalNamespace;
+ }
+
+ public UsedSpaceBreakDown getUsedSpaceBreakDown() {
+ return usedSpaceBreakDown;
+ }
+
+ public List<DatanodeStorageReport> getDataNodeUsage() {
+ return dataNodeUsage;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+   * Builder class for constructing instances of {@link StorageCapacityDistributionResponse}.
+   *
+   * This class follows the builder pattern, allowing for a flexible and readable
+ * construction of a StorageCapacityDistributionResponse object. It provides
+ * methods to set individual components of the response before building
+ * the final object.
+ *
+ * The following components can be set using this builder:
+ * - Global storage report, represented by {@link GlobalStorageReport}.
+ * - Global namespace report, represented by {@link GlobalNamespaceReport}.
+   * - Breakdown of used storage space by category, represented by {@link UsedSpaceBreakDown}.
+   * - A list of data node storage usage reports, represented by {@link DatanodeStorageReport}.
+ *
+   * The build method generates a StorageCapacityDistributionResponse instance using
+ * the values set in this builder. Unset values will remain null.
+ */
+ public static final class Builder {
+ private GlobalStorageReport globalStorage;
+ private GlobalNamespaceReport globalNamespace;
+ private UsedSpaceBreakDown usedSpaceBreakDown;
+ private List<DatanodeStorageReport> dataNodeUsage;
+
+ public Builder() {
+ this.globalStorage = null;
+ this.globalNamespace = null;
+ this.usedSpaceBreakDown = null;
+ this.dataNodeUsage = null;
+ }
+
+ public Builder setGlobalStorage(GlobalStorageReport globalStorage) {
+ this.globalStorage = globalStorage;
+ return this;
+ }
+
+ public Builder setGlobalNamespace(GlobalNamespaceReport globalNamespace) {
+ this.globalNamespace = globalNamespace;
+ return this;
+ }
+
+    public Builder setDataNodeUsage(List<DatanodeStorageReport> dataNodeUsage) {
+ this.dataNodeUsage = dataNodeUsage;
+ return this;
+ }
+
+    public Builder setUsedSpaceBreakDown(UsedSpaceBreakDown usedSpaceBreakDown) {
+ this.usedSpaceBreakDown = usedSpaceBreakDown;
+ return this;
+ }
+
+ public StorageCapacityDistributionResponse build() {
+ return new StorageCapacityDistributionResponse(this);
+ }
+ }
+}
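
A condensed sketch of how the endpoint above wires these pieces together in buildStorageDistributionResponse; the values are invented, and stageItems (Map<String, Map<String, Long>>) and nodeReports (List<DatanodeStorageReport>) are assumed to be already computed:

    StorageCapacityDistributionResponse response =
        StorageCapacityDistributionResponse.newBuilder()
            .setGlobalStorage(new GlobalStorageReport(2000L, 3000L, 5000L))
            .setGlobalNamespace(new GlobalNamespaceReport(500L, 42L))
            .setUsedSpaceBreakDown(new UsedSpaceBreakDown(150L, 300L, 100L,
                new DeletionPendingBytesByStage(125L, stageItems)))
            .setDataNodeUsage(nodeReports)
            .build();
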
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UsedSpaceBreakDown.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UsedSpaceBreakDown.java
new file mode 100644
index 00000000000..e1dfd318e1c
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UsedSpaceBreakDown.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents a breakdown of storage space usage in a system by categorizing
+ * the used space into open keys, committed bytes, and deletion-pending bytes.
+ *
+ * This class serves as a container for information about the allocation of
+ * storage resources across different states of usage. It provides a detailed
+ * view of three primary categories of used space:
+ *
+ * 1. Open keys: The total space occupied by keys that are in an open,
+ * uncommitted state.
+ *
+ * 2. Committed bytes: Information on space occupied by keys that have been
+ * committed.
+ *
+ * 3. Deletion-pending bytes: Information on the space occupied by keys that
+ * are marked for deletion but are still occupying storage. This is organized
+ * by deletion stages and is encapsulated within the
+ * {@link DeletionPendingBytesByStage} object.
+ *
+ * This class is typically used in scenarios where it is necessary to analyze
+ * or monitor the storage utilization in a system to gain insights or plan
+ * optimizations.
+ */
+public class UsedSpaceBreakDown {
+
+ @JsonProperty("openKeysBytes")
+ private long openKeysBytes;
+
+ @JsonProperty("committedBytes")
+ private long committedBytes;
+
+ @JsonProperty("containerPreAllocated")
+ private long containerPreAllocated;
+
+ @JsonProperty("deletionPendingBytes")
+ private DeletionPendingBytesByStage deletionPendingBytesByStage;
+
+  public UsedSpaceBreakDown(long openKeysBytes, long committedBytes, long containerPreAllocated,
+ DeletionPendingBytesByStage deletionPendingBytesByStage) {
+ this.openKeysBytes = openKeysBytes;
+ this.committedBytes = committedBytes;
+ this.containerPreAllocated = containerPreAllocated;
+ this.deletionPendingBytesByStage = deletionPendingBytesByStage;
+ }
+
+ public long getOpenKeysBytes() {
+ return openKeysBytes;
+ }
+
+ public long getCommittedBytes() {
+ return committedBytes;
+ }
+
+ public long getContainerPreAllocated() {
+ return containerPreAllocated;
+ }
+
+ public DeletionPendingBytesByStage getDeletionPendingBytesByStage() {
+ return deletionPendingBytesByStage;
+ }
+}
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java
new file mode 100644
index 00000000000..d46e13a1335
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.anyMap;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.LongMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.ozone.recon.api.types.DUResponse;
+import org.apache.hadoop.ozone.recon.api.types.DeletionPendingBytesByStage;
+import org.apache.hadoop.ozone.recon.api.types.GlobalNamespaceReport;
+import org.apache.hadoop.ozone.recon.api.types.GlobalStorageReport;
+import org.apache.hadoop.ozone.recon.api.types.StorageCapacityDistributionResponse;
+import org.apache.hadoop.ozone.recon.api.types.UsedSpaceBreakDown;
+import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
+import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
+import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for StorageDistributionEndpoint, responsible for testing
+ * the behavior and responses of the storage distribution endpoint in
+ * different scenarios, including successful responses and exception cases.
+ */
+class StorageDistributionEndpointTest {
+
+ private ReconNodeManager mockNodeManager;
+ private OMDBInsightEndpoint mockOmdbInsightEndpoint;
+ private NSSummaryEndpoint mockNsSummaryEndpoint;
+ private StorageContainerLocationProtocol mockScmClient;
+ private GlobalStatsDao globalStatsDao;
+ private OzoneStorageContainerManager mockReconScm;
+ private DatanodeInfo datanodeDetails;
+ private SCMNodeStat globalStats;
+
+ private static final long NODE_CAPACITY = 5000L;
+ private static final long NODE_USED = 2000L;
+ private static final long NODE_FREE = 3000L;
+ private static final long NAMESPACE_USED = 500L;
+ private static final long OPEN_KEYS_SIZE = 150L;
+ private static final long COMMITTED_SIZE = 300L;
+ private static final long OM_PENDING_TOTAL = 50L;
+ private static final long SCM_PENDING = 75L;
+
+ @Test
+ void testGetStorageDistributionSuccessfulResponse() throws IOException {
+ setupMockDependencies();
+ setupSuccessfulScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse responsePayload = (StorageCapacityDistributionResponse) response.getEntity();
+ assertNotNull(responsePayload);
+
+ GlobalStorageReport globalStorage = responsePayload.getGlobalStorage();
+ assertEquals(NODE_USED, globalStorage.getTotalUsedSpace());
+ assertEquals(NODE_FREE, globalStorage.getTotalFreeSpace());
+ assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+
+    GlobalNamespaceReport namespaceReport = responsePayload.getGlobalNamespace();
+    assertEquals(NAMESPACE_USED, namespaceReport.getTotalUsedSpace());
+
+    UsedSpaceBreakDown usedSpaceBreakDown = responsePayload.getUsedSpaceBreakDown();
+    assertEquals(OPEN_KEYS_SIZE, usedSpaceBreakDown.getOpenKeysBytes());
+    assertEquals(COMMITTED_SIZE, usedSpaceBreakDown.getCommittedBytes());
+
+    DeletionPendingBytesByStage deletionBreakdown = usedSpaceBreakDown.getDeletionPendingBytesByStage();
+    assertEquals(OM_PENDING_TOTAL, deletionBreakdown.getByStage().get("OM").get("totalBytes"));
+    assertEquals(SCM_PENDING, deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+    assertEquals(0L, deletionBreakdown.getByStage().get("DN").get("pendingBytes"));
+ }
+
+ @Test
+  void testGetStorageDistributionWithSCMExceptionResponse() throws IOException {
+ setupMockDependencies();
+ setupScmExceptionScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse responsePayload = (StorageCapacityDistributionResponse) response.getEntity();
+    assertNotNull(responsePayload);
+    GlobalStorageReport globalStorage = responsePayload.getGlobalStorage();
+    assertEquals(NODE_USED, globalStorage.getTotalUsedSpace());
+    assertEquals(NODE_FREE, globalStorage.getTotalFreeSpace());
+    assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+
+    GlobalNamespaceReport namespaceReport = responsePayload.getGlobalNamespace();
+    assertEquals(NAMESPACE_USED, namespaceReport.getTotalUsedSpace());
+
+    UsedSpaceBreakDown usedSpaceBreakDown = responsePayload.getUsedSpaceBreakDown();
+    assertEquals(OPEN_KEYS_SIZE, usedSpaceBreakDown.getOpenKeysBytes());
+    assertEquals(COMMITTED_SIZE, usedSpaceBreakDown.getCommittedBytes());
+
+    DeletionPendingBytesByStage deletionBreakdown = usedSpaceBreakDown.getDeletionPendingBytesByStage();
+    assertEquals(OM_PENDING_TOTAL, deletionBreakdown.getByStage().get("OM").get("totalBytes"));
+    assertEquals(0, deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+ }
+
+ @Test
+ void testGetStorageDistributionWithEmptyNodeList() throws IOException {
+ setupMockDependencies();
+ when(mockNodeManager.getAllNodes()).thenReturn(Collections.emptyList());
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload = (StorageCapacityDistributionResponse) response.getEntity();
+ assertEquals(NODE_CAPACITY, payload.getGlobalStorage().getTotalCapacity());
+ }
+
+ @Test
+ void testGetStorageDistributionWithNullNodeStats() throws IOException {
+ setupMockDependencies();
+ when(mockNodeManager.getNodeStat(datanodeDetails)).thenReturn(null);
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload = (StorageCapacityDistributionResponse) response.getEntity();
+ assertEquals(NODE_CAPACITY, payload.getGlobalStorage().getTotalCapacity());
+ }
+
+ @Test
+ void testGetStorageDistributionWithNullGlobalStats() throws IOException {
+ setupMockDependencies();
+ when(mockNodeManager.getStats()).thenReturn(null);
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload = (StorageCapacityDistributionResponse) response.getEntity();
+ assertEquals(0L, payload.getGlobalStorage().getTotalCapacity());
+ }
+
+ @Test
+ void testGetStorageDistributionWithUnreachableNodes() throws IOException {
+ setupMockDependencies();
+ setupUnreachableNodesScenario();
+    StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+        mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+    Response response = endpoint.getStorageDistribution();
+    assertNotNull(response);
+    assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    StorageCapacityDistributionResponse payload = (StorageCapacityDistributionResponse) response.getEntity();
+    long reachableCapacity = NODE_CAPACITY / 2;
+    assertEquals(reachableCapacity, payload.getGlobalStorage().getTotalCapacity());
+ }
+
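+ // If open-key summary collection fails, openKeysBytes should fall back to zero.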
+ @Test
+ void testGetStorageDistributionWithJmxMetricsFailure() throws IOException {
+ setupMockDependencies();
+ setupJmxMetricsFailureScenario();
+ StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(mockReconScm, mockOmdbInsightEndpoint,
+ mockNsSummaryEndpoint, globalStatsDao, mockScmClient);
+ Response response = endpoint.getStorageDistribution();
+ assertNotNull(response);
+ assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+ StorageCapacityDistributionResponse payload = (StorageCapacityDistributionResponse) response.getEntity();
+ assertEquals(0L, payload.getUsedSpaceBreakDown().getOpenKeysBytes());
+ }
+
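+ /**
+  * Wires up the mocks shared by every test: a single healthy datanode reporting
+  * NODE_CAPACITY / NODE_USED / NODE_FREE / COMMITTED_SIZE, matching aggregated
+  * node-manager stats, OM insight summaries for open keys and pending-deleted
+  * directories, and a root disk-usage response from the NSSummary endpoint.
+  */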
+ private void setupMockDependencies() throws IOException {
+ DUResponse mockDuResponse = mock(DUResponse.class);
+ SCMNodeStat mockNodeStat = mock(SCMNodeStat.class);
+ mockNodeManager = mock(ReconNodeManager.class);
+ mockOmdbInsightEndpoint = mock(OMDBInsightEndpoint.class);
+ mockNsSummaryEndpoint = mock(NSSummaryEndpoint.class);
+ mockScmClient = mock(StorageContainerLocationProtocol.class);
+ mockReconScm = mock(OzoneStorageContainerManager.class);
+ when(mockReconScm.getScmNodeManager()).thenReturn(mockNodeManager);
+ datanodeDetails = mock(DatanodeInfo.class);
+ globalStatsDao = mock(GlobalStatsDao.class);
+ when(globalStatsDao.findById(any())).thenReturn(new GlobalStats("test", 0L, new Timestamp(10000L)));
+ when(mockNodeManager.getAllNodes()).thenReturn(Collections.singletonList(datanodeDetails));
+ when(mockNodeManager.getNodeStat(datanodeDetails)).thenReturn(new SCMNodeMetric(mockNodeStat));
+ when(mockNodeStat.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+ when(mockNodeStat.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+ when(mockNodeStat.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+ when(mockNodeStat.getCommitted()).thenReturn(new LongMetric(COMMITTED_SIZE));
+
+ globalStats = mock(SCMNodeStat.class);
+ when(mockNodeManager.getStats()).thenReturn(globalStats);
+ when(globalStats.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+ when(globalStats.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+ when(globalStats.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+
+ Map<String, Long> pendingDeletedDirSizes = new HashMap<>();
+ pendingDeletedDirSizes.put("totalReplicatedDataSize", OM_PENDING_TOTAL);
+ doAnswer(invocation -> {
+ ((Map<String, Long>) invocation.getArgument(0)).putAll(pendingDeletedDirSizes);
+ return null;
+ }).when(mockOmdbInsightEndpoint).calculateTotalPendingDeletedDirSizes(anyMap());
+
+ Map<String, Long> openKeySummary = new HashMap<>();
+ openKeySummary.put("totalReplicatedDataSize", OPEN_KEYS_SIZE);
+ doAnswer(invocation -> {
+ ((Map<String, Long>) invocation.getArgument(0)).putAll(openKeySummary);
+ return null;
+ }).when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+ when(mockDuResponse.getSizeWithReplica()).thenReturn(COMMITTED_SIZE);
+ when(mockNsSummaryEndpoint.getDiskUsage(eq("/"), eq(false), eq(true), eq(false)))
+ .thenReturn(Response.ok(mockDuResponse).build());
+ }
+
+ @Test
+ void testStorageDistributionWithJMXTimeouts() throws IOException {
+ // Setup mock dependencies
+ setupMockDependencies();
+
+ // Create multiple datanodes - some will timeout, some will succeed
+ DatanodeInfo timeoutNode1 = mock(DatanodeInfo.class);
+ DatanodeInfo timeoutNode2 = mock(DatanodeInfo.class);
+ DatanodeInfo successNode = mock(DatanodeInfo.class);
+
+ List<DatanodeInfo> allNodes = new ArrayList<>();
+ allNodes.add(datanodeDetails); // original node that works
+ allNodes.add(timeoutNode1);
+ allNodes.add(timeoutNode2);
+ allNodes.add(successNode);
+
+ when(mockNodeManager.getAllNodes()).thenReturn(allNodes);
+
+ // Configure timeout nodes to throw timeout exceptions
+ when(mockNodeManager.getNodeStat(timeoutNode1))
+ .thenThrow(new RuntimeException("JMX call timeout: Connection timed out"));
+ when(mockNodeManager.getNodeStat(timeoutNode2))
+ .thenThrow(new RuntimeException("JMX timeout: Read timed out"));
+
+ // Configure success node to return valid metrics
+ SCMNodeStat successNodeStat = mock(SCMNodeStat.class);
+ when(mockNodeManager.getNodeStat(successNode)).thenReturn(new SCMNodeMetric(successNodeStat));
+ when(successNodeStat.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+ when(successNodeStat.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+ when(successNodeStat.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+ when(successNodeStat.getCommitted()).thenReturn(new LongMetric(COMMITTED_SIZE));
+
+ // Simulate partial JMX timeout in namespace metrics collection
+ doAnswer(invocation -> {
+ Map<String, Long> metricsMap = invocation.getArgument(0);
+ // Simulate that only partial metrics are available due to timeouts
+ metricsMap.put("totalReplicatedDataSize", OPEN_KEYS_SIZE / 2); // Reduced due to timeout
+ return null;
+ }).when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+
+ // Setup SCM client to work normally
+ DeletedBlocksTransactionSummary scmSummary = mock(DeletedBlocksTransactionSummary.class);
+ when(scmSummary.getTotalBlockReplicatedSize()).thenReturn(SCM_PENDING);
+ when(scmSummary.getTotalBlockSize()).thenReturn(SCM_PENDING / 3);
+ when(mockScmClient.getDeletedBlockSummary()).thenReturn(scmSummary);
+
+ // Execute the test
+ StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+ mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+ globalStatsDao, mockScmClient);
+ Response response = endpoint.getStorageDistribution();
+
+ // Verify response is successful despite timeouts
+ assertNotNull(response);
+ assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+ StorageCapacityDistributionResponse payload =
+ (StorageCapacityDistributionResponse) response.getEntity();
+ assertNotNull(payload);
+
+ // Verify that partial results are returned
+ GlobalStorageReport globalStorage = payload.getGlobalStorage();
+ // Global totals come from the node manager's aggregated stats, so the two per-node JMX timeouts do not reduce them
+ assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+ assertEquals(NODE_USED, globalStorage.getTotalUsedSpace());
+ assertEquals(NODE_FREE, globalStorage.getTotalFreeSpace());
+
+ // Verify namespace metrics are partially available
+ UsedSpaceBreakDown usedSpaceBreakDown = payload.getUsedSpaceBreakDown();
+ // Should be reduced due to timeout affecting some nodes
+ assertEquals(OPEN_KEYS_SIZE / 2, usedSpaceBreakDown.getOpenKeysBytes());
+ assertEquals(COMMITTED_SIZE, usedSpaceBreakDown.getCommittedBytes());
+
+ // Verify deletion metrics are still available (SCM didn't timeout)
+ DeletionPendingBytesByStage deletionBreakdown =
+ usedSpaceBreakDown.getDeletionPendingBytesByStage();
+ assertEquals(SCM_PENDING, deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+ }
+
+ @Test
+ void testStorageDistributionWithAllJMXTimeouts() throws IOException {
+ // Test scenario where all JMX calls timeout
+ setupMockDependencies();
+
+ // Configure all datanodes to timeout
+ DatanodeInfo timeoutNode1 = mock(DatanodeInfo.class);
+ DatanodeInfo timeoutNode2 = mock(DatanodeInfo.class);
+
+ List<DatanodeInfo> allNodes = new ArrayList<>();
+ allNodes.add(timeoutNode1);
+ allNodes.add(timeoutNode2);
+
+ when(mockNodeManager.getAllNodes()).thenReturn(allNodes);
+
+ // All nodes timeout
+ when(mockNodeManager.getNodeStat(timeoutNode1))
+ .thenThrow(new RuntimeException("JMX timeout"));
+ when(mockNodeManager.getNodeStat(timeoutNode2))
+ .thenThrow(new RuntimeException("JMX timeout"));
+
+ // Namespace metrics also timeout
+ doThrow(new RuntimeException("JMX namespace timeout"))
+ .when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+ doThrow(new RuntimeException("JMX directory timeout"))
+ .when(mockOmdbInsightEndpoint).calculateTotalPendingDeletedDirSizes(anyMap());
+
+ // SCM also times out
+ when(mockScmClient.getDeletedBlockSummary())
+ .thenThrow(new IOException("JMX SCM timeout"));
+
+ StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+ mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+ globalStatsDao, mockScmClient);
+ Response response = endpoint.getStorageDistribution();
+
+ // Should still return OK with default/fallback values
+ assertNotNull(response);
+ assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+ StorageCapacityDistributionResponse payload =
+ (StorageCapacityDistributionResponse) response.getEntity();
+ assertNotNull(payload);
+
+ // Should fall back to global stats
+ GlobalStorageReport globalStorage = payload.getGlobalStorage();
+ assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+
+ // Namespace and breakdown metrics should have default values
+ UsedSpaceBreakDown usedSpaceBreakDown = payload.getUsedSpaceBreakDown();
+ assertEquals(0L, usedSpaceBreakDown.getOpenKeysBytes());
+
+ DeletionPendingBytesByStage deletionBreakdown =
+ usedSpaceBreakDown.getDeletionPendingBytesByStage();
+ assertEquals(0L, deletionBreakdown.getByStage().get("SCM").get("pendingBytes"));
+ }
+
+ @Test
+ void testStorageDistributionWithJMXTimeoutConfiguration() throws IOException {
+ // Test that timeout behavior respects configuration
+ setupMockDependencies();
+
+ // Simulate a slow JMX call that would time out under a short timeout but succeed with a longer one
+ DatanodeInfo slowNode = mock(DatanodeInfo.class);
+ when(mockNodeManager.getAllNodes()).thenReturn(Collections.singletonList(slowNode));
+
+ // Mock a slow response that eventually succeeds
+ SCMNodeStat slowNodeStat = mock(SCMNodeStat.class);
+ when(mockNodeManager.getNodeStat(slowNode)).thenAnswer(invocation -> {
+ Thread.sleep(100); // Short delay to simulate a slow JMX response
+ return new SCMNodeMetric(slowNodeStat);
+ });
+
+ when(slowNodeStat.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY));
+ when(slowNodeStat.getScmUsed()).thenReturn(new LongMetric(NODE_USED));
+ when(slowNodeStat.getRemaining()).thenReturn(new LongMetric(NODE_FREE));
+ when(slowNodeStat.getCommitted()).thenReturn(new LongMetric(COMMITTED_SIZE));
+
+ StorageDistributionEndpoint endpoint = new StorageDistributionEndpoint(
+ mockReconScm, mockOmdbInsightEndpoint, mockNsSummaryEndpoint,
+ globalStatsDao, mockScmClient);
+
+ Response response = endpoint.getStorageDistribution();
+
+ // Should succeed if timeout is configured appropriately
+ assertNotNull(response);
+ assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+ StorageCapacityDistributionResponse payload =
+ (StorageCapacityDistributionResponse) response.getEntity();
+ assertNotNull(payload);
+
+ // Verify that the slow node's metrics are included
+ GlobalStorageReport globalStorage = payload.getGlobalStorage();
+ assertEquals(NODE_CAPACITY, globalStorage.getTotalCapacity());
+ }
+
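+ /** Stubs the SCM client with a valid deleted-block summary of SCM_PENDING replicated bytes. */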
+ private void setupSuccessfulScenario() throws IOException {
+ DeletedBlocksTransactionSummary scmSummary = mock(DeletedBlocksTransactionSummary.class);
+ when(scmSummary.getTotalBlockReplicatedSize()).thenReturn(SCM_PENDING);
+ when(scmSummary.getTotalBlockSize()).thenReturn(SCM_PENDING / 3);
+ when(mockScmClient.getDeletedBlockSummary()).thenReturn(scmSummary);
+ }
+
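+ /** Makes the SCM deleted-block summary call fail with an IOException. */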
+ private void setupScmExceptionScenario() throws IOException {
+ when(mockScmClient.getDeletedBlockSummary()).thenThrow(new IOException("Test Exception"));
+ }
+
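+ /** Registers a second, unreachable datanode and halves the reported global capacity. */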
+ private void setupUnreachableNodesScenario() {
+ DatanodeInfo unreachableNode = mock(DatanodeInfo.class);
+ List<DatanodeInfo> allNodes = new ArrayList<>();
+ allNodes.add(datanodeDetails);
+ allNodes.add(unreachableNode);
+ when(mockNodeManager.getAllNodes()).thenReturn(allNodes);
+ when(mockNodeManager.getNodeStat(unreachableNode)).thenReturn(null);
+ when(globalStats.getCapacity()).thenReturn(new LongMetric(NODE_CAPACITY / 2));
+ }
+
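+ /** Makes open-key summary collection throw, simulating a JMX metrics failure. */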
+ private void setupJmxMetricsFailureScenario() {
+ doThrow(new RuntimeException("JMX Metrics Failure"))
+ .when(mockOmdbInsightEndpoint).createKeysSummaryForOpenKey(anyMap());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]