chia7712 commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549840889


##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -177,6 +201,78 @@ class RaftManagerTest {
     assertFalse(fileLocked(lockPath))
   }
 
+  @Test
+  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, 
metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config
+    )
+    raftManager.shutdown()
+
+    try {
+      KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+      
assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+    } catch {
+      case err: Throwable => fail("Failed to delete metadata log", err)
+    }
+    assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testNonMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    // Use this config to create the directory
+    val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDir, metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config1
+    )
+    raftManager.shutdown()
+
+    val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, 
logDir, metadataDir)
+    try {
+      KafkaRaftManager.maybeDeleteMetadataLogDir(config2)
+      fail("Should have not deleted the metadata log")
+    } catch {
+      case err: Throwable =>
+        assertEquals("Not deleting metadata log dir since migrations are not 
enabled.", err.getMessage)
+        
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+    }
+    assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    val config = createConfig(
+      Set(ProcessRole.BrokerRole),
+      nodeId,
+      logDir,
+      metadataDir
+    )
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config
+    )
+    raftManager.shutdown()
+
+    try {

Review Comment:
   we can use `assertThrows` to simplify the code. for example:
   ```scala
       assertThrows(classOf[RuntimeException], () => 
KafkaRaftManager.maybeDeleteMetadataLogDir(config))
       assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
       assertTrue(Files.exists(metadataDir.get))
   ```



##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -69,6 +70,51 @@ object KafkaRaftManager {
 
     lock
   }
+
+  /**
+   * Test if the configured metadata log dir is one of the data log dirs.
+   */
+  def hasDifferentLogDir(config: KafkaConfig): Boolean = {
+    !config
+      .logDirs
+      .map(Paths.get(_).toAbsolutePath)
+      .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+  }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
+    // These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+    if (config.processRoles.nonEmpty) {
+      throw new RuntimeException("Not deleting metadata log dir since this 
node is in KRaft mode.")
+    } else if (!config.migrationEnabled) {
+      throw new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled.")
+    } else {
+      val metadataDir = new File(config.metadataLogDir)
+      val logDirName = 
UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
+      val metadataPartitionDir = KafkaRaftManager.createLogDirectory(new 
File(config.metadataLogDir), logDirName)

Review Comment:
   `new File(config.metadataLogDir)` can be replaced by `metadataDir`



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testDeleteLogOnStartup(zkCluster: ClusterInstance): Unit = {
+    var admin = zkCluster.createAdminClient()

Review Comment:
   apply `try-finally`?



##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -177,6 +201,78 @@ class RaftManagerTest {
     assertFalse(fileLocked(lockPath))
   }
 
+  @Test
+  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, 
metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config
+    )
+    raftManager.shutdown()
+
+    try {
+      KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+      
assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+    } catch {
+      case err: Throwable => fail("Failed to delete metadata log", err)
+    }
+    assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testNonMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    // Use this config to create the directory
+    val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDir, metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config1
+    )
+    raftManager.shutdown()
+
+    val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, 
logDir, metadataDir)
+    try {

Review Comment:
   ditto
   ```scala
       val err = assertThrows(classOf[RuntimeException], () => 
KafkaRaftManager.maybeDeleteMetadataLogDir(config2))
       assertEquals("Not deleting metadata log dir since migrations are not 
enabled.", err.getMessage)
       assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
   ```



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testDeleteLogOnStartup(zkCluster: ClusterInstance): Unit = {
+    var admin = zkCluster.createAdminClient()
+    val newTopics = new util.ArrayList[NewTopic]()
+    newTopics.add(new NewTopic("testDeleteLogOnStartup", 2, 3.toShort)
+      .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", 
TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get(60, TimeUnit.SECONDS)
+    admin.close()
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+      
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
+      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
+      
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart() // This would throw if authorizers 
weren't allowed
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      val zkClient = 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+      TestUtils.waitUntilTrue(
+        () => zkClient.getControllerId.contains(3000),
+        "Timed out waiting for KRaft controller to take over",
+        30000)
+
+      def hasKRaftController: Boolean = {
+        
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().brokers.forall(
+          broker => broker.metadataCache.getControllerId match {
+            case Some(_: KRaftCachedControllerId) => true
+            case _ => false
+          }
+        )
+      }
+      TestUtils.waitUntilTrue(() => hasKRaftController, "Timed out waiting for 
ZK brokers to see a KRaft controller")
+
+      log.info("Restart brokers again")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+
+      // List topics is served from local MetadataCache on brokers. For ZK 
brokers this cache is populated by UMR
+      // which won't be sent until the broker has been unfenced by the KRaft 
controller. So, seeing the topic in
+      // the brokers cache tells us it has recreated and re-replicated the 
metadata log
+      admin = zkCluster.createAdminClient()

Review Comment:
   apply `try-finally`?



##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -177,6 +201,78 @@ class RaftManagerTest {
     assertFalse(fileLocked(lockPath))
   }
 
+  @Test
+  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, 
metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config
+    )
+    raftManager.shutdown()
+
+    try {
+      KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+      
assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+    } catch {
+      case err: Throwable => fail("Failed to delete metadata log", err)

Review Comment:
   it seems we can just throw the exception to fail this test case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to