This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 8dad4f93e9d KAFKA-20321: Mark lost partitions before callbacks for
Consumer and Streams (#21781)
8dad4f93e9d is described below
commit 8dad4f93e9d09a993dfb3868d1c464d4bff521cc
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Mar 20 09:54:41 2026 -0400
KAFKA-20321: Mark lost partitions before callbacks for Consumer and Streams
(#21781)
Mark lost partitions as pending revocation before invoking the
partitions-lost callback. This avoids returning buffered records in the
case of a race after the callback completes: the app thread moves on to
polling, while the background thread moves on to clearing the lost
assignment. The same fix is applied to the Consumer and Streams
membership managers (Share has no callbacks, so this does not apply).
Reviewers: Nilesh Kumar <[email protected]>, Lan Ding
<[email protected]>, Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, Lucas Brutschy <[email protected]>
---
.../internals/AbstractMembershipManager.java | 5 +-
.../internals/ConsumerMembershipManager.java | 3 +
.../internals/StreamsMembershipManager.java | 9 +++
.../internals/ConsumerMembershipManagerTest.java | 70 +++++++++++++++++++++-
.../internals/StreamsMembershipManagerTest.java | 50 ++++++++++++++++
5 files changed, 133 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index d5c46ed32b5..34e9ffc1ecd 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -773,8 +773,9 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
* expired poll timer. This will trigger the onPartitionsLost callback.
Once the callback
* completes, the member will remain stale until the poll timer is reset
by an application
* poll event. See {@link #maybeRejoinStaleMember()}.
+ * Visible for testing.
*/
- private void transitionToStale() {
+ void transitionToStale() {
transitionTo(MemberState.STALE);
// Release assignment
@@ -1246,7 +1247,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
* <li>Previous in-flight fetch requests that may complete while the
partitions are being revoked won't be processed.</li>
* </ul>
*/
- private void markPendingRevocationToPauseFetching(Set<TopicPartition>
partitionsToRevoke) {
+ protected void markPendingRevocationToPauseFetching(Set<TopicPartition>
partitionsToRevoke) {
// When asynchronously committing offsets prior to the revocation of a
set of partitions, there will be a
// window of time between when the offset commit is sent and when it
returns and revocation completes. It is
// possible for pending fetches for these partitions to return during
this time, which means the application's
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index 93a83f3c22a..f58c6c3f296 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -301,6 +301,9 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
*/
@Override
protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition>
partitionsLost) {
+ // Mark partitions as pending revocation to stop fetching from the
partitions (no new
+ // fetches sent out, and no in-flight fetches responses processed).
+ markPendingRevocationToPauseFetching(partitionsLost);
return invokeOnPartitionsLostCallback(partitionsLost);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index fcb0b8ad6c6..96a03dc01b8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -462,6 +462,9 @@ public class StreamsMembershipManager implements
RequestManager {
private void transitionToStale() {
transitionTo(MemberState.STALE);
+ // Mark partitions as pending revocation to stop fetching before
callback
+
subscriptionState.markPendingRevocation(subscriptionState.assignedPartitions());
+
final CompletableFuture<Void> onAllTasksLostCallbackExecuted =
requestOnAllTasksLostCallbackInvocation();
staleMemberAssignmentRelease =
onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
if (error != null) {
@@ -500,6 +503,9 @@ public class StreamsMembershipManager implements
RequestManager {
return;
}
+ // Mark partitions as pending revocation to stop fetching before
callback
+
subscriptionState.markPendingRevocation(subscriptionState.assignedPartitions());
+
CompletableFuture<Void> onAllTasksLostCallbackExecuted =
requestOnAllTasksLostCallbackInvocation();
onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
if (error != null) {
@@ -805,6 +811,9 @@ public class StreamsMembershipManager implements
RequestManager {
log.debug("Member {} with epoch {} transitioned to {} state. It will
release its " +
"assignment and rejoin the group.", memberId, memberEpoch,
MemberState.FENCED);
+ // Mark partitions as pending revocation to stop fetching before
callback
+
subscriptionState.markPendingRevocation(subscriptionState.assignedPartitions());
+
CompletableFuture<Void> onAllTasksLostCallbackExecuted =
requestOnAllTasksLostCallbackInvocation();
onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
if (error != null) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 3a659970cdd..b6bb3878642 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -124,7 +124,7 @@ public class ConsumerMembershipManagerTest {
commitRequestManager = mock(CommitRequestManager.class);
backgroundEventQueue = new LinkedBlockingQueue<>();
time = new MockTime(0);
- backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue, time,
mock(AsyncConsumerMetrics.class));
+ backgroundEventHandler = spy(new
BackgroundEventHandler(backgroundEventQueue, time,
mock(AsyncConsumerMetrics.class)));
metrics = new Metrics(time);
rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics,
subscriptionState);
@@ -271,6 +271,72 @@ public class ConsumerMembershipManagerTest {
verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
}
+ @Test
+ public void
testTransitionToFencedMarksPendingRevocationBeforeSignalingPartitionsLost() {
+ ConsumerMembershipManager membershipManager =
createMemberInStableState();
+ String topicName = "topic1";
+ TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+ Set<TopicPartition> ownedPartitions =
Collections.singleton(ownedPartition);
+
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+ membershipManager.transitionToFenced();
+
+ // Verify markPendingRevocation is called before enqueueing the
callback event
+ InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+
inOrder.verify(backgroundEventHandler).add(any(PartitionsRemovedEvent.class));
+ }
+
+ @Test
+ public void
testTransitionToFatalMarksPendingRevocationBeforeSignalingPartitionsLost() {
+ ConsumerMembershipManager membershipManager =
createMemberInStableState();
+ String topicName = "topic1";
+ TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+ Set<TopicPartition> ownedPartitions =
Collections.singleton(ownedPartition);
+
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+ membershipManager.transitionToFatal();
+
+ // Verify markPendingRevocation is called before enqueueing the
callback event
+ InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+
inOrder.verify(backgroundEventHandler).add(any(PartitionsRemovedEvent.class));
+ }
+
+ @Test
+ public void
testTransitionToStaleMarksPendingRevocationBeforeSignalingPartitionsLost() {
+ ConsumerMembershipManager membershipManager =
createMemberInStableState();
+ String topicName = "topic1";
+ TopicPartition ownedPartition = new TopicPartition(topicName, 0);
+ Set<TopicPartition> ownedPartitions =
Collections.singleton(ownedPartition);
+
+ CounterConsumerRebalanceListener listener = new
CounterConsumerRebalanceListener();
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+ when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+ // First transition to LEAVING (required before transitioning to STALE)
+ membershipManager.transitionToSendingLeaveGroup(true);
+ clearInvocations(subscriptionState, backgroundEventHandler);
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
+
+ membershipManager.transitionToStale();
+
+ // Verify markPendingRevocation is called before enqueueing the
callback event
+ InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+
inOrder.verify(backgroundEventHandler).add(any(PartitionsRemovedEvent.class));
+ }
+
@Test
public void testListenersGetNotifiedOnTransitionsToFatal() {
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
@@ -2841,7 +2907,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
- clearInvocations(subscriptionState, membershipManager,
commitRequestManager);
+ clearInvocations(subscriptionState, membershipManager,
commitRequestManager, backgroundEventHandler);
return membershipManager;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 4b6ee98090e..4043b947c0f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -42,11 +42,13 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -70,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -1770,6 +1773,53 @@ public class StreamsMembershipManagerTest {
verify(subscriptionState, never()).assignFromSubscribed(Set.of());
}
+ @Test
+ public void testOnFencedMarksPendingRevocationBeforeCallback() {
+ TopicPartition ownedPartition = new TopicPartition(TOPIC_0,
PARTITION_0);
+ Set<TopicPartition> ownedPartitions =
Collections.singleton(ownedPartition);
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+ joining();
+
+ membershipManager.onFenced();
+
+ // Verify markPendingRevocation is called before the callback event is
enqueued
+ InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+
inOrder.verify(backgroundEventHandler).add(any(StreamsOnAllTasksLostCallbackNeededEvent.class));
+ }
+
+ @Test
+ public void testTransitionToFatalMarksPendingRevocationBeforeCallback() {
+ TopicPartition ownedPartition = new TopicPartition(TOPIC_0,
PARTITION_0);
+ Set<TopicPartition> ownedPartitions =
Collections.singleton(ownedPartition);
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+ joining();
+
+ membershipManager.transitionToFatal();
+
+ // Verify markPendingRevocation is called before the callback event is
enqueued
+ InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+
inOrder.verify(backgroundEventHandler).add(any(StreamsOnAllTasksLostCallbackNeededEvent.class));
+ }
+
+ @Test
+ public void testTransitionToStaleMarksPendingRevocationBeforeCallback() {
+ TopicPartition ownedPartition = new TopicPartition(TOPIC_0,
PARTITION_0);
+ Set<TopicPartition> ownedPartitions =
Collections.singleton(ownedPartition);
+
when(subscriptionState.assignedPartitions()).thenReturn(ownedPartitions);
+ joining();
+
+ // Trigger poll timer expiry to transition to LEAVING, then STALE on
heartbeat generated
+ membershipManager.onPollTimerExpired();
+ membershipManager.onHeartbeatRequestGenerated();
+
+ // Verify markPendingRevocation is called before the callback event is
enqueued
+ InOrder inOrder = inOrder(subscriptionState, backgroundEventHandler);
+
inOrder.verify(subscriptionState).markPendingRevocation(ownedPartitions);
+
inOrder.verify(backgroundEventHandler).add(any(StreamsOnAllTasksLostCallbackNeededEvent.class));
+ }
+
@Test
public void testOnTasksAssignedCallbackCompleted() {
final CompletableFuture<Void> future = new CompletableFuture<>();