This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71449aabb6c KAFKA-20106: Ensure reconciled assignment updated within
poll (#21495)
71449aabb6c is described below
commit 71449aabb6c115667045e62d2d879b2fbd6a7302
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Mar 17 17:11:40 2026 -0400
KAFKA-20106: Ensure reconciled assignment updated within poll (#21495)
Fix to ensure that reconciled assignments are only updated in the
subscription state with a call to consumer.poll.
Before this PR, assignment changes occurred in the background thread
when async commit/callback operations completed ((1) commit → (2) revoke
callback → (3) assignment update → (4) assign callback), potentially
causing IllegalStateException when applications called seek/position on
partitions obtained from consumer.assignment().
With this PR, we piggyback the assignment update on the existing
mechanism that triggers the onPartitionsAssigned callback (consolidating
steps 3 and 4 above into a single step). For assignments, we replace
ConsumerRebalanceListenerCallbackNeededEvent with a new
PartitionsAssignedEvent sent to the app thread after every
reconciliation. This event performs both the assignment update and the
onPartitionsAssigned callback (if needed). For the revoked/lost
callbacks, the old event is renamed to PartitionsRemovedEvent.
This ensures assignment changes happen within poll() for all cases
(commit or not, callbacks or not).
The fix applies to the KafkaConsumer only. The ShareConsumer behaviour
remains unchanged with this PR (it still performs the assignment update
in the background thread).
Reviewers: Lucas Brutschy <[email protected]>, David Jacot
<[email protected]>
---
.../internals/AbstractMembershipManager.java | 31 ++----
.../consumer/internals/AsyncKafkaConsumer.java | 64 +++++++++++--
.../internals/ConsumerMembershipManager.java | 68 +++++++++-----
.../consumer/internals/ShareMembershipManager.java | 15 +++
.../internals/events/ApplicationEvent.java | 1 +
.../events/ApplicationEventProcessor.java | 26 ++++++
.../internals/events/ApplyAssignmentEvent.java | 66 +++++++++++++
.../consumer/internals/events/BackgroundEvent.java | 3 +-
.../internals/events/PartitionsAssignedEvent.java | 69 ++++++++++++++
...eededEvent.java => PartitionsRemovedEvent.java} | 18 ++--
.../consumer/internals/AsyncKafkaConsumerTest.java | 8 +-
.../internals/ConsumerMembershipManagerTest.java | 104 +++++++++++++++++----
12 files changed, 388 insertions(+), 85 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c93a844ff9b..d5c46ed32b5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -505,20 +505,6 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
clearPendingAssignmentsAndLocalNamesCache();
}
- /**
- * Update a new assignment by setting the assigned partitions in the
member subscription.
- * This will mark the newly added partitions as pending callback, to
prevent fetching records
- * or updating positions for them while the callback runs.
- *
- * @param assignedPartitions Full assignment, to update in the
subscription state
- * @param addedPartitions Newly added partitions
- */
- private void updateSubscriptionAwaitingCallback(TopicIdPartitionSet
assignedPartitions,
- SortedSet<TopicPartition>
addedPartitions) {
-
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(),
addedPartitions);
- notifyAssignmentChange(assignedPartitions.topicPartitions());
- }
-
/**
* Transition to the {@link MemberState#JOINING} state, indicating that
the member will
* try to join the group on the next heartbeat request. This is expected
to be invoked when
@@ -1192,12 +1178,11 @@ public abstract class AbstractMembershipManager<R
extends AbstractResponse> impl
TopicIdPartitionSet assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
- // Update assignment in the subscription state, and ensure that no
fetching or positions
- // initialization happens for the newly added partitions while the
callback runs.
- updateSubscriptionAwaitingCallback(assignedPartitions,
addedPartitions);
+ // Signal that new partitions have been reconciled so that
type-specific actions can be taken.
+ // - ShareMembershipManager: updates subscription immediately and
returns completed future
+ // - ConsumerMembershipManager: enqueues event for app thread to apply
assignment within poll() and run callbacks
+ CompletableFuture<Void> result =
signalPartitionsAssigned(assignedPartitions, addedPartitions);
- // Invoke user call back.
- CompletableFuture<Void> result =
signalPartitionsAssigned(addedPartitions);
// Enable newly added partitions to start fetching and updating
positions for them.
result.whenComplete((__, exception) -> {
if (exception == null) {
@@ -1230,10 +1215,12 @@ public abstract class AbstractMembershipManager<R
extends AbstractResponse> impl
/**
* Signals to the membership manager that partitions are being assigned so
that actions
* specific to the group type can be taken.
+ *
+ * @param assignedPartitions The full assignment to apply
+ * @param addedPartitions The newly added partitions (used for callback
and subscription update)
*/
- public CompletableFuture<Void>
signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
- return CompletableFuture.completedFuture(null);
- }
+ protected abstract CompletableFuture<Void>
signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
+
SortedSet<TopicPartition> addedPartitions);
/**
* Signals to the membership manager that partitions are being revoked so
that actions
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 644613a8dee..c08dcfa325a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -39,6 +39,7 @@ import
org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
@@ -51,7 +52,6 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableApplication
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.CurrentLagEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
@@ -59,6 +59,8 @@ import
org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import
org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
@@ -143,6 +145,7 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
@@ -194,8 +197,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
process((ErrorEvent) event);
break;
- case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
- process((ConsumerRebalanceListenerCallbackNeededEvent)
event);
+ case PARTITIONS_ASSIGNED:
+ process((PartitionsAssignedEvent) event);
+ break;
+
+ case PARTITIONS_REMOVED:
+ process((PartitionsRemovedEvent) event);
break;
case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
@@ -220,12 +227,51 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw event.error();
}
- private void process(final
ConsumerRebalanceListenerCallbackNeededEvent event) {
+ /**
+ * Processing this event will perform the actions needed in the app
thread when new partitions are reconciled in the background:
+ * - apply assignment changes (ensuring they happen in the background
but triggered within the app thread poll)
+ * - run onPartitionsAssigned callback if present
+ * - notify background thread so it can carry on (e.g., send ack to
the broker)
+ */
+ private void process(final PartitionsAssignedEvent event) {
+
+ applyNewAssignment(event);
+
+ if (subscriptions.rebalanceListener().isEmpty()) {
+ event.future().complete(null);
+ } else {
+
invokeRebalanceCallbackAndNotifyBackgroundThread(ON_PARTITIONS_ASSIGNED,
event.addedPartitions(), event.future());
+ }
+ }
+
+ /**
+ * Send event to the background to update the assignment in the
subscription state.
+ * Block on it to complete to ensure the assignment change happens
within a call to
+ * consumer.poll.
+ * Note that this event only happens when there is a pending
assignment (reconciliation
+ * completed in the background)
+ */
+ private void applyNewAssignment(final PartitionsAssignedEvent event) {
+ ApplyAssignmentEvent applyEvent = new ApplyAssignmentEvent(
+ event.assignedPartitions(),
+ event.addedPartitions()
+ );
+ applicationEventHandler.addAndGet(applyEvent);
+ }
+
+ private void process(final PartitionsRemovedEvent event) {
+
invokeRebalanceCallbackAndNotifyBackgroundThread(event.methodName(),
event.partitions(), event.future());
+ }
+
+ private void invokeRebalanceCallbackAndNotifyBackgroundThread(
+ ConsumerRebalanceListenerMethodName methodName,
+ SortedSet<TopicPartition> partitions,
+ CompletableFuture<Void> future) {
ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
rebalanceListenerInvoker,
- event.methodName(),
- event.partitions(),
- event.future()
+ methodName,
+ partitions,
+ future
);
applicationEventHandler.add(invokedEvent);
if (invokedEvent.error().isPresent()) {
@@ -2252,9 +2298,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}
callback needs to be invoked for any
* partitions the consumer owns. However,
* this callback must be executed on the application thread. To achieve
this, the background thread enqueues a
- * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background
event queue. That event queue is
+ * {@link PartitionsRemovedEvent} on its background event queue. That
event queue is
* periodically queried by the application thread to see if there's work
to be done. When the application thread
- * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is
processed, and then a
+ * sees {@link PartitionsRemovedEvent}, it is processed, and then a
* {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then
enqueued by the application thread on the
* application event queue. Moments later, the background thread will see
that event, process it, and continue
* execution of the rebalancing logic. The rebalancing logic cannot
complete until the
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index 82c209ac128..93a83f3c22a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -19,11 +19,13 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
@@ -48,7 +50,6 @@ import java.util.concurrent.CompletableFuture;
import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
@@ -134,7 +135,7 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
/**
* Serves as the conduit by which we can report events to the application
thread. This is needed as we send
- * {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if
needed,
+ * {@link PartitionsAssignedEvent}, {@link PartitionsRemovedEvent} and, if
needed,
* {@link ErrorEvent errors} to the application thread.
*/
private final BackgroundEventHandler backgroundEventHandler;
@@ -360,17 +361,6 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
}
}
- private CompletableFuture<Void>
invokeOnPartitionsAssignedCallback(Set<TopicPartition> partitionsAssigned) {
- // This should always trigger the callback, even if partitionsAssigned
is empty, to keep
- // the current behaviour.
- Optional<ConsumerRebalanceListener> listener =
subscriptions.rebalanceListener();
- if (listener.isPresent()) {
- return
enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_ASSIGNED,
partitionsAssigned);
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }
-
private CompletableFuture<Void>
invokeOnPartitionsLostCallback(Set<TopicPartition> partitionsLost) {
// This should not trigger the callback if partitionsLost is empty, to
keep the current
// behaviour.
@@ -386,8 +376,12 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
* {@inheritDoc}
*/
@Override
- public CompletableFuture<Void>
signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
- return invokeOnPartitionsAssignedCallback(partitionsAssigned);
+ protected CompletableFuture<Void>
signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
+
SortedSet<TopicPartition> addedPartitions) {
+ // Send an event to notify the app thread that the assignment changed
with new partitions.
+ // The app thread is expected to trigger the assignment update (within
a call to poll),
+ // and to run the onPartitionsAssigned callback if needed.
+ return
enqueuePartitionsAssignedEvent(assignedPartitions.topicPartitions(),
addedPartitions);
}
/**
@@ -440,16 +434,16 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
}
/**
- * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to
trigger the execution of the
- * appropriate {@link ConsumerRebalanceListener} {@link
ConsumerRebalanceListenerMethodName method} on the
- * application thread.
+ * Enqueue a {@link PartitionsRemovedEvent} to trigger the execution of
either
+ * {@link ConsumerRebalanceListener#onPartitionsRevoked} or {@link
ConsumerRebalanceListener#onPartitionsLost}
+ * on the application thread.
*
* <p/>
*
* Because the reconciliation process (run in the background thread) will
be blocked by the application thread
* until it completes this, we need to provide a {@link CompletableFuture}
by which to remember where we left off.
*
- * @param methodName Callback method that needs to be executed on the
application thread
+ * @param methodName Callback method that needs to be executed
(ON_PARTITIONS_REVOKED or ON_PARTITIONS_LOST)
* @param partitions Partitions to supply to the callback method
* @return Future that will be chained within the rest of the
reconciliation logic
*/
@@ -458,12 +452,29 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
SortedSet<TopicPartition> sortedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
sortedPartitions.addAll(partitions);
- CompletableBackgroundEvent<Void> event = new
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+ CompletableBackgroundEvent<Void> event = new
PartitionsRemovedEvent(methodName, sortedPartitions);
backgroundEventHandler.add(event);
log.debug("The event to trigger the {} method execution was enqueued
successfully", methodName.fullyQualifiedMethodName());
return event.future();
}
+ /**
+ * Enqueue a {@link PartitionsAssignedEvent} to the application thread.
+ * This event handles the assignment update and optional
onPartitionsAssigned callback.
+ *
+ * @param fullAssignment The full assignment to apply
+ * @param addedPartitions The newly added partitions (passed to the
callback)
+ * @return Future that will be chained within the rest of the
reconciliation logic
+ */
+ private CompletableFuture<Void>
enqueuePartitionsAssignedEvent(Set<TopicPartition> fullAssignment,
+
SortedSet<TopicPartition> addedPartitions) {
+ CompletableBackgroundEvent<Void> event = new
PartitionsAssignedEvent(fullAssignment, addedPartitions);
+ backgroundEventHandler.add(event);
+ log.debug("The event to update the new assignment and trigger
onPartitionsAssigned callback if needed " +
+ "has been enqueued successfully to be sent to the app
thread.");
+ return event.future();
+ }
+
/**
* Signals that a {@link ConsumerRebalanceListener} callback has
completed. This is invoked when the
* application thread has completed the callback and has submitted a
@@ -497,6 +508,21 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
}
}
+ /**
+ * Apply the assignment update to the subscription state. This is called
from the background
+ * thread when processing an {@link ApplyAssignmentEvent} that was
triggered by the application
+ * thread during poll. This ensures that the assignment update happens on
the background thread
+ * but is coordinated by the application thread, so consumer.assignment()
only changes within
+ * a call to consumer.poll().
+ *
+ * @param assignedPartitions The full assignment to apply
+ * @param addedPartitions The newly added partitions
+ */
+ public void applyAssignment(Set<TopicPartition> assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
+ subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
+ notifyAssignmentChange(assignedPartitions);
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 130656f5b58..dfdbd958401 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -32,6 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
/**
* Group manager for a single consumer that has a group id defined in the
config
@@ -175,6 +177,19 @@ public class ShareMembershipManager extends
AbstractMembershipManager<ShareGroup
}
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * For the ShareConsumer, assignment changes are applied immediately in
the background thread.
+ */
+ @Override
+ protected CompletableFuture<Void>
signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
+
SortedSet<TopicPartition> addedPartitions) {
+
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(),
addedPartitions);
+ notifyAssignmentChange(assignedPartitions.topicPartitions());
+ return CompletableFuture.completedFuture(null);
+ }
+
@Override
public int joinGroupEpoch() {
return ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 79ca558123a..af3777a2728 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -43,6 +43,7 @@ public abstract class ApplicationEvent {
STREAMS_ON_TASKS_ASSIGNED_CALLBACK_COMPLETED,
STREAMS_ON_TASKS_REVOKED_CALLBACK_COMPLETED,
STREAMS_ON_ALL_TASKS_LOST_CALLBACK_COMPLETED,
+ APPLY_ASSIGNMENT,
}
private final Type type;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 5e11b3a0ef6..8e0eecb7a99 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -223,6 +223,10 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((StreamsOnAllTasksLostCallbackCompletedEvent) event);
return;
+ case APPLY_ASSIGNMENT:
+ process((ApplyAssignmentEvent) event);
+ return;
+
default:
log.warn("Application event type {} was not expected",
event.type());
}
@@ -716,6 +720,28 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
}
+ /**
+ * Update the subscription state with a new assignment that has been
reconciled.
+ * This is triggered by the application thread during poll (to ensure that
assignment changes
+ * happen only within a call to consumer.poll), and it's applied here on
the background thread
+ * (to keep subscription state changes in the background)
+ */
+ private void process(final ApplyAssignmentEvent event) {
+ if (requestManagers.consumerMembershipManager.isEmpty()) {
+ log.warn("ConsumerMembershipManager not present when processing
ApplyAssignmentEvent");
+ event.future().completeExceptionally(
+ new IllegalStateException("ConsumerMembershipManager not
available"));
+ return;
+ }
+ try {
+ requestManagers.consumerMembershipManager.get().applyAssignment(
+ event.assignedPartitions(), event.addedPartitions());
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
private void process(final AsyncPollEvent event) {
// Trigger a reconciliation that can safely commit offsets if needed
to rebalance,
// as we're processing before any new fetching starts
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
new file mode 100644
index 00000000000..b6acbb07cb3
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Event sent from the application thread to the background thread to
+ * update the subscription state with a new group assignment after it has been
reconciled.
+ * Done via events to ensure that assignment updates happen on the background
thread,
+ * triggered by the application thread during poll, and completed before the
call to poll returns to the user.
+ */
+public class ApplyAssignmentEvent extends CompletableApplicationEvent<Void> {
+
+ /**
+ * The full assignment to apply
+ * This is used to update the subscription state.
+ */
+ private final Set<TopicPartition> assignedPartitions;
+
+ /**
+ * The newly added partitions.
+ * This is used to mark them as awaiting callbacks if needed when updating
the subscription state.
+ */
+ private final SortedSet<TopicPartition> addedPartitions;
+
+ public ApplyAssignmentEvent(Set<TopicPartition> assignedPartitions,
+ SortedSet<TopicPartition> addedPartitions) {
+ super(Type.APPLY_ASSIGNMENT, Long.MAX_VALUE);
+ this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
+ this.addedPartitions = Objects.requireNonNull(addedPartitions);
+ }
+
+ public Set<TopicPartition> assignedPartitions() {
+ return assignedPartitions;
+ }
+
+ public SortedSet<TopicPartition> addedPartitions() {
+ return addedPartitions;
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() +
+ ", assignedPartitions=" + assignedPartitions +
+ ", addedPartitions=" + addedPartitions;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index dc33a134bcf..ca4fc213594 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -27,7 +27,8 @@ public abstract class BackgroundEvent {
public enum Type {
ERROR,
- CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
+ PARTITIONS_ASSIGNED,
+ PARTITIONS_REMOVED,
STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
new file mode 100644
index 00000000000..82317cb2627
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Event sent from the background to the app thread, to notify that a new
assignment has been reconciled.
+ * The app thread is expected to apply the assignment change to the
subscription state in the next call to consumer.poll,
+ * and invoke the onPartitionsAssigned callback if needed.
+ */
+public class PartitionsAssignedEvent extends CompletableBackgroundEvent<Void> {
+
+ private final Set<TopicPartition> assignedPartitions;
+ private final SortedSet<TopicPartition> addedPartitions;
+
+ /**
+ * Constructor for the partitions assigned event.
+ *
+ * @param assignedPartitions The full assignment to apply
+ * @param addedPartitions The newly added partitions (passed to the
callback)
+ */
+ public PartitionsAssignedEvent(final Set<TopicPartition>
assignedPartitions,
+ final SortedSet<TopicPartition>
addedPartitions) {
+ super(Type.PARTITIONS_ASSIGNED, Long.MAX_VALUE);
+ this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
+ this.addedPartitions =
Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
+ }
+
+ /**
+ * @return The full assignment to apply.
+ */
+ public Set<TopicPartition> assignedPartitions() {
+ return assignedPartitions;
+ }
+
+ /**
+ * @return The newly added partitions (passed to the onPartitionsAssigned
callback).
+ */
+ public SortedSet<TopicPartition> addedPartitions() {
+ return addedPartitions;
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() +
+ ", assignedPartitions=" + assignedPartitions +
+ ", addedPartitions=" + addedPartitions;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsRemovedEvent.java
similarity index 64%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsRemovedEvent.java
index ecb9eedab22..6f3832a6e5c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsRemovedEvent.java
@@ -16,30 +16,26 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
import org.apache.kafka.common.TopicPartition;
-import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.SortedSet;
/**
- * Event that signifies that the network I/O thread wants to invoke one of the
callback methods on the
- * {@link ConsumerRebalanceListener}. This event will be processed by the
application thread when the next
- * {@link Consumer#poll(Duration)} call is performed by the user. When
processed, the application thread should
- * invoke the appropriate callback method (based on {@link #methodName()})
with the given partitions.
+ * Event sent from the background to the app thread
+ * to notify that a new assignment has been reconciled in the background
removing partitions.
+ * The app thread is expected to invoke the
onPartitionsRevoked/onPartitionsLost callback as needed.
*/
-public class ConsumerRebalanceListenerCallbackNeededEvent extends
CompletableBackgroundEvent<Void> {
+public class PartitionsRemovedEvent extends CompletableBackgroundEvent<Void> {
private final ConsumerRebalanceListenerMethodName methodName;
private final SortedSet<TopicPartition> partitions;
- public ConsumerRebalanceListenerCallbackNeededEvent(final
ConsumerRebalanceListenerMethodName methodName,
- final
SortedSet<TopicPartition> partitions) {
- super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
Long.MAX_VALUE);
+ public PartitionsRemovedEvent(final ConsumerRebalanceListenerMethodName
methodName,
+ final SortedSet<TopicPartition> partitions)
{
+ super(Type.PARTITIONS_REMOVED, Long.MAX_VALUE);
this.methodName = Objects.requireNonNull(methodName);
this.partitions = Collections.unmodifiableSortedSet(partitions);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 1df45a8fe94..ef939d9f7c2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -42,12 +42,12 @@ import
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
@@ -480,7 +480,7 @@ public class AsyncKafkaConsumerTest {
doAnswer(invocation ->
Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
SortedSet<TopicPartition> sortedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
sortedPartitions.add(tp);
- CompletableBackgroundEvent<Void> e = new
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED,
sortedPartitions);
+ CompletableBackgroundEvent<Void> e = new
PartitionsRemovedEvent(ON_PARTITIONS_REVOKED, sortedPartitions);
backgroundEventQueue.add(e);
completeCommitSyncApplicationEventSuccessfully();
final AtomicBoolean callbackExecuted = new AtomicBoolean(false);
@@ -1452,7 +1452,7 @@ public class AsyncKafkaConsumerTest {
SortedSet<TopicPartition> partitions = Collections.emptySortedSet();
for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
- CompletableBackgroundEvent<Void> e = new
ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions);
+ CompletableBackgroundEvent<Void> e = new
PartitionsRemovedEvent(methodName, partitions);
backgroundEventQueue.add(e);
}
@@ -1959,7 +1959,7 @@ public class AsyncKafkaConsumerTest {
Metrics metrics = consumer.metricsRegistry();
AsyncConsumerMetrics asyncConsumerMetrics =
consumer.asyncConsumerMetrics();
- ConsumerRebalanceListenerCallbackNeededEvent event = new
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED,
Collections.emptySortedSet());
+ PartitionsRemovedEvent event = new
PartitionsRemovedEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
event.setEnqueuedMs(time.milliseconds());
backgroundEventQueue.add(event);
asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 4fe8e7dac80..15ac6513576 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -21,7 +21,8 @@ import
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
@@ -334,6 +335,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.onHeartbeatSuccess(heartbeatResponse);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -704,6 +706,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@@ -740,6 +743,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Collections.singletonList(new TopicIdPartition(topic1, new
TopicPartition("topic1", 0)))
);
@@ -809,6 +813,7 @@ public class ConsumerMembershipManagerTest {
// Reconciliation in progress completes. Should be applied revoking
topic 1 only. Newly
// discovered topic2 will be reconciled in the next reconciliation
loop.
commitResult.complete(null);
+ processAssignmentEventNoCallback(membershipManager);
// Member should update the subscription and send ack when the delayed
reconciliation
// completes.
@@ -828,6 +833,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(topic2Assignment,
membershipManager.topicPartitionsAwaitingReconciliation());
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
assertEquals(Collections.emptySet(),
membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState).assignFromSubscribedAwaitingCallback(topicPartitions(topic2Assignment,
topic2Metadata), topicPartitions(topic2Assignment, topic2Metadata));
@@ -883,6 +889,7 @@ public class ConsumerMembershipManagerTest {
// with membership manager entering ACKNOWLEDGING state.
commitFuture.complete(null);
+ processAssignmentEventNoCallback(membershipManager);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertEquals(Set.of(topicId2),
membershipManager.topicsAwaitingReconciliation());
@@ -895,6 +902,7 @@ public class ConsumerMembershipManagerTest {
clearInvocations(membershipManager, commitRequestManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Arrays.asList(topicId1Partition0, topicId2Partition0));
}
@@ -956,6 +964,7 @@ public class ConsumerMembershipManagerTest {
when(metadata.topicNames()).thenReturn(fullTopicMetadata);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Arrays.asList(topicId1Partition0, topicId2Partition0));
}
@@ -1078,6 +1087,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
List<TopicIdPartition> assignedPartitions = Arrays.asList(
new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
@@ -1312,6 +1322,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
Set<TopicPartition> expectedAssignment = Collections.singleton(new
TopicPartition(topicName, 0));
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1354,6 +1365,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verifyReconciliationTriggeredAndCompleted(membershipManager,
Collections.emptyList());
@@ -1406,6 +1418,7 @@ public class ConsumerMembershipManagerTest {
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
// Assignment should have been reconciled.
Set<TopicPartition> expectedAssignment = Collections.singleton(new
TopicPartition(topicName, 1));
@@ -1468,6 +1481,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
List<TopicIdPartition> assignedPartitions = topicIdPartitions(topicId,
topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager,
assignedPartitions);
@@ -1487,6 +1501,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
List<TopicIdPartition> assignedPartitions = new ArrayList<>();
assignedPartitions.add(ownedPartition);
@@ -1508,6 +1523,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verifyReconciliationTriggeredAndCompleted(membershipManager,
expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1541,6 +1557,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
testRevocationOfAllPartitionsCompleted(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
@@ -1567,6 +1584,7 @@ public class ConsumerMembershipManagerTest {
// Complete commit request
commitResult.complete(null);
+ processAssignmentEventNoCallback(membershipManager);
InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRebalance(anyLong());
@@ -1595,6 +1613,7 @@ public class ConsumerMembershipManagerTest {
// Complete commit request
commitResult.completeExceptionally(new KafkaException("Commit request
failed with " +
"non-retriable error"));
+ processAssignmentEventNoCallback(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
testRevocationOfAllPartitionsCompleted(membershipManager);
@@ -1617,6 +1636,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
TreeSet<TopicPartition> expectedSet = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
expectedSet.add(new TopicPartition(topicName, 1));
@@ -1651,6 +1671,7 @@ public class ConsumerMembershipManagerTest {
mockTopicNameInMetadataCache(Collections.singletonMap(topicId,
topicName), true);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
List<TopicIdPartition> expectedAssignmentReconciled =
topicIdPartitions(topicId, topicName, 0, 1);
verifyReconciliationTriggeredAndCompleted(membershipManager,
expectedAssignmentReconciled);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1698,6 +1719,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verify(subscriptionState).markPendingRevocation(Set.of());
// Member should complete reconciliation
@@ -1722,6 +1744,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(1),
membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition(topicName, 0)));
// Revocation should complete without requesting any metadata update
given that the topic
@@ -1987,8 +2010,6 @@ public class ConsumerMembershipManagerTest {
membershipManager.maybeReconcile(true);
-
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
-
performCallback(
membershipManager,
invoker,
@@ -1997,6 +2018,8 @@ public class ConsumerMembershipManagerTest {
true
);
+ // Assignment is applied when the callback event is processed
+
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
verify(subscriptionState).enablePartitionsAwaitingCallback(assignedPartitions);
}
@@ -2019,8 +2042,6 @@ public class ConsumerMembershipManagerTest {
membershipManager.maybeReconcile(true);
-
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
-
performCallback(
membershipManager,
invoker,
@@ -2028,6 +2049,9 @@ public class ConsumerMembershipManagerTest {
addedPartitions,
true
);
+
+ // Assignment is applied when the callback event is processed
+
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
addedPartitions);
verify(subscriptionState,
never()).enablePartitionsAwaitingCallback(any());
}
@@ -2124,7 +2148,7 @@ public class ConsumerMembershipManagerTest {
assertEquals(MemberState.STALE, membershipManager.state());
assertFalse(backgroundEventQueue.isEmpty());
- assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
+ assertInstanceOf(PartitionsRemovedEvent.class,
backgroundEventQueue.peek());
// Stale member triggers onPartitionLost callback that will not
complete just yet
ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent =
performCallback(
@@ -2279,18 +2303,40 @@ public class ConsumerMembershipManagerTest {
// We expect only our enqueued event in the background queue.
assertEquals(1, backgroundEventQueue.size());
assertNotNull(backgroundEventQueue.peek());
- assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class,
backgroundEventQueue.peek());
- ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
- assertNotNull(neededEvent);
- assertEquals(expectedMethodName, neededEvent.methodName());
- assertEquals(expectedPartitions, neededEvent.partitions());
- ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
- invoker,
- neededEvent.methodName(),
- neededEvent.partitions(),
- neededEvent.future()
- );
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent;
+
+ if (expectedMethodName ==
ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED) {
+ // ON_PARTITIONS_ASSIGNED uses the new PartitionsAssignedEvent
+ assertInstanceOf(PartitionsAssignedEvent.class,
backgroundEventQueue.peek());
+ PartitionsAssignedEvent neededEvent = (PartitionsAssignedEvent)
backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedPartitions, neededEvent.addedPartitions());
+
+ // Apply assignment (simulates the app thread processing the PartitionsAssignedEvent)
+
membershipManager.applyAssignment(neededEvent.assignedPartitions(),
neededEvent.addedPartitions());
+
+ invokedEvent = invokeRebalanceCallbacks(
+ invoker,
+ ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+ neededEvent.addedPartitions(),
+ neededEvent.future()
+ );
+ } else {
+ // ON_PARTITIONS_REVOKED and ON_PARTITIONS_LOST use
PartitionsRemovedEvent
+ assertInstanceOf(PartitionsRemovedEvent.class,
backgroundEventQueue.peek());
+ PartitionsRemovedEvent neededEvent = (PartitionsRemovedEvent)
backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ assertEquals(expectedMethodName, neededEvent.methodName());
+ assertEquals(expectedPartitions, neededEvent.partitions());
+
+ invokedEvent = invokeRebalanceCallbacks(
+ invoker,
+ neededEvent.methodName(),
+ neededEvent.partitions(),
+ neededEvent.future()
+ );
+ }
if (complete) {
completeCallback(invokedEvent, membershipManager);
@@ -2303,6 +2349,25 @@ public class ConsumerMembershipManagerTest {
membershipManager.consumerRebalanceListenerCallbackCompleted(callbackCompletedEvent);
}
+ /**
+ * Process the assignment event when no listener is registered. This simulates what the
+ * application thread does when processing a PartitionsAssignedEvent: apply the assignment,
+ * then notify the membership manager that the ON_PARTITIONS_ASSIGNED callback step completed.
+ */
+ private void processAssignmentEventNoCallback(ConsumerMembershipManager membershipManager) {
+ assertEquals(1, backgroundEventQueue.size());
+ assertInstanceOf(PartitionsAssignedEvent.class, backgroundEventQueue.peek());
+ PartitionsAssignedEvent neededEvent = (PartitionsAssignedEvent) backgroundEventQueue.poll();
+ assertNotNull(neededEvent);
+ // Apply assignment (simulates the app thread processing the PartitionsAssignedEvent)
+ membershipManager.applyAssignment(neededEvent.assignedPartitions(), neededEvent.addedPartitions());
+
+ // Complete the callback (simulates ConsumerRebalanceListenerCallbackCompletedEvent processing)
+ ConsumerRebalanceListenerCallbackCompletedEvent completedEvent =
+ new ConsumerRebalanceListenerCallbackCompletedEvent(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, neededEvent.future(), Optional.empty());
+ membershipManager.consumerRebalanceListenerCallbackCompleted(completedEvent);
+ }
+
private void testFenceIsNoOp(ConsumerMembershipManager membershipManager) {
assertNotEquals(0, membershipManager.memberEpoch());
verify(subscriptionState, never()).rebalanceListener();
@@ -2330,6 +2395,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
@@ -2391,6 +2457,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.maybeReconcile(true);
// Complete commit request to complete the callback invocation
commitResult.complete(null);
+ processAssignmentEventNoCallback(membershipManager);
assertEquals((double) reconciliationDurationMs,
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal));
assertEquals((double) reconciliationDurationMs,
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg));
@@ -2539,6 +2606,7 @@ public class ConsumerMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
List<TopicIdPartition> assignedPartitions =
partitions.stream().map(tp -> new TopicIdPartition(topicId,
@@ -2764,6 +2832,7 @@ public class ConsumerMembershipManagerTest {
if (triggerReconciliation) {
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(anyCollection(),
anyCollection());
} else {
verify(subscriptionState,
never()).assignFromSubscribed(anyCollection());
@@ -2785,6 +2854,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.onHeartbeatSuccess(heartbeatResponse);
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.maybeReconcile(true);
+ processAssignmentEventNoCallback(membershipManager);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());