This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0fd83279203 KAFKA-20315: Add tests for single in-flight poll events in Async and Share consumers (#21800)
0fd83279203 is described below

commit 0fd832792036fddcc2ab8d1ca76d694e0d152c5c
Author: nileshkumar3 <[email protected]>
AuthorDate: Tue Mar 31 01:57:39 2026 -0500

    KAFKA-20315: Add tests for single in-flight poll events in Async and Share consumers (#21800)
    
    This PR adds unit test coverage for the single in-flight poll event
    behavior in AsyncKafkaConsumer and ShareConsumer.
    
    Both consumers are expected to submit at most one poll event
    (AsyncPollEvent or SharePollEvent) while the previous event is still
    in-flight. Repeated poll() iterations (e.g., due to empty fetches) must
    not enqueue additional events until the current one completes.
    
    Reviewers: Sanskar Jhajharia <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumerTest.java | 48 ++++++++++++++++++++++
 .../consumer/internals/ShareConsumerImplTest.java  | 32 +++++++++++++++
 2 files changed, 80 insertions(+)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index b3086caba56..95f474690a1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -150,6 +150,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.clearInvocations;
@@ -160,6 +161,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -1684,6 +1686,52 @@ public class AsyncKafkaConsumerTest {
         assertEquals(partitions, consumer.assignment());
     }
 
+    /**
+     * Verifies that at most one {@link AsyncPollEvent} is in-flight at a 
time. When {@code poll()} runs
+     * multiple loop iterations (e.g. empty fetches), it must not enqueue a 
new event while the previous
+     * one is still in-flight. This prevents unnecessary queueing. See 
KAFKA-20315.
+     */
+    @Test
+    public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
+        // Build a consumer with a mocked FetchBuffer so we can 
deterministically advance MockTime and avoid
+        // tight spinning while poll() waits.
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        ConsumerInterceptors<String, String> interceptors = 
mock(ConsumerInterceptors.class);
+        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = 
mock(ConsumerRebalanceListenerInvoker.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, interceptors, 
rebalanceListenerInvoker, subscriptions);
+
+        final String topicName = "topic1";
+        final TopicPartition tp = new TopicPartition(topicName, 0);
+
+        // Satisfy poll() preconditions without needing assign() (which would 
require stubbing addAndGet()).
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Make pollForFetches() "wait" by advancing mock time.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+        // Leave AsyncPollEvent in-flight (do not complete it) so the next 
loop iteration sees inflightPoll != null
+        doAnswer(invocation -> 
null).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+        assertTrue(result.isEmpty());
+
+        // Ensure we actually exercised the "wait for fetches" path (i.e., 
more than a trivial single pass).
+        verify(fetchBuffer, atLeastOnce()).awaitWakeup(any(Timer.class));
+
+        // Only one AsyncPollEvent must have been added despite multiple poll 
loop iterations.
+        verify(applicationEventHandler, 
times(1)).add(isA(AsyncPollEvent.class));
+    }
+
     /**
      * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to 
complete, but does within the timeout.
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 7f88af47e93..aef8cf4ee5b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -104,6 +104,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
@@ -826,6 +827,37 @@ public class ShareConsumerImplTest {
         
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
     }
 
+    @Test
+    public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+        ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, subscriptions, "group-id", 
"client-id", "implicit");
+
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Keep pollForFetches from spinning by making it "wait" and advance 
MockTime.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+        // Always empty fetch: forces multiple loop iterations until the 
overall poll timeout expires.
+        
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+        assertTrue(result.isEmpty());
+
+        // Ensure we actually exercised the "wait for fetches" path (i.e., 
more than a trivial single pass).
+        verify(fetchBuffer, atLeastOnce()).awaitNotEmpty(any(Timer.class));
+
+        // Only one SharePollEvent must have been added despite multiple poll 
loop iterations.
+        verify(applicationEventHandler, 
times(1)).add(any(SharePollEvent.class));
+    }
+
     @ParameterizedTest
     @EnumSource(value = Errors.class, names = {"TOPIC_AUTHORIZATION_FAILED", 
"GROUP_AUTHORIZATION_FAILED", "INVALID_TOPIC_EXCEPTION"})
     public void testCloseWithBackgroundQueueErrorsAfterUnsubscribe(Errors 
error) {

Reply via email to