This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0d9fe518b61 KAFKA-20434: Consumer group does not recompute assignment
when static members rejoin with different assignor (#22036)
0d9fe518b61 is described below
commit 0d9fe518b616725fecd96162297fee89a7b7a6a5
Author: David Jacot <[email protected]>
AuthorDate: Mon Apr 13 20:18:17 2026 +0200
KAFKA-20434: Consumer group does not recompute assignment when static
members rejoin with different assignor (#22036)
When all static members of a consumer group using the consumer protocol
leave and rejoin with a different server-side assignor, the group does
not recompute the target assignment. The `bumpGroupEpoch` flag only
considers changes to the subscribed topic names or the subscribed topic
regex, not changes to the preferred server assignor.
This patch adds a `hasPreferredServerAssignorChanged` check that
compares the group's effective preferred assignor (which falls back to the
default assignor when no member has an explicit preference) before and
after the member update. The group epoch is bumped when the effective
preferred assignor changes, which only happens once a majority of members
have switched to the new assignor.
Reviewers: Sean Quah <[email protected]>, Lianet Magrans
<[email protected]>
---------
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../server/ConsumerGroupHeartbeatRequestTest.scala | 76 ++++++++-
.../coordinator/group/GroupMetadataManager.java | 33 +++-
.../group/GroupMetadataManagerTest.java | 172 +++++++++++++++++++++
3 files changed, 278 insertions(+), 3 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 20ac6d42347..6754cb4517a 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -28,7 +28,7 @@ import
org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, Consumer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.server.common.Feature
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue}
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -1273,4 +1273,78 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
.setAssignment(expectedAssignment4)
assertEquals(expectedResponse4, response4.data)
}
+
+ @ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, value
= "0")
+ )
+ )
+ def testStaticMembersRejoinWithDifferentServerAssignor(): Unit = {
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test because it does not use the FindCoordinator API.
+ createOffsetsTopic()
+
+ val groupId = "grp"
+ val instanceIds = List("instance-1", "instance-2", "instance-3")
+
+ // Joins a static member (member epoch 0) with the given server assignor,
retrying the heartbeat until
+ // it returns no error. Returns the member epoch from the successful
response.
+ def joinStaticMember(memberId: String, instanceId: String, serverAssignor:
String): Int = {
+ val joinRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setServerAssignor(serverAssignor)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ response =
connectAndReceive[ConsumerGroupHeartbeatResponse](joinRequest)
+ response.data.errorCode == Errors.NONE.code
+ }, msg = s"Static member $instanceId could not join the group. Last
response $response.")
+ response.data.memberEpoch
+ }
+
+ // Three static members join the group with the "uniform" assignor. The
epoch returned to the last joiner is used as the baseline below.
+ val initialMemberIds = instanceIds.map(_ => Uuid.randomUuid.toString)
+ val initialEpochs = initialMemberIds.zip(instanceIds).map { case
(memberId, instanceId) =>
+ joinStaticMember(memberId, instanceId, "uniform")
+ }
+ val epochBeforeRejoin = initialEpochs.last
+
+ // All three members leave the group by heartbeating with member epoch -2.
+ initialMemberIds.zip(instanceIds).foreach { case (memberId, instanceId) =>
+ val leaveRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(-2)
+ ).build()
+ val leaveResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](leaveRequest)
+ assertEquals(Errors.NONE.code, leaveResponse.data.errorCode)
+ assertEquals(-2, leaveResponse.data.memberEpoch)
+ }
+
+ // All three members rejoin with the "range" assignor and new member ids.
The group
+ // epoch must be bumped once a majority of members have switched assignor
so that the
+ // target assignment is recomputed with the new assignor.
+ val rejoinEpochs = instanceIds.map { instanceId =>
+ joinStaticMember(Uuid.randomUuid.toString, instanceId, "range")
+ }
+
+ // The last rejoin epoch must be greater than the epoch before the
leave/rejoin cycle,
+ // confirming that the group epoch was bumped. Note: only the epoch bump is
asserted here, not which assignor produced the new assignment.
+ val lastRejoinEpoch = rejoinEpochs.last
+ assertTrue(lastRejoinEpoch > epochBeforeRejoin,
+ s"Expected the last rejoin epoch ($lastRejoinEpoch) to be greater than "
+
+ s"the epoch before the rejoin ($epochBeforeRejoin). " +
+ s"Rejoin epochs: ${rejoinEpochs.mkString(", ")}.")
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 81a6e1b0c4a..a3663f34527 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2377,6 +2377,11 @@ public class GroupMetadataManager {
updatedMember,
records
);
+ boolean preferredServerAssignorChanged =
hasPreferredServerAssignorChanged(
+ group,
+ member,
+ updatedMember
+ );
// The subscription has changed when either the subscribed topic names
or subscribed topic
// regex has changed.
@@ -2389,9 +2394,12 @@ public class GroupMetadataManager {
// the group epoch when the member has changed its subscribed
topic names or the member
// has changed its subscribed topic regex to a regex that is
already resolved. We avoid
// bumping the group epoch when the new subscribed topic regex has
not been resolved
- // yet, since we will have to update the target assignment again
later.
+ // yet, since we will have to update the target assignment again
later. We also bump the
+ // group epoch when the effective preferred server assignor
changes, since the target
+ // assignment must be recomputed with the new assignor.
subscribedTopicNamesChanged ||
- updateRegularExpressionStatus ==
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED;
+ updateRegularExpressionStatus ==
UpdateRegularExpressionStatus.REGEX_UPDATED_AND_RESOLVED ||
+ preferredServerAssignorChanged;
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
@@ -3232,6 +3240,27 @@ public class GroupMetadataManager {
return false;
}
+ /**
+ * Returns true if the effective preferred server assignor of the group
changes as a
+ * result of updating the given member. The effective preferred assignor
falls back to
+ * the default assignor when no member has an explicit preference, so moving
between no explicit preference and an explicit preference for the default
assignor is not treated as a change.
+ *
+ * @param group The consumer group.
+ * @param member The member before the update.
+ * @param updatedMember The member after the update.
+ * @return Whether the effective preferred server assignor has changed.
+ */
+ private boolean hasPreferredServerAssignorChanged(
+ ConsumerGroup group,
+ ConsumerGroupMember member,
+ ConsumerGroupMember updatedMember
+ ) {
+ String defaultAssignorName = defaultConsumerGroupAssignor.name();
+ String currentPreferredAssignor =
group.preferredServerAssignor().orElse(defaultAssignorName);
+ String newPreferredAssignor =
group.computePreferredServerAssignor(member,
updatedMember).orElse(defaultAssignorName);
+ return !currentPreferredAssignor.equals(newPreferredAssignor);
+ }
+
private static boolean isNotEmpty(String value) {
return value != null && !value.isEmpty();
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a4b8141ed5c..8bfcc7dae54 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -2536,6 +2536,178 @@ public class GroupMetadataManagerTest {
context.assertNoRebalanceTimeout(groupId, memberId2);
}
+ @Test
+ public void testStaticMembersRejoinWithNewServerAssignor() {
+ String groupId = "fooup";
+ // Member ids are random. Static members are identified by their instance
id, so members 2 and 3 rejoin below with fresh member ids.
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String memberId3 = Uuid.randomUuid().toString();
+ String member2RejoinId = Uuid.randomUuid().toString();
+ String member3RejoinId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor uniformAssignor = new
MockPartitionAssignor("uniform");
+ MockPartitionAssignor rangeAssignor = new
MockPartitionAssignor("range");
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance-id-1")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("uniform")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)), 10))
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance-id-2")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("uniform")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3)), 10))
+ .build();
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder(memberId3)
+ .setState(MemberState.STABLE)
+ .setInstanceId("instance-id-3")
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("uniform")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5)), 10))
+ .build();
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .build();
+
+ // Consumer group with three static members using the uniform assignor.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(uniformAssignor, rangeAssignor))
+ .withMetadataImage(new
KRaftCoordinatorMetadataImage(metadataImage))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(member2)
+ .withMember(member3)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3)))
+ .withAssignment(memberId3, mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, new
KRaftCoordinatorMetadataImage(metadataImage))
+ ))))
+ .build();
+
+ // All three members leave the consumer group by heartbeating with member epoch -2.
+ for (var entry : List.of(
+ Map.entry(memberId1, "instance-id-1"),
+ Map.entry(memberId2, "instance-id-2"),
+ Map.entry(memberId3, "instance-id-3")
+ )) {
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(entry.getKey())
+ .setInstanceId(entry.getValue())
+ .setMemberEpoch(-2));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(entry.getKey())
+ .setMemberEpoch(-2),
+ result.response()
+ );
+ }
+
+ // Member 2 rejoins with the "range" assignor. The preferred assignor is
still "uniform"
+ // (counts: uniform=2, range=1) so the group epoch is not bumped.
Member 2 gets back
+ // its existing assignment at epoch 10.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> rejoinResult2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(member2RejoinId)
+ .setGroupId(groupId)
+ .setInstanceId("instance-id-2")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(member2RejoinId)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(2, 3))
+ ))),
+ rejoinResult2.response()
+ );
+
+ // Member 3 rejoins with the "range" assignor. The preferred assignor
shifts to "range"
+ // (counts: uniform=1, range=2) so the group epoch is bumped to 11 and
a new target
+ // assignment is computed using the range assignor.
+ rangeAssignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )),
+ member2RejoinId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 2, 3)
+ )),
+ member3RejoinId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 4, 5)
+ ))
+ )));
+
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> rejoinResult3 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(member3RejoinId)
+ .setGroupId(groupId)
+ .setInstanceId("instance-id-3")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ // Verify that the group epoch was bumped to 11 and the member got the
new assignment.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(member3RejoinId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(4, 5))
+ ))),
+ rejoinResult3.response()
+ );
+ }
+
@Test
public void testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() {
String groupId = "fooup";