This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new feba1af46b7 MINOR: Test Coverage for Segmented stores (#21860)
feba1af46b7 is described below

commit feba1af46b7ea1a77335141d73b5b753ba0dddbf
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Mar 31 11:43:15 2026 -0500

    MINOR: Test Coverage for Segmented stores (#21860)
    
    Follow-up PR for KAFKA-19713 that includes test cases for:
    
    - Persist Position offsets in the `offsets` ColumnFamily
    - Position gets correctly merged across all segments in Segmented stores
    - Persist position after every commit
    - Restore positions after store close->open
    
    This PR also refactors AbstractColumnFamily tests to not use a mock by
    creating an in-memory version of a RocksDBAccessor.
    
    Reviewers: Bill Bejeck <[email protected]>
    
    ---------
    
    Co-authored-by: Bill Bejeck <[email protected]>
---
 .../AbstractColumnFamilyAccessorTest.java          |  66 ++++--
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  39 +++-
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  43 +++-
 .../state/internals/InMemoryRocksDBAccessor.java   | 246 +++++++++++++++++++++
 .../streams/state/internals/RocksDBStoreTest.java  |   2 +-
 .../state/internals/RocksDBVersionedStoreTest.java |   2 +-
 6 files changed, 380 insertions(+), 18 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
index b1f1a1ec255..b06ee508746 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.Position;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -28,18 +29,21 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
 
 @ExtendWith(MockitoExtension.class)
 abstract class AbstractColumnFamilyAccessorTest {
@@ -67,41 +71,75 @@ abstract class AbstractColumnFamilyAccessorTest {
 
     @Test
     public void shouldOpenClean() throws RocksDBException {
-        when(dbAccessor.get(offsetsCF, 
toBytes("status"))).thenReturn(closedValue);
-
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
         // Open the ColumnFamily
         accessor.open(dbAccessor, false);
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), 
eq(openValue));
+        assertArrayEquals(openValue, dbAccessor.get(offsetsCF, 
toBytes("status")));
 
         // Now close the ColumnFamily
         accessor.close(dbAccessor);
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), 
eq(closedValue));
+        assertArrayEquals(closedValue, dbAccessor.get(offsetsCF, 
toBytes("status")));
+
+        // Open clean again
+        accessor.open(dbAccessor, false);
+        assertArrayEquals(openValue, dbAccessor.get(offsetsCF, 
toBytes("status")));
     }
 
     @Test
     public void shouldThrowOnOpenAfterAUncleanClose() throws RocksDBException {
-        when(dbAccessor.get(offsetsCF, 
toBytes("status"))).thenReturn(openValue);
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        // First, open clean
+        accessor.open(dbAccessor, false);
+
+        // Try to open again, with ignoreUncleanClose=false, which should 
throw since the store is already open
         final ProcessorStateException thrown = 
assertThrowsExactly(ProcessorStateException.class, () -> 
accessor.open(dbAccessor, false));
         assertEquals("Invalid state during store open. Expected state to be 
either empty or closed", thrown.getMessage());
     }
 
     @Test
     public void shouldIgnoreExceptionAfterUncleanClose() throws 
RocksDBException {
-        when(dbAccessor.get(offsetsCF, 
toBytes("status"))).thenReturn(openValue);
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        // First, open clean
+        accessor.open(dbAccessor, false);
+        // Now reopen in an invalid state
         accessor.open(dbAccessor, true);
         assertTrue(storeOpen.get());
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes("status")), 
eq(openValue));
+        assertArrayEquals(openValue, dbAccessor.get(offsetsCF, 
toBytes("status")));
     }
 
     @Test
     public void shouldCommitOffsets() throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
         final TopicPartition tp0 = new TopicPartition("testTopic", 0);
         final TopicPartition tp1 = new TopicPartition("testTopic", 1);
         final Map<TopicPartition, Long> changelogOffsets = Map.of(tp0, 10L, 
tp1, 20L);
         accessor.commit(dbAccessor, changelogOffsets);
-        verify(dbAccessor).flush(any(ColumnFamilyHandle[].class));
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp0.toString())), 
eq(toBytes(10L)));
-        verify(dbAccessor).put(eq(offsetsCF), eq(toBytes(tp1.toString())), 
eq(toBytes(20L)));
+        assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
+        assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
+    }
+
+    @Test
+    public void shouldCommitPosition() throws RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        final String topic = "testTopic";
+        final TopicPartition tp0 = new TopicPartition(topic, 0);
+        final TopicPartition tp1 = new TopicPartition(topic, 1);
+        final Position positionToStore = Position.fromMap(mkMap(mkEntry(topic, 
mkMap(mkEntry(tp0.partition(), 10L), mkEntry(tp1.partition(), 20L)))));
+        accessor.commit(dbAccessor, positionToStore);
+        assertEquals(positionToStore, 
PositionSerde.deserialize(ByteBuffer.wrap(dbAccessor.get(offsetsCF, 
toBytes("position")))));
+    }
+
+    @Test
+    public void shouldWipeCommittedOffsetsOnEmptyCommit() throws 
RocksDBException {
+        dbAccessor = new InMemoryRocksDBAccessor(mock(RocksDB.class));
+        final TopicPartition tp0 = new TopicPartition("testTopic", 0);
+        final TopicPartition tp1 = new TopicPartition("testTopic", 1);
+        accessor.commit(dbAccessor, Map.of(tp0, 10L, tp1, 20L));
+        assertEquals(10L, accessor.getCommittedOffset(dbAccessor, tp0));
+        assertEquals(20L, accessor.getCommittedOffset(dbAccessor, tp1));
+        accessor.commit(dbAccessor, Map.of());
+        assertNull(accessor.getCommittedOffset(dbAccessor, tp0));
+        assertNull(accessor.getCommittedOffset(dbAccessor, tp1));
     }
 
     private byte[] toBytes(final String s) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 3b2265959fe..861ed349ed2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1626,7 +1626,7 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
     }
 
     @Test
