This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 90994bb75f7 HDDS-14330. MinHeapMergeIterator should use key comparator while popping out entries from the heap (#9576)
90994bb75f7 is described below

commit 90994bb75f771f2023f9697430678ff05e3cc957
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jan 2 09:03:57 2026 -0500

    HDDS-14330. MinHeapMergeIterator should use key comparator while popping out entries from the heap (#9576)
---
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   |  2 +-
 .../hadoop/hdds/utils/db/MinHeapMergeIterator.java |  4 +-
 .../hdds/utils/db/TestMinHeapMergeIterator.java    | 73 ++++++++++++----------
 3 files changed, 43 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 298260efdf9..f83f9b3d10a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -198,7 +198,7 @@ default <KEY> ClosableIterator<KeyValue<KEY, 
Collection<Object>>> getMergeIterat
     KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
     Comparator<KeyValue<KEY, Object>> comparator = 
Comparator.comparing(KeyValue::getKey, keyComparator);
     return new MinHeapMergeIterator<KeyValue<KEY, Object>, 
Table.KeyValueIterator<KEY, Object>,
-        KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
+        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
       @Override
       protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) 
throws IOException {
         return table[idx].iterator(prefix);
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
index 8326b9e9ca1..1536619d51c 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
@@ -55,7 +55,7 @@ public MinHeapMergeIterator(int numberOfIterators, 
Comparator<K> comparator) {
     keys = new HashMap<>(numberOfIterators);
     iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) 
null).collect(Collectors.toList());
     this.initialized = false;
-    this.comparator = comparator;
+    this.comparator = Objects.requireNonNull(comparator, "comparator cannot be 
null");
   }
 
   protected abstract I getIterator(int idx) throws IOException;
@@ -109,7 +109,7 @@ public V next() {
     // Clear the keys list by setting all entries to null.
     keys.clear();
     // Advance all entries with the same key (from different files)
-    while (!minHeap.isEmpty() && 
Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
+    while (!minHeap.isEmpty() && 
comparator.compare(minHeap.peek().getCurrentKey(), currentKey) == 0) {
       HeapEntry<K> entry = minHeap.poll();
       int idx = entry.index;
       // Set the key for the current entry in the keys list.
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java
index 1ac177c646b..d79fab7796c 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java
@@ -26,10 +26,12 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.UnsignedBytes;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -38,6 +40,7 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import org.apache.hadoop.hdds.StringUtils;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -45,7 +48,7 @@
  */
 class TestMinHeapMergeIterator {
 
-  private static final Comparator<String> STRING_COMPARATOR = 
String::compareTo;
+  private static final Comparator<byte[]> BYTE_COMPARATOR = 
UnsignedBytes.lexicographicalComparator();
 
   /**
    * A closeable iterator which tracks close() calls.
@@ -87,8 +90,8 @@ private static final class MergeResult {
     private final String key;
     private final Set<Integer> sources;
 
-    private MergeResult(String key, Set<Integer> sources) {
-      this.key = key;
+    private MergeResult(byte[] key, Set<Integer> sources) {
+      this.key = StringUtils.bytes2String(key);
       this.sources = sources;
     }
 
@@ -104,17 +107,17 @@ Set<Integer> getSources() {
   /**
    * Concrete implementation for tests.
    */
-  private static final class TestIterator extends MinHeapMergeIterator<String,
-      TrackingCloseableIterator<String>, MergeResult> {
+  private static final class TestIterator extends MinHeapMergeIterator<byte[],
+      TrackingCloseableIterator<byte[]>, MergeResult> {
 
-    private final List<TrackingCloseableIterator<String>> itrs;
+    private final List<TrackingCloseableIterator<byte[]>> itrs;
     private final List<MergeResult> merged = new ArrayList<>();
 
     private IOException ioExceptionAtIndex;
     private int exceptionIndex = -1;
 
-    private TestIterator(List<TrackingCloseableIterator<String>> itrs) {
-      super(itrs.size(), STRING_COMPARATOR);
+    private TestIterator(List<TrackingCloseableIterator<byte[]>> itrs) {
+      super(itrs.size(), BYTE_COMPARATOR);
       this.itrs = itrs;
     }
 
@@ -125,7 +128,7 @@ private TestIterator withGetIteratorIOException(int index, 
IOException ex) {
     }
 
     @Override
-    protected TrackingCloseableIterator<String> getIterator(int idx)
+    protected TrackingCloseableIterator<byte[]> getIterator(int idx)
         throws IOException {
       if (idx == exceptionIndex) {
         if (ioExceptionAtIndex != null) {
@@ -136,9 +139,9 @@ protected TrackingCloseableIterator<String> getIterator(int 
idx)
     }
 
     @Override
-    protected MergeResult merge(Map<Integer, String> keysToMerge) {
+    protected MergeResult merge(Map<Integer, byte[]> keysToMerge) {
       // All values in keysToMerge are expected to be equal (same key across 
iterators).
-      String key = keysToMerge.values().iterator().next();
+      byte[] key = keysToMerge.values().iterator().next();
       MergeResult r = new MergeResult(key, new 
HashSet<>(keysToMerge.keySet()));
       merged.add(r);
       return r;
@@ -149,14 +152,18 @@ List<MergeResult> getMerged() {
     }
   }
 
+  private ImmutableList<byte[]> toBytesList(String... keys) {
+    return 
Arrays.stream(keys).map(StringUtils::string2Bytes).collect(ImmutableList.toImmutableList());
+  }
+
   @Test
   void testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() {
-    TrackingCloseableIterator<String> itr0 =
-        new TrackingCloseableIterator<>(ImmutableList.of("a", "c", "e", "g"));
-    TrackingCloseableIterator<String> itr1 =
-        new TrackingCloseableIterator<>(ImmutableList.of("b", "c", "d", "g", 
"h"));
-    TrackingCloseableIterator<String> itr2 =
-        new TrackingCloseableIterator<>(ImmutableList.of("c", "e", "f", "h"));
+    TrackingCloseableIterator<byte[]> itr0 =
+        new TrackingCloseableIterator<>(toBytesList("a", "c", "e", "g"));
+    TrackingCloseableIterator<byte[]> itr1 =
+        new TrackingCloseableIterator<>(toBytesList("b", "c", "d", "g", "h"));
+    TrackingCloseableIterator<byte[]> itr2 =
+        new TrackingCloseableIterator<>(toBytesList("c", "e", "f", "h"));
 
     List<String> keys = new ArrayList<>();
     try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1, 
itr2))) {
@@ -195,10 +202,10 @@ void 
testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() {
 
   @Test
   void testInitClosesEmptyIterators() {
-    TrackingCloseableIterator<String> empty =
+    TrackingCloseableIterator<byte[]> empty =
         new TrackingCloseableIterator<>(Collections.emptyList());
-    TrackingCloseableIterator<String> nonEmpty =
-        new TrackingCloseableIterator<>(ImmutableList.of("a"));
+    TrackingCloseableIterator<byte[]> nonEmpty =
+        new TrackingCloseableIterator<>(toBytesList("a"));
 
     try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty, 
nonEmpty))) {
       assertTrue(mergeItr.hasNext()); // triggers init
@@ -212,10 +219,10 @@ void testInitClosesEmptyIterators() {
 
   @Test
   void testCloseClosesAllIterators() {
-    TrackingCloseableIterator<String> itr0 =
-        new TrackingCloseableIterator<>(ImmutableList.of("a", "c"));
-    TrackingCloseableIterator<String> itr1 =
-        new TrackingCloseableIterator<>(ImmutableList.of("b", "d"));
+    TrackingCloseableIterator<byte[]> itr0 =
+        new TrackingCloseableIterator<>(toBytesList("a", "c"));
+    TrackingCloseableIterator<byte[]> itr1 =
+        new TrackingCloseableIterator<>(toBytesList("b", "d"));
 
     try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, 
itr1))) {
       assertTrue(mergeItr.hasNext()); // triggers init
@@ -233,10 +240,10 @@ void testCloseClosesAllIterators() {
   @Test
   void testHasNextWrapsIOExceptionFromGetIterator() {
     IOException expected = new IOException("boom");
-    TrackingCloseableIterator<String> itr0 =
-        new TrackingCloseableIterator<>(ImmutableList.of("a"));
-    TrackingCloseableIterator<String> itr1 =
-        new TrackingCloseableIterator<>(ImmutableList.of("b"));
+    TrackingCloseableIterator<byte[]> itr0 =
+        new TrackingCloseableIterator<>(toBytesList("a"));
+    TrackingCloseableIterator<byte[]> itr1 =
+        new TrackingCloseableIterator<>(toBytesList("b"));
     TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1));
     mergeItr.withGetIteratorIOException(1, expected);
     try (TestIterator ignored = mergeItr) {
@@ -253,10 +260,10 @@ void testHasNextWrapsIOExceptionFromGetIterator() {
 
   @Test
   void 
testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators() 
throws Exception {
-    TrackingCloseableIterator<String> itr0 =
-        new TrackingCloseableIterator<>(ImmutableList.of("a", "b"));
-    TrackingCloseableIterator<String> itr1 =
-        new TrackingCloseableIterator<>(ImmutableList.of("c"));
+    TrackingCloseableIterator<byte[]> itr0 =
+        new TrackingCloseableIterator<>(toBytesList("a", "b"));
+    TrackingCloseableIterator<byte[]> itr1 =
+        new TrackingCloseableIterator<>(toBytesList("c"));
     RocksDatabaseException rdbEx = new RocksDatabaseException("rocks");
     TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1));
     mergeItr.withGetIteratorIOException(1, rdbEx);
@@ -275,7 +282,7 @@ void 
testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators() t
 
   @Test
   void testNextWhenEmptyThrowsNoSuchElement() {
-    TrackingCloseableIterator<String> empty =
+    TrackingCloseableIterator<byte[]> empty =
         new TrackingCloseableIterator<>(Collections.emptyList());
     try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty))) {
       assertFalse(mergeItr.hasNext());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to