chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##########
@@ -170,30 +227,23 @@ private void produceRecord() {
}
}
- private void withStableConsumerGroup(Runnable body) {
- Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
- try {
- TestUtils.subscribeAndWaitForRecords(TOPIC, consumer,
DEFAULT_MAX_WAIT_MS);
+ private void withConsumerGroup(Runnable body, boolean isStable, Properties
consumerProperties) {
+ try (Consumer<byte[], byte[]> consumer =
createConsumer(consumerProperties)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+ Assertions.assertNotEquals(0, records.count());
consumer.commitSync();
- body.run();
- } finally {
- Utils.closeQuietly(consumer, "consumer");
+ if (isStable) {
+ body.run();
+ }
}
- }
-
- private void withEmptyConsumerGroup(Runnable body) {
- Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
- try {
- TestUtils.subscribeAndWaitForRecords(TOPIC, consumer,
DEFAULT_MAX_WAIT_MS);
- consumer.commitSync();
- } finally {
- Utils.closeQuietly(consumer, "consumer");
+ if (!isStable) {
+ body.run();
}
- body.run();
}
private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
Review Comment:
It seems `config` is always empty, so please remove it.
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##########
@@ -42,109 +56,152 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends
ConsumerGroupCommandTest {
- String[] getArgs(String group, String topic) {
- return new String[] {
- "--bootstrap-server", bootstrapServers(listenerName()),
- "--delete-offsets",
- "--group", group,
- "--topic", topic
- };
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+ private final ClusterInstance clusterInstance;
+ private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+ public static final String TOPIC = "foo";
+ public static final String GROUP = "test.group";
+
+ DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance
clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (consumerGroupService != null) {
+ consumerGroupService.close();
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteOffsetsNonExistingGroup(String quorum) {
+ @ClusterTest
+ public void testDeleteOffsetsNonExistingGroup() {
String group = "missing.group";
String topic = "foo:1";
- ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(getArgs(group, topic));
+ setupConsumerGroupService(getArgs(group, topic));
- Entry<Errors, Map<TopicPartition, Throwable>> res =
service.deleteOffsets(group, Collections.singletonList(topic));
+ Entry<Errors, Map<TopicPartition, Throwable>> res =
consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void
testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
- testWithStableConsumerGroup(TOPIC, 0, 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ @ClusterTest
+ public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
+ createTopic(TOPIC);
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC,
true, consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ testWithConsumerGroup(TOPIC, 0, 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String
quorum) {
- testWithStableConsumerGroup(TOPIC, -1, 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+ @ClusterTest
+ public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
+ createTopic(TOPIC);
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC,
true, consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ testWithConsumerGroup(TOPIC, -1, 0,
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) {
- testWithStableConsumerGroup("foobar", 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ @ClusterTest
+ public void
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup("foobar", 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ testWithConsumerGroup("foobar", 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) {
- testWithStableConsumerGroup("foobar", -1, -1,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ @ClusterTest
+ public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup("foobar", -1, -1,
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ testWithConsumerGroup("foobar", -1, -1,
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String
quorum) {
- testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE);
+ @ClusterTest
+ public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
+ createTopic(TOPIC);
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false,
consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false,
consumerProperties);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String
quorum) {
- testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE);
+ @ClusterTest
+ public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
+ createTopic(TOPIC);
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false,
consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false,
consumerProperties);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void
testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) {
- testWithEmptyConsumerGroup("foobar", 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ @ClusterTest
+ public void
testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
+ Properties consumerProperties = new Properties();
+ testWithConsumerGroup("foobar", 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION, false, consumerProperties);
+ if (clusterInstance.isKRaftTest()) {
+ consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
Review Comment:
we should test both `CONSUMER` and `CLASSIC` for kraft. Also, could you
initialize the consumer configs in construction. for example;
```java
DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance
clusterInstance) {
this.clusterInstance = clusterInstance;
this.consumerConfigs = clusterInstance.isKRaftTest()
?
Arrays.asList(Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name()),
Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name()))
: Collections.emptyList();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]