This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58819e19b41 KAFKA-20492: Prepare DBAccessor for txn support (#22142)
58819e19b41 is described below
commit 58819e19b41803896a8fd165a058be3118431138
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 15 22:18:53 2026 +0100
KAFKA-20492: Prepare DBAccessor for txn support (#22142)
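Moves iterator construction (all, range, prefixScan) from
SingleColumnFamilyAccessor into default methods on DBAccessor.
SingleColumnFamilyAccessor now delegates to the accessor, allowing
future DBAccessor implementations (e.g. a transactional wrapper) to
override iteration behaviour.
DualColumnFamilyAccessor delegates in the same way: RocksDBDualCFIterator
now merges two ManagedKeyValueIterators obtained from the accessor
instead of raw RocksIterators, and RocksDBDualCFRangeIterator is removed
because range bounds are enforced by the per-column-family iterators.
When a key exists in both column families, the new-format entry is
returned once and the shadowed old-format entry is skipped.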
Adds no-op commitStagedWrites() and rollbackStagedWrites() defaults to
DBAccessor, and calls commitStagedWrites() from
AbstractColumnFamilyAccessor.commit(). These are no-ops for the existing
DirectDBAccessor but provide the hook for a transactional accessor to
flush buffered writes atomically with offset commits.
Reviewers: Bill Bejeck <[email protected]>
---
.../internals/AbstractColumnFamilyAccessor.java | 1 +
.../state/internals/DualColumnFamilyAccessor.java | 204 +++++----------------
.../streams/state/internals/RocksDBStore.java | 55 +++---
.../AbstractColumnFamilyAccessorTest.java | 7 +
.../internals/DualColumnFamilyAccessorTest.java | 178 +++++++++++++++---
5 files changed, 241 insertions(+), 204 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 004c824cc3e..2cf776423a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -69,6 +69,7 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
}
}
}
+ accessor.commitStagedWrites();
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
index 7d25989083b..cb23ef9134b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java
@@ -26,7 +26,6 @@ import
org.apache.kafka.streams.state.internals.RocksDBStore.DBAccessor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchInterface;
import java.util.Comparator;
@@ -177,15 +176,13 @@ class DualColumnFamilyAccessor extends
AbstractColumnFamilyAccessor {
final Bytes from,
final Bytes to,
final boolean forward)
{
- return new RocksDBDualCFRangeIterator(
- store.name(),
- accessor.newIterator(newColumnFamily),
- accessor.newIterator(oldColumnFamily),
- from,
- to,
- forward,
- true,
- valueConverter);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
+ accessor.range(newColumnFamily, store.name(), from, to,
forward, true);
+ iterNew.onClose(() -> { });
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
+ accessor.range(oldColumnFamily, store.name(), from, to,
forward, true);
+ iterOld.onClose(() -> { });
+ return new RocksDBDualCFIterator(store.name(), iterNew, iterOld,
forward, valueConverter);
}
@Override
@@ -205,32 +202,26 @@ class DualColumnFamilyAccessor extends
AbstractColumnFamilyAccessor {
@Override
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor
accessor,
final boolean forward) {
- final RocksIterator innerIterNew =
accessor.newIterator(newColumnFamily);
- final RocksIterator innerIterOld =
accessor.newIterator(oldColumnFamily);
- if (forward) {
- innerIterNew.seekToFirst();
- innerIterOld.seekToFirst();
- } else {
- innerIterNew.seekToLast();
- innerIterOld.seekToLast();
- }
- return new RocksDBDualCFIterator(store.name(), innerIterNew,
innerIterOld, forward, valueConverter);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
+ accessor.all(newColumnFamily, store.name(), forward);
+ iterNew.onClose(() -> { });
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
+ accessor.all(oldColumnFamily, store.name(), forward);
+ iterOld.onClose(() -> { });
+ return new RocksDBDualCFIterator(store.name(), iterNew, iterOld,
forward, valueConverter);
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor
accessor,
final Bytes
prefix) {
final Bytes to = incrementWithoutOverflow(prefix);
- return new RocksDBDualCFRangeIterator(
- store.name(),
- accessor.newIterator(newColumnFamily),
- accessor.newIterator(oldColumnFamily),
- prefix,
- to,
- true,
- false,
- valueConverter
- );
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
+ accessor.prefixScan(newColumnFamily, store.name(), prefix, to);
+ iterNew.onClose(() -> { });
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
+ accessor.prefixScan(oldColumnFamily, store.name(), prefix, to);
+ iterOld.onClose(() -> { });
+ return new RocksDBDualCFIterator(store.name(), iterNew, iterOld, true,
valueConverter);
}
@Override
@@ -270,21 +261,19 @@ class DualColumnFamilyAccessor extends
AbstractColumnFamilyAccessor {
private final Comparator<byte[]> comparator =
ByteUtils.BYTES_LEXICO_COMPARATOR;
private final String storeName;
- private final RocksIterator iterNewFormat;
- private final RocksIterator iterOldFormat;
+ private final ManagedKeyValueIterator<Bytes, byte[]> iterNewFormat;
+ private final ManagedKeyValueIterator<Bytes, byte[]> iterOldFormat;
private final boolean forward;
private final Function<byte[], byte[]> valueConverter;
private volatile boolean open = true;
- private byte[] nextNewFormat;
- private byte[] nextOldFormat;
private KeyValue<Bytes, byte[]> next;
private Runnable closeCallback = null;
RocksDBDualCFIterator(final String storeName,
- final RocksIterator iterNewFormat,
- final RocksIterator iterOldFormat,
+ final ManagedKeyValueIterator<Bytes, byte[]>
iterNewFormat,
+ final ManagedKeyValueIterator<Bytes, byte[]>
iterOldFormat,
final boolean forward,
final Function<byte[], byte[]> valueConverter) {
this.iterNewFormat = iterNewFormat;
@@ -310,58 +299,35 @@ class DualColumnFamilyAccessor extends
AbstractColumnFamilyAccessor {
@Override
protected KeyValue<Bytes, byte[]> makeNext() {
- if (nextOldFormat == null && iterOldFormat.isValid()) {
- nextOldFormat = iterOldFormat.key();
- }
+ final boolean oldHas = iterOldFormat.hasNext();
+ final boolean newHas = iterNewFormat.hasNext();
- if (nextNewFormat == null && iterNewFormat.isValid()) {
- nextNewFormat = iterNewFormat.key();
+ if (!oldHas && !newHas) {
+ return allDone();
+ }
+ if (!oldHas) {
+ next = iterNewFormat.next();
+ return next;
+ }
+ if (!newHas) {
+ final KeyValue<Bytes, byte[]> kv = iterOldFormat.next();
+ next = KeyValue.pair(kv.key, valueConverter.apply(kv.value));
+ return next;
}
- if (nextOldFormat == null && !iterOldFormat.isValid()) {
- if (nextNewFormat == null && !iterNewFormat.isValid()) {
- return allDone();
- } else {
- next = KeyValue.pair(new Bytes(nextNewFormat),
iterNewFormat.value());
- nextNewFormat = null;
- if (forward) {
- iterNewFormat.next();
- } else {
- iterNewFormat.prev();
- }
- }
+ final int cmp = comparator.compare(
+ iterOldFormat.peekNextKey().get(),
+ iterNewFormat.peekNextKey().get());
+ // New-format wins on equality: the new CF value supersedes the
old CF value for
+ // the same key. Advance both sides to avoid re-emitting the
shadowed old entry.
+ if (cmp == 0) {
+ iterOldFormat.next();
+ next = iterNewFormat.next();
+ } else if (forward ? (cmp < 0) : (cmp > 0)) {
+ final KeyValue<Bytes, byte[]> kv = iterOldFormat.next();
+ next = KeyValue.pair(kv.key, valueConverter.apply(kv.value));
} else {
- if (nextNewFormat == null) {
- next = KeyValue.pair(new Bytes(nextOldFormat),
valueConverter.apply(iterOldFormat.value()));
- nextOldFormat = null;
- if (forward) {
- iterOldFormat.next();
- } else {
- iterOldFormat.prev();
- }
- } else {
- if (forward) {
- if (comparator.compare(nextOldFormat, nextNewFormat)
<= 0) {
- next = KeyValue.pair(new Bytes(nextOldFormat),
valueConverter.apply(iterOldFormat.value()));
- nextOldFormat = null;
- iterOldFormat.next();
- } else {
- next = KeyValue.pair(new Bytes(nextNewFormat),
iterNewFormat.value());
- nextNewFormat = null;
- iterNewFormat.next();
- }
- } else {
- if (comparator.compare(nextOldFormat, nextNewFormat)
>= 0) {
- next = KeyValue.pair(new Bytes(nextOldFormat),
valueConverter.apply(iterOldFormat.value()));
- nextOldFormat = null;
- iterOldFormat.prev();
- } else {
- next = KeyValue.pair(new Bytes(nextNewFormat),
iterNewFormat.value());
- nextNewFormat = null;
- iterNewFormat.prev();
- }
- }
- }
+ next = iterNewFormat.next();
}
return next;
}
@@ -392,74 +358,4 @@ class DualColumnFamilyAccessor extends
AbstractColumnFamilyAccessor {
this.closeCallback = closeCallback;
}
}
-
- private static class RocksDBDualCFRangeIterator extends
RocksDBDualCFIterator {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator =
ByteUtils.BYTES_LEXICO_COMPARATOR;
- private final byte[] rawLastKey;
- private final boolean forward;
- private final boolean toInclusive;
-
- RocksDBDualCFRangeIterator(final String storeName,
- final RocksIterator iterNewFormat,
- final RocksIterator iterOldFormat,
- final Bytes from,
- final Bytes to,
- final boolean forward,
- final boolean toInclusive,
- final Function<byte[], byte[]>
valueConverter) {
- super(storeName, iterNewFormat, iterOldFormat, forward,
valueConverter);
- this.forward = forward;
- this.toInclusive = toInclusive;
- if (forward) {
- if (from == null) {
- iterNewFormat.seekToFirst();
- iterOldFormat.seekToFirst();
- } else {
- iterNewFormat.seek(from.get());
- iterOldFormat.seek(from.get());
- }
- rawLastKey = to == null ? null : to.get();
- } else {
- if (to == null) {
- iterNewFormat.seekToLast();
- iterOldFormat.seekToLast();
- } else {
- iterNewFormat.seekForPrev(to.get());
- iterOldFormat.seekForPrev(to.get());
- }
- rawLastKey = from == null ? null : from.get();
- }
- }
-
- @Override
- protected KeyValue<Bytes, byte[]> makeNext() {
- final KeyValue<Bytes, byte[]> next = super.makeNext();
-
- if (next == null) {
- return allDone();
- } else if (rawLastKey == null) {
- // null means range endpoint is open
- return next;
- } else {
- if (forward) {
- if (comparator.compare(next.key.get(), rawLastKey) < 0) {
- return next;
- } else if (comparator.compare(next.key.get(), rawLastKey)
== 0) {
- return toInclusive ? next : allDone();
- } else {
- return allDone();
- }
- } else {
- if (comparator.compare(next.key.get(), rawLastKey) >= 0) {
- return next;
- } else {
- return allDone();
- }
- }
- }
- }
- }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b4dcf371a2d..7ecc9d7f099 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -912,6 +912,35 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
void flush(final ColumnFamilyHandle... columnFamilies) throws
RocksDBException;
void reset();
void close();
+
+ default ManagedKeyValueIterator<Bytes, byte[]> all(final
ColumnFamilyHandle cf, final String storeName, final boolean forward) {
+ final RocksIterator iter = newIterator(cf);
+ if (forward) {
+ iter.seekToFirst();
+ } else {
+ iter.seekToLast();
+ }
+ return new RocksDbIterator(storeName, iter, forward);
+ }
+
+ default ManagedKeyValueIterator<Bytes, byte[]> range(final
ColumnFamilyHandle cf, final String storeName,
+ final Bytes from,
final Bytes to,
+ final boolean
forward, final boolean toInclusive) {
+ return new RocksDBRangeIterator(storeName, newIterator(cf), from,
to, forward, toInclusive);
+ }
+
+ default ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final
ColumnFamilyHandle cf, final String storeName,
+ final Bytes
prefix, final Bytes to) {
+ return new RocksDBRangeIterator(storeName, newIterator(cf),
prefix, to, true, false);
+ }
+
+ default void commitStagedWrites() throws RocksDBException {
+ // no-op for non-transactional accessors
+ }
+
+ default void rollbackStagedWrites() {
+ // no-op for non-transactional accessors
+ }
}
static class DirectDBAccessor implements DBAccessor {
@@ -1094,14 +1123,7 @@ public class RocksDBStore implements
KeyValueStore<Bytes, byte[]>, BatchWritingS
final Bytes from,
final Bytes to,
final boolean
forward) {
- return new RocksDBRangeIterator(
- name,
- accessor.newIterator(columnFamily),
- from,
- to,
- forward,
- true
- );
+ return accessor.range(columnFamily, name, from, to, forward, true);
}
@Override
@@ -1116,26 +1138,13 @@ public class RocksDBStore implements
KeyValueStore<Bytes, byte[]>, BatchWritingS
@Override
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor
accessor, final boolean forward) {
- final RocksIterator innerIterWithTimestamp =
accessor.newIterator(columnFamily);
- if (forward) {
- innerIterWithTimestamp.seekToFirst();
- } else {
- innerIterWithTimestamp.seekToLast();
- }
- return new RocksDbIterator(name, innerIterWithTimestamp, forward);
+ return accessor.all(columnFamily, name, forward);
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final
DBAccessor accessor, final Bytes prefix) {
final Bytes to = incrementWithoutOverflow(prefix);
- return new RocksDBRangeIterator(
- name,
- accessor.newIterator(columnFamily),
- prefix,
- to,
- true,
- false
- );
+ return accessor.prefixScan(columnFamily, name, prefix, to);
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index 9a5c39728cd..848030dc570 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -133,6 +133,13 @@ abstract class AbstractColumnFamilyAccessorTest {
assertEquals(positionToStore,
PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF,
toBytes("position")))));
}
+ @Test
+ public void shouldCommitStagedWritesWhenCommittingOffsets() throws
RocksDBException {
+ final TopicPartition tp0 = new TopicPartition("testTopic", 0);
+ accessor.commit(dbAccessor, Map.of(tp0, 10L));
+ verify(dbAccessor).commitStagedWrites();
+ }
+
@Test
public void shouldWipeCommittedOffsetsOnEmptyCommit() throws
RocksDBException {
dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
index 437352b5c90..ba8be285ac3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java
@@ -28,8 +28,8 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchInterface;
import java.nio.ByteBuffer;
@@ -39,9 +39,11 @@ import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
@@ -339,64 +341,186 @@ public class DualColumnFamilyAccessorTest extends
AbstractColumnFamilyAccessorTe
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldCreateRangeIterator() {
- final RocksIterator iterNewFormat = mock(RocksIterator.class);
- final RocksIterator oldIterFormat = mock(RocksIterator.class);
- when(dbAccessor.newIterator(newCF)).thenReturn(iterNewFormat);
- when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
-
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
mock(ManagedKeyValueIterator.class);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
mock(ManagedKeyValueIterator.class);
final Bytes from = new Bytes("a".getBytes());
final Bytes to = new Bytes("z".getBytes());
+ when(dbAccessor.range(newCF, STORE_NAME, from, to, true,
true)).thenReturn(iterNew);
+ when(dbAccessor.range(oldCF, STORE_NAME, from, to, true,
true)).thenReturn(iterOld);
final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.range(dbAccessor, from, to, true);
assertNotNull(iterator);
- verify(dbAccessor).newIterator(newCF);
- verify(dbAccessor).newIterator(oldCF);
+ verify(dbAccessor).range(newCF, STORE_NAME, from, to, true, true);
+ verify(dbAccessor).range(oldCF, STORE_NAME, from, to, true, true);
+ verify(dbAccessor, never()).newIterator(any());
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldCreateAllIteratorForward() {
- final RocksIterator newIterFormat = mock(RocksIterator.class);
- final RocksIterator oldIterFormat = mock(RocksIterator.class);
- when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
- when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
mock(ManagedKeyValueIterator.class);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
mock(ManagedKeyValueIterator.class);
+ when(dbAccessor.all(newCF, STORE_NAME, true)).thenReturn(iterNew);
+ when(dbAccessor.all(oldCF, STORE_NAME, true)).thenReturn(iterOld);
final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.all(dbAccessor, true);
assertNotNull(iterator);
- verify(oldIterFormat).seekToFirst();
- verify(oldIterFormat).seekToFirst();
+ verify(dbAccessor).all(newCF, STORE_NAME, true);
+ verify(dbAccessor).all(oldCF, STORE_NAME, true);
+ verify(dbAccessor, never()).newIterator(any());
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldCreateAllIteratorReverse() {
- final RocksIterator newIterFormat = mock(RocksIterator.class);
- final RocksIterator oldIterFormat = mock(RocksIterator.class);
- when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
- when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
mock(ManagedKeyValueIterator.class);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
mock(ManagedKeyValueIterator.class);
+ when(dbAccessor.all(newCF, STORE_NAME, false)).thenReturn(iterNew);
+ when(dbAccessor.all(oldCF, STORE_NAME, false)).thenReturn(iterOld);
final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.all(dbAccessor, false);
assertNotNull(iterator);
- verify(newIterFormat).seekToLast();
- verify(oldIterFormat).seekToLast();
+ verify(dbAccessor).all(newCF, STORE_NAME, false);
+ verify(dbAccessor).all(oldCF, STORE_NAME, false);
+ verify(dbAccessor, never()).newIterator(any());
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldCreatePrefixScanIterator() {
- final RocksIterator newIterFormat = mock(RocksIterator.class);
- final RocksIterator oldIterFormat = mock(RocksIterator.class);
- when(dbAccessor.newIterator(newCF)).thenReturn(newIterFormat);
- when(dbAccessor.newIterator(oldCF)).thenReturn(oldIterFormat);
-
+ final ManagedKeyValueIterator<Bytes, byte[]> iterNew =
mock(ManagedKeyValueIterator.class);
+ final ManagedKeyValueIterator<Bytes, byte[]> iterOld =
mock(ManagedKeyValueIterator.class);
final Bytes prefix = new Bytes("prefix".getBytes());
+ when(dbAccessor.prefixScan(eq(newCF), eq(STORE_NAME), eq(prefix),
any())).thenReturn(iterNew);
+ when(dbAccessor.prefixScan(eq(oldCF), eq(STORE_NAME), eq(prefix),
any())).thenReturn(iterOld);
final ManagedKeyValueIterator<Bytes, byte[]> iterator =
accessor.prefixScan(dbAccessor, prefix);
assertNotNull(iterator);
- verify(dbAccessor).newIterator(newCF);
- verify(dbAccessor).newIterator(oldCF);
+ verify(dbAccessor).prefixScan(eq(newCF), eq(STORE_NAME), eq(prefix),
any());
+ verify(dbAccessor).prefixScan(eq(oldCF), eq(STORE_NAME), eq(prefix),
any());
+ verify(dbAccessor, never()).newIterator(any());
+ }
+
+ @Test
+ public void shouldMergeEntriesFromBothColumnFamiliesInForwardOrder()
throws RocksDBException {
+ final InMemoryRocksDBAccessor inMemory = new
InMemoryRocksDBAccessor(mock(RocksDB.class));
+ inMemory.put(oldCF, "a".getBytes(), "old-a".getBytes());
+ inMemory.put(oldCF, "c".getBytes(), "old-c".getBytes());
+ inMemory.put(newCF, "b".getBytes(), "new-b".getBytes());
+ inMemory.put(newCF, "d".getBytes(), "new-d".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> it =
accessor.all(inMemory, true);
+ it.onClose(() -> { });
+
+ assertTrue(it.hasNext());
+ final KeyValue<Bytes, byte[]> kv0 = it.next();
+ assertArrayEquals("a".getBytes(), kv0.key.get());
+ assertTrue(new String(kv0.value).contains("old-a"), "expected
converted old-a value");
+
+ assertTrue(it.hasNext());
+ final KeyValue<Bytes, byte[]> kv1 = it.next();
+ assertArrayEquals("b".getBytes(), kv1.key.get());
+ assertArrayEquals("new-b".getBytes(), kv1.value);
+
+ assertTrue(it.hasNext());
+ final KeyValue<Bytes, byte[]> kv2 = it.next();
+ assertArrayEquals("c".getBytes(), kv2.key.get());
+ assertTrue(new String(kv2.value).contains("old-c"), "expected
converted old-c value");
+
+ assertTrue(it.hasNext());
+ final KeyValue<Bytes, byte[]> kv3 = it.next();
+ assertArrayEquals("d".getBytes(), kv3.key.get());
+ assertArrayEquals("new-d".getBytes(), kv3.value);
+
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ public void shouldMergeEntriesFromBothColumnFamiliesInReverseOrder()
throws RocksDBException {
+ final InMemoryRocksDBAccessor inMemory = new
InMemoryRocksDBAccessor(mock(RocksDB.class));
+ inMemory.put(oldCF, "a".getBytes(), "old-a".getBytes());
+ inMemory.put(oldCF, "c".getBytes(), "old-c".getBytes());
+ inMemory.put(newCF, "b".getBytes(), "new-b".getBytes());
+ inMemory.put(newCF, "d".getBytes(), "new-d".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> it =
accessor.all(inMemory, false);
+ it.onClose(() -> { });
+
+ final List<Bytes> keys = new ArrayList<>();
+ while (it.hasNext()) {
+ keys.add(it.next().key);
+ }
+ assertEquals(4, keys.size());
+ assertTrue(new String(keys.get(0).get()).compareTo(new
String(keys.get(1).get())) > 0,
+ "expected descending order");
+ assertTrue(new String(keys.get(1).get()).compareTo(new
String(keys.get(2).get())) > 0,
+ "expected descending order");
+ assertTrue(new String(keys.get(2).get()).compareTo(new
String(keys.get(3).get())) > 0,
+ "expected descending order");
+ }
+
+ @Test
+ public void shouldRespectRangeBoundsAcrossBothColumnFamilies() throws
RocksDBException {
+ final InMemoryRocksDBAccessor inMemory = new
InMemoryRocksDBAccessor(mock(RocksDB.class));
+ inMemory.put(oldCF, "a".getBytes(), "old-a".getBytes());
+ inMemory.put(oldCF, "c".getBytes(), "old-c".getBytes());
+ inMemory.put(newCF, "b".getBytes(), "new-b".getBytes());
+ inMemory.put(newCF, "d".getBytes(), "new-d".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> it =
+ accessor.range(inMemory, Bytes.wrap("b".getBytes()),
Bytes.wrap("c".getBytes()), true);
+ it.onClose(() -> { });
+
+ final List<byte[]> keys = new ArrayList<>();
+ while (it.hasNext()) {
+ keys.add(it.next().key.get());
+ }
+ assertEquals(2, keys.size());
+ assertArrayEquals("b".getBytes(), keys.get(0));
+ assertArrayEquals("c".getBytes(), keys.get(1));
+ }
+
+ @Test
+ public void shouldRespectPrefixScanAcrossBothColumnFamilies() throws
RocksDBException {
+ final InMemoryRocksDBAccessor inMemory = new
InMemoryRocksDBAccessor(mock(RocksDB.class));
+ inMemory.put(oldCF, "foo:1".getBytes(), "old-1".getBytes());
+ inMemory.put(newCF, "foo:2".getBytes(), "new-2".getBytes());
+ inMemory.put(newCF, "bar:1".getBytes(), "new-bar".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> it =
+ accessor.prefixScan(inMemory, Bytes.wrap("foo:".getBytes()));
+ it.onClose(() -> { });
+
+ final List<byte[]> keys = new ArrayList<>();
+ while (it.hasNext()) {
+ keys.add(it.next().key.get());
+ }
+ assertEquals(2, keys.size());
+ assertArrayEquals("foo:1".getBytes(), keys.get(0));
+ assertArrayEquals("foo:2".getBytes(), keys.get(1));
+ }
+
+ @Test
+ public void
shouldPreferNewFormatValueWhenSameKeyExistsInBothColumnFamilies() throws
RocksDBException {
+ final InMemoryRocksDBAccessor inMemory = new
InMemoryRocksDBAccessor(mock(RocksDB.class));
+ inMemory.put(oldCF, "x".getBytes(), "old-x".getBytes());
+ inMemory.put(newCF, "x".getBytes(), "new-x".getBytes());
+
+ final ManagedKeyValueIterator<Bytes, byte[]> it =
accessor.all(inMemory, true);
+ it.onClose(() -> { });
+
+ assertTrue(it.hasNext());
+ final KeyValue<Bytes, byte[]> kv = it.next();
+ assertArrayEquals("x".getBytes(), kv.key.get());
+ assertArrayEquals("new-x".getBytes(), kv.value,
+ "new-format value should win over old-format value for the
same key");
+ assertFalse(it.hasNext(), "same key should appear exactly once");
}
@Test