This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6918a5611da KAFKA-20382: Send background error when updating
assignment for callbacks fail (#21919)
6918a5611da is described below
commit 6918a5611dad2520b53d1946172b2ed1890ab046
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 1 06:49:45 2026 -0400
KAFKA-20382: Send background error when updating assignment for callbacks
fail (#21919)
This tidies up the error handling in the app thread around
assignment updates. It ensures that if the app thread fails to get the
result from the ApplyAssignment event, it not only propagates the
KafkaException to poll (which was already done), but also sends
an event to the background thread signaling that the attempt to "update
assignment and maybe trigger callbacks" completed with an error. This
ensures that the background thread marks the ongoing reconciliation attempt
as completed with an error.
Processing the ApplyAssignment event is a simple operation that mainly adds
new partitions to the subscription state, so it is not really expected to
fail. However, because the app thread sends an event and waits for its
result, it could fail if the app thread is interrupted. This fix ensures
that in that case the open reconciliation in the background is marked as
completed with an error (just as happens when the callback itself fails).
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/AbstractMembershipManager.java | 2 +-
.../consumer/internals/AsyncKafkaConsumer.java | 18 ++++-
.../internals/StreamsMembershipManager.java | 4 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 77 ++++++++++++++++++++++
4 files changed, 96 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 34e9ffc1ecd..c1972900f7f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -1198,7 +1198,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
// Keeping newly added partitions as non-fetchable after the
callback failure.
// They will be retried on the next reconciliation loop, until
it succeeds or the
// broker removes them from the assignment.
- if (!addedPartitions.isEmpty()) {
+ if (!addedPartitions.isEmpty() &&
subscriptions.assignedPartitions().containsAll(addedPartitions)) {
log.warn("Leaving newly assigned partitions {} marked as
non-fetchable and not " +
"requiring initializing positions after
onPartitionsAssigned callback failed.",
addedPartitions, exception);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e8ec90142cf..ec27794e6d9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -256,7 +256,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
event.assignedPartitions(),
event.addedPartitions()
);
- applicationEventHandler.addAndGet(applyEvent);
+ try {
+ applicationEventHandler.addAndGet(applyEvent);
+ } catch (Exception e) {
+ // Send error to the background thread, so it can complete the
ongoing reconciliation (failed to update assignment to run callbacks)
+ KafkaException error =
ConsumerUtils.maybeWrapAsKafkaException(e, "Failed to apply the new
assignment");
+ applicationEventHandler.add(new
ConsumerRebalanceListenerCallbackCompletedEvent(ON_PARTITIONS_ASSIGNED,
event.future(), Optional.of(error)));
+ throw error;
+ }
}
private void process(final PartitionsRemovedEvent event) {
@@ -307,7 +314,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
event.assignedPartitions(),
event.addedPartitions()
);
- applicationEventHandler.addAndGet(applyEvent);
+ try {
+ applicationEventHandler.addAndGet(applyEvent);
+ } catch (Exception e) {
+ // Send error to the background thread, so it can complete the
ongoing reconciliation (failed to update assignment to run callbacks)
+ KafkaException error =
ConsumerUtils.maybeWrapAsKafkaException(e, "Failed to apply the new
assignment");
+ applicationEventHandler.add(new
StreamsOnTasksAssignedCallbackCompletedEvent(event.future(),
Optional.of(error)));
+ throw error;
+ }
// Invoke the onTasksAssigned callback and notify the background
thread
StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent =
invokeOnTasksAssignedCallback(
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index b6f47af3a43..8222908f8f3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -1134,7 +1134,7 @@ public class StreamsMembershipManager implements
RequestManager {
LocalAssignment currentTargetAssignment = targetAssignment;
tasksRevokedAndAssigned.whenComplete((__, callbackError) -> {
if (callbackError != null) {
- log.error("Reconciliation failed: callback invocation failed
for tasks {}",
+ log.error("Reconciliation failed for tasks {}",
currentTargetAssignment, callbackError);
markReconciliationCompleted();
} else {
@@ -1212,7 +1212,7 @@ public class StreamsMembershipManager implements
RequestManager {
if (callbackError == null) {
subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
} else {
- if (!partitionsToAssignNotPreviouslyOwned.isEmpty()) {
+ if (!partitionsToAssignNotPreviouslyOwned.isEmpty() &&
subscriptionState.assignedPartitions().containsAll(partitionsToAssignNotPreviouslyOwned))
{
log.warn("Leaving newly assigned partitions {} marked as
non-fetchable and not " +
"requiring initializing positions after
onTasksAssigned callback failed.",
partitionsToAssignNotPreviouslyOwned, callbackError);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 95f474690a1..9743da654b2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
@@ -42,6 +43,7 @@ import
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@@ -51,6 +53,8 @@ import
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEven
import
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
+import
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
+import
org.apache.kafka.clients.consumer.internals.events.StreamsTasksAssignedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
@@ -2271,6 +2275,79 @@ public class AsyncKafkaConsumerTest {
}
}
+ /**
+ * Tests that when the app thread fails to update the assignment as part
of a reconciliation,
+ * it sends an event with error to the background thread before throwing
on poll.
+ * This ensures the background reconciliation is completed exceptionally.
+ */
+ @Test
+ public void
testPartitionsAssignedEventSendsErrorWhenApplyAssignmentFails() {
+ final InterruptException applyAssignmentError = new
InterruptException("Thread was interrupted");
+
+ consumer =
newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
+ completeTopicSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(singletonList("topic"), new
CounterConsumerRebalanceListener(
+ Optional.empty(), Optional.empty(), Optional.empty()));
+
+ // Make ApplyAssignmentEvent fail
+
when(applicationEventHandler.addAndGet(any(ApplyAssignmentEvent.class)))
+ .thenThrow(applyAssignmentError);
+
+ // Add PartitionsAssignedEvent to background queue
+ backgroundEventQueue.add(new PartitionsAssignedEvent(Set.of(), new
TreeSet<>()));
+
+ completeAsyncPollEventSuccessfully();
+
+ // Poll should throw because it couldn't update the assignment to run
callbacks
+ assertSame(applyAssignmentError,
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)));
+
+ // Verify that ConsumerRebalanceListenerCallbackCompletedEvent with
error was sent
+ ArgumentCaptor<ConsumerRebalanceListenerCallbackCompletedEvent>
eventCaptor =
+
ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackCompletedEvent.class);
+ verify(applicationEventHandler).add(eventCaptor.capture());
+ assertTrue(eventCaptor.getValue().error().isPresent());
+ assertSame(applyAssignmentError, eventCaptor.getValue().error().get());
+ assertEquals(ON_PARTITIONS_ASSIGNED,
eventCaptor.getValue().methodName());
+ }
+
+ /**
+ * Tests that when the app thread fails to update the assignment as part
of a reconciliation,
+ * it sends an event with error to the background thread before throwing
on poll.
+ * This ensures the background reconciliation is completed exceptionally.
+ */
+ @Test
+ public void
testStreamsTasksAssignedEventSendsErrorWhenApplyAssignmentFails() {
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
+ UUID.randomUUID(), Optional.empty(), Optional.empty(), Map.of(),
Map.of());
+ final InterruptException applyAssignmentError = new
InterruptException("Thread was interrupted");
+
+ consumer = newConsumerWithStreamRebalanceData(
+ requiredConsumerConfigAndGroupId("streamsGroup"),
streamsRebalanceData);
+ completeTopicSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(singletonList("topic"),
mock(StreamsRebalanceListener.class));
+
+ // Make ApplyAssignmentEvent fail
+
when(applicationEventHandler.addAndGet(any(ApplyAssignmentEvent.class)))
+ .thenThrow(applyAssignmentError);
+
+ // Add StreamsTasksAssignedEvent to background queue
+ backgroundEventQueue.add(new StreamsTasksAssignedEvent(
+ new TreeSet<>(), new TreeSet<>(),
+ new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(),
true)));
+
+ completeAsyncPollEventSuccessfully();
+
+ // Poll should throw because it failed to update assignment to run
callbacks
+ assertSame(applyAssignmentError,
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)));
+
+ // Verify that StreamsOnTasksAssignedCallbackCompletedEvent with error
was sent
+ ArgumentCaptor<StreamsOnTasksAssignedCallbackCompletedEvent>
eventCaptor =
+
ArgumentCaptor.forClass(StreamsOnTasksAssignedCallbackCompletedEvent.class);
+ verify(applicationEventHandler).add(eventCaptor.capture());
+ assertTrue(eventCaptor.getValue().error().isPresent());
+ assertSame(applyAssignmentError, eventCaptor.getValue().error().get());
+ }
+
private void completeAsyncPollEventSuccessfully() {
doAnswer(invocation -> {
AsyncPollEvent event = invocation.getArgument(0);