This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e1c9db92f2 KAFKA-20332 [2]: Handle wakeup on poll reconciliation 
check (#21997)
7e1c9db92f2 is described below

commit 7e1c9db92f2030b5e75ab79c7cef5007b35caaef
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 8 12:48:39 2026 -0400

    KAFKA-20332 [2]: Handle wakeup on poll reconciliation check (#21997)
    
    Minor follow-up to ensure we handle wake-up when checking reconciliation
    future in poll.
    
    Reviewers: Andrew Schofield <[email protected]>, TengYao Chi
     <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  3 +++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 8adc71b904b..685947c6864 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -2031,9 +2031,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
             if (timeoutMs > 0) {
                 try {
+                    
wakeupTrigger.setActiveTask(inflightPoll.reconciliationCheckFuture());
                     
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
                 } catch (TimeoutException e) {
                     return Fetch.empty();
+                } finally {
+                    wakeupTrigger.clearTask();
                 }
             } else {
                 // No time to wait and reconciliation check not complete
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 327f1b25afc..61b5b7cfef4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -479,6 +479,32 @@ public class AsyncKafkaConsumerTest {
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
     }
 
+    @Test
+    public void testWakeupWhileWaitingOnReconciliationCheck() {
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class), subscriptions);
+
+        final TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Do not complete the AsyncPollEvent and call wakeup().
+        // The call to poll should throw WakeupException without blocking for 
the full timeout.
+        doAnswer(invocation -> {
+            consumer.wakeup();
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        long startTime = System.currentTimeMillis();
+        assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ofMillis(1500)));
+        long elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(elapsed < 500, "Wakeup should interrupt promptly, took " + 
elapsed + "ms");
+    }
+
     @Test
     public void testCommitInRebalanceCallback() {
         consumer = newConsumer();

Reply via email to