This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d9fe518b61 KAFKA-20434: Consumer group does not recompute assignment 
when static members rejoin with different assignor (#22036)
0d9fe518b61 is described below

commit 0d9fe518b616725fecd96162297fee89a7b7a6a5
Author: David Jacot <[email protected]>
AuthorDate: Mon Apr 13 20:18:17 2026 +0200

    KAFKA-20434: Consumer group does not recompute assignment when static 
members rejoin with different assignor (#22036)
    
    When all static members of a consumer group using the consumer protocol
    leave and rejoin with a different server-side assignor, the group does
    not recompute the target assignment. The `bumpGroupEpoch` flag only
    considers changes to subscribed topic names and regex but not the
    preferred server assignor.
    
    This patch adds a `hasPreferredServerAssignorChanged` check that
    compares the effective preferred assignor (falling back to the default
    when no member has an explicit preference) before and after the member
    update. The group epoch is bumped when the effective preferred assignor
    changes, which only happens once a majority of members has switched.
    
    Reviewers: Sean Quah <[email protected]>, Lianet Magrans
    <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../server/ConsumerGroupHeartbeatRequestTest.scala |  76 ++++++++-
 .../coordinator/group/GroupMetadataManager.java    |  33 +++-
 .../group/GroupMetadataManagerTest.java            | 172 +++++++++++++++++++++
 3 files changed, 278 insertions(+), 3 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 20ac6d42347..6754cb4517a 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -28,7 +28,7 @@ import 
org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, Consumer
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.server.common.Feature
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull, assertTrue}
 
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -1273,4 +1273,78 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
       .setAssignment(expectedAssignment4)
     assertEquals(expectedResponse4, response4.data)
   }
+
+  @ClusterTest(
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, value 
= "0")
+    )
+  )
+  def testStaticMembersRejoinWithDifferentServerAssignor(): Unit = {
+    // Creates the __consumer_offsets topic because it won't be created automatically
+    // in this test, which does not use the FindCoordinator API.
+    createOffsetsTopic()
+
+    val groupId = "grp"
+    val instanceIds = List("instance-1", "instance-2", "instance-3")
+
+    // A helper that joins a static member with the given server assignor and 
waits until
+    // the heartbeat returns a successful response. Returns the member epoch 
from the response.
+    def joinStaticMember(memberId: String, instanceId: String, serverAssignor: 
String): Int = {
+      val joinRequest = new ConsumerGroupHeartbeatRequest.Builder(
+        new ConsumerGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setInstanceId(instanceId)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(5 * 60 * 1000)
+          .setServerAssignor(serverAssignor)
+          .setSubscribedTopicNames(List("foo").asJava)
+          .setTopicPartitions(List.empty.asJava)
+      ).build()
+
+      var response: ConsumerGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        response = 
connectAndReceive[ConsumerGroupHeartbeatResponse](joinRequest)
+        response.data.errorCode == Errors.NONE.code
+      }, msg = s"Static member $instanceId could not join the group. Last 
response $response.")
+      response.data.memberEpoch
+    }
+
+    // Three static members join the group with the "uniform" assignor.
+    val initialMemberIds = instanceIds.map(_ => Uuid.randomUuid.toString)
+    val initialEpochs = initialMemberIds.zip(instanceIds).map { case 
(memberId, instanceId) =>
+      joinStaticMember(memberId, instanceId, "uniform")
+    }
+    val epochBeforeRejoin = initialEpochs.last
+
+    // All three members leave the group.
+    initialMemberIds.zip(instanceIds).foreach { case (memberId, instanceId) =>
+      val leaveRequest = new ConsumerGroupHeartbeatRequest.Builder(
+        new ConsumerGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setInstanceId(instanceId)
+          .setMemberEpoch(-2)
+      ).build()
+      val leaveResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](leaveRequest)
+      assertEquals(Errors.NONE.code, leaveResponse.data.errorCode)
+      assertEquals(-2, leaveResponse.data.memberEpoch)
+    }
+
+    // All three members rejoin with the "range" assignor and new member ids. 
The group
+    // epoch must be bumped once a majority of members has switched assignor 
so that the
+    // target assignment is recomputed with the new assignor.
+    val rejoinEpochs = instanceIds.map { instanceId =>
+      joinStaticMember(Uuid.randomUuid.toString, instanceId, "range")
+    }
+
+    // The last rejoin epoch must be greater than the epoch before the 
leave/rejoin cycle,
+    // confirming that the group epoch was bumped and the assignment was 
recomputed.
+    val lastRejoinEpoch = rejoinEpochs.last
+    assertTrue(lastRejoinEpoch > epochBeforeRejoin,
+      s"Expected the last rejoin epoch ($lastRejoinEpoch) to be greater than " 
+
+      s"the epoch before the rejoin ($epochBeforeRejoin). " +
+      s"Rejoin epochs: ${rejoinEpochs.mkString(", ")}.")
+  }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 81a6e1b0c4a..a3663f34527 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2377,6 +2377,11 @@ public class GroupMetadataManager {
             updatedMember,
             records
         );
