This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 5d6248c4486 KAFKA-20332 [2]: Handle wakeup on poll reconciliation 
check (#21997)
5d6248c4486 is described below

commit 5d6248c4486f48471849caf09861b02db5009cbd
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 8 12:48:39 2026 -0400

    KAFKA-20332 [2]: Handle wakeup on poll reconciliation check (#21997)
    
    Minor follow-up to ensure we handle wake-up when checking reconciliation
    future in poll.
    
    Reviewers: Andrew Schofield <[email protected]>, TengYao Chi
     <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  3 +++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d4657e84ed1..c0b17a70984 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -2020,9 +2020,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
             if (timeoutMs > 0) {
                 try {
+                    
wakeupTrigger.setActiveTask(inflightPoll.reconciliationCheckFuture());
                     
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
                 } catch (TimeoutException e) {
                     return Fetch.empty();
+                } finally {
+                    wakeupTrigger.clearTask();
                 }
             } else {
                 // No time to wait and reconciliation check not complete
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index b49b2c007f3..1c7d3bcf4df 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -474,6 +474,32 @@ public class AsyncKafkaConsumerTest {
         assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ZERO));
     }
 
+    @Test
+    public void testWakeupWhileWaitingOnReconciliationCheck() {
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class), subscriptions);
+
+        final TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Do not complete the AsyncPollEvent and call wakeup().
+        // The call to poll should throw WakeupException without blocking for 
the full timeout.
+        doAnswer(invocation -> {
+            consumer.wakeup();
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        long startTime = System.currentTimeMillis();
+        assertThrows(WakeupException.class, () -> 
consumer.poll(Duration.ofMillis(1500)));
+        long elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(elapsed < 500, "Wakeup should interrupt promptly, took " + 
elapsed + "ms");
+    }
+
     @Test
     public void testCommitInRebalanceCallback() {
         consumer = newConsumer();

Reply via email to