This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69f75150793 KAFKA-20106 [2/2]: Ensure reconciled assignment updated 
within poll KS (#21813)
69f75150793 is described below

commit 69f75150793ace2b791848db77bc6f7c3b6f0591
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Mar 27 09:05:49 2026 -0400

    KAFKA-20106 [2/2]: Ensure reconciled assignment updated within poll KS 
(#21813)
    
    Apply same fix from https://github.com/apache/kafka/pull/21495 to the
    Streams manager.
    
    This ensures that assignment updates happen within a call to
    consumer.poll (even when reconciliations may complete async in the
    background). Done by piggybacking the assignment update the in the
    existing hop that was made to the app thread when a reconciliation
    completed (previously used to trigger callback only, now used to update
    assignment and run callback)
    
    Reviewers: Lucas Brutschy <[email protected]>, Lan Ding
     <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  35 ++-
 .../internals/StreamsMembershipManager.java        |  65 ++++--
 .../events/ApplicationEventProcessor.java          |  21 +-
 .../consumer/internals/events/BackgroundEvent.java |   2 +-
 .../StreamsOnTasksAssignedCallbackNeededEvent.java |  41 ----
 .../events/StreamsTasksAssignedEvent.java          |  81 +++++++
 .../internals/StreamsMembershipManagerTest.java    | 244 +++++++++++----------
 7 files changed, 298 insertions(+), 191 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 37829111fcb..e8ec90142cf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -69,9 +69,9 @@ import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnC
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsTasksAssignedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
@@ -205,12 +205,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     process((PartitionsRemovedEvent) event);
                     break;
 
-                case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
-                    
processStreamsOnTasksRevokedCallbackNeededEvent((StreamsOnTasksRevokedCallbackNeededEvent)
 event);
+                case STREAMS_TASKS_ASSIGNED:
+                    process((StreamsTasksAssignedEvent) event);
                     break;
 
-                case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
-                    
processStreamsOnTasksAssignedCallbackNeededEvent((StreamsOnTasksAssignedCallbackNeededEvent)
 event);
+                case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
+                    
processStreamsOnTasksRevokedCallbackNeededEvent((StreamsOnTasksRevokedCallbackNeededEvent)
 event);
                     break;
 
                 case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
@@ -287,16 +287,33 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             }
         }
 
