This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b47aa813a7 KAFKA-17411: Fix issues with new offset management (#21795)
8b47aa813a7 is described below
commit 8b47aa813a7ad80fd0f77a1a047f02c3ab194929
Author: Nick Telford <[email protected]>
AuthorDate: Wed Mar 18 18:22:51 2026 +0000
KAFKA-17411: Fix issues with new offset management (#21795)
We had several issues with offset management that were causing the
system tests to fail:
1. An NPE in `RocksDBStore`, caused by a `null` offset being passed to
`commit`.
2. Inconsistent behaviour between `RocksDBStore` and
`LegacyCheckpointingStateStore` for both `null` offsets and an empty
`Map` in `commit`.
- This was largely due to these cases not being properly defined in
the `commit` method contract, which has now been addressed.
3. No way to wipe offsets from `RocksDBStore` when the store was
corrupted. We now interpret an empty `Map` in `commit` as an instruction
to wipe all committed offsets.
- `ProcessorStateManager` now commits empty offsets to `corrupted`
stores to force them to wipe their offsets.
4. `GlobalStateManagerImpl` didn't wipe state under EOS when a store was
detected as corrupted. This has now been added, consistent with the
behaviour of `ProcessorStateManager`.
5. Many `StateStore` implementations that delegate to an internal
`RocksDBStore` implemented neither `managesOffsets` nor
`committedOffset`, despite implementing `commit`. This caused these
stores to be incorrectly wrapped in a `LegacyCheckpointingStateStore`,
which conflicted with the offsets being tracked in RocksDB.
6. Now that `.checkpoint` files are not used for `RocksDBStore`, some
tests no longer make sense.
7. In `GlobalKTableEOSIntegrationTest`,
`shouldSkipOverTxMarkersOnRestore` and
`shouldSkipOverAbortedMessagesOnRestore` had to be removed, as they
depended on the ability to externally preload checkpoint offsets, which
is no longer possible now that the offsets are stored in RocksDB.
8. `LegacyCheckpointingStateStore` no longer uses the `OFFSET_UNKNOWN`
sentinel value, except during migration of old `.checkpoint` files.
Reviewers: Bill Bejeck <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 12 +--
.../GlobalKTableEOSIntegrationTest.java | 101 ---------------------
.../apache/kafka/streams/processor/StateStore.java | 11 +++
.../internals/GlobalStateManagerImpl.java | 22 ++++-
.../processor/internals/ProcessorStateManager.java | 70 +++++++-------
.../internals/AbstractColumnFamilyAccessor.java | 34 +++++--
...stractDualSchemaRocksDBSegmentedBytesStore.java | 11 +++
.../AbstractRocksDBSegmentedBytesStore.java | 11 +++
.../streams/state/internals/AbstractSegments.java | 17 ++++
...ValueToTimestampedKeyValueByteStoreAdapter.java | 11 +++
.../internals/LegacyCheckpointingStateStore.java | 36 ++++----
.../state/internals/LogicalKeyValueSegments.java | 11 +++
.../internals/PlainToHeadersStoreAdapter.java | 11 +++
.../PlainToHeadersWindowStoreAdapter.java | 11 +++
.../state/internals/RocksDBVersionedStore.java | 11 +++
.../kafka/streams/state/internals/Segments.java | 5 +
.../internals/SessionToHeadersStoreAdapter.java | 11 +++
.../TimestampedToHeadersStoreAdapter.java | 11 +++
.../TimestampedToHeadersWindowStoreAdapter.java | 11 +++
.../VersionedKeyValueToBytesStoreAdapter.java | 11 +++
.../WindowToTimestampedWindowByteStoreAdapter.java | 11 +++
.../streams/state/internals/WrappedStateStore.java | 11 +++
.../LegacyCheckpointingStateStoreTest.java | 18 ++--
23 files changed, 293 insertions(+), 176 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index ca09905fdab..a6cb68f5283 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -923,18 +923,12 @@ public class EosIntegrationTest {
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60));
- final File checkpointFile = Paths.get(
+ final File taskDir = Paths.get(
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
- task00.toString(),
- ".checkpoint_" + stateStoreName
+ task00.toString()
).toFile();
- assertTrue(checkpointFile.exists());
- final Map<TopicPartition, Long> checkpoints = new
OffsetCheckpoint(checkpointFile).read();
- assertEquals(
- Long.valueOf(restoredOffsetsForPartition0.get()),
- new ArrayList<>(checkpoints.values()).get(0)
- );
+ assertTrue(taskDir.exists());
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 6e8a345c7e9..cf98c0226bf 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -42,7 +41,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -54,19 +52,15 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
-import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
@Timeout(600)
@@ -315,101 +309,6 @@ public class GlobalKTableEOSIntegrationTest {
);
}
- @Test
- public void shouldSkipOverTxMarkersOnRestore() throws Exception {
- shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
- }
-
- @Test
- public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
- shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
- }
-
- private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final
boolean appendAbortedMessages) throws Exception {
- // records with key 1L, 2L, and 4L are written into partition-0
- // record with key 3L is written into partition-1
- produceInitialGlobalTableValues();
- final String stateDir =
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
- final File globalStateDir = new File(
- stateDir
- + File.separator
- +
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
- + File.separator
- + "global");
- assertTrue(globalStateDir.mkdirs());
- final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(globalStateDir, ".checkpoint_" + globalStore));
-
- // set the checkpointed offset to the commit marker of partition-1
- // even if `poll()` won't return any data for partition-1, we should
still finish the restore
- checkpoint.write(Collections.singletonMap(new
TopicPartition(globalTableTopic, 1), 1L));
-
- if (appendAbortedMessages) {
- final AtomicReference<Exception> error = new AtomicReference<>();
- startStreams(new StateRestoreListener() {
- @Override
- public void onRestoreStart(final TopicPartition topicPartition,
- final String storeName,
- final long startingOffset,
- final long endingOffset) {
- // we need to write aborted messages only after we init
the `highWatermark`
- // to move the `endOffset` beyond the `highWatermark
- //
- // we cannot write committed messages because we want to
test the case that
- // poll() returns no records
- //
- // cf. GlobalStateManagerImpl#restoreState()
- try {
- produceAbortedMessages();
- } catch (final Exception fatal) {
- error.set(fatal);
- }
- }
-
- @Override
- public void onBatchRestored(final TopicPartition
topicPartition,
- final String storeName,
- final long batchEndOffset,
- final long numRestored) { }
-
- @Override
- public void onRestoreEnd(final TopicPartition topicPartition,
- final String storeName,
- final long totalRestored) { }
- });
- final Exception fatal = error.get();
- if (fatal != null) {
- throw fatal;
- }
- } else {
- startStreams();
- }
-
- final Map<Long, String> expected = new HashMap<>();
- expected.put(1L, "A");
- expected.put(2L, "B");
- // skip record <3L, "C"> because we won't read it (cf checkpoint file
above)
- expected.put(4L, "D");
-
- final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
- .getStore(globalStore, kafkaStreams,
QueryableStoreTypes.keyValueStore());
- assertNotNull(store);
-
- final Map<Long, String> storeContent = new HashMap<>();
- TestUtils.waitForCondition(
- () -> {
- storeContent.clear();
- try (final KeyValueIterator<Long, String> it = store.all()) {
- it.forEachRemaining(kv -> storeContent.put(kv.key,
kv.value));
- }
- return storeContent.equals(expected);
- },
- 30_000L,
- () -> "waiting for initial values" +
- "\n expected: " + expected +
- "\n received: " + storeContent
- );
- }
-
@Test
public void shouldNotRestoreAbortedMessages() throws Exception {
produceAbortedMessages();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index ada7955b302..6f91222f64b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -103,8 +103,19 @@ public interface StateStore {
* <p>
* Implementations <em>SHOULD</em> ensure that {@code changelogOffsets}
are committed to disk atomically with the
* records they represent, if possible.
+ * <p>
+ * <b>Empty map:</b> If {@code changelogOffsets} is empty, implementations
that manage offsets <em>MUST</em>
+ * remove all previously committed offsets. After an empty commit, {@link
#committedOffset(TopicPartition)} should
+ * return {@code null} for all partitions. This is used during corruption
recovery to clear stale offsets so that
+ * restoration can restart from the beginning.
+ * <p>
+ * <b>Null values:</b> If a value in {@code changelogOffsets} is {@code
null}, implementations that manage offsets
+ * <em>MUST</em> remove the committed offset for that partition. After
such a commit,
+ * {@link #committedOffset(TopicPartition)} should return {@code null} for
the affected partition.
*
* @param changelogOffsets The changelog offset(s) corresponding to the
most recently written records.
+ * An empty map signals that all committed offsets
should be cleared.
+ * A {@code null} value for a partition signals
that its committed offset should be removed.
*/
default void commit(final Map<TopicPartition, Long> changelogOffsets) {
flush();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 5aad0499a22..5b08d9ae53e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -51,6 +52,7 @@ import
org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;
import java.io.File;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -179,7 +181,20 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
final List<TopicPartition> storePartitions =
topicPartitionsForStore(stateStore);
final StateStore maybeWrappedStore =
LegacyCheckpointingStateStore.maybeWrapStore(
stateStore, eosEnabled, new HashSet<>(storePartitions),
stateDirectory, null, logPrefix);
- maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
+ try {
+ maybeWrappedStore.init(globalProcessorContext,
maybeWrappedStore);
+ } catch (final ProcessorStateException e) {
+ if (eosEnabled) {
+ log.warn("{}Detected unclean shutdown for global store {}.
" +
+ "Wiping global state directory.", logPrefix,
stateStore.name(), e);
+ try {
+
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
+ }
+ throw e;
+ }
for (final TopicPartition storePartition : storePartitions) {
wrappedStores.put(storePartition, maybeWrappedStore);
@@ -566,7 +581,10 @@ public class GlobalStateManagerImpl implements
GlobalStateManager {
// only add offsets for persistent stores
if (store.persistent()) {
for (final TopicPartition storePartition :
storePartitions) {
- storeOffsets.put(storePartition,
currentOffsets.get(storePartition));
+ final Long offset =
currentOffsets.get(storePartition);
+ if (offset != null) {
+ storeOffsets.put(storePartition, offset);
+ }
}
}
store.commit(storeOffsets);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ff6786d9ecc..645a56a5463 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -487,45 +487,43 @@ public class ProcessorStateManager implements
StateManager {
if (!stores.isEmpty()) {
log.debug("Committing all stores registered in the state manager:
{}", stores);
for (final StateStoreMetadata metadata : stores.values()) {
- if (!metadata.corrupted) {
- final StateStore store = metadata.stateStore;
- log.trace("Committing store {}", store.name());
- try {
- if (metadata.changelogPartition == null ||
metadata.offset == null || !store.persistent()) {
- store.commit(Map.of());
- } else {
- store.commit(Map.of(metadata.changelogPartition,
metadata.offset));
- }
+ final StateStore store = metadata.stateStore;
+ log.trace("Committing store {}", store.name());
+ try {
+ if (metadata.changelogPartition == null || metadata.offset
== null || metadata.corrupted || !store.persistent()) {
+ store.commit(Map.of());
+ } else {
+ store.commit(Map.of(metadata.changelogPartition,
metadata.offset));
+ }
- if (metadata.commitCallback != null) {
- try {
- metadata.commitCallback.onCommit();
- } catch (final IOException e) {
- throw new ProcessorStateException(
- format("%sException caught while
trying to checkpoint store, " +
- "changelog partition %s",
logPrefix, metadata.changelogPartition),
- e
- );
- }
- }
- } catch (final RuntimeException exception) {
- if (firstException == null) {
- // do NOT wrap the error if it is actually caused
by Streams itself
- // In case of FailedProcessingException Do not
keep the failed processing exception in the stack trace
- if (exception instanceof FailedProcessingException)
- firstException = new ProcessorStateException(
- format("%sFailed to commit state store
%s", logPrefix, store.name()),
- exception.getCause());
- else if (exception instanceof StreamsException)
- firstException = exception;
- else
- firstException = new ProcessorStateException(
- format("%sFailed to commit state store
%s", logPrefix, store.name()), exception);
- log.error("Failed to commit state store {}: ",
store.name(), firstException);
- } else {
- log.error("Failed to commit state store {}: ",
store.name(), exception);
+ if (!metadata.corrupted && metadata.commitCallback !=
null) {
+ try {
+ metadata.commitCallback.onCommit();
+ } catch (final IOException e) {
+ throw new ProcessorStateException(
+ format("%sException caught while trying to
checkpoint store, " +
+ "changelog partition %s",
logPrefix, metadata.changelogPartition),
+ e
+ );
}
}
+ } catch (final RuntimeException exception) {
+ if (firstException == null) {
+ // do NOT wrap the error if it is actually caused by
Streams itself
+ // In case of FailedProcessingException Do not keep
the failed processing exception in the stack trace
+ if (exception instanceof FailedProcessingException)
+ firstException = new ProcessorStateException(
+ format("%sFailed to commit state store
%s", logPrefix, store.name()),
+ exception.getCause());
+ else if (exception instanceof StreamsException)
+ firstException = exception;
+ else
+ firstException = new ProcessorStateException(
+ format("%sFailed to commit state store
%s", logPrefix, store.name()), exception);
+ log.error("Failed to commit state store {}: ",
store.name(), firstException);
+ } else {
+ log.error("Failed to commit state store {}: ",
store.name(), exception);
+ }
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 0c2e157956f..0288872f567 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.query.Position;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -53,12 +54,20 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
@Override
public final void commit(final RocksDBStore.DBAccessor accessor, final
Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
- for (final Map.Entry<TopicPartition, Long> entry :
changelogOffsets.entrySet()) {
- final TopicPartition tp = entry.getKey();
- final Long offset = entry.getValue();
- final byte[] key = stringSerializer.serialize(null, tp.toString());
- final byte[] value = longSerde.serializer().serialize(null,
offset);
- accessor.put(offsetColumnFamilyHandle, key, value);
+ if (changelogOffsets.isEmpty()) {
+ wipeOffsets(accessor);
+ } else {
+ for (final Map.Entry<TopicPartition, Long> entry :
changelogOffsets.entrySet()) {
+ final TopicPartition tp = entry.getKey();
+ final Long offset = entry.getValue();
+ final byte[] key = stringSerializer.serialize(null,
tp.toString());
+ if (offset != null) {
+ final byte[] value =
longSerde.serializer().serialize(null, offset);
+ accessor.put(offsetColumnFamilyHandle, key, value);
+ } else {
+ accessor.delete(offsetColumnFamilyHandle, key);
+ }
+ }
}
// We need to remove this flush call when implementing KAFKA-19712
this.flush(accessor, offsetColumnFamilyHandle);
@@ -112,4 +121,17 @@ abstract class AbstractColumnFamilyAccessor implements
RocksDBStore.ColumnFamily
* @throws RocksDBException if an error occurs during the commit operation
*/
protected abstract void flush(final RocksDBStore.DBAccessor accessor,
final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException;
+
+ private void wipeOffsets(final RocksDBStore.DBAccessor accessor) throws
RocksDBException {
+ try (final RocksIterator iter =
accessor.newIterator(offsetColumnFamilyHandle)) {
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ final byte[] key = iter.key();
+ if (!Arrays.equals(key, statusKey)) {
+ accessor.delete(offsetColumnFamilyHandle, key);
+ }
+ iter.next();
+ }
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 4b1ff1e0f1d..f353e56f7df 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -277,6 +277,17 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
segments.commit(changelogOffsets);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return segments.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return segments.committedOffset(partition);
+ }
+
@Override
public void close() {
open = false;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 9a4d2458848..c6b57411067 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -318,6 +318,17 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
segments.commit(changelogOffsets);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return segments.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return segments.committedOffset(partition);
+ }
+
@Override
public void close() {
open = false;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index c5834c49231..ae292a44f8b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -184,6 +184,23 @@ abstract class AbstractSegments<S extends Segment>
implements Segments<S> {
}
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return true;
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ for (final S segment : segments.values()) {
+ final Long offset = segment.committedOffset(partition);
+ if (offset != null) {
+ return offset;
+ }
+ }
+ return null;
+ }
+
@Override
public void close() {
for (final S segment : segments.values()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index f62c703d91d..3612776fbeb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -106,6 +106,17 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter
implements KeyValueSt
store.commit(changelogOffsets);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
index 311285a2fba..bc059f03ffd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
@@ -117,7 +117,12 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
for (final Map.Entry<TopicPartition, Long> entry :
legacyCheckpoint.read().entrySet()) {
final StateStore store = stores.get(entry.getKey());
if (store != null) {
- storesToMigrate.computeIfAbsent(store, k -> new
HashMap<>()).put(entry.getKey(), entry.getValue());
+ final Long offset =
changelogOffsetFromCheckpointedOffset(entry.getValue());
+ if (offset != null) {
+ storesToMigrate
+ .computeIfAbsent(store, k -> new
HashMap<>())
+ .put(entry.getKey(), offset);
+ }
}
}
@@ -169,7 +174,7 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
final Map<TopicPartition, Long> allOffsets = checkpointFile.read();
for (final Map.Entry<TopicPartition, Long> entry :
allOffsets.entrySet()) {
if (changelogPartitions.contains(entry.getKey())) {
- offsets.put(entry.getKey(),
changelogOffsetFromCheckpointedOffset(entry.getValue()));
+ offsets.put(entry.getKey(), entry.getValue());
}
}
checkpointedOffsets = new HashMap<>(offsets);
@@ -206,7 +211,17 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
super.commit(changelogOffsets);
// update in-memory offsets
- offsets.putAll(changelogOffsets);
+ if (changelogOffsets.isEmpty()) {
+ offsets.clear();
+ } else {
+ for (final Map.Entry<TopicPartition, Long> entry :
changelogOffsets.entrySet()) {
+ if (entry.getValue() != null) {
+ offsets.put(entry.getKey(), entry.getValue());
+ } else {
+ offsets.remove(entry.getKey());
+ }
+ }
+ }
// only write the checkpoint file if both:
// 1. in ALOS mode (under EOS, the checkpoint file is only written
when closing the store)
@@ -236,14 +251,8 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
// only checkpoint persistent and logged stores
if (persistent() && !changelogPartitions.isEmpty()) {
try {
- // merge new checkpoint offsets into checkpoint file
- final Map<TopicPartition, Long> checkpointingOffsets = new
HashMap<>(offsets.size());
- for (final Map.Entry<TopicPartition, Long> entry :
offsets.entrySet()) {
- checkpointingOffsets.put(entry.getKey(),
checkpointableOffsetFromChangelogOffset(entry.getValue()));
- }
-
- log.debug("Writing checkpoint: {} for task {}",
checkpointingOffsets, taskId);
- checkpointFile.write(checkpointingOffsets);
+ log.debug("Writing checkpoint: {} for task {}", offsets,
taskId);
+ checkpointFile.write(offsets);
checkpointedOffsets = new HashMap<>(offsets);
} catch (final IOException e) {
log.warn("{}Failed to write offset checkpoint file to [{}]." +
@@ -295,11 +304,6 @@ public class LegacyCheckpointingStateStore<S extends
StateStore, K, V> extends W
return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
}
- // Pass in a sentinel value to checkpoint when the changelog offset is not
yet initialized/known
- private static long checkpointableOffsetFromChangelogOffset(final Long
offset) {
- return offset != null ? offset : OFFSET_UNKNOWN;
- }
-
// Convert the written offsets in the checkpoint file back to the
changelog offset
private static Long changelogOffsetFromCheckpointedOffset(final long
offset) {
return offset != OFFSET_UNKNOWN ? offset : null;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 34e1ec42d77..fbf7722629c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -107,6 +107,17 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
physicalStore.commit(changelogOffsets);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return physicalStore.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return physicalStore.committedOffset(partition);
+ }
+
@Override
public void close() {
// close the logical segments first to close any open iterators
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
index 4d762b82392..17415dbe207 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
@@ -110,6 +110,17 @@ public class PlainToHeadersStoreAdapter implements
KeyValueStore<Bytes, byte[]>
store.commit(changelogOffsets);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
index 88be98af91b..a463dbc97f2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
@@ -221,6 +221,17 @@ public class PlainToHeadersWindowStoreAdapter implements
WindowStore<Bytes, byte
store.commit(changelogOffsets);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index 3ccf8f6b663..e9c705972ee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -309,6 +309,17 @@ public class RocksDBVersionedStore implements
VersionedKeyValueStore<Bytes, byte
// same physical RocksDB instance
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return segmentStores.managesOffsets();
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return segmentStores.committedOffset(partition);
+ }
+
@Override
public void close() {
open = false;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index e9c7d144d84..9c902da34a1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -42,5 +42,10 @@ interface Segments<S extends Segment> {
void commit(final Map<TopicPartition, Long> changelogOffsets);
+ /**
+ * Returns whether these segments track their own committed offsets.
+ * NOTE(review): presumably mirrors {@code StateStore#managesOffsets()} — confirm against
+ * the implementations.
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): Segments extends nothing and this signature uses no deprecated type, so this appears to have no effect
+ boolean managesOffsets();
+
+ /**
+ * Returns the offset committed for {@code partition}.
+ *
+ * @param partition the partition to look up (keys match those passed to {@code commit})
+ * @return the committed offset; the boxed {@code Long} permits {@code null}, presumably
+ *         when no offset is committed for {@code partition}
+ */
+ Long committedOffset(final TopicPartition partition);
+
void close();
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
index c6d475c2f33..4771a86a5f9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
@@ -156,6 +156,17 @@ public class SessionToHeadersStoreAdapter implements
SessionStore<Bytes, byte[]>
store.commit(changelogOffsets);
}
+ /**
+ * Forwards to the wrapped {@code store}, so this adapter reports the same offset-management
+ * capability as the store it adapts. Per KAFKA-17411, stores that forwarded {@code commit}
+ * but not this method were incorrectly wrapped in a {@code LegacyCheckpointingStateStore},
+ * which conflicted with the offsets tracked in RocksDB.
+ *
+ * @return the result of {@code store.managesOffsets()}
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): presumably managesOffsets() is deprecated on StateStore; confirm
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ /**
+ * Forwards to the wrapped {@code store} and returns its committed offset for
+ * {@code partition} unchanged.
+ *
+ * @param partition the partition whose committed offset is requested
+ * @return the wrapped store's committed offset; the boxed {@code Long} permits {@code null}
+ */
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
index 7ba3b9bd8db..facef5f0a06 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
@@ -112,6 +112,17 @@ public class TimestampedToHeadersStoreAdapter implements
KeyValueStore<Bytes, by
store.commit(changelogOffsets);
}
+ /**
+ * Forwards to the wrapped {@code store}, so this adapter reports the same offset-management
+ * capability as the store it adapts. Per KAFKA-17411, stores that forwarded {@code commit}
+ * but not this method were incorrectly wrapped in a {@code LegacyCheckpointingStateStore},
+ * which conflicted with the offsets tracked in RocksDB.
+ *
+ * @return the result of {@code store.managesOffsets()}
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): presumably managesOffsets() is deprecated on StateStore; confirm
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ /**
+ * Forwards to the wrapped {@code store} and returns its committed offset for
+ * {@code partition} unchanged.
+ *
+ * @param partition the partition whose committed offset is requested
+ * @return the wrapped store's committed offset; the boxed {@code Long} permits {@code null}
+ */
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
index 97f10285849..738787b9c21 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -193,6 +193,17 @@ public class TimestampedToHeadersWindowStoreAdapter
implements WindowStore<Bytes
store.commit(changelogOffsets);
}
+ /**
+ * Forwards to the wrapped {@code store}, so this adapter reports the same offset-management
+ * capability as the store it adapts. Per KAFKA-17411, stores that forwarded {@code commit}
+ * but not this method were incorrectly wrapped in a {@code LegacyCheckpointingStateStore},
+ * which conflicted with the offsets tracked in RocksDB.
+ *
+ * @return the result of {@code store.managesOffsets()}
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): presumably managesOffsets() is deprecated on StateStore; confirm
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ /**
+ * Forwards to the wrapped {@code store} and returns its committed offset for
+ * {@code partition} unchanged.
+ *
+ * @param partition the partition whose committed offset is requested
+ * @return the wrapped store's committed offset; the boxed {@code Long} permits {@code null}
+ */
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
index 1d16b2a86b7..c02cec5747a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
@@ -99,6 +99,17 @@ public class VersionedKeyValueToBytesStoreAdapter implements
VersionedBytesStore
inner.commit(changelogOffsets);
}
+ /**
+ * Forwards to the {@code inner} store, so this adapter reports the same offset-management
+ * capability as the store it adapts. Per KAFKA-17411, stores that forwarded {@code commit}
+ * but not this method were incorrectly wrapped in a {@code LegacyCheckpointingStateStore},
+ * which conflicted with the offsets tracked in RocksDB.
+ *
+ * @return the result of {@code inner.managesOffsets()}
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): presumably managesOffsets() is deprecated on StateStore; confirm
+ @Override
+ public boolean managesOffsets() {
+ return inner.managesOffsets();
+ }
+
+ /**
+ * Forwards to the {@code inner} store and returns its committed offset for
+ * {@code partition} unchanged.
+ *
+ * @param partition the partition whose committed offset is requested
+ * @return the inner store's committed offset; the boxed {@code Long} permits {@code null}
+ */
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return inner.committedOffset(partition);
+ }
+
@Override
public void close() {
inner.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 1261f17a697..5a7f05c1b2b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -167,6 +167,17 @@ public class WindowToTimestampedWindowByteStoreAdapter
implements WindowStore<By
store.commit(changelogOffsets);
}
+ /**
+ * Forwards to the wrapped {@code store}, so this adapter reports the same offset-management
+ * capability as the store it adapts. Per KAFKA-17411, stores that forwarded {@code commit}
+ * but not this method were incorrectly wrapped in a {@code LegacyCheckpointingStateStore},
+ * which conflicted with the offsets tracked in RocksDB.
+ *
+ * @return the result of {@code store.managesOffsets()}
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): presumably managesOffsets() is deprecated on StateStore; confirm
+ @Override
+ public boolean managesOffsets() {
+ return store.managesOffsets();
+ }
+
+ /**
+ * Forwards to the wrapped {@code store} and returns its committed offset for
+ * {@code partition} unchanged.
+ *
+ * @param partition the partition whose committed offset is requested
+ * @return the wrapped store's committed offset; the boxed {@code Long} permits {@code null}
+ */
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return store.committedOffset(partition);
+ }
+
@Override
public void close() {
store.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index d1ce967bcc8..e6e6c92e559 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -128,6 +128,17 @@ public abstract class WrappedStateStore<S extends
StateStore, K, V> implements S
wrapped.commit(changelogOffsets);
}
+ /**
+ * Forwards to the {@code wrapped} store. Because this is defined on the abstract
+ * {@code WrappedStateStore}, subclasses that do not override it inherit this forwarding and
+ * report the same offset-management capability as the store they wrap. Per KAFKA-17411,
+ * stores that forwarded {@code commit} but not this method were incorrectly wrapped in a
+ * {@code LegacyCheckpointingStateStore}.
+ *
+ * @return the result of {@code wrapped.managesOffsets()}
+ */
+ @SuppressWarnings("deprecation") // NOTE(review): presumably managesOffsets() is deprecated on StateStore; confirm
+ @Override
+ public boolean managesOffsets() {
+ return wrapped.managesOffsets();
+ }
+
+ /**
+ * Forwards to the {@code wrapped} store and returns its committed offset for
+ * {@code partition} unchanged.
+ *
+ * @param partition the partition whose committed offset is requested
+ * @return the wrapped store's committed offset; the boxed {@code Long} permits {@code null}
+ */
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return wrapped.committedOffset(partition);
+ }
+
@Override
public void close() {
wrapped.close();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
index 493e437e2c7..9b80801b706 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
@@ -49,7 +49,6 @@ import java.util.Set;
import static
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME;
import static
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
-import static
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -307,8 +306,9 @@ public class LegacyCheckpointingStateStoreTest {
}
@Test
- public void shouldMapOffsetUnknownToNullOnInit() throws IOException {
- writeCheckpointFile(Collections.singletonMap(partition,
OFFSET_UNKNOWN));
+ public void shouldReturnNullForPartitionNotInCheckpointOnInit() throws
IOException {
+ // write a checkpoint with no entry for our partition
+ writeCheckpointFile(Collections.emptyMap());
final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
store.init(context, persistentStore);
@@ -464,15 +464,21 @@ public class LegacyCheckpointingStateStoreTest {
}
+ // Committing a null offset for a partition must drop that partition's entry, both from the
+ // checkpoint file and from committedOffset(), rather than persisting an OFFSET_UNKNOWN sentinel.
@Test
- public void shouldWriteOffsetUnknownSentinelWhenOffsetIsNull() throws
IOException {
+ public void shouldRemoveOffsetWhenCommittedWithNull() throws IOException {
final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object>
store = createStore(false);
+ // first commit a real offset
+ store.commit(Collections.singletonMap(partition, 42L));
+ store.checkpoint();
+ // NOTE(review): if the entry were missing, the (long) unboxing below would throw an NPE
+ // instead of producing a readable assertion failure.
+ assertEquals(42L, (long) readCheckpointFile().get(partition));
+
+ // now commit null for the same partition — should remove it
final Map<TopicPartition, Long> nullOffset = new HashMap<>();
nullOffset.put(partition, null);
store.commit(nullOffset);
store.checkpoint();
- final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
- assertEquals(OFFSET_UNKNOWN, (long) checkpointed.get(partition));
+ assertFalse(readCheckpointFile().containsKey(partition));
+ assertNull(store.committedOffset(partition));
}
@Test