dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
*/
val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
- val topic = "test"
- servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+ val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull,
enableControlledShutdown = false)
Review Comment:
Why not use `createTestTopicAndCluster` directly?
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -19,78 +19,95 @@ package kafka.admin
import java.util
import java.util.concurrent.ExecutionException
import java.util.{Collections, Optional, Properties}
-
import scala.collection.Seq
import kafka.log.UnifiedLog
import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import kafka.controller.{OfflineReplica, PartitionAndReplica,
ReplicaAssignment, ReplicaDeletionSuccessful}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException,
UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
import scala.jdk.CollectionConverters._
class DeleteTopicTest extends QuorumTestHarness {
+ var brokers: Seq[KafkaBroker] = Seq()
+
var servers: Seq[KafkaServer] = Seq()
Review Comment:
I think it's unnecessary to keep both `servers` and `brokers`; we can use
`KafkaBroker` in most cases, or cast it to `KafkaServer` when necessary.
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
// delete topic
- adminZkClient.deleteTopic("test")
- TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+ admin.deleteTopics(Collections.singletonList(topic)).all().get()
+ TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
}
- @Test
- def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
val topicPartition = new TopicPartition("test", 0)
val topic = topicPartition.topic
- servers = createTestTopicAndCluster(topic)
+ brokers = createTestTopicAndCluster(topic)
// start topic deletion
- adminZkClient.deleteTopic(topic)
+ admin.deleteTopics(Collections.singletonList(topic)).all().get()
// try to delete topic marked as deleted
- assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () =>
adminZkClient.deleteTopic(topic))
+ // start topic deletion
+ TestUtils.waitUntilTrue(() => {
+ try {
+ admin.deleteTopics(Collections.singletonList(topic)).all().get()
+ false
+ } catch {
+ case e: ExecutionException =>
+ classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+ }
+ }, s"Topic ${topic} should be marked for deletion or already deleted.")
- TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+ TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
}
- private def createTestTopicAndCluster(topic: String, deleteTopicEnabled:
Boolean = true, replicaAssignment: Map[Int, List[Int]] =
expectedReplicaAssignment): Seq[KafkaServer] = {
- val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect,
enableControlledShutdown = false)
+ private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3,
deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] =
expectedReplicaAssignment): Seq[KafkaBroker] = {
+ val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs,
zkConnectOrNull, enableControlledShutdown = false)
brokerConfigs.foreach(_.setProperty("delete.topic.enable",
deleteTopicEnabled.toString))
createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
}
- private def createTestTopicAndCluster(topic: String, brokerConfigs:
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
+ private def createTestTopicAndCluster(topic: String, brokerConfigs:
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaBroker] = {
val topicPartition = new TopicPartition(topic, 0)
// create brokers
- val servers = brokerConfigs.map(b =>
TestUtils.createServer(KafkaConfig.fromProps(b)))
+ val brokers = brokerConfigs.map(b =>
createBroker(KafkaConfig.fromProps(b)))
+
+ admin = TestUtils.createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
Review Comment:
Add a helper method such as `getAdminClient` in this class, so that we only
create a new admin client if `admin` is null or has been closed.
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
// delete topic
- adminZkClient.deleteTopic("test")
- TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+ admin.deleteTopics(Collections.singletonList(topic)).all().get()
+ TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
}
- @Test
- def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
Review Comment:
Let's just keep this test unchanged, since it's not testable in KRaft mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]