-        private void processStreamsOnTasksAssignedCallbackNeededEvent(final 
StreamsOnTasksAssignedCallbackNeededEvent event) {
-            StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(event.assignment(), event.future());
+        private void processStreamsOnAllTasksLostCallbackNeededEvent(final 
StreamsOnAllTasksLostCallbackNeededEvent event) {
+            StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = 
invokeOnAllTasksLostCallback(event.future());
             applicationEventHandler.add(invokedEvent);
             if (invokedEvent.error().isPresent()) {
                 throw invokedEvent.error().get();
             }
         }
 
-        private void processStreamsOnAllTasksLostCallbackNeededEvent(final 
StreamsOnAllTasksLostCallbackNeededEvent event) {
-            StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = 
invokeOnAllTasksLostCallback(event.future());
+        /**
+         * Processing this event will perform the actions needed in the app 
thread when new partitions are assigned for Streams:
+         * - apply assignment changes (ensuring they happen in the background 
but triggered within the app thread poll)
+         * - run onTasksAssigned callback
+         * - notify background thread so it can carry on (e.g., send ack to 
the broker)
+         */
+        private void process(final StreamsTasksAssignedEvent event) {
+            // Apply assignment via ApplyAssignmentEvent and wait for it to 
complete
+            ApplyAssignmentEvent applyEvent = new ApplyAssignmentEvent(
+                event.assignedPartitions(),
+                event.addedPartitions()
+            );
+            applicationEventHandler.addAndGet(applyEvent);
+
+            // Invoke the onTasksAssigned callback and notify the background 
thread
+            StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(
+                event.assignment(),
+                event.future()
+            );
             applicationEventHandler.add(invokedEvent);
             if (invokedEvent.error().isPresent()) {
                 throw invokedEvent.error().get();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index 96a03dc01b8..b6f47af3a43 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -20,9 +20,9 @@ import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsTasksAssignedEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.KafkaException;
@@ -396,6 +396,21 @@ public class StreamsMembershipManager implements 
RequestManager {
         stateUpdatesListeners.forEach(stateListener -> 
stateListener.onGroupAssignmentUpdated(partitions));
     }
 
+    /**
+     * Apply the assignment update to the subscription state. This is called 
from the background
+     * thread when processing an ApplyAssignmentEvent that was triggered by 
the application
+     * thread during poll. This ensures that the assignment update happens on 
the background thread
+     * but is coordinated by the application thread, so consumer.assignment() 
only changes within
+     * a call to consumer.poll().
+     *
+     * @param assignedPartitions The full assignment to apply
+     * @param addedPartitions The newly added partitions
+     */
+    public void applyAssignment(Set<TopicPartition> assignedPartitions, 
Set<TopicPartition> addedPartitions) {
+        
subscriptionState.assignFromSubscribedAwaitingCallback(assignedPartitions, 
addedPartitions);
+        notifyAssignmentChange(assignedPartitions);
+    }
+
     /**
      * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
      * try to join the group on the next heartbeat request. This is expected 
to be invoked when
@@ -1180,14 +1195,12 @@ public class StreamsMembershipManager implements 
RequestManager {
         final SortedSet<TopicPartition> partitionsToAssignNotPreviouslyOwned =
             partitionsToAssignNotPreviouslyOwned(partitionsToAssign, 
topicPartitionsForActiveTasks(ownedActiveTasks));
 
-        subscriptionState.assignFromSubscribedAwaitingCallback(
-            partitionsToAssign,
-            partitionsToAssignNotPreviouslyOwned
-        );
-        notifyAssignmentChange(partitionsToAssign);
-
-        CompletableFuture<Void> onTasksAssignedCallbackExecuted =
-            requestOnTasksAssignedCallbackInvocation(
+        // Enqueue event to app thread to apply assignment within poll() and 
invoke callback.
+        // The app thread will trigger ApplyAssignmentEvent to update 
subscription state on background thread.
+        CompletableFuture<Void> partitionsAssignedAndCallbackExecuted =
+            enqueueStreamsPartitionsAssignedEvent(
+                partitionsToAssign,
+                partitionsToAssignNotPreviouslyOwned,
                 new StreamsRebalanceData.Assignment(
                     activeTasksToAssign,
                     standbyTasksToAssign,
@@ -1195,7 +1208,7 @@ public class StreamsMembershipManager implements 
RequestManager {
                     isGroupReady
                 )
             );
-        onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> {
+        partitionsAssignedAndCallbackExecuted.whenComplete((__, callbackError) 
-> {
             if (callbackError == null) {
                 
subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
             } else {
@@ -1207,7 +1220,7 @@ public class StreamsMembershipManager implements 
RequestManager {
             }
         });
 
-        return onTasksAssignedCallbackExecuted;
+        return partitionsAssignedAndCallbackExecuted;
     }
 
     private CompletableFuture<Void> releaseLostActiveTasks() {
@@ -1281,12 +1294,6 @@ public class StreamsMembershipManager implements 
RequestManager {
         rejoinedWhileReconciliationInProgress = false;
     }
 
-    private CompletableFuture<Void> 
requestOnTasksAssignedCallbackInvocation(final StreamsRebalanceData.Assignment 
assignment) {
-        final StreamsOnTasksAssignedCallbackNeededEvent 
onTasksAssignedCallbackNeededEvent = new 
StreamsOnTasksAssignedCallbackNeededEvent(assignment);
-        backgroundEventHandler.add(onTasksAssignedCallbackNeededEvent);
-        return onTasksAssignedCallbackNeededEvent.future();
-    }
-
     private CompletableFuture<Void> requestOnAllTasksLostCallbackInvocation() {
         final StreamsOnAllTasksLostCallbackNeededEvent 
onAllTasksLostCallbackNeededEvent = new 
StreamsOnAllTasksLostCallbackNeededEvent();
         backgroundEventHandler.add(onAllTasksLostCallbackNeededEvent);
@@ -1299,6 +1306,30 @@ public class StreamsMembershipManager implements 
RequestManager {
         return onTasksRevokedCallbackNeededEvent.future();
     }
 
+    /**
+     * Enqueue event to notify the app thread that new partitions have been 
reconciled.
+     * The app thread will trigger the assignment update (via 
ApplyAssignmentEvent) and invoke
+     * the onTasksAssigned callback.
+     *
+     * @param partitionsToAssign The full partition assignment to apply
+     * @param addedPartitions The newly added partitions
+     * @param assignment The task assignment for the callback
+     * @return Future that completes when the assignment is applied and the 
callback executed
+     */
+    private CompletableFuture<Void> enqueueStreamsPartitionsAssignedEvent(
+            final SortedSet<TopicPartition> partitionsToAssign,
+            final SortedSet<TopicPartition> addedPartitions,
+            final StreamsRebalanceData.Assignment assignment) {
+        final StreamsTasksAssignedEvent event = new StreamsTasksAssignedEvent(
+            partitionsToAssign,
+            addedPartitions,
+            assignment
+        );
+        backgroundEventHandler.add(event);
+        log.debug("Enqueued StreamsTasksAssignedEvent to apply assignment and 
trigger onTasksAssigned callback");
+        return event.future();
+    }
+
     /**
      * Completes the future that marks the completed execution of the 
onTasksRevoked callback.
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 8e0eecb7a99..fb6134c909d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -727,15 +727,20 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * (to keep subscription state changes in the background)
      */
     private void process(final ApplyAssignmentEvent event) {
-        if (requestManagers.consumerMembershipManager.isEmpty()) {
-            log.warn("ConsumerMembershipManager not present when processing 
ApplyAssignmentEvent");
-            event.future().completeExceptionally(
-                new IllegalStateException("ConsumerMembershipManager not 
available"));
-            return;
-        }
         try {
-            requestManagers.consumerMembershipManager.get().applyAssignment(
-                event.assignedPartitions(), event.addedPartitions());
+            if (requestManagers.consumerMembershipManager.isPresent()) {
+                
requestManagers.consumerMembershipManager.get().applyAssignment(
+                    event.assignedPartitions(), event.addedPartitions());
+            } else if (requestManagers.streamsMembershipManager.isPresent()) {
+                requestManagers.streamsMembershipManager.get().applyAssignment(
+                    event.assignedPartitions(), event.addedPartitions());
+            } else {
+                log.warn("Neither ConsumerMembershipManager nor 
StreamsMembershipManager present " +
+                    "when processing ApplyAssignmentEvent");
+                event.future().completeExceptionally(
+                    new IllegalStateException("No membership manager available 
when processing ApplyAssignmentEvent"));
+                return;
+            }
             event.future().complete(null);
         } catch (Exception e) {
             event.future().completeExceptionally(e);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index ca4fc213594..26f0fda108e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -29,7 +29,7 @@ public abstract class BackgroundEvent {
         ERROR,
         PARTITIONS_ASSIGNED,
         PARTITIONS_REMOVED,
-        STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED,
+        STREAMS_TASKS_ASSIGNED,
         STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED,
         STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackNeededEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackNeededEvent.java
deleted file mode 100644
index 565bf97c6b7..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsOnTasksAssignedCallbackNeededEvent.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
-
-import java.util.Objects;
-
-public class StreamsOnTasksAssignedCallbackNeededEvent extends 
CompletableBackgroundEvent<Void> {
-
-    private final StreamsRebalanceData.Assignment assignment;
-
-    public 
StreamsOnTasksAssignedCallbackNeededEvent(StreamsRebalanceData.Assignment 
assignment) {
-        super(Type.STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED, Long.MAX_VALUE);
-        this.assignment = Objects.requireNonNull(assignment);
-    }
-
-    public StreamsRebalanceData.Assignment assignment() {
-        return assignment;
-    }
-
-    @Override
-    protected String toStringBase() {
-        return super.toStringBase() +
-            ", assignment=" + assignment;
-    }
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsTasksAssignedEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsTasksAssignedEvent.java
new file mode 100644
index 00000000000..57d592f9628
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/StreamsTasksAssignedEvent.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Event sent from the background to the app thread, to notify that a new 
assignment has been reconciled.
+ * The app thread is expected to apply the assignment change to the 
subscription state in the
+ * next call to consumer.poll, and invoke the onTasksAssigned callback.
+ */
+public class StreamsTasksAssignedEvent extends 
CompletableBackgroundEvent<Void> {
+
+    private final SortedSet<TopicPartition> assignedPartitions;
+    private final SortedSet<TopicPartition> addedPartitions;
+    private final StreamsRebalanceData.Assignment assignment;
+
+    /**
+     * Constructor for the streams partitions assigned event.
+     *
+     * @param assignedPartitions The full partition assignment to apply
+     * @param addedPartitions The newly added partitions
+     * @param assignment The task assignment for the callback
+     */
+    public StreamsTasksAssignedEvent(final SortedSet<TopicPartition> 
assignedPartitions,
+                                     final SortedSet<TopicPartition> 
addedPartitions,
+                                     final StreamsRebalanceData.Assignment 
assignment) {
+        super(Type.STREAMS_TASKS_ASSIGNED, Long.MAX_VALUE);
+        this.assignedPartitions = 
Collections.unmodifiableSortedSet(Objects.requireNonNull(assignedPartitions));
+        this.addedPartitions = 
Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
+        this.assignment = Objects.requireNonNull(assignment);
+    }
+
+    /**
+     * @return The full partition assignment to apply.
+     */
+    public SortedSet<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
+    }
+
+    /**
+     * @return The newly added partitions.
+     */
+    public SortedSet<TopicPartition> addedPartitions() {
+        return addedPartitions;
+    }
+
+    /**
+     * @return The task assignment for the onTasksAssigned callback.
+     */
+    public StreamsRebalanceData.Assignment assignment() {
+        return assignment;
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() +
+                ", assignedPartitions=" + assignedPartitions +
+                ", addedPartitions=" + addedPartitions +
+                ", assignment=" + assignment;
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 4043b947c0f..6312ac01577 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -20,9 +20,9 @@ import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.StreamsTasksAssignedEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -112,8 +112,8 @@ public class StreamsMembershipManagerTest {
     private MemberStateListener memberStateListener;
 
     @Captor
-    private ArgumentCaptor<StreamsOnTasksAssignedCallbackNeededEvent> 
onTasksAssignedCallbackNeededEventCaptor;
-    private int onTasksAssignedCallbackNeededAddCount = 0;
+    private ArgumentCaptor<StreamsTasksAssignedEvent> tasksAssignedEventCaptor;
+    private int tasksAssignedAddCount = 0;
 
     @Captor
     private ArgumentCaptor<StreamsOnTasksRevokedCallbackNeededEvent> 
onTasksRevokedCallbackNeededEventCaptor;
@@ -215,10 +215,10 @@ public class StreamsMembershipManagerTest {
 
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_0));
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -237,7 +237,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
@@ -254,10 +254,10 @@ public class StreamsMembershipManagerTest {
             expectedNewPartitionsToAssign
         );
         onTasksRevokedCallbackExecuted.complete(null);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
     }
 
@@ -276,13 +276,13 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(
             new TopicPartition(TOPIC_0, PARTITION_0),
@@ -290,7 +290,7 @@ public class StreamsMembershipManagerTest {
         );
         final Set<TopicPartition> expectedNewPartitionsToAssign = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_1));
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -313,7 +313,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0), new 
TopicPartition(TOPIC_0, PARTITION_1)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
@@ -331,9 +331,9 @@ public class StreamsMembershipManagerTest {
         );
         onTasksRevokedCallbackExecuted.complete(null);
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
     }
 
@@ -354,7 +354,7 @@ public class StreamsMembershipManagerTest {
             SUBTOPOLOGY_ID_1, List.of(PARTITION_0))
         );
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(
             new TopicPartition(TOPIC_0, PARTITION_0),
@@ -362,7 +362,7 @@ public class StreamsMembershipManagerTest {
         );
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -382,7 +382,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of());
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -404,13 +404,13 @@ public class StreamsMembershipManagerTest {
         );
         onTasksRevokedCallbackExecuted.complete(null);
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasks,
                 Set.of()
             );
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
     }
 
@@ -429,7 +429,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of());
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -451,13 +451,13 @@ public class StreamsMembershipManagerTest {
         );
         onTasksRevokedCallbackExecuted.complete(null);
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
                 warmupTasks
             );
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
     }
 
@@ -469,7 +469,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasks,
@@ -478,7 +478,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -493,7 +493,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasksSetup,
@@ -505,7 +505,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasks,
@@ -514,7 +514,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -530,7 +530,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasksSetup,
@@ -545,13 +545,13 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasks,
                 Set.of()
             );
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -567,7 +567,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasksSetup,
@@ -582,13 +582,13 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasks,
                 Set.of()
             );
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -608,7 +608,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasksSetup,
@@ -621,9 +621,9 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_1));
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -638,7 +638,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasksSetup,
@@ -650,7 +650,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -659,7 +659,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -672,7 +672,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -681,7 +681,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -696,7 +696,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -708,7 +708,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -717,7 +717,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -733,7 +733,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -745,7 +745,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -754,7 +754,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -770,7 +770,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0, PARTITION_1)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -782,7 +782,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -791,7 +791,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -811,7 +811,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_1)));
         joining();
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -821,7 +821,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasks,
                 Set.of(),
