This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53b1a77b59a KAFKA-19713: Persist Position into offsets CF (#21750)
53b1a77b59a is described below

commit 53b1a77b59ae75568d6371a8b9bc36b3d671fc14
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Wed Mar 18 07:26:25 2026 -0500

    KAFKA-19713: Persist Position into offsets CF (#21750)
    
    As part of the implementation of KIP-1035, this PR adds the ability to
    persist a state store's position offsets in the `Offsets` column family (CF).
    
    It also migrates any pre-existing `.position` checkpoint file by merging
    its contents into the position loaded from the `Offsets` column family.
    
    Reviewers: Bill Bejeck <[email protected]>
    
    ---------
    
    Co-authored-by: Bill Bejeck <[email protected]>
---
 .../internals/AbstractColumnFamilyAccessor.java    | 16 +++++++++++-
 ...stractDualSchemaRocksDBSegmentedBytesStore.java | 11 +++-----
 .../AbstractRocksDBSegmentedBytesStore.java        | 10 +++-----
 .../streams/state/internals/AbstractSegments.java  | 10 ++++----
 .../state/internals/LogicalKeyValueSegment.java    |  5 ++++
 .../state/internals/LogicalKeyValueSegments.java   |  7 +----
 .../streams/state/internals/RocksDBStore.java      | 30 ++++++++++++++++------
 .../state/internals/RocksDBVersionedStore.java     | 10 +++-----
 .../kafka/streams/state/internals/Segment.java     |  2 ++
 .../streams/state/internals/StoreQueryUtils.java   |  9 +++++++
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 14 ++++++++++
 .../AbstractRocksDBSegmentedBytesStoreTest.java    | 15 +++++++++++
 .../internals/LogicalKeyValueSegmentsTest.java     |  2 --
 .../streams/state/internals/RocksDBStoreTest.java  | 10 ++++++++
 .../state/internals/RocksDBVersionedStoreTest.java | 18 +++++++++++++
 15 files changed, 125 insertions(+), 44 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 5a4a85df98e..0c2e157956f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -20,10 +20,12 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.Position;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,6 +41,7 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
     private final StringSerializer stringSerializer = new StringSerializer();
     private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
     private final byte[] statusKey = stringSerializer.serialize(null, 
"status");
+    private final byte[] positionKey = stringSerializer.serialize(null, 
"position");
     private final byte[] openState = longSerde.serializer().serialize(null, 
1L);
     private final byte[] closedState = longSerde.serializer().serialize(null, 
0L);
     private final AtomicBoolean storeOpen;
@@ -62,12 +65,23 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
     }
 
     @Override
-    public final void open(final RocksDBStore.DBAccessor accessor, final 
boolean ignoreInvalidState) throws RocksDBException {
+    public final void commit(final RocksDBStore.DBAccessor accessor, final 
Position storePosition) throws RocksDBException {
+        accessor.put(offsetColumnFamilyHandle, positionKey, 
PositionSerde.serialize(storePosition).array());
+    }
+
+    @Override
+    public final Position open(final RocksDBStore.DBAccessor accessor, final 
boolean ignoreInvalidState) throws RocksDBException {
         final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, 
statusKey);
         if (ignoreInvalidState || (valueBytes == null || 
Arrays.equals(valueBytes, closedState))) {
             // If the status key is not present, we initialize it to "OPEN"
             accessor.put(offsetColumnFamilyHandle, statusKey, openState);
             storeOpen.set(true);
+            final byte[] positionBytes = 
accessor.get(offsetColumnFamilyHandle, positionKey);
+            if (positionBytes != null) {
+                return 
PositionSerde.deserialize(ByteBuffer.wrap(positionBytes));
+            } else {
+                return Position.emptyPosition();
+            }
         } else {
             throw new ProcessorStateException("Invalid state during store 
open. Expected state to be either empty or closed");
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 85b85855a36..4b1ff1e0f1d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -38,7 +38,6 @@ import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +60,6 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     protected boolean consistencyEnabled = false;
     protected Position position;
-    protected OffsetCheckpoint positionCheckpoint;
     private volatile boolean open;
 
     AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
@@ -254,18 +252,15 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
             metrics
         );
 
-        final File positionCheckpointFile = new 
File(stateStoreContext.stateDir(), name() + ".position");
-        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
-        this.position = 
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
-        segments.setPosition(this.position);
-
         segments.openExisting(internalProcessorContext, observedStreamTime);
+        this.position = segments.position;
+        
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), 
name(), this.position);
 
         // register and possibly restore the state from the logs
         stateStoreContext.register(
             root,
             (RecordBatchingStateRestoreCallback) this::restoreAllInternal,
-            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, 
position)
+            segments::writePosition
         );
 
         open = true;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 2baae916e34..9a4d2458848 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -40,7 +40,6 @@ import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -62,7 +61,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
     private Position position;
