This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71449aabb6c KAFKA-20106: Ensure reconciled assignment updated within 
poll  (#21495)
71449aabb6c is described below

commit 71449aabb6c115667045e62d2d879b2fbd6a7302
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Mar 17 17:11:40 2026 -0400

    KAFKA-20106: Ensure reconciled assignment updated within poll  (#21495)
    
    Fix to ensure that reconciled assignments are only updated in the
    subscription state with a call to consumer.poll.
    
    Before this PR, assignment changes occurred in the background thread
    when async operations commit/callback completed : (1)commit → (2)revoke
    callback → (3)assignment  update → (4)assign callback), potentially
    causing  IllegalStateException when applications called seek/position on
    consumer.assignment().
    
    With this PR, we piggyback the assignment update on the existing
    mechanism that triggers the onPartitionsAssigned callback   (consolidate
    steps 3 and 4 mentioned above, in a single one). Replace
    CallbackNeededEvent with a new PartitionsAssignedEvent sent to the app
    thread after every reconciliation. This event performs both assignment
    update and onPartitionsAssigned callback (if needed).
    
    This ensures assignment changes happen within poll() for all cases
    (commit or not, callbacks or not).
    
    The fix applies to the KafkaConsumer only. The ShareConsumer behaviour
    remains unchanged with this PR (performs assignment update in the
    background)
    
    Reviewers: Lucas Brutschy <[email protected]>, David Jacot
     <[email protected]>
---
 .../internals/AbstractMembershipManager.java       |  31 ++----
 .../consumer/internals/AsyncKafkaConsumer.java     |  64 +++++++++++--
 .../internals/ConsumerMembershipManager.java       |  68 +++++++++-----
 .../consumer/internals/ShareMembershipManager.java |  15 +++
 .../internals/events/ApplicationEvent.java         |   1 +
 .../events/ApplicationEventProcessor.java          |  26 ++++++
 .../internals/events/ApplyAssignmentEvent.java     |  66 +++++++++++++
 .../consumer/internals/events/BackgroundEvent.java |   3 +-
 .../internals/events/PartitionsAssignedEvent.java  |  69 ++++++++++++++
 ...eededEvent.java => PartitionsRemovedEvent.java} |  18 ++--
 .../consumer/internals/AsyncKafkaConsumerTest.java |   8 +-
 .../internals/ConsumerMembershipManagerTest.java   | 104 +++++++++++++++++----
 12 files changed, 388 insertions(+), 85 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c93a844ff9b..d5c46ed32b5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -505,20 +505,6 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         clearPendingAssignmentsAndLocalNamesCache();
     }
 
-    /**
-     * Update a new assignment by setting the assigned partitions in the 
member subscription.
-     * This will mark the newly added partitions as pending callback, to 
prevent fetching records
-     * or updating positions for them while the callback runs.
-     *
-     * @param assignedPartitions Full assignment, to update in the 
subscription state
-     * @param addedPartitions    Newly added partitions
-     */
-    private void updateSubscriptionAwaitingCallback(TopicIdPartitionSet 
assignedPartitions,
-                                                    SortedSet<TopicPartition> 
addedPartitions) {
-        
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(),
 addedPartitions);
-        notifyAssignmentChange(assignedPartitions.topicPartitions());
-    }
-
     /**
      * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
      * try to join the group on the next heartbeat request. This is expected 
to be invoked when
@@ -1192,12 +1178,11 @@ public abstract class AbstractMembershipManager<R 
extends AbstractResponse> impl
             TopicIdPartitionSet assignedPartitions,
             SortedSet<TopicPartition> addedPartitions) {
 
-        // Update assignment in the subscription state, and ensure that no 
fetching or positions
-        // initialization happens for the newly added partitions while the 
callback runs.
-        updateSubscriptionAwaitingCallback(assignedPartitions, 
addedPartitions);
+        // Signal that new partitions have been reconciled so that 
type-specific actions can be taken.
+        // - ShareMembershipManager: updates subscription immediately and 
returns completed future
+        // - ConsumerMembershipManager: enqueues event for app thread to apply 
assignment within poll() and run callbacks
+        CompletableFuture<Void> result = 
signalPartitionsAssigned(assignedPartitions, addedPartitions);
 
-        // Invoke user call back.
-        CompletableFuture<Void> result = 
signalPartitionsAssigned(addedPartitions);
         // Enable newly added partitions to start fetching and updating 
positions for them.
         result.whenComplete((__, exception) -> {
             if (exception == null) {
@@ -1230,10 +1215,12 @@ public abstract class AbstractMembershipManager<R 
extends AbstractResponse> impl
     /**
      * Signals to the membership manager that partitions are being assigned so 
that actions
      * specific to the group type can be taken.
+     *
+     * @param assignedPartitions The full assignment to apply
+     * @param addedPartitions The newly added partitions (used for callback 
and subscription update)
      */