+        boolean preferredServerAssignorChanged = 
hasPreferredServerAssignorChanged(
+            group,
+            member,
+            updatedMember
+        );
 
         // The subscription has changed when either the subscribed topic names 
or subscribed topic
         // regex has changed.
@@ -2389,9 +2394,12 @@ public class GroupMetadataManager {
             // the group epoch when the member has changed its subscribed 
topic names or the member
             // has changed its subscribed topic regex to a regex that is 
already resolved. We avoid
             // bumping the group epoch when the new subscribed topic regex has 
not been resolved
-            // yet, since we will have to update the target assignment again 
later.
+            // yet, since we will have to update the target assignment again 
later. We also bump the
+            // group epoch when the effective preferred server assignor 
changes, since the target
+            // assignment must be recomputed with the new assignor.
             subscribedTopicNamesChanged ||
-            updateRegularExpressionStatus == 
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
+            updateRegularExpressionStatus == 
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED ||
+            preferredServerAssignorChanged;
 
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
             // The subscription metadata is updated in two cases:
@@ -3232,6 +3240,27 @@ public class GroupMetadataManager {
         return false;
     }
 
+    /**
+     * Returns true if the effective preferred server assignor of the group 
changes as a
+     * result of updating the given member. The effective preferred assignor 
falls back to
+     * the default assignor when no member has an explicit preference.
+     *
+     * @param group         The consumer group.
+     * @param member        The old member.
+     * @param updatedMember The updated member.
+     * @return Whether the effective preferred server assignor has changed.
+     */
+    private boolean hasPreferredServerAssignorChanged(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        ConsumerGroupMember updatedMember
+    ) {
+        String defaultAssignorName = defaultConsumerGroupAssignor.name();
+        String currentPreferredAssignor = 
group.preferredServerAssignor().orElse(defaultAssignorName);
+        String newPreferredAssignor = 
group.computePreferredServerAssignor(member, 
updatedMember).orElse(defaultAssignorName);
+        return !currentPreferredAssignor.equals(newPreferredAssignor);
+    }
+
     private static boolean isNotEmpty(String value) {
         return value != null && !value.isEmpty();
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a4b8141ed5c..8bfcc7dae54 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -2536,6 +2536,178 @@ public class GroupMetadataManagerTest {
         context.assertNoRebalanceTimeout(groupId, memberId2);
     }
 
+    @Test
+    public void testStaticMembersRejoinWithNewServerAssignor() {
+        String groupId = "fooup";
+        // Generate the member ids up front as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+        String member3RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor uniformAssignor = new 
MockPartitionAssignor("uniform");
+        MockPartitionAssignor rangeAssignor = new 
MockPartitionAssignor("range");
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setInstanceId("instance-id-1")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("uniform")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1)), 10))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setInstanceId("instance-id-2")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("uniform")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3)), 10))
+            .build();
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setState(MemberState.STABLE)
+            .setInstanceId("instance-id-3")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("uniform")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5)), 10))
+            .build();
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addRacks()
+            .build();
+
+        // Consumer group with three static members using the uniform assignor.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(uniformAssignor, rangeAssignor))
+            .withMetadataImage(new 
KRaftCoordinatorMetadataImage(metadataImage))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withMember(member3)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 2, 3)))
+                .withAssignment(memberId3, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, new 
KRaftCoordinatorMetadataImage(metadataImage))
+                ))))
+            .build();
+
+        // All three members leave the consumer group.
+        for (var entry : List.of(
+            Map.entry(memberId1, "instance-id-1"),
+            Map.entry(memberId2, "instance-id-2"),
+            Map.entry(memberId3, "instance-id-3")
+        )) {
+            CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(entry.getKey())
+                    .setInstanceId(entry.getValue())
+                    .setMemberEpoch(-2));
+
+            assertResponseEquals(
+                new ConsumerGroupHeartbeatResponseData()
+                    .setMemberId(entry.getKey())
+                    .setMemberEpoch(-2),
+                result.response()
+            );
+        }
+
+        // Member 2 rejoins with "range" assignor. The preferred assignor is 
still "uniform"
+        // (counts: uniform=2, range=1) so the group epoch is not bumped. 
Member 2 gets back
+        // its existing assignment at epoch 10.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> rejoinResult2 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId("instance-id-2")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(2, 3))
+                    ))),
+            rejoinResult2.response()
+        );
+
+        // Member 3 rejoins with "range" assignor. The preferred assignor 
shifts to "range"
+        // (counts: uniform=1, range=2) so the group epoch is bumped to 11 and 
a new target
+        // assignment is computed using the range assignor.
+        rangeAssignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1)
+            )),
+            member2RejoinId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3)
+            )),
+            member3RejoinId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5)
+            ))
+        )));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> rejoinResult3 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member3RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId("instance-id-3")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        // Verify that the group epoch was bumped to 11 and the member got the 
new assignment.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member3RejoinId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(4, 5))
+                    ))),
+            rejoinResult3.response()
+        );
+    }
+
     @Test
     public void testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() {
         String groupId = "fooup";

Reply via email to