This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 67ae18eaae3 KAFKA-20428: Fix unsubscribe failure with assignment 
updates (#22011)
67ae18eaae3 is described below

commit 67ae18eaae36dca0d70008fb48ec808ec87fc112
Author: Lianet Magrans <[email protected]>
AuthorDate: Sat Apr 11 09:42:41 2026 -0400

    KAFKA-20428: Fix unsubscribe failure with assignment updates (#22011)
    
    Fix to ensure that unsubscribe does not apply any pending assignment
    update that may exist in the background queue (e.g., if a reconciliation
    completed right before unsubscribing).
    
    Fix by filtering out the assignment update events on unsubscribe (the
    same approach already used to filter out error events that
    unsubscribe should not process).
    
    This issue and its fix only affect unsubscribe (not close), because
    unsubscribe is the only operation, other than poll, that processes
    background events.
    
    Reviewers: Kirk True <[email protected]>, Andrew Schofield
     <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 54 ++++++++++++++++++++--
 .../consumer/internals/AsyncKafkaConsumerTest.java | 46 +++++++++++++++---
 .../internals/ConsumerMembershipManagerTest.java   | 39 ++++++++++++++++
 .../internals/StreamsMembershipManagerTest.java    | 36 +++++++++++++++
 4 files changed, 165 insertions(+), 10 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index c0b17a70984..2177d06df5d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1917,7 +1917,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             try {
                 // If users have fatal error, they will get some exceptions in 
the background queue.
                 // When running unsubscribe, these exceptions should be 
ignored, or users can't unsubscribe successfully.
-                processBackgroundEvents(unsubscribeEvent.future(), timer, e -> 
(e instanceof GroupAuthorizationException || e instanceof 
TopicAuthorizationException));
+                // We also skip processing assignment events 
(PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because
+                // they are not relevant anymore (consumer already 
unsubscribing).
+                processBackgroundEvents(unsubscribeEvent.future(), timer,
+                    e -> (e instanceof GroupAuthorizationException || e 
instanceof TopicAuthorizationException),
+                    true);
                 log.info("Unsubscribed all topics or patterns and assigned 
partitions");
             } catch (TimeoutException e) {
                 log.error("Failed while waiting for the unsubscribe event to 
complete");
@@ -2299,6 +2303,31 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * Visible for testing.
      */
     boolean processBackgroundEvents() {
+        return processBackgroundEvents(false);
+    }
+
+    /**
+     * Checks if the given background event is an assignment update event.
+     * Those are to update reconciled assignments, so should only be processed 
from poll() and not from unsubscribe().
+     */
+    private static boolean isAssignmentEvent(BackgroundEvent event) {
+        return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED ||
+               event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED;
+    }
+
+    /**
+     * Process the events produced by the background thread.
+     * It is possible that {@link ErrorEvent an error}
+     * could occur when processing the events. In such cases, the processor 
will take a reference to the first
+     * error, continue to process the remaining events, and then throw the 
first error that occurred.
+     * Visible for testing.
+     *
+     * @param skipAssignmentEvents If true, skip processing events that update 
a new assignment after a reconciliation
+     *                             (PARTITIONS_ASSIGNED and 
STREAMS_TASKS_ASSIGNED)
+     *                             These events should only be processed from 
poll(), not from unsubscribe().
+     * @return true if any events were drained from the queue
+     */
+    boolean processBackgroundEvents(boolean skipAssignmentEvents) {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
         List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
@@ -2310,6 +2339,18 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     if (event instanceof CompletableEvent)
                         backgroundEventReaper.add((CompletableEvent<?>) event);
 
+                    // Skip assignment events if requested (e.g., during 
unsubscribe).
+                    // These events should only be processed from poll().
+                    // Complete them exceptionally to unblock the 
reconciliation in the background.
+                    if (skipAssignmentEvents && isAssignmentEvent(event)) {
+                        if (event instanceof CompletableEvent) {
+                            ((CompletableEvent<?>) 
event).future().completeExceptionally(
+                                new KafkaException("Assignment event skipped 
because consumer is unsubscribing"));
+                        }
+                        log.debug("Skipped processing {} during unsubscribe", 
event.type());
+                        continue;
+                    }
+
                     backgroundEventProcessor.process(event);
                 } catch (Throwable t) {
                     KafkaException e = 
ConsumerUtils.maybeWrapAsKafkaException(t);
@@ -2367,14 +2408,19 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * @param timer                     Overall timer that bounds how long to 
wait for the event to complete
      * @param ignoreErrorEventException Predicate to ignore background errors.
      *                                  Any exceptions found while processing 
background events that match the predicate won't be propagated.
-     * @return {@code true} if the event completed within the timeout, {@code 
false} otherwise
+     * @param skipAssignmentEvents      If true, skip processing 
PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED
+     *                                  events and complete them 
exceptionally. These events should only be
+     *                                  processed from poll(), not from 
unsubscribe() or other operations.
+     * @return the completed result of the supplied {@code future}
+     * @throws TimeoutException if the operation does not complete before the 
timer expires
      */
     // Visible for testing
-    <T> T processBackgroundEvents(Future<T> future, Timer timer, 
Predicate<Exception> ignoreErrorEventException) {
+    <T> T processBackgroundEvents(Future<T> future, Timer timer, 
Predicate<Exception> ignoreErrorEventException,
+                                  boolean skipAssignmentEvents) {
         do {
             boolean hadEvents = false;
             try {
-                hadEvents = processBackgroundEvents();
+                hadEvents = processBackgroundEvents(skipAssignmentEvents);
             } catch (Exception e) {
                 if (!ignoreErrorEventException.test(e))
                     throw e;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 1c7d3bcf4df..0966e97afac 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -1774,7 +1774,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate, boolean) processBackgroundEvents}
      * handles the case where the {@link Future} takes a bit of time to 
complete, but does within the timeout.
      */
     @Test
@@ -1800,14 +1800,14 @@ public class AsyncKafkaConsumerTest {
             return null;
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        consumer.processBackgroundEvents(future, timer, e -> false);
+        consumer.processBackgroundEvents(future, timer, e -> false, false);
 
         // 800 is the 1000 ms timeout (above) minus the 200 ms delay for the 
two incremental timeouts/retries.
         assertEquals(800, timer.remainingMs());
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate, boolean) processBackgroundEvents}
      * handles the case where the {@link Future} is already complete when 
invoked, so it doesn't have to wait.
      */
     @Test
@@ -1818,7 +1818,7 @@ public class AsyncKafkaConsumerTest {
         // Create a future that is already completed.
         CompletableFuture<?> future = CompletableFuture.completedFuture(null);
 
-        consumer.processBackgroundEvents(future, timer, e -> false);
+        consumer.processBackgroundEvents(future, timer, e -> false, false);
 
         // Because we didn't need to perform a timed get, we should still have 
every last millisecond
         // of our initial timeout.
@@ -1826,7 +1826,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     /**
-     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
+     * Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, 
Predicate, boolean) processBackgroundEvents}
      * handles the case where the {@link Future} does not complete within the 
timeout.
      */
     @Test
@@ -1841,7 +1841,7 @@ public class AsyncKafkaConsumerTest {
             throw new java.util.concurrent.TimeoutException("Intentional 
timeout");
         }).when(future).get(any(Long.class), any(TimeUnit.class));
 
-        assertThrows(TimeoutException.class, () -> 
consumer.processBackgroundEvents(future, timer, e -> false));
+        assertThrows(TimeoutException.class, () -> 
consumer.processBackgroundEvents(future, timer, e -> false, false));
 
         // Because we forced our mocked future to continuously time out, we 
should have no time remaining.
         assertEquals(0, timer.remainingMs());
@@ -1910,6 +1910,40 @@ public class AsyncKafkaConsumerTest {
         
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
 
+    private static Stream<CompletableBackgroundEvent<?>> 
assignmentEventsSource() {
+        return Stream.of(
+            new PartitionsAssignedEvent(Set.of(), new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR)),
+            new StreamsTasksAssignedEvent(
+                new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
+                new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
+                new StreamsRebalanceData.Assignment(Set.of(), Set.of(), 
Set.of(), true))
+        );
+    }
+
+    /**
+     * Test to ensure that assignment updates are not applied while 
unsubscribing
+     * (it would cause an IllegalArgumentException when calling unsubscribe()).
+     * Validates the fix for KAFKA-20428.
+     */
+    @ParameterizedTest
+    @MethodSource("assignmentEventsSource")
+    public void 
testUnsubscribeWithPendingAssignmentEvent(CompletableBackgroundEvent<?> 
assignedEvent) {
+        consumer = 
newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
+        completeTopicSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(singletonList("topic"));
+        completeUnsubscribeApplicationEventSuccessfully();
+
+        // Add assignment event to the background queue (simulating an ongoing 
reconciliation
+        // that completed just before unsubscribe was called)
+        backgroundEventQueue.add(assignedEvent);
+
+        // The call to unsubscribe should complete successfully (assignment 
event not processed and completed exceptionally)
+        assertDoesNotThrow(() -> consumer.unsubscribe());
+        verify(applicationEventHandler, never().description("Reconciled 
assignment updates shouldn't be processed while unsubscribing"))
+                .addAndGet(any(ApplyAssignmentEvent.class));
+        assertTrue(assignedEvent.future().isCompletedExceptionally());
+    }
+
     @Test
     public void testSeekToBeginning() {
         Collection<TopicPartition> topics = Collections.singleton(new 
TopicPartition("test", 0));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index b6bb3878642..914f0a26fb7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -1248,6 +1248,45 @@ public class ConsumerMembershipManagerTest {
         assertEquals(MemberState.STALE, membershipManager.state());
     }
 
+    /**
+     * Test that when unsubscribe/leaveGroup is called during an ongoing 
reconciliation and the pending
+     * assignment event is completed exceptionally, the member can still 
rejoin and start
+     * a new reconciliation.
+     */
+    @Test
+    public void testLeaveGroupDuringReconciliationThenRejoin() {
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "topic1";
+        ConsumerMembershipManager membershipManager = 
createMembershipManagerJoiningGroup();
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, 
topicName, Collections.emptyList());
+
+        // Start reconciliation - assignment event is pending
+        receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
+        membershipManager.maybeReconcile(true);
+        PartitionsAssignedEvent pendingAssignmentEvent = 
(PartitionsAssignedEvent) backgroundEventQueue.poll();
+        assertNotNull(pendingAssignmentEvent);
+
+        // Call leaveGroup while reconciliation is in progress
+        mockLeaveGroup();
+        membershipManager.leaveGroup();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+
+        // Complete the pending assignment event exceptionally (simulating 
unsubscribe skipping it)
+        pendingAssignmentEvent.future().completeExceptionally(
+            new KafkaException("Assignment event skipped because consumer is 
unsubscribing"));
+
+        // Complete leave and rejoin
+        membershipManager.onHeartbeatRequestGenerated();
+        clearInvocations(membershipManager);
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, 
topicName, Collections.emptyList());
+        membershipManager.transitionToJoining();
+
+        // Receive assignment - verify new reconciliation starts
+        receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
+        membershipManager.maybeReconcile(true);
+        verifyReconciliationTriggered(membershipManager);
+    }
+
     @Test
     public void testFatalFailureWhenStateIsUnjoined() {
         ConsumerMembershipManager membershipManager = 
createMembershipManagerJoiningGroup();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 6312ac01577..3348a0d109f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -1343,6 +1343,42 @@ public class StreamsMembershipManagerTest {
         assertFalse(onGroupLeft.isCompletedExceptionally());
     }
 
+    /**
+     * Test that when unsubscribe/leaveGroup is called during an ongoing 
reconciliation and the pending
+     * assignment event is completed exceptionally, the member can still 
rejoin and start
+     * a new reconciliation.
+     */
+    @Test
+    public void testLeaveGroupDuringReconciliationThenRejoin() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        final Set<StreamsRebalanceData.TaskId> activeTasks =
+            Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
+        when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
+        joining();
+
+        // Start reconciliation - assignment event is pending
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        final StreamsTasksAssignedEvent pendingAssignmentEvent =
+            
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
+
+        // Call leaveGroup while reconciliation is in progress
+        membershipManager.leaveGroup();
+
+        // Complete the pending assignment event exceptionally (simulating 
unsubscribe skipping it)
+        pendingAssignmentEvent.future().completeExceptionally(
+            new KafkaException("Assignment event skipped because consumer is 
unsubscribing"));
+
+        // Complete leave and rejoin
+        membershipManager.onHeartbeatRequestGenerated();
+        Mockito.clearInvocations(backgroundEventHandler);
+        tasksAssignedAddCount = 0;
+        joining();
+
+        // Receive assignment - verify new reconciliation starts
+        reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
+        
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
+    }
+
     @Test
     public void testOnHeartbeatRequestSkippedWhenInLeaving() {
         
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
"topic");

Reply via email to