-    public void shouldLoadPositionFromFile() {
+    public void shouldMigrateExistingPositionFromFile() {
         final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(stateDir, storeName + ".position"));
         StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
@@ -1639,6 +1639,43 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         bytesStore.close();
     }
 
+    @Test
+    public void shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart() {
+        final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> 
bytesStore = getBytesStore();
+        // 0 segments initially.
+        bytesStore.init(context, bytesStore);
+
+        // Writes record to different partitions
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), 
serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), 
serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), 
serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), 
serializeValue(10));
+        final Position expected = Position.fromMap(mkMap(mkEntry("t1", 
mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));
+
+        // Each open segment should share the same position.
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+
+        // Persist the merged position and simulate a full store restart.
+        bytesStore.commit(Map.of());
+        bytesStore.segments.writePosition();
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+
+        // The store-level position should be restored from the merged 
position.
+        assertEquals(expected, bytesStore.getPosition());
+
+        // Restored segments should all have the same merged position.
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+    }
+
     private Set<String> segmentDirs() {
         final File windowDir = new File(stateDir, storeName);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 5206f48ad40..201db4bebe3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -712,7 +712,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
     @ParameterizedTest
     @MethodSource("getKeySchemas")
-    public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema 
schema) {
+    public void shouldMigrateExistingPositionFromFile(final 
SegmentedBytesStore.KeySchema schema) {
         before(schema);
         final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(context.stateDir(), storeName + ".position"));
@@ -725,6 +725,47 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         assertEquals(position, bytesStore.getPosition());
     }
 
