This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 67ae18eaae3 KAFKA-20428: Fix unsubscribe failure with assignment
updates (#22011)
67ae18eaae3 is described below
commit 67ae18eaae36dca0d70008fb48ec808ec87fc112
Author: Lianet Magrans <[email protected]>
AuthorDate: Sat Apr 11 09:42:41 2026 -0400
KAFKA-20428: Fix unsubscribe failure with assignment updates (#22011)
This fix ensures that unsubscribe does not apply any pending assignment
update that may still be in the background queue (e.g., if a reconciliation
completed right before unsubscribing).
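The fix filters out assignment update events during unsubscribe (the same
approach already used to filter out error events that unsubscribe should
not process). Skipped assignment events are completed exceptionally so that
the background reconciliation waiting on them is not left blocked.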
This issue and fix only affect unsubscribe (not close), as unsubscribe
is the only operation, other than poll, that processes background events.
Reviewers: Kirk True <[email protected]>, Andrew Schofield
<[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 54 ++++++++++++++++++++--
.../consumer/internals/AsyncKafkaConsumerTest.java | 46 +++++++++++++++---
.../internals/ConsumerMembershipManagerTest.java | 39 ++++++++++++++++
.../internals/StreamsMembershipManagerTest.java | 36 +++++++++++++++
4 files changed, 165 insertions(+), 10 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index c0b17a70984..2177d06df5d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1917,7 +1917,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
try {
// If users have fatal error, they will get some exceptions in
the background queue.
// When running unsubscribe, these exceptions should be
ignored, or users can't unsubscribe successfully.
- processBackgroundEvents(unsubscribeEvent.future(), timer, e ->
(e instanceof GroupAuthorizationException || e instanceof
TopicAuthorizationException));
+ // We also skip processing assignment events
(PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because
+ // they are not relevant anymore (consumer already
unsubscribing).
+ processBackgroundEvents(unsubscribeEvent.future(), timer,
+ e -> (e instanceof GroupAuthorizationException || e
instanceof TopicAuthorizationException),
+ true);
log.info("Unsubscribed all topics or patterns and assigned
partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to
complete");
@@ -2299,6 +2303,31 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* Visible for testing.
*/
boolean processBackgroundEvents() {
+ return processBackgroundEvents(false);
+ }
+
+ /**
+ * Checks if the given background event is an assignment update event.
+ * Those are to update reconciled assignments, so should only be processed
from poll() and not from unsubscribe().
+ */
+ private static boolean isAssignmentEvent(BackgroundEvent event) {
+ return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED ||
+ event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED;
+ }
+
+ /**
+ * Process the events produced by the background thread.
+ * It is possible that {@link ErrorEvent an error}
+ * could occur when processing the events. In such cases, the processor
will take a reference to the first
+ * error, continue to process the remaining events, and then throw the
first error that occurred.
+ * Visible for testing.
+ *
+ * @param skipAssignmentEvents If true, skip processing events that update
a new assignment after a reconciliation
+ * (PARTITIONS_ASSIGNED and
STREAMS_TASKS_ASSIGNED)
+ * These events should only be processed from
poll(), not from unsubscribe().
+ * @return true if any events were drained from the queue
+ */
+ boolean processBackgroundEvents(boolean skipAssignmentEvents) {
AtomicReference<KafkaException> firstError = new AtomicReference<>();
List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
@@ -2310,6 +2339,18 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (event instanceof CompletableEvent)
backgroundEventReaper.add((CompletableEvent<?>) event);
+ // Skip assignment events if requested (e.g., during
unsubscribe).
+ // These events should only be processed from poll().
+ // Complete them exceptionally to unblock the
reconciliation in the background.
+ if (skipAssignmentEvents && isAssignmentEvent(event)) {
+ if (event instanceof CompletableEvent) {
+ ((CompletableEvent<?>)
event).future().completeExceptionally(
+ new KafkaException("Assignment event skipped
because consumer is unsubscribing"));
+ }
+ log.debug("Skipped processing {} during unsubscribe",
event.type());
+ continue;
+ }
+
backgroundEventProcessor.process(event);
} catch (Throwable t) {
KafkaException e =
ConsumerUtils.maybeWrapAsKafkaException(t);
@@ -2367,14 +2408,19 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* @param timer Overall timer that bounds how long to
wait for the event to complete
* @param ignoreErrorEventException Predicate to ignore background errors.
* Any exceptions found while processing
background events that match the predicate won't be propagated.
- * @return {@code true} if the event completed within the timeout, {@code
false} otherwise
+ * @param skipAssignmentEvents If true, skip processing
PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED
+ * events and complete them
exceptionally. These events should only be
+ * processed from poll(), not from
unsubscribe() or other operations.
+ * @return the completed result of the supplied {@code future}
+ * @throws TimeoutException if the operation does not complete before the
timer expires
*/
// Visible for testing
- <T> T processBackgroundEvents(Future<T> future, Timer timer,
Predicate<Exception> ignoreErrorEventException) {
+ <T> T processBackgroundEvents(Future<T> future, Timer timer,
Predicate<Exception> ignoreErrorEventException,
+ boolean skipAssignmentEvents) {
do {
boolean hadEvents = false;
try {
- hadEvents = processBackgroundEvents();
+ hadEvents = processBackgroundEvents(skipAssignmentEvents);
} catch (Exception e) {
if (!ignoreErrorEventException.test(e))
throw e;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 1c7d3bcf4df..0966e97afac 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1774,7 +1774,7 @@ public class AsyncKafkaConsumerTest {
}
/**
- * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} takes a bit of time to
complete, but does within the timeout.
*/
@Test
@@ -1800,14 +1800,14 @@ public class AsyncKafkaConsumerTest {
return null;
}).when(future).get(any(Long.class), any(TimeUnit.class));
- consumer.processBackgroundEvents(future, timer, e -> false);
+ consumer.processBackgroundEvents(future, timer, e -> false, false);
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the
two incremental timeouts/retries.
assertEquals(800, timer.remainingMs());
}
/**
- * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} is already complete when
invoked, so it doesn't have to wait.
*/
@Test
@@ -1818,7 +1818,7 @@ public class AsyncKafkaConsumerTest {
// Create a future that is already completed.
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
- consumer.processBackgroundEvents(future, timer, e -> false);
+ consumer.processBackgroundEvents(future, timer, e -> false, false);
// Because we didn't need to perform a timed get, we should still have
every last millisecond
// of our initial timeout.
@@ -1826,7 +1826,7 @@ public class AsyncKafkaConsumerTest {
}
/**
- * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
+ * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer,
Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} does not complete within the
timeout.
*/
@Test
@@ -1841,7 +1841,7 @@ public class AsyncKafkaConsumerTest {
throw new java.util.concurrent.TimeoutException("Intentional
timeout");
}).when(future).get(any(Long.class), any(TimeUnit.class));
- assertThrows(TimeoutException.class, () ->
consumer.processBackgroundEvents(future, timer, e -> false));
+ assertThrows(TimeoutException.class, () ->
consumer.processBackgroundEvents(future, timer, e -> false, false));
// Because we forced our mocked future to continuously time out, we
should have no time remaining.
assertEquals(0, timer.remainingMs());
@@ -1910,6 +1910,40 @@ public class AsyncKafkaConsumerTest {
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}
+ private static Stream<CompletableBackgroundEvent<?>>
assignmentEventsSource() {
+ return Stream.of(
+ new PartitionsAssignedEvent(Set.of(), new
TreeSet<>(TOPIC_PARTITION_COMPARATOR)),
+ new StreamsTasksAssignedEvent(
+ new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
+ new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
+ new StreamsRebalanceData.Assignment(Set.of(), Set.of(),
Set.of(), true))
+ );
+ }
+
+ /**
+ * Test to ensure that assignment updates are not applied while
unsubscribing
+ * (it would cause an IllegalArgumentException when calling unsubscribe()).
+ * Validates the fix for KAFKA-20428.
+ */
+ @ParameterizedTest
+ @MethodSource("assignmentEventsSource")
+ public void
testUnsubscribeWithPendingAssignmentEvent(CompletableBackgroundEvent<?>
assignedEvent) {
+ consumer =
newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
+ completeTopicSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(singletonList("topic"));
+ completeUnsubscribeApplicationEventSuccessfully();
+
+ // Add assignment event to the background queue (simulating an ongoing
reconciliation
+ // that completed just before unsubscribe was called)
+ backgroundEventQueue.add(assignedEvent);
+
+ // The call to unsubscribe should complete successfully (assignment
event not processed and completed exceptionally)
+ assertDoesNotThrow(() -> consumer.unsubscribe());
+ verify(applicationEventHandler, never().description("Reconciled
assignment updates shouldn't be processed while unsubscribing"))
+ .addAndGet(any(ApplyAssignmentEvent.class));
+ assertTrue(assignedEvent.future().isCompletedExceptionally());
+ }
+
@Test
public void testSeekToBeginning() {
Collection<TopicPartition> topics = Collections.singleton(new
TopicPartition("test", 0));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index b6bb3878642..914f0a26fb7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -1248,6 +1248,45 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.STALE, membershipManager.state());
}
+ /**
+ * Test that when unsubscribe/leaveGroup is called during an ongoing
reconciliation and the pending
+ * assignment event is completed exceptionally, the member can still
rejoin and start
+ * a new reconciliation.
+ */
+ @Test
+ public void testLeaveGroupDuringReconciliationThenRejoin() {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "topic1";
+ ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
+
+ // Start reconciliation - assignment event is pending
+ receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
+ membershipManager.maybeReconcile(true);
+ PartitionsAssignedEvent pendingAssignmentEvent =
(PartitionsAssignedEvent) backgroundEventQueue.poll();
+ assertNotNull(pendingAssignmentEvent);
+
+ // Call leaveGroup while reconciliation is in progress
+ mockLeaveGroup();
+ membershipManager.leaveGroup();
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+
+ // Complete the pending assignment event exceptionally (simulating
unsubscribe skipping it)
+ pendingAssignmentEvent.future().completeExceptionally(
+ new KafkaException("Assignment event skipped because consumer is
unsubscribing"));
+
+ // Complete leave and rejoin
+ membershipManager.onHeartbeatRequestGenerated();
+ clearInvocations(membershipManager);
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
+ membershipManager.transitionToJoining();
+
+ // Receive assignment - verify new reconciliation starts
+ receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
+ membershipManager.maybeReconcile(true);
+ verifyReconciliationTriggered(membershipManager);
+ }
+
@Test
public void testFatalFailureWhenStateIsUnjoined() {
ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 6312ac01577..3348a0d109f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -1343,6 +1343,42 @@ public class StreamsMembershipManagerTest {
assertFalse(onGroupLeft.isCompletedExceptionally());
}
+ /**
+ * Test that when unsubscribe/leaveGroup is called during an ongoing
reconciliation and the pending
+ * assignment event is completed exceptionally, the member can still
rejoin and start
+ * a new reconciliation.
+ */
+ @Test
+ public void testLeaveGroupDuringReconciliationThenRejoin() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ final Set<StreamsRebalanceData.TaskId> activeTasks =
+ Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0,
PARTITION_0));
+ when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
+ joining();
+
+ // Start reconciliation - assignment event is pending
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0,
List.of(PARTITION_0)));
+ final StreamsTasksAssignedEvent pendingAssignmentEvent =
+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
Set.of(), Set.of());
+
+ // Call leaveGroup while reconciliation is in progress
+ membershipManager.leaveGroup();
+
+ // Complete the pending assignment event exceptionally (simulating
unsubscribe skipping it)
+ pendingAssignmentEvent.future().completeExceptionally(
+ new KafkaException("Assignment event skipped because consumer is
unsubscribing"));
+
+ // Complete leave and rejoin
+ membershipManager.onHeartbeatRequestGenerated();
+ Mockito.clearInvocations(backgroundEventHandler);
+ tasksAssignedAddCount = 0;
+ joining();
+
+ // Receive assignment - verify new reconciliation starts
+ reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0,
List.of(PARTITION_0)));
+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
Set.of(), Set.of());
+ }
+
@Test
public void testOnHeartbeatRequestSkippedWhenInLeaving() {
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
"topic");