This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d84e19dde4f KAFKA-19712: Commit OFFSET_UNKNOWN when closing state
stores and the offsets are not yet initialized (#21884)
d84e19dde4f is described below
commit d84e19dde4f59fc956a34a41df924f5f6a32670a
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Mar 31 11:45:28 2026 -0500
KAFKA-19712: Commit OFFSET_UNKNOWN when closing state stores and the offsets
are not yet initialized (#21884)
KAFKA-19712 got rid of sentinel values when uninitialized offsets were
committed. This made state stores close with invalid offsets when the
store's offsets were not yet initialized.
These changes roll back that decision because it was causing
TaskCorruptedException during rebalances.
Reviewers: Bill Bejeck <[email protected]>
---
.../processor/internals/ProcessorStateManager.java | 18 +++++--
.../internals/ProcessorStateManagerTest.java | 55 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index db033320795..7b3483f8e68 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -54,6 +54,7 @@ import java.util.stream.Collectors;
import static java.lang.String.format;
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
import static
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+import static
org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
/**
* ProcessorStateManager is the source of truth for the current offset for
each state store,
@@ -307,7 +308,7 @@ public class ProcessorStateManager implements StateManager {
final Long offset =
store.stateStore.committedOffset(store.changelogPartition);
if (offset != null) {
- store.setOffset(offset);
+
store.setOffset(changelogOffsetFromCommittedOffset(offset));
log.info("State store {} initialized from checkpoint with
offset {} at changelog {}",
store.stateStore.name(), store.offset,
store.changelogPartition);
} else {
@@ -509,10 +510,11 @@ public class ProcessorStateManager implements
StateManager {
final StateStore store = metadata.stateStore;
log.trace("Committing store {}", store.name());
try {
- if (metadata.changelogPartition == null || metadata.offset
== null || metadata.corrupted || !store.persistent()) {
+ if (metadata.changelogPartition == null ||
metadata.corrupted || !store.persistent()) {
store.commit(Map.of());
} else {
- store.commit(Map.of(metadata.changelogPartition,
metadata.offset));
+ // logged, persistent store: commit its offset, or OFFSET_UNKNOWN if not yet known
+ store.commit(Map.of(metadata.changelogPartition,
committableOffsetFromChangelogOffset(metadata.offset)));
}
if (!metadata.corrupted && metadata.commitCallback !=
null) {
@@ -701,6 +703,16 @@ public class ProcessorStateManager implements StateManager
{
stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
}
+ // Commit a sentinel value when the changelog offset is not yet
initialized/known
+ private long committableOffsetFromChangelogOffset(final Long offset) {
+ return offset != null ? offset : OFFSET_UNKNOWN;
+ }
+
+ // Convert the written offsets in the checkpoint file back to the
changelog offset
+ private Long changelogOffsetFromCommittedOffset(final long offset) {
+ return offset != OFFSET_UNKNOWN ? offset : null;
+ }
+
private TopicPartition getStorePartition(final String storeName) {
// NOTE we assume the partition of the topic can always be inferred
from the task id;
// if user ever use a custom partition grouper (deprecated in KIP-528)
this would break and
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 76a10cf192a..f2a96bffa3e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -45,7 +45,6 @@ import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.TestUtils;
-import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -81,6 +80,7 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -574,7 +574,7 @@ public class ProcessorStateManagerTest {
assertTrue(nonPersistentStore.committed);
// make sure that flush is called in the proper order
- assertThat(persistentStore.getLastCommitCount(),
Matchers.lessThan(nonPersistentStore.getLastCommitCount()));
+ assertThat(persistentStore.getLastCommitCount(),
lessThan(nonPersistentStore.getLastCommitCount()));
stateMgr.updateChangelogOffsets(ackedOffsets);
stateMgr.commit();
@@ -593,6 +593,57 @@ public class ProcessorStateManagerTest {
}
}
+ @Test
+ public void shouldCommitAndCloseLegacyStoresWithUnknownOffsetPositions()
throws Exception {
+ checkpoint.write(emptyMap());
+
+ final File storeCheckpointFile = new
File(stateDirectory.getOrCreateDirectoryForTask(taskId), CHECKPOINT_FILE_NAME +
"_" + persistentStore.name());
+
+ // set up ack'ed offsets
+ final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
+ ackedOffsets.put(persistentStorePartition, null);
+ ackedOffsets.put(nonPersistentStorePartition, 456L);
+ ackedOffsets.put(new TopicPartition("nonRegisteredTopic", 1), 789L);
+
+ final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
+ contextRegistersStateStore(stateMgr);
+ try {
+ // make sure the checkpoint file is not written yet
+ assertFalse(storeCheckpointFile.exists());
+
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
+ } finally {
+ stateMgr.commit();
+
+ assertTrue(persistentStore.committed);
+ assertTrue(nonPersistentStore.committed);
+
+ // make sure that flush is called in the proper order
+ assertThat(persistentStore.getLastCommitCount(),
lessThan(nonPersistentStore.getLastCommitCount()));
+
+ stateMgr.updateChangelogOffsets(ackedOffsets);
+ stateMgr.commit();
+ stateMgr.close();
+ assertTrue(persistentStore.closed);
+ assertTrue(nonPersistentStore.closed);
+
+ assertTrue(storeCheckpointFile.exists());
+
+ // the checkpoint file should contain an offset from the
persistent store only.
+ final OffsetCheckpoint storeCheckpoint = new
OffsetCheckpoint(storeCheckpointFile);
+ final Map<TopicPartition, Long> checkpointedOffsets =
storeCheckpoint.read();
+ assertThat(checkpointedOffsets, is(singletonMap(new
TopicPartition(persistentStoreTopicName, 1), -4L)));
+
+ try {
+ // Reopen to verify null committed offset
+ stateMgr.registerStateStores(Arrays.asList(persistentStore,
nonPersistentStore), context);
+
assertNull(stateMgr.storeMetadata(persistentStorePartition).offset());
+ } finally {
+ stateMgr.close();
+ }
+ }
+ }
+
@Test
public void shouldOverrideOffsetsWhenRestoreAndProcess() throws
IOException {
final Map<TopicPartition, Long> offsets =
singletonMap(persistentStorePartition, 99L);