This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 73cb15e72c0 KAFKA-16600: Add unit test for idempotent PENDING_SHUTDOWN
state transition (#21671)
73cb15e72c0 is described below
commit 73cb15e72c0809d624ffabdf76c51d4fa8c5a046
Author: Kirk True <[email protected]>
AuthorDate: Fri Mar 27 11:11:12 2026 -0700
KAFKA-16600: Add unit test for idempotent PENDING_SHUTDOWN state transition
(#21671)
The change includes a unit test that reliably fails before the fix from
KAFKA-17379 and reliably passes afterward.
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 70 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7adb5ac73c4..a0e407e9bb7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1585,7 +1585,7 @@ public class KafkaStreams implements AutoCloseable {
}
}
- private void closeToError() {
+ void closeToError() {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in {}", state());
} else {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 857b53910f2..11607240eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -1914,6 +1914,76 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ public void shouldHandleCloseAfterErrorState() throws Exception {
+ // Regression test for the race condition bug fixed by KAFKA-17379
that also fixed KAFKA-16600.
+ prepareStreams();
+ final AtomicReference<StreamThread.State> state1 =
prepareStreamThread(streamThreadOne, 1);
+ final AtomicReference<StreamThread.State> state2 =
prepareStreamThread(streamThreadTwo, 2);
+ prepareThreadState(streamThreadOne, state1);
+ prepareThreadState(streamThreadTwo, state2);
+
+ try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ waitForCondition(
+ () -> streams.state() == KafkaStreams.State.RUNNING,
+ "Streams never started"
+ );
+
+ final int numberOfConcurrentCloseThreads = 10;
+ final AtomicReference<Throwable> closeException = new
AtomicReference<>();
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch completionLatch = new
CountDownLatch(numberOfConcurrentCloseThreads + 1);
+
+ // Launch multiple close() threads
+ for (int i = 0; i < numberOfConcurrentCloseThreads; i++) {
+ new Thread(
+ () -> {
+ try {
+ startLatch.await();
+ streams.close(Duration.ofSeconds(10));
+ } catch (final Throwable t) {
+ closeException.compareAndSet(null, t);
+ } finally {
+ completionLatch.countDown();
+ }
+ },
+ "CloseThread-" + i
+ ).start();
+ }
+
+ // Launch error thread
+ new Thread(
+ () -> {
+ try {
+ startLatch.await();
+ streams.closeToError();
+ } catch (final Throwable t) {
+ // Ignore - this is expected to race
+ } finally {
+ completionLatch.countDown();
+ }
+ },
+ "ErrorThread"
+ ).start();
+
+ // Start the race
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue(
+ completionLatch.await(15, TimeUnit.SECONDS),
+ "All threads should complete within timeout"
+ );
+
+ if (closeException.get() != null) {
+ // Before fix: StreamsException("Failed to shut down while in
state ERROR")
+ // After fix: No exception
+ fail("Race condition detected; close() threw exception",
closeException.get());
+ }
+ }
+ }
+
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
final String globalTopicName,