+    @ParameterizedTest
+    @MethodSource("getKeySchemas")
+    public void 
shouldRestoreMergedPositionFromMultipleSegmentsAfterRestart(final 
SegmentedBytesStore.KeySchema schema) {
+        before(schema);
+        bytesStore = getBytesStore();
+        // 0 segments initially.
+        bytesStore.init(context, bytesStore);
+
+        // Writes record to different partitions
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), 
serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), 
serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 1, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[2])), 
serializeValue(10));
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 1, "t1", new 
RecordHeaders()));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[3])), 
serializeValue(10));
+        final Position expected = Position.fromMap(mkMap(mkEntry("t1", 
mkMap(mkEntry(0, 2L), mkEntry(1, 3L)))));
+
+        // Each open segment should share the same position.
+        for (final S segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+
+        // Persist the merged position and simulate a full store restart.
+        bytesStore.commit(Map.of());
+        for (final S segment : bytesStore.getSegments()) {
+            segment.writePosition();
+        }
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+
+        // The store-level position should be restored from the merged 
position.
+        assertEquals(expected, bytesStore.getPosition());
+
+        // Restored segments should all have the same merged position.
+        for (final S segment : bytesStore.getSegments()) {
+            assertEquals(expected, segment.getPosition());
+        }
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
         final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
         final Headers headers = new RecordHeaders();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
new file mode 100644
index 00000000000..6e25c97c881
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryRocksDBAccessor.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory implementation of {@link RocksDBStore.DBAccessor} intended for 
use in tests,
+ * eliminating the need for mocks. Data is kept in a {@link NavigableMap} per
+ * {@link ColumnFamilyHandle} (identified by object identity), providing full 
key-ordering semantics
+ * equivalent to RocksDB's default comparator.
+ */
+public class InMemoryRocksDBAccessor implements RocksDBStore.DBAccessor {
+
+    /**
+     * Unsigned lexicographic byte-array comparator — matches RocksDB's 
default comparator.
+     */
+    private static final Comparator<byte[]> BYTES_COMPARATOR = Arrays::compare;
+
+    /**
+     * Per-CF stores, keyed by {@link ColumnFamilyHandle} object identity.
+     */
+    private final Map<ColumnFamilyHandle, NavigableMap<byte[], byte[]>> stores 
= new IdentityHashMap<>();
+    private final RocksDB rocksDB;
+
+    public InMemoryRocksDBAccessor(final RocksDB rocksDB) {
+        this.rocksDB = rocksDB;
+    }
+
+    private NavigableMap<byte[], byte[]> storeFor(final ColumnFamilyHandle 
columnFamily) {
+        return stores.computeIfAbsent(columnFamily, cf -> new 
TreeMap<>(BYTES_COMPARATOR));
+    }
+
+    @Override
+    public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) 
{
+        return storeFor(columnFamily).get(key);
+    }
+
+    @Override
+    public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions 
readOptions, final byte[] key) {
+        // ReadOptions (snapshots, fill-cache, etc.) are not meaningful 
in-memory; delegate to plain get.
+        return get(columnFamily, key);
+    }
+
+    @Override
+    public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
+        return new InMemoryRocksIterator(rocksDB, storeFor(columnFamily));
+    }
+
+    @Override
+    public void put(final ColumnFamilyHandle columnFamily, final byte[] key, 
final byte[] value) {
+        if (value == null) {
+            delete(columnFamily, key);
+        } else {
+            storeFor(columnFamily).put(key, value);
+        }
+    }
+
+    @Override
+    public void delete(final ColumnFamilyHandle columnFamily, final byte[] 
key) {
+        storeFor(columnFamily).remove(key);
+    }
+
+    @Override
+    public void deleteRange(final ColumnFamilyHandle columnFamily, final 
byte[] from, final byte[] to) {
+        throw new UnsupportedOperationException("deleteRange not supported 
in-memory");
+    }
+
+    @Override
+    public long approximateNumEntries(final ColumnFamilyHandle columnFamily) {
+        return storeFor(columnFamily).size();
+    }
+
+    @Override
+    public void flush(final ColumnFamilyHandle... columnFamilies) {
+        // No-op: in-memory writes are immediately durable.
+    }
+
+    @Override
+    public void reset() {
+        stores.clear();
+    }
+
+    @Override
+    public void close() {
+        // No native resources to release.
+    }
+
+    /**
+     * In-memory iterator backed by a navigable map.
+     */
+    private static class InMemoryRocksIterator extends RocksIterator {
+        private final NavigableMap<byte[], byte[]> data;
+        private byte[] currentKey;
+        private boolean valid;
+
+        InMemoryRocksIterator(final RocksDB rocksDB, final 
NavigableMap<byte[], byte[]> data) {
+            super(rocksDB, 0L);
+            this.data = data;
+            this.currentKey = null;
+            this.valid = false;
+        }
+
+        @Override
+        protected void disposeInternal() {
+            // No native resources to release.
+        }
+
+        @Override
+        public boolean isValid() {
+            return valid && currentKey != null && data.containsKey(currentKey);
+        }
+
+        @Override
+        public void seekToFirst() {
+            if (data.isEmpty()) {
+                invalidate();
+            } else {
+                currentKey = data.firstKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seekToLast() {
+            if (data.isEmpty()) {
+                invalidate();
+            } else {
+                currentKey = data.lastKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seek(final byte[] target) {
+            final Map.Entry<byte[], byte[]> entry = data.ceilingEntry(target);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seekForPrev(final byte[] target) {
+            final Map.Entry<byte[], byte[]> entry = data.floorEntry(target);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+                valid = true;
+            }
+        }
+
+        @Override
+        public void seek(final ByteBuffer target) {
+            final ByteBuffer duplicate = target.duplicate();
+            final byte[] bytes = new byte[duplicate.remaining()];
+            duplicate.get(bytes);
+            seek(bytes);
+        }
+
+        @Override
+        public void seekForPrev(final ByteBuffer target) {
+            final ByteBuffer duplicate = target.duplicate();
+            final byte[] bytes = new byte[duplicate.remaining()];
+            duplicate.get(bytes);
+            seekForPrev(bytes);
+        }
+
+        @Override
+        public void next() {
+            final Map.Entry<byte[], byte[]> entry = 
data.higherEntry(currentKey);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+            }
+        }
+
+        @Override
+        public void prev() {
+            if (!isValid()) {
+                return;
+            }
+            final Map.Entry<byte[], byte[]> entry = 
data.lowerEntry(currentKey);
+            if (entry == null) {
+                invalidate();
+            } else {
+                currentKey = entry.getKey();
+            }
+        }
+
+        @Override
+        public byte[] key() {
+            return currentKey;
+        }
+
+        @Override
+        public byte[] value() {
+            return isValid() ? data.get(currentKey) : null;
+        }
+
+        @Override
+        public void status() throws RocksDBException {
+            // In-memory iterator never enters an error state.
+        }
+
+        @Override
+        public void close() {
+            invalidate();
+        }
+
+        private void invalidate() {
+            valid = false;
+            currentKey = null;
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 35d152bd07f..1a6a83848d7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1255,7 +1255,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
     }
 
     @Test
-    public void shouldLoadPositionFromFile() {
+    public void shouldMigrateExistingPositionFromFile() {
         final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(context.stateDir(), rocksDBStore.name + ".position"));
         StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index f843a38455a..340ca7eb216 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -834,7 +834,7 @@ public class RocksDBVersionedStoreTest {
     }
 
     @Test
-    public void shouldLoadPositionFromFile() {
+    public void shouldMigrateExistingPositionFromFile() {
         final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
         final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(context.stateDir(), store.name() + ".position"));
         StoreQueryUtils.checkpointPosition(positionCheckpoint, position);

Reply via email to