This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8b47aa813a7 KAFKA-17411: Fix issues with new offset management (#21795)
8b47aa813a7 is described below

commit 8b47aa813a7ad80fd0f77a1a047f02c3ab194929
Author: Nick Telford <[email protected]>
AuthorDate: Wed Mar 18 18:22:51 2026 +0000

    KAFKA-17411: Fix issues with new offset management (#21795)
    
    We had several issues with offset management that were causing the
    system tests to fail:
    
    1. An NPE in `RocksDBStore`, caused by a `null` offset being passed to
    `commit`.
    
    2. Inconsistent behaviour between `RocksDBStore` and
    `LegacyCheckpointingStateStore` for both `null` offsets and an empty
    `Map` in `commit`.
    
       - This was largely due to these cases not being properly defined in
    the `commit` method contract, which has now been addressed.
    
    3. No way to wipe offsets from `RocksDBStore` when the store was
    corrupted. We now interpret an empty `Map` in `commit` as an instruction
    to wipe all committed offsets.
    
       - `ProcessorStateManager` now commits empty offsets to `corrupted`
    stores to force them to wipe their offsets.
    
    4. `GlobalStateManagerImpl` didn't wipe state under EOS when a store was
    detected as corrupted. This has now been added, consistent with the
    behaviour of `ProcessorStateManager`.
    
    5. Many `StateStore` implementations that delegate to an internal
    `RocksDBStore` did not implement either `managesOffsets` or
    `committedOffset`, despite implementing `commit`. This caused these
    stores to be incorrectly wrapped in a `LegacyCheckpointingStateStore`,
    which conflicted with the offsets being tracked in RocksDB.
    
    6. Now that `.checkpoint` files are not used for `RocksDBStore`, some
    tests no longer make sense.
    
    7. In `GlobalKTableEOSIntegrationTest`,
    `shouldSkipOverTxMarkersOnRestore` and
    `shouldSkipOverAbortedMessagesOnRestore` had to be removed, as they
    depended on the ability to externally preload checkpoint offsets, which
    is no longer possible now they're stored in RocksDB.
    
    8. `LegacyCheckpointingStateStore` no longer uses the `OFFSET_UNKNOWN`
    sentinel value, except during migration of old `.checkpoint` files.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../streams/integration/EosIntegrationTest.java    |  12 +--
 .../GlobalKTableEOSIntegrationTest.java            | 101 ---------------------
 .../apache/kafka/streams/processor/StateStore.java |  11 +++
 .../internals/GlobalStateManagerImpl.java          |  22 ++++-
 .../processor/internals/ProcessorStateManager.java |  70 +++++++-------
 .../internals/AbstractColumnFamilyAccessor.java    |  34 +++++--
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |  11 +++
 .../AbstractRocksDBSegmentedBytesStore.java        |  11 +++
 .../streams/state/internals/AbstractSegments.java  |  17 ++++
 ...ValueToTimestampedKeyValueByteStoreAdapter.java |  11 +++
 .../internals/LegacyCheckpointingStateStore.java   |  36 ++++----
 .../state/internals/LogicalKeyValueSegments.java   |  11 +++
 .../internals/PlainToHeadersStoreAdapter.java      |  11 +++
 .../PlainToHeadersWindowStoreAdapter.java          |  11 +++
 .../state/internals/RocksDBVersionedStore.java     |  11 +++
 .../kafka/streams/state/internals/Segments.java    |   5 +
 .../internals/SessionToHeadersStoreAdapter.java    |  11 +++
 .../TimestampedToHeadersStoreAdapter.java          |  11 +++
 .../TimestampedToHeadersWindowStoreAdapter.java    |  11 +++
 .../VersionedKeyValueToBytesStoreAdapter.java      |  11 +++
 .../WindowToTimestampedWindowByteStoreAdapter.java |  11 +++
 .../streams/state/internals/WrappedStateStore.java |  11 +++
 .../LegacyCheckpointingStateStoreTest.java         |  18 ++--
 23 files changed, 293 insertions(+), 176 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index ca09905fdab..a6cb68f5283 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -923,18 +923,12 @@ public class EosIntegrationTest {
         kafkaStreams.close();
         waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60));
 
-        final File checkpointFile = Paths.get(
+        final File taskDir = Paths.get(
             streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
             
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
-            task00.toString(),
-            ".checkpoint_" + stateStoreName
+            task00.toString()
         ).toFile();
-        assertTrue(checkpointFile.exists());
-        final Map<TopicPartition, Long> checkpoints = new 
OffsetCheckpoint(checkpointFile).read();
-        assertEquals(
-            Long.valueOf(restoredOffsetsForPartition0.get()),
-            new ArrayList<>(checkpoints.values()).get(0)
-        );
+        assertTrue(taskDir.exists());
     }
 
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 6e8a345c7e9..cf98c0226bf 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -42,7 +41,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -54,19 +52,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 
-import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("integration")
 @Timeout(600)
@@ -315,101 +309,6 @@ public class GlobalKTableEOSIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldSkipOverTxMarkersOnRestore() throws Exception {
-        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
-    }
-
-    @Test
-    public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
-        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
-    }
-
-    private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final 
boolean appendAbortedMessages) throws Exception {
-        // records with key 1L, 2L, and 4L are written into partition-0
-        // record with key 3L is written into partition-1
-        produceInitialGlobalTableValues();
-        final String stateDir = 
streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
-        final File globalStateDir = new File(
-            stateDir
-                + File.separator
-                + 
streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
-                + File.separator
-                + "global");
-        assertTrue(globalStateDir.mkdirs());
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(globalStateDir, ".checkpoint_" + globalStore));
-
-        // set the checkpointed offset to the commit marker of partition-1
-        // even if `poll()` won't return any data for partition-1, we should 
still finish the restore
-        checkpoint.write(Collections.singletonMap(new 
TopicPartition(globalTableTopic, 1), 1L));
-
-        if (appendAbortedMessages) {
-            final AtomicReference<Exception> error = new AtomicReference<>();
-            startStreams(new StateRestoreListener() {
-                @Override
-                public void onRestoreStart(final TopicPartition topicPartition,
-                                           final String storeName,
-                                           final long startingOffset,
-                                           final long endingOffset) {
-                    // we need to write aborted messages only after we init 
the `highWatermark`
-                    // to move the `endOffset` beyond the `highWatermark
-                    //
-                    // we cannot write committed messages because we want to 
test the case that
-                    // poll() returns no records
-                    //
-                    // cf. GlobalStateManagerImpl#restoreState()
-                    try {
-                        produceAbortedMessages();
-                    } catch (final Exception fatal) {
-                        error.set(fatal);
-                    }
-                }
-
-                @Override
-                public void onBatchRestored(final TopicPartition 
topicPartition,
-                                            final String storeName,
-                                            final long batchEndOffset,
-                                            final long numRestored) { }
-
-                @Override
-                public void onRestoreEnd(final TopicPartition topicPartition,
-                                         final String storeName,
-                                         final long totalRestored) { }
-            });
-            final Exception fatal = error.get();
-            if (fatal != null) {
-                throw fatal;
-            }
-        } else {
-            startStreams();
-        }
-
-        final Map<Long, String> expected = new HashMap<>();
-        expected.put(1L, "A");
-        expected.put(2L, "B");
-        // skip record <3L, "C"> because we won't read it (cf checkpoint file 
above)
-        expected.put(4L, "D");
-
-        final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
-            .getStore(globalStore, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
-        assertNotNull(store);
-
-        final Map<Long, String> storeContent = new HashMap<>();
-        TestUtils.waitForCondition(
-            () -> {
-                storeContent.clear();
-                try (final KeyValueIterator<Long, String> it = store.all()) {
-                    it.forEachRemaining(kv -> storeContent.put(kv.key, 
kv.value));
-                }
-                return storeContent.equals(expected);
-            },
-            30_000L,
-            () -> "waiting for initial values" +
-                "\n  expected: " + expected +
-                "\n  received: " + storeContent
-        );
-    }
-
     @Test
     public void shouldNotRestoreAbortedMessages() throws Exception {
         produceAbortedMessages();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index ada7955b302..6f91222f64b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -103,8 +103,19 @@ public interface StateStore {
      * <p>
      * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} 
are committed to disk atomically with the
      * records they represent, if possible.
+     * <p>
+     * <b>Empty map:</b> If {@code changelogOffsets} is empty, implementations 
that manage offsets <em>MUST</em>
+     * remove all previously committed offsets. After an empty commit, {@link 
#committedOffset(TopicPartition)} should
+     * return {@code null} for all partitions. This is used during corruption 
recovery to clear stale offsets so that
+     * restoration can restart from the beginning.
+     * <p>
+     * <b>Null values:</b> If a value in {@code changelogOffsets} is {@code 
null}, implementations that manage offsets
+     * <em>MUST</em> remove the committed offset for that partition. After 
such a commit,
+     * {@link #committedOffset(TopicPartition)} should return {@code null} for 
the affected partition.
      *
      * @param changelogOffsets The changelog offset(s) corresponding to the 
most recently written records.
+     *                         An empty map signals that all committed offsets 
should be cleared.
+     *                         A {@code null} value for a partition signals 
that its committed offset should be removed.
      */
     default void commit(final Map<TopicPartition, Long> changelogOffsets) {
         flush();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 5aad0499a22..5b08d9ae53e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.FixedOrderMap;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
@@ -51,6 +52,7 @@ import 
org.apache.kafka.streams.state.internals.RecordConverter;
 import org.slf4j.Logger;
 
 import java.io.File;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -179,7 +181,20 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
             final List<TopicPartition> storePartitions = 
topicPartitionsForStore(stateStore);
             final StateStore maybeWrappedStore = 
LegacyCheckpointingStateStore.maybeWrapStore(
                     stateStore, eosEnabled, new HashSet<>(storePartitions), 
stateDirectory, null, logPrefix);
-            maybeWrappedStore.init(globalProcessorContext, maybeWrappedStore);
+            try {
+                maybeWrappedStore.init(globalProcessorContext, 
maybeWrappedStore);
+            } catch (final ProcessorStateException e) {
+                if (eosEnabled) {
+                    log.warn("{}Detected unclean shutdown for global store {}. 
" +
+                            "Wiping global state directory.", logPrefix, 
stateStore.name(), e);
+                    try {
+                        
Utils.delete(stateDirectory.globalStateDir().getAbsoluteFile());
+                    } catch (final IOException ioe) {
+                        e.addSuppressed(ioe);
+                    }
+                }
+                throw e;
+            }
 
             for (final TopicPartition storePartition : storePartitions) {
                 wrappedStores.put(storePartition, maybeWrappedStore);
@@ -566,7 +581,10 @@ public class GlobalStateManagerImpl implements 
GlobalStateManager {
                     // only add offsets for persistent stores
                     if (store.persistent()) {
                         for (final TopicPartition storePartition : 
storePartitions) {
-                            storeOffsets.put(storePartition, 
currentOffsets.get(storePartition));
+                            final Long offset = 
currentOffsets.get(storePartition);
+                            if (offset != null) {
+                                storeOffsets.put(storePartition, offset);
+                            }
                         }
                     }
                     store.commit(storeOffsets);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ff6786d9ecc..645a56a5463 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -487,45 +487,43 @@ public class ProcessorStateManager implements 
StateManager {
         if (!stores.isEmpty()) {
             log.debug("Committing all stores registered in the state manager: 
{}", stores);
             for (final StateStoreMetadata metadata : stores.values()) {
-                if (!metadata.corrupted) {
-                    final StateStore store = metadata.stateStore;
-                    log.trace("Committing store {}", store.name());
-                    try {
-                        if (metadata.changelogPartition == null || 
metadata.offset == null || !store.persistent()) {
-                            store.commit(Map.of());
-                        } else {
-                            store.commit(Map.of(metadata.changelogPartition, 
metadata.offset));
-                        }
+                final StateStore store = metadata.stateStore;
+                log.trace("Committing store {}", store.name());
+                try {
+                    if (metadata.changelogPartition == null || metadata.offset 
== null || metadata.corrupted || !store.persistent()) {
+                        store.commit(Map.of());
+                    } else {
+                        store.commit(Map.of(metadata.changelogPartition, 
metadata.offset));
+                    }
 
-                        if (metadata.commitCallback != null) {
-                            try {
-                                metadata.commitCallback.onCommit();
-                            } catch (final IOException e) {
-                                throw new ProcessorStateException(
-                                        format("%sException caught while 
trying to checkpoint store, " +
-                                                "changelog partition %s", 
logPrefix, metadata.changelogPartition),
-                                        e
-                                );
-                            }
-                        }
-                    } catch (final RuntimeException exception) {
-                        if (firstException == null) {
-                            // do NOT wrap the error if it is actually caused 
by Streams itself
-                            // In case of FailedProcessingException Do not 
keep the failed processing exception in the stack trace
-                            if (exception instanceof FailedProcessingException)
-                                firstException = new ProcessorStateException(
-                                        format("%sFailed to commit state store 
%s", logPrefix, store.name()),
-                                        exception.getCause());
-                            else if (exception instanceof StreamsException)
-                                firstException = exception;
-                            else
-                                firstException = new ProcessorStateException(
-                                        format("%sFailed to commit state store 
%s", logPrefix, store.name()), exception);
-                            log.error("Failed to commit state store {}: ", 
store.name(), firstException);
-                        } else {
-                            log.error("Failed to commit state store {}: ", 
store.name(), exception);
+                    if (!metadata.corrupted && metadata.commitCallback != 
null) {
+                        try {
+                            metadata.commitCallback.onCommit();
+                        } catch (final IOException e) {
+                            throw new ProcessorStateException(
+                                    format("%sException caught while trying to 
checkpoint store, " +
+                                            "changelog partition %s", 
logPrefix, metadata.changelogPartition),
+                                    e
+                            );
                         }
                     }
+                } catch (final RuntimeException exception) {
+                    if (firstException == null) {
+                        // do NOT wrap the error if it is actually caused by 
Streams itself
+                        // In case of FailedProcessingException Do not keep 
the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException)
+                            firstException = new ProcessorStateException(
+                                    format("%sFailed to commit state store 
%s", logPrefix, store.name()),
+                                    exception.getCause());
+                        else if (exception instanceof StreamsException)
+                            firstException = exception;
+                        else
+                            firstException = new ProcessorStateException(
+                                    format("%sFailed to commit state store 
%s", logPrefix, store.name()), exception);
+                        log.error("Failed to commit state store {}: ", 
store.name(), firstException);
+                    } else {
+                        log.error("Failed to commit state store {}: ", 
store.name(), exception);
+                    }
                 }
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
index 0c2e157956f..0288872f567 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.query.Position;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -53,12 +54,20 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
 
     @Override
     public final void commit(final RocksDBStore.DBAccessor accessor, final 
Map<TopicPartition, Long> changelogOffsets) throws RocksDBException {
-        for (final Map.Entry<TopicPartition, Long> entry : 
changelogOffsets.entrySet()) {
-            final TopicPartition tp = entry.getKey();
-            final Long offset = entry.getValue();
-            final byte[] key = stringSerializer.serialize(null, tp.toString());
-            final byte[] value = longSerde.serializer().serialize(null, 
offset);
-            accessor.put(offsetColumnFamilyHandle, key, value);
+        if (changelogOffsets.isEmpty()) {
+            wipeOffsets(accessor);
+        } else {
+            for (final Map.Entry<TopicPartition, Long> entry : 
changelogOffsets.entrySet()) {
+                final TopicPartition tp = entry.getKey();
+                final Long offset = entry.getValue();
+                final byte[] key = stringSerializer.serialize(null, 
tp.toString());
+                if (offset != null) {
+                    final byte[] value = 
longSerde.serializer().serialize(null, offset);
+                    accessor.put(offsetColumnFamilyHandle, key, value);
+                } else {
+                    accessor.delete(offsetColumnFamilyHandle, key);
+                }
+            }
         }
         // We need to remove this flush call when implementing KAFKA-19712
         this.flush(accessor, offsetColumnFamilyHandle);
@@ -112,4 +121,17 @@ abstract class AbstractColumnFamilyAccessor implements 
RocksDBStore.ColumnFamily
      * @throws RocksDBException if an error occurs during the commit operation
      */
     protected abstract void flush(final RocksDBStore.DBAccessor accessor, 
final ColumnFamilyHandle offsetColumnFamilyHandle) throws RocksDBException;
+
+    private void wipeOffsets(final RocksDBStore.DBAccessor accessor) throws 
RocksDBException {
+        try (final RocksIterator iter = 
accessor.newIterator(offsetColumnFamilyHandle)) {
+            iter.seekToFirst();
+            while (iter.isValid()) {
+                final byte[] key = iter.key();
+                if (!Arrays.equals(key, statusKey)) {
+                    accessor.delete(offsetColumnFamilyHandle, key);
+                }
+                iter.next();
+            }
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 4b1ff1e0f1d..f353e56f7df 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -277,6 +277,17 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
         segments.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return segments.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return segments.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         open = false;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 9a4d2458848..c6b57411067 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -318,6 +318,17 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
         segments.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return segments.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return segments.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         open = false;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index c5834c49231..ae292a44f8b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -184,6 +184,23 @@ abstract class AbstractSegments<S extends Segment> 
implements Segments<S> {
         }
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return true;
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        for (final S segment : segments.values()) {
+            final Long offset = segment.committedOffset(partition);
+            if (offset != null) {
+                return offset;
+            }
+        }
+        return null;
+    }
+
     @Override
     public void close() {
         for (final S segment : segments.values()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index f62c703d91d..3612776fbeb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -106,6 +106,17 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter 
implements KeyValueSt
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
index 311285a2fba..bc059f03ffd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java
@@ -117,7 +117,12 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
                 for (final Map.Entry<TopicPartition, Long> entry : 
legacyCheckpoint.read().entrySet()) {
                     final StateStore store = stores.get(entry.getKey());
                     if (store != null) {
-                        storesToMigrate.computeIfAbsent(store, k -> new 
HashMap<>()).put(entry.getKey(), entry.getValue());
+                        final Long offset = 
changelogOffsetFromCheckpointedOffset(entry.getValue());
+                        if (offset != null) {
+                            storesToMigrate
+                                    .computeIfAbsent(store, k -> new 
HashMap<>())
+                                    .put(entry.getKey(), offset);
+                        }
                     }
                 }
 
@@ -169,7 +174,7 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
             final Map<TopicPartition, Long> allOffsets = checkpointFile.read();
             for (final Map.Entry<TopicPartition, Long> entry : 
allOffsets.entrySet()) {
                 if (changelogPartitions.contains(entry.getKey())) {
-                    offsets.put(entry.getKey(), 
changelogOffsetFromCheckpointedOffset(entry.getValue()));
+                    offsets.put(entry.getKey(), entry.getValue());
                 }
             }
             checkpointedOffsets = new HashMap<>(offsets);
@@ -206,7 +211,17 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
         super.commit(changelogOffsets);
 
         // update in-memory offsets
-        offsets.putAll(changelogOffsets);
+        if (changelogOffsets.isEmpty()) {
+            offsets.clear();
+        } else {
+            for (final Map.Entry<TopicPartition, Long> entry : 
changelogOffsets.entrySet()) {
+                if (entry.getValue() != null) {
+                    offsets.put(entry.getKey(), entry.getValue());
+                } else {
+                    offsets.remove(entry.getKey());
+                }
+            }
+        }
 
         // only write the checkpoint file if both:
         // 1. in ALOS mode (under EOS, the checkpoint file is only written 
when closing the store)
@@ -236,14 +251,8 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
         // only checkpoint persistent and logged stores
         if (persistent() && !changelogPartitions.isEmpty()) {
             try {
-                // merge new checkpoint offsets into checkpoint file
-                final Map<TopicPartition, Long> checkpointingOffsets = new 
HashMap<>(offsets.size());
-                for (final Map.Entry<TopicPartition, Long> entry : 
offsets.entrySet()) {
-                    checkpointingOffsets.put(entry.getKey(), 
checkpointableOffsetFromChangelogOffset(entry.getValue()));
-                }
-
-                log.debug("Writing checkpoint: {} for task {}", 
checkpointingOffsets, taskId);
-                checkpointFile.write(checkpointingOffsets);
+                log.debug("Writing checkpoint: {} for task {}", offsets, 
taskId);
+                checkpointFile.write(offsets);
                 checkpointedOffsets = new HashMap<>(offsets);
             } catch (final IOException e) {
                 log.warn("{}Failed to write offset checkpoint file to [{}]." +
@@ -295,11 +304,6 @@ public class LegacyCheckpointingStateStore<S extends 
StateStore, K, V> extends W
         return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
     }
 
-    // Pass in a sentinel value to checkpoint when the changelog offset is not 
yet initialized/known
-    private static long checkpointableOffsetFromChangelogOffset(final Long 
offset) {
-        return offset != null ? offset : OFFSET_UNKNOWN;
-    }
-
     // Convert the written offsets in the checkpoint file back to the 
changelog offset
     private static Long changelogOffsetFromCheckpointedOffset(final long 
offset) {
         return offset != OFFSET_UNKNOWN ? offset : null;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 34e1ec42d77..fbf7722629c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -107,6 +107,17 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
         physicalStore.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return physicalStore.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return physicalStore.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         // close the logical segments first to close any open iterators
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
index 4d762b82392..17415dbe207 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java
@@ -110,6 +110,17 @@ public class PlainToHeadersStoreAdapter implements 
KeyValueStore<Bytes, byte[]>
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
index 88be98af91b..a463dbc97f2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersWindowStoreAdapter.java
@@ -221,6 +221,17 @@ public class PlainToHeadersWindowStoreAdapter implements 
WindowStore<Bytes, byte
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index 3ccf8f6b663..e9c705972ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -309,6 +309,17 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         // same physical RocksDB instance
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return segmentStores.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return segmentStores.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         open = false;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index e9c7d144d84..9c902da34a1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -42,5 +42,10 @@ interface Segments<S extends Segment> {
 
     void commit(final Map<TopicPartition, Long> changelogOffsets);
 
+    @SuppressWarnings("deprecation")
+    boolean managesOffsets();
+
+    Long committedOffset(final TopicPartition partition);
+
     void close();
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
index c6d475c2f33..4771a86a5f9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
@@ -156,6 +156,17 @@ public class SessionToHeadersStoreAdapter implements 
SessionStore<Bytes, byte[]>
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
index 7ba3b9bd8db..facef5f0a06 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
@@ -112,6 +112,17 @@ public class TimestampedToHeadersStoreAdapter implements 
KeyValueStore<Bytes, by
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
index 97f10285849..738787b9c21 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -193,6 +193,17 @@ public class TimestampedToHeadersWindowStoreAdapter 
implements WindowStore<Bytes
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
index 1d16b2a86b7..c02cec5747a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
@@ -99,6 +99,17 @@ public class VersionedKeyValueToBytesStoreAdapter implements 
VersionedBytesStore
         inner.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return inner.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return inner.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         inner.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 1261f17a697..5a7f05c1b2b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -167,6 +167,17 @@ public class WindowToTimestampedWindowByteStoreAdapter 
implements WindowStore<By
         store.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return store.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return store.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         store.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index d1ce967bcc8..e6e6c92e559 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -128,6 +128,17 @@ public abstract class WrappedStateStore<S extends 
StateStore, K, V> implements S
         wrapped.commit(changelogOffsets);
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean managesOffsets() {
+        return wrapped.managesOffsets();
+    }
+
+    @Override
+    public Long committedOffset(final TopicPartition partition) {
+        return wrapped.committedOffset(partition);
+    }
+
     @Override
     public void close() {
         wrapped.close();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
index 493e437e2c7..9b80801b706 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStoreTest.java
@@ -49,7 +49,6 @@ import java.util.Set;
 
 import static 
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.CHECKPOINT_FILE_NAME;
 import static 
org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore.OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
-import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -307,8 +306,9 @@ public class LegacyCheckpointingStateStoreTest {
     }
 
     @Test
-    public void shouldMapOffsetUnknownToNullOnInit() throws IOException {
-        writeCheckpointFile(Collections.singletonMap(partition, 
OFFSET_UNKNOWN));
+    public void shouldReturnNullForPartitionNotInCheckpointOnInit() throws 
IOException {
+        // write a checkpoint with no entry for our partition
+        writeCheckpointFile(Collections.emptyMap());
 
         final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
         store.init(context, persistentStore);
@@ -464,15 +464,21 @@ public class LegacyCheckpointingStateStoreTest {
     }
 
     @Test
-    public void shouldWriteOffsetUnknownSentinelWhenOffsetIsNull() throws 
IOException {
+    public void shouldRemoveOffsetWhenCommittedWithNull() throws IOException {
         final LegacyCheckpointingStateStore<MockKeyValueStore, Object, Object> 
store = createStore(false);
+        // first commit a real offset
+        store.commit(Collections.singletonMap(partition, 42L));
+        store.checkpoint();
+        assertEquals(42L, (long) readCheckpointFile().get(partition));
+
+        // now commit null for the same partition — should remove it
         final Map<TopicPartition, Long> nullOffset = new HashMap<>();
         nullOffset.put(partition, null);
         store.commit(nullOffset);
         store.checkpoint();
 
-        final Map<TopicPartition, Long> checkpointed = readCheckpointFile();
-        assertEquals(OFFSET_UNKNOWN, (long) checkpointed.get(partition));
+        assertFalse(readCheckpointFile().containsKey(partition));
+        assertNull(store.committedOffset(partition));
     }
 
     @Test


Reply via email to