This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d84e19dde4f KAFKA-19712: Commit OFFSET_UNKNOWN when closing state stores and the offsets are not yet initialized (#21884)
d84e19dde4f is described below

commit d84e19dde4f59fc956a34a41df924f5f6a32670a
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Mar 31 11:45:28 2026 -0500

    KAFKA-19712: Commit OFFSET_UNKNOWN when closing state stores and the offsets are not yet initialized (#21884)
    
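    KAFKA-19712 stopped committing a sentinel value for offsets that were
    not yet initialized. As a result, state stores whose offsets were not
    yet initialized were closed with invalid offsets.
    
    This change rolls back that decision, because it was causing
    TaskCorruptedException during rebalances. A store whose changelog
    offset is still unknown now commits OFFSET_UNKNOWN again, and that
    sentinel is converted back to a null offset when the store is
    re-initialized.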
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../processor/internals/ProcessorStateManager.java | 18 +++++--
 .../internals/ProcessorStateManagerTest.java       | 55 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index db033320795..7b3483f8e68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
 import static java.lang.String.format;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
 import static 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+import static 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
 
 /**
  * ProcessorStateManager is the source of truth for the current offset for 
each state store,
@@ -307,7 +308,7 @@ public class ProcessorStateManager implements StateManager {
                 final Long offset = 
store.stateStore.committedOffset(store.changelogPartition);
 
                 if (offset != null) {
-                    store.setOffset(offset);
+                    
store.setOffset(changelogOffsetFromCommittedOffset(offset));
                     log.info("State store {} initialized from checkpoint with 
offset {} at changelog {}",
                             store.stateStore.name(), store.offset, 
store.changelogPartition);
                 } else {
@@ -509,10 +510,11 @@ public class ProcessorStateManager implements StateManager {
                 final StateStore store = metadata.stateStore;
                 log.trace("Committing store {}", store.name());
                 try {
-                    if (metadata.changelogPartition == null || metadata.offset 
== null || metadata.corrupted || !store.persistent()) {
+                    if (metadata.changelogPartition == null || 
metadata.corrupted || !store.persistent()) {
                         store.commit(Map.of());
                     } else {
-                        store.commit(Map.of(metadata.changelogPartition, 
metadata.offset));
+                        // logged store, persistent and valid end offset
+                        store.commit(Map.of(metadata.changelogPartition, 
committableOffsetFromChangelogOffset(metadata.offset)));
                     }
 
                     if (!metadata.corrupted && metadata.commitCallback != 
null) {
@@ -701,6 +703,16 @@ public class ProcessorStateManager implements StateManager {
         stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
     }
 
+    // Value to commit for a store's changelog offset: a not-yet-initialized/unknown (null) offset is committed as the OFFSET_UNKNOWN sentinel
+    private long committableOffsetFromChangelogOffset(final Long offset) {
+        return offset != null ? offset : OFFSET_UNKNOWN;
+    }
+
+    // Inverse of committableOffsetFromChangelogOffset: convert a committed (checkpointed) offset back into the changelog offset, mapping the OFFSET_UNKNOWN sentinel to null
+    private Long changelogOffsetFromCommittedOffset(final long offset) {
+        return offset != OFFSET_UNKNOWN ? offset : null;
+    }
+
     private  TopicPartition getStorePartition(final String storeName) {
         // NOTE we assume the partition of the topic can always be inferred 
from the task id;
         // if user ever use a custom partition grouper (deprecated in KIP-528) 
this would break and
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 76a10cf192a..f2a96bffa3e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -45,7 +45,6 @@ import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.TestUtils;
 
-import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -81,6 +80,7 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -574,7 +574,7 @@ public class ProcessorStateManagerTest {
             assertTrue(nonPersistentStore.committed);
 
             // make sure that flush is called in the proper order
-            assertThat(persistentStore.getLastCommitCount(), 
Matchers.lessThan(nonPersistentStore.getLastCommitCount()));
+            assertThat(persistentStore.getLastCommitCount(), 
lessThan(nonPersistentStore.getLastCommitCount()));
 
             stateMgr.updateChangelogOffsets(ackedOffsets);
             stateMgr.commit();
@@ -593,6 +593,57 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    @Test
+    public void shouldCommitAndCloseLegacyStoresWithUnknownOffsetPositions() throws Exception {
+        checkpoint.write(emptyMap());
+
+        final File storeCheckpointFile = new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME + "_" + persistentStore.name());
+
+        // set up ack'ed offsets; the persistent store's changelog offset is deliberately left unknown (null)
+        final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+        ackedOffsets.put(persistentStorePartition, null);
+        ackedOffsets.put(nonPersistentStorePartition, 456L);
+        ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);
+
+        final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
+        contextRegistersStateStore(stateMgr);
+        try {
+            // make sure the checkpoint file is not written yet
+            assertFalse(storeCheckpointFile.exists());
+
+            stateMgr.registerStateStores(Arrays.asList(persistentStore, nonPersistentStore), context);
+        } finally {
+            stateMgr.commit();
+
+            assertTrue(persistentStore.committed);
+            assertTrue(nonPersistentStore.committed);
+
+            // make sure that the stores are committed in the proper order (persistent store before non-persistent store)
+            assertThat(persistentStore.getLastCommitCount(), lessThan(nonPersistentStore.getLastCommitCount()));
+
+            stateMgr.updateChangelogOffsets(ackedOffsets);
+            stateMgr.commit();
+            stateMgr.close();
+            assertTrue(persistentStore.closed);
+            assertTrue(nonPersistentStore.closed);
+
+            assertTrue(storeCheckpointFile.exists());
+
+            // only the persistent store's offset is checkpointed, as the unknown-offset sentinel; NOTE(review): -4L is presumably OFFSET_UNKNOWN, consider using the constant
+            final OffsetCheckpoint storeCheckpoint = new OffsetCheckpoint(storeCheckpointFile);
+            final Map<TopicPartition, Long> checkpointedOffsets = storeCheckpoint.read();
+            assertThat(checkpointedOffsets, is(singletonMap(new TopicPartition(persistentStoreTopicName, 1), -4L)));
+
+            try {
+                // Re-register the stores to verify that the committed sentinel is read back as a null (unknown) offset
+                stateMgr.registerStateStores(Arrays.asList(persistentStore, nonPersistentStore), context);
+                assertNull(stateMgr.storeMetadata(persistentStorePartition).offset());
+            } finally {
+                stateMgr.close();
+            }
+        }
+    }
+
     @Test
     public void shouldOverrideOffsetsWhenRestoreAndProcess() throws 
IOException {
         final Map<TopicPartition, Long> offsets = 
singletonMap(persistentStorePartition, 99L);

Reply via email to