@@ -830,7 +830,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new 
TopicPartition(TOPIC_0, PARTITION_1));
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -845,7 +845,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 Set.of(),
@@ -857,7 +857,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 Set.of(),
                 standbyTasks,
@@ -866,7 +866,7 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedFullPartitionsToAssign = Set.of();
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         
verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
         verifyThatNoTasksHaveBeenRevoked();
     }
@@ -880,7 +880,7 @@ public class StreamsMembershipManagerTest {
 
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasks,
                 Set.of(),
@@ -890,7 +890,9 @@ public class StreamsMembershipManagerTest {
         final Set<TopicPartition> expectedNewPartitionsToAssign = 
expectedFullPartitionsToAssign;
         
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign,
 expectedNewPartitionsToAssign);
 
-        onTasksAssignedCallbackExecuted.completeExceptionally(new 
RuntimeException("KABOOM!"));
+        // Assignment is applied before callback is invoked, then callback 
fails
+        
membershipManager.applyAssignment(onTasksAssignedCallbackExecuted.assignedPartitions(),
 onTasksAssignedCallbackExecuted.addedPartitions());
+        onTasksAssignedCallbackExecuted.future().completeExceptionally(new 
RuntimeException("KABOOM!"));
 
         verifyInStateReconciling(membershipManager);
         verify(subscriptionState, 
never()).enablePartitionsAwaitingCallback(any());
@@ -910,7 +912,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -958,7 +960,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -1009,7 +1011,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -1057,7 +1059,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -1083,14 +1085,14 @@ public class StreamsMembershipManagerTest {
             
verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
         onAllTasksLostCallbackExecuted.complete(null);
         membershipManager.maybeRejoinStaleMember();
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasks,
                 Set.of(),
                 Set.of()
             );
 
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
 
         assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
     }
