This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 8dad4f93e9d KAFKA-20321: Mark lost partitions before callbacks for 
Consumer and Streams (#21781)
8dad4f93e9d is described below

commit 8dad4f93e9d09a993dfb3868d1c464d4bff521cc
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Mar 20 09:54:41 2026 -0400

    KAFKA-20321: Mark lost partitions before callbacks for Consumer and Streams 
(#21781)
    
    Mark lost partitions as pending revocation before invoking the callbacks,
    to avoid returning buffered records in the case of a race after the
    callback completes (the app thread moves on to polling while the
    background thread moves on to clearing the lost assignment). The same fix
    is applied to the Consumer and Streams managers (Share has no callbacks,
    so this does not apply).
    
    Reviewers: Nilesh Kumar <[email protected]>, Lan Ding
     <[email protected]>, Ken Huang <[email protected]>, PoAn Yang
     <[email protected]>, Lucas Brutschy <[email protected]>
---
 .../internals/AbstractMembershipManager.java       |  5 +-
 .../internals/ConsumerMembershipManager.java       |  3 +
 .../internals/StreamsMembershipManager.java        |  9 +++
 .../internals/ConsumerMembershipManagerTest.java   | 70 +++++++++++++++++++++-
 .../internals/StreamsMembershipManagerTest.java    | 50 ++++++++++++++++
 5 files changed, 133 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index d5c46ed32b5..34e9ffc1ecd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -773,8 +773,9 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      * expired poll timer. This will trigger the onPartitionsLost callback. 
Once the callback
      * completes, the member will remain stale until the poll timer is reset 
by an application
      * poll event. See {@link #maybeRejoinStaleMember()}.
+     * Visible for testing.
      */
-    private void transitionToStale() {
+    void transitionToStale() {
         transitionTo(MemberState.STALE);
 
         // Release assignment
@@ -1246,7 +1247,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      *     <li>Previous in-flight fetch requests that may complete while the 
partitions are being revoked won't be processed.</li>
      * </ul>
      */
-    private void markPendingRevocationToPauseFetching(Set<TopicPartition> 
partitionsToRevoke) {
+    protected void markPendingRevocationToPauseFetching(Set<TopicPartition> 
partitionsToRevoke) {
         // When asynchronously committing offsets prior to the revocation of a 
set of partitions, there will be a
         // window of time between when the offset commit is sent and when it 
returns and revocation completes. It is
         // possible for pending fetches for these partitions to return during 
this time, which means the application's
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index 93a83f3c22a..f58c6c3f296 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -301,6 +301,9 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
      */
     @Override
     protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition> 
partitionsLost) {
+        // Mark partitions as pending revocation to stop fetching from the 
partitions (no new
+        // fetches sent out, and no in-flight fetches responses processed).
+        markPendingRevocationToPauseFetching(partitionsLost);
         return invokeOnPartitionsLostCallback(partitionsLost);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index fcb0b8ad6c6..96a03dc01b8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -462,6 +462,9 @@ public class StreamsMembershipManager implements 
RequestManager {
     private void transitionToStale() {
         transitionTo(MemberState.STALE);
 
+        // Mark partitions as pending revocation to stop fetching before 
callback
+        
subscriptionState.markPendingRevocation(subscriptionState.assignedPartitions());
+
         final CompletableFuture<Void> onAllTasksLostCallbackExecuted = 
requestOnAllTasksLostCallbackInvocation();
         staleMemberAssignmentRelease = 
onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
             if (error != null) {
@@ -500,6 +503,9 @@ public class StreamsMembershipManager implements 
RequestManager {
             return;
         }
 
+        // Mark partitions as pending revocation to stop fetching before 
callback
+        
subscriptionState.markPendingRevocation(subscriptionState.assignedPartitions());
+
         CompletableFuture<Void> onAllTasksLostCallbackExecuted = 
requestOnAllTasksLostCallbackInvocation();
         onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
             if (error != null) {
@@ -805,6 +811,9 @@ public class StreamsMembershipManager implements 
RequestManager {
         log.debug("Member {} with epoch {} transitioned to {} state. It will 
release its " +
             "assignment and rejoin the group.", memberId, memberEpoch, 
MemberState.FENCED);
 
+        // Mark partitions as pending revocation to stop fetching before 
callback
+        
subscriptionState.markPendingRevocation(subscriptionState.assignedPartitions());
+
         CompletableFuture<Void> onAllTasksLostCallbackExecuted = 
requestOnAllTasksLostCallbackInvocation();
         onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
             if (error != null) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 3a659970cdd..b6bb3878642 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -124,7 +124,7 @@ public class ConsumerMembershipManagerTest {
         commitRequestManager = mock(CommitRequestManager.class);
         backgroundEventQueue = new LinkedBlockingQueue<>();
         time = new MockTime(0);
-        backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue, time, 
mock(AsyncConsumerMetrics.class));
+        backgroundEventHandler = spy(new 
BackgroundEventHandler(backgroundEventQueue, time, 
mock(AsyncConsumerMetrics.class)));
         metrics = new Metrics(time);
         rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics, 
subscriptionState);
 
@@ -271,6 +271,72 @@ public class ConsumerMembershipManagerTest {
         verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
     }
 
+    @Test
+    public void 
testTransitionToFencedMarksPendingRevocationBeforeSignalingPartitionsLost() {
+        ConsumerMembershipManager membershipManager = 
createMemberInStableState();
+        String topicName = "topic1";
+        TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+        Set<TopicPartition> ownedPartitions = 
Collections.singleton(ownedPartition);
+
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+        membershipManager.transitionToFenced();
+
+        // Verify markPendingRevocation is called before enqueueing the 
callback event
+        InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+        
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+        
inOrder.verify(backgroundEventHandler).add(any(PartitionsRemovedEvent.class));
+    }
+
+    @Test
+    public void 
testTransitionToFatalMarksPendingRevocationBeforeSignalingPartitionsLost() {
+        ConsumerMembershipManager membershipManager = 
createMemberInStableState();
+        String topicName = "topic1";
+        TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+        Set<TopicPartition> ownedPartitions = 
Collections.singleton(ownedPartition);
+
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+        membershipManager.transitionToFatal();
+
+        // Verify markPendingRevocation is called before enqueueing the 
callback event
+        InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+        
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+        
inOrder.verify(backgroundEventHandler).add(any(PartitionsRemovedEvent.class));
+    }
+
+    @Test
+    public void 
testTransitionToStaleMarksPendingRevocationBeforeSignalingPartitionsLost() {
+        ConsumerMembershipManager membershipManager = 
createMemberInStableState();
+        String topicName = "topic1";
+        TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+        Set<TopicPartition> ownedPartitions = 
Collections.singleton(ownedPartition);
+
+        CounterConsumerRebalanceListener listener = new 
CounterConsumerRebalanceListener();
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+        // First transition to LEAVING (required before transitioning to STALE)
+        membershipManager.transitionToSendingLeaveGroup(true);
+        clearInvocations(subscriptionState, backgroundEventHandler);
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+        membershipManager.transitionToStale();
+
+        // Verify markPendingRevocation is called before enqueueing the 
callback event
+        InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+        
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+        
inOrder.verify(backgroundEventHandler).add(any(PartitionsRemovedEvent.class));
+    }
+
     @Test
     public void testListenersGetNotifiedOnTransitionsToFatal() {
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
@@ -2841,7 +2907,7 @@ public class ConsumerMembershipManagerTest {
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());
 
-        clearInvocations(subscriptionState, membershipManager, 
commitRequestManager);
+        clearInvocations(subscriptionState, membershipManager, 
commitRequestManager, backgroundEventHandler);
         return membershipManager;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 4b6ee98090e..4043b947c0f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -42,11 +42,13 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -70,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -1770,6 +1773,53 @@ public class StreamsMembershipManagerTest {
         verify(subscriptionState, never()).assignFromSubscribed(Set.of());
     }
 
+    @Test
+    public void testOnFencedMarksPendingRevocationBeforeCallback() {
+        TopicPartition ownedPartition = new TopicPartition(TOPIC_0, 
PARTITION_0);
+        Set<TopicPartition> ownedPartitions = 
Collections.singleton(ownedPartition);
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        joining();
+
+        membershipManager.onFenced();
+
+        // Verify markPendingRevocation is called before the callback event is 
enqueued
+        InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+        
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+        
inOrder.verify(backgroundEventHandler).add(any(StreamsOnAllTasksLostCallbackNeededEvent.class));
+    }
+
+    @Test
+    public void testTransitionToFatalMarksPendingRevocationBeforeCallback() {
+        TopicPartition ownedPartition = new TopicPartition(TOPIC_0, 
PARTITION_0);
+        Set<TopicPartition> ownedPartitions = 
Collections.singleton(ownedPartition);
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        joining();
+
+        membershipManager.transitionToFatal();
+
+        // Verify markPendingRevocation is called before the callback event is 
enqueued
+        InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+        
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+        
inOrder.verify(backgroundEventHandler).add(any(StreamsOnAllTasksLostCallbackNeededEvent.class));
+    }
+
+    @Test
+    public void testTransitionToStaleMarksPendingRevocationBeforeCallback() {
+        TopicPartition ownedPartition = new TopicPartition(TOPIC_0, 
PARTITION_0);
+        Set<TopicPartition> ownedPartitions = 
Collections.singleton(ownedPartition);
+        
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+        joining();
+
+        // Trigger poll timer expiry to transition to LEAVING, then STALE on 
heartbeat generated
+        membershipManager.onPollTimerExpired();
+        membershipManager.onHeartbeatRequestGenerated();
+
+        // Verify markPendingRevocation is called before the callback event is 
enqueued
+        InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+        
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+        
inOrder.verify(backgroundEventHandler).add(any(StreamsOnAllTasksLostCallbackNeededEvent.class));
+    }
+
     @Test
     public void testOnTasksAssignedCallbackCompleted() {
         final CompletableFuture<Void> future = new CompletableFuture<>();

Reply via email to