-    public CompletableFuture<Void> 
signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
-        return CompletableFuture.completedFuture(null);
-    }
+    protected abstract CompletableFuture<Void> 
signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
+                                                                        
SortedSet<TopicPartition> addedPartitions);
 
     /**
      * Signals to the membership manager that partitions are being revoked so 
that actions
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 644613a8dee..c08dcfa325a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -39,6 +39,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
@@ -51,7 +52,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplication
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
 import org.apache.kafka.clients.consumer.internals.events.CurrentLagEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
@@ -59,6 +59,8 @@ import 
org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
 import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
@@ -143,6 +145,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
@@ -194,8 +197,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     process((ErrorEvent) event);
                     break;
 
-                case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
-                    process((ConsumerRebalanceListenerCallbackNeededEvent) 
event);
+                case PARTITIONS_ASSIGNED:
+                    process((PartitionsAssignedEvent) event);
+                    break;
+
+                case PARTITIONS_REMOVED:
+                    process((PartitionsRemovedEvent) event);
                     break;
 
                 case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
@@ -220,12 +227,51 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             throw event.error();
         }
 
-        private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
+        /**
+         * Processing this event will perform the actions needed in the app 
thread when new partitions are reconciled in the background:
+         * - apply assignment changes (ensuring they happen in the background 
but triggered within the app thread poll)
+         * - run onPartitionsAssigned callback if present
+         * - notify background thread so it can carry on (e.g., send ack to 
the broker)
+         */
+        private void process(final PartitionsAssignedEvent event) {
+
+            applyNewAssignment(event);
+
+            if (subscriptions.rebalanceListener().isEmpty()) {
+                event.future().complete(null);
+            } else {
+                
invokeRebalanceCallbackAndNotifyBackgroundThread(ON_PARTITIONS_ASSIGNED, 
event.addedPartitions(), event.future());
+            }
+        }
+
+        /**
+         * Send event to the background to update the assignment in the 
subscription state.
+         * Block on it to complete to ensure the assignment change happens 
within a call to
+         * consumer.poll.
+         * Note that this event only happens when there is a pending 
assignment (reconciliation
+         * completed in the background)
+         */
+        private void applyNewAssignment(final PartitionsAssignedEvent event) {
+            ApplyAssignmentEvent applyEvent = new ApplyAssignmentEvent(
+                event.assignedPartitions(),
+                event.addedPartitions()
+            );
+            applicationEventHandler.addAndGet(applyEvent);
+        }
+
+        private void process(final PartitionsRemovedEvent event) {
+            
invokeRebalanceCallbackAndNotifyBackgroundThread(event.methodName(), 
event.partitions(), event.future());
+        }
+
+        private void invokeRebalanceCallbackAndNotifyBackgroundThread(
+                ConsumerRebalanceListenerMethodName methodName,
+                SortedSet<TopicPartition> partitions,
+                CompletableFuture<Void> future) {
             ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
                 rebalanceListenerInvoker,
-                event.methodName(),
-                event.partitions(),
-                event.future()
+                methodName,
+                partitions,
+                future
             );
             applicationEventHandler.add(invokedEvent);
             if (invokedEvent.error().isPresent()) {
@@ -2252,9 +2298,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} 
callback needs to be invoked for any
      * partitions the consumer owns. However,
      * this callback must be executed on the application thread. To achieve 
this, the background thread enqueues a
-     * {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background 
event queue. That event queue is
+     * {@link PartitionsRemovedEvent} on its background event queue. That 
event queue is
      * periodically queried by the application thread to see if there's work 
to be done. When the application thread
-     * sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is 
processed, and then a
+     * sees {@link PartitionsRemovedEvent}, it is processed, and then a
      * {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then 
enqueued by the application thread on the
      * application event queue. Moments later, the background thread will see 
that event, process it, and continue
      * execution of the rebalancing logic. The rebalancing logic cannot 
complete until the
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index 82c209ac128..93a83f3c22a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -19,11 +19,13 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.KafkaException;
@@ -48,7 +50,6 @@ import java.util.concurrent.CompletableFuture;
 import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
 import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
 import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
 
@@ -134,7 +135,7 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
 
     /**
      * Serves as the conduit by which we can report events to the application 
thread. This is needed as we send
-     * {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if 
needed,
+     * {@link PartitionsAssignedEvent}, {@link PartitionsRemovedEvent} and, if 
needed,
      * {@link ErrorEvent errors} to the application thread.
      */
     private final BackgroundEventHandler backgroundEventHandler;
@@ -360,17 +361,6 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
         }
     }
 
