This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 73cb15e72c0 KAFKA-16600: Add unit test for idempotent PENDING_SHUTDOWN 
state transition (#21671)
73cb15e72c0 is described below

commit 73cb15e72c0809d624ffabdf76c51d4fa8c5a046
Author: Kirk True <[email protected]>
AuthorDate: Fri Mar 27 11:11:12 2026 -0700

    KAFKA-16600: Add unit test for idempotent PENDING_SHUTDOWN state transition 
(#21671)
    
    The change includes a unit test that reliably fails before the fix from
    KAFKA-17379 and reliably passes afterward.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  2 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 70 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7adb5ac73c4..a0e407e9bb7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1585,7 +1585,7 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
-    private void closeToError() {
+    void closeToError() {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in {}", state());
         } else {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 857b53910f2..11607240eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -1914,6 +1914,76 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void shouldHandleCloseAfterErrorState() throws Exception {
+        // Regression test for the race condition bug fixed by KAFKA-17379 
that also fixed KAFKA-16600.
+        prepareStreams();
+        final AtomicReference<StreamThread.State> state1 = 
prepareStreamThread(streamThreadOne, 1);
+        final AtomicReference<StreamThread.State> state2 = 
prepareStreamThread(streamThreadTwo, 2);
+        prepareThreadState(streamThreadOne, state1);
+        prepareThreadState(streamThreadTwo, state2);
+
+        try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            waitForCondition(
+                () -> streams.state() == KafkaStreams.State.RUNNING,
+                "Streams never started"
+            );
+
+            final int numberOfConcurrentCloseThreads = 10;
+            final AtomicReference<Throwable> closeException = new 
AtomicReference<>();
+            final CountDownLatch startLatch = new CountDownLatch(1);
+            final CountDownLatch completionLatch = new 
CountDownLatch(numberOfConcurrentCloseThreads + 1);
+
+            // Launch multiple close() threads
+            for (int i = 0; i < numberOfConcurrentCloseThreads; i++) {
+                new Thread(
+                    () -> {
+                        try {
+                            startLatch.await();
+                            streams.close(Duration.ofSeconds(10));
+                        } catch (final Throwable t) {
+                            closeException.compareAndSet(null, t);
+                        } finally {
+                            completionLatch.countDown();
+                        }
+                    },
+                    "CloseThread-" + i
+                ).start();
+            }
+
+            // Launch error thread
+            new Thread(
+                () -> {
+                    try {
+                        startLatch.await();
+                        streams.closeToError();
+                    } catch (final Throwable t) {
+                        // Ignore - this is expected to race
+                    } finally {
+                        completionLatch.countDown();
+                    }
+                },
+                "ErrorThread"
+            ).start();
+
+            // Start the race
+            startLatch.countDown();
+
+            // Wait for completion
+            assertTrue(
+                completionLatch.await(15, TimeUnit.SECONDS),
+                "All threads should complete within timeout"
+            );
+
+            if (closeException.get() != null) {
+                // Before fix: StreamsException("Failed to shut down while in 
state ERROR")
+                // After fix:  No exception
+                fail("Race condition detected; close() threw exception", 
closeException.get());
+            }
+        }
+    }
+
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
                                          final String globalTopicName,

Reply via email to