This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58819e19b41 KAFKA-20492: Prepare DBAccessor for txn support (#22142)
58819e19b41 is described below

commit 58819e19b41803896a8fd165a058be3118431138
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 15 22:18:53 2026 +0100

    KAFKA-20492: Prepare DBAccessor for txn support (#22142)
    
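    Moves iterator construction (all, range, prefixScan) from
    SingleColumnFamilyAccessor into default methods on DBAccessor.
    SingleColumnFamilyAccessor and DualColumnFamilyAccessor now delegate
    to the accessor, allowing future DBAccessor implementations (e.g. a
    transactional wrapper) to override iteration behaviour.
    
    DualColumnFamilyAccessor's merging iterator now consumes the per-CF
    ManagedKeyValueIterators returned by DBAccessor instead of raw
    RocksIterators, so RocksDBDualCFRangeIterator is removed and range
    bounds are enforced by each per-CF iterator. When a key exists in both
    column families, the merged iterator now emits it once, using the
    new-format value (previously both entries were emitted).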
    
    Adds no-op commitStagedWrites() and rollbackStagedWrites() defaults to
    DBAccessor, and calls commitStagedWrites() from
    AbstractColumnFamilyAccessor.commit(). These are no-ops for the existing
    DirectDBAccessor but provide the hook for a transactional accessor to
    flush buffered writes atomically with offset commits.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../internals/AbstractColumnFamilyAccessor.java    |   1 +
 .../state/internals/DualColumnFamilyAccessor.java  | 204 +++++----------------
 .../streams/state/internals/RocksDBStore.java      |  55 +++---
 .../AbstractColumnFamilyAccessorTest.java          |   7 +
 .../internals/DualColumnFamilyAccessorTest.java    | 178 +++++++++++++++---
 5 files changed, 241 insertions(+), 204 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 004c824cc3e..2cf776423a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -69,6 +69,7 @@ abstract class AbstractColumnFamilyAccessor implements RocksDBStore.ColumnFamily
                 }
             }
         }
+        accessor.commitStagedWrites();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
index 7d25989083b..cb23ef9134b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatchInterface;
 
 import java.util.Comparator;
@@ -177,15 +176,13 @@ class DualColumnFamilyAccessor extends 
AbstractColumnFamilyAccessor {
                                                         final Bytes from,
                                                         final Bytes to,
                                                         final boolean forward) 
{
-        return new RocksDBDualCFRangeIterator(
-            store.name(),
-            accessor.newIterator(newColumnFamily),
-            accessor.newIterator(oldColumnFamily),
-            from,
-            to,
-            forward,
-            true,
-            valueConverter);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
+                accessor.range(newColumnFamily, store.name(), from, to, 
forward, true);
+        iterNew.onClose(() -> { });
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
+                accessor.range(oldColumnFamily, store.name(), from, to, 
forward, true);
+        iterOld.onClose(() -> { });
+        return new RocksDBDualCFIterator(store.name(), iterNew, iterOld, 
forward, valueConverter);
     }
 
     @Override
@@ -205,32 +202,26 @@ class DualColumnFamilyAccessor extends 
AbstractColumnFamilyAccessor {
     @Override
     public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor 
accessor,
                                                       final boolean forward) {
-        final RocksIterator innerIterNew = 
accessor.newIterator(newColumnFamily);
-        final RocksIterator innerIterOld = 
accessor.newIterator(oldColumnFamily);
-        if (forward) {
-            innerIterNew.seekToFirst();
-            innerIterOld.seekToFirst();
-        } else {
-            innerIterNew.seekToLast();
-            innerIterOld.seekToLast();
-        }
-        return new RocksDBDualCFIterator(store.name(), innerIterNew, 
innerIterOld, forward, valueConverter);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
+                accessor.all(newColumnFamily, store.name(), forward);
+        iterNew.onClose(() -> { });
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
+                accessor.all(oldColumnFamily, store.name(), forward);
+        iterOld.onClose(() -> { });
+        return new RocksDBDualCFIterator(store.name(), iterNew, iterOld, 
forward, valueConverter);
     }
 
     @Override
     public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor 
accessor,
                                                              final Bytes 
prefix) {
         final Bytes to = incrementWithoutOverflow(prefix);
-        return new RocksDBDualCFRangeIterator(
-            store.name(),
-            accessor.newIterator(newColumnFamily),
-            accessor.newIterator(oldColumnFamily),
-            prefix,
-            to,
-            true,
-            false,
-            valueConverter
-        );
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
+                accessor.prefixScan(newColumnFamily, store.name(), prefix, to);
+        iterNew.onClose(() -> { });
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
+                accessor.prefixScan(oldColumnFamily, store.name(), prefix, to);
+        iterOld.onClose(() -> { });
+        return new RocksDBDualCFIterator(store.name(), iterNew, iterOld, true, 
valueConverter);
     }
 
     @Override
@@ -270,21 +261,19 @@ class DualColumnFamilyAccessor extends 
AbstractColumnFamilyAccessor {
         private final Comparator<byte[]> comparator = 
ByteUtils.BYTES_LEXICO_COMPARATOR;
 
         private final String storeName;
-        private final RocksIterator iterNewFormat;
-        private final RocksIterator iterOldFormat;
+        private final ManagedKeyValueIterator<Bytes, byte[]> iterNewFormat;
+        private final ManagedKeyValueIterator<Bytes, byte[]> iterOldFormat;
         private final boolean forward;
         private final Function<byte[], byte[]> valueConverter;
 
         private volatile boolean open = true;
 
-        private byte[] nextNewFormat;
-        private byte[] nextOldFormat;
         private KeyValue<Bytes, byte[]> next;
         private Runnable closeCallback = null;
 
         RocksDBDualCFIterator(final String storeName,
-                              final RocksIterator iterNewFormat,
-                              final RocksIterator iterOldFormat,
+                              final ManagedKeyValueIterator<Bytes, byte[]> 
iterNewFormat,
+                              final ManagedKeyValueIterator<Bytes, byte[]> 
iterOldFormat,
                               final boolean forward,
                               final Function<byte[], byte[]> valueConverter) {
             this.iterNewFormat = iterNewFormat;
@@ -310,58 +299,35 @@ class DualColumnFamilyAccessor extends 
AbstractColumnFamilyAccessor {
 
         @Override
         protected KeyValue<Bytes, byte[]> makeNext() {
-            if (nextOldFormat == null && iterOldFormat.isValid()) {
-                nextOldFormat = iterOldFormat.key();
-            }
+            final boolean oldHas = iterOldFormat.hasNext();
+            final boolean newHas = iterNewFormat.hasNext();
 
-            if (nextNewFormat == null && iterNewFormat.isValid()) {
-                nextNewFormat = iterNewFormat.key();
+            if (!oldHas && !newHas) {
+                return allDone();
+            }
+            if (!oldHas) {
+                next = iterNewFormat.next();
+                return next;
+            }
+            if (!newHas) {
+                final KeyValue<Bytes, byte[]> kv = iterOldFormat.next();
+                next = KeyValue.pair(kv.key, valueConverter.apply(kv.value));
+                return next;
             }
 
-            if (nextOldFormat == null && !iterOldFormat.isValid()) {
-                if (nextNewFormat == null && !iterNewFormat.isValid()) {
-                    return allDone();
-                } else {
-                    next = KeyValue.pair(new Bytes(nextNewFormat), 
iterNewFormat.value());
-                    nextNewFormat = null;
-                    if (forward) {
-                        iterNewFormat.next();
-                    } else {
-                        iterNewFormat.prev();
-                    }
-                }
+            final int cmp = comparator.compare(
+                    iterOldFormat.peekNextKey().get(),
+                    iterNewFormat.peekNextKey().get());
+            // New-format wins on equality: the new CF value supersedes the 
old CF value for
+            // the same key. Advance both sides to avoid re-emitting the 
shadowed old entry.
+            if (cmp == 0) {
+                iterOldFormat.next();
+                next = iterNewFormat.next();
+            } else if (forward ? (cmp < 0) : (cmp > 0)) {
+                final KeyValue<Bytes, byte[]> kv = iterOldFormat.next();
+                next = KeyValue.pair(kv.key, valueConverter.apply(kv.value));
             } else {
-                if (nextNewFormat == null) {
-                    next = KeyValue.pair(new Bytes(nextOldFormat), 
valueConverter.apply(iterOldFormat.value()));
-                    nextOldFormat = null;
-                    if (forward) {
-                        iterOldFormat.next();
-                    } else {
-                        iterOldFormat.prev();
-                    }
-                } else {
-                    if (forward) {
-                        if (comparator.compare(nextOldFormat, nextNewFormat) 
<= 0) {
-                            next = KeyValue.pair(new Bytes(nextOldFormat), 
valueConverter.apply(iterOldFormat.value()));
-                            nextOldFormat = null;
-                            iterOldFormat.next();
-                        } else {
-                            next = KeyValue.pair(new Bytes(nextNewFormat), 
iterNewFormat.value());
-                            nextNewFormat = null;
-                            iterNewFormat.next();
-                        }
-                    } else {
-                        if (comparator.compare(nextOldFormat, nextNewFormat) 
>= 0) {
-                            next = KeyValue.pair(new Bytes(nextOldFormat), 
valueConverter.apply(iterOldFormat.value()));
-                            nextOldFormat = null;
-                            iterOldFormat.prev();
-                        } else {
-                            next = KeyValue.pair(new Bytes(nextNewFormat), 
iterNewFormat.value());
-                            nextNewFormat = null;
-                            iterNewFormat.prev();
-                        }
-                    }
-                }
+                next = iterNewFormat.next();
             }
             return next;
         }
@@ -392,74 +358,4 @@ class DualColumnFamilyAccessor extends 
AbstractColumnFamilyAccessor {
             this.closeCallback = closeCallback;
         }
     }
-
-    private static class RocksDBDualCFRangeIterator extends 
RocksDBDualCFIterator {
-    // RocksDB's JNI interface does not expose getters/setters that allow the
-    // comparator to be pluggable, and the default is lexicographic, so it's
-    // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = 
ByteUtils.BYTES_LEXICO_COMPARATOR;
-        private final byte[] rawLastKey;
-        private final boolean forward;
-        private final boolean toInclusive;
-
-        RocksDBDualCFRangeIterator(final String storeName,
-                                   final RocksIterator iterNewFormat,
-                                   final RocksIterator iterOldFormat,
-                                   final Bytes from,
-                                   final Bytes to,
-                                   final boolean forward,
-                                   final boolean toInclusive,
-                                   final Function<byte[], byte[]> 
valueConverter) {
-            super(storeName, iterNewFormat, iterOldFormat, forward, 
valueConverter);
-            this.forward = forward;
-            this.toInclusive = toInclusive;
-            if (forward) {
-                if (from == null) {
-                    iterNewFormat.seekToFirst();
-                    iterOldFormat.seekToFirst();
-                } else {
-                    iterNewFormat.seek(from.get());
-                    iterOldFormat.seek(from.get());
-                }
-                rawLastKey = to == null ? null : to.get();
-            } else {
-                if (to == null) {
-                    iterNewFormat.seekToLast();
-                    iterOldFormat.seekToLast();
-                } else {
-                    iterNewFormat.seekForPrev(to.get());
-                    iterOldFormat.seekForPrev(to.get());
-                }
-                rawLastKey = from == null ? null : from.get();
-            }
-        }
-
-        @Override
-        protected KeyValue<Bytes, byte[]> makeNext() {
-            final KeyValue<Bytes, byte[]> next = super.makeNext();
-
-            if (next == null) {
-                return allDone();
-            } else if (rawLastKey == null) {
-                // null means range endpoint is open
-                return next;
-            } else {
-                if (forward) {
-                    if (comparator.compare(next.key.get(), rawLastKey) < 0) {
-                        return next;
-                    } else if (comparator.compare(next.key.get(), rawLastKey) 
== 0) {
-                        return toInclusive ? next : allDone();
-                    } else {
-                        return allDone();
-                    }
-                } else {
-                    if (comparator.compare(next.key.get(), rawLastKey) >= 0) {
-                        return next;
-                    } else {
-                        return allDone();
-                    }
-                }
-            }
-        }
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b4dcf371a2d..7ecc9d7f099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -912,6 +912,35 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         void flush(final ColumnFamilyHandle... columnFamilies) throws 
RocksDBException;
         void reset();
         void close();
+
+        default ManagedKeyValueIterator<Bytes, byte[]> all(final 
ColumnFamilyHandle cf, final String storeName, final boolean forward) {
+            final RocksIterator iter = newIterator(cf);
+            if (forward) {
+                iter.seekToFirst();
+            } else {
+                iter.seekToLast();
+            }
+            return new RocksDbIterator(storeName, iter, forward);
+        }
+
+        default ManagedKeyValueIterator<Bytes, byte[]> range(final 
ColumnFamilyHandle cf, final String storeName,
+                                                             final Bytes from, 
final Bytes to,
+                                                             final boolean 
forward, final boolean toInclusive) {
+            return new RocksDBRangeIterator(storeName, newIterator(cf), from, 
to, forward, toInclusive);
+        }
+
+        default ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final 
ColumnFamilyHandle cf, final String storeName,
+                                                                  final Bytes 
prefix, final Bytes to) {
+            return new RocksDBRangeIterator(storeName, newIterator(cf), 
prefix, to, true, false);
+        }
+
+        default void commitStagedWrites() throws RocksDBException {
+            // no-op for non-transactional accessors
+        }
+
+        default void rollbackStagedWrites() {
+            // no-op for non-transactional accessors
+        }
     }
 
     static class DirectDBAccessor implements DBAccessor {
@@ -1094,14 +1123,7 @@ public class RocksDBStore implements 
KeyValueStore<Bytes, byte[]>, BatchWritingS
                                                             final Bytes from,
                                                             final Bytes to,
                                                             final boolean 
forward) {
-            return new RocksDBRangeIterator(
-                    name,
-                    accessor.newIterator(columnFamily),
-                    from,
-                    to,
-                    forward,
-                    true
-            );
+            return accessor.range(columnFamily, name, from, to, forward, true);
         }
 
         @Override
@@ -1116,26 +1138,13 @@ public class RocksDBStore implements 
KeyValueStore<Bytes, byte[]>, BatchWritingS
 
         @Override
         public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor 
accessor, final boolean forward) {
-            final RocksIterator innerIterWithTimestamp = 
accessor.newIterator(columnFamily);
-            if (forward) {
-                innerIterWithTimestamp.seekToFirst();
-            } else {
-                innerIterWithTimestamp.seekToLast();
-            }
-            return new RocksDbIterator(name, innerIterWithTimestamp, forward);
+            return accessor.all(columnFamily, name, forward);
         }
 
         @Override
         public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final 
DBAccessor accessor, final Bytes prefix) {
             final Bytes to = incrementWithoutOverflow(prefix);
-            return new RocksDBRangeIterator(
-                    name,
-                    accessor.newIterator(columnFamily),
-                    prefix,
-                    to,
-                    true,
-                    false
-            );
+            return accessor.prefixScan(columnFamily, name, prefix, to);
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index 9a5c39728cd..848030dc570 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -133,6 +133,13 @@ abstract class AbstractColumnFamilyAccessorTest {
         assertEquals(positionToStore, 
PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF, 
toBytes("position")))));
     }
 
+    @Test
+    public void shouldCommitStagedWritesWhenCommittingOffsets() throws 
RocksDBException {
+        final TopicPartition tp0 = new TopicPartition("testTopic", 0);
+        accessor.commit(dbAccessor, Map.of(tp0, 10L));
+        verify(dbAccessor).commitStagedWrites();
+    }
+
     @Test
     public void shouldWipeCommittedOffsetsOnEmptyCommit() throws 
RocksDBException {
         dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
index 437352b5c90..ba8be285ac3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
@@ -28,8 +28,8 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatchInterface;
 
 import java.nio.ByteBuffer;
@@ -39,9 +39,11 @@ import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -339,64 +341,186 @@ public class DualColumnFamilyAccessorTest extends 
AbstractColumnFamilyAccessorTe
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldCreateRangeIterator() {
-        final RocksIterator iterNewFormat = mock(RocksIterator.class);
-        final RocksIterator oldIterFormat = mock(RocksIterator.class);
-        when(dbAccessor.newIterator(newCF)).thenReturn(iterNewFormat);
-        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
-
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew = 
mock(ManagedKeyValueIterator.class);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld = 
mock(ManagedKeyValueIterator.class);
         final Bytes from = new Bytes("a".getBytes());
         final Bytes to = new Bytes("z".getBytes());
+        when(dbAccessor.range(newCF, STORE_NAME, from, to, true, 
true)).thenReturn(iterNew);
+        when(dbAccessor.range(oldCF, STORE_NAME, from, to, true, 
true)).thenReturn(iterOld);
 
         final ManagedKeyValueIterator<Bytes, byte[]> iterator = 
accessor.range(dbAccessor, from, to, true);
 
         assertNotNull(iterator);
-        verify(dbAccessor).newIterator(newCF);
-        verify(dbAccessor).newIterator(oldCF);
+        verify(dbAccessor).range(newCF, STORE_NAME, from, to, true, true);
+        verify(dbAccessor).range(oldCF, STORE_NAME, from, to, true, true);
+        verify(dbAccessor, never()).newIterator(any());
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldCreateAllIteratorForward() {
-        final RocksIterator newIterFormat = mock(RocksIterator.class);
-        final RocksIterator oldIterFormat = mock(RocksIterator.class);
-        when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
-        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew = 
mock(ManagedKeyValueIterator.class);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld = 
mock(ManagedKeyValueIterator.class);
+        when(dbAccessor.all(newCF, STORE_NAME, true)).thenReturn(iterNew);
+        when(dbAccessor.all(oldCF, STORE_NAME, true)).thenReturn(iterOld);
 
         final ManagedKeyValueIterator<Bytes, byte[]> iterator = 
accessor.all(dbAccessor, true);
 
         assertNotNull(iterator);
-        verify(oldIterFormat).seekToFirst();
-        verify(oldIterFormat).seekToFirst();
+        verify(dbAccessor).all(newCF, STORE_NAME, true);
+        verify(dbAccessor).all(oldCF, STORE_NAME, true);
+        verify(dbAccessor, never()).newIterator(any());
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldCreateAllIteratorReverse() {
-        final RocksIterator newIterFormat = mock(RocksIterator.class);
-        final RocksIterator oldIterFormat = mock(RocksIterator.class);
-        when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
-        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew = 
mock(ManagedKeyValueIterator.class);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld = 
mock(ManagedKeyValueIterator.class);
+        when(dbAccessor.all(newCF, STORE_NAME, false)).thenReturn(iterNew);
+        when(dbAccessor.all(oldCF, STORE_NAME, false)).thenReturn(iterOld);
 
         final ManagedKeyValueIterator<Bytes, byte[]> iterator = 
accessor.all(dbAccessor, false);
 
         assertNotNull(iterator);
-        verify(newIterFormat).seekToLast();
-        verify(oldIterFormat).seekToLast();
+        verify(dbAccessor).all(newCF, STORE_NAME, false);
+        verify(dbAccessor).all(oldCF, STORE_NAME, false);
+        verify(dbAccessor, never()).newIterator(any());
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void shouldCreatePrefixScanIterator() {
-        final RocksIterator newIterFormat = mock(RocksIterator.class);
-        final RocksIterator oldIterFormat = mock(RocksIterator.class);
-        when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
-        when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
-
+        final ManagedKeyValueIterator<Bytes, byte[]> iterNew = 
mock(ManagedKeyValueIterator.class);
+        final ManagedKeyValueIterator<Bytes, byte[]> iterOld = 
mock(ManagedKeyValueIterator.class);
         final Bytes prefix = new Bytes("prefix".getBytes());
+        when(dbAccessor.prefixScan(eq(newCF), eq(STORE_NAME), eq(prefix), 
any())).thenReturn(iterNew);
+        when(dbAccessor.prefixScan(eq(oldCF), eq(STORE_NAME), eq(prefix), 
any())).thenReturn(iterOld);
 
         final ManagedKeyValueIterator<Bytes, byte[]> iterator = 
accessor.prefixScan(dbAccessor, prefix);
 
         assertNotNull(iterator);
-        verify(dbAccessor).newIterator(newCF);
-        verify(dbAccessor).newIterator(oldCF);
+        verify(dbAccessor).prefixScan(eq(newCF), eq(STORE_NAME), eq(prefix), 
any());
+        verify(dbAccessor).prefixScan(eq(oldCF), eq(STORE_NAME), eq(prefix), 
any());
+        verify(dbAccessor, never()).newIterator(any());
+    }
+
+    @Test
+    public void shouldMergeEntriesFromBothColumnFamiliesInForwardOrder() 
throws RocksDBException {
+        final InMemoryRocksDBAccessor inMemory = new 
InMemoryRocksDBAccessor(mock(RocksDB.class));
+        inMemory.put(oldCF, "a".getBytes(), "old-a".getBytes());
+        inMemory.put(oldCF, "c".getBytes(), "old-c".getBytes());
+        inMemory.put(newCF, "b".getBytes(), "new-b".getBytes());
+        inMemory.put(newCF, "d".getBytes(), "new-d".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> it = 
accessor.all(inMemory, true);
+        it.onClose(() -> { });
+
+        assertTrue(it.hasNext());
+        final KeyValue<Bytes, byte[]> kv0 = it.next();
+        assertArrayEquals("a".getBytes(), kv0.key.get());
+        assertTrue(new String(kv0.value).contains("old-a"), "expected 
converted old-a value");
+
+        assertTrue(it.hasNext());
+        final KeyValue<Bytes, byte[]> kv1 = it.next();
+        assertArrayEquals("b".getBytes(), kv1.key.get());
+        assertArrayEquals("new-b".getBytes(), kv1.value);
+
+        assertTrue(it.hasNext());
+        final KeyValue<Bytes, byte[]> kv2 = it.next();
+        assertArrayEquals("c".getBytes(), kv2.key.get());
+        assertTrue(new String(kv2.value).contains("old-c"), "expected 
converted old-c value");
+
+        assertTrue(it.hasNext());
+        final KeyValue<Bytes, byte[]> kv3 = it.next();
+        assertArrayEquals("d".getBytes(), kv3.key.get());
+        assertArrayEquals("new-d".getBytes(), kv3.value);
+
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void shouldMergeEntriesFromBothColumnFamiliesInReverseOrder() 
throws RocksDBException {
+        final InMemoryRocksDBAccessor inMemory = new 
InMemoryRocksDBAccessor(mock(RocksDB.class));
+        inMemory.put(oldCF, "a".getBytes(), "old-a".getBytes());
+        inMemory.put(oldCF, "c".getBytes(), "old-c".getBytes());
+        inMemory.put(newCF, "b".getBytes(), "new-b".getBytes());
+        inMemory.put(newCF, "d".getBytes(), "new-d".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> it = 
accessor.all(inMemory, false);
+        it.onClose(() -> { });
+
+        final List<Bytes> keys = new ArrayList<>();
+        while (it.hasNext()) {
+            keys.add(it.next().key);
+        }
+        assertEquals(4, keys.size());
+        assertTrue(new String(keys.get(0).get()).compareTo(new 
String(keys.get(1).get())) > 0,
+                "expected descending order");
+        assertTrue(new String(keys.get(1).get()).compareTo(new 
String(keys.get(2).get())) > 0,
+                "expected descending order");
+        assertTrue(new String(keys.get(2).get()).compareTo(new 
String(keys.get(3).get())) > 0,
+                "expected descending order");
+    }
+
+    @Test
+    public void shouldRespectRangeBoundsAcrossBothColumnFamilies() throws 
RocksDBException {
+        final InMemoryRocksDBAccessor inMemory = new 
InMemoryRocksDBAccessor(mock(RocksDB.class));
+        inMemory.put(oldCF, "a".getBytes(), "old-a".getBytes());
+        inMemory.put(oldCF, "c".getBytes(), "old-c".getBytes());
+        inMemory.put(newCF, "b".getBytes(), "new-b".getBytes());
+        inMemory.put(newCF, "d".getBytes(), "new-d".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> it =
+                accessor.range(inMemory, Bytes.wrap("b".getBytes()), 
Bytes.wrap("c".getBytes()), true);
+        it.onClose(() -> { });
+
+        final List<byte[]> keys = new ArrayList<>();
+        while (it.hasNext()) {
+            keys.add(it.next().key.get());
+        }
+        assertEquals(2, keys.size());
+        assertArrayEquals("b".getBytes(), keys.get(0));
+        assertArrayEquals("c".getBytes(), keys.get(1));
+    }
+
+    @Test
+    public void shouldRespectPrefixScanAcrossBothColumnFamilies() throws 
RocksDBException {
+        final InMemoryRocksDBAccessor inMemory = new 
InMemoryRocksDBAccessor(mock(RocksDB.class));
+        inMemory.put(oldCF, "foo:1".getBytes(), "old-1".getBytes());
+        inMemory.put(newCF, "foo:2".getBytes(), "new-2".getBytes());
+        inMemory.put(newCF, "bar:1".getBytes(), "new-bar".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> it =
+                accessor.prefixScan(inMemory, Bytes.wrap("foo:".getBytes()));
+        it.onClose(() -> { });
+
+        final List<byte[]> keys = new ArrayList<>();
+        while (it.hasNext()) {
+            keys.add(it.next().key.get());
+        }
+        assertEquals(2, keys.size());
+        assertArrayEquals("foo:1".getBytes(), keys.get(0));
+        assertArrayEquals("foo:2".getBytes(), keys.get(1));
+    }
+
+    @Test
+    public void 
shouldPreferNewFormatValueWhenSameKeyExistsInBothColumnFamilies() throws 
RocksDBException {
+        final InMemoryRocksDBAccessor inMemory = new 
InMemoryRocksDBAccessor(mock(RocksDB.class));
+        inMemory.put(oldCF, "x".getBytes(), "old-x".getBytes());
+        inMemory.put(newCF, "x".getBytes(), "new-x".getBytes());
+
+        final ManagedKeyValueIterator<Bytes, byte[]> it = 
accessor.all(inMemory, true);
+        it.onClose(() -> { });
+
+        assertTrue(it.hasNext());
+        final KeyValue<Bytes, byte[]> kv = it.next();
+        assertArrayEquals("x".getBytes(), kv.key.get());
+        assertArrayEquals("new-x".getBytes(), kv.value,
+                "new-format value should win over old-format value for the 
same key");
+        assertFalse(it.hasNext(), "same key should appear exactly once");
     }
 
     @Test

Reply via email to