This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0fd83279203 KAFKA-20315: Add tests for single in-flight poll events in
Async and Share consumers (#21800)
0fd83279203 is described below
commit 0fd832792036fddcc2ab8d1ca76d694e0d152c5c
Author: nileshkumar3 <[email protected]>
AuthorDate: Tue Mar 31 01:57:39 2026 -0500
KAFKA-20315: Add tests for single in-flight poll events in Async and Share
consumers (#21800)
This PR adds unit test coverage for the single in-flight poll event
behavior in AsyncKafkaConsumer and ShareConsumer.
Both consumers are expected to submit at most one poll event
(AsyncPollEvent or SharePollEvent) while the previous event is still
in-flight. Repeated poll() iterations (e.g., due to empty fetches) must
not enqueue additional events until the current one completes.
Reviewers: Sanskar Jhajharia <[email protected]>, Andrew Schofield
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumerTest.java | 48 ++++++++++++++++++++++
.../consumer/internals/ShareConsumerImplTest.java | 32 +++++++++++++++
2 files changed, 80 insertions(+)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index b3086caba56..95f474690a1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -150,6 +150,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
@@ -160,6 +161,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -1684,6 +1686,52 @@ public class AsyncKafkaConsumerTest {
assertEquals(partitions, consumer.assignment());
}
+    /**
+     * Verifies that at most one {@link AsyncPollEvent} is in-flight at a time. When {@code poll()} runs
+     * multiple loop iterations (e.g. empty fetches), it must not enqueue a new event while the previous
+     * one is still in-flight. This prevents unnecessary queueing. See KAFKA-20315.
+     */
+    @Test
+    public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
+        // Build a consumer with a mocked FetchBuffer so we can deterministically advance MockTime and avoid
+        // tight spinning while poll() waits.
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        ConsumerInterceptors<String, String> interceptors = mock(ConsumerInterceptors.class);
+        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = mock(ConsumerRebalanceListenerInvoker.class);
+        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, interceptors, rebalanceListenerInvoker, subscriptions);
+
+        final String topicName = "topic1";
+        final TopicPartition tp = new TopicPartition(topicName, 0);
+
+        // Satisfy poll() preconditions without needing assign() (which would require stubbing addAndGet()).
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Make pollForFetches() "wait" by advancing mock time.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+        // Leave AsyncPollEvent in-flight (do not complete it) so the next loop iteration sees inflightPoll != null
+        doAnswer(invocation -> null).when(applicationEventHandler).add(isA(AsyncPollEvent.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+        assertTrue(result.isEmpty());
+
+        // Ensure we actually exercised the "wait for fetches" path (i.e., more than a trivial single pass).
+        verify(fetchBuffer, atLeastOnce()).awaitWakeup(any(Timer.class));
+
+        // Only one AsyncPollEvent must have been added despite multiple poll loop iterations.
+        verify(applicationEventHandler, times(1)).add(isA(AsyncPollEvent.class));
+    }
+
/**
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} takes a bit of time to
complete, but does within the timeout.
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 7f88af47e93..aef8cf4ee5b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -104,6 +104,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
@@ -826,6 +827,37 @@ public class ShareConsumerImplTest {
verify(applicationEventHandler).addAndGet(any(ShareAcknowledgeOnCloseEvent.class));
}
+    @Test
+    public void testPollDoesNotAddNewSharePollEventWhenOneIsAlreadyInFlight() {
+        ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class);
+        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, subscriptions, "group-id", "client-id", "implicit");
+
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Keep pollForFetches from spinning by making it "wait" and advance MockTime.
+        doReturn(100L).when(applicationEventHandler).maximumTimeToWait();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            ((MockTime) time).sleep(pollTimer.remainingMs());
+            return null;
+        }).when(fetchBuffer).awaitNotEmpty(any(Timer.class));
+
+        // Always empty fetch: forces multiple loop iterations until the overall poll timeout expires.
+        doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
+
+        ConsumerRecords<?, ?> result = consumer.poll(Duration.ofMillis(450));
+        assertTrue(result.isEmpty());
+
+        // Ensure we actually exercised the "wait for fetches" path (i.e., more than a trivial single pass).
+        verify(fetchBuffer, atLeastOnce()).awaitNotEmpty(any(Timer.class));
+
+        // Only one SharePollEvent must have been added despite multiple poll loop iterations.
+        verify(applicationEventHandler, times(1)).add(any(SharePollEvent.class));
+    }
+
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"TOPIC_AUTHORIZATION_FAILED",
"GROUP_AUTHORIZATION_FAILED", "INVALID_TOPIC_EXCEPTION"})
public void testCloseWithBackgroundQueueErrorsAfterUnsubscribe(Errors
error) {