This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new feba1af46b7 MINOR: Test Coverage for Segmented stores (#21860)
feba1af46b7 is described below
commit feba1af46b7ea1a77335141d73b5b753ba0dddbf
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Mar 31 11:43:15 2026 -0500
MINOR: Test Coverage for Segmented stores (#21860)
Follow-up PR for KAFKA-19713 that adds test cases covering:
- Persisting Position offsets in the `offsets` ColumnFamily
- Merging the Position correctly across all segments in segmented stores
- Persisting the Position after every commit
- Restoring Positions after a store close->open cycle
This PR also refactors AbstractColumnFamilyAccessorTest so that it no longer
mocks the DB accessor, using a new in-memory implementation of
RocksDBStore.DBAccessor (InMemoryRocksDBAccessor) instead.
Reviewers: Bill Bejeck <[email protected]>
---------
Co-authored-by: Bill Bejeck <[email protected]>
---
.../AbstractColumnFamilyAccessorTest.java | 66 ++++--
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 39 +++-
.../AbstractRocksDBSegmentedBytesStoreTest.java | 43 +++-
.../state/internals/InMemoryRocksDBAccessor.java | 246 +++++++++++++++++++++
.../streams/state/internals/RocksDBStoreTest.java | 2 +-
.../state/internals/RocksDBVersionedStoreTest.java | 2 +-
6 files changed, 380 insertions(+), 18 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b1f1a1ec255..b06ee508746 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.Position;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -28,18 +29,21 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
@ExtendWith(MockitoExtension.class)
abstract class AbstractColumnFamilyAccessorTest {
@@ -67,41 +71,75 @@ abstract class AbstractColumnFamilyAccessorTest {
@Test
public void shouldOpenClean() throws RocksDBException {
- when(dbAccessor.get(offsetsCF,
toBytes("status"))).thenReturn(closedValue);
-
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
// Open the ColumnFamily
accessor.open(dbAccessor, false);
- verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")),
eq(openValue));
+ assertArrayEquals(openValue, dbAccessor.get(offsetsCF,
toBytes("status")));
// Now close the ColumnFamily
accessor.close(dbAccessor);
- verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")),
eq(closedValue));
+ assertArrayEquals(closedValue, dbAccessor.get(offsetsCF,
toBytes("status")));
+
+ // Open clean again
+ accessor.open(dbAccessor, false);
+ assertArrayEquals(openValue, dbAccessor.get(offsetsCF,
toBytes("status")));
}
@Test
public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException {
- when(dbAccessor.get(offsetsCF,
toBytes("status"))).thenReturn(openValue);
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+ // First, open clean
+ accessor.open(dbAccessor, false);
+
+ // Try to open again, with ignoreUncleanClose=false, which should
throw since the store is already open
final ProcessorStateException thrown =
assertThrowsExactly(ProcessorStateException.class, () ->
accessor.open(dbAccessor, false));
assertEquals("Invalid state during store open. Expected state to be
either empty or closed", thrown.getMessage());
}
@Test
public void shouldIgnoreExceptionAfterUncleanClose() throws
RocksDBException {
- when(dbAccessor.get(offsetsCF,
toBytes("status"))).thenReturn(openValue);
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+ // First, open clean
+ accessor.open(dbAccessor, false);
+ // Now reopen in an invalid state
accessor.open(dbAccessor, true);
assertTrue(storeOpen.get());
- verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")),
eq(openValue));
+ assertArrayEquals(openValue, dbAccessor.get(offsetsCF,
toBytes("status")));
}
@Test
public void shouldCommitOffsets() throws RocksDBException {
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
final TopicPartition tp0 = new TopicPartition("testTopic", 0);
final TopicPartition tp1 = new TopicPartition("testTopic", 1);
final Map<TopicPartition, Long> changelogOffsets = Map.of(tp0, 10L,
tp1, 20L);
accessor.commit(dbAccessor, changelogOffsets);
- verify(dbAccessor).flush(any(ColumnFamilyHandle[].class));
- verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())),
eq(toBytes(10L)));
- verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())),
eq(toBytes(20L)));
+ assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
+ assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
+ }
+
+ /**
+ * Commits a {@link Position} through the accessor and verifies that the bytes
+ * stored under the "position" key of the offsets column family deserialize
+ * back to an equal {@link Position}.
+ */
+ @Test
+ public void shouldCommitPosition() throws RocksDBException {
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+ final String topic = "testTopic";
+ final TopicPartition tp0 = new TopicPartition(topic, 0);
+ final TopicPartition tp1 = new TopicPartition(topic, 1);
+ final Position positionToStore = Position.fromMap(mkMap(mkEntry(topic,
mkMap(mkEntry(tp0.partition(), 10L), mkEntry(tp1.partition(), 20L)))));
+ accessor.commit(dbAccessor, positionToStore);
+ // Read the raw value back from the in-memory accessor rather than via the accessor under test.
+ assertEquals(positionToStore,
PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF,
toBytes("position")))));
+ }
+
+ /**
+ * Commits offsets for two partitions, then commits an empty offsets map and
+ * verifies that {@code getCommittedOffset} returns {@code null} for both
+ * partitions afterwards.
+ */
+ @Test
+ public void shouldWipeCommittedOffsetsOnEmptyCommit() throws
RocksDBException {
+ dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+ final TopicPartition tp0 = new TopicPartition("testTopic", 0);
+ final TopicPartition tp1 = new TopicPartition("testTopic", 1);
+ accessor.commit(dbAccessor, Map.of(tp0, 10L, tp1, 20L));
+ // Sanity check: both offsets are readable before the empty commit.
+ assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
+ assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
+ accessor.commit(dbAccessor, Map.of());
+ assertNull(accessor.getCommittedOffset(dbAccessor, tp0));
+ assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
+ }
private byte[] toBytes(final String s) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 3b2265959fe..861ed349ed2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1626,7 +1626,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
}
@Test
- public void shouldLoadPositionFromFile() {
+ public void shouldMigrateExistingPositionFromFile() {
final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(stateDir, storeName + ".position"));
StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
@@ -1639,6 +1639,43 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
bytesStore.close();
}
+ /**
+ * Writes four records under different record contexts, checks that every
+ * segment reports the same merged {@link Position}, then commits, writes the
+ * position, closes and re-initializes the store, and checks that both the
+ * store and its segments report that same position again.
+ */
+ @Test
+ public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart() {
+ final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment>
bytesStore = getBytesStore();
+ // 0 segments initially.
+ bytesStore.init(context, bytesStore);
+
+ // Writes record to different partitions
+ // NOTE(review): judging by the expected position below, the 2nd and 3rd
+ // ProcessorRecordContext arguments appear to be offset and partition - confirm.
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[0])),
serializeValue(10));
+ context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[1])),
serializeValue(10));
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[2])),
serializeValue(10));
+ context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[3])),
serializeValue(10));
+ final Position expected = Position.fromMap(mkMap(mkEntry("t1",
mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));
+
+ // Each open segment should share the same position.
+ for (final KeyValueSegment segment : bytesStore.getSegments()) {
+ assertEquals(expected, segment.getPosition());
+ }
+
+ // Persist the merged position and simulate a full store restart.
+ bytesStore.commit(Map.of());
+ bytesStore.segments.writePosition();
+ bytesStore.close();
+ bytesStore.init(context, bytesStore);
+
+ // The store-level position should be restored from the merged
position.
+ assertEquals(expected, bytesStore.getPosition());
+
+ // Restored segments should all have the same merged position.
+ for (final KeyValueSegment segment : bytesStore.getSegments()) {
+ assertEquals(expected, segment.getPosition());
+ }
+ }
+
private Set<String> segmentDirs() {
final File windowDir = new File(stateDir, storeName);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 5206f48ad40..201db4bebe3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -712,7 +712,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
@ParameterizedTest
@MethodSource("getKeySchemas")
- public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema
schema) {
+ public void shouldMigrateExistingPositionFromFile(final
SegmentedBytesStore.KeySchema schema) {
before(schema);
final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(context.stateDir(), storeName + ".position"));
@@ -725,6 +725,47 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertEquals(position, bytesStore.getPosition());
}
+ /**
+ * For each key schema: writes four records under different record contexts,
+ * checks that every segment reports the same merged {@link Position}, then
+ * commits, writes each segment's position, closes and re-initializes the
+ * store, and checks that both the store and its segments report that same
+ * position again.
+ */
+ @ParameterizedTest
+ @MethodSource("getKeySchemas")
+ public void
shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart(final
SegmentedBytesStore.KeySchema schema) {
+ before(schema);
+ bytesStore = getBytesStore();
+ // 0 segments initially.
+ bytesStore.init(context, bytesStore);
+
+ // Writes record to different partitions
+ // NOTE(review): judging by the expected position below, the 2nd and 3rd
+ // ProcessorRecordContext arguments appear to be offset and partition - confirm.
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[0])),
serializeValue(10));
+ context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[1])),
serializeValue(10));
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[2])),
serializeValue(10));
+ context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new
RecordHeaders()));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[3])),
serializeValue(10));
+ final Position expected = Position.fromMap(mkMap(mkEntry("t1",
mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));
+
+ // Each open segment should share the same position.
+ for (final S segment : bytesStore.getSegments()) {
+ assertEquals(expected, segment.getPosition());
+ }
+
+ // Persist the merged position and simulate a full store restart.
+ bytesStore.commit(Map.of());
+ for (final S segment : bytesStore.getSegments()) {
+ segment.writePosition();
+ }
+ bytesStore.close();
+ bytesStore.init(context, bytesStore);
+
+ // The store-level position should be restored from the merged
position.
+ assertEquals(expected, bytesStore.getPosition());
+
+ // Restored segments should all have the same merged position.
+ for (final S segment : bytesStore.getSegments()) {
+ assertEquals(expected, segment.getPosition());
+ }
+ }
+
private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
final Headers headers = new RecordHeaders();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
new file mode 100644
index 00000000000..6e25c97c881
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory implementation of {@link RocksDBStore.DBAccessor} intended for use in tests,
+ * eliminating the need for mocks. Data is kept in a {@link NavigableMap} per
+ * {@link ColumnFamilyHandle} (identified by object identity), providing full key-ordering
+ * semantics equivalent to RocksDB's default comparator.
+ *
+ * <p>No synchronization is performed, so an instance should be confined to a single thread.
+ */
+public class InMemoryRocksDBAccessor implements RocksDBStore.DBAccessor {
+
+    /**
+     * Unsigned lexicographic byte-array comparator — matches RocksDB's default comparator.
+     * {@link Arrays#compareUnsigned(byte[], byte[])} is required here: {@code Arrays.compare}
+     * treats bytes as signed and would order keys containing bytes {@code >= 0x80} before
+     * keys containing bytes in {@code 0x00..0x7F}.
+     */
+    private static final Comparator<byte[]> BYTES_COMPARATOR = Arrays::compareUnsigned;
+
+    /**
+     * Per-CF stores, keyed by {@link ColumnFamilyHandle} object identity.
+     */
+    private final Map<ColumnFamilyHandle, NavigableMap<byte[], byte[]>> stores = new IdentityHashMap<>();
+
+    /**
+     * Passed to the superclass constructor of every iterator created by this accessor;
+     * not otherwise used by this class.
+     */
+    private final RocksDB rocksDB;
+
+    /**
+     * @param rocksDB the database instance handed to iterators created by {@link #newIterator};
+     *                tests in this change pass a mock
+     */
+    public InMemoryRocksDBAccessor(final RocksDB rocksDB) {
+        this.rocksDB = rocksDB;
+    }
+
+    /**
+     * Returns the sorted map backing the given column family, creating it on first use.
+     */
+    private NavigableMap<byte[], byte[]> storeFor(final ColumnFamilyHandle columnFamily) {
+        return stores.computeIfAbsent(columnFamily, cf -> new TreeMap<>(BYTES_COMPARATOR));
+    }
+
+    /**
+     * @return the value stored under {@code key} in the given column family, or {@code null}
+     *         if there is none
+     */
+    @Override
+    public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) {
+        return storeFor(columnFamily).get(key);
+    }
+
+    /**
+     * Same as {@link #get(ColumnFamilyHandle, byte[])}; {@code readOptions} is ignored.
+     */
+    @Override
+    public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) {
+        // ReadOptions (snapshots, fill-cache, etc.) are not meaningful in-memory; delegate to plain get.
+        return get(columnFamily, key);
+    }
+
+    /**
+     * @return a new, initially unpositioned iterator over the live map of the given column family
+     */
+    @Override
+    public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
+        return new InMemoryRocksIterator(rocksDB, storeFor(columnFamily));
+    }
+
+    /**
+     * Stores {@code value} under {@code key}; a {@code null} value is treated as a delete.
+     */
+    @Override
+    public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) {
+        if (value == null) {
+            delete(columnFamily, key);
+        } else {
+            storeFor(columnFamily).put(key, value);
+        }
+    }
+
+    /**
+     * Removes {@code key} from the given column family if present.
+     */
+    @Override
+    public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) {
+        storeFor(columnFamily).remove(key);
+    }
+
+    /**
+     * Not supported by this in-memory implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) {
+        throw new UnsupportedOperationException("deleteRange not supported in-memory");
+    }
+
+    /**
+     * @return the exact number of entries currently held for the given column family
+     */
+    @Override
+    public long approximateNumEntries(final ColumnFamilyHandle columnFamily) {
+        return storeFor(columnFamily).size();
+    }
+
+    @Override
+    public void flush(final ColumnFamilyHandle... columnFamilies) {
+        // No-op: in-memory writes are immediately durable.
+    }
+
+    /**
+     * Drops the data of every column family. Iterators created earlier keep referencing the
+     * maps they were created with.
+     */
+    @Override
+    public void reset() {
+        stores.clear();
+    }
+
+    @Override
+    public void close() {
+        // No native resources to release.
+    }
+
+    /**
+     * In-memory iterator backed by a navigable map.
+     *
+     * <p>Invariant: {@code valid} is {@code true} exactly when {@code currentKey} is non-null;
+     * both are only changed together by the seek methods and {@link #invalidate()}.
+     */
+    private static class InMemoryRocksIterator extends RocksIterator {
+        private final NavigableMap<byte[], byte[]> data;
+        private byte[] currentKey;
+        private boolean valid;
+
+        InMemoryRocksIterator(final RocksDB rocksDB, final NavigableMap<byte[], byte[]> data) {
+            // NOTE(review): 0L is presumably the native handle, i.e. no native iterator backs this object.
+            super(rocksDB, 0L);
+            this.data = data;
+            this.currentKey = null;
+            this.valid = false;
+        }
+
+        @Override
+        protected void disposeInternal() {
+            // No native resources to release.
+        }
+
+        /**
+         * @return {@code true} if the iterator is positioned on a key that is still present
+         *         in the backing map
+         */
+        @Override
+        public boolean isValid() {
+            return valid && currentKey != null && data.containsKey(currentKey);
+        }
+
+        @Override
+        public void seekToFirst() {
+            if (data.isEmpty()) {
+                invalidate();
+            } else {
+                currentKey = data.firstKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seekToLast() {
+            if (data.isEmpty()) {
+                invalidate();
+            } else {
+                currentKey = data.lastKey();
+                valid = true;
+            }
+        }
+
+        /**
+         * Positions the iterator on the smallest key greater than or equal to {@code target},
+         * or invalidates it if there is no such key.
+         */
+        @Override
+        public void seek(final byte[] target) {
+            final Map.Entry<byte[], byte[]> entry = data.ceilingEntry(target);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+                valid = true;
+            }
+        }
+
+        /**
+         * Positions the iterator on the largest key less than or equal to {@code target},
+         * or invalidates it if there is no such key.
+         */
+        @Override
+        public void seekForPrev(final byte[] target) {
+            final Map.Entry<byte[], byte[]> entry = data.floorEntry(target);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seek(final ByteBuffer target) {
+            // Work on a duplicate so the caller's buffer position is left untouched.
+            final ByteBuffer duplicate = target.duplicate();
+            final byte[] bytes = new byte[duplicate.remaining()];
+            duplicate.get(bytes);
+            seek(bytes);
+        }
+
+        @Override
+        public void seekForPrev(final ByteBuffer target) {
+            // Work on a duplicate so the caller's buffer position is left untouched.
+            final ByteBuffer duplicate = target.duplicate();
+            final byte[] bytes = new byte[duplicate.remaining()];
+            duplicate.get(bytes);
+            seekForPrev(bytes);
+        }
+
+        /**
+         * Moves to the next larger key, or invalidates the iterator when there is none.
+         * Does nothing if the iterator is not positioned.
+         */
+        @Override
+        public void next() {
+            // Guard against an unpositioned iterator: the comparator tolerates null, so
+            // higherEntry(null) could otherwise set currentKey to the first entry while
+            // the iterator still reports itself as invalid.
+            if (!valid) {
+                return;
+            }
+            final Map.Entry<byte[], byte[]> entry = data.higherEntry(currentKey);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+            }
+        }
+
+        /**
+         * Moves to the next smaller key, or invalidates the iterator when there is none.
+         * Does nothing if the iterator is not valid.
+         */
+        @Override
+        public void prev() {
+            if (!isValid()) {
+                return;
+            }
+            final Map.Entry<byte[], byte[]> entry = data.lowerEntry(currentKey);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+            }
+        }
+
+        /**
+         * @return the key the iterator is positioned on, or {@code null} if it is not positioned
+         */
+        @Override
+        public byte[] key() {
+            return currentKey;
+        }
+
+        /**
+         * @return the value for the current key, or {@code null} if the iterator is not valid
+         */
+        @Override
+        public byte[] value() {
+            return isValid() ? data.get(currentKey) : null;
+        }
+
+        @Override
+        public void status() throws RocksDBException {
+            // In-memory iterator never enters an error state.
+        }
+
+        @Override
+        public void close() {
+            invalidate();
+        }
+
+        /** Marks the iterator as unpositioned. */
+        private void invalidate() {
+            valid = false;
+            currentKey = null;
+        }
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 35d152bd07f..1a6a83848d7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1255,7 +1255,7 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
}
@Test
- public void shouldLoadPositionFromFile() {
+ public void shouldMigrateExistingPositionFromFile() {
final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(context.stateDir(), rocksDBStore.name + ".position"));
StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index f843a38455a..340ca7eb216 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -834,7 +834,7 @@ public class RocksDBVersionedStoreTest {
}
@Test
- public void shouldLoadPositionFromFile() {
+ public void shouldMigrateExistingPositionFromFile() {
final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(context.stateDir(), store.name() + ".position"));
StoreQueryUtils.checkpointPosition(positionCheckpoint, position);