This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44bafc60e72 KAFKA-20426 Using both group.id and assign() causes a busy
loop in AsyncKafkaConsumer (#22018)
44bafc60e72 is described below
commit 44bafc60e720830a115a1dfb41e0f5ee271c0f5f
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Apr 28 21:23:59 2026 +0800
KAFKA-20426 Using both group.id and assign() causes a busy loop in
AsyncKafkaConsumer (#22018)
When a consumer uses manual assignment (assign()) instead of group
subscription, the member remains in the UNSUBSCRIBED state. In this
state, heartbeats are skipped.
Previously, because the heartbeat interval was initialized to 0, the
`maximumTimeToWait` calculation would return 0 when heartbeats were
skipped. This caused `pollForFetches` to return immediately on every
iteration, so the application thread spun in a busy loop, consuming
excessive CPU.
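This patch fixes the issue by ensuring `maximumTimeToWait` returns
Long.MAX_VALUE whenever the member is in the UNSUBSCRIBED state (the
state used with manual assignment). The application thread can then
block for the full user-specified poll timeout.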
Reviewers: Jiayao Sun <[email protected]>, Kirk True
<[email protected]>, Lianet Magrans <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
.../consumer/PlaintextConsumerFetchTest.java | 10 ++--
.../internals/AbstractHeartbeatRequestManager.java | 9 +++-
.../consumer/internals/AsyncKafkaConsumerTest.java | 55 ++++++++++++++++++++++
.../ConsumerHeartbeatRequestManagerTest.java | 27 +++++++++++
.../kafka/api/AuthorizerIntegrationTest.scala | 14 +++++-
5 files changed, 109 insertions(+), 6 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
index e65dd0b7ed9..97e5f93d34c 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
import static
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.pollUntilTrue;
import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG;
@@ -55,7 +56,6 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(
types = {Type.KRAFT},
@@ -174,14 +174,18 @@ public class PlaintextConsumerFetchTest {
sendRecords(producer, tp, totalRecords, startingTimestamp);
consumer.assign(List.of(tp));
consumer.seek(tp, 0);
-
+
// consume some, but not all the records
consumeAndVerifyRecords(consumer, tp, totalRecords / 2, 0);
// seek to out of range position
var outOfRangePos = totalRecords + 17; // arbitrary, much higher
offset
consumer.seek(tp, outOfRangePos);
// assert that poll resets to the ending position
- assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+ // use pollUntilTrue because AsyncConsumer with manual assignment
sends fewer
+ // fetch requests per poll compared to the fix before KAFKA-20426,
so the
+ // offset reset from out-of-range may not complete in a single
poll() call
+ pollUntilTrue(consumer, () -> consumer.position(tp) ==
totalRecords,
+ "Consumer position should advance to the latest end offset " +
totalRecords);
sendRecords(producer, tp, totalRecords, totalRecords);
var nextRecord =
consumer.poll(Duration.ofMillis(50)).iterator().next();
// ensure the seek went to the last known record at the time of
the previous poll
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 4f47c859114..8392e5032f6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -245,12 +245,17 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
* <p>Similarly, we may have to unblock the application thread to send a
{@link AsyncPollEvent} to make sure
* our poll timer will not expire while we are polling.
*
- * <p>In the event that heartbeats are currently being skipped, this still
returns the next heartbeat
- * delay rather than {@code Long.MAX_VALUE} so that the application thread
remains responsive.
+ * <p>When the member is {@link MemberState#UNSUBSCRIBED} (for example,
with manual assignment),
+ * this returns {@code Long.MAX_VALUE} to indicate there is no next
heartbeat to wait for,
+ * allowing the application thread to block for the full user-specified
poll timeout rather than
+ * spinning in a busy loop.
*/
@Override
public long maximumTimeToWait(long currentTimeMs) {
pollTimer.update(currentTimeMs);
+ if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+ return Long.MAX_VALUE;
+ }
if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow()
&& !heartbeatRequestState.requestInFlight())) {
return 0L;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index a8736329e4d..e4712439494 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1824,6 +1824,61 @@ public class AsyncKafkaConsumerTest {
verify(applicationEventHandler,
times(1)).add(isA(AsyncPollEvent.class));
}
+ /**
+ * Verifies that with manual partition assignment, poll() blocks for
+ * the full user-supplied timeout instead of spinning in a busy loop.
+ * Before the fix, AbstractHeartbeatRequestManager.maximumTimeToWait()
returned 0
+ * while the membership state was UNSUBSCRIBED (the state used for manual
assignment),
+ * causing pollForFetches() to call fetchBuffer.awaitWakeup() with a
0-timeout timer
+ * and re-enter the poll loop immediately. After KAFKA-20426,
maximumTimeToWait() returns
+ * Long.MAX_VALUE in that state so the application thread can block for
the full timeout.
+ */
+ @Test
+ public void testPollWithManualAssignmentDoesNotBusyLoop() {
+ FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+ ConsumerInterceptors<String, String> interceptors =
mock(ConsumerInterceptors.class);
+ ConsumerRebalanceListenerInvoker rebalanceListenerInvoker =
mock(ConsumerRebalanceListenerInvoker.class);
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(fetchBuffer, interceptors,
rebalanceListenerInvoker, subscriptions);
+
+ final TopicPartition tp = new TopicPartition("topic1", 0);
+
+ // Manual assignment with valid position so pollForFetches() does not
shrink pollTimeout to retryBackoffMs.
+ subscriptions.assignFromUser(singleton(tp));
+ subscriptions.seek(tp, 0);
+
+ // Simulate the FIXED behavior of
AbstractHeartbeatRequestManager#maximumTimeToWait() when the
+ // membership state is UNSUBSCRIBED, i.e. user called assign().
+
doReturn(Long.MAX_VALUE).when(applicationEventHandler).maximumTimeToWait();
+
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+ // Capture the Timer passed to awaitWakeup so we can assert it was
given the full
+ // poll timeout, i.e. no busy loop. Also advance mock time by the
timer's remaining ms so the
+ // outer poll timer expires after a single wait.
+ AtomicReference<Long> awaitTimerInitialMs = new AtomicReference<>();
+ doAnswer(invocation -> {
+ Timer pollTimer = invocation.getArgument(0, Timer.class);
+ awaitTimerInitialMs.compareAndSet(null, pollTimer.remainingMs());
+ time.sleep(pollTimer.remainingMs());
+ pollTimer.update();
+ return null;
+ }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+ final long pollTimeoutMs = 500;
+ consumer.poll(Duration.ofMillis(pollTimeoutMs));
+
+ // The poll timer passed to awaitWakeup must have been set to the full
user timeout,
+ // proving pollTimeout was NOT clamped to 0 (busy loop) by
maximumTimeToWait().
+ assertNotNull(awaitTimerInitialMs.get(), "fetchBuffer.awaitWakeup was
never called");
+ assertEquals(pollTimeoutMs, awaitTimerInitialMs.get(),
+ "Expected poll wait timer to use the full user timeout (no busy
loop), but was " + awaitTimerInitialMs.get());
+
+ // Only a single wait cycle should have happened
+ verify(fetchBuffer, times(1)).awaitWakeup(any(Timer.class));
+ }
+
/**
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} takes a bit of time to
complete, but does within the timeout.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 3ef1d712c96..906ae79518f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -314,6 +314,33 @@ public class ConsumerHeartbeatRequestManagerTest {
}
}
+ /**
+ * When the consumer uses manual partition assignment (assign()) instead
of subscribe(), the
+ * member stays in UNSUBSCRIBED state indefinitely. Because heartbeats are
skipped in that
+ * state and heartbeatIntervalMs initialises to 0, maximumTimeToWait used
to return 0, causing
+ * a busy-loop in pollForFetches. Verify that maximumTimeToWait returns
Long.MAX_VALUE whenever
+ * the member is in UNSUBSCRIBED state so the application thread can block
for the full poll
+ * timeout.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testMaximumTimeToWaitWhenHeartbeatShouldBeSkipped(final
boolean isUnsubscribed) {
+ // Start with zero heartbeat interval (simulates the initial state
before any HB response)
+ createHeartbeatRequestStateWithZeroHeartbeatInterval();
+ when(membershipManager.state()).thenReturn(isUnsubscribed ?
MemberState.UNSUBSCRIBED : MemberState.JOINING);
+
+ long result =
heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
+
+ if (isUnsubscribed) {
+ assertEquals(Long.MAX_VALUE, result,
+ "maximumTimeToWait should return Long.MAX_VALUE when in
UNSUBSCRIBED state " +
+ "(e.g., manual assignment) to prevent a busy loop");
+ } else {
+ assertEquals(0, result,
+ "maximumTimeToWait should return 0 when heartbeat interval
timer has already expired");
+ }
+ }
+
@Test
public void testTimerNotDue() {
time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat
should be sent
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d5e90149052..161a6bf245d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1669,7 +1669,19 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
addAndVerifyAcls(acls, resource)
waitUntilTrue(() => {
- consumer.poll(Duration.ofMillis(50L))
+ try {
+ consumer.poll(Duration.ofMillis(50L))
+ } catch {
+ // In AsyncKafkaConsumer the TOPIC_AUTHORIZATION_FAILED error from the
1st metadata
+ // response is queued by the background thread and only delivered to
the app thread on
+ // the next poll(), so assertThrows returns one full poll-timeout
(~50ms) later than
+ // in the fix before KAFKA-20426. By then most of the 100ms retry
backoff (retry.backoff.ms)
+ // has already elapsed, and the 2nd metadata request can fire just
milliseconds after
+ // addAndVerifyAcls returns — potentially before the CREATE ACL has
been committed on
+ // the broker. That produces another TOPIC_AUTHORIZATION_FAILED which
is delivered on
+ // the first poll() in waitUntilTrue. Swallow it so the loop keeps
retrying.
+ case _: TopicAuthorizationException =>
+ }
brokers.forall { broker =>
OptionConverters.toScala(broker.metadataCache.getLeaderAndIsr(newTopic, 0))
match {
case Some(partitionState) =>
FetchRequest.isValidBrokerId(partitionState.leader)