This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 4.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b82caca83d02b29de6dfe5ec6b35ac1e84ecf703 Author: Eduwer Camacaro <[email protected]> AuthorDate: Tue Mar 31 11:45:28 2026 -0500 KAFKA-19712: Commit OFFSET_UNKNOWN when closing state stores and the offsets are not yet initialized (#21884) KAFKA-19712 got rid of sentinel values when uninitialized offsets were committed. This made state stores close with invalid offsets when the store's offsets were not yet initialized. These changes roll back this decision because it was causing TaskCorruptedException during rebalances. Reviewers: Bill Bejeck <[email protected]> --- .../processor/internals/ProcessorStateManager.java | 18 +++++-- .../internals/ProcessorStateManagerTest.java | 55 +++++++++++++++++++++- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index db033320795..7b3483f8e68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore; import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt; +import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN; /** * ProcessorStateManager is the source of truth for the current offset for each state store, @@ -307,7 +308,7 @@ public class ProcessorStateManager implements StateManager { final Long offset = store.stateStore.committedOffset(store.changelogPartition); if (offset != null) { - store.setOffset(offset); + store.setOffset(changelogOffsetFromCommittedOffset(offset)); log.info("State store {} initialized from checkpoint 
with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { @@ -509,10 +510,11 @@ public class ProcessorStateManager implements StateManager { final StateStore store = metadata.stateStore; log.trace("Committing store {}", store.name()); try { - if (metadata.changelogPartition == null || metadata.offset == null || metadata.corrupted || !store.persistent()) { + if (metadata.changelogPartition == null || metadata.corrupted || !store.persistent()) { store.commit(Map.of()); } else { - store.commit(Map.of(metadata.changelogPartition, metadata.offset)); + // logged store, persistent and valid end offset + store.commit(Map.of(metadata.changelogPartition, committableOffsetFromChangelogOffset(metadata.offset))); } if (!metadata.corrupted && metadata.commitCallback != null) { @@ -701,6 +703,16 @@ public class ProcessorStateManager implements StateManager { stateDirectory.updateTaskOffsets(taskId, changelogOffsets()); } + // Commit a sentinel value when the changelog offset is not yet initialized/known + private long committableOffsetFromChangelogOffset(final Long offset) { + return offset != null ? offset : OFFSET_UNKNOWN; + } + + // Convert the written offsets in the checkpoint file back to the changelog offset + private Long changelogOffsetFromCommittedOffset(final long offset) { + return offset != OFFSET_UNKNOWN ? 
offset : null; + } + private TopicPartition getStorePartition(final String storeName) { // NOTE we assume the partition of the topic can always be inferred from the task id; // if user ever use a custom partition grouper (deprecated in KIP-528) this would break and diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 76a10cf192a..f2a96bffa3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -45,7 +45,6 @@ import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockRestoreCallback; import org.apache.kafka.test.TestUtils; -import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -81,6 +80,7 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -574,7 +574,7 @@ public class ProcessorStateManagerTest { assertTrue(nonPersistentStore.committed); // make sure that flush is called in the proper order - assertThat(persistentStore.getLastCommitCount(), Matchers.lessThan(nonPersistentStore.getLastCommitCount())); + assertThat(persistentStore.getLastCommitCount(), lessThan(nonPersistentStore.getLastCommitCount())); stateMgr.updateChangelogOffsets(ackedOffsets); stateMgr.commit(); @@ -593,6 +593,57 @@ public class ProcessorStateManagerTest { } } + @Test + public void 
shouldCommitAndCloseLegacyStoresWithUnknownOffsetPositions() throws Exception { + checkpoint.write(emptyMap()); + + final File storeCheckpointFile = new File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME + "_" + persistentStore.name()); + + // set up ack'ed offsets + final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>(); + ackedOffsets.put(persistentStorePartition, null); + ackedOffsets.put(nonPersistentStorePartition, 456L); + ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L); + + final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); + contextRegistersStateStore(stateMgr); + try { + // make sure the checkpoint file is not written yet + assertFalse(storeCheckpointFile.exists()); + + stateMgr.registerStateStores(Arrays.asList(persistentStore, nonPersistentStore), context); + } finally { + stateMgr.commit(); + + assertTrue(persistentStore.committed); + assertTrue(nonPersistentStore.committed); + + // make sure that flush is called in the proper order + assertThat(persistentStore.getLastCommitCount(), lessThan(nonPersistentStore.getLastCommitCount())); + + stateMgr.updateChangelogOffsets(ackedOffsets); + stateMgr.commit(); + stateMgr.close(); + assertTrue(persistentStore.closed); + assertTrue(nonPersistentStore.closed); + + assertTrue(storeCheckpointFile.exists()); + + // the checkpoint file should contain an offset from the persistent store only. 
+ final OffsetCheckpoint storeCheckpoint = new OffsetCheckpoint(storeCheckpointFile); + final Map<TopicPartition, Long> checkpointedOffsets = storeCheckpoint.read(); + assertThat(checkpointedOffsets, is(singletonMap(new TopicPartition(persistentStoreTopicName, 1), -4L))); + + try { + // Reopen to verify null commited offset + stateMgr.registerStateStores(Arrays.asList(persistentStore, nonPersistentStore), context); + assertNull(stateMgr.storeMetadata(persistentStorePartition).offset()); + } finally { + stateMgr.close(); + } + } + } + @Test public void shouldOverrideOffsetsWhenRestoreAndProcess() throws IOException { final Map<TopicPartition, Long> offsets = singletonMap(persistentStorePartition, 99L);