@@ -1109,7 +1111,7 @@ public class StreamsMembershipManagerTest {
             .thenReturn(Set.of(new TopicPartition(TOPIC_0, PARTITION_0)));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -1135,13 +1137,13 @@ public class StreamsMembershipManagerTest {
 
         onAllTasksLostCallbackExecuted.complete(null);
 
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasks,
                 Set.of(),
                 Set.of()
             );
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
 
         assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
     }
@@ -1163,7 +1165,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
         stable();
@@ -1255,7 +1257,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasks,
                 Set.of(),
@@ -1304,7 +1306,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasks,
                 Set.of(),
@@ -1349,7 +1351,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -1374,7 +1376,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
                 activeTasksSetup,
                 Set.of(),
@@ -1446,9 +1448,9 @@ public class StreamsMembershipManagerTest {
     public void testOnHeartbeatSuccessWhenInReconciling() {
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(List.of(), 
MEMBER_EPOCH));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecuted =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecuted =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), 
Set.of(), Set.of());
-        onTasksAssignedCallbackExecuted.complete(null);
+        completeAssignment(onTasksAssignedCallbackExecuted);
         membershipManager.onHeartbeatRequestGenerated();
 
         
membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(List.of(),
 MEMBER_EPOCH));
@@ -1464,7 +1466,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
@@ -1497,7 +1499,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
@@ -1513,7 +1515,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_1)));
         acknowledging(onTasksAssignedCallbackExecutedSetup);
