This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f188a311243 KAFKA-19500: `kafka-consumer-groups.sh` should fail 
quickly if the partition leader is unavailable (#20168)
f188a311243 is described below

commit f188a311243eb2801474e089d32fa77db76b6dc4
Author: xijiu <[email protected]>
AuthorDate: Mon Jul 21 16:25:40 2025 +0800

    KAFKA-19500: `kafka-consumer-groups.sh` should fail quickly if the 
partition leader is unavailable (#20168)
    
    1. Add check leader missing logic in method
    `ConsumerGroupCommand.ConsumerGroupService#prepareOffsetsToReset` in
    order to fail quickly
    2. Add some tests
    
    Reviewers: TaiJuWu <[email protected]>, Lan Ding <[email protected]>,
     Ken Huang <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../tools/consumer/group/ConsumerGroupCommand.java | 37 +++++++++++++++++++++
 .../consumer/group/ConsumerGroupServiceTest.java   |  5 ++-
 .../group/ResetConsumerGroupOffsetTest.java        | 38 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 1 deletion(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index da3ccff9260..6a0d1ffd224 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -41,6 +41,8 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
@@ -1000,6 +1002,9 @@ public class ConsumerGroupCommand {
         }
 
         private Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId, Collection<TopicPartition> 
partitionsToReset) {
+            // ensure all partitions are valid, otherwise throw a runtime 
exception
+            checkAllTopicPartitionsValid(partitionsToReset);
+
             if (opts.options.has(opts.resetToOffsetOpt)) {
                 return offsetsUtils.resetToOffset(partitionsToReset);
             } else if (opts.options.has(opts.resetToEarliestOpt)) {
@@ -1024,6 +1029,38 @@ public class ConsumerGroupCommand {
             return null;
         }
 
+        private void checkAllTopicPartitionsValid(Collection<TopicPartition> 
partitionsToReset) {
+            // check the partitions exist
+            List<TopicPartition> partitionsNotExistList = 
filterNonExistentPartitions(partitionsToReset);
+            if (!partitionsNotExistList.isEmpty()) {
+                String partitionStr = 
partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+                throw new UnknownTopicOrPartitionException("The partitions \"" 
+ partitionStr + "\" do not exist");
+            }
+
+            // check the partitions have leader
+            List<TopicPartition> partitionsWithoutLeader = 
filterNoneLeaderPartitions(partitionsToReset);
+            if (!partitionsWithoutLeader.isEmpty()) {
+                String partitionStr = 
partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+                throw new LeaderNotAvailableException("The partitions \"" + 
partitionStr + "\" have no leader");
+            }
+        }
+
+        private List<TopicPartition> 
filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
+            // collect all topics
+            Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+            try {
+                List<TopicPartition> existPartitions = 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+                        .stream()
+                        .flatMap(entry -> 
entry.getValue().partitions().stream()
+                                .map(partitionInfo -> new 
TopicPartition(entry.getKey(), partitionInfo.partition())))
+                        .toList();
+
+                return topicPartitions.stream().filter(element -> 
!existPartitions.contains(element)).toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
         String exportOffsetsToCsv(Map<String, Map<TopicPartition, 
OffsetAndMetadata>> assignments) {
             boolean isSingleGroupQuery = 
opts.options.valuesOf(opts.groupOpt).size() == 1;
             ObjectWriter csvWriter = isSingleGroupQuery
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index 0ddba4346ba..242aa1bc894 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -62,6 +62,7 @@ import java.util.stream.IntStream;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -234,6 +235,8 @@ public class ConsumerGroupServiceTest {
                 .thenReturn(describeGroupsResult(GroupState.DEAD));
         
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified),
 any()))
                 
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
+        when(admin.describeTopics(anySet()))
+                .thenReturn(describeTopicsResult(TOPICS));
         when(admin.listOffsets(offsetsArgMatcher(), any()))
                 .thenReturn(listOffsetsResult());
 
@@ -317,7 +320,7 @@ public class ConsumerGroupServiceTest {
 
         topics.forEach(topic -> {
             List<TopicPartitionInfo> partitions = IntStream.range(0, 
NUM_PARTITIONS)
-                    .mapToObj(i -> new TopicPartitionInfo(i, null, 
Collections.emptyList(), Collections.emptyList()))
+                    .mapToObj(i -> new TopicPartitionInfo(i, Node.noNode(), 
Collections.emptyList(), Collections.emptyList()))
                     .collect(Collectors.toList());
             topicDescriptions.put(topic, new TopicDescription(topic, false, 
partitions));
         });
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 3c417cdeeac..5fb704cf53d 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.test.ClusterInstance;
@@ -81,6 +83,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_IN
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -659,6 +662,41 @@ public class ResetConsumerGroupOffsetTest {
         assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
     }
 
+    @ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = 
OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
+    public void testResetOffsetsWithPartitionNoneLeader(ClusterInstance 
cluster) throws Exception {
+        String group = generateRandomGroupId();
+        String topic = generateRandomTopic();
+        String[] args = buildArgsForGroup(cluster, group, "--topic", topic + 
":0,1,2",
+                "--to-earliest", "--execute");
+
+        try (Admin admin = cluster.admin();
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+
+            admin.createTopics(singleton(new NewTopic(topic, 3, (short) 
1))).all().get();
+            produceConsumeAndShutdown(cluster, topic, group, 2, 
GroupProtocol.CLASSIC);
+            assertDoesNotThrow(() -> resetOffsets(service));
+            // shutdown a broker to make some partitions missing leader
+            cluster.shutdownBroker(0);
+            assertThrows(LeaderNotAvailableException.class, () -> 
resetOffsets(service));
+        }
+    }
+
+    @ClusterTest
+    public void testResetOffsetsWithPartitionNotExist(ClusterInstance cluster) 
throws Exception {
+        String group = generateRandomGroupId();
+        String topic = generateRandomTopic();
+        String[] args = buildArgsForGroup(cluster, group, "--topic", topic + 
":2,3",
+                "--to-earliest", "--execute");
+
+        try (Admin admin = cluster.admin();
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+
+            admin.createTopics(singleton(new NewTopic(topic, 1, (short) 
1))).all().get();
+            produceConsumeAndShutdown(cluster, topic, group, 2, 
GroupProtocol.CLASSIC);
+            assertThrows(UnknownTopicOrPartitionException.class, () -> 
resetOffsets(service));
+        }
+    }
+
     private String generateRandomTopic() {
         return TOPIC_PREFIX + TestUtils.randomString(10);
     }

Reply via email to