-    protected OffsetCheckpoint positionCheckpoint;
     private volatile boolean open;
 
     AbstractRocksDBSegmentedBytesStore(final String name,
@@ -296,17 +294,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
                 metrics
         );
 
-        final File positionCheckpointFile = new 
File(stateStoreContext.stateDir(), name() + ".position");
-        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
-        this.position = 
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
-        segments.setPosition(position);
         segments.openExisting(internalProcessorContext, observedStreamTime);
+        this.position = segments.position;
+        
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), 
name(), this.position);
 
         // register and possibly restore the state from the logs
         stateStoreContext.register(
             root,
             (RecordBatchingStateRestoreCallback) this::restoreAllInternal,
-            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, 
position)
+                segments::writePosition
         );
 
         open = true;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index d232f008de7..c5834c49231 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -46,7 +46,7 @@ abstract class AbstractSegments<S extends Segment> implements 
Segments<S> {
     private final long retentionPeriod;
     private final long segmentInterval;
     private final SimpleDateFormat formatter;
-    Position position;
+    protected final Position position = Position.emptyPosition();
 
     AbstractSegments(final String name, final long retentionPeriod, final long 
segmentInterval) {
         this.name = name;
@@ -57,14 +57,14 @@ abstract class AbstractSegments<S extends Segment> 
implements Segments<S> {
         this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
     }
 
+    protected final void writePosition() {
+        segments.forEach((id, segment) -> segment.writePosition());
+    }
+
     protected abstract S createSegment(long segmentId, String segmentName);
 
     protected abstract void openSegmentDB(final S segment, final 
StateStoreContext context);
 
-    public void setPosition(final Position position) {
-        this.position = position;
-    }
-
     @Override
     public long segmentId(final long timestamp) {
         return timestamp / segmentInterval;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index daa98142886..1adbac447b0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -143,6 +143,11 @@ class LogicalKeyValueSegment implements Segment, 
VersionedStoreSegment {
         throw new UnsupportedOperationException("nothing to commit for logical 
segment");
     }
 
+    @Override
+    public void writePosition() {
+        physicalStore.writePosition();
+    }
+
     @Override
     public synchronized void close() {
         // close open iterators
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 6d2ebee7638..34e1ec42d77 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
-import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
 import java.util.HashMap;
@@ -56,11 +55,6 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
         this.physicalStore = new RocksDBStore(name, parentDir, 
metricsRecorder, false);
     }
 
-    @Override
-    public void setPosition(final Position position) {
-        this.physicalStore.position = position;
-    }
-
     @Override
     protected LogicalKeyValueSegment createSegment(final long segmentId, final 
String segmentName) {
         if (segmentId < 0) {
@@ -100,6 +94,7 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
     public void openExisting(final StateStoreContext context, final long 
streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
         physicalStore.openDB(context.appConfigs(), context.stateDir());
+        position.merge(physicalStore.getPosition());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index c2628a863af..8de21f4d40f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -140,7 +140,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
 
     protected StateStoreContext context;
     protected Position position;
-    private OffsetCheckpoint positionCheckpoint;
 
     public RocksDBStore(final String name,
                         final String metricsScope) {
@@ -169,10 +168,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         // open the DB dir
         metricsRecorder.init(metricsImpl(stateStoreContext), 
stateStoreContext.taskId());
         openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
-
-        final File positionCheckpointFile = new 
File(stateStoreContext.stateDir(), name() + ".position");
-        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
-        this.position = 
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), 
name(), position);
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
@@ -180,7 +176,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         stateStoreContext.register(
             root,
             (RecordBatchingStateRestoreCallback) this::restoreBatch,
-            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, 
position)
+                this::writePosition
         );
         consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
             stateStoreContext.appConfigs(),
@@ -252,7 +248,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         openRocksDB(dbOptions, columnFamilyOptions);
         dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
         try {
-            cfAccessor.open(dbAccessor, !eosEnabled);
+            final Position existingPositionOrEmpty = 
cfAccessor.open(dbAccessor, !eosEnabled);
+            if (position == null) {
+                position = existingPositionOrEmpty;
+            } else {
+                // For segmented stores, the overall position is composed of 
multiple underlying stores, so merge this store's position into it.
+                position.merge(existingPositionOrEmpty);
+            }
         } catch (final StreamsException fatal) {
             final String fatalMessage = "State store " + name + " didn't find 
a valid state, since under EOS it has the risk of getting uncommitted data in 
stores";
             throw new ProcessorStateException(fatalMessage, fatal);
@@ -396,6 +398,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         return columnFamilies;
     }
 
+    public final void writePosition() {
+        validateStoreOpen();
+        try {
+            cfAccessor.commit(dbAccessor, position);
+        } catch (final RocksDBException e) {
+            log.warn("Error while committing position for store {}", name, e);
+        }
+    }
+
     @Override
     public String name() {
         return name;
@@ -906,6 +917,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
 
         void commit(final DBAccessor accessor, final Map<TopicPartition, Long> 
changelogOffsets) throws RocksDBException;
 
+        void commit(final DBAccessor accessor, final Position storePosition) 
throws RocksDBException;
+
         void addToBatch(final byte[] key,
                         final byte[] value,
                         final WriteBatchInterface batch) throws 
RocksDBException;
@@ -914,9 +927,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
 
         /**
          * Initializes the ColumnFamily.
+         * @return the position of the store based on the data in the 
ColumnFamily. If no offset position is found, an empty position is returned.
          * @throws StreamsException if an invalid state is found and 
ignoreInvalidState is false
          */
-        void open(final RocksDBStore.DBAccessor accessor, final boolean 
ignoreInvalidState) throws RocksDBException, StreamsException;
+        Position open(final RocksDBStore.DBAccessor accessor, final boolean 
ignoreInvalidState) throws RocksDBException, StreamsException;
 
         Long getCommittedOffset(final RocksDBStore.DBAccessor accessor, final 
TopicPartition partition) throws RocksDBException;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index ceb2c8cb8a8..3ccf8f6b663 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -50,7 +50,6 @@ import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -108,7 +107,6 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
     private Position position;
-    private OffsetCheckpoint positionCheckpoint;
     private volatile boolean open;
 
     RocksDBVersionedStore(final String name, final String metricsScope, final 
long historyRetention, final long segmentInterval) {
@@ -366,17 +364,15 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
 
         
metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), 
stateStoreContext.taskId());
 
-        final File positionCheckpointFile = new 
File(stateStoreContext.stateDir(), name() + ".position");
-        positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
-        position = 
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
-        segmentStores.setPosition(position);
         segmentStores.openExisting(internalProcessorContext, 
observedStreamTime);
+        this.position = segmentStores.position;
+        
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(), 
name(), this.position);
 
         // register and possibly restore the state from the logs
         stateStoreContext.register(
                 root,
                 (RecordBatchingStateRestoreCallback) 
RocksDBVersionedStore.this::restoreBatch,
-                () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, 
position)
+                segmentStores::writePosition
         );
 
         open = true;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 598b396f3fd..d19bb09ad76 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -29,6 +29,8 @@ public interface Segment extends KeyValueStore<Bytes, 
byte[]>, BatchWritingStore
 
     void deleteRange(Bytes keyFrom, Bytes keyTo);
 
+    void writePosition();
+
     @Override
     default int compareTo(final Segment segment) {
         return Long.compare(id(), segment.id());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 3a3e79570ca..0330ebd05de 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -49,6 +49,7 @@ import org.apache.kafka.streams.state.VersionedRecordIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -165,6 +166,14 @@ public final class StoreQueryUtils {
         }
     }
 
+    public static void maybeMigrateExistingPositionFile(final File stateDir, 
final String storeName, final Position position) {
+        final File positionCheckpointFile = new File(stateDir, storeName + 
".position");
+        if (positionCheckpointFile.exists()) {
+            final Position existingPosition = readPositionFromCheckpoint(new 
OffsetCheckpoint(positionCheckpointFile));
+            position.merge(existingPosition);
+        }
+    }
+
     public static boolean isPermitted(
         final Position position,
         final PositionBound positionBound,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index d3c7c2dec63..2cb1f410c7f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1629,6 +1629,20 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
         bytesStore.close();
     }
 
+    @Test
+    public void shouldLoadPositionFromFile() {
+        final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
+        final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(stateDir, storeName + ".position"));
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+        final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> 
bytesStore = getBytesStore();
+
+        // store.init migrates the position from the legacy checkpoint file 
into the store.
+        bytesStore.init(context, bytesStore);
+        assertEquals(position, bytesStore.getPosition());
+        bytesStore.close();
+    }
+
     private Set<String> segmentDirs() {
         final File windowDir = new File(stateDir, storeName);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 38cb1a268fe..5206f48ad40 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -710,6 +710,21 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         assertThat(bytesStore.getPosition(), is(Position.emptyPosition()));
     }
 
+    @ParameterizedTest
+    @MethodSource("getKeySchemas")
+    public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema 
schema) {
+        before(schema);
+        final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
+        final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(context.stateDir(), storeName + ".position"));
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+        final AbstractRocksDBSegmentedBytesStore<S> bytesStore = 
getBytesStore();
+
+        // store.init migrates the position from the legacy checkpoint file 
into the store.
+        bytesStore.init(context, bytesStore);
+        assertEquals(position, bytesStore.getPosition());
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
         final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
         final Headers headers = new RecordHeaders();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 634e3458680..4e053da9b7c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -72,7 +71,6 @@ public class LogicalKeyValueSegmentsTest {
             SEGMENT_INTERVAL,
             new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
         );
-        segments.setPosition(Position.emptyPosition());
         segments.openExisting(context, 0L);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index bda152838d2..35d152bd07f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1254,6 +1254,16 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         assertThat(rocksDBStore.getPosition(), is(Position.emptyPosition()));
     }
 