@@ -1608,7 +1610,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
@@ -1622,7 +1624,7 @@ public class StreamsMembershipManagerTest {
             Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 
PARTITION_0));
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
         stable();
@@ -1710,7 +1712,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
         stable();
@@ -1726,7 +1728,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
 
@@ -1741,7 +1743,7 @@ public class StreamsMembershipManagerTest {
         );
         joining();
         reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, 
List.of(PARTITION_0)));
-        final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
+        final StreamsTasksAssignedEvent onTasksAssignedCallbackExecutedSetup =
             
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup,
 Set.of(), Set.of());
         acknowledging(onTasksAssignedCallbackExecutedSetup);
         stable();
@@ -2008,14 +2010,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(response);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             false
         );
 
-        future.complete(null);
+        completeAssignment(tasksAssignedEvent);
     }
 
     @Test
@@ -2040,14 +2042,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(response);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             false
         );
 
-        future.complete(null);
+        completeAssignment(tasksAssignedEvent);
     }
 
     @Test
@@ -2072,14 +2074,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(response);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             false
         );
 
-        future.complete(null);
+        completeAssignment(tasksAssignedEvent);
     }
 
     @Test
@@ -2104,14 +2106,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(response);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             false
         );
 
-        future.complete(null);
+        completeAssignment(tasksAssignedEvent);
     }
 
     @Test
