dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1440196006
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
}.toSet
}
- @Test
- def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
- val topic = "test"
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
val topicPartition = new TopicPartition(topic, 0)
- val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
- brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
- // create brokers
- val allServers = brokerConfigs.map(b =>
TestUtils.createServer(KafkaConfig.fromProps(b)))
- this.servers = allServers
- val servers = allServers.filter(s =>
expectedReplicaAssignment(0).contains(s.config.brokerId))
- // create the topic
- TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
- // wait until replica log is created on every broker
- TestUtils.waitUntilTrue(() =>
servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
- "Replicas for topic test not created.")
- // shutdown a broker to make sure the following topic deletion will be
suspended
- val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
- assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition
[test,0]")
- val follower = servers.filter(s => s.config.brokerId !=
leaderIdOpt.get).last
- follower.shutdown()
- // start topic deletion
- adminZkClient.deleteTopic(topic)
- // make sure deletion of all of the topic's replicas have been tried
- ensureControllerExists()
- val (controller, controllerId) = getController()
- val allReplicasForTopic = getAllReplicasFromAssignment(topic,
expectedReplicaAssignment)
- TestUtils.waitUntilTrue(() => {
- val replicasInDeletionSuccessful =
controller.kafkaController.controllerContext.replicasInState(topic,
ReplicaDeletionSuccessful)
- val offlineReplicas =
controller.kafkaController.controllerContext.replicasInState(topic,
OfflineReplica)
- allReplicasForTopic == (replicasInDeletionSuccessful union
offlineReplicas)
- }, s"Not all replicas for topic $topic are in states of either
ReplicaDeletionSuccessful or OfflineReplica")
+ if (isKRaftTest()) {
+ val topicPartition = new TopicPartition(topic, 0)
+ val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled
= true)
+ this.brokers = allBrokers
+ val partitionHostingBrokers = allBrokers.filter(b =>
expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+ // wait until replica log is created on every broker
+ TestUtils.waitUntilTrue(() =>
partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+ "Replicas for topic test not created.")
+
+ // shutdown a broker to make sure the following topic deletion will be
suspended
+ val leaderIdOpt =
TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+ val follower = partitionHostingBrokers.filter(s => s.config.brokerId !=
leaderIdOpt).last
+ follower.shutdown()
+ // start topic deletion
+ admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+ // increase the partition count for topic
+ val props = new Properties()
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+ TestUtils.resource(Admin.create(props)) { adminClient =>
+ try {
+ adminClient.createPartitions(Map(topic ->
NewPartitions.increaseTo(2)).asJava).all().get()
+ } catch {
+ case _: ExecutionException =>
+ }
+ }
- // increase the partition count for topic
- val props = new Properties()
- props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
TestUtils.plaintextBootstrapServers(servers))
- TestUtils.resource(Admin.create(props)) { adminClient =>
- try {
- adminClient.createPartitions(Map(topic ->
NewPartitions.increaseTo(2)).asJava).all().get()
- } catch {
- case _: ExecutionException =>
+ // bring back the failed broker
+ follower.startup()
+ TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+ } else {
+ val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+ brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+ // create brokers
+ val allServers = brokerConfigs.map(b =>
TestUtils.createServer(KafkaConfig.fromProps(b)))
+ this.servers = allServers
+ val partitionHostingServers = allServers.filter(s =>
expectedReplicaAssignment(0).contains(s.config.brokerId))
+ // create the topic
+ TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment,
partitionHostingServers)
+ // wait until replica log is created on every broker
+ TestUtils.waitUntilTrue(() =>
partitionHostingServers.forall(_.getLogManager.getLog(topicPartition).isDefined),
+ "Replicas for topic test not created.")
+
+ // shutdown a broker to make sure the following topic deletion will be
suspended
+ val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+ assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition
[test,0]")
+ val follower = partitionHostingServers.filter(s => s.config.brokerId !=
leaderIdOpt.get).last
+ follower.shutdown()
+ // start topic deletion
+ adminZkClient.deleteTopic(topic)
+
+ // make sure deletion of all of the topic's replicas have been tried
+ ensureControllerExists()
+ val (controller, controllerId) = getController()
+ val allReplicasForTopic = getAllReplicasFromAssignment(topic,
expectedReplicaAssignment)
+ TestUtils.waitUntilTrue(() => {
+ val replicasInDeletionSuccessful =
controller.kafkaController.controllerContext.replicasInState(topic,
ReplicaDeletionSuccessful)
+ val offlineReplicas =
controller.kafkaController.controllerContext.replicasInState(topic,
OfflineReplica)
+ allReplicasForTopic == (replicasInDeletionSuccessful union
offlineReplicas)
+ }, s"Not all replicas for topic $topic are in states of either
ReplicaDeletionSuccessful or OfflineReplica")
+
+ // increase the partition count for topic
+ val props = new Properties()
+ props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
TestUtils.plaintextBootstrapServers(partitionHostingServers))
+ TestUtils.resource(Admin.create(props)) { adminClient =>
+ try {
+ adminClient.createPartitions(Map(topic ->
NewPartitions.increaseTo(2)).asJava).all().get()
+ } catch {
+ case _: ExecutionException =>
+ }
}
+ // trigger a controller switch now
+ val previousControllerId = controllerId
+
+ controller.shutdown()
+
+ ensureControllerExists()
+ // wait until a new controller to show up
+ TestUtils.waitUntilTrue(() => {
+ val (newController, newControllerId) = getController()
+ newControllerId != previousControllerId
+ }, "The new controller should not have the failed controller id")
+
+ // bring back the failed brokers
+ follower.startup()
+ controller.startup()
+ TestUtils.verifyTopicDeletion(zkClient, topic, 2,
partitionHostingServers)
}
- // trigger a controller switch now
- val previousControllerId = controllerId
-
- controller.shutdown()
-
- ensureControllerExists()
- // wait until a new controller to show up
- TestUtils.waitUntilTrue(() => {
- val (newController, newControllerId) = getController()
- newControllerId != previousControllerId
- }, "The new controller should not have the failed controller id")
-
- // bring back the failed brokers
- follower.startup()
- controller.startup()
- TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
}
-
- @Test
- def testDeleteTopicDuringAddPartition(): Unit = {
- val topic = "test"
- servers = createTestTopicAndCluster(topic)
- val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic,
0))
- assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition
[test,0]")
- val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteTopicDuringAddPartition(quorum: String): Unit = {
+ brokers = createTestTopicAndCluster(topic)
+ val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(brokers, new
TopicPartition(topic, 0))
+ val follower = brokers.filter(_.config.brokerId != leaderIdOpt).last
val newPartition = new TopicPartition(topic, 1)
- // capture the brokers before we shutdown so that we don't fail validation
in `addPartitions`
- val brokers = adminZkClient.getBrokerMetadatas()
- follower.shutdown()
- // wait until the broker has been removed from ZK to reduce non-determinism
- TestUtils.waitUntilTrue(() =>
zkClient.getBroker(follower.config.brokerId).isEmpty,
- s"Follower ${follower.config.brokerId} was not removed from ZK")
- // add partitions to topic
- adminZkClient.addPartitions(topic, expectedReplicaFullAssignment, brokers,
2,
- Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
+
+ if (isKRaftTest()) {
Review Comment:
We have placed the code from both modes in two different branches, which is
no different from two seperate methods. Can we try to ensure that the same code
is used in both modes as much as possible. for example, the 2 branches both
have a `follower.shutdown()` and both call `increasePartitions`, we can move it
out of it/else and move `adminZkClient.addPartitions` into
`TestUtils.increasePartitions`, WDYT.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]