This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 917caaee167 KAFKA-20417: Add KIP-1035 self-managed offset recovery
integration tests (#21998)
917caaee167 is described below
commit 917caaee1677fc219dfaa60d6233ab8c72758a3b
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Apr 10 15:02:07 2026 -0400
KAFKA-20417: Add KIP-1035 self-managed offset recovery integration tests
(#21998)
Adds `SelfManagedOffsetRecoveryIntegrationTest` and
`RocksDBStoreTestingUtils`
Tests verify Kafka Streams recovers correctly after unclean shutdowns
when offsets are stored in RocksDB column families (KIP-1035). Covers:
- Unclean shutdown recovery (ALOS and EOS) - Missing
offsets triggering task corruption detection
- Combined status=open + missing offsets
- EOS exactly-once guarantees preserved after recovery
- Multi-store partial corruption
- Standby task recovery
- KAFKA-19712 regression (standby TaskCorruptedException after state
wipe)
Reviewers: Matthias J. Sax <[email protected]>
---
.../SelfManagedOffsetRecoveryIntegrationTest.java | 693 +++++++++++++++++++++
.../state/internals/RocksDBStoreTestingUtils.java | 216 +++++++
2 files changed, 909 insertions(+)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfManagedOffsetRecoveryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfManagedOffsetRecoveryIntegrationTest.java
new file mode 100644
index 00000000000..a9ac4b4059b
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfManagedOffsetRecoveryIntegrationTest.java
@@ -0,0 +1,693 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.internals.RocksDBStoreTestingUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for KIP-1035 column family offset recovery.
+ *
+ * KIP-1035 moved offset storage from external .checkpoint files into RocksDB
column families.
+ * These tests verify that Kafka Streams can recover from unclean shutdowns
and corrupted
+ * column family state, which is critical for exactly-once semantics (EOS)
correctness.
+ */
+@Tag("integration")
+@Timeout(600)
+public class SelfManagedOffsetRecoveryIntegrationTest {
+
+ private static final int NUM_BROKERS = 1;
+ private static final int NUM_PARTITIONS = 3;
+ private static final String INPUT_TOPIC = "input-topic";
+ private static final String OUTPUT_TOPIC = "output-topic";
+ private static final String OUTPUT_TOPIC_2 = "output-topic-2";
+ private static final String STORE_NAME = "counts-store";
+ private static final String STORE_NAME_2 = "counts-store-2";
+ private static final long COMMIT_INTERVAL_MS = 100L;
+ private static final Duration STREAMS_CLOSE_TIMEOUT =
Duration.ofSeconds(5);
+
+ private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
+
+ private int consumerGroupCounter = 0;
+
+ private Properties streamsConfig;
+ private KafkaStreams streams;
+ private File stateDir;
+
+ @BeforeAll
+ public static void startCluster() throws IOException, InterruptedException
{
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ public static void stopCluster() {
+ CLUSTER.stop();
+ }
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
+ CLUSTER.deleteAllTopics();
+ CLUSTER.createTopic(INPUT_TOPIC, NUM_PARTITIONS, 1);
+ CLUSTER.createTopic(OUTPUT_TOPIC, NUM_PARTITIONS, 1);
+ CLUSTER.createTopic(OUTPUT_TOPIC_2, NUM_PARTITIONS, 1);
+
+ stateDir = TestUtils.tempDirectory();
+ final String safeTestName = safeUniqueTestName(testInfo);
+
+ streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
+ streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL_MS);
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
+ streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (streams != null) {
+ closeStreams(streams);
+ streams.cleanUp();
+ }
+ }
+
+ private StreamsBuilder buildCountTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> stream = builder.stream(INPUT_TOPIC);
+ stream
+ .groupByKey()
+ .count(Materialized.as(STORE_NAME))
+ .toStream()
+ .to(OUTPUT_TOPIC);
+ return builder;
+ }
+
+ /**
+ * Builds a topology with two separate state stores:
+ * store 1: groupByKey -> count (counts per key)
+ * store 2: groupBy(value) -> count (counts per value)
+ */
+ private StreamsBuilder buildDualStoreTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> stream = builder.stream(INPUT_TOPIC);
+
+ // Store 1: count by key
+ stream
+ .groupByKey()
+ .count(Materialized.as(STORE_NAME))
+ .toStream()
+ .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+
+ // Store 2: count by value
+ stream
+ .groupBy((key, value) -> value)
+ .count(Materialized.as(STORE_NAME_2))
+ .toStream()
+ .to(OUTPUT_TOPIC_2, Produced.with(Serdes.String(), Serdes.Long()));
+
+ return builder;
+ }
+
+ /**
+ * Corrupts store status to open for ALL task directories that contain the
given store.
+ */
+ private void setAllStoreStatusesToOpen(final String storeName) throws
Exception {
+ final String appId =
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+ for (final File storeDir :
RocksDBStoreTestingUtils.findAllStoreDirs(stateDir, appId, storeName)) {
+ RocksDBStoreTestingUtils.setStoreStatusToOpen(storeDir);
+ }
+ }
+
+ /**
+ * Deletes offset entries from the offsets column family for ALL task
directories.
+ */
+ private void deleteAllOffsets(final String storeName) throws Exception {
+ final String appId =
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+ for (final File storeDir :
RocksDBStoreTestingUtils.findAllStoreDirs(stateDir, appId, storeName)) {
+ RocksDBStoreTestingUtils.deleteOffsets(storeDir);
+ }
+ }
+
+
+ private void closeStreams(final KafkaStreams kafkaStreams) {
+ kafkaStreams.close(STREAMS_CLOSE_TIMEOUT);
+ }
+
+ private KafkaStreams startStreams() throws Exception {
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ streams.cleanUp();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+ return streams;
+ }
+
+ private Properties producerConfig() {
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ return props;
+ }
+
+ private Properties readCommittedConsumerConfig() {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-consumer-" +
consumerGroupCounter++);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
+ return props;
+ }
+
+ private void produceRecords(final List<KeyValue<String, String>> records) {
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ INPUT_TOPIC,
+ records,
+ producerConfig(),
+ CLUSTER.time
+ );
+ }
+
+ private List<KeyValue<String, Long>> waitForOutput(final int
expectedCount) throws Exception {
+ return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ readCommittedConsumerConfig(),
+ OUTPUT_TOPIC,
+ expectedCount
+ );
+ }
+
+ /**
+ * Extracts the latest count for each key from the output records.
+ */
+ private Map<String, Long> latestCountsFromOutput(final
List<KeyValue<String, Long>> output) {
+ final Map<String, Long> latest = new HashMap<>();
+ for (final KeyValue<String, Long> record : output) {
+ latest.put(record.key, record.value);
+ }
+ return latest;
+ }
+
+ /**
+ * ALOS baseline: after an unclean shutdown (status=open), the store
should recover
+ * because ALOS opens with ignoreInvalidState=true.
+ */
+ @Test
+ public void shouldRecoverFromUncleanShutdownWithAlos() throws Exception {
+ // No EOS — default is at-least-once
+
+ // Phase 1: start, produce, verify output
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams();
+ produceRecords(initialRecords);
+ waitForOutput(initialRecords.size());
+
+ // Phase 2: clean shutdown, then corrupt store status
+ closeStreams(streams);
+ streams = null;
+
+ setAllStoreStatusesToOpen(STORE_NAME);
+
+ // Phase 3: restart — should recover despite status=open
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ // Phase 4: produce more records, verify processing continues
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ // ALOS may produce duplicates, so we verify processing continues and
counts
+ // are at least the expected values (duplicates are acceptable under
ALOS).
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ // A: 3 (v1, v2, v3), B: 1, C: 1 — at minimum
+ assertTrue(counts.get("A") >= 3L, "A count should be at least 3");
+ assertTrue(counts.get("C") >= 1L, "C count should be at least 1");
+ }
+
+ /**
+ * Primary regression test for KIP-1035: after an unclean shutdown with
EOS enabled,
+ * the store status key is left as 1L (open).
AbstractColumnFamilyAccessor.open() throws
+ * ProcessorStateException("Invalid state during store open") which should
be caught and
+ * trigger task corruption recovery (wipe + restore from changelog).
+ *
+ * Without the fix, the ProcessorStateException propagates fatally and the
application
+ * fails to start.
+ */
+ @Test
+ public void shouldRecoverFromUncleanShutdownWithEos() throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+
+ // Phase 1: start with EOS, produce records, verify committed output
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams();
+ produceRecords(initialRecords);
+ waitForOutput(initialRecords.size());
+
+ // Phase 2: clean shutdown, then corrupt store status to simulate
unclean shutdown
+ closeStreams(streams);
+ streams = null;
+
+ setAllStoreStatusesToOpen(STORE_NAME);
+
+ // Phase 3: restart with EOS — should detect corruption, wipe, and
restore from changelog
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ // Phase 4: produce more records and verify processing continues
correctly
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ // After recovery from corruption, state is rebuilt from changelog.
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ // A: 3 (v1, v2, v3), B: 1, C: 1 — exact under EOS
+ assertEquals(3L, counts.get("A"), "A count after EOS recovery");
+ assertEquals(1L, counts.get("B"), "B count after EOS recovery");
+ assertEquals(1L, counts.get("C"), "C count after EOS recovery");
+ }
+
+ /**
+ * Tests the TaskCorruptedException path: offsets are deleted from the
column family
+ * but the store status is clean (closed). Under EOS, missing offsets
should trigger
+ * task corruption detection, causing a wipe and restore from changelog.
+ */
+ @Test
+ public void shouldRecoverFromMissingOffsetsInColumnFamilyWithEos() throws
Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+
+ // Phase 1: start with EOS, produce records, verify committed output
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams();
+ produceRecords(initialRecords);
+ waitForOutput(initialRecords.size());
+
+ // Phase 2: clean shutdown, then delete offset entries (keep
status=closed)
+ closeStreams(streams);
+ streams = null;
+
+ deleteAllOffsets(STORE_NAME);
+
+ // Phase 3: restart — should detect missing offsets, mark task
corrupted, wipe and restore
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ // Phase 4: produce more records, verify data is re-bootstrapped from
changelog
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ assertEquals(3L, counts.get("A"), "A count after missing offsets
recovery");
+ assertEquals(1L, counts.get("B"), "B count after missing offsets
recovery");
+ assertEquals(1L, counts.get("C"), "C count after missing offsets
recovery");
+ }
+
+ /**
+ * Combined worst case: status=open (unclean shutdown) AND no committed
offsets.
+ * Under EOS, this should still trigger corruption recovery.
+ *
+ * Without the fix, the ProcessorStateException from status=open
propagates fatally
+ * before the missing offsets are even checked.
+ */
+ @Test
+ public void shouldRecoverFromUncleanShutdownAndMissingOffsetsWithEos()
throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+
+ // Phase 1: start with EOS, produce records, verify committed output
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams();
+ produceRecords(initialRecords);
+ waitForOutput(initialRecords.size());
+
+ // Phase 2: clean shutdown, then corrupt BOTH status and offsets
+ closeStreams(streams);
+ streams = null;
+
+ setAllStoreStatusesToOpen(STORE_NAME);
+ deleteAllOffsets(STORE_NAME);
+
+ // Phase 3: restart — should recover from both corruptions
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ // Phase 4: produce more records, verify data is re-bootstrapped
correctly
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ assertEquals(3L, counts.get("A"), "A count after combined corruption
recovery");
+ assertEquals(1L, counts.get("B"), "B count after combined corruption
recovery");
+ assertEquals(1L, counts.get("C"), "C count after combined corruption
recovery");
+ }
+
+ /**
+ * Tests that partial store corruption is handled correctly: only one of
two stores
+ * is corrupted, and the application should still recover.
+ *
+ * Without the fix, corrupting even one store causes the application to
crash.
+ */
+ @Test
+ public void shouldRecoverMultipleStoresFromUncleanShutdown() throws
Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+
+ // Phase 1: start with dual-store topology, produce records
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v2"),
+ new KeyValue<>("A", "v1")
+ );
+
+ final StreamsBuilder builder1 = buildDualStoreTopology();
+ streams = new KafkaStreams(builder1.build(), streamsConfig);
+ streams.cleanUp();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ produceRecords(initialRecords);
+ // Wait for output from the first store
+ waitForOutput(initialRecords.size());
+
+ // Phase 2: clean shutdown, corrupt ONLY store 1 (leave store 2 clean)
+ closeStreams(streams);
+ streams = null;
+
+ setAllStoreStatusesToOpen(STORE_NAME);
+ // STORE_NAME_2 is left with clean status
+
+ // Phase 3: restart — should recover the corrupted store, keep the
clean one
+ final StreamsBuilder builder2 = buildDualStoreTopology();
+ streams = new KafkaStreams(builder2.build(), streamsConfig);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
+ // Phase 4: produce more records, verify both stores produce correct
output
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("C", "v3"),
+ new KeyValue<>("A", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ // Store 1 counts by key: A=3 (v1, v1, v1), B=1 (v2), C=1 (v3)
+ assertEquals(3L, counts.get("A"), "A count after multi-store
recovery");
+ assertEquals(1L, counts.get("B"), "B count after multi-store
recovery");
+ assertEquals(1L, counts.get("C"), "C count after multi-store
recovery");
+ }
+
+ /**
+ * Tests standby task recovery with corrupted column family state.
+ * After corrupting instance 1's store, it should recover from the
standby/changelog
+ * and eventually take over as active when instance 2 is shut down.
+ *
+ * Without the fix, instance 1 fails to restart due to
ProcessorStateException.
+ */
+ @Test
+ public void shouldRecoverStandbyTaskFromUncleanShutdownWithEos() throws
Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ streamsConfig.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+
+ // Use separate state dirs for each instance
+ final File stateDir1 = TestUtils.tempDirectory();
+ final File stateDir2 = TestUtils.tempDirectory();
+
+ // Phase 1: start two instances
+ final Properties config1 = new Properties();
+ config1.putAll(streamsConfig);
+ config1.put(StreamsConfig.STATE_DIR_CONFIG, stateDir1.getPath());
+
+ final Properties config2 = new Properties();
+ config2.putAll(streamsConfig);
+ config2.put(StreamsConfig.STATE_DIR_CONFIG, stateDir2.getPath());
+
+ final StreamsBuilder builder1 = buildCountTopology();
+ final StreamsBuilder builder2 = buildCountTopology();
+
+ final KafkaStreams streams1 = new KafkaStreams(builder1.build(),
config1);
+ final KafkaStreams streams2 = new KafkaStreams(builder2.build(),
config2);
+ streams1.cleanUp();
+ streams2.cleanUp();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(streams1,
streams2));
+
+ // Phase 2: produce data, wait for processing
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+ produceRecords(initialRecords);
+ waitForOutput(initialRecords.size());
+
+ // Phase 3: shut down instance 1, corrupt its store status
+ closeStreams(streams1);
+
+ // Corrupt all store dirs under instance 1's state directory
+ final String appId =
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+ for (final File storeDir :
RocksDBStoreTestingUtils.findAllStoreDirs(stateDir1, appId, STORE_NAME)) {
+ RocksDBStoreTestingUtils.setStoreStatusToOpen(storeDir);
+ }
+
+ // Phase 4: restart instance 1 — should recover from standby or
changelog
+ final StreamsBuilder builder1Restart = buildCountTopology();
+ final KafkaStreams streams1Restart = new
KafkaStreams(builder1Restart.build(), config1);
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams1Restart);
+
+ // Phase 5: shut down instance 2, verify instance 1 takes over
+ closeStreams(streams2);
+
+ // Produce more records and verify instance 1 processes them as active
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ // A: 3 (v1, v2, v3), B: 1, C: 1
+ assertEquals(3L, counts.get("A"), "A count after standby recovery");
+ assertEquals(1L, counts.get("B"), "B count after standby recovery");
+ assertEquals(1L, counts.get("C"), "C count after standby recovery");
+
+ // Clean up — set streams to instance 1 so tearDown handles it
+ streams = streams1Restart;
+ }
+
+ /**
+ * Regression test for KAFKA-19712 (PR #21884): after completely deleting
local state
+ * and restarting, standby tasks should not get TaskCorruptedException
during rebalance.
+ *
+ * The bug: KIP-1035 removed the OFFSET_UNKNOWN sentinel, so stores closed
with null
+ * offsets when offsets were never initialized. On the next rebalance,
initializeStoreOffsets()
+ * found null committed offset + non-empty state dir under EOS, and threw
TaskCorruptedException.
+ *
+ * The fix: re-introduced OFFSET_UNKNOWN (-4L) as a sentinel in commit(),
and translates
+ * it back to null in initializeStoreOffsets().
+ */
+ @Test
+ public void shouldNotThrowTaskCorruptedOnStandbyAfterStateWipe() throws
Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ streamsConfig.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+
+ final File stateDir1 = TestUtils.tempDirectory("instance1-state");
+ final File stateDir2 = TestUtils.tempDirectory("instance2-state");
+
+ final Properties config1 = new Properties();
+ config1.putAll(streamsConfig);
+ config1.put(StreamsConfig.STATE_DIR_CONFIG, stateDir1.getPath());
+
+ final Properties config2 = new Properties();
+ config2.putAll(streamsConfig);
+ config2.put(StreamsConfig.STATE_DIR_CONFIG, stateDir2.getPath());
+
+ // Phase 1: start two instances, produce data, let both process and
replicate
+ final StreamsBuilder builder1 = buildCountTopology();
+ final StreamsBuilder builder2 = buildCountTopology();
+
+ final KafkaStreams streams1 = new KafkaStreams(builder1.build(),
config1);
+ final KafkaStreams streams2 = new KafkaStreams(builder2.build(),
config2);
+ streams1.cleanUp();
+ streams2.cleanUp();
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(streams1,
streams2));
+
+ final List<KeyValue<String, String>> initialRecords = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+ produceRecords(initialRecords);
+ waitForOutput(initialRecords.size());
+
+ // Phase 2: shut down instance 1, wipe its entire state, then restart.
+ closeStreams(streams1);
+ streams1.cleanUp();
+
+ // Verify that no RocksDB store directories exist.
+ final String appId =
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+ final File stateDirForStreamsOne = new File(stateDir1, appId);
+ final List<File> storeDirsAfterCleanup =
RocksDBStoreTestingUtils.findAllStoreDirs(
+ stateDir1, appId, STORE_NAME);
+ assertTrue(storeDirsAfterCleanup.isEmpty(),
+ "No store directories should exist after cleanUp, but found: " +
storeDirsAfterCleanup);
+
+ final StreamsBuilder builder1Restart = buildCountTopology();
+ final KafkaStreams streams1Restart = new
KafkaStreams(builder1Restart.build(), config1);
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams1Restart);
+
+ // Wait for instance 1 to have standby tasks
+ TestUtils.waitForCondition(() ->
+ streams1Restart.metadataForLocalThreads().stream()
+ .anyMatch(t -> !t.standbyTasks().isEmpty()),
+ 60_000, "Instance 1 should have standby tasks after restart");
+
+ // Verify that store directories now exist for the standby tasks —
+ // these were freshly created from changelog restoration, not carried
over.
+ final Set<TopicPartition> standbyPartitions = new HashSet<>();
+ for (final ThreadMetadata threadMd :
streams1Restart.metadataForLocalThreads()) {
+ for (final TaskMetadata taskMd : threadMd.standbyTasks()) {
+ standbyPartitions.addAll(taskMd.topicPartitions());
+ }
+ }
+ assertFalse(standbyPartitions.isEmpty(),
+ "Instance 1 should have standby partitions after restart");
+ for (final TopicPartition tp : standbyPartitions) {
+ final File storeDir = new File(stateDirForStreamsOne, "0_" +
tp.partition() + "/rocksdb/" + STORE_NAME);
+ assertTrue(storeDir.exists(),
+ "Standby store directory should exist after changelog restore:
" + storeDir);
+ }
+
+ // Phase 3: trigger a rebalance by shutting down instance 2 and
restarting it.
+ // Before the fix, the standby tasks on instance 1 would throw
+ // TaskCorruptedException during the rebalance when re-initializing
store offsets.
+ closeStreams(streams2);
+
+ final StreamsBuilder builder2Restart = buildCountTopology();
+ final KafkaStreams streams2Restart = new
KafkaStreams(builder2Restart.build(), config2);
+
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams2Restart);
+ // streams1Restart is already running — just wait for it to stabilize
after rebalance
+ TestUtils.waitForCondition(
+ () -> streams1Restart.state() == KafkaStreams.State.RUNNING,
+ 60_000, "Instance 1 should return to RUNNING after rebalance");
+
+ // Phase 4: verify processing still works after rebalance
+ final List<KeyValue<String, String>> additionalRecords = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(additionalRecords);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(initialRecords.size() + additionalRecords.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ assertEquals(3L, counts.get("A"), "A count after state wipe and
rebalance");
+ assertEquals(1L, counts.get("B"), "B count after state wipe and
rebalance");
+ assertEquals(1L, counts.get("C"), "C count after state wipe and
rebalance");
+
+ // Clean up
+ closeStreams(streams2Restart);
+ streams = streams1Restart;
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java
new file mode 100644
index 00000000000..79871b980c9
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTestingUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test utility for working with RocksDB including changing column family
state to simulate
+ * store corruption scenarios (e.g., unclean shutdown).
+ */
+public final class RocksDBStoreTestingUtils {
+
+ private static final StringSerializer STRING_SERIALIZER = new
StringSerializer();
+ private static final byte[] OFFSETS_COLUMN_FAMILY_NAME =
"offsets".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] STATUS_KEY = STRING_SERIALIZER.serialize(null,
"status");
+ private static final byte[] OPEN_STATE =
Serdes.Long().serializer().serialize(null, 1L);
+ private static final byte[] POSITION_KEY =
STRING_SERIALIZER.serialize(null, "position");
+
+ private RocksDBStoreTestingUtils() {
+ }
+
+ /**
+ * Overwrites the store status key to 1L (open), simulating an unclean
shutdown.
+ *
+ * @param dbDir the RocksDB store directory
+ */
+ public static void setStoreStatusToOpen(final File dbDir) throws
RocksDBException {
+ try (final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
+
+ final List<ColumnFamilyDescriptor> cfDescriptors =
listCfDescriptors(dbDir, cfOptions);
+ final List<ColumnFamilyHandle> cfHandles = new
ArrayList<>(cfDescriptors.size());
+ try (final RocksDB db = RocksDB.open(dbOptions,
dbDir.getAbsolutePath(), cfDescriptors, cfHandles)) {
+ final ColumnFamilyHandle offsetsCf = findOffsetsCf(cfHandles,
cfDescriptors);
+ db.put(offsetsCf, STATUS_KEY, OPEN_STATE);
+ } finally {
+ cfHandles.forEach(ColumnFamilyHandle::close);
+ }
+ }
+ }
+
+ /**
+ * Deletes all offset entries from the offsets column family, keeping only
the status key.
+ *
+ * @param dbDir the RocksDB store directory
+ */
+ public static void deleteOffsets(final File dbDir) throws RocksDBException
{
+ try (final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
+
+ final List<ColumnFamilyDescriptor> cfDescriptors =
listCfDescriptors(dbDir, cfOptions);
+ final List<ColumnFamilyHandle> cfHandles = new
ArrayList<>(cfDescriptors.size());
+ try (final RocksDB db = RocksDB.open(dbOptions,
dbDir.getAbsolutePath(), cfDescriptors, cfHandles)) {
+ final ColumnFamilyHandle offsetsCf = findOffsetsCf(cfHandles,
cfDescriptors);
+
+ try (final org.rocksdb.RocksIterator iter =
db.newIterator(offsetsCf)) {
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ final byte[] key = iter.key();
+ if (!Arrays.equals(key, STATUS_KEY)) {
+ db.delete(offsetsCf, key);
+ }
+ iter.next();
+ }
+ }
+ } finally {
+ cfHandles.forEach(ColumnFamilyHandle::close);
+ }
+ }
+ }
+
+ /**
+ * Reads the store status from the offsets column family.
+ *
+ * @param dbDir the RocksDB store directory
+ * @return the status value (0L = closed, 1L = open), or null if no status
key exists
+ */
+ public static Long readStoreStatus(final File dbDir) throws
RocksDBException {
+ try (final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
+
+ final List<ColumnFamilyDescriptor> cfDescriptors =
listCfDescriptors(dbDir, cfOptions);
+ final List<ColumnFamilyHandle> cfHandles = new
ArrayList<>(cfDescriptors.size());
+ try (final RocksDB db = RocksDB.open(dbOptions,
dbDir.getAbsolutePath(), cfDescriptors, cfHandles)) {
+ final ColumnFamilyHandle offsetsCf = findOffsetsCf(cfHandles,
cfDescriptors);
+ final byte[] valueBytes = db.get(offsetsCf, STATUS_KEY);
+ if (valueBytes != null) {
+ return Serdes.Long().deserializer().deserialize(null,
valueBytes);
+ }
+ return null;
+ } finally {
+ cfHandles.forEach(ColumnFamilyHandle::close);
+ }
+ }
+ }
+
+ /**
+ * Reads all offset entries from the offsets column family, excluding the
status and position keys.
+ * Keys are TopicPartition.toString() values, values are committed offsets.
+ *
+ * @param dbDir the RocksDB store directory
+ * @return a map of partition string to committed offset
+ */
+ public static Map<String, Long> readOffsets(final File dbDir) throws
RocksDBException {
+ try (final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
+
+ final List<ColumnFamilyDescriptor> cfDescriptors =
listCfDescriptors(dbDir, cfOptions);
+ final List<ColumnFamilyHandle> cfHandles = new
ArrayList<>(cfDescriptors.size());
+ try (final RocksDB db = RocksDB.open(dbOptions,
dbDir.getAbsolutePath(), cfDescriptors, cfHandles)) {
+ final ColumnFamilyHandle offsetsCf = findOffsetsCf(cfHandles,
cfDescriptors);
+ final Map<String, Long> offsets = new HashMap<>();
+
+ try (final org.rocksdb.RocksIterator iter =
db.newIterator(offsetsCf)) {
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ final byte[] key = iter.key();
+ if (!Arrays.equals(key, STATUS_KEY) &&
!Arrays.equals(key, POSITION_KEY)) {
+ final String partition = new String(key,
StandardCharsets.UTF_8);
+ final Long offset =
Serdes.Long().deserializer().deserialize(null, iter.value());
+ offsets.put(partition, offset);
+ }
+ iter.next();
+ }
+ }
+ return offsets;
+ } finally {
+ cfHandles.forEach(ColumnFamilyHandle::close);
+ }
+ }
+ }
+
+ /**
+ * Finds all RocksDB store directories for the given store name across all
task directories.
+ * Returns an empty list if no task directories exist.
+ *
+ * @param stateDir the root state directory
+ * @param appId the application ID
+ * @param storeName the store name
+ * @return list of store directories
+ */
+ public static List<File> findAllStoreDirs(final File stateDir, final
String appId, final String storeName) {
+ final File appDir = new File(stateDir, appId);
+ final File[] taskDirs = appDir.listFiles(file ->
+ file.isDirectory() && !file.getName().startsWith("."));
+
+ if (taskDirs == null || taskDirs.length == 0) {
+ return Collections.emptyList();
+ }
+
+ final List<File> storeDirs = new ArrayList<>();
+ for (final File taskDir : taskDirs) {
+ final File storeDir2 = Paths.get(taskDir.getAbsolutePath(),
"rocksdb", storeName).toFile();
+ if (storeDir2.exists()) {
+ storeDirs.add(storeDir2);
+ }
+ }
+
+ if (storeDirs.isEmpty()) {
+ throw new IllegalStateException("No store directories for '" +
storeName + "' found under " + appDir);
+ }
+ return storeDirs;
+ }
+
+ private static List<ColumnFamilyDescriptor> listCfDescriptors(final File
dbDir,
+ final
ColumnFamilyOptions cfOptions) throws RocksDBException {
+ return RocksDB.listColumnFamilies(new Options(),
dbDir.getAbsolutePath())
+ .stream()
+ .map(name -> new ColumnFamilyDescriptor(name, cfOptions))
+ .collect(Collectors.toList());
+ }
+
+ private static ColumnFamilyHandle findOffsetsCf(final
List<ColumnFamilyHandle> handles,
+ final
List<ColumnFamilyDescriptor> descriptors) {
+ for (int i = 0; i < descriptors.size(); i++) {
+ if (Arrays.equals(descriptors.get(i).getName(),
OFFSETS_COLUMN_FAMILY_NAME)) {
+ return handles.get(i);
+ }
+ }
+ throw new IllegalStateException("Offsets column family not found in
RocksDB store");
+ }
+}