@@ -2130,14 +2132,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(response);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             true
         );
 
-        future.complete(null);
+        completeAssignment(tasksAssignedEvent);
     }
 
     @Test
@@ -2162,14 +2164,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(response);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             true
         );
 
-        future.complete(null);
+        completeAssignment(tasksAssignedEvent);
     }
 
     @Test
@@ -2188,13 +2190,13 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(responseWithTasks);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future1 = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent1 = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             true
         );
-        future1.complete(null);
+        completeAssignment(tasksAssignedEvent1);
 
         final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
             new StreamsGroupHeartbeatResponseData.Status()
@@ -2213,13 +2215,13 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatSuccess(responseWithoutTasks);
         membershipManager.poll(time.milliseconds());
 
-        final CompletableFuture<Void> future2 = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+        final StreamsTasksAssignedEvent tasksAssignedEvent2 = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
             Set.of(),
             Set.of(),
             Set.of(),
             false
         );
-        future2.complete(null);
+        completeAssignment(tasksAssignedEvent2);
     }
 
     private void verifyThatNoTasksHaveBeenRevoked() {
@@ -2237,8 +2239,10 @@ public class StreamsMembershipManagerTest {
 
     private void 
verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(Set<TopicPartition>
 expectedAllPartitionsToAssign,
                                                                               
Set<TopicPartition> expectedNewPartitionsToAssign) {
-        
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAllPartitionsToAssign,
 expectedNewPartitionsToAssign);
-        
verify(memberStateListener).onGroupAssignmentUpdated(expectedAllPartitionsToAssign);
+        // Assignment is now deferred to app thread via 
StreamsTasksAssignedEvent,
+        // so assignFromSubscribedAwaitingCallback should NOT be called 
directly from background thread
+        verify(subscriptionState, 
never()).assignFromSubscribedAwaitingCallback(expectedAllPartitionsToAssign, 
expectedNewPartitionsToAssign);
+        verify(memberStateListener, 
never()).onGroupAssignmentUpdated(expectedAllPartitionsToAssign);
         verify(subscriptionState, 
never()).enablePartitionsAwaitingCallback(expectedNewPartitionsToAssign);
         verifyInStateReconciling(membershipManager);
     }
@@ -2318,20 +2322,30 @@ public class StreamsMembershipManagerTest {
         assertFalse(membershipManager.isLeavingGroup());
     }
 
-    private CompletableFuture<Void> 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
-                                                                               
                           final Set<StreamsRebalanceData.TaskId> standbyTasks,
-                                                                               
                           final Set<StreamsRebalanceData.TaskId> warmupTasks) {
+    private StreamsTasksAssignedEvent 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
+                                                                               
                             final Set<StreamsRebalanceData.TaskId> 
standbyTasks,
+                                                                               
                             final Set<StreamsRebalanceData.TaskId> 
warmupTasks) {
         return 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 standbyTasks, warmupTasks, true);
     }
 