-    private CompletableFuture<Void> 
invokeOnPartitionsAssignedCallback(Set<TopicPartition> partitionsAssigned) {
-        // This should always trigger the callback, even if partitionsAssigned 
is empty, to keep
-        // the current behaviour.
-        Optional<ConsumerRebalanceListener> listener = 
subscriptions.rebalanceListener();
-        if (listener.isPresent()) {
-            return 
enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_ASSIGNED, 
partitionsAssigned);
-        } else {
-            return CompletableFuture.completedFuture(null);
-        }
-    }
-
     private CompletableFuture<Void> 
invokeOnPartitionsLostCallback(Set<TopicPartition> partitionsLost) {
         // This should not trigger the callback if partitionsLost is empty, to 
keep the current
         // behaviour.
@@ -386,8 +376,12 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
      * {@inheritDoc}
      */
     @Override
-    public CompletableFuture<Void> 
signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
-        return invokeOnPartitionsAssignedCallback(partitionsAssigned);
+    protected CompletableFuture<Void> 
signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
+                                                               
SortedSet<TopicPartition> addedPartitions) {
+        // Send an event to notify the app thread that the assignment changed 
with new partitions.
+        // The app thread is expected to trigger the assignment update (within 
a call to poll),
+        // and to run the onPartitionsAssigned callback if needed.
+        return 
enqueuePartitionsAssignedEvent(assignedPartitions.topicPartitions(), 
addedPartitions);
     }
 
     /**
@@ -440,16 +434,16 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
     }
 
     /**
-     * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to 
trigger the execution of the
-     * appropriate {@link ConsumerRebalanceListener} {@link 
ConsumerRebalanceListenerMethodName method} on the
-     * application thread.
+     * Enqueue a {@link PartitionsRemovedEvent} to trigger the execution of 
either
+     * {@link ConsumerRebalanceListener#onPartitionsRevoked} or {@link 
ConsumerRebalanceListener#onPartitionsLost}
+     * on the application thread.
      *
      * <p/>
      *
      * Because the reconciliation process (run in the background thread) will 
be blocked by the application thread
      * until it completes this, we need to provide a {@link CompletableFuture} 
by which to remember where we left off.
      *
-     * @param methodName Callback method that needs to be executed on the 
application thread
+     * @param methodName Callback method that needs to be executed 
(ON_PARTITIONS_REVOKED or ON_PARTITIONS_LOST)
      * @param partitions Partitions to supply to the callback method
      * @return Future that will be chained within the rest of the 
reconciliation logic
      */
@@ -458,12 +452,29 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
         SortedSet<TopicPartition> sortedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         sortedPartitions.addAll(partitions);
 
-        CompletableBackgroundEvent<Void> event = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
+        CompletableBackgroundEvent<Void> event = new 
PartitionsRemovedEvent(methodName, sortedPartitions);
         backgroundEventHandler.add(event);
         log.debug("The event to trigger the {} method execution was enqueued 
successfully", methodName.fullyQualifiedMethodName());
         return event.future();
     }
 
