This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 5d6248c4486 KAFKA-20332 [2]: Handle wakeup on poll reconciliation
check (#21997)
5d6248c4486 is described below
commit 5d6248c4486f48471849caf09861b02db5009cbd
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 8 12:48:39 2026 -0400
KAFKA-20332 [2]: Handle wakeup on poll reconciliation check (#21997)
Minor follow-up to ensure we handle wakeup while waiting on the reconciliation
check future in poll.
Reviewers: Andrew Schofield <[email protected]>, TengYao Chi
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 3 +++
.../consumer/internals/AsyncKafkaConsumerTest.java | 26 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d4657e84ed1..c0b17a70984 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -2020,9 +2020,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
if (timeoutMs > 0) {
try {
+
wakeupTrigger.setActiveTask(inflightPoll.reconciliationCheckFuture());
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
} catch (TimeoutException e) {
return Fetch.empty();
+ } finally {
+ wakeupTrigger.clearTask();
}
} else {
// No time to wait and reconciliation check not complete
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index b49b2c007f3..1c7d3bcf4df 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -474,6 +474,32 @@ public class AsyncKafkaConsumerTest {
assertThrows(WakeupException.class, () ->
consumer.poll(Duration.ZERO));
}
+ @Test
+ public void testWakeupWhileWaitingOnReconciliationCheck() {
+ FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(fetchBuffer, mock(ConsumerInterceptors.class),
+ mock(ConsumerRebalanceListenerInvoker.class), subscriptions);
+
+ final TopicPartition tp = new TopicPartition("topic1", 0);
+ subscriptions.assignFromUser(singleton(tp));
+ subscriptions.seek(tp, 0);
+
+ // Do not complete the AsyncPollEvent and call wakeup().
+ // The call to poll should throw WakeupException without blocking for
the full timeout.
+ doAnswer(invocation -> {
+ consumer.wakeup();
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+ long startTime = System.currentTimeMillis();
+ assertThrows(WakeupException.class, () ->
consumer.poll(Duration.ofMillis(1500)));
+ long elapsed = System.currentTimeMillis() - startTime;
+
+ assertTrue(elapsed < 500, "Wakeup should interrupt promptly, took " +
elapsed + "ms");
+ }
+
@Test
public void testCommitInRebalanceCallback() {
consumer = newConsumer();