-    private CompletableFuture<Void> 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
-                                                                               
                           final Set<StreamsRebalanceData.TaskId> standbyTasks,
-                                                                               
                           final Set<StreamsRebalanceData.TaskId> warmupTasks,
-                                                                               
                           final boolean isGroupReady) {
-        verify(backgroundEventHandler, 
times(++onTasksAssignedCallbackNeededAddCount)).add(onTasksAssignedCallbackNeededEventCaptor.capture());
-        final StreamsOnTasksAssignedCallbackNeededEvent 
onTasksAssignedCallbackNeeded = 
onTasksAssignedCallbackNeededEventCaptor.getValue();
-        assertEquals(makeTaskAssignment(activeTasks, standbyTasks, 
warmupTasks, isGroupReady), onTasksAssignedCallbackNeeded.assignment());
-        return onTasksAssignedCallbackNeeded.future();
+    private StreamsTasksAssignedEvent 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
+                                                                               
                             final Set<StreamsRebalanceData.TaskId> 
standbyTasks,
+                                                                               
                             final Set<StreamsRebalanceData.TaskId> warmupTasks,
+                                                                               
                             final boolean isGroupReady) {
+        verify(backgroundEventHandler, 
times(++tasksAssignedAddCount)).add(tasksAssignedEventCaptor.capture());
+        final StreamsTasksAssignedEvent streamsTasksAssignedEvent = 
tasksAssignedEventCaptor.getValue();
+        assertEquals(makeTaskAssignment(activeTasks, standbyTasks, 
warmupTasks, isGroupReady), streamsTasksAssignedEvent.assignment());
+        return streamsTasksAssignedEvent;
+    }
+
+    /**
+     * Simulate the application thread processing the 
StreamsTasksAssignedEvent:
+     * 1. Apply the assignment (via ApplyAssignmentEvent to background thread)
+     * 2. Complete the callback future
+     */
+    private void completeAssignment(final StreamsTasksAssignedEvent event) {
+        membershipManager.applyAssignment(event.assignedPartitions(), 
event.addedPartitions());
+        event.future().complete(null);
     }
 
     private CompletableFuture<Void> 
verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasksToRevoke) {
@@ -2354,8 +2368,8 @@ public class StreamsMembershipManagerTest {
                                         final Set<StreamsRebalanceData.TaskId> 
standbyTasks,
                                         final Set<StreamsRebalanceData.TaskId> 
warmupTasks) {
         verify(backgroundEventHandler, never()).add(argThat(a -> {
-            if (a instanceof StreamsOnTasksAssignedCallbackNeededEvent) {
-                return ((StreamsOnTasksAssignedCallbackNeededEvent) 
a).assignment()
+            if (a instanceof StreamsTasksAssignedEvent) {
+                return ((StreamsTasksAssignedEvent) a).assignment()
                     .equals(makeTaskAssignment(activeTasks, standbyTasks, 
warmupTasks));
             }
             return false;
@@ -2536,8 +2550,8 @@ public class StreamsMembershipManagerTest {
         verifyInStateReconciling(membershipManager);
     }
 
-    private void acknowledging(final CompletableFuture<Void> future) {
-        future.complete(null);
+    private void acknowledging(final StreamsTasksAssignedEvent event) {
+        completeAssignment(event);
         verifyInStateAcknowledging(membershipManager);
     }
 

Reply via email to