This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 54d6e39fa6f KAFKA-20382: Send background error when updating 
assignment for callbacks fail (#21919)
54d6e39fa6f is described below

commit 54d6e39fa6f8b0c6cfac77b45cf69b373f380b4d
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 1 06:49:45 2026 -0400

    KAFKA-20382: Send background error when updating assignment for callbacks 
fail (#21919)
    
    This tidies up the error handling in the app thread around
    assignment updates. It ensures that if the app thread fails to get the
    result from the ApplyAssignment event, it not only propagates the
    KafkaException to poll (as was already done), but also sends the
    background thread the event signalling that the attempt to "update
    assignment and maybe trigger callbacks" completed with an error. This
    makes sure that the background thread marks the ongoing reconciliation
    attempt as completed with an error.
    
    The ApplyAssignment processing is a simple operation that mainly adds
    new partitions to the subscription state, so it is not really expected
    to fail. However, because the app thread sends an event and waits for
    its result, it could fail if the app thread is interrupted. This fix
    ensures that in that case the open reconciliation in the background is
    marked as completed with an error (just as happens when the callback
    itself fails).
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/AbstractMembershipManager.java       |  2 +-
 .../consumer/internals/AsyncKafkaConsumer.java     | 18 ++++-
 .../internals/StreamsMembershipManager.java        |  4 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 77 ++++++++++++++++++++++
 4 files changed, 96 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 34e9ffc1ecd..c1972900f7f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -1198,7 +1198,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
                 // Keeping newly added partitions as non-fetchable after the 
callback failure.
                 // They will be retried on the next reconciliation loop, until 
it succeeds or the
                 // broker removes them from the assignment.
-                if (!addedPartitions.isEmpty()) {
+                if (!addedPartitions.isEmpty() && 
subscriptions.assignedPartitions().containsAll(addedPartitions)) {
                     log.warn("Leaving newly assigned partitions {} marked as 
non-fetchable and not " +
                             "requiring initializing positions after 
onPartitionsAssigned callback failed.",
                         addedPartitions, exception);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d4ab7b836d3..2c5d4a1e857 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -256,7 +256,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 event.assignedPartitions(),
                 event.addedPartitions()
             );
-            applicationEventHandler.addAndGet(applyEvent);
+            try {
+                applicationEventHandler.addAndGet(applyEvent);
+            } catch (Exception e) {
+                // Send error to the background thread, so it can complete the 
ongoing reconciliation (failed to update assignment to run callbacks)
+                KafkaException error = 
ConsumerUtils.maybeWrapAsKafkaException(e, "Failed to apply the new 
assignment");
+                applicationEventHandler.add(new 
ConsumerRebalanceListenerCallbackCompletedEvent(ON_PARTITIONS_ASSIGNED, 
event.future(), Optional.of(error)));
+                throw error;
+            }
         }
 
         private void process(final PartitionsRemovedEvent event) {
@@ -307,7 +314,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 event.assignedPartitions(),
                 event.addedPartitions()
             );
-            applicationEventHandler.addAndGet(applyEvent);
+            try {
+                applicationEventHandler.addAndGet(applyEvent);
+            } catch (Exception e) {
+                // Send error to the background thread, so it can complete the 
ongoing reconciliation (failed to update assignment to run callbacks)
+                KafkaException error = 
ConsumerUtils.maybeWrapAsKafkaException(e, "Failed to apply the new 
assignment");
+                applicationEventHandler.add(new 
StreamsOnTasksAssignedCallbackCompletedEvent(event.future(), 
Optional.of(error)));
+                throw error;
+            }
 
             // Invoke the onTasksAssigned callback and notify the background 
thread
             StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index b6f47af3a43..8222908f8f3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -1134,7 +1134,7 @@ public class StreamsMembershipManager implements 
RequestManager {
         LocalAssignment currentTargetAssignment = targetAssignment;
         tasksRevokedAndAssigned.whenComplete((__, callbackError) -> {
             if (callbackError != null) {
-                log.error("Reconciliation failed: callback invocation failed 
for tasks {}",
+                log.error("Reconciliation failed for tasks {}",
                     currentTargetAssignment, callbackError);
                 markReconciliationCompleted();
             } else {
@@ -1212,7 +1212,7 @@ public class StreamsMembershipManager implements 
RequestManager {
             if (callbackError == null) {
                 
subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
             } else {
-                if (!partitionsToAssignNotPreviouslyOwned.isEmpty()) {
+                if (!partitionsToAssignNotPreviouslyOwned.isEmpty() && 
subscriptionState.assignedPartitions().containsAll(partitionsToAssignNotPreviouslyOwned))
 {
                     log.warn("Leaving newly assigned partitions {} marked as 
non-fetchable and not " +
                             "requiring initializing positions after 
onTasksAssigned callback failed.",
                         partitionsToAssignNotPreviouslyOwned, callbackError);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 4c7b97524ca..69cd413a532 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
@@ -42,6 +43,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -51,6 +53,8 @@ import 
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEven
 import 
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsTasksAssignedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
@@ -2220,6 +2224,79 @@ public class AsyncKafkaConsumerTest {
         }
     }
 
+    /**
+     * Tests that when the app thread fails to update the assignment as part 
of a reconciliation,
+     * it sends an event with error to the background thread before throwing 
on poll.
+     * This ensures the background reconciliation is completed exceptionally.
+     */
+    @Test
+    public void 
testPartitionsAssignedEventSendsErrorWhenApplyAssignmentFails() {
+        final InterruptException applyAssignmentError = new 
InterruptException("Thread was interrupted");
+
+        consumer = 
newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
+        completeTopicSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(singletonList("topic"), new 
CounterConsumerRebalanceListener(
+            Optional.empty(), Optional.empty(), Optional.empty()));
+
+        // Make ApplyAssignmentEvent fail
+        
when(applicationEventHandler.addAndGet(any(ApplyAssignmentEvent.class)))
+            .thenThrow(applyAssignmentError);
+
+        // Add PartitionsAssignedEvent to background queue
+        backgroundEventQueue.add(new PartitionsAssignedEvent(Set.of(), new 
TreeSet<>()));
+
+        completeAsyncPollEventSuccessfully();
+
+        // Poll should throw because it couldn't update the assignment to run 
callbacks
+        assertSame(applyAssignmentError, 
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)));
+
+        // Verify that ConsumerRebalanceListenerCallbackCompletedEvent with 
error was sent
+        ArgumentCaptor<ConsumerRebalanceListenerCallbackCompletedEvent> 
eventCaptor =
+            
ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackCompletedEvent.class);
+        verify(applicationEventHandler).add(eventCaptor.capture());
+        assertTrue(eventCaptor.getValue().error().isPresent());
+        assertSame(applyAssignmentError, eventCaptor.getValue().error().get());
+        assertEquals(ON_PARTITIONS_ASSIGNED, 
eventCaptor.getValue().methodName());
+    }
+
+    /**
+     * Tests that when the app thread fails to update the assignment as part 
of a reconciliation,
+     * it sends an event with error to the background thread before throwing 
on poll.
+     * This ensures the background reconciliation is completed exceptionally.
+     */
+    @Test
+    public void 
testStreamsTasksAssignedEventSendsErrorWhenApplyAssignmentFails() {
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+            UUID.randomUUID(), Optional.empty(), Optional.empty(), Map.of(), 
Map.of());
+        final InterruptException applyAssignmentError = new 
InterruptException("Thread was interrupted");
+
+        consumer = newConsumerWithStreamRebalanceData(
+            requiredConsumerConfigAndGroupId("streamsGroup"), 
streamsRebalanceData);
+        completeTopicSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(singletonList("topic"), 
mock(StreamsRebalanceListener.class));
+
+        // Make ApplyAssignmentEvent fail
+        
when(applicationEventHandler.addAndGet(any(ApplyAssignmentEvent.class)))
+            .thenThrow(applyAssignmentError);
+
+        // Add StreamsTasksAssignedEvent to background queue
+        backgroundEventQueue.add(new StreamsTasksAssignedEvent(
+            new TreeSet<>(), new TreeSet<>(),
+            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), 
true)));
+
+        completeAsyncPollEventSuccessfully();
+
+        // Poll should throw because it failed to update assignment to run 
callbacks
+        assertSame(applyAssignmentError, 
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)));
+
+        // Verify that StreamsOnTasksAssignedCallbackCompletedEvent with error 
was sent
+        ArgumentCaptor<StreamsOnTasksAssignedCallbackCompletedEvent> 
eventCaptor =
+            
ArgumentCaptor.forClass(StreamsOnTasksAssignedCallbackCompletedEvent.class);
+        verify(applicationEventHandler).add(eventCaptor.capture());
+        assertTrue(eventCaptor.getValue().error().isPresent());
+        assertSame(applyAssignmentError, eventCaptor.getValue().error().get());
+    }
+
     private void completeAsyncPollEventSuccessfully() {
         doAnswer(invocation -> {
             AsyncPollEvent event = invocation.getArgument(0);

Reply via email to