This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 6a50244a80d KAFKA-20332 [2]: Handle wakeup on poll reconciliation 
check (#21997)
6a50244a80d is described below

commit 6a50244a80de5c3042144a22e6bb65beff372a8d
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 8 12:48:39 2026 -0400

    KAFKA-20332 [2]: Handle wakeup on poll reconciliation check (#21997)
    
    Minor follow-up to ensure we handle wake-up when checking reconciliation
    future in poll.
    
    Reviewers: Andrew Schofield <[email protected]>, TengYao Chi
     <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  3 +++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index cc79882da23..21a947febf1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1943,9 +1943,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
             if (timeoutMs > 0) {
                 try {
+                    
wakeupTrigger.setActiveTask(inflightPoll.reconciliationCheckFuture());
                     
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
                 } catch (TimeoutException e) {
                     return Fetch.empty();
+                } finally {
+                    wakeupTrigger.clearTask();
                 }
             } else {
                 // No time to wait and reconciliation check not complete
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index e23110d77b8..fdbe8e82725 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -471,6 +471,32 @@ public class AsyncKafkaConsumerTest {
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
     }
 
+    @Test
+    public void testWakeupWhileWaitingOnReconciliationCheck() {
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class), subscriptions);
+
+        final TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Do not complete the AsyncPollEvent and call wakeup().
+        // The call to poll should throw WakeupException without blocking for 
the full timeout.
+        doAnswer(invocation -> {
+            consumer.wakeup();
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        long startTime = System.currentTimeMillis();
+        assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ofMillis(1500)));
+        long elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(elapsed < 500, "Wakeup should interrupt promptly, took " + 
elapsed + "ms");
+    }
+
     @Test
     public void testCommitInRebalanceCallback() {
         consumer = newConsumer();

Reply via email to