This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new e1a062cc074 KAFKA-20426 Using both group.id and assign() causes a busy 
loop in AsyncKafkaConsumer (#22173)
e1a062cc074 is described below

commit e1a062cc074cb111197031c0a4355596d830386c
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Fri May 1 02:16:27 2026 +0800

    KAFKA-20426 Using both group.id and assign() causes a busy loop in 
AsyncKafkaConsumer (#22173)
    
    When a consumer uses manual assignment (assign()) instead of group
    subscription, the member remains in the UNSUBSCRIBED state. In this
    state, heartbeats are skipped.
    
    Previously, because the heartbeat interval was initialized to 0, the
    `maximumTimeToWait` calculation would return 0 when heartbeats were
    skipped. This caused `pollForFetches` to return immediately and enter a
    busy-loop, consuming excessive CPU.
    
    This patch fixes the issue by ensuring `maximumTimeToWait` returns
    Long.MAX_VALUE whenever the member is in the UNSUBSCRIBED state, i.e.
    the state in which heartbeats are skipped under manual assignment.
    
    Reviewers: Jiayao Sun <[email protected]>, Kirk True
     <[email protected]>, Lianet Magrans <[email protected]>, Chia-Ping
     Tsai <[email protected]>
---
 .../consumer/PlaintextConsumerFetchTest.java       | 10 ++--
 .../internals/AbstractHeartbeatRequestManager.java |  9 +++-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 56 ++++++++++++++++++++++
 .../ConsumerHeartbeatRequestManagerTest.java       | 27 +++++++++++
 .../kafka/api/AuthorizerIntegrationTest.scala      | 14 +++++-
 5 files changed, 110 insertions(+), 6 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
index e65dd0b7ed9..97e5f93d34c 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerFetchTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
 import static 
org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
 import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.pollUntilTrue;
 import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.FETCH_MAX_BYTES_CONFIG;
@@ -55,7 +56,6 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ClusterTestDefaults(
     types = {Type.KRAFT},
@@ -174,14 +174,18 @@ public class PlaintextConsumerFetchTest {
             sendRecords(producer, tp, totalRecords, startingTimestamp);
             consumer.assign(List.of(tp));
             consumer.seek(tp, 0);
-            
+
             // consume some, but not all the records
             consumeAndVerifyRecords(consumer, tp, totalRecords / 2, 0);
             // seek to out of range position
             var outOfRangePos = totalRecords + 17; // arbitrary, much higher 
offset
             consumer.seek(tp, outOfRangePos);
             // assert that poll resets to the ending position
-            assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty());
+            // use pollUntilTrue because AsyncConsumer with manual assignment 
sends fewer
+            // fetch requests per poll than it did before the KAFKA-20426 fix, 
so the
+            // offset reset from out-of-range may not complete in a single 
poll() call
+            pollUntilTrue(consumer, () -> consumer.position(tp) == 
totalRecords,
+                "Consumer position should advance to the latest end offset " + 
totalRecords);
             sendRecords(producer, tp, totalRecords, totalRecords);
             var nextRecord = 
consumer.poll(Duration.ofMillis(50)).iterator().next();
             // ensure the seek went to the last known record at the time of 
the previous poll
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 4f47c859114..8392e5032f6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -245,12 +245,17 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
      * <p>Similarly, we may have to unblock the application thread to send a 
{@link AsyncPollEvent} to make sure
      * our poll timer will not expire while we are polling.
      *
-     * <p>In the event that heartbeats are currently being skipped, this still 
returns the next heartbeat
-     * delay rather than {@code Long.MAX_VALUE} so that the application thread 
remains responsive.
+     * <p>When the member is {@link MemberState#UNSUBSCRIBED} (for example, 
with manual assignment),
+     * this returns {@code Long.MAX_VALUE} to indicate there is no next 
heartbeat to wait for,
+     * allowing the application thread to block for the full user-specified 
poll timeout rather than
+     * spinning in a busy loop.
      */
     @Override
     public long maximumTimeToWait(long currentTimeMs) {
         pollTimer.update(currentTimeMs);
+        if (membershipManager().state() == MemberState.UNSUBSCRIBED) {
+            return Long.MAX_VALUE;
+        }
         if (pollTimer.isExpired() || (membershipManager().shouldHeartbeatNow() 
&& !heartbeatRequestState.requestInFlight())) {
             return 0L;
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 0966e97afac..a5b825b5d0b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -163,6 +163,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -1773,6 +1774,61 @@ public class AsyncKafkaConsumerTest {
         assertEquals(partitions, consumer.assignment());
     }
 
+    /**
+     * Verifies that with manual partition assignment, poll() blocks for
+     * the full user-supplied timeout instead of spinning in a busy loop.
+     * Before the fix, AbstractHeartbeatRequestManager.maximumTimeToWait() 
returned 0
+     * while the membership state was UNSUBSCRIBED (the state used for manual 
assignment),
+     * causing pollForFetches() to call fetchBuffer.awaitWakeup() with a 
0-timeout timer
+     * and re-enter the poll loop immediately. After KAFKA-20426, 
maximumTimeToWait() returns
+     * Long.MAX_VALUE in that state so the application thread can block for 
the full timeout.
+     */
+    @Test
+    public void testPollWithManualAssignmentDoesNotBusyLoop() {
+        FetchBuffer fetchBuffer = mock(FetchBuffer.class);
+        ConsumerInterceptors<String, String> interceptors = 
mock(ConsumerInterceptors.class);
+        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = 
mock(ConsumerRebalanceListenerInvoker.class);
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(fetchBuffer, interceptors, 
rebalanceListenerInvoker, subscriptions);
+
+        final TopicPartition tp = new TopicPartition("topic1", 0);
+
+        // Manual assignment with valid position so pollForFetches() does not 
shrink pollTimeout to retryBackoffMs.
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.seek(tp, 0);
+
+        // Simulate the FIXED behavior of 
AbstractHeartbeatRequestManager#maximumTimeToWait() when the
+        // membership state is UNSUBSCRIBED, i.e. user called assign().
+        
doReturn(Long.MAX_VALUE).when(applicationEventHandler).maximumTimeToWait();
+
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+
+        // Capture the Timer passed to awaitWakeup so we can assert it was 
given the full
+        // poll timeout, i.e. no busy loop. Also advance mock time by the 
timer's remaining ms so the
+        // outer poll timer expires after a single wait.
+        AtomicReference<Long> awaitTimerInitialMs = new AtomicReference<>();
+        doAnswer(invocation -> {
+            Timer pollTimer = invocation.getArgument(0, Timer.class);
+            awaitTimerInitialMs.compareAndSet(null, pollTimer.remainingMs());
+            time.sleep(pollTimer.remainingMs());
+            pollTimer.update();
+            return null;
+        }).when(fetchBuffer).awaitWakeup(any(Timer.class));
+
+        final long pollTimeoutMs = 500;
+        consumer.poll(Duration.ofMillis(pollTimeoutMs));
+
+        // The poll timer passed to awaitWakeup must have been set to the full 
user timeout,
+        // proving pollTimeout was NOT clamped to 0 (busy loop) by 
maximumTimeToWait().
+        assertNotNull(awaitTimerInitialMs.get(), "fetchBuffer.awaitWakeup was 
never called");
+        assertEquals(pollTimeoutMs, awaitTimerInitialMs.get(),
+            "Expected poll wait timer to use the full user timeout (no busy 
loop), but was " + awaitTimerInitialMs.get());
+
+        // Only a single wait cycle should have happened
+        verify(fetchBuffer, times(1)).awaitWakeup(any(Timer.class));
+    }
+
     /**
      * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate, boolean) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to 
complete, but does within the timeout.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 3ef1d712c96..906ae79518f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -314,6 +314,33 @@ public class ConsumerHeartbeatRequestManagerTest {
         }
     }
 
+    /**
+     * When the consumer uses manual partition assignment (assign()) instead 
of subscribe(), the
+     * member stays in UNSUBSCRIBED state indefinitely. Because heartbeats are 
skipped in that
+     * state and heartbeatIntervalMs initialises to 0, maximumTimeToWait used 
to return 0, causing
+     * a busy-loop in pollForFetches. Verify that maximumTimeToWait returns 
Long.MAX_VALUE whenever
+     * the member is in UNSUBSCRIBED state so the application thread can block 
for the full poll
+     * timeout.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMaximumTimeToWaitWhenHeartbeatShouldBeSkipped(final 
boolean isUnsubscribed) {
+        // Start with zero heartbeat interval (simulates the initial state 
before any HB response)
+        createHeartbeatRequestStateWithZeroHeartbeatInterval();
+        when(membershipManager.state()).thenReturn(isUnsubscribed ? 
MemberState.UNSUBSCRIBED : MemberState.JOINING);
+
+        long result = 
heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
+
+        if (isUnsubscribed) {
+            assertEquals(Long.MAX_VALUE, result,
+                "maximumTimeToWait should return Long.MAX_VALUE when in 
UNSUBSCRIBED state " +
+                    "(e.g., manual assignment) to prevent a busy loop");
+        } else {
+            assertEquals(0, result,
+                "maximumTimeToWait should return 0 when heartbeat interval 
timer has already expired");
+        }
+    }
+
     @Test
     public void testTimerNotDue() {
         time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat 
should be sent
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e2ac0e1433c..30e43a95397 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1669,7 +1669,19 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     addAndVerifyAcls(acls, resource)
 
     waitUntilTrue(() => {
-      consumer.poll(Duration.ofMillis(50L))
+      try {
+        consumer.poll(Duration.ofMillis(50L))
+      } catch {
+        // In AsyncKafkaConsumer the TOPIC_AUTHORIZATION_FAILED error from the 
1st metadata
+        // response is queued by the background thread and only delivered to 
the app thread on
+        // the next poll(), so assertThrows returns one full poll-timeout 
(~50ms) later than
+        // before the KAFKA-20426 fix. By then most of the 100ms retry 
backoff (retry.backoff.ms)
+        // has already elapsed, and the 2nd metadata request can fire just 
milliseconds after
+        // addAndVerifyAcls returns — potentially before the CREATE ACL has 
been committed on
+        // the broker. That produces another TOPIC_AUTHORIZATION_FAILED which 
is delivered on
+        // the first poll() in waitUntilTrue. Swallow it so the loop keeps 
retrying.
+        case _: TopicAuthorizationException =>
+      }
       brokers.forall { broker =>
         
OptionConverters.toScala(broker.metadataCache.getLeaderAndIsr(newTopic, 0)) 
match {
           case Some(partitionState) => 
FetchRequest.isValidBrokerId(partitionState.leader)

Reply via email to