+    @Test
+    public void shouldLoadPositionFromFile() {
+        final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
+        final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(context.stateDir(), rocksDBStore.name + ".position"));
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+        rocksDBStore.init(context, rocksDBStore);
+        assertEquals(position, rocksDBStore.getPosition());
+    }
+
     private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
         final List<ConsumerRecord<byte[], byte[]>> entries = new ArrayList<>();
         entries.add(createChangelogRecord("1".getBytes(UTF_8), 
"a".getBytes(UTF_8), "", 0, 1));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index 19fbd9f486c..f843a38455a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.ResultOrder;
 import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.VersionedRecordIterator;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -831,6 +833,22 @@ public class RocksDBVersionedStoreTest {
         verifyExpiredRecordSensor(1);
     }
 
+    @Test
+    public void shouldLoadPositionFromFile() {
+        final Position position = Position.fromMap(mkMap(mkEntry("topic", 
mkMap(mkEntry(0, 1L)))));
+        final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new 
File(context.stateDir(), store.name() + ".position"));
+        StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+
+
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 
HISTORY_RETENTION, SEGMENT_INTERVAL);
+        // store.init migrates the position from the legacy checkpoint file 
into the store.
+        store.init(context, store);
+        assertEquals(position, store.getPosition());
+        store.close();
+    }
+
     private void putToStore(final String key, final String value, final long 
timestamp, final long expectedValidTo) {
         final long validTo = store.put(
             new Bytes(STRING_SERIALIZER.serialize(null, key)),

Reply via email to