+    /**
+     * Enqueue a {@link PartitionsAssignedEvent} to the application thread.
+     * This event handles the assignment update and optional 
onPartitionsAssigned callback.
+     *
+     * @param fullAssignment The full assignment to apply
+     * @param addedPartitions The newly added partitions (passed to the 
callback)
+     * @return Future that will be chained within the rest of the 
reconciliation logic
+     */
+    private CompletableFuture<Void> 
enqueuePartitionsAssignedEvent(Set<TopicPartition> fullAssignment,
+                                                                   
SortedSet<TopicPartition> addedPartitions) {
+        CompletableBackgroundEvent<Void> event = new 
PartitionsAssignedEvent(fullAssignment, addedPartitions);
+        backgroundEventHandler.add(event);
+        log.debug("The event to update the new assignment and trigger 
onPartitionsAssigned callback if needed " +
+                "has been enqueued successfully to be sent to the app 
thread.");
+        return event.future();
+    }
+
     /**
      * Signals that a {@link ConsumerRebalanceListener} callback has 
completed. This is invoked when the
      * application thread has completed the callback and has submitted a
@@ -497,6 +508,21 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
         }
     }
 
+    /**
+     * Apply the assignment update to the subscription state. This is called 
from the background
+     * thread when processing an {@link ApplyAssignmentEvent} that was 
triggered by the application
+     * thread during poll. This ensures that the assignment update happens on 
the background thread
+     * but is coordinated by the application thread, so consumer.assignment() 
only changes within
+     * a call to consumer.poll().
+     *
+     * @param assignedPartitions The full assignment to apply
+     * @param addedPartitions The newly added partitions
+     */
+    public void applyAssignment(Set<TopicPartition> assignedPartitions, 
SortedSet<TopicPartition> addedPartitions) {
+        subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions, 
addedPartitions);
+        notifyAssignmentChange(assignedPartitions);
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index 130656f5b58..dfdbd958401 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -32,6 +33,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Group manager for a single consumer that has a group id defined in the 
config
@@ -175,6 +177,19 @@ public class ShareMembershipManager extends 
AbstractMembershipManager<ShareGroup
         }
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For the ShareConsumer, assignment changes are applied immediately in 
the background thread.
+     */
+    @Override
+    protected CompletableFuture<Void> 
signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
+                                                               
SortedSet<TopicPartition> addedPartitions) {
+        
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(),
 addedPartitions);
+        notifyAssignmentChange(assignedPartitions.topicPartitions());
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public int joinGroupEpoch() {
         return ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 79ca558123a..af3777a2728 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -43,6 +43,7 @@ public abstract class ApplicationEvent {
         STREAMS_ON_TASKS_ASSIGNED_CALLBACK_COMPLETED,
         STREAMS_ON_TASKS_REVOKED_CALLBACK_COMPLETED,
         STREAMS_ON_ALL_TASKS_LOST_CALLBACK_COMPLETED,
+        APPLY_ASSIGNMENT,
     }
 
     private final Type type;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 5e11b3a0ef6..8e0eecb7a99 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -223,6 +223,10 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 process((StreamsOnAllTasksLostCallbackCompletedEvent) event);
                 return;
 
+            case APPLY_ASSIGNMENT:
+                process((ApplyAssignmentEvent) event);
+                return;
+
             default:
                 log.warn("Application event type {} was not expected", 
event.type());
         }
@@ -716,6 +720,28 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }
 
