This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53b1a77b59a KAFKA-19713: Persist Position into offsets CF (#21750)
53b1a77b59a is described below
commit 53b1a77b59ae75568d6371a8b9bc36b3d671fc14
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Wed Mar 18 07:26:25 2026 -0500
KAFKA-19713: Persist Position into offsets CF (#21750)
As part of the implementation of KIP-1035, this PR adds the ability to
store the position offsets into the `Offsets` ColumnFamily.
Also, it aims to migrate any pre-existing position file into the offsets
CF.
Reviewers: Bill Bejeck <[email protected]>
---------
Co-authored-by: Bill Bejeck <[email protected]>
---
.../internals/AbstractColumnFamilyAccessor.java | 16 +++++++++++-
...stractDualSchemaRocksDBSegmentedBytesStore.java | 11 +++-----
.../AbstractRocksDBSegmentedBytesStore.java | 10 +++-----
.../streams/state/internals/AbstractSegments.java | 10 ++++----
.../state/internals/LogicalKeyValueSegment.java | 5 ++++
.../state/internals/LogicalKeyValueSegments.java | 7 +----
.../streams/state/internals/RocksDBStore.java | 30 ++++++++++++++++------
.../state/internals/RocksDBVersionedStore.java | 10 +++-----
.../kafka/streams/state/internals/Segment.java | 2 ++
.../streams/state/internals/StoreQueryUtils.java | 9 +++++++
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 14 ++++++++++
.../AbstractRocksDBSegmentedBytesStoreTest.java | 15 +++++++++++
.../internals/LogicalKeyValueSegmentsTest.java | 2 --
.../streams/state/internals/RocksDBStoreTest.java | 10 ++++++++
.../state/internals/RocksDBVersionedStoreTest.java | 18 +++++++++++++
15 files changed, 125 insertions(+), 44 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 5a4a85df98e..0c2e157956f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -20,10 +20,12 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.Position;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,6 +41,7 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
private final StringSerializer stringSerializer = new StringSerializer();
private final Serdes.LongSerde longSerde = new Serdes.LongSerde();
private final byte[] statusKey = stringSerializer.serialize(null,
"status");
+ private final byte[] positionKey = stringSerializer.serialize(null,
"position");
private final byte[] openState = longSerde.serializer().serialize(null,
1L);
private final byte[] closedState = longSerde.serializer().serialize(null,
0L);
private final AtomicBoolean storeOpen;
@@ -62,12 +65,23 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
}
@Override
- public final void open(final RocksDBStore.DBAccessor accessor, final
boolean ignoreInvalidState) throws RocksDBException {
+ public final void commit(final RocksDBStore.DBAccessor accessor, final
Position storePosition) throws RocksDBException {
+ accessor.put(offsetColumnFamilyHandle, positionKey,
PositionSerde.serialize(storePosition).array());
+ }
+
+ @Override
+ public final Position open(final RocksDBStore.DBAccessor accessor, final
boolean ignoreInvalidState) throws RocksDBException {
final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle,
statusKey);
if (ignoreInvalidState || (valueBytes == null ||
Arrays.equals(valueBytes, closedState))) {
// If the status key is not present, we initialize it to "OPEN"
accessor.put(offsetColumnFamilyHandle, statusKey, openState);
storeOpen.set(true);
+ final byte[] positionBytes =
accessor.get(offsetColumnFamilyHandle, positionKey);
+ if (positionBytes != null) {
+ return
PositionSerde.deserialize(ByteBuffer.wrap(positionBytes));
+ } else {
+ return Position.emptyPosition();
+ }
} else {
throw new ProcessorStateException("Invalid state during store
open. Expected state to be either empty or closed");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 85b85855a36..4b1ff1e0f1d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -38,7 +38,6 @@ import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -61,7 +60,6 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
protected boolean consistencyEnabled = false;
protected Position position;
- protected OffsetCheckpoint positionCheckpoint;
private volatile boolean open;
AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
@@ -254,18 +252,15 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
metrics
);
- final File positionCheckpointFile = new
File(stateStoreContext.stateDir(), name() + ".position");
- this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
- this.position =
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
- segments.setPosition(this.position);
-
segments.openExisting(internalProcessorContext, observedStreamTime);
+ this.position = segments.position;
+
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(),
name(), this.position);
// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
- () -> StoreQueryUtils.checkpointPosition(positionCheckpoint,
position)
+ segments::writePosition
);
open = true;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 2baae916e34..9a4d2458848 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -40,7 +40,6 @@ import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -62,7 +61,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
private Position position;
- protected OffsetCheckpoint positionCheckpoint;
private volatile boolean open;
AbstractRocksDBSegmentedBytesStore(final String name,
@@ -296,17 +294,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
metrics
);
- final File positionCheckpointFile = new
File(stateStoreContext.stateDir(), name() + ".position");
- this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
- this.position =
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
- segments.setPosition(position);
segments.openExisting(internalProcessorContext, observedStreamTime);
+ this.position = segments.position;
+
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(),
name(), this.position);
// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreAllInternal,
- () -> StoreQueryUtils.checkpointPosition(positionCheckpoint,
position)
+ segments::writePosition
);
open = true;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index d232f008de7..c5834c49231 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -46,7 +46,7 @@ abstract class AbstractSegments<S extends Segment> implements
Segments<S> {
private final long retentionPeriod;
private final long segmentInterval;
private final SimpleDateFormat formatter;
- Position position;
+ protected final Position position = Position.emptyPosition();
AbstractSegments(final String name, final long retentionPeriod, final long
segmentInterval) {
this.name = name;
@@ -57,14 +57,14 @@ abstract class AbstractSegments<S extends Segment>
implements Segments<S> {
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
}
+ protected final void writePosition() {
+ segments.forEach((id, segment) -> segment.writePosition());
+ }
+
protected abstract S createSegment(long segmentId, String segmentName);
protected abstract void openSegmentDB(final S segment, final
StateStoreContext context);
- public void setPosition(final Position position) {
- this.position = position;
- }
-
@Override
public long segmentId(final long timestamp) {
return timestamp / segmentInterval;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index daa98142886..1adbac447b0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -143,6 +143,11 @@ class LogicalKeyValueSegment implements Segment,
VersionedStoreSegment {
throw new UnsupportedOperationException("nothing to commit for logical
segment");
}
+ @Override
+ public void writePosition() {
+ physicalStore.writePosition();
+ }
+
@Override
public synchronized void close() {
// close open iterators
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 6d2ebee7638..34e1ec42d77 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
-import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import java.util.HashMap;
@@ -56,11 +55,6 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
this.physicalStore = new RocksDBStore(name, parentDir,
metricsRecorder, false);
}
- @Override
- public void setPosition(final Position position) {
- this.physicalStore.position = position;
- }
-
@Override
protected LogicalKeyValueSegment createSegment(final long segmentId, final
String segmentName) {
if (segmentId < 0) {
@@ -100,6 +94,7 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
public void openExisting(final StateStoreContext context, final long
streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
physicalStore.openDB(context.appConfigs(), context.stateDir());
+ position.merge(physicalStore.getPosition());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index c2628a863af..8de21f4d40f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -140,7 +140,6 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
protected StateStoreContext context;
protected Position position;
- private OffsetCheckpoint positionCheckpoint;
public RocksDBStore(final String name,
final String metricsScope) {
@@ -169,10 +168,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
// open the DB dir
metricsRecorder.init(metricsImpl(stateStoreContext),
stateStoreContext.taskId());
openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
-
- final File positionCheckpointFile = new
File(stateStoreContext.stateDir(), name() + ".position");
- this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
- this.position =
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(),
name(), position);
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
@@ -180,7 +176,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback) this::restoreBatch,
- () -> StoreQueryUtils.checkpointPosition(positionCheckpoint,
position)
+ this::writePosition
);
consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
stateStoreContext.appConfigs(),
@@ -252,7 +248,13 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
try {
- cfAccessor.open(dbAccessor, !eosEnabled);
+ final Position existingPositionOrEmpty =
cfAccessor.open(dbAccessor, !eosEnabled);
+ if (position == null) {
+ position = existingPositionOrEmpty;
+ } else {
+ // For segmented stores, the overall position is composed of
multiple underlying stores, so merge this store's position into it.
+ position.merge(existingPositionOrEmpty);
+ }
} catch (final StreamsException fatal) {
final String fatalMessage = "State store " + name + " didn't find
a valid state, since under EOS it has the risk of getting uncommitted data in
stores";
throw new ProcessorStateException(fatalMessage, fatal);
@@ -396,6 +398,15 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
return columnFamilies;
}
+ public final void writePosition() {
+ validateStoreOpen();
+ try {
+ cfAccessor.commit(dbAccessor, position);
+ } catch (final RocksDBException e) {
+ log.warn("Error while committing position for store {}", name, e);
+ }
+ }
+
@Override
public String name() {
return name;
@@ -906,6 +917,8 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
void commit(final DBAccessor accessor, final Map<TopicPartition, Long>
changelogOffsets) throws RocksDBException;
+ void commit(final DBAccessor accessor, final Position storePosition)
throws RocksDBException;
+
void addToBatch(final byte[] key,
final byte[] value,
final WriteBatchInterface batch) throws
RocksDBException;
@@ -914,9 +927,10 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
/**
* Initializes the ColumnFamily.
+ * @return the position of the store based on the data in the
ColumnFamily. If no offset position is found, an empty position is returned.
* @throws StreamsException if an invalid state is found and
ignoreInvalidState is false
*/
- void open(final RocksDBStore.DBAccessor accessor, final boolean
ignoreInvalidState) throws RocksDBException, StreamsException;
+ Position open(final RocksDBStore.DBAccessor accessor, final boolean
ignoreInvalidState) throws RocksDBException, StreamsException;
Long getCommittedOffset(final RocksDBStore.DBAccessor accessor, final
TopicPartition partition) throws RocksDBException;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index ceb2c8cb8a8..3ccf8f6b663 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -50,7 +50,6 @@ import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -108,7 +107,6 @@ public class RocksDBVersionedStore implements
VersionedKeyValueStore<Bytes, byte
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
private Position position;
- private OffsetCheckpoint positionCheckpoint;
private volatile boolean open;
RocksDBVersionedStore(final String name, final String metricsScope, final
long historyRetention, final long segmentInterval) {
@@ -366,17 +364,15 @@ public class RocksDBVersionedStore implements
VersionedKeyValueStore<Bytes, byte
metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext),
stateStoreContext.taskId());
- final File positionCheckpointFile = new
File(stateStoreContext.stateDir(), name() + ".position");
- positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
- position =
StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
- segmentStores.setPosition(position);
segmentStores.openExisting(internalProcessorContext,
observedStreamTime);
+ this.position = segmentStores.position;
+
StoreQueryUtils.maybeMigrateExistingPositionFile(stateStoreContext.stateDir(),
name(), this.position);
// register and possibly restore the state from the logs
stateStoreContext.register(
root,
(RecordBatchingStateRestoreCallback)
RocksDBVersionedStore.this::restoreBatch,
- () -> StoreQueryUtils.checkpointPosition(positionCheckpoint,
position)
+ segmentStores::writePosition
);
open = true;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 598b396f3fd..d19bb09ad76 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -29,6 +29,8 @@ public interface Segment extends KeyValueStore<Bytes,
byte[]>, BatchWritingStore
void deleteRange(Bytes keyFrom, Bytes keyTo);
+ void writePosition();
+
@Override
default int compareTo(final Segment segment) {
return Long.compare(id(), segment.id());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 3a3e79570ca..0330ebd05de 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -49,6 +49,7 @@ import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -165,6 +166,14 @@ public final class StoreQueryUtils {
}
}
+ public static void maybeMigrateExistingPositionFile(final File stateDir,
final String storeName, final Position position) {
+ final File positionCheckpointFile = new File(stateDir, storeName +
".position");
+ if (positionCheckpointFile.exists()) {
+ final Position existingPosition = readPositionFromCheckpoint(new
OffsetCheckpoint(positionCheckpointFile));
+ position.merge(existingPosition);
+ }
+ }
+
public static boolean isPermitted(
final Position position,
final PositionBound positionBound,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index d3c7c2dec63..2cb1f410c7f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1629,6 +1629,20 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
bytesStore.close();
}
+ @Test
+ public void shouldLoadPositionFromFile() {
+ final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
+ final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(stateDir, storeName + ".position"));
+ StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+ final AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment>
bytesStore = getBytesStore();
+
+ // store.init migrates the position from the legacy checkpoint file
into the store.
+ bytesStore.init(context, bytesStore);
+ assertEquals(position, bytesStore.getPosition());
+ bytesStore.close();
+ }
+
private Set<String> segmentDirs() {
final File windowDir = new File(stateDir, storeName);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 38cb1a268fe..5206f48ad40 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -710,6 +710,21 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
assertThat(bytesStore.getPosition(), is(Position.emptyPosition()));
}
+ @ParameterizedTest
+ @MethodSource("getKeySchemas")
+ public void shouldLoadPositionFromFile(final SegmentedBytesStore.KeySchema
schema) {
+ before(schema);
+ final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
+ final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(context.stateDir(), storeName + ".position"));
+ StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+ final AbstractRocksDBSegmentedBytesStore<S> bytesStore =
getBytesStore();
+
+ // store.init migrates the position from the legacy checkpoint file
into the store.
+ bytesStore.init(context, bytesStore);
+ assertEquals(position, bytesStore.getPosition());
+ }
+
private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
final Headers headers = new RecordHeaders();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 634e3458680..4e053da9b7c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -72,7 +71,6 @@ public class LogicalKeyValueSegmentsTest {
SEGMENT_INTERVAL,
new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
);
- segments.setPosition(Position.emptyPosition());
segments.openExisting(context, 0L);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index bda152838d2..35d152bd07f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1254,6 +1254,16 @@ public class RocksDBStoreTest extends
AbstractKeyValueStoreTest {
assertThat(rocksDBStore.getPosition(), is(Position.emptyPosition()));
}
+ @Test
+ public void shouldLoadPositionFromFile() {
+ final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
+ final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(context.stateDir(), rocksDBStore.name + ".position"));
+ StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+ rocksDBStore.init(context, rocksDBStore);
+ assertEquals(position, rocksDBStore.getPosition());
+ }
+
private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
final List<ConsumerRecord<byte[], byte[]>> entries = new ArrayList<>();
entries.add(createChangelogRecord("1".getBytes(UTF_8),
"a".getBytes(UTF_8), "", 0, 1));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index 19fbd9f486c..f843a38455a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -831,6 +833,22 @@ public class RocksDBVersionedStoreTest {
verifyExpiredRecordSensor(1);
}
+ @Test
+ public void shouldLoadPositionFromFile() {
+ final Position position = Position.fromMap(mkMap(mkEntry("topic",
mkMap(mkEntry(0, 1L)))));
+ final OffsetCheckpoint positionCheckpoint = new OffsetCheckpoint(new
File(context.stateDir(), store.name() + ".position"));
+ StoreQueryUtils.checkpointPosition(positionCheckpoint, position);
+
+
+
+ store.close();
+ store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE,
HISTORY_RETENTION, SEGMENT_INTERVAL);
+ // store.init migrates the position from the legacy checkpoint file
into the store.
+ store.init(context, store);
+ assertEquals(position, store.getPosition());
+ store.close();
+ }
+
private void putToStore(final String key, final String value, final long
timestamp, final long expectedValidTo) {
final long validTo = store.put(
new Bytes(STRING_SERIALIZER.serialize(null, key)),