This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f188a311243 KAFKA-19500: `kafka-consumer-groups.sh` should fail
quickly if the partition leader is unavailable (#20168)
f188a311243 is described below
commit f188a311243eb2801474e089d32fa77db76b6dc4
Author: xijiu <[email protected]>
AuthorDate: Mon Jul 21 16:25:40 2025 +0800
KAFKA-19500: `kafka-consumer-groups.sh` should fail quickly if the
partition leader is unavailable (#20168)
1. Add validation to the method
`ConsumerGroupCommand.ConsumerGroupService#prepareOffsetsToReset` that
checks that every partition to reset exists and has a leader, in order
to fail quickly
2. Add tests for resetting offsets when a partition has no leader and
when a partition does not exist
Reviewers: TaiJuWu <[email protected]>, Lan Ding <[email protected]>,
Ken Huang <[email protected]>, Andrew Schofield
<[email protected]>
---
.../tools/consumer/group/ConsumerGroupCommand.java | 37 +++++++++++++++++++++
.../consumer/group/ConsumerGroupServiceTest.java | 5 ++-
.../group/ResetConsumerGroupOffsetTest.java | 38 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 1 deletion(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index da3ccff9260..6a0d1ffd224 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -41,6 +41,8 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -1000,6 +1002,9 @@ public class ConsumerGroupCommand {
}
private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId, Collection<TopicPartition>
partitionsToReset) {
+ // ensure all partitions are valid, otherwise throw a runtime
exception
+ checkAllTopicPartitionsValid(partitionsToReset);
+
if (opts.options.has(opts.resetToOffsetOpt)) {
return offsetsUtils.resetToOffset(partitionsToReset);
} else if (opts.options.has(opts.resetToEarliestOpt)) {
@@ -1024,6 +1029,38 @@ public class ConsumerGroupCommand {
return null;
}
+ private void checkAllTopicPartitionsValid(Collection<TopicPartition>
partitionsToReset) {
+ // check the partitions exist
+ List<TopicPartition> partitionsNotExistList =
filterNonExistentPartitions(partitionsToReset);
+ if (!partitionsNotExistList.isEmpty()) {
+ String partitionStr =
partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+ throw new UnknownTopicOrPartitionException("The partitions \""
+ partitionStr + "\" do not exist");
+ }
+
+ // check the partitions have leader
+ List<TopicPartition> partitionsWithoutLeader =
filterNoneLeaderPartitions(partitionsToReset);
+ if (!partitionsWithoutLeader.isEmpty()) {
+ String partitionStr =
partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+ throw new LeaderNotAvailableException("The partitions \"" +
partitionStr + "\" have no leader");
+ }
+ }
+
+ private List<TopicPartition>
filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
+ // collect all topics
+ Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ try {
+ List<TopicPartition> existPartitions =
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+ .stream()
+ .flatMap(entry ->
entry.getValue().partitions().stream()
+ .map(partitionInfo -> new
TopicPartition(entry.getKey(), partitionInfo.partition())))
+ .toList();
+
+ return topicPartitions.stream().filter(element ->
!existPartitions.contains(element)).toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
String exportOffsetsToCsv(Map<String, Map<TopicPartition,
OffsetAndMetadata>> assignments) {
boolean isSingleGroupQuery =
opts.options.valuesOf(opts.groupOpt).size() == 1;
ObjectWriter csvWriter = isSingleGroupQuery
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index 0ddba4346ba..242aa1bc894 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -62,6 +62,7 @@ import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -234,6 +235,8 @@ public class ConsumerGroupServiceTest {
.thenReturn(describeGroupsResult(GroupState.DEAD));
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified),
any()))
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
+ when(admin.describeTopics(anySet()))
+ .thenReturn(describeTopicsResult(TOPICS));
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
@@ -317,7 +320,7 @@ public class ConsumerGroupServiceTest {
topics.forEach(topic -> {
List<TopicPartitionInfo> partitions = IntStream.range(0,
NUM_PARTITIONS)
- .mapToObj(i -> new TopicPartitionInfo(i, null,
Collections.emptyList(), Collections.emptyList()))
+ .mapToObj(i -> new TopicPartitionInfo(i, Node.noNode(),
Collections.emptyList(), Collections.emptyList()))
.collect(Collectors.toList());
topicDescriptions.put(topic, new TopicDescription(topic, false,
partitions));
});
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 3c417cdeeac..5fb704cf53d 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
@@ -81,6 +83,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_IN
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -659,6 +662,41 @@ public class ResetConsumerGroupOffsetTest {
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
+ @ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key =
OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
+ public void testResetOffsetsWithPartitionNoneLeader(ClusterInstance
cluster) throws Exception {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+ String[] args = buildArgsForGroup(cluster, group, "--topic", topic +
":0,1,2",
+ "--to-earliest", "--execute");
+
+ try (Admin admin = cluster.admin();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+
+ admin.createTopics(singleton(new NewTopic(topic, 3, (short)
1))).all().get();
+ produceConsumeAndShutdown(cluster, topic, group, 2,
GroupProtocol.CLASSIC);
+ assertDoesNotThrow(() -> resetOffsets(service));
+ // shutdown a broker to make some partitions missing leader
+ cluster.shutdownBroker(0);
+ assertThrows(LeaderNotAvailableException.class, () ->
resetOffsets(service));
+ }
+ }
+
+ @ClusterTest
+ public void testResetOffsetsWithPartitionNotExist(ClusterInstance cluster)
throws Exception {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+ String[] args = buildArgsForGroup(cluster, group, "--topic", topic +
":2,3",
+ "--to-earliest", "--execute");
+
+ try (Admin admin = cluster.admin();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+
+ admin.createTopics(singleton(new NewTopic(topic, 1, (short)
1))).all().get();
+ produceConsumeAndShutdown(cluster, topic, group, 2,
GroupProtocol.CLASSIC);
+ assertThrows(UnknownTopicOrPartitionException.class, () ->
resetOffsets(service));
+ }
+ }
+
private String generateRandomTopic() {
return TOPIC_PREFIX + TestUtils.randomString(10);
}