+    /**
+     * Update the subscription state with a new assignment that has been 
reconciled.
+     * This is triggered by the application thread during poll (to ensure that 
assignment changes
+     * happen only within a call to consumer.poll), and it's applied here on 
the background thread
+     * (to keep subscription state changes in the background)
+     */
+    private void process(final ApplyAssignmentEvent event) {
+        if (requestManagers.consumerMembershipManager.isEmpty()) {
+            log.warn("ConsumerMembershipManager not present when processing 
ApplyAssignmentEvent");
+            event.future().completeExceptionally(
+                new IllegalStateException("ConsumerMembershipManager not 
available"));
+            return;
+        }
+        try {
+            requestManagers.consumerMembershipManager.get().applyAssignment(
+                event.assignedPartitions(), event.addedPartitions());
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
     private void process(final AsyncPollEvent event) {
         // Trigger a reconciliation that can safely commit offsets if needed 
to rebalance,
         // as we're processing before any new fetching starts
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
new file mode 100644
index 00000000000..b6acbb07cb3
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Event sent from the application thread to the background thread to
+ * update the subscription state with a new group assignment after it has been 
reconciled.
+ * Done via events to ensure that assignment updates happen on the background 
thread,
+ * triggered by the application thread during poll, and completed before the 
call to poll returns to the user.
+ */
+public class ApplyAssignmentEvent extends CompletableApplicationEvent<Void> {
+
+    /**
+     * The full assignment to apply
+     * This is used to update the subscription state.
+     */
+    private final Set<TopicPartition> assignedPartitions;
+
+    /**
+     * The newly added partitions.
+     * This is used to mark them as awaiting callbacks if needed when updating 
the subscription state.
+     */
+    private final SortedSet<TopicPartition> addedPartitions;
+
+    public ApplyAssignmentEvent(Set<TopicPartition> assignedPartitions,
+                                SortedSet<TopicPartition> addedPartitions) {
+        super(Type.APPLY_ASSIGNMENT, Long.MAX_VALUE);
+        this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
+        this.addedPartitions = Objects.requireNonNull(addedPartitions);
+    }
+
+    public Set<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    public SortedSet<TopicPartition> addedPartitions() {
+        return addedPartitions;
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() +
+                ", assignedPartitions=" + assignedPartitions +
+                ", addedPartitions=" + addedPartitions;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index dc33a134bcf..ca4fc213594 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -27,7 +27,8 @@ public abstract class BackgroundEvent {
 
     public enum Type {
         ERROR,
-        CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
+        PARTITIONS_ASSIGNED,
+        PARTITIONS_REMOVED,
         STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
         STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
         STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
new file mode 100644
index 00000000000..82317cb2627
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Event sent from the background to the app thread, to notify that a new 
assignment has been reconciled.
+ * The app thread is expected to apply the assignment change to the 
subscription state in the next call to consumer.poll,
+ * and invoke the onPartitionsAssigned callback if needed.
+ */
+public class PartitionsAssignedEvent extends CompletableBackgroundEvent<Void> {
+
+    private final Set<TopicPartition> assignedPartitions;
+    private final SortedSet<TopicPartition> addedPartitions;
+
+    /**
+     * Constructor for the partitions assigned event.
+     *
+     * @param assignedPartitions The full assignment to apply
+     * @param addedPartitions The newly added partitions (passed to the 
callback)
+     */
+    public PartitionsAssignedEvent(final Set<TopicPartition> 
assignedPartitions,
+                                   final SortedSet<TopicPartition> 
addedPartitions) {
+        super(Type.PARTITIONS_ASSIGNED, Long.MAX_VALUE);
+        this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
+        this.addedPartitions = 
Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
+    }
+
+    /**
+     * @return The full assignment to apply.
+     */
+    public Set<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    /**
+     * @return The newly added partitions (passed to the onPartitionsAssigned 
callback).
+     */
+    public SortedSet<TopicPartition> addedPartitions() {
+        return addedPartitions;
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() +
+                ", assignedPartitions=" + assignedPartitions +
+                ", addedPartitions=" + addedPartitions;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsRemovedEvent.java
similarity index 64%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsRemovedEvent.java
index ecb9eedab22..6f3832a6e5c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsRemovedEvent.java
@@ -16,30 +16,26 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Duration;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.SortedSet;
 
 /**
- * Event that signifies that the network I/O thread wants to invoke one of the 
callback methods on the
- * {@link ConsumerRebalanceListener}. This event will be processed by the 
application thread when the next
- * {@link Consumer#poll(Duration)} call is performed by the user. When 
processed, the application thread should
- * invoke the appropriate callback method (based on {@link #methodName()}) 
with the given partitions.
+ * Event sent from the background to the app thread
+ * to notify that a new assignment has been reconciled in the background 
removing partitions.
+ * The app thread is expected to invoke the 
onPartitionsRevoked/onPartitionsLost callback as needed.
  */
-public class ConsumerRebalanceListenerCallbackNeededEvent extends 
CompletableBackgroundEvent<Void> {
+public class PartitionsRemovedEvent extends CompletableBackgroundEvent<Void> {
 
     private final ConsumerRebalanceListenerMethodName methodName;
     private final SortedSet<TopicPartition> partitions;
 
-    public ConsumerRebalanceListenerCallbackNeededEvent(final 
ConsumerRebalanceListenerMethodName methodName,
-                                                        final 
SortedSet<TopicPartition> partitions) {
-        super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, 
Long.MAX_VALUE);
+    public PartitionsRemovedEvent(final ConsumerRebalanceListenerMethodName 
methodName,
+                                   final SortedSet<TopicPartition> partitions) 
{
+        super(Type.PARTITIONS_REMOVED, Long.MAX_VALUE);
         this.methodName = Objects.requireNonNull(methodName);
         this.partitions = Collections.unmodifiableSortedSet(partitions);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 1df45a8fe94..ef939d9f7c2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -42,12 +42,12 @@ import 
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
@@ -480,7 +480,7 @@ public class AsyncKafkaConsumerTest {
         doAnswer(invocation -> 
Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         SortedSet<TopicPartition> sortedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         sortedPartitions.add(tp);
-        CompletableBackgroundEvent<Void> e = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, 
sortedPartitions);
+        CompletableBackgroundEvent<Void> e = new 
PartitionsRemovedEvent(ON_PARTITIONS_REVOKED, sortedPartitions);
         backgroundEventQueue.add(e);
         completeCommitSyncApplicationEventSuccessfully();
         final AtomicBoolean callbackExecuted = new AtomicBoolean(false);
@@ -1452,7 +1452,7 @@ public class AsyncKafkaConsumerTest {
         SortedSet<TopicPartition> partitions = Collections.emptySortedSet();
 
         for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
-            CompletableBackgroundEvent<Void> e = new 
ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions);
+            CompletableBackgroundEvent<Void> e = new 
PartitionsRemovedEvent(methodName, partitions);
             backgroundEventQueue.add(e);
         }
 
@@ -1959,7 +1959,7 @@ public class AsyncKafkaConsumerTest {
         Metrics metrics = consumer.metricsRegistry();
         AsyncConsumerMetrics asyncConsumerMetrics = 
consumer.asyncConsumerMetrics();
 
-        ConsumerRebalanceListenerCallbackNeededEvent event = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, 
Collections.emptySortedSet());
+        PartitionsRemovedEvent event = new 
PartitionsRemovedEvent(ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
         event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
         asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 4fe8e7dac80..15ac6513576 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -21,7 +21,8 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
@@ -334,6 +335,7 @@ public class ConsumerMembershipManagerTest {
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -704,6 +706,7 @@ public class ConsumerMembershipManagerTest {
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());
@@ -740,6 +743,7 @@ public class ConsumerMembershipManagerTest {
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         verifyReconciliationTriggeredAndCompleted(membershipManager,
             Collections.singletonList(new TopicIdPartition(topic1, new 
TopicPartition("topic1", 0)))
         );
@@ -809,6 +813,7 @@ public class ConsumerMembershipManagerTest {
         // Reconciliation in progress completes. Should be applied revoking 
topic 1 only. Newly
         // discovered topic2 will be reconciled in the next reconciliation 
loop.
         commitResult.complete(null);
+        processAssignmentEventNoCallback(membershipManager);
 
         // Member should update the subscription and send ack when the delayed 
reconciliation
         // completes.
@@ -828,6 +833,7 @@ public class ConsumerMembershipManagerTest {
         assertEquals(topic2Assignment, 
membershipManager.topicPartitionsAwaitingReconciliation());
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         assertEquals(Collections.emptySet(), 
membershipManager.topicsAwaitingReconciliation());
         
verify(subscriptionState).assignFromSubscribedAwaitingCallback(topicPartitions(topic2Assignment,
 topic2Metadata), topicPartitions(topic2Assignment, topic2Metadata));
@@ -883,6 +889,7 @@ public class ConsumerMembershipManagerTest {
         // with membership manager entering ACKNOWLEDGING state.
 
         commitFuture.complete(null);
+        processAssignmentEventNoCallback(membershipManager);
 
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         assertEquals(Set.of(topicId2), 
membershipManager.topicsAwaitingReconciliation());
@@ -895,6 +902,7 @@ public class ConsumerMembershipManagerTest {
         clearInvocations(membershipManager, commitRequestManager);
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
Arrays.asList(topicId1Partition0, topicId2Partition0));
     }
@@ -956,6 +964,7 @@ public class ConsumerMembershipManagerTest {
         when(metadata.topicNames()).thenReturn(fullTopicMetadata);
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
Arrays.asList(topicId1Partition0, topicId2Partition0));
     }
@@ -1078,6 +1087,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         List<TopicIdPartition> assignedPartitions = Arrays.asList(
             new TopicIdPartition(topicId, new TopicPartition(topicName, 0)),
@@ -1312,6 +1322,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         Set<TopicPartition> expectedAssignment = Collections.singleton(new 
TopicPartition(topicName, 0));
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1354,6 +1365,7 @@ public class ConsumerMembershipManagerTest {
         verifyReconciliationNotTriggered(membershipManager);
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
Collections.emptyList());
 
@@ -1406,6 +1418,7 @@ public class ConsumerMembershipManagerTest {
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         // Assignment should have been reconciled.
         Set<TopicPartition> expectedAssignment = Collections.singleton(new 
TopicPartition(topicName, 1));
@@ -1468,6 +1481,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         List<TopicIdPartition> assignedPartitions = topicIdPartitions(topicId, 
topicName, 0, 1);
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
assignedPartitions);
@@ -1487,6 +1501,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         List<TopicIdPartition> assignedPartitions = new ArrayList<>();
         assignedPartitions.add(ownedPartition);
@@ -1508,6 +1523,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
expectedAssignmentReconciled);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1541,6 +1557,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
         verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
@@ -1567,6 +1584,7 @@ public class ConsumerMembershipManagerTest {
 
         // Complete commit request
         commitResult.complete(null);
+        processAssignmentEventNoCallback(membershipManager);
         InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
         inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
         
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRebalance(anyLong());
@@ -1595,6 +1613,7 @@ public class ConsumerMembershipManagerTest {
         // Complete commit request
         commitResult.completeExceptionally(new KafkaException("Commit request 
failed with " +
                 "non-retriable error"));
+        processAssignmentEventNoCallback(membershipManager);
         verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
@@ -1617,6 +1636,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         TreeSet<TopicPartition> expectedSet = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         expectedSet.add(new TopicPartition(topicName, 1));
@@ -1651,6 +1671,7 @@ public class ConsumerMembershipManagerTest {
         mockTopicNameInMetadataCache(Collections.singletonMap(topicId, 
topicName), true);
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         List<TopicIdPartition> expectedAssignmentReconciled = 
topicIdPartitions(topicId, topicName, 0, 1);
         verifyReconciliationTriggeredAndCompleted(membershipManager, 
expectedAssignmentReconciled);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1698,6 +1719,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         verify(subscriptionState).markPendingRevocation(Set.of());
 
         // Member should complete reconciliation
@@ -1722,6 +1744,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Collections.singletonList(1), 
membershipManager);
 
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition(topicName, 0)));
 
         // Revocation should complete without requesting any metadata update 
given that the topic
@@ -1987,8 +2010,6 @@ public class ConsumerMembershipManagerTest {
 
         membershipManager.maybeReconcile(true);
 
-        
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
 addedPartitions);
-
         performCallback(
             membershipManager,
             invoker,
@@ -1997,6 +2018,8 @@ public class ConsumerMembershipManagerTest {
             true
         );
 
+        // Assignment is applied when the callback event is processed
+        
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
 addedPartitions);
         
verify(subscriptionState).enablePartitionsAwaitingCallback(assignedPartitions);
     }
 
@@ -2019,8 +2042,6 @@ public class ConsumerMembershipManagerTest {
 
         membershipManager.maybeReconcile(true);
 
-        
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
 addedPartitions);
-
         performCallback(
             membershipManager,
             invoker,
@@ -2028,6 +2049,9 @@ public class ConsumerMembershipManagerTest {
             addedPartitions,
             true
         );
+
+        // Assignment is applied when the callback event is processed
+        
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions,
 addedPartitions);
         verify(subscriptionState, 
never()).enablePartitionsAwaitingCallback(any());
     }
 
@@ -2124,7 +2148,7 @@ public class ConsumerMembershipManagerTest {
 
         assertEquals(MemberState.STALE, membershipManager.state());
         assertFalse(backgroundEventQueue.isEmpty());
-        assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, 
backgroundEventQueue.peek());
+        assertInstanceOf(PartitionsRemovedEvent.class, 
backgroundEventQueue.peek());
 
         // Stale member triggers onPartitionLost callback that will not 
complete just yet
         ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent = 
performCallback(
@@ -2279,18 +2303,40 @@ public class ConsumerMembershipManagerTest {
         // We expect only our enqueued event in the background queue.
         assertEquals(1, backgroundEventQueue.size());
         assertNotNull(backgroundEventQueue.peek());
-        assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, 
backgroundEventQueue.peek());
-        ConsumerRebalanceListenerCallbackNeededEvent neededEvent = 
(ConsumerRebalanceListenerCallbackNeededEvent) backgroundEventQueue.poll();
-        assertNotNull(neededEvent);
-        assertEquals(expectedMethodName, neededEvent.methodName());
-        assertEquals(expectedPartitions, neededEvent.partitions());
 
-        ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
-                invoker,
-                neededEvent.methodName(),
-                neededEvent.partitions(),
-                neededEvent.future()
-        );
+        ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent;
+
+        if (expectedMethodName == 
ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED) {
+            // ON_PARTITIONS_ASSIGNED uses the new PartitionsAssignedEvent
+            assertInstanceOf(PartitionsAssignedEvent.class, 
backgroundEventQueue.peek());
+            PartitionsAssignedEvent neededEvent = (PartitionsAssignedEvent) 
backgroundEventQueue.poll();
+            assertNotNull(neededEvent);
+            assertEquals(expectedPartitions, neededEvent.addedPartitions());
+
+            // Apply assignment (simulates ApplyAssignmentEvent processing)
+            
membershipManager.applyAssignment(neededEvent.assignedPartitions(), 
neededEvent.addedPartitions());
+
+            invokedEvent = invokeRebalanceCallbacks(
+                    invoker,
+                    ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
+                    neededEvent.addedPartitions(),
+                    neededEvent.future()
+            );
+        } else {
+            // ON_PARTITIONS_REVOKED and ON_PARTITIONS_LOST use 
PartitionsRemovedEvent
+            assertInstanceOf(PartitionsRemovedEvent.class, 
backgroundEventQueue.peek());
+            PartitionsRemovedEvent neededEvent = (PartitionsRemovedEvent) 
backgroundEventQueue.poll();
+            assertNotNull(neededEvent);
+            assertEquals(expectedMethodName, neededEvent.methodName());
+            assertEquals(expectedPartitions, neededEvent.partitions());
+
+            invokedEvent = invokeRebalanceCallbacks(
+                    invoker,
+                    neededEvent.methodName(),
+                    neededEvent.partitions(),
+                    neededEvent.future()
+            );
+        }
 
         if (complete) {
             completeCallback(invokedEvent, membershipManager);
@@ -2303,6 +2349,25 @@ public class ConsumerMembershipManagerTest {
         
membershipManager.consumerRebalanceListenerCallbackCompleted(callbackCompletedEvent);
     }
 
+    /**
+     * Process a callback event when no listener is registered. This simulates 
what the
+     * application thread does when processing a PartitionsAssignedEvent
+     * with invokeCallback=false.
+     */
+    private void processAssignmentEventNoCallback(ConsumerMembershipManager 
membershipManager) {
+        assertEquals(1, backgroundEventQueue.size());
+        PartitionsAssignedEvent neededEvent =
+            (PartitionsAssignedEvent) backgroundEventQueue.poll();
+        assertNotNull(neededEvent);
+        // Apply assignment (simulates ApplyAssignmentEvent processing)
+        membershipManager.applyAssignment(neededEvent.assignedPartitions(), 
neededEvent.addedPartitions());
+
+        // Complete the callback (simulates 
ConsumerRebalanceListenerCallbackCompletedEvent processing)
+        ConsumerRebalanceListenerCallbackCompletedEvent completedEvent =
+            new 
ConsumerRebalanceListenerCallbackCompletedEvent(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
 neededEvent.future(), Optional.empty());
+        
membershipManager.consumerRebalanceListenerCallbackCompleted(completedEvent);
+    }
+
     private void testFenceIsNoOp(ConsumerMembershipManager membershipManager) {
         assertNotEquals(0, membershipManager.memberEpoch());
         verify(subscriptionState, never()).rebalanceListener();
@@ -2330,6 +2395,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         membershipManager.onHeartbeatRequestGenerated();
 
         assertEquals(MemberState.STABLE, membershipManager.state());
@@ -2391,6 +2457,7 @@ public class ConsumerMembershipManagerTest {
         membershipManager.maybeReconcile(true);
         // Complete commit request to complete the callback invocation
         commitResult.complete(null);
+        processAssignmentEventNoCallback(membershipManager);
 
         assertEquals((double) reconciliationDurationMs, 
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyTotal));
         assertEquals((double) reconciliationDurationMs, 
getMetricValue(metrics, rebalanceMetricsManager.rebalanceLatencyAvg));
@@ -2539,6 +2606,7 @@ public class ConsumerMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
 
         List<TopicIdPartition> assignedPartitions =
             partitions.stream().map(tp -> new TopicIdPartition(topicId,
@@ -2764,6 +2832,7 @@ public class ConsumerMembershipManagerTest {
 
         if (triggerReconciliation) {
             membershipManager.maybeReconcile(true);
+            processAssignmentEventNoCallback(membershipManager);
             
verify(subscriptionState).assignFromSubscribedAwaitingCallback(anyCollection(), 
anyCollection());
         } else {
             verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
@@ -2785,6 +2854,7 @@ public class ConsumerMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         membershipManager.maybeReconcile(true);
+        processAssignmentEventNoCallback(membershipManager);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.STABLE, membershipManager.state());

Reply via email to