This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 23aaf812917 KAFKA-20332: Fix to ensure app thread not collecting 
records for partitions being revoked (#21897)
23aaf812917 is described below

commit 23aaf8129175bf42fceb98e80a0be565ead42f9a
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Apr 7 21:22:44 2026 -0400

    KAFKA-20332: Fix to ensure app thread not collecting records for partitions 
being revoked (#21897)
    
    This addresses race conditions where the app thread could collect/return
    records for revoked partitions.
    
    Fix by ensuring that the app thread does not return buffered records if
    it hasn't checked pending reconciliations. Once it has checked pending
    reconciliations, we know that partitions being revoked were marked as
    non-fetchable (so that is when we can safely move on to
    fetching/collecting in the app thread). Also ensure that background
    reconciliations do not trigger revocations (the app thread could already
    have records in memory, collected from the buffer, for those partitions,
    which would lead to the consumer returning records for revoked partitions
    if the background completes the revocation before the app thread returns).
    
    With these fixes we are sure that the app thread only collects/returns
    records after it has marked revoked partitions as non-fetchable.
    
    This fix applies to the consumer only (the share consumer remains
    unchanged with this PR and can still trigger full reconciliation &
    assignment update from the background).
    
    Reviewers: Andrew Schofield <[email protected]>, nileshkumar3
     <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai
     <[email protected]>, Kirk True <[email protected]>
---
 .../internals/AbstractMembershipManager.java       | 11 ++--
 .../consumer/internals/AsyncKafkaConsumer.java     | 28 ++++++++--
 .../consumer/internals/ShareMembershipManager.java | 13 +++++
 .../events/ApplicationEventProcessor.java          |  4 ++
 .../consumer/internals/events/AsyncPollEvent.java  | 37 ++++++++++++-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 62 ++++++++++++++++++++++
 6 files changed, 148 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c93a844ff9b..aad72574469 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -851,9 +851,6 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
             return;
         }
 
-        if (autoCommitEnabled && !canCommit) return;
-        markReconciliationInProgress();
-
         // Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
         // being reconciled. Needed for interactions with the centralized 
subscription state that
         // does not support topic IDs yet, and for the callbacks.
@@ -871,6 +868,14 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         revokedPartitions.addAll(ownedPartitions);
         revokedPartitions.removeAll(assignedTopicPartitions);
 
+        // If canCommit is false (called from background poll(), not from 
AsyncPollEvent), skip
+        // reconciliation if it would involve revocation or auto-commit.
+        // Reconciliations revoking partitions cannot be triggered from the 
background because the app thread could be returning records for those 
partitions already.
+        // Reconciliations just adding new partitions are safe to trigger from 
the background thread since new partitions won't have buffered records.
+        if (!canCommit && (autoCommitEnabled || !revokedPartitions.isEmpty())) 
return;
+
+        markReconciliationInProgress();
+
         log.info("Reconciling assignment with local epoch {}\n" +
                         "\tMember:                                    {}\n" +
                         "\tAssigned partitions:                       {}\n" +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fee082e2481..cc79882da23 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1870,9 +1870,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private Fetch<K, V> pollForFetches(Timer timer) {
-        long pollTimeout = isCommittedOffsetsManagementEnabled()
-                ? Math.min(applicationEventHandler.maximumTimeToWait(), 
timer.remainingMs())
-                : timer.remainingMs();
 
         // if data is available already, return it immediately
         final Fetch<K, V> fetch = collectFetch();
@@ -1880,6 +1877,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             return fetch;
         }
 
+        long pollTimeout = isCommittedOffsetsManagementEnabled()
+                ? Math.min(applicationEventHandler.maximumTimeToWait(), 
timer.remainingMs())
+                : timer.remainingMs();
         // With the non-blocking poll design, it's possible that at this point 
the background thread is
         // concurrently working to update positions. Therefore, a _copy_ of 
the current assignment is retrieved
         // and iterated looking for any partitions with invalid positions. 
This is done to avoid being stuck
@@ -1931,6 +1931,28 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * for returning.
      */
     private Fetch<K, V> collectFetch() {
+        // Do not return buffered records if the background hasn't checked for 
pending reconciliations
+        // for the inflight poll event.
+        // This is key because partitions may need revocation, so we need to 
wait for the reconciliation check
+        // that triggers commits and marks partitions as pending revocation, 
before we can
+        // safely collect records from the buffer.
+        if (inflightPoll != null && 
!inflightPoll.isReconciliationCheckComplete()) {
+            // If the background hasn't had the time to check for pending 
reconciliation,
+            // we need to wait for that check before moving on (instead of 
returning empty right away,
+            // which will lead to blocking on buffer data)
+            long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
+            if (timeoutMs > 0) {
+                try {
+                    
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
+                } catch (TimeoutException e) {
+                    return Fetch.empty();
+                }
+            } else {
+                // No time to wait and reconciliation check not complete
+                return Fetch.empty();
+            }
+        }
+
         // With the non-blocking async poll, it's critical that the 
application thread wait until the background
         // thread has completed the stage of validating positions. This 
prevents a race condition where both
         // threads may attempt to update the SubscriptionState.position() for 
a given partition. So if the background
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 130656f5b58..78f878b7603 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
@@ -184,4 +185,16 @@ public class ShareMembershipManager extends 
AbstractMembershipManager<ShareGroup
     public int leaveGroupEpoch() {
         return ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
     }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For the ShareConsumer, full reconciliations can always be triggered 
from the background thread
+     * (fully updates assignment).
+     */
+    @Override
+    public PollResult poll(final long currentTimeMs) {
+        maybeReconcile(true);
+        return PollResult.EMPTY;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 9c143165560..ac09dc8a40c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -729,6 +729,10 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
             consumerMembershipManager.maybeReconcile(true));
 
+        // We completed checking pending reconciliations (commits triggered, 
revoked partitions marked to prevent fetching)
+        // so the application thread poll loop can safely continue progress 
now (fetching)
+        event.markReconciliationCheckComplete();
+
         if (requestManagers.commitRequestManager.isPresent()) {
             CommitRequestManager commitRequestManager = 
requestManagers.commitRequestManager.get();
             commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
index 068193ca498..cffd0607f06 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Time;
 
 import java.time.Duration;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This class represents the non-blocking event that executes logic 
functionally equivalent to the following:
@@ -47,6 +48,7 @@ public class AsyncPollEvent extends ApplicationEvent 
implements MetadataErrorNot
     private volatile KafkaException error;
     private volatile boolean isComplete;
     private volatile boolean isValidatePositionsComplete;
+    private final CompletableFuture<Void> reconciliationCheckFuture = new 
CompletableFuture<>();
 
     /**
      * Creates a new event to signify a multi-stage processing of {@link 
Consumer#poll(Duration)} logic.
@@ -85,15 +87,47 @@ public class AsyncPollEvent extends ApplicationEvent 
implements MetadataErrorNot
         this.isValidatePositionsComplete = true;
     }
 
+    /**
+     * @return the future that completes when the background thread has 
checked any pending reconciliation
+     * for this poll event. Once complete, revocations have been handled 
(commit triggered and partitions
+     * marked as pending revocation), so the app thread can safely proceed to 
fetch/collect records.
+     */
+    public CompletableFuture<Void> reconciliationCheckFuture() {
+        return reconciliationCheckFuture;
+    }
+
+    /**
+     * @return true if the background already checked any pending 
reconciliation when processing this poll event.
+     * If it completed the check, we know that revocations were handled 
(commit triggered and partitions marked as pending revocation),
+     * so the app thread can safely proceed to fetch/collect records.
+     */
+    public boolean isReconciliationCheckComplete() {
+        return reconciliationCheckFuture.isDone();
+    }
+
+    /**
+     * Mark that reconciliation check is complete for this poll event.
+     * This should be called after the background has checked pending 
reconciliations when processing this poll event
+     * (triggered commits, and marked partitions as pending revocation if 
needed)
+     */
+    public void markReconciliationCheckComplete() {
+        reconciliationCheckFuture.complete(null);
+    }
+
     public boolean isComplete() {
         return isComplete;
     }
 
     public void completeSuccessfully() {
+        // Complete reconciliation future as safety net in case it wasn't 
already marked complete
+        reconciliationCheckFuture.complete(null);
         isComplete = true;
     }
 
     public void completeExceptionally(KafkaException e) {
+        // Complete reconciliation future to unblock any waiters - the error 
will be surfaced
+        // through the normal checkInflightPoll() mechanism via the error field
+        reconciliationCheckFuture.complete(null);
         error = e;
         isComplete = true;
     }
@@ -110,6 +144,7 @@ public class AsyncPollEvent extends ApplicationEvent 
implements MetadataErrorNot
             ", pollTimeMs=" + pollTimeMs +
             ", error=" + error +
             ", isComplete=" + isComplete +
-            ", isValidatePositionsComplete=" + isValidatePositionsComplete;
+            ", isValidatePositionsComplete=" + isValidatePositionsComplete +
+            ", isReconciliationCheckComplete=" + 
isReconciliationCheckComplete();
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 40cc0e8df83..e23110d77b8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -527,6 +527,68 @@ public class AsyncKafkaConsumerTest {
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
 
+    /**
+     * Test that poll() does not return records until the reconciliation check 
is complete.
+     * This prevents a race condition where records could be returned for 
partitions that
+     * are being revoked (see KAFKA-20332).
+     */
+    @Test
+    public void testPollWaitsForReconciliationCheckComplete() {
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        final List<ConsumerRecord<String, String>> records = asList(
+            new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
+            new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
+        );
+
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            new ConsumerInterceptors<>(Collections.emptyList(), metrics),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions);
+
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        // PositionsValidator starts with metadataUpdateVersion=-1. Stub 
metadata.updateVersion() to match,
+        // so canSkipUpdateFetchPositions() passes and we test the 
reconciliation check path.
+        doReturn(-1).when(metadata).updateVersion();
+
+        completeTopicSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(singleton(topicName), 
mock(ConsumerRebalanceListener.class));
+        // Simulate partition assignment from group coordinator
+        subscriptions.assignFromSubscribed(singleton(tp));
+
+        // Set up position so canSkipUpdateFetchPositions() returns true 
(partition in FETCHING state)
+        completeSeekUnvalidatedEventSuccessfully();
+        subscriptions.seek(tp, 0);
+
+        // Set up fetch collector to return records when called
+        doReturn(Fetch.forPartition(tp, records, true, new 
OffsetAndMetadata(4, Optional.of(0), "")))
+            .when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+        // Capture the AsyncPollEvent to manually control when reconciliation 
check is marked complete
+        AtomicReference<AsyncPollEvent> capturedEvent = new 
AtomicReference<>();
+        doAnswer(invocation -> {
+            AsyncPollEvent event = invocation.getArgument(0);
+            assertTrue(capturedEvent.compareAndSet(null, event));
+            // Do NOT mark reconciliation check complete - simulating 
background hasn't processed it yet
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+        // Poll should return empty because reconciliation check is not 
complete.
+        ConsumerRecords<?, ?> result1 = consumer.poll(Duration.ZERO);
+        assertTrue(result1.isEmpty(), "Poll should not return records if it 
hasn't completed checking and triggering pending reconciliations.");
+
+        // Now mark reconciliation check complete on the captured event
+        assertNotNull(capturedEvent.get(), "AsyncPollEvent should have been 
captured");
+        capturedEvent.get().markReconciliationCheckComplete();
+
+        // Next poll should return the records since reconciliation check is 
now complete
+        ConsumerRecords<?, ?> result2 = consumer.poll(Duration.ZERO);
+        assertEquals(2, result2.count(), "Expected 2 records after 
reconciliation check is complete");
+    }
+
     @Test
     public void testEnsureCallbackExecutedByApplicationThread() {
         consumer = newConsumer();

Reply via email to