This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new beb39ac301f KAFKA-20431: ConsumerGroupDescribe API does not return
partitions being revoked (#22015)
beb39ac301f is described below
commit beb39ac301f6cf7262870b0758f0bddee69efcfe
Author: David Jacot <[email protected]>
AuthorDate: Fri Apr 10 17:50:02 2026 +0200
KAFKA-20431: ConsumerGroupDescribe API does not return partitions being
revoked (#22015)
The `Assignment` field in the `ConsumerGroupDescribe` response only
included `assignedPartitions`, omitting `partitionsPendingRevocation`.
This caused partitions to disappear from the response during
reconciliation. The fix merges both into the `Assignment` field since
the member is still responsible for those partitions until revocation
completes.
Reviewers: Sean Quah <[email protected]>, Lianet Magrans
<[email protected]>
---
.../server/ConsumerGroupDescribeRequestTest.scala | 127 ++++++++++++++++++++-
.../group/modern/consumer/ConsumerGroupMember.java | 25 +++-
.../apache/kafka/coordinator/group/Assertions.java | 37 ++++++
.../modern/consumer/ConsumerGroupMemberTest.java | 74 ++++++++++--
4 files changed, 252 insertions(+), 11 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 018fe853973..58b16ee6bfa 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -22,13 +22,13 @@ import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, Uuid}
import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment,
DescribedGroup, TopicPartitions}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest,
ConsumerGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.group.{Assertions, GroupCoordinatorConfig}
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Feature
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
@@ -319,4 +319,127 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(if (version == 0) -1.toByte else 1.toByte,
member2.get.memberType)
}
}
+
+ @ClusterTest
+ def testConsumerGroupDescribeIncludesPartitionsPendingRevocation(): Unit = {
+ // Creates the __consumer_offsets topics because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
+
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ val clientId = "client-id"
+ val clientHost = "/127.0.0.1"
+ val authorizedOperationsInt = Utils.to32BitField(
+ AclEntry.supportedOperations(ResourceType.GROUP).asScala
+ .map(_.code.asInstanceOf[JByte]).asJava
+ )
+
+ // Member-1 joins using the range assignor and waits until it gets all 3
partitions.
+ var member1Response: ConsumerGroupHeartbeatResponseData = null
+ TestUtils.waitUntilTrue(() => {
+ member1Response = consumerGroupHeartbeat(
+ groupId = "grp",
+ memberId = "member-1",
+ serverAssignor = "range",
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty
+ )
+ member1Response.assignment != null &&
!member1Response.assignment.topicPartitions.isEmpty
+ }, msg = s"Could not join the group successfully. Last response
$member1Response.")
+
+ // Member-2 joins, triggering a rebalance. The target assignment changes:
+ // member-1 -> [0, 1], member-2 -> [2].
+ val member2Response = consumerGroupHeartbeat(
+ groupId = "grp",
+ memberId = "member-2",
+ serverAssignor = "range",
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty
+ )
+
+ // Member-1 heartbeats again, reporting it still owns all 3 partitions.
+ // This triggers reconciliation: assigned becomes [0, 1] and partition 2
+ // moves to partitions pending revocation.
+ consumerGroupHeartbeat(
+ groupId = "grp",
+ memberId = "member-1",
+ memberEpoch = member1Response.memberEpoch,
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)
+ )
+ )
+
+ // Describe the group and verify that member-1's assignment includes
+ // both assigned partitions and partitions pending revocation.
+ val expected = List(
+ new DescribedGroup()
+ .setGroupId("grp")
+ .setGroupState(ConsumerGroupState.RECONCILING.toString)
+ .setGroupEpoch(member2Response.memberEpoch)
+ .setAssignmentEpoch(member2Response.memberEpoch)
+ .setAssignorName("range")
+ .setAuthorizedOperations(authorizedOperationsInt)
+ .setMembers(List(
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member-1")
+ .setMemberEpoch(member1Response.memberEpoch)
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .setSubscribedTopicRegex("")
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setAssignment(new Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName("foo")
+ .setPartitions(List[Integer](0, 1, 2).asJava)
+ ).asJava))
+ .setTargetAssignment(new Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName("foo")
+ .setPartitions(List[Integer](0, 1).asJava)
+ ).asJava))
+ .setMemberType(1.toByte),
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member-2")
+ .setMemberEpoch(member2Response.memberEpoch)
+ .setClientId(clientId)
+ .setClientHost(clientHost)
+ .setSubscribedTopicRegex("")
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setAssignment(new Assignment())
+ .setTargetAssignment(new Assignment()
+ .setTopicPartitions(List(
+ new TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName("foo")
+ .setPartitions(List[Integer](2).asJava)
+ ).asJava))
+ .setMemberType(1.toByte),
+ ).asJava)
+ )
+
+ val actual = consumerGroupDescribe(
+ groupIds = List("grp"),
+ includeAuthorizedOperations = true,
+ version =
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled),
+ )
+
+ Assertions.assertResponseEquals(
+ new ConsumerGroupDescribeResponseData().setGroups(expected.asJava),
+ new ConsumerGroupDescribeResponseData().setGroups(actual.asJava)
+ )
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 7a10dbda6fa..5b92240d343 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -480,12 +480,18 @@ public class ConsumerGroupMember extends
ModernGroupMember {
Assignment targetAssignment,
CoordinatorMetadataImage image
) {
+ // The assignment includes both assigned partitions and partitions
pending
+ // revocation because the member is still responsible for the latter
until
+ // revocation is complete.
+ var topicPartitionsMap = new HashMap<Uuid,
ConsumerGroupDescribeResponseData.TopicPartitions>();
+ accumulateTopicPartitions(assignedPartitions, topicPartitionsMap,
image);
+ accumulateTopicPartitions(partitionsPendingRevocation,
topicPartitionsMap, image);
+
return new ConsumerGroupDescribeResponseData.Member()
.setMemberEpoch(memberEpoch)
.setMemberId(memberId)
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(topicPartitionsFromAssignment(
- Utils.toAssignmentWithoutEpochs(assignedPartitions),
image)))
+ .setTopicPartitions(new
ArrayList<>(topicPartitionsMap.values())))
.setTargetAssignment(new
ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(topicPartitionsFromAssignment(
targetAssignment != null ? targetAssignment.partitions() :
Map.of(),
@@ -500,6 +506,21 @@ public class ConsumerGroupMember extends ModernGroupMember
{
.setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
}
+ private static void accumulateTopicPartitions(
+ Map<Uuid, Map<Integer, Integer>> source,
+ Map<Uuid, ConsumerGroupDescribeResponseData.TopicPartitions> target,
+ CoordinatorMetadataImage image
+ ) {
+ source.forEach((topicId, eps) ->
+ image.topicMetadata(topicId).ifPresent(metadata ->
+ target.computeIfAbsent(topicId, __ ->
+ new ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setTopicName(metadata.name())
+ .setPartitions(new ArrayList<>())
+ ).partitions().addAll(eps.keySet())));
+ }
+
private static List<ConsumerGroupDescribeResponseData.TopicPartitions>
topicPartitionsFromAssignment(
Map<Uuid, Set<Integer>> partitions,
CoordinatorMetadataImage image
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 519c2dd0302..cced7b17472 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -51,6 +52,7 @@ public class Assertions {
private static final BiConsumer<ApiMessage, ApiMessage>
API_MESSAGE_DEFAULT_COMPARATOR = org.junit.jupiter.api.Assertions::assertEquals;
private static final Map<Class<?>, BiConsumer<ApiMessage, ApiMessage>>
API_MESSAGE_COMPARATORS = Map.of(
// Register request/response comparators.
+ ConsumerGroupDescribeResponseData.class,
Assertions::assertConsumerGroupDescribeResponse,
ConsumerGroupHeartbeatResponseData.class,
Assertions::assertConsumerGroupHeartbeatResponse,
ShareGroupHeartbeatResponseData.class,
Assertions::assertShareGroupHeartbeatResponse,
SyncGroupResponseData.class, Assertions::assertSyncGroupResponse,
@@ -149,6 +151,41 @@ public class Assertions {
}
}
+ public static void normalizeAssignment(
+ ConsumerGroupDescribeResponseData.Assignment assignment
+ ) {
+ if (assignment != null) {
+ assignment.topicPartitions().sort(Comparator.comparing(
+ ConsumerGroupDescribeResponseData.TopicPartitions::topicId
+ ));
+ assignment.topicPartitions().forEach(topic ->
topic.partitions().sort(Integer::compareTo));
+ }
+ }
+
+ private static void assertConsumerGroupDescribeResponse(
+ ApiMessage exp,
+ ApiMessage act
+ ) {
+ var expected = (ConsumerGroupDescribeResponseData) exp.duplicate();
+ var actual = (ConsumerGroupDescribeResponseData) act.duplicate();
+
+ Consumer<ConsumerGroupDescribeResponseData> normalize = message ->
+ message.groups().forEach(group -> {
+ group.members().sort(Comparator.comparing(
+ ConsumerGroupDescribeResponseData.Member::memberId
+ ));
+ group.members().forEach(member -> {
+ normalizeAssignment(member.assignment());
+ normalizeAssignment(member.targetAssignment());
+ });
+ });
+
+ normalize.accept(expected);
+ normalize.accept(actual);
+
+ assertEquals(expected, actual);
+ }
+
private static void assertConsumerGroupHeartbeatResponse(
ApiMessage exp,
ApiMessage act
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
index 202325d2327..7f7e0fa8a3e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.Assertions;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
@@ -333,8 +334,11 @@ public class ConsumerGroupMemberTest {
.setSupportedProtocols(toClassicProtocolCollection("range")) :
null)
.build();
- ConsumerGroupDescribeResponseData.Member actual =
member.asConsumerGroupDescribeMember(targetAssignment, new
KRaftCoordinatorMetadataImage(metadataImage));
- ConsumerGroupDescribeResponseData.Member expected = new
ConsumerGroupDescribeResponseData.Member()
+ var actual = member.asConsumerGroupDescribeMember(
+ targetAssignment,
+ new KRaftCoordinatorMetadataImage(metadataImage)
+ );
+ var expected = new ConsumerGroupDescribeResponseData.Member()
.setMemberId(memberId)
.setMemberEpoch(epoch)
.setClientId(clientId)
@@ -344,12 +348,19 @@ public class ConsumerGroupMemberTest {
.setSubscribedTopicNames(new ArrayList<>(subscribedTopicNames))
.setSubscribedTopicRegex(subscribedTopicRegex)
.setAssignment(
+ // The assignment should include both assigned partitions and
+ // partitions pending revocation.
new ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(List.of(new
ConsumerGroupDescribeResponseData.TopicPartitions()
- .setTopicId(topicId1)
- .setTopicName("topic1")
- .setPartitions(assignedPartitions)
- ))
+ .setTopicPartitions(new ArrayList<>(List.of(
+ new ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(topicId1)
+ .setTopicName("topic1")
+ .setPartitions(assignedPartitions),
+ new ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(topicId2)
+ .setTopicName("topic2")
+ .setPartitions(Arrays.asList(3, 4, 5))
+ )))
)
.setTargetAssignment(
new ConsumerGroupDescribeResponseData.Assignment()
@@ -362,9 +373,58 @@ public class ConsumerGroupMemberTest {
)
.setMemberType(withClassicMemberMetadata ? (byte) 0 : (byte) 1);
+ // Sort to avoid order dependency from HashMap iteration.
+ Assertions.normalizeAssignment(actual.assignment());
+ Assertions.normalizeAssignment(expected.assignment());
+
assertEquals(expected, actual);
}
+ @Test
+ public void
testAsConsumerGroupDescribeMemberWithSameTopicPendingRevocation() {
+ var topicId1 = Uuid.randomUuid();
+ var metadataImage = new MetadataImageBuilder()
+ .addTopic(topicId1, "topic1", 6)
+ .build();
+
+ // Assigned partitions [0, 1] and partitions pending revocation [2]
share the same topic.
+ var record = new ConsumerGroupCurrentMemberAssignmentValue()
+ .setMemberEpoch(5)
+ .setPreviousMemberEpoch(4)
+ .setAssignedPartitions(List.of(
+ new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(Arrays.asList(0, 1))
+ ))
+ .setPartitionsPendingRevocation(List.of(
+ new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(2))
+ ));
+
+ var memberId = Uuid.randomUuid().toString();
+ var member = new ConsumerGroupMember.Builder(memberId)
+ .updateWith(LOG, GROUP_ID, record)
+ .build();
+
+ var actual = member.asConsumerGroupDescribeMember(
+ null,
+ new KRaftCoordinatorMetadataImage(metadataImage)
+ );
+
+ // The assignment should merge both assigned and pending revocation
for the same topic.
+ Assertions.normalizeAssignment(actual.assignment());
+ assertEquals(
+ List.of(
+ new ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(topicId1)
+ .setTopicName("topic1")
+ .setPartitions(Arrays.asList(0, 1, 2))
+ ),
+ actual.assignment().topicPartitions()
+ );
+ }
+
@Test
public void testAsConsumerGroupDescribeWithTargetAssignmentNull() {
ConsumerGroupMember member = new
ConsumerGroupMember.Builder(Uuid.randomUuid().toString())