This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5d98685680 KAFKA-20431: ConsumerGroupDescribe API does not return 
partitions being revoked (#22015)
d5d98685680 is described below

commit d5d98685680e2167fde5272f529ac51659e33226
Author: David Jacot <[email protected]>
AuthorDate: Fri Apr 10 17:50:02 2026 +0200

    KAFKA-20431: ConsumerGroupDescribe API does not return partitions being 
revoked (#22015)
    
    The `Assignment` field in the `ConsumerGroupDescribe` response only
    included `assignedPartitions`, omitting `partitionsPendingRevocation`.
    This caused partitions to disappear from the response during
    reconciliation. The fix merges both into the `Assignment` field since
    the member is still responsible for those partitions until revocation
    completes.
    
    Reviewers: Sean Quah <[email protected]>, Lianet Magrans
    <[email protected]>
---
 .../server/ConsumerGroupDescribeRequestTest.scala  | 127 ++++++++++++++++++++-
 .../group/modern/consumer/ConsumerGroupMember.java |  25 +++-
 .../apache/kafka/coordinator/group/Assertions.java |  37 ++++++
 .../modern/consumer/ConsumerGroupMemberTest.java   |  74 ++++++++++--
 4 files changed, 252 insertions(+), 11 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 018fe853973..58b16ee6bfa 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -22,13 +22,13 @@ import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, 
DescribedGroup, TopicPartitions}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, 
ConsumerGroupDescribeResponse}
 import org.apache.kafka.common.resource.ResourceType
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.group.{Assertions, GroupCoordinatorConfig}
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.common.Feature
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
@@ -319,4 +319,127 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       assertEquals(if (version == 0) -1.toByte else 1.toByte, 
member2.get.memberType)
     }
   }
+
+  @ClusterTest
+  def testConsumerGroupDescribeIncludesPartitionsPendingRevocation(): Unit = {
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    val clientId = "client-id"
+    val clientHost = "/127.0.0.1"
+    val authorizedOperationsInt = Utils.to32BitField(
+      AclEntry.supportedOperations(ResourceType.GROUP).asScala
+        .map(_.code.asInstanceOf[JByte]).asJava
+    )
+
+    // Member-1 joins using the range assignor and waits until it gets all 3 
partitions.
+    var member1Response: ConsumerGroupHeartbeatResponseData = null
+    TestUtils.waitUntilTrue(() => {
+      member1Response = consumerGroupHeartbeat(
+        groupId = "grp",
+        memberId = "member-1",
+        serverAssignor = "range",
+        rebalanceTimeoutMs = 5 * 60 * 1000,
+        subscribedTopicNames = List("foo"),
+        topicPartitions = List.empty
+      )
+      member1Response.assignment != null && 
!member1Response.assignment.topicPartitions.isEmpty
+    }, msg = s"Could not join the group successfully. Last response 
$member1Response.")
+
+    // Member-2 joins, triggering a rebalance. The target assignment changes:
+    // member-1 -> [0, 1], member-2 -> [2].
+    val member2Response = consumerGroupHeartbeat(
+      groupId = "grp",
+      memberId = "member-2",
+      serverAssignor = "range",
+      rebalanceTimeoutMs = 5 * 60 * 1000,
+      subscribedTopicNames = List("foo"),
+      topicPartitions = List.empty
+    )
+
+    // Member-1 heartbeats again, reporting it still owns all 3 partitions.
+    // This triggers reconciliation: assigned becomes [0, 1] and partition 2
+    // moves to partitions pending revocation.
+    consumerGroupHeartbeat(
+      groupId = "grp",
+      memberId = "member-1",
+      memberEpoch = member1Response.memberEpoch,
+      rebalanceTimeoutMs = 5 * 60 * 1000,
+      subscribedTopicNames = List("foo"),
+      topicPartitions = List(
+        new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0, 1, 2).asJava)
+      )
+    )
+
+    // Describe the group and verify that member-1's assignment includes
+    // both assigned partitions and partitions pending revocation.
+    val expected = List(
+      new DescribedGroup()
+        .setGroupId("grp")
+        .setGroupState(ConsumerGroupState.RECONCILING.toString)
+        .setGroupEpoch(member2Response.memberEpoch)
+        .setAssignmentEpoch(member2Response.memberEpoch)
+        .setAssignorName("range")
+        .setAuthorizedOperations(authorizedOperationsInt)
+        .setMembers(List(
+          new ConsumerGroupDescribeResponseData.Member()
+            .setMemberId("member-1")
+            .setMemberEpoch(member1Response.memberEpoch)
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .setSubscribedTopicRegex("")
+            .setSubscribedTopicNames(List("foo").asJava)
+            .setAssignment(new Assignment()
+              .setTopicPartitions(List(
+                new TopicPartitions()
+                  .setTopicId(topicId)
+                  .setTopicName("foo")
+                  .setPartitions(List[Integer](0, 1, 2).asJava)
+              ).asJava))
+            .setTargetAssignment(new Assignment()
+              .setTopicPartitions(List(
+                new TopicPartitions()
+                  .setTopicId(topicId)
+                  .setTopicName("foo")
+                  .setPartitions(List[Integer](0, 1).asJava)
+              ).asJava))
+            .setMemberType(1.toByte),
+          new ConsumerGroupDescribeResponseData.Member()
+            .setMemberId("member-2")
+            .setMemberEpoch(member2Response.memberEpoch)
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .setSubscribedTopicRegex("")
+            .setSubscribedTopicNames(List("foo").asJava)
+            .setAssignment(new Assignment())
+            .setTargetAssignment(new Assignment()
+              .setTopicPartitions(List(
+                new TopicPartitions()
+                  .setTopicId(topicId)
+                  .setTopicName("foo")
+                  .setPartitions(List[Integer](2).asJava)
+              ).asJava))
+            .setMemberType(1.toByte),
+        ).asJava)
+    )
+
+    val actual = consumerGroupDescribe(
+      groupIds = List("grp"),
+      includeAuthorizedOperations = true,
+      version = 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled),
+    )
+
+    Assertions.assertResponseEquals(
+      new ConsumerGroupDescribeResponseData().setGroups(expected.asJava),
+      new ConsumerGroupDescribeResponseData().setGroups(actual.asJava)
+    )
+  }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
index 7a10dbda6fa..5b92240d343 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java
@@ -480,12 +480,18 @@ public class ConsumerGroupMember extends 
ModernGroupMember {
         Assignment targetAssignment,
         CoordinatorMetadataImage image
     ) {
+        // The assignment includes both assigned partitions and partitions 
pending
+        // revocation because the member is still responsible for the latter 
until
+        // revocation is complete.
+        var topicPartitionsMap = new HashMap<Uuid, 
ConsumerGroupDescribeResponseData.TopicPartitions>();
+        accumulateTopicPartitions(assignedPartitions, topicPartitionsMap, 
image);
+        accumulateTopicPartitions(partitionsPendingRevocation, 
topicPartitionsMap, image);
+
         return new ConsumerGroupDescribeResponseData.Member()
             .setMemberEpoch(memberEpoch)
             .setMemberId(memberId)
             .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
-                .setTopicPartitions(topicPartitionsFromAssignment(
-                    Utils.toAssignmentWithoutEpochs(assignedPartitions), 
image)))
+                .setTopicPartitions(new 
ArrayList<>(topicPartitionsMap.values())))
             .setTargetAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
                 .setTopicPartitions(topicPartitionsFromAssignment(
                     targetAssignment != null ? targetAssignment.partitions() : 
Map.of(),
@@ -500,6 +506,21 @@ public class ConsumerGroupMember extends ModernGroupMember 
{
             .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
     }
 
+    private static void accumulateTopicPartitions(
+        Map<Uuid, Map<Integer, Integer>> source,
+        Map<Uuid, ConsumerGroupDescribeResponseData.TopicPartitions> target,
+        CoordinatorMetadataImage image
+    ) {
+        source.forEach((topicId, eps) ->
+            image.topicMetadata(topicId).ifPresent(metadata ->
+                target.computeIfAbsent(topicId, __ ->
+                    new ConsumerGroupDescribeResponseData.TopicPartitions()
+                        .setTopicId(topicId)
+                        .setTopicName(metadata.name())
+                        .setPartitions(new ArrayList<>())
+                ).partitions().addAll(eps.keySet())));
+    }
+
     private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromAssignment(
         Map<Uuid, Set<Integer>> partitions,
         CoordinatorMetadataImage image
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 519c2dd0302..cced7b17472 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -51,6 +52,7 @@ public class Assertions {
     private static final BiConsumer<ApiMessage, ApiMessage> 
API_MESSAGE_DEFAULT_COMPARATOR = org.junit.jupiter.api.Assertions::assertEquals;
     private static final Map<Class<?>, BiConsumer<ApiMessage, ApiMessage>> 
API_MESSAGE_COMPARATORS = Map.of(
         // Register request/response comparators.
+        ConsumerGroupDescribeResponseData.class, 
Assertions::assertConsumerGroupDescribeResponse,
         ConsumerGroupHeartbeatResponseData.class, 
Assertions::assertConsumerGroupHeartbeatResponse,
         ShareGroupHeartbeatResponseData.class, 
Assertions::assertShareGroupHeartbeatResponse,
         SyncGroupResponseData.class, Assertions::assertSyncGroupResponse,
@@ -149,6 +151,41 @@ public class Assertions {
         }
     }
 
+    public static void normalizeAssignment(
+        ConsumerGroupDescribeResponseData.Assignment assignment
+    ) {
+        if (assignment != null) {
+            assignment.topicPartitions().sort(Comparator.comparing(
+                ConsumerGroupDescribeResponseData.TopicPartitions::topicId
+            ));
+            assignment.topicPartitions().forEach(topic -> 
topic.partitions().sort(Integer::compareTo));
+        }
+    }
+
+    private static void assertConsumerGroupDescribeResponse(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        var expected = (ConsumerGroupDescribeResponseData) exp.duplicate();
+        var actual = (ConsumerGroupDescribeResponseData) act.duplicate();
+
+        Consumer<ConsumerGroupDescribeResponseData> normalize = message ->
+            message.groups().forEach(group -> {
+                group.members().sort(Comparator.comparing(
+                    ConsumerGroupDescribeResponseData.Member::memberId
+                ));
+                group.members().forEach(member -> {
+                    normalizeAssignment(member.assignment());
+                    normalizeAssignment(member.targetAssignment());
+                });
+            });
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
     private static void assertConsumerGroupHeartbeatResponse(
         ApiMessage exp,
         ApiMessage act
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
index 202325d2327..7f7e0fa8a3e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.Assertions;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.modern.Assignment;
@@ -333,8 +334,11 @@ public class ConsumerGroupMemberTest {
                 .setSupportedProtocols(toClassicProtocolCollection("range")) : 
null)
             .build();
 
-        ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(targetAssignment, new 
KRaftCoordinatorMetadataImage(metadataImage));
-        ConsumerGroupDescribeResponseData.Member expected = new 
ConsumerGroupDescribeResponseData.Member()
+        var actual = member.asConsumerGroupDescribeMember(
+            targetAssignment,
+            new KRaftCoordinatorMetadataImage(metadataImage)
+        );
+        var expected = new ConsumerGroupDescribeResponseData.Member()
             .setMemberId(memberId)
             .setMemberEpoch(epoch)
             .setClientId(clientId)
@@ -344,12 +348,19 @@ public class ConsumerGroupMemberTest {
             .setSubscribedTopicNames(new ArrayList<>(subscribedTopicNames))
             .setSubscribedTopicRegex(subscribedTopicRegex)
             .setAssignment(
+                // The assignment should include both assigned partitions and
+                // partitions pending revocation.
                 new ConsumerGroupDescribeResponseData.Assignment()
-                    .setTopicPartitions(List.of(new 
ConsumerGroupDescribeResponseData.TopicPartitions()
-                        .setTopicId(topicId1)
-                        .setTopicName("topic1")
-                        .setPartitions(assignedPartitions)
-                    ))
+                    .setTopicPartitions(new ArrayList<>(List.of(
+                        new ConsumerGroupDescribeResponseData.TopicPartitions()
+                            .setTopicId(topicId1)
+                            .setTopicName("topic1")
+                            .setPartitions(assignedPartitions),
+                        new ConsumerGroupDescribeResponseData.TopicPartitions()
+                            .setTopicId(topicId2)
+                            .setTopicName("topic2")
+                            .setPartitions(Arrays.asList(3, 4, 5))
+                    )))
             )
             .setTargetAssignment(
                 new ConsumerGroupDescribeResponseData.Assignment()
@@ -362,9 +373,58 @@ public class ConsumerGroupMemberTest {
             )
             .setMemberType(withClassicMemberMetadata ? (byte) 0 : (byte) 1);
 
+        // Sort to avoid order dependency from HashMap iteration.
+        Assertions.normalizeAssignment(actual.assignment());
+        Assertions.normalizeAssignment(expected.assignment());
+
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void 
testAsConsumerGroupDescribeMemberWithSameTopicPendingRevocation() {
+        var topicId1 = Uuid.randomUuid();
+        var metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId1, "topic1", 6)
+            .build();
+
+        // Assigned partitions [0, 1] and partitions pending revocation [2] 
share the same topic.
+        var record = new ConsumerGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(5)
+            .setPreviousMemberEpoch(4)
+            .setAssignedPartitions(List.of(
+                new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                    .setTopicId(topicId1)
+                    .setPartitions(Arrays.asList(0, 1))
+            ))
+            .setPartitionsPendingRevocation(List.of(
+                new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                    .setTopicId(topicId1)
+                    .setPartitions(List.of(2))
+            ));
+
+        var memberId = Uuid.randomUuid().toString();
+        var member = new ConsumerGroupMember.Builder(memberId)
+            .updateWith(LOG, GROUP_ID, record)
+            .build();
+
+        var actual = member.asConsumerGroupDescribeMember(
+            null,
+            new KRaftCoordinatorMetadataImage(metadataImage)
+        );
+
+        // The assignment should merge both assigned and pending revocation 
for the same topic.
+        Assertions.normalizeAssignment(actual.assignment());
+        assertEquals(
+            List.of(
+                new ConsumerGroupDescribeResponseData.TopicPartitions()
+                    .setTopicId(topicId1)
+                    .setTopicName("topic1")
+                    .setPartitions(Arrays.asList(0, 1, 2))
+            ),
+            actual.assignment().topicPartitions()
+        );
+    }
+
     @Test
     public void testAsConsumerGroupDescribeWithTargetAssignmentNull() {
         ConsumerGroupMember member = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString())

Reply via email to