This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 45c900d5d9 HDDS-12619. Optimize Recon OM Container Mismatch API (#8101)
45c900d5d9 is described below
commit 45c900d5d9d20e271fdb01cbd1814ce40c68ce47
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Mar 19 18:54:56 2025 -0700
HDDS-12619. Optimize Recon OM Container Mismatch API (#8101)
---
.../apache/hadoop/ozone/util/SeekableIterator.java | 28 +++++
.../hadoop/ozone/recon/api/ContainerEndpoint.java | 126 +++++++++------------
.../recon/spi/ReconContainerMetadataManager.java | 4 +
.../impl/ReconContainerMetadataManagerImpl.java | 116 ++++++++++++-------
.../TestReconContainerMetadataManagerImpl.java | 4 +-
5 files changed, 160 insertions(+), 118 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java
new file mode 100644
index 0000000000..fb424b28ca
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/SeekableIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that may hold resources until it is closed and whose
+ * position can be moved with {@link #seek}.
+ *
+ * @param <K> type of the position accepted by {@link #seek}
+ * @param <E> type of the elements returned by the iterator
+ */
+public interface SeekableIterator<K, E> extends ClosableIterator<E> {
+ /**
+ * Repositions this iterator at {@code position}; later {@code hasNext()}
+ * and {@code next()} calls continue from the new position.
+ * NOTE(review): whether the first element returned afterwards is exactly
+ * {@code position} or the nearest one after it is not specified here;
+ * confirm against implementations and document.
+ *
+ * @param position the position to move to
+ * @throws IOException if the underlying source fails to reposition
+ */
+ void seek(K position) throws IOException;
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index 041bcc8e6b..b38e7138a1 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -29,8 +29,9 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -50,7 +51,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -80,11 +80,13 @@
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.util.SeekableIterator;
import
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import
org.apache.ozone.recon.schema.generated.tables.pojos.UnhealthyContainers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Endpoint for querying keys that belong to a container.
*/
@@ -585,109 +587,87 @@ public Response getContainerMisMatchInsights(
List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList =
new ArrayList<>();
- try {
- Map<Long, ContainerMetadata> omContainers =
- reconContainerMetadataManager.getContainers(-1, -1);
- List<Long> scmNonDeletedContainers =
- containerManager.getContainers().stream()
- .filter(containerInfo -> containerInfo.getState() !=
- HddsProtos.LifeCycleState.DELETED)
- .map(containerInfo -> containerInfo.getContainerID())
- .collect(Collectors.toList());
- DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
-
+ Long minContainerID = prevKey + 1;
+ Iterator<ContainerInfo> scmNonDeletedContainers =
+ containerManager.getContainers().stream()
+ .filter(containerInfo -> (containerInfo.getContainerID()
>= minContainerID))
+ .filter(containerInfo -> containerInfo.getState() !=
HddsProtos.LifeCycleState.DELETED)
+
.sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator();
+ ContainerInfo scmContainerInfo = scmNonDeletedContainers.hasNext() ?
+ scmNonDeletedContainers.next() : null;
+ DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
+ try (SeekableIterator<Long, ContainerMetadata> omContainers =
+ reconContainerMetadataManager.getContainersIterator()) {
+ omContainers.seek(minContainerID);
+ ContainerMetadata containerMetadata = omContainers.hasNext() ?
omContainers.next() : null;
switch (dataFilter) {
-
case SCM:
- List<Map.Entry<Long, ContainerMetadata>> notSCMContainers =
- omContainers.entrySet().stream()
- .filter(
- containerMetadataEntry ->
!scmNonDeletedContainers.contains(
- containerMetadataEntry.getKey()))
- .collect(Collectors.toList());
-
- if (prevKey > 0) {
- int index = 0;
- while (index < notSCMContainers.size() &&
- notSCMContainers.get(index).getKey() <= prevKey) {
- index++;
- }
- if (index < notSCMContainers.size()) {
- notSCMContainers = notSCMContainers.subList(index,
- Math.min(index + limit, notSCMContainers.size()));
+ List<ContainerMetadata> notSCMContainers = new ArrayList<>();
+ while (containerMetadata != null && notSCMContainers.size() < limit) {
+ Long omContainerID = containerMetadata.getContainerID();
+ Long scmContainerID = scmContainerInfo == null ? null :
scmContainerInfo.getContainerID();
+ if (omContainerID.equals(scmContainerID)) {
+ containerMetadata = omContainers.hasNext() ? omContainers.next() :
null;
+ scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
+ } else if (scmContainerID == null ||
omContainerID.compareTo(scmContainerID) < 0) {
+ notSCMContainers.add(containerMetadata);
+ containerMetadata = omContainers.hasNext() ? omContainers.next() :
null;
} else {
- notSCMContainers = Collections.emptyList();
+ scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
}
- } else {
- notSCMContainers = notSCMContainers.subList(0,
- Math.min(limit, notSCMContainers.size()));
}
notSCMContainers.forEach(nonSCMContainer -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
- containerDiscrepancyInfo.setContainerID(nonSCMContainer.getKey());
+
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(
- nonSCMContainer.getValue().getNumberOfKeys());
+ nonSCMContainer.getNumberOfKeys());
containerDiscrepancyInfo.setPipelines(
- nonSCMContainer.getValue().getPipelines());
+ nonSCMContainer.getPipelines());
containerDiscrepancyInfo.setExistsAt("OM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;
case OM:
- List<Long> nonOMContainers = scmNonDeletedContainers.stream()
- .filter(containerId -> !omContainers.containsKey(containerId))
- .collect(Collectors.toList());
-
- if (prevKey > 0) {
- int index = 0;
- while (index < nonOMContainers.size() &&
- nonOMContainers.get(index) <= prevKey) {
- index++;
- }
- if (index < nonOMContainers.size()) {
- nonOMContainers = nonOMContainers.subList(index,
- Math.min(index + limit, nonOMContainers.size()));
+ List<ContainerInfo> nonOMContainers = new ArrayList<>();
+ while (scmContainerInfo != null && nonOMContainers.size() < limit) {
+ Long omContainerID = containerMetadata == null ? null :
containerMetadata.getContainerID();
+ Long scmContainerID = scmContainerInfo.getContainerID();
+ if (scmContainerID.equals(omContainerID)) {
+ scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
+ containerMetadata = omContainers.hasNext() ? omContainers.next() :
null;
+ } else if (omContainerID == null ||
scmContainerID.compareTo(omContainerID) < 0) {
+ nonOMContainers.add(scmContainerInfo);
+ scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
 } else {
- nonOMContainers = Collections.emptyList();
+ // OM is behind SCM (omContainerID < scmContainerID): seek straight to
+ // scmContainerID instead of reading every OM container in between.
+ omContainers.seek(scmContainerID);
+ containerMetadata = omContainers.hasNext() ? omContainers.next() :
null;
 }
- } else {
- nonOMContainers = nonOMContainers.subList(0,
- Math.min(limit, nonOMContainers.size()));
 }
- List<Pipeline> pipelines = new ArrayList<>();
- nonOMContainers.forEach(nonOMContainerId -> {
- boolean containerExistsInScm = true;
- ContainerDiscrepancyInfo containerDiscrepancyInfo =
- new ContainerDiscrepancyInfo();
- containerDiscrepancyInfo.setContainerID(nonOMContainerId);
+ nonOMContainers.forEach(containerInfo -> {
+ ContainerDiscrepancyInfo containerDiscrepancyInfo = new
ContainerDiscrepancyInfo();
+
containerDiscrepancyInfo.setContainerID(containerInfo.getContainerID());
+ List<Pipeline> pipelines = new ArrayList<>(); // per container; a shared list leaked pipelines across entries
 containerDiscrepancyInfo.setNumberOfKeys(0);
 PipelineID pipelineID = null;
 try {
- pipelineID = containerManager.getContainer(
- ContainerID.valueOf(nonOMContainerId)).getPipelineID();
+ pipelineID = containerInfo.getPipelineID();
 if (pipelineID != null) {
 pipelines.add(pipelineManager.getPipeline(pipelineID));
 }
- } catch (ContainerNotFoundException e) {
- containerExistsInScm = false;
- LOG.warn("Container {} not found in SCM: {}", nonOMContainerId,
- e);
 } catch (PipelineNotFoundException e) {
 LOG.debug(
 "Pipeline not found for container: {} and pipelineId: {}",
- nonOMContainerId, pipelineID, e);
- }
- // The container might have been deleted in SCM after the call to
- // get the list of containers
- if (containerExistsInScm) {
- containerDiscrepancyInfo.setPipelines(pipelines);
- containerDiscrepancyInfo.setExistsAt("SCM");
- containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
+ containerInfo, pipelineID, e);
 }
+ containerDiscrepancyInfo.setPipelines(pipelines);
+ containerDiscrepancyInfo.setExistsAt("SCM");
+ containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
 });
break;
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
index 24605c95a0..1400279d12 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerMetadataManager.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeyPrefixContainer;
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
+import org.apache.hadoop.ozone.util.SeekableIterator;
/**
* The Recon Container DB Service interface.
@@ -186,6 +187,9 @@ Map<ContainerKeyPrefix, Integer> getKeyPrefixesForContainer(
Map<Long, ContainerMetadata> getContainers(int limit, long prevContainer)
throws IOException;
+
+ /**
+ * Returns an iterator over per-container {@link ContainerMetadata} whose
+ * position can be moved by container ID with
+ * {@link SeekableIterator#seek}. The caller owns the returned iterator and
+ * must close it to release the resources it holds.
+ *
+ * @return a new {@link SeekableIterator} keyed by container ID
+ * @throws IOException if the iterator cannot be opened
+ */
+ SeekableIterator<Long, ContainerMetadata> getContainersIterator() throws
IOException;
+
/**
* Delete an entry in the container DB.
*
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
index 27567333d9..17cb793985 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerMetadataManagerImpl.java
@@ -26,6 +26,7 @@
import jakarta.annotation.Nonnull;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -38,6 +39,7 @@
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
@@ -54,6 +56,7 @@
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistory;
import org.apache.hadoop.ozone.recon.scm.ContainerReplicaHistoryList;
import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.util.SeekableIterator;
import org.apache.ozone.recon.schema.generated.tables.daos.GlobalStatsDao;
import org.apache.ozone.recon.schema.generated.tables.pojos.GlobalStats;
import org.jooq.Configuration;
@@ -431,53 +434,80 @@ public Map<Long, ContainerMetadata> getContainers(int
limit,
long prevContainer)
throws IOException {
Map<Long, ContainerMetadata> containers = new LinkedHashMap<>();
- try (
- TableIterator<ContainerKeyPrefix,
- ? extends KeyValue<ContainerKeyPrefix, Integer>>
- containerIterator = containerKeyTable.iterator()) {
- ContainerKeyPrefix seekKey;
- if (prevContainer > 0L) {
- seekKey = ContainerKeyPrefix.get(prevContainer);
- KeyValue<ContainerKeyPrefix,
- Integer> seekKeyValue = containerIterator.seek(seekKey);
- // Check if RocksDB was able to correctly seek to the given
- // prevContainer containerId. If not, then return empty result
- if (seekKeyValue != null &&
- seekKeyValue.getKey().getContainerId() != prevContainer) {
- return containers;
- } else {
- // seek to the prevContainer+1 containerID to start scan
- seekKey = ContainerKeyPrefix.get(prevContainer + 1);
- containerIterator.seek(seekKey);
- }
+ try (SeekableIterator<Long, ContainerMetadata> containerIterator =
getContainersIterator()) {
+ containerIterator.seek(prevContainer + 1);
+ while (containerIterator.hasNext() && ((limit < 0) || containers.size()
< limit)) {
+ ContainerMetadata containerMetadata = containerIterator.next();
+ containers.put(containerMetadata.getContainerID(), containerMetadata);
}
- while (containerIterator.hasNext()) {
- KeyValue<ContainerKeyPrefix, Integer> keyValue =
- containerIterator.next();
- ContainerKeyPrefix containerKeyPrefix = keyValue.getKey();
- Long containerID = containerKeyPrefix.getContainerId();
- Integer numberOfKeys = keyValue.getValue();
- List<Pipeline> pipelines =
- getPipelines(containerKeyPrefix);
-
- // break the loop if limit has been reached
- // and one more new entity needs to be added to the containers map
- if (containers.size() == limit &&
- !containers.containsKey(containerID)) {
- break;
- }
+ }
+ return containers;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Each call opens a fresh iterator over {@code containerKeyTable}; the
+ * caller is responsible for closing it.
+ */
+ @Override
+ public SeekableIterator<Long, ContainerMetadata> getContainersIterator() throws IOException {
+ final ContainerMetadataIterator metadataIterator = new ContainerMetadataIterator();
+ return metadataIterator;
+ }
+
+ /**
+ * {@link SeekableIterator} over {@code containerKeyTable} that folds each run
+ * of consecutive entries sharing a container ID into one
+ * {@link ContainerMetadata}. One table entry is kept buffered in
+ * {@code currentKey}, so {@link #hasNext()} needs no table access.
+ * NOTE(review): only adjacent entries are merged, so this assumes all keys
+ * of a container are contiguous in the table's iteration order.
+ */
+ private class ContainerMetadataIterator implements SeekableIterator<Long, ContainerMetadata> {
+ private final TableIterator<ContainerKeyPrefix, ? extends KeyValue<ContainerKeyPrefix, Integer>> containerIterator;
+ // Next table entry not yet folded into a result; null once exhausted.
+ private KeyValue<ContainerKeyPrefix, Integer> currentKey;
+
+ ContainerMetadataIterator()
+ throws IOException {
+ containerIterator = containerKeyTable.iterator();
+ currentKey = containerIterator.hasNext() ? containerIterator.next() : null;
+ }
- // initialize containerMetadata with 0 as number of keys.
- containers.computeIfAbsent(containerID, ContainerMetadata::new);
- // increment number of keys for the containerID
- ContainerMetadata containerMetadata = containers.get(containerID);
- containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() +
- numberOfKeys);
- containerMetadata.setPipelines(pipelines);
- containers.put(containerID, containerMetadata);
+
+ /**
+ * Seeks the underlying table iterator to
+ * {@code ContainerKeyPrefix.get(containerID)}, then buffers the next entry
+ * the table iterator yields, if any.
+ */
+ @Override
+ public void seek(Long containerID) throws IOException {
+ ContainerKeyPrefix seekKey = ContainerKeyPrefix.get(containerID);
+ containerIterator.seek(seekKey);
+ currentKey = containerIterator.hasNext() ? containerIterator.next() : null;
+ }
+
+ @Override
+ public void close() {
+ try {
+ containerIterator.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentKey != null;
+ }
+
+ /**
+ * Returns the metadata of the container whose entry is buffered. The key
+ * count is the number of consecutive table entries with that container ID,
+ * and the pipelines are de-duplicated by {@link PipelineID}.
+ * NOTE(review): this counts entries and ignores each entry's Integer value;
+ * confirm every stored value is 1 or that counting entries is intended.
+ *
+ * @throws java.util.NoSuchElementException if the iterator is exhausted
+ * @throws UncheckedIOException if loading pipelines or advancing fails
+ */
+ @Override
+ public ContainerMetadata next() {
+ if (currentKey == null) {
+ // Iterator contract: report exhaustion instead of returning null.
+ throw new java.util.NoSuchElementException();
+ }
+ try {
+ Map<PipelineID, Pipeline> pipelines = new HashMap<>();
+ ContainerMetadata containerMetadata = new ContainerMetadata(currentKey.getKey().getContainerId());
+ do {
+ ContainerKeyPrefix containerKeyPrefix = this.currentKey.getKey();
+ containerMetadata.setNumberOfKeys(containerMetadata.getNumberOfKeys() + 1);
+ getPipelines(containerKeyPrefix).forEach(pipeline -> {
+ pipelines.putIfAbsent(pipeline.getId(), pipeline);
+ });
+ if (containerIterator.hasNext()) {
+ currentKey = containerIterator.next();
+ } else {
+ currentKey = null;
+ }
+ } while (currentKey != null &&
+ currentKey.getKey().getContainerId() == containerMetadata.getContainerID());
+ containerMetadata.setPipelines(new ArrayList<>(pipelines.values()));
+ return containerMetadata;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
 }
 }
- return containers;
 }
@Nonnull
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
index dbac3e5ee5..a3b5cef123 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java
@@ -365,8 +365,8 @@ public void testGetContainersWithPrevContainer() throws
Exception {
reconContainerMetadataManager.getContainers(-1, 0L);
assertEquals(2, containerMap.size());
- assertEquals(3, containerMap.get(containerId).getNumberOfKeys());
- assertEquals(3, containerMap.get(nextContainerId).getNumberOfKeys());
+ assertEquals(2, containerMap.get(containerId).getNumberOfKeys());
+ assertEquals(1, containerMap.get(nextContainerId).getNumberOfKeys());
// test if limit works
containerMap = reconContainerMetadataManager.getContainers(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]