chia7712 commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1706607856
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -85,653 +80,761 @@
import static org.mockito.Mockito.spy;
@Tag("integration")
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
-public class TopicCommandIntegrationTest extends
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
Review Comment:
@TaiJuWu Could you please merge `TopicCommandTest` and
`TopicCommandIntegrationTest` into single one? `ClusterTest` and `Test` can be
existent in the same class
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -85,653 +80,761 @@
import static org.mockito.Mockito.spy;
@Tag("integration")
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
-public class TopicCommandIntegrationTest extends
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
private final short defaultReplicationFactor = 1;
private final int defaultNumPartitions = 1;
- private TopicCommand.TopicService topicService;
- private Admin adminClient;
- private String bootstrapServer;
- private String testTopicName;
- private Buffer<KafkaBroker> scalaBrokers;
- private Seq<ControllerServer> scalaControllers;
- /**
- * Implementations must override this method to return a set of
KafkaConfigs. This method will be invoked for every
- * test and should not reuse previous configurations unless they select
their ports randomly when servers are started.
- *
- * Note the replica fetch max bytes is set to `1` in order to throttle the
rate of replication for test
- * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
- */
- @Override
- public scala.collection.Seq<KafkaConfig> generateConfigs() {
- Map<Integer, String> rackInfo = new HashMap<>();
- rackInfo.put(0, "rack1");
- rackInfo.put(1, "rack2");
- rackInfo.put(2, "rack2");
- rackInfo.put(3, "rack1");
- rackInfo.put(4, "rack3");
- rackInfo.put(5, "rack3");
-
- List<Properties> brokerConfigs = ToolsTestUtils
- .createBrokerProperties(6, zkConnectOrNull(), rackInfo,
defaultNumPartitions, defaultReplicationFactor);
-
- List<KafkaConfig> configs = new ArrayList<>();
- for (Properties props : brokerConfigs) {
- props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1");
- configs.add(KafkaConfig.fromProps(props));
- }
- return JavaConverters.asScalaBuffer(configs).toSeq();
- }
+ private final ClusterInstance clusterInstance;
private TopicCommand.TopicCommandOptions
buildTopicCommandOptionsWithBootstrap(String... opts) {
+ String bootstrapServer = clusterInstance.bootstrapServers();
String[] finalOptions = Stream.concat(Arrays.stream(opts),
Stream.of("--bootstrap-server", bootstrapServer)
).toArray(String[]::new);
return new TopicCommand.TopicCommandOptions(finalOptions);
}
- @BeforeEach
- public void setUp(TestInfo info) {
- super.setUp(info);
- // create adminClient
- Properties props = new Properties();
- bootstrapServer = bootstrapServers(listenerName());
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- adminClient = Admin.create(props);
- topicService = new TopicCommand.TopicService(props,
Optional.of(bootstrapServer));
- testTopicName = String.format("%s-%s",
info.getTestMethod().get().getName(),
org.apache.kafka.test.TestUtils.randomString(10));
- scalaBrokers = brokers();
- scalaControllers = controllerServers();
+ static List<ClusterConfig> generate1() {
+ Map<String, String> serverProp = new HashMap<>();
+ serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name
error, no exception throw
+
+ Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
+ Map<String, String> infoPerBroker1 = new HashMap<>();
+ infoPerBroker1.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker2 = new HashMap<>();
+ infoPerBroker2.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker3 = new HashMap<>();
+ infoPerBroker3.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker4 = new HashMap<>();
+ infoPerBroker4.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker5 = new HashMap<>();
+ infoPerBroker5.put("broker.rack", "rack3");
+ Map<String, String> infoPerBroker6 = new HashMap<>();
+ infoPerBroker6.put("broker.rack", "rack3");
+
+ rackInfo.put(0, infoPerBroker1);
+ rackInfo.put(1, infoPerBroker2);
+ rackInfo.put(2, infoPerBroker3);
+ rackInfo.put(3, infoPerBroker4);
+ rackInfo.put(4, infoPerBroker5);
+ rackInfo.put(5, infoPerBroker6);
+
+ return Collections.singletonList(ClusterConfig.defaultBuilder()
+ .setBrokers(6)
+ .setServerProperties(serverProp)
+ .setPerServerProperties(rackInfo)
+ .build()
+ );
}
- @AfterEach
- public void close() throws Exception {
- if (topicService != null)
- topicService.close();
- if (adminClient != null)
- adminClient.close();
+ TopicCommandIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreate(String quorum) throws Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, 2, 1,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
-
assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
- "Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
+ @ClusterTemplate("generate1")
+ public void testCreate(TestInfo testInfo) throws InterruptedException,
ExecutionException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
Review Comment:
not sure why we need the postfix? each test case has its own embedded
cluster, so there is no name conflicting I feel :_
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -740,175 +843,181 @@ public void testDescribeUnavailablePartitions(String
quorum) throws ExecutionExc
"Unexpected Topic " + rows[0] + " received. Expect " +
String.format("Topic: %s", testTopicName));
assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"),
"Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'");
- } finally {
- restartDeadBrokers(false);
+
}
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeUnderReplicatedPartitions(String quorum) {
- int partitions = 1;
- short replicationFactor = 6;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partitions, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- try {
- killBroker(0);
- if (isKRaftTest()) {
- ensureConsistentKRaftMetadata();
- } else {
- TestUtils.waitForPartitionMetadata(aliveBrokers(),
testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS);
- }
+ @ClusterTemplate("generate1")
+ public void testDescribeUnderReplicatedPartitions(TestInfo testInfo)
throws InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ int partitions = 1;
+ short replicationFactor = 6;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partitions);
+
+ clusterInstance.shutdownBroker(0);
+ Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5);
+
+ TestUtils.waitForCondition(
+ () ->
clusterInstance.aliveBrokers().values().stream().allMatch(
+ broker -> {
+
Optional<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionState
=
+
Optional.ofNullable(broker.metadataCache().getPartitionInfo(testTopicName,
0).getOrElse(null));
+ return partitionState.map(s ->
FetchRequest.isValidBrokerId(s.leader())).orElse(false);
+ }
+ ), 60000, "Meta data propogation fail in 60000 ms");
+
String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-replicated-partitions"));
String[] rows = output.split(System.lineSeparator());
assertTrue(rows[0].startsWith(String.format("Topic: %s",
testTopicName)), String.format("Unexpected output: %s", rows[0]));
- } finally {
- restartDeadBrokers(false);
}
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeUnderMinIsrPartitions(String quorum) {
- Properties topicConfig = new Properties();
- topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
- int partitions = 1;
- short replicationFactor = 6;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partitions, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
- );
- try {
- killBroker(0);
- if (isKRaftTest()) {
- ensureConsistentKRaftMetadata();
- } else {
- kafka.utils.TestUtils.waitUntilTrue(
- () -> aliveBrokers().forall(b ->
b.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5),
- () -> String.format("Timeout waiting for partition
metadata propagating to brokers for %s topic", testTopicName),
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
- );
- }
- String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-min-isr-partitions"));
+
+ @ClusterTemplate("generate1")
+ public void testDescribeUnderMinIsrPartitions(TestInfo testInfo) throws
InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ Map<String, String> topicConfig = new HashMap<>();
+ topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+ int partitions = 1;
+ short replicationFactor = 6;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
+ clusterInstance.waitForTopic(testTopicName, partitions);
+
+ clusterInstance.shutdownBroker(0);
+ assertEquals(5, clusterInstance.aliveBrokers().size());
+
+ TestUtils.waitForCondition(
+ () ->
clusterInstance.aliveBrokers().values().stream().allMatch(broker ->
broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() ==
5),
+ 60000, String.format("Timeout waiting for partition
metadata propagating to brokers for %s topic", testTopicName)
+ );
+
+ String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-min-isr-partitions", "--exclude-internal"));
String[] rows = output.split(System.lineSeparator());
assertTrue(rows[0].startsWith(String.format("Topic: %s",
testTopicName)),
"Unexpected topic: " + rows[0]);
- } finally {
- restartDeadBrokers(false);
}
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String
quorum) throws ExecutionException, InterruptedException {
- TopicPartition tp = new TopicPartition(testTopicName, 0);
+ @ClusterTemplate("generate1")
+ public void
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TestInfo
testInfo) throws ExecutionException, InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ KafkaProducer<String, String> producer = createProducer()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
- // Produce multiple batches.
- TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
- TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
+ TopicPartition tp = new TopicPartition(testTopicName, 0);
- // Enable throttling. Note the broker config sets the replica max
fetch bytes to `1` upon to minimize replication
- // throughput so the reassignment doesn't complete quickly.
- List<Integer> brokerIds =
JavaConverters.seqAsJavaList(brokers()).stream()
- .map(broker ->
broker.config().brokerId()).collect(Collectors.toList());
+ // Produce multiple batches.
+ sendProducerRecords(testTopicName, producer, 10);
+ sendProducerRecords(testTopicName, producer, 10);
- ToolsTestUtils.setReplicationThrottleForPartitions(adminClient,
brokerIds, Collections.singleton(tp), 1);
+ // Enable throttling. Note the broker config sets the replica max
fetch bytes to `1` upon to minimize replication
+ // throughput so the reassignment doesn't complete quickly.
+ List<Integer> brokerIds = new
ArrayList<>(clusterInstance.brokerIds());
- TopicDescription testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName);
- TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0);
+ ToolsTestUtils.setReplicationThrottleForPartitions(adminClient,
brokerIds, Collections.singleton(tp), 1);
- List<Integer> replicasOfFirstPartition =
firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
- List<Integer> replicasDiff = new ArrayList<>(brokerIds);
- replicasDiff.removeAll(replicasOfFirstPartition);
- Integer targetReplica = replicasDiff.get(0);
+ TopicDescription testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo firstPartition =
testTopicDesc.partitions().get(0);
- adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
- Optional.of(new
NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
+ List<Integer> replicasOfFirstPartition =
firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
+ List<Integer> replicasDiff = new ArrayList<>(brokerIds);
+ replicasDiff.removeAll(replicasOfFirstPartition);
+ Integer targetReplica = replicasDiff.get(0);
- // let's wait until the LAIR is propagated
- kafka.utils.TestUtils.waitUntilTrue(
- () -> {
- try {
- return
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
- .get(tp).addingReplicas().isEmpty();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- },
- () -> "Reassignment didn't add the second node",
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
- ensureConsistentKRaftMetadata();
-
- // describe the topic and test if it's under-replicated
- String simpleDescribeOutput =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName));
- String[] simpleDescribeOutputRows =
simpleDescribeOutput.split(System.lineSeparator());
-
assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s",
testTopicName)),
- "Unexpected describe output: " + simpleDescribeOutputRows[0]);
- assertEquals(2, simpleDescribeOutputRows.length,
- "Unexpected describe output length: " +
simpleDescribeOutputRows.length);
-
- String underReplicatedOutput =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-replicated-partitions"));
- assertEquals("", underReplicatedOutput,
- String.format("--under-replicated-partitions shouldn't return
anything: '%s'", underReplicatedOutput));
-
- int maxRetries = 20;
- long pause = 100L;
- long waitTimeMs = maxRetries * pause;
- AtomicReference<PartitionReassignment> reassignmentsRef = new
AtomicReference<>();
-
- TestUtils.waitUntilTrue(() -> {
- try {
- PartitionReassignment tempReassignments =
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
- reassignmentsRef.set(tempReassignments);
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Error while fetching
reassignments", e);
- }
- return reassignmentsRef.get() != null;
- }, () -> "Reassignments did not become non-null within the specified
time", waitTimeMs, pause);
+
adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
+ Optional.of(new
NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
+
+ // let's wait until the LAIR is propagated
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ return
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
+ .get(tp).addingReplicas().isEmpty();
+ } catch (InterruptedException | ExecutionException e) {
Review Comment:
we don't need this try-catch, right?
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -740,175 +843,181 @@ public void testDescribeUnavailablePartitions(String
quorum) throws ExecutionExc
"Unexpected Topic " + rows[0] + " received. Expect " +
String.format("Topic: %s", testTopicName));
assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"),
"Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'");
- } finally {
- restartDeadBrokers(false);
+
}
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeUnderReplicatedPartitions(String quorum) {
- int partitions = 1;
- short replicationFactor = 6;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partitions, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- try {
- killBroker(0);
- if (isKRaftTest()) {
- ensureConsistentKRaftMetadata();
- } else {
- TestUtils.waitForPartitionMetadata(aliveBrokers(),
testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS);
- }
+ @ClusterTemplate("generate1")
+ public void testDescribeUnderReplicatedPartitions(TestInfo testInfo)
throws InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ int partitions = 1;
+ short replicationFactor = 6;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partitions);
+
+ clusterInstance.shutdownBroker(0);
+ Assertions.assertEquals(clusterInstance.aliveBrokers().size(), 5);
+
+ TestUtils.waitForCondition(
+ () ->
clusterInstance.aliveBrokers().values().stream().allMatch(
+ broker -> {
+
Optional<UpdateMetadataRequestData.UpdateMetadataPartitionState> partitionState
=
+
Optional.ofNullable(broker.metadataCache().getPartitionInfo(testTopicName,
0).getOrElse(null));
+ return partitionState.map(s ->
FetchRequest.isValidBrokerId(s.leader())).orElse(false);
+ }
+ ), 60000, "Meta data propogation fail in 60000 ms");
+
String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-replicated-partitions"));
String[] rows = output.split(System.lineSeparator());
assertTrue(rows[0].startsWith(String.format("Topic: %s",
testTopicName)), String.format("Unexpected output: %s", rows[0]));
- } finally {
- restartDeadBrokers(false);
}
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeUnderMinIsrPartitions(String quorum) {
- Properties topicConfig = new Properties();
- topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
- int partitions = 1;
- short replicationFactor = 6;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partitions, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
- );
- try {
- killBroker(0);
- if (isKRaftTest()) {
- ensureConsistentKRaftMetadata();
- } else {
- kafka.utils.TestUtils.waitUntilTrue(
- () -> aliveBrokers().forall(b ->
b.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5),
- () -> String.format("Timeout waiting for partition
metadata propagating to brokers for %s topic", testTopicName),
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
- );
- }
- String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-min-isr-partitions"));
+
+ @ClusterTemplate("generate1")
+ public void testDescribeUnderMinIsrPartitions(TestInfo testInfo) throws
InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ Map<String, String> topicConfig = new HashMap<>();
+ topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+ int partitions = 1;
+ short replicationFactor = 6;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor).configs(topicConfig)));
+ clusterInstance.waitForTopic(testTopicName, partitions);
+
+ clusterInstance.shutdownBroker(0);
+ assertEquals(5, clusterInstance.aliveBrokers().size());
+
+ TestUtils.waitForCondition(
+ () ->
clusterInstance.aliveBrokers().values().stream().allMatch(broker ->
broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() ==
5),
+ 60000, String.format("Timeout waiting for partition
metadata propagating to brokers for %s topic", testTopicName)
+ );
+
+ String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-min-isr-partitions", "--exclude-internal"));
String[] rows = output.split(System.lineSeparator());
assertTrue(rows[0].startsWith(String.format("Topic: %s",
testTopicName)),
"Unexpected topic: " + rows[0]);
- } finally {
- restartDeadBrokers(false);
}
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String
quorum) throws ExecutionException, InterruptedException {
- TopicPartition tp = new TopicPartition(testTopicName, 0);
+ @ClusterTemplate("generate1")
+ public void
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TestInfo
testInfo) throws ExecutionException, InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ KafkaProducer<String, String> producer = createProducer()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
- // Produce multiple batches.
- TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
- TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
+ TopicPartition tp = new TopicPartition(testTopicName, 0);
- // Enable throttling. Note the broker config sets the replica max
fetch bytes to `1` upon to minimize replication
- // throughput so the reassignment doesn't complete quickly.
- List<Integer> brokerIds =
JavaConverters.seqAsJavaList(brokers()).stream()
- .map(broker ->
broker.config().brokerId()).collect(Collectors.toList());
+ // Produce multiple batches.
+ sendProducerRecords(testTopicName, producer, 10);
+ sendProducerRecords(testTopicName, producer, 10);
- ToolsTestUtils.setReplicationThrottleForPartitions(adminClient,
brokerIds, Collections.singleton(tp), 1);
+ // Enable throttling. Note the broker config sets the replica max
fetch bytes to `1` upon to minimize replication
+ // throughput so the reassignment doesn't complete quickly.
+ List<Integer> brokerIds = new
ArrayList<>(clusterInstance.brokerIds());
- TopicDescription testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName);
- TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0);
+ ToolsTestUtils.setReplicationThrottleForPartitions(adminClient,
brokerIds, Collections.singleton(tp), 1);
- List<Integer> replicasOfFirstPartition =
firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
- List<Integer> replicasDiff = new ArrayList<>(brokerIds);
- replicasDiff.removeAll(replicasOfFirstPartition);
- Integer targetReplica = replicasDiff.get(0);
+ TopicDescription testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName);
+ TopicPartitionInfo firstPartition =
testTopicDesc.partitions().get(0);
- adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
- Optional.of(new
NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
+ List<Integer> replicasOfFirstPartition =
firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList());
+ List<Integer> replicasDiff = new ArrayList<>(brokerIds);
+ replicasDiff.removeAll(replicasOfFirstPartition);
+ Integer targetReplica = replicasDiff.get(0);
- // let's wait until the LAIR is propagated
- kafka.utils.TestUtils.waitUntilTrue(
- () -> {
- try {
- return
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
- .get(tp).addingReplicas().isEmpty();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- },
- () -> "Reassignment didn't add the second node",
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
- ensureConsistentKRaftMetadata();
-
- // describe the topic and test if it's under-replicated
- String simpleDescribeOutput =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName));
- String[] simpleDescribeOutputRows =
simpleDescribeOutput.split(System.lineSeparator());
-
assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s",
testTopicName)),
- "Unexpected describe output: " + simpleDescribeOutputRows[0]);
- assertEquals(2, simpleDescribeOutputRows.length,
- "Unexpected describe output length: " +
simpleDescribeOutputRows.length);
-
- String underReplicatedOutput =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-replicated-partitions"));
- assertEquals("", underReplicatedOutput,
- String.format("--under-replicated-partitions shouldn't return
anything: '%s'", underReplicatedOutput));
-
- int maxRetries = 20;
- long pause = 100L;
- long waitTimeMs = maxRetries * pause;
- AtomicReference<PartitionReassignment> reassignmentsRef = new
AtomicReference<>();
-
- TestUtils.waitUntilTrue(() -> {
- try {
- PartitionReassignment tempReassignments =
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
- reassignmentsRef.set(tempReassignments);
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("Error while fetching
reassignments", e);
- }
- return reassignmentsRef.get() != null;
- }, () -> "Reassignments did not become non-null within the specified
time", waitTimeMs, pause);
+
adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
+ Optional.of(new
NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get();
+
+ // let's wait until the LAIR is propagated
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ return
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
+ .get(tp).addingReplicas().isEmpty();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60000, "Reassignment didn't add the second node"
+ );
+
+ // describe the topic and test if it's under-replicated
+ String simpleDescribeOutput =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName));
+ String[] simpleDescribeOutputRows =
simpleDescribeOutput.split(System.lineSeparator());
+
assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s",
testTopicName)),
+ "Unexpected describe output: " +
simpleDescribeOutputRows[0]);
+ assertEquals(2, simpleDescribeOutputRows.length,
+ "Unexpected describe output length: " +
simpleDescribeOutputRows.length);
+
+ String underReplicatedOutput =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--under-replicated-partitions"));
+ assertEquals("", underReplicatedOutput,
+ String.format("--under-replicated-partitions shouldn't
return anything: '%s'", underReplicatedOutput));
+
+ int maxRetries = 20;
+ long pause = 100L;
+ long waitTimeMs = maxRetries * pause;
+ AtomicReference<PartitionReassignment> reassignmentsRef = new
AtomicReference<>();
+
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ PartitionReassignment tempReassignments =
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
+ reassignmentsRef.set(tempReassignments);
+ } catch (InterruptedException | ExecutionException e) {
Review Comment:
we don't need try-catch, right?
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -1161,4 +1278,32 @@ public ReplicaDistributions(Map<Integer, List<String>>
partitionRacks,
}
}
+ private List<ProducerRecord<String, String>> generateProduceMessage(String
topic, Integer numMessage) {
+ List<ProducerRecord<String, String>> records = new ArrayList<>();
+ for (int i = 0; i < numMessage; ++i) {
+ records.add(new ProducerRecord<String, String>(topic, "test-" +
i));
+ }
+ return records;
+ }
+
+
+ private KafkaProducer<String, String> createProducer() {
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
+ return new KafkaProducer<>(producerProps, new StringSerializer(), new
StringSerializer());
+ }
+
+ private void sendProducerRecords(String testTopicName,
KafkaProducer<String, String> producer, Integer numMessage) {
Review Comment:
`Integer` -> int
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -85,653 +80,761 @@
import static org.mockito.Mockito.spy;
@Tag("integration")
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
-public class TopicCommandIntegrationTest extends
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
private final short defaultReplicationFactor = 1;
private final int defaultNumPartitions = 1;
- private TopicCommand.TopicService topicService;
- private Admin adminClient;
- private String bootstrapServer;
- private String testTopicName;
- private Buffer<KafkaBroker> scalaBrokers;
- private Seq<ControllerServer> scalaControllers;
- /**
- * Implementations must override this method to return a set of
KafkaConfigs. This method will be invoked for every
- * test and should not reuse previous configurations unless they select
their ports randomly when servers are started.
- *
- * Note the replica fetch max bytes is set to `1` in order to throttle the
rate of replication for test
- * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
- */
- @Override
- public scala.collection.Seq<KafkaConfig> generateConfigs() {
- Map<Integer, String> rackInfo = new HashMap<>();
- rackInfo.put(0, "rack1");
- rackInfo.put(1, "rack2");
- rackInfo.put(2, "rack2");
- rackInfo.put(3, "rack1");
- rackInfo.put(4, "rack3");
- rackInfo.put(5, "rack3");
-
- List<Properties> brokerConfigs = ToolsTestUtils
- .createBrokerProperties(6, zkConnectOrNull(), rackInfo,
defaultNumPartitions, defaultReplicationFactor);
-
- List<KafkaConfig> configs = new ArrayList<>();
- for (Properties props : brokerConfigs) {
- props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1");
- configs.add(KafkaConfig.fromProps(props));
- }
- return JavaConverters.asScalaBuffer(configs).toSeq();
- }
+ private final ClusterInstance clusterInstance;
private TopicCommand.TopicCommandOptions
buildTopicCommandOptionsWithBootstrap(String... opts) {
+ String bootstrapServer = clusterInstance.bootstrapServers();
String[] finalOptions = Stream.concat(Arrays.stream(opts),
Stream.of("--bootstrap-server", bootstrapServer)
).toArray(String[]::new);
return new TopicCommand.TopicCommandOptions(finalOptions);
}
- @BeforeEach
- public void setUp(TestInfo info) {
- super.setUp(info);
- // create adminClient
- Properties props = new Properties();
- bootstrapServer = bootstrapServers(listenerName());
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- adminClient = Admin.create(props);
- topicService = new TopicCommand.TopicService(props,
Optional.of(bootstrapServer));
- testTopicName = String.format("%s-%s",
info.getTestMethod().get().getName(),
org.apache.kafka.test.TestUtils.randomString(10));
- scalaBrokers = brokers();
- scalaControllers = controllerServers();
+ static List<ClusterConfig> generate1() {
+ Map<String, String> serverProp = new HashMap<>();
+ serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name
error, no exception throw
+
+ Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
+ Map<String, String> infoPerBroker1 = new HashMap<>();
+ infoPerBroker1.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker2 = new HashMap<>();
+ infoPerBroker2.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker3 = new HashMap<>();
+ infoPerBroker3.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker4 = new HashMap<>();
+ infoPerBroker4.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker5 = new HashMap<>();
+ infoPerBroker5.put("broker.rack", "rack3");
+ Map<String, String> infoPerBroker6 = new HashMap<>();
+ infoPerBroker6.put("broker.rack", "rack3");
+
+ rackInfo.put(0, infoPerBroker1);
+ rackInfo.put(1, infoPerBroker2);
+ rackInfo.put(2, infoPerBroker3);
+ rackInfo.put(3, infoPerBroker4);
+ rackInfo.put(4, infoPerBroker5);
+ rackInfo.put(5, infoPerBroker6);
+
+ return Collections.singletonList(ClusterConfig.defaultBuilder()
+ .setBrokers(6)
+ .setServerProperties(serverProp)
+ .setPerServerProperties(rackInfo)
+ .build()
+ );
}
- @AfterEach
- public void close() throws Exception {
- if (topicService != null)
- topicService.close();
- if (adminClient != null)
- adminClient.close();
+ TopicCommandIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreate(String quorum) throws Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, 2, 1,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
-
assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
- "Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
+ @ClusterTemplate("generate1")
+ public void testCreate(TestInfo testInfo) throws InterruptedException,
ExecutionException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
+ "Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
+
+ adminClient.deleteTopics(Collections.singletonList(testTopicName));
+ clusterInstance.waitForTopic(testTopicName, 0);
+
Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(),
+ "Admin client see the created topic. It saw: " +
adminClient.listTopics().names().get());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithDefaults(String quorum) throws Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- List<TopicPartitionInfo> partitions = adminClient
- .describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames()
- .get()
- .get(testTopicName)
- .partitions();
- assertEquals(defaultNumPartitions, partitions.size(), "Unequal
partition size: " + partitions.size());
- assertEquals(defaultReplicationFactor, (short)
partitions.get(0).replicas().size(), "Unequal replication factor: " +
partitions.get(0).replicas().size());
+ @ClusterTemplate("generate1")
Review Comment:
`generate1` -> `generate`
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -85,653 +80,761 @@
import static org.mockito.Mockito.spy;
@Tag("integration")
Review Comment:
Could you please remove this tag? it is redundant to
`ClusterTestExtensions`, as it is added automatically.
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -85,653 +80,761 @@
import static org.mockito.Mockito.spy;
@Tag("integration")
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
-public class TopicCommandIntegrationTest extends
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
private final short defaultReplicationFactor = 1;
private final int defaultNumPartitions = 1;
- private TopicCommand.TopicService topicService;
- private Admin adminClient;
- private String bootstrapServer;
- private String testTopicName;
- private Buffer<KafkaBroker> scalaBrokers;
- private Seq<ControllerServer> scalaControllers;
- /**
- * Implementations must override this method to return a set of
KafkaConfigs. This method will be invoked for every
- * test and should not reuse previous configurations unless they select
their ports randomly when servers are started.
- *
- * Note the replica fetch max bytes is set to `1` in order to throttle the
rate of replication for test
- * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
- */
- @Override
- public scala.collection.Seq<KafkaConfig> generateConfigs() {
- Map<Integer, String> rackInfo = new HashMap<>();
- rackInfo.put(0, "rack1");
- rackInfo.put(1, "rack2");
- rackInfo.put(2, "rack2");
- rackInfo.put(3, "rack1");
- rackInfo.put(4, "rack3");
- rackInfo.put(5, "rack3");
-
- List<Properties> brokerConfigs = ToolsTestUtils
- .createBrokerProperties(6, zkConnectOrNull(), rackInfo,
defaultNumPartitions, defaultReplicationFactor);
-
- List<KafkaConfig> configs = new ArrayList<>();
- for (Properties props : brokerConfigs) {
- props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1");
- configs.add(KafkaConfig.fromProps(props));
- }
- return JavaConverters.asScalaBuffer(configs).toSeq();
- }
+ private final ClusterInstance clusterInstance;
private TopicCommand.TopicCommandOptions
buildTopicCommandOptionsWithBootstrap(String... opts) {
+ String bootstrapServer = clusterInstance.bootstrapServers();
String[] finalOptions = Stream.concat(Arrays.stream(opts),
Stream.of("--bootstrap-server", bootstrapServer)
).toArray(String[]::new);
return new TopicCommand.TopicCommandOptions(finalOptions);
}
- @BeforeEach
- public void setUp(TestInfo info) {
- super.setUp(info);
- // create adminClient
- Properties props = new Properties();
- bootstrapServer = bootstrapServers(listenerName());
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- adminClient = Admin.create(props);
- topicService = new TopicCommand.TopicService(props,
Optional.of(bootstrapServer));
- testTopicName = String.format("%s-%s",
info.getTestMethod().get().getName(),
org.apache.kafka.test.TestUtils.randomString(10));
- scalaBrokers = brokers();
- scalaControllers = controllerServers();
+ static List<ClusterConfig> generate1() {
+ Map<String, String> serverProp = new HashMap<>();
+ serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name
error, no exception throw
+
+ Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
+ Map<String, String> infoPerBroker1 = new HashMap<>();
+ infoPerBroker1.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker2 = new HashMap<>();
+ infoPerBroker2.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker3 = new HashMap<>();
+ infoPerBroker3.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker4 = new HashMap<>();
+ infoPerBroker4.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker5 = new HashMap<>();
+ infoPerBroker5.put("broker.rack", "rack3");
+ Map<String, String> infoPerBroker6 = new HashMap<>();
+ infoPerBroker6.put("broker.rack", "rack3");
+
+ rackInfo.put(0, infoPerBroker1);
+ rackInfo.put(1, infoPerBroker2);
+ rackInfo.put(2, infoPerBroker3);
+ rackInfo.put(3, infoPerBroker4);
+ rackInfo.put(4, infoPerBroker5);
+ rackInfo.put(5, infoPerBroker6);
+
+ return Collections.singletonList(ClusterConfig.defaultBuilder()
+ .setBrokers(6)
Review Comment:
Maybe we can test "zk and "kraft" instead of all types.
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -1161,4 +1278,32 @@ public ReplicaDistributions(Map<Integer, List<String>>
partitionRacks,
}
}
+ private List<ProducerRecord<String, String>> generateProduceMessage(String
topic, Integer numMessage) {
+ List<ProducerRecord<String, String>> records = new ArrayList<>();
+ for (int i = 0; i < numMessage; ++i) {
+ records.add(new ProducerRecord<String, String>(topic, "test-" +
i));
+ }
+ return records;
+ }
+
+
+ private KafkaProducer<String, String> createProducer() {
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
+ return new KafkaProducer<>(producerProps, new StringSerializer(), new
StringSerializer());
+ }
+
+ private void sendProducerRecords(String testTopicName,
KafkaProducer<String, String> producer, Integer numMessage) {
+ List<ProducerRecord<String, String>> records =
generateProduceMessage(testTopicName, numMessage);
+ List<Future<RecordMetadata>> features =
records.stream().map(producer::send).collect(Collectors.toList());
+
+ assertDoesNotThrow(() -> features.forEach(s -> {
Review Comment:
```java
private void sendProducerRecords(String testTopicName,
KafkaProducer<String, String> producer, Integer numMessage) {
IntStream.range(0, numMessage).forEach(i -> producer.send(new
ProducerRecord<>(testTopicName, "test-" + i)));
producer.flush();
}
```
##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -85,653 +80,761 @@
import static org.mockito.Mockito.spy;
@Tag("integration")
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
-public class TopicCommandIntegrationTest extends
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
private final short defaultReplicationFactor = 1;
private final int defaultNumPartitions = 1;
- private TopicCommand.TopicService topicService;
- private Admin adminClient;
- private String bootstrapServer;
- private String testTopicName;
- private Buffer<KafkaBroker> scalaBrokers;
- private Seq<ControllerServer> scalaControllers;
- /**
- * Implementations must override this method to return a set of
KafkaConfigs. This method will be invoked for every
- * test and should not reuse previous configurations unless they select
their ports randomly when servers are started.
- *
- * Note the replica fetch max bytes is set to `1` in order to throttle the
rate of replication for test
- * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
- */
- @Override
- public scala.collection.Seq<KafkaConfig> generateConfigs() {
- Map<Integer, String> rackInfo = new HashMap<>();
- rackInfo.put(0, "rack1");
- rackInfo.put(1, "rack2");
- rackInfo.put(2, "rack2");
- rackInfo.put(3, "rack1");
- rackInfo.put(4, "rack3");
- rackInfo.put(5, "rack3");
-
- List<Properties> brokerConfigs = ToolsTestUtils
- .createBrokerProperties(6, zkConnectOrNull(), rackInfo,
defaultNumPartitions, defaultReplicationFactor);
-
- List<KafkaConfig> configs = new ArrayList<>();
- for (Properties props : brokerConfigs) {
- props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1");
- configs.add(KafkaConfig.fromProps(props));
- }
- return JavaConverters.asScalaBuffer(configs).toSeq();
- }
+ private final ClusterInstance clusterInstance;
private TopicCommand.TopicCommandOptions
buildTopicCommandOptionsWithBootstrap(String... opts) {
+ String bootstrapServer = clusterInstance.bootstrapServers();
String[] finalOptions = Stream.concat(Arrays.stream(opts),
Stream.of("--bootstrap-server", bootstrapServer)
).toArray(String[]::new);
return new TopicCommand.TopicCommandOptions(finalOptions);
}
- @BeforeEach
- public void setUp(TestInfo info) {
- super.setUp(info);
- // create adminClient
- Properties props = new Properties();
- bootstrapServer = bootstrapServers(listenerName());
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- adminClient = Admin.create(props);
- topicService = new TopicCommand.TopicService(props,
Optional.of(bootstrapServer));
- testTopicName = String.format("%s-%s",
info.getTestMethod().get().getName(),
org.apache.kafka.test.TestUtils.randomString(10));
- scalaBrokers = brokers();
- scalaControllers = controllerServers();
+ static List<ClusterConfig> generate1() {
+ Map<String, String> serverProp = new HashMap<>();
+ serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name
error, no exception throw
+
+ Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
+ Map<String, String> infoPerBroker1 = new HashMap<>();
+ infoPerBroker1.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker2 = new HashMap<>();
+ infoPerBroker2.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker3 = new HashMap<>();
+ infoPerBroker3.put("broker.rack", "rack2");
+ Map<String, String> infoPerBroker4 = new HashMap<>();
+ infoPerBroker4.put("broker.rack", "rack1");
+ Map<String, String> infoPerBroker5 = new HashMap<>();
+ infoPerBroker5.put("broker.rack", "rack3");
+ Map<String, String> infoPerBroker6 = new HashMap<>();
+ infoPerBroker6.put("broker.rack", "rack3");
+
+ rackInfo.put(0, infoPerBroker1);
+ rackInfo.put(1, infoPerBroker2);
+ rackInfo.put(2, infoPerBroker3);
+ rackInfo.put(3, infoPerBroker4);
+ rackInfo.put(4, infoPerBroker5);
+ rackInfo.put(5, infoPerBroker6);
+
+ return Collections.singletonList(ClusterConfig.defaultBuilder()
+ .setBrokers(6)
+ .setServerProperties(serverProp)
+ .setPerServerProperties(rackInfo)
+ .build()
+ );
}
- @AfterEach
- public void close() throws Exception {
- if (topicService != null)
- topicService.close();
- if (adminClient != null)
- adminClient.close();
+ TopicCommandIntegrationTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreate(String quorum) throws Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, 2, 1,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
-
assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
- "Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
+ @ClusterTemplate("generate1")
+ public void testCreate(TestInfo testInfo) throws InterruptedException,
ExecutionException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
+ "Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
+
+ adminClient.deleteTopics(Collections.singletonList(testTopicName));
+ clusterInstance.waitForTopic(testTopicName, 0);
+
Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(),
+ "Admin client see the created topic. It saw: " +
adminClient.listTopics().names().get());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithDefaults(String quorum) throws Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- List<TopicPartitionInfo> partitions = adminClient
- .describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames()
- .get()
- .get(testTopicName)
- .partitions();
- assertEquals(defaultNumPartitions, partitions.size(), "Unequal
partition size: " + partitions.size());
- assertEquals(defaultReplicationFactor, (short)
partitions.get(0).replicas().size(), "Unequal replication factor: " +
partitions.get(0).replicas().size());
+ @ClusterTemplate("generate1")
+ public void testCreateWithDefaults(TestInfo testInfo) throws
InterruptedException, ExecutionException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
+ "Admin client didn't see the created topic. It saw: " +
adminClient.listTopics().names().get());
+
+ List<TopicPartitionInfo> partitions = adminClient
+ .describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames()
+ .get()
+ .get(testTopicName)
+ .partitions();
+ Assertions.assertEquals(defaultNumPartitions, partitions.size(),
"Unequal partition size: " + partitions.size());
+ Assertions.assertEquals(defaultReplicationFactor, (short)
partitions.get(0).replicas().size(), "Unequal replication factor: " +
partitions.get(0).replicas().size());
+
+ adminClient.deleteTopics(Collections.singletonList(testTopicName));
+ clusterInstance.waitForTopic(testTopicName, 0);
+
Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(),
+ "Admin client see the created topic. It saw: " +
adminClient.listTopics().names().get());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithDefaultReplication(String quorum) throws
Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, 2, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- List<TopicPartitionInfo> partitions = adminClient
- .describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames()
- .get()
- .get(testTopicName)
- .partitions();
- assertEquals(2, partitions.size(), "Unequal partition size: " +
partitions.size());
- assertEquals(defaultReplicationFactor, (short)
partitions.get(0).replicas().size(), "Unequal replication factor: " +
partitions.get(0).replicas().size());
+ @ClusterTemplate("generate1")
+ public void testCreateWithDefaultReplication(TestInfo testInfo) throws
InterruptedException, ExecutionException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, 2, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, 2);
+ List<TopicPartitionInfo> partitions = adminClient
+ .describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames()
+ .get()
+ .get(testTopicName)
+ .partitions();
+ assertEquals(2, partitions.size(), "Unequal partition size: " +
partitions.size());
+ assertEquals(defaultReplicationFactor, (short)
partitions.get(0).replicas().size(), "Unequal replication factor: " +
partitions.get(0).replicas().size());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithDefaultPartitions(String quorum) throws
Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, 2,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- List<TopicPartitionInfo> partitions = adminClient
- .describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames()
- .get()
- .get(testTopicName)
- .partitions();
-
- assertEquals(defaultNumPartitions, partitions.size(), "Unequal
partition size: " + partitions.size());
- assertEquals(2, (short) partitions.get(0).replicas().size(),
"Partitions not replicated: " + partitions.get(0).replicas().size());
+ @ClusterTemplate("generate1")
+ public void testCreateWithDefaultPartitions(TestInfo testInfo) throws
InterruptedException, ExecutionException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, (short) 2)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+ List<TopicPartitionInfo> partitions = adminClient
+ .describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames()
+ .get()
+ .get(testTopicName)
+ .partitions();
+
+ assertEquals(defaultNumPartitions, partitions.size(), "Unequal
partition size: " + partitions.size());
+ assertEquals(2, (short) partitions.get(0).replicas().size(),
"Partitions not replicated: " + partitions.get(0).replicas().size());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithConfigs(String quorum) throws Exception {
- ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
- Properties topicConfig = new Properties();
- topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
+ @ClusterTemplate("generate1")
+ public void testCreateWithConfigs(TestInfo testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, 2, 2,
- scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
- );
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+ Map<String, String> topicConfig = new HashMap<>();
+ topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
- Config configs =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
- assertEquals(1000,
Integer.valueOf(configs.get("delete.retention.ms").value()),
- "Config not set correctly: " +
configs.get("delete.retention.ms").value());
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, 2, (short) 2).configs(topicConfig)));
+ clusterInstance.waitForTopic(testTopicName, 2);
+
+
+ Config configs =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
+ assertEquals(1000,
Integer.valueOf(configs.get("delete.retention.ms").value()),
+ "Config not set correctly: " +
configs.get("delete.retention.ms").value());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWhenAlreadyExists(String quorum) {
- // create the topic
- TopicCommand.TopicCommandOptions createOpts =
buildTopicCommandOptionsWithBootstrap(
- "--create", "--partitions",
Integer.toString(defaultNumPartitions), "--replication-factor", "1",
- "--topic", testTopicName);
+ @ClusterTemplate("generate1")
+ public void testCreateWhenAlreadyExists(TestInfo testInfo) throws
Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ TopicCommand.TopicCommandOptions createOpts =
buildTopicCommandOptionsWithBootstrap(
+ "--create", "--partitions",
Integer.toString(defaultNumPartitions), "--replication-factor", "1",
+ "--topic", testTopicName);
+
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+ // try to re-create the topic
+ assertThrows(TopicExistsException.class, () ->
topicService.createTopic(createOpts),
+ "Expected TopicExistsException to throw");
+ }
+ }
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- // try to re-create the topic
- assertThrows(TopicExistsException.class, () ->
topicService.createTopic(createOpts),
- "Expected TopicExistsException to throw");
+ @ClusterTemplate("generate1")
+ public void testCreateWhenAlreadyExistsWithIfNotExists(TestInfo testInfo)
throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+ TopicCommand.TopicCommandOptions createOpts =
+ buildTopicCommandOptionsWithBootstrap("--create",
"--topic", testTopicName, "--if-not-exists");
+ topicService.createTopic(createOpts);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum)
throws Exception {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- TopicCommand.TopicCommandOptions createOpts =
- buildTopicCommandOptionsWithBootstrap("--create", "--topic",
testTopicName, "--if-not-exists");
- topicService.createTopic(createOpts);
+ private List<Integer> getPartitionReplicas(List<TopicPartitionInfo>
partitions, int partitionNumber) {
+ return
partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithReplicaAssignment(String quorum) throws
Exception {
- scala.collection.mutable.HashMap<Object, Seq<Object>>
replicaAssignmentMap = new scala.collection.mutable.HashMap<>();
+ @ClusterTemplate("generate1")
+ public void testCreateWithReplicaAssignment(TestInfo testInfo) throws
Exception {
+ Map<Integer, List<Integer>> replicaAssignmentMap = new HashMap<>();
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ String testTopicName = testInfo.getTestMethod().get().getName() +
"-" +
+ TestUtils.randomString(10);
- replicaAssignmentMap.put(0,
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 5, (Object)
4)).asScala().toSeq());
- replicaAssignmentMap.put(1,
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 3, (Object)
2)).asScala().toSeq());
- replicaAssignmentMap.put(2,
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object)
0)).asScala().toSeq());
+ replicaAssignmentMap.put(0, Arrays.asList(5, 4));
+ replicaAssignmentMap.put(1, Arrays.asList(3, 2));
+ replicaAssignmentMap.put(2, Arrays.asList(1, 0));
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions,
- defaultReplicationFactor, replicaAssignmentMap, new
Properties()
- );
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, replicaAssignmentMap)));
+ clusterInstance.waitForTopic(testTopicName, 3);
- List<TopicPartitionInfo> partitions = adminClient
- .describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames()
- .get()
- .get(testTopicName)
- .partitions();
-
- assertEquals(3, partitions.size(),
- "Unequal partition size: " + partitions.size());
- assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0),
- "Unexpected replica assignment: " +
getPartitionReplicas(partitions, 0));
- assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1),
- "Unexpected replica assignment: " +
getPartitionReplicas(partitions, 1));
- assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2),
- "Unexpected replica assignment: " +
getPartitionReplicas(partitions, 2));
- }
+ List<TopicPartitionInfo> partitions = adminClient
+ .describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames()
+ .get()
+ .get(testTopicName)
+ .partitions();
- private List<Integer> getPartitionReplicas(List<TopicPartitionInfo>
partitions, int partitionNumber) {
- return
partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
+ adminClient.close();
+ assertEquals(3, partitions.size(),
+ "Unequal partition size: " + partitions.size());
+ assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions,
0),
+ "Unexpected replica assignment: " +
getPartitionReplicas(partitions, 0));
+ assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions,
1),
+ "Unexpected replica assignment: " +
getPartitionReplicas(partitions, 1));
+ assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions,
2),
+ "Unexpected replica assignment: " +
getPartitionReplicas(partitions, 2));
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithInvalidReplicationFactor(String quorum) {
- TopicCommand.TopicCommandOptions opts =
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2",
"--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
- "--topic", testTopicName);
- assertThrows(IllegalArgumentException.class, () ->
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+ @ClusterTemplate("generate1")
+ public void testCreateWithInvalidReplicationFactor(TestInfo testInfo)
throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+
+ TopicCommand.TopicCommandOptions opts =
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2",
"--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
+ "--topic", testTopicName);
+ assertThrows(IllegalArgumentException.class, () ->
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithNegativeReplicationFactor(String quorum) {
- TopicCommand.TopicCommandOptions opts =
buildTopicCommandOptionsWithBootstrap("--create",
- "--partitions", "2", "--replication-factor", "-1", "--topic",
testTopicName);
- assertThrows(IllegalArgumentException.class, () ->
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+ @ClusterTemplate("generate1")
+ public void testCreateWithNegativeReplicationFactor(TestInfo testInfo)
throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ TopicCommand.TopicCommandOptions opts =
buildTopicCommandOptionsWithBootstrap("--create",
+ "--partitions", "2", "--replication-factor", "-1",
"--topic", testTopicName);
+ assertThrows(IllegalArgumentException.class, () ->
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateWithNegativePartitionCount(String quorum) {
- TopicCommand.TopicCommandOptions opts =
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1",
"--replication-factor", "1", "--topic", testTopicName);
- assertThrows(IllegalArgumentException.class, () ->
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+ @ClusterTemplate("generate1")
+ public void testCreateWithNegativePartitionCount(TestInfo testInfo) throws
Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ TopicCommand.TopicCommandOptions opts =
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1",
"--replication-factor", "1", "--topic", testTopicName);
+ assertThrows(IllegalArgumentException.class, () ->
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testInvalidTopicLevelConfig(String quorum) {
- TopicCommand.TopicCommandOptions createOpts =
buildTopicCommandOptionsWithBootstrap("--create",
- "--partitions", "1", "--replication-factor", "1", "--topic",
testTopicName,
- "--config", "message.timestamp.type=boom");
- assertThrows(ConfigException.class, () ->
topicService.createTopic(createOpts), "Expected ConfigException to throw");
+ @ClusterTemplate("generate1")
+ public void testInvalidTopicLevelConfig(TestInfo testInfo) {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient);
+
+ TopicCommand.TopicCommandOptions createOpts =
buildTopicCommandOptionsWithBootstrap("--create",
+ "--partitions", "1", "--replication-factor", "1",
"--topic", testTopicName,
+ "--config", "message.timestamp.type=boom");
+ assertThrows(ConfigException.class, () ->
topicService.createTopic(createOpts), "Expected ConfigException to throw");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListTopics(String quorum) {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ @ClusterTemplate("generate1")
+ public void testListTopics(TestInfo testInfo) throws InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
- String output =
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list"));
- assertTrue(output.contains(testTopicName), "Expected topic name to be
present in output: " + output);
+ String output =
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list"));
+ assertTrue(output.contains(testTopicName), "Expected topic name to
be present in output: " + output);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListTopicsWithIncludeList(String quorum) {
- String topic1 = "kafka.testTopic1";
- String topic2 = "kafka.testTopic2";
- String topic3 = "oooof.testTopic1";
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers,
scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, topic2, scalaBrokers,
scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, topic3, scalaBrokers,
scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ @ClusterTemplate("generate1")
+ public void testListTopicsWithIncludeList() throws InterruptedException {
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ String topic1 = "kafka.testTopic1";
+ String topic2 = "kafka.testTopic2";
+ String topic3 = "oooof.testTopic1";
+ int partition = 2;
+ short replicationFactor = 2;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(topic1, partition, replicationFactor)));
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(topic2, partition, replicationFactor)));
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(topic3, partition, replicationFactor)));
+ clusterInstance.waitForTopic(topic1, partition);
+ clusterInstance.waitForTopic(topic2, partition);
+ clusterInstance.waitForTopic(topic3, partition);
+
+ String output =
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list",
"--topic", "kafka.*"));
+ assertTrue(output.contains(topic1), "Expected topic name " +
topic1 + " to be present in output: " + output);
+ assertTrue(output.contains(topic2), "Expected topic name " +
topic2 + " to be present in output: " + output);
+ assertFalse(output.contains(topic3), "Do not expect topic name " +
topic3 + " to be present in output: " + output);
+ }
+ }
- String output =
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list",
"--topic", "kafka.*"));
+ @ClusterTemplate("generate1")
+ public void testListTopicsWithExcludeInternal() throws
InterruptedException {
+ try (Admin adminClient = clusterInstance.createAdminClient();) {
+ String topic1 = "kafka.testTopic1";
+ String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
+ int partition = 2;
+ short replicationFactor = 2;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(topic1, partition, replicationFactor)));
+ clusterInstance.waitForTopic(topic1, partition);
+
+ String output =
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list",
"--exclude-internal"));
+ assertTrue(output.contains(topic1), "Expected topic name " +
topic1 + " to be present in output: " + output);
+ assertFalse(output.contains(hiddenConsumerTopic), "Do not expect
topic name " + hiddenConsumerTopic + " to be present in output: " + output);
+ }
+ }
- assertTrue(output.contains(topic1), "Expected topic name " + topic1 +
" to be present in output: " + output);
- assertTrue(output.contains(topic2), "Expected topic name " + topic2 +
" to be present in output: " + output);
- assertFalse(output.contains(topic3), "Do not expect topic name " +
topic3 + " to be present in output: " + output);
+ @ClusterTemplate("generate1")
+ public void testAlterPartitionCount(TestInfo testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ int partition = 2;
+ short replicationFactor = 2;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partition);
+
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--topic", testTopicName, "--partitions", "3"));
+
+ TestUtils.waitForCondition(
+ () ->
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
+ 60000, testTopicName + "reassignmet not finished after
60000 ms"
+ );
+
+ TestUtils.waitForCondition(
+ () -> clusterInstance.brokers().values().stream().allMatch(
+ b ->
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
+ TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new
assignment propagating to broker");
+ TopicDescription topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
+ assertEquals(3, topicDescription.partitions().size(), "Expected
partition count to be 3. Got: " + topicDescription.partitions().size());
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListTopicsWithExcludeInternal(String quorum) {
- String topic1 = "kafka.testTopic1";
- String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers,
scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, hiddenConsumerTopic,
scalaBrokers, scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ @ClusterTemplate("generate1")
+ public void testAlterAssignment(TestInfo testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ int partition = 2;
+ short replicationFactor = 2;
- String output =
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list",
"--exclude-internal"));
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partition);
- assertTrue(output.contains(topic1), "Expected topic name " + topic1 +
" to be present in output: " + output);
- assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic
name " + hiddenConsumerTopic + " to be present in output: " + output);
- }
+
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
+ "--topic", testTopicName, "--replica-assignment",
"5:3,3:1,4:2", "--partitions", "3"));
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterPartitionCount(String quorum) throws
ExecutionException, InterruptedException {
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
-
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--topic", testTopicName, "--partitions", "3"));
-
- TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
- kafka.utils.TestUtils.waitUntilTrue(
- () -> brokers().forall(b ->
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
- () -> "Timeout waiting for new assignment propagating to broker",
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
- TopicDescription topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
- assertEquals(3, topicDescription.partitions().size(), "Expected
partition count to be 3. Got: " + topicDescription.partitions().size());
- }
+ TestUtils.waitForCondition(
+ () ->
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
+ 60000, testTopicName + "reassignmet not finished after
60000 ms"
+ );
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterAssignment(String quorum) throws ExecutionException,
InterruptedException {
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
-
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
- "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "3"));
-
- TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
- kafka.utils.TestUtils.waitUntilTrue(
- () -> brokers().forall(b ->
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
- () -> "Timeout waiting for new assignment propagating to broker",
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
- TopicDescription topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
- assertEquals(3, topicDescription.partitions().size(), "Expected
partition count to be 3. Got: " + topicDescription.partitions().size());
- List<Integer> partitionReplicas =
getPartitionReplicas(topicDescription.partitions(), 2);
- assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have
replicas 4,2. Got: " + partitionReplicas);
+ TestUtils.waitForCondition(
+ () -> clusterInstance.brokers().values().stream().allMatch(
+ b ->
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
+ TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new
assignment propagating to broker");
+
+ TopicDescription topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
+ assertEquals(3, topicDescription.partitions().size(), "Expected
partition count to be 3. Got: " + topicDescription.partitions().size());
+ List<Integer> partitionReplicas =
getPartitionReplicas(topicDescription.partitions(), 2);
+ assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to
have replicas 4,2. Got: " + partitionReplicas);
+
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterAssignmentWithMoreAssignmentThanPartitions(String
quorum) {
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- assertThrows(ExecutionException.class,
- () ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
- "--topic", testTopicName, "--replica-assignment",
"5:3,3:1,4:2,3:2", "--partitions", "3")),
- "Expected to fail with ExecutionException");
+ @ClusterTemplate("generate1")
+ public void testAlterAssignmentWithMoreAssignmentThanPartitions(TestInfo
testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+
+ int partition = 2;
+ short replicationFactor = 2;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partition);
+
+ assertThrows(ExecutionException.class,
+ () ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
+ "--topic", testTopicName, "--replica-assignment",
"5:3,3:1,4:2,3:2", "--partitions", "3")),
+ "Expected to fail with ExecutionException");
+
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterAssignmentWithMorePartitionsThanAssignment(String
quorum) {
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ @ClusterTemplate("generate1")
+ public void testAlterAssignmentWithMorePartitionsThanAssignment(TestInfo
testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ int partition = 2;
+ short replicationFactor = 2;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partition);
+
+ assertThrows(ExecutionException.class,
+ () ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--topic", testTopicName,
+ "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "6")),
+ "Expected to fail with ExecutionException");
- assertThrows(ExecutionException.class,
- () ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--topic", testTopicName,
- "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")),
- "Expected to fail with ExecutionException");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterWithInvalidPartitionCount(String quorum) {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- assertThrows(ExecutionException.class,
- () ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--partitions", "-1", "--topic", testTopicName)),
- "Expected to fail with ExecutionException");
+ @ClusterTemplate("generate1")
+ public void testAlterWithInvalidPartitionCount(TestInfo testInfo) throws
Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+ assertThrows(ExecutionException.class,
+ () ->
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--partitions", "-1", "--topic", testTopicName)),
+ "Expected to fail with ExecutionException");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterWhenTopicDoesntExist(String quorum) {
- // alter a topic that does not exist without --if-exists
- TopicCommand.TopicCommandOptions alterOpts =
buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName,
"--partitions", "1");
- TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient);
- assertThrows(IllegalArgumentException.class, () ->
topicService.alterTopic(alterOpts), "Expected to fail with
IllegalArgumentException");
+ @ClusterTemplate("generate1")
+ public void testAlterWhenTopicDoesntExist(TestInfo testInfo) throws
Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ // alter a topic that does not exist without --if-exists
+ TopicCommand.TopicCommandOptions alterOpts =
buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName,
"--partitions", "1");
+ assertThrows(IllegalArgumentException.class, () ->
topicService.alterTopic(alterOpts), "Expected to fail with
IllegalArgumentException");
+
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testAlterWhenTopicDoesntExistWithIfExists(String quorum)
throws ExecutionException, InterruptedException {
+ @ClusterTemplate("generate1")
+ public void testAlterWhenTopicDoesntExistWithIfExists(TestInfo testInfo)
throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ Admin adminClient = clusterInstance.createAdminClient();
+
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient);
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
"--topic", testTopicName, "--partitions", "1", "--if-exists"));
+ adminClient.close();
+ topicService.close();
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testCreateAlterTopicWithRackAware(String quorum) throws
Exception {
- Map<Integer, String> rackInfo = new HashMap<>();
- rackInfo.put(0, "rack1");
- rackInfo.put(1, "rack2");
- rackInfo.put(2, "rack2");
- rackInfo.put(3, "rack1");
- rackInfo.put(4, "rack3");
- rackInfo.put(5, "rack3");
-
- int numPartitions = 18;
- int replicationFactor = 3;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, numPartitions, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ @ClusterTemplate("generate1")
+ public void testCreateAlterTopicWithRackAware(TestInfo testInfo) throws
Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+
+ Map<Integer, String> rackInfo = new HashMap<>();
+ rackInfo.put(0, "rack1");
+ rackInfo.put(1, "rack2");
+ rackInfo.put(2, "rack2");
+ rackInfo.put(3, "rack1");
+ rackInfo.put(4, "rack3");
+ rackInfo.put(5, "rack3");
+
+ int numPartitions = 18;
+ int replicationFactor = 3;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, numPartitions, (short) replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, numPartitions);
+
+ Map<Integer, List<Integer>> assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions()
+ .stream()
+ .collect(Collectors.toMap(
+ info -> info.partition(),
+ info ->
info.replicas().stream().map(Node::id).collect(Collectors.toList())));
+ checkReplicaDistribution(assignment, rackInfo, rackInfo.size(),
numPartitions,
+ replicationFactor, true, true, true);
+
+ int alteredNumPartitions = 36;
+ // verify that adding partitions will also be rack aware
+ TopicCommand.TopicCommandOptions alterOpts =
buildTopicCommandOptionsWithBootstrap("--alter",
+ "--partitions", Integer.toString(alteredNumPartitions),
+ "--topic", testTopicName);
+ topicService.alterTopic(alterOpts);
+
+ TestUtils.waitForCondition(
+ () ->
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
+ 60000, testTopicName + "reassignmet not finished after
60000 ms"
+ );
+ TestUtils.waitForCondition(
+ () ->
clusterInstance.brokers().values().stream().allMatch(p ->
p.metadataCache().getTopicPartitions(testTopicName).size() ==
alteredNumPartitions),
+ TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new
assignment propagating to broker");
+
+ assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
+
.allTopicNames().get().get(testTopicName).partitions().stream()
+ .collect(Collectors.toMap(info -> info.partition(), info
-> info.replicas().stream().map(Node::id).collect(Collectors.toList())));
+ checkReplicaDistribution(assignment, rackInfo, rackInfo.size(),
alteredNumPartitions, replicationFactor,
+ true, true, true);
- Map<Integer, List<Integer>> assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions()
- .stream()
- .collect(Collectors.toMap(
- info -> info.partition(),
- info ->
info.replicas().stream().map(Node::id).collect(Collectors.toList())));
- checkReplicaDistribution(assignment, rackInfo, rackInfo.size(),
numPartitions,
- replicationFactor, true, true, true);
-
- int alteredNumPartitions = 36;
- // verify that adding partitions will also be rack aware
- TopicCommand.TopicCommandOptions alterOpts =
buildTopicCommandOptionsWithBootstrap("--alter",
- "--partitions", Integer.toString(alteredNumPartitions),
- "--topic", testTopicName);
- topicService.alterTopic(alterOpts);
-
- TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
- kafka.utils.TestUtils.waitUntilTrue(
- () -> brokers().forall(p ->
p.metadataCache().getTopicPartitions(testTopicName).size() ==
alteredNumPartitions),
- () -> "Timeout waiting for new assignment propagating to broker",
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
- assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().stream()
- .collect(Collectors.toMap(info -> info.partition(), info ->
info.replicas().stream().map(Node::id).collect(Collectors.toList())));
- checkReplicaDistribution(assignment, rackInfo, rackInfo.size(),
alteredNumPartitions, replicationFactor,
- true, true, true);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testConfigPreservationAcrossPartitionAlteration(String quorum)
throws Exception {
- String cleanUpPolicy = "compact";
- Properties topicConfig = new Properties();
- topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy);
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
- );
-
- ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
- Config props =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
- // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic,
testTopicName)
- assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG),
"Properties after creation don't contain " + cleanUpPolicy);
- assertEquals(cleanUpPolicy,
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after
creation have incorrect value");
+ @ClusterTemplate("generate1")
+ public void testConfigPreservationAcrossPartitionAlteration(TestInfo
testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+
+ String cleanUpPolicy = "compact";
+ HashMap<String, String> topicConfig = new HashMap<>();
+ topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy);
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions,
defaultReplicationFactor).configs(topicConfig)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+ Config props =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
+ assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG),
"Properties after creation don't contain " + cleanUpPolicy);
+ assertEquals(cleanUpPolicy,
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after
creation have incorrect value");
+
+ // modify the topic to add new partitions
+ int numPartitionsModified = 3;
+ TopicCommand.TopicCommandOptions alterOpts =
buildTopicCommandOptionsWithBootstrap("--alter",
+ "--partitions", Integer.toString(numPartitionsModified),
"--topic", testTopicName);
+ topicService.alterTopic(alterOpts);
+
+ TestUtils.waitForCondition(
+ () ->
clusterInstance.brokers().values().stream().allMatch(p ->
p.metadataCache().getTopicPartitions(testTopicName).size() ==
numPartitionsModified),
+ TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new
assignment propagating to broker");
+
+ Config newProps =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
+ assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG),
"Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG);
+ assertEquals(cleanUpPolicy,
newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties
have incorrect value");
- // pre-create the topic config changes path to avoid a NoNodeException
- if (!isKRaftTest()) {
-
zkClient().makeSurePersistentPathExists(kafka.zk.ConfigEntityChangeNotificationZNode.path());
}
+ }
- // modify the topic to add new partitions
- int numPartitionsModified = 3;
- TopicCommand.TopicCommandOptions alterOpts =
buildTopicCommandOptionsWithBootstrap("--alter",
- "--partitions", Integer.toString(numPartitionsModified),
"--topic", testTopicName);
- topicService.alterTopic(alterOpts);
+ @ClusterTemplate("generate1")
+ public void testTopicDeletion(TestInfo testInfo) throws Exception {
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ String testTopicName = testInfo.getTestMethod().get().getName() +
"-" +
+ TestUtils.randomString(10);
- TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
- Config newProps =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
- assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG),
"Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG);
- assertEquals(cleanUpPolicy,
newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties
have incorrect value");
- }
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testTopicDeletion(String quorum) throws Exception {
- // create the NormalTopic
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- // delete the NormalTopic
- TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
+ // delete the NormalTopic
+ TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
+ topicService.deleteTopic(deleteOpts);
- if (!isKRaftTest()) {
- String deletePath =
kafka.zk.DeleteTopicsTopicZNode.path(testTopicName);
- assertFalse(zkClient().pathExists(deletePath), "Delete path for
topic shouldn't exist before deletion.");
+ TestUtils.waitForCondition(
+ () ->
adminClient.listTopics().listings().get().stream().noneMatch(topic ->
topic.name().equals(testTopicName)),
+ 60000, "Delete topic fail in 60000 ms"
+ );
}
- topicService.deleteTopic(deleteOpts);
- TestUtils.verifyTopicDeletion(zkClientOrNull(), testTopicName, 1,
brokers());
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testTopicWithCollidingCharDeletionAndCreateAgain(String
quorum) throws Exception {
- // create the topic with colliding chars
- String topicWithCollidingChar = "test.a";
- TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar,
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- // delete the topic
- TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap("--delete", "--topic",
topicWithCollidingChar);
-
- if (!isKRaftTest()) {
- String deletePath =
kafka.zk.DeleteTopicsTopicZNode.path(topicWithCollidingChar);
- assertFalse(zkClient().pathExists(deletePath), "Delete path for
topic shouldn't exist before deletion.");
+ @ClusterTemplate("generate1")
+ public void testTopicWithCollidingCharDeletionAndCreateAgain() throws
Exception {
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ // create the topic with colliding chars
+ String topicWithCollidingChar = "test.a";
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(topicWithCollidingChar, defaultNumPartitions,
defaultReplicationFactor)));
+ clusterInstance.waitForTopic(topicWithCollidingChar,
defaultNumPartitions);
+
+ // delete the topic
+ TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap("--delete", "--topic",
topicWithCollidingChar);
+ topicService.deleteTopic(deleteOpts);
+ TestUtils.waitForCondition(
+ () ->
adminClient.listTopics().listings().get().stream().noneMatch(topic ->
topic.name().equals(topicWithCollidingChar)),
+ 60000, "Delete topic fail in 60000 ms"
+ );
+
+ clusterInstance.waitTopicDeletion(topicWithCollidingChar);
+
+ // recreate same topic
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(topicWithCollidingChar, defaultNumPartitions,
defaultReplicationFactor)));
+ clusterInstance.waitForTopic(topicWithCollidingChar,
defaultNumPartitions);
}
- topicService.deleteTopic(deleteOpts);
- TestUtils.verifyTopicDeletion(zkClientOrNull(),
topicWithCollidingChar, 1, brokers());
- assertDoesNotThrow(() -> TestUtils.createTopicWithAdmin(adminClient,
topicWithCollidingChar, scalaBrokers, scalaControllers, defaultNumPartitions,
defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- ), "Should be able to create a topic with colliding chars after
deletion.");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteInternalTopic(String quorum) throws Exception {
- // create the offset topic
- TestUtils.createTopicWithAdmin(adminClient,
Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers,
defaultNumPartitions, defaultReplicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed
by default.
- // This is a difference between the new and the old command as the old
one didn't allow internal topic deletion.
- // If deleting internal topics is not desired, ACLS should be used to
control it.
- TopicCommand.TopicCommandOptions deleteOffsetTopicOpts =
- buildTopicCommandOptionsWithBootstrap("--delete", "--topic",
Topic.GROUP_METADATA_TOPIC_NAME);
- String deleteOffsetTopicPath =
kafka.zk.DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME);
- if (!isKRaftTest()) {
- assertFalse(zkClient().pathExists(deleteOffsetTopicPath), "Delete
path for topic shouldn't exist before deletion.");
+ @ClusterTemplate("generate1")
+ public void testDeleteInternalTopic() throws Exception {
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+
+ // create the offset topic
+ // In ZK mode, Topic.GROUP_METADATA_TOPIC_NAME exist when cluster
is created.
+ if (clusterInstance.isKRaftTest()) {
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions,
defaultReplicationFactor)));
+ clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME,
defaultNumPartitions);
+ }
+
+ // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is
allowed by default.
+ // This is a difference between the new and the old command as the
old one didn't allow internal topic deletion.
+ // If deleting internal topics is not desired, ACLS should be used
to control it.
+ TopicCommand.TopicCommandOptions deleteOffsetTopicOpts =
+ buildTopicCommandOptionsWithBootstrap("--delete",
"--topic", Topic.GROUP_METADATA_TOPIC_NAME);
+
+ topicService.deleteTopic(deleteOffsetTopicOpts);
+ TestUtils.waitForCondition(
+ () ->
adminClient.listTopics().listings().get().stream().noneMatch(topic ->
topic.name().equals(Topic.GROUP_METADATA_TOPIC_NAME)),
+ 60000, "Delete topic fail in 60000 ms"
+ );
+
}
- topicService.deleteTopic(deleteOffsetTopicOpts);
- TestUtils.verifyTopicDeletion(zkClientOrNull(),
Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, brokers());
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteWhenTopicDoesntExist(String quorum) {
- // delete a topic that does not exist
- TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
- assertThrows(IllegalArgumentException.class, () ->
topicService.deleteTopic(deleteOpts),
- "Expected an exception when trying to delete a topic that does
not exist.");
+ @ClusterTemplate("generate1")
+ public void testDeleteWhenTopicDoesntExist(TestInfo testInfo) throws
Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
+ // delete a topic that does not exist
+ TopicCommand.TopicCommandOptions deleteOpts =
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
+ assertThrows(IllegalArgumentException.class, () ->
topicService.deleteTopic(deleteOpts),
+ "Expected an exception when trying to delete a topic that
does not exist.");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteWhenTopicDoesntExistWithIfExists(String quorum)
throws ExecutionException, InterruptedException {
-
topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete",
"--topic", testTopicName, "--if-exists"));
+ @ClusterTemplate("generate1")
+ public void testDeleteWhenTopicDoesntExistWithIfExists(TestInfo testInfo)
throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient();
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient);) {
+
topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete",
"--topic", testTopicName, "--if-exists"));
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribe(String quorum) {
- int partition = 2;
- short replicationFactor = 2;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partition, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName));
- String[] rows = output.split(System.lineSeparator());
- assertEquals(3, rows.length, "Expected 3 rows in output, got " +
rows.length);
- assertTrue(rows[0].startsWith(String.format("Topic: %s",
testTopicName)), "Row does not start with " + testTopicName + ". Row is: " +
rows[0]);
+ @ClusterTemplate("generate1")
+ public void testDescribe(TestInfo testInfo) throws InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ int partition = 2;
+ short replicationFactor = 2;
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partition, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partition);
+
+ String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName));
+ String[] rows = output.split(System.lineSeparator());
+ assertEquals(3, rows.length, "Expected 3 rows in output, got " +
rows.length);
+ assertTrue(rows[0].startsWith(String.format("Topic: %s",
testTopicName)), "Row does not start with " + testTopicName + ". Row is: " +
rows[0]);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"quorum=zk", "quorum=kraft"})
- public void testDescribeWithDescribeTopicPartitionsApi(String quorum)
throws ExecutionException, InterruptedException {
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, 20, (short) 2,
- scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, "test-2", scalaBrokers,
scalaControllers, 41, (short) 2,
- scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, "test-3", scalaBrokers,
scalaControllers, 5, (short) 2,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, "test-4", scalaBrokers,
scalaControllers, 5, (short) 2,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- TestUtils.createTopicWithAdmin(adminClient, "test-5", scalaBrokers,
scalaControllers, 100, (short) 2,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
+ @ClusterTemplate("generate1")
+ public void testDescribeWithDescribeTopicPartitionsApi(TestInfo testInfo)
throws InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+
+ List<NewTopic> topics = new ArrayList<>();
+ topics.add(new NewTopic(testTopicName, 20, (short) 2));
+ topics.add(new NewTopic("test-2", 41, (short) 2));
+ topics.add(new NewTopic("test-3", 5, (short) 2));
+ topics.add(new NewTopic("test-4", 5, (short) 2));
+ topics.add(new NewTopic("test-5", 100, (short) 2));
+
+ adminClient.createTopics(topics);
+ clusterInstance.waitForTopic(testTopicName, 20);
+ clusterInstance.waitForTopic("test-2", 41);
+ clusterInstance.waitForTopic("test-3", 5);
+ clusterInstance.waitForTopic("test-4", 5);
+ clusterInstance.waitForTopic("test-5", 100);
+
+ String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
+ "--describe", "--partition-size-limit-per-response=20",
"--exclude-internal"));
+ String[] rows = output.split("\n");
+
+ assertEquals(176, rows.length, String.join("\n", rows));
+ assertTrue(rows[2].contains("\tElr"), rows[2]);
+ assertTrue(rows[2].contains("LastKnownElr"), rows[2]);
- String output =
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
- "--describe", "--partition-size-limit-per-response=20"));
- String[] rows = output.split("\n");
- assertEquals(176, rows.length, String.join("\n", rows));
- assertTrue(rows[2].contains("\tElr"), rows[2]);
- assertTrue(rows[2].contains("LastKnownElr"), rows[2]);
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeWhenTopicDoesntExist(String quorum) {
- assertThrows(IllegalArgumentException.class,
- () ->
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName)),
- "Expected an exception when trying to describe a topic that
does not exist.");
+ @ClusterTemplate("generate1")
+ public void testDescribeWhenTopicDoesntExist(TestInfo testInfo) {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient);
+
+ assertThrows(IllegalArgumentException.class,
+ () ->
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName)),
+ "Expected an exception when trying to describe a topic
that does not exist.");
+ }
+
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeWhenTopicDoesntExistWithIfExists(String quorum)
throws ExecutionException, InterruptedException {
-
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName, "--if-exists"));
+ @ClusterTemplate("generate1")
+ public void testDescribeWhenTopicDoesntExistWithIfExists(TestInfo
testInfo) throws Exception {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient);
+
+
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe",
"--topic", testTopicName, "--if-exists"));
+
+ adminClient.close();
+ topicService.close();
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDescribeUnavailablePartitions(String quorum) throws
ExecutionException, InterruptedException {
- int partitions = 6;
- short replicationFactor = 1;
- TestUtils.createTopicWithAdmin(adminClient, testTopicName,
scalaBrokers, scalaControllers, partitions, replicationFactor,
- scala.collection.immutable.Map$.MODULE$.empty(), new
Properties()
- );
- try {
+ @ClusterTemplate("generate1")
+ public void testDescribeUnavailablePartitions(TestInfo testInfo) throws
ExecutionException, InterruptedException {
+ String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+ TestUtils.randomString(10);
+
+ try (Admin adminClient = clusterInstance.createAdminClient()) {
+ int partitions = 6;
+ short replicationFactor = 1;
+
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(testTopicName, partitions, replicationFactor)));
+ clusterInstance.waitForTopic(testTopicName, partitions);
+
// check which partition is on broker 0 which we'll kill
TopicDescription testTopicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames().get().get(testTopicName);
+ .allTopicNames().get().get(testTopicName);
int partitionOnBroker0 = testTopicDescription.partitions().stream()
Review Comment:
this variable is unused. could you please remove it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]