chia7712 commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549840889
##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -177,6 +201,78 @@ class RaftManagerTest {
assertFalse(fileLocked(lockPath))
}
+ @Test
+ def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+ val nodeId = 1
+ val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+ val raftManager = createRaftManager(
+ new TopicPartition("__cluster_metadata", 0),
+ config
+ )
+ raftManager.shutdown()
+
+ try {
+ KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+ assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+ } catch {
+ case err: Throwable => fail("Failed to delete metadata log", err)
+ }
+ assertTrue(Files.exists(metadataDir.get))
+ }
+
+ @Test
+ def testNonMigratingZkBrokerDoesNotDeleteMetadataLog(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+ val nodeId = 1
+ // Use this config to create the directory
+ val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+ val raftManager = createRaftManager(
+ new TopicPartition("__cluster_metadata", 0),
+ config1
+ )
+ raftManager.shutdown()
+
+ val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, logDir, metadataDir)
+ try {
+ KafkaRaftManager.maybeDeleteMetadataLogDir(config2)
+ fail("Should have not deleted the metadata log")
+ } catch {
+ case err: Throwable =>
+ assertEquals("Not deleting metadata log dir since migrations are not
enabled.", err.getMessage)
+
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+ }
+ assertTrue(Files.exists(metadataDir.get))
+ }
+
+ @Test
+ def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+ val nodeId = 1
+ val config = createConfig(
+ Set(ProcessRole.BrokerRole),
+ nodeId,
+ logDir,
+ metadataDir
+ )
+ val raftManager = createRaftManager(
+ new TopicPartition("__cluster_metadata", 0),
+ config
+ )
+ raftManager.shutdown()
+
+ try {
Review Comment:
we can use `assertThrows` to simplify the code. for example:
```scala
assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config))
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
assertTrue(Files.exists(metadataDir.get))
```
##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -69,6 +70,51 @@ object KafkaRaftManager {
lock
}
+
+ /**
+ * Test if the configured metadata log dir is different from the data log dirs.
+ */
+ def hasDifferentLogDir(config: KafkaConfig): Boolean = {
+ !config
+ .logDirs
+ .map(Paths.get(_).toAbsolutePath)
+ .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+ }
+
+ /**
+ * Obtain the file lock and delete the metadata log directory completely.
+ *
+ * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+ * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and
+ * it makes recovery from a failed migration much easier. See KAFKA-16463.
+ *
+ * @param config The broker config
+ */
+ def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
+ // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+ if (config.processRoles.nonEmpty) {
+ throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
+ } else if (!config.migrationEnabled) {
+ throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
+ } else {
+ val metadataDir = new File(config.metadataLogDir)
+ val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
+ val metadataPartitionDir = KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
Review Comment:
`new File(config.metadataLogDir)` can be replaced by `metadataDir`
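for example, something like (a sketch of the suggested replacement, reusing the `metadataDir` defined above):
```scala
val metadataPartitionDir = KafkaRaftManager.createLogDirectory(metadataDir, logDirName)
```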
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
}
}
+ @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+ new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+ new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+ ))
+ def testDeleteLogOnStartup(zkCluster: ClusterInstance): Unit = {
+ var admin = zkCluster.createAdminClient()
Review Comment:
apply `try-finally`?
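for example (a sketch of the suggested structure; the topic-creation body stays as it is in the PR):
```scala
val admin = zkCluster.createAdminClient()
try {
  // ... create topics and wait on the result ...
} finally {
  admin.close()
}
```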
##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -177,6 +201,78 @@ class RaftManagerTest {
assertFalse(fileLocked(lockPath))
}
+ @Test
+ def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+ val nodeId = 1
+ val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+ val raftManager = createRaftManager(
+ new TopicPartition("__cluster_metadata", 0),
+ config
+ )
+ raftManager.shutdown()
+
+ try {
+ KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+ assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+ } catch {
+ case err: Throwable => fail("Failed to delete metadata log", err)
+ }
+ assertTrue(Files.exists(metadataDir.get))
+ }
+
+ @Test
+ def testNonMigratingZkBrokerDoesNotDeleteMetadataLog(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+ val nodeId = 1
+ // Use this config to create the directory
+ val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+ val raftManager = createRaftManager(
+ new TopicPartition("__cluster_metadata", 0),
+ config1
+ )
+ raftManager.shutdown()
+
+ val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, logDir, metadataDir)
+ try {
Review Comment:
ditto
```scala
val err = assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config2))
assertEquals("Not deleting metadata log dir since migrations are not enabled.", err.getMessage)
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
```
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
}
}
+ @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+ new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+ new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+ new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+ ))
+ def testDeleteLogOnStartup(zkCluster: ClusterInstance): Unit = {
+ var admin = zkCluster.createAdminClient()
+ val newTopics = new util.ArrayList[NewTopic]()
+ newTopics.add(new NewTopic("testDeleteLogOnStartup", 2, 3.toShort)
+ .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
+ val createTopicResult = admin.createTopics(newTopics)
+ createTopicResult.all().get(60, TimeUnit.SECONDS)
+ admin.close()
+
+ // Bootstrap the ZK cluster ID into KRaft
+ val clusterId = zkCluster.clusterId()
+ val kraftCluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(MetadataVersion.IBP_3_8_IV0).
+ setClusterId(Uuid.fromString(clusterId)).
+ setNumBrokerNodes(0).
+ setNumControllerNodes(1).build())
+ .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+ .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .build()
+ try {
+ kraftCluster.format()
+ kraftCluster.startup()
+ val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+ // Enable migration configs and restart brokers
+ log.info("Restart brokers in migration mode")
+ zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+ zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
+ zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed
+ zkCluster.waitForReadyBrokers()
+ readyFuture.get(30, TimeUnit.SECONDS)
+
+ val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+ TestUtils.waitUntilTrue(
+ () => zkClient.getControllerId.contains(3000),
+ "Timed out waiting for KRaft controller to take over",
+ 30000)
+
+ def hasKRaftController: Boolean = {
+ zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().brokers.forall(
+ broker => broker.metadataCache.getControllerId match {
+ case Some(_: KRaftCachedControllerId) => true
+ case _ => false
+ }
+ )
+ }
+ TestUtils.waitUntilTrue(() => hasKRaftController, "Timed out waiting for ZK brokers to see a KRaft controller")
+
+ log.info("Restart brokers again")
+ zkCluster.rollingBrokerRestart()
+ zkCluster.waitForReadyBrokers()
+
+ // List topics is served from the local MetadataCache on brokers. For ZK brokers, this cache is populated by UMR,
+ // which won't be sent until the broker has been unfenced by the KRaft controller. So, seeing the topic in
+ // the broker's cache tells us it has recreated and re-replicated the metadata log.
+ admin = zkCluster.createAdminClient()
Review Comment:
apply `try-finally`?
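same as above, for example (a sketch; the wait-for-topic logic stays as it is in the PR):
```scala
admin = zkCluster.createAdminClient()
try {
  // ... wait until the topic is visible in the brokers' metadata cache ...
} finally {
  admin.close()
}
```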
##########
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##########
@@ -177,6 +201,78 @@ class RaftManagerTest {
assertFalse(fileLocked(lockPath))
}
+ @Test
+ def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+ val logDir = Some(TestUtils.tempDir().toPath)
+ val metadataDir = Some(TestUtils.tempDir().toPath)
+ val nodeId = 1
+ val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+ val raftManager = createRaftManager(
+ new TopicPartition("__cluster_metadata", 0),
+ config
+ )
+ raftManager.shutdown()
+
+ try {
+ KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+ assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+ } catch {
+ case err: Throwable => fail("Failed to delete metadata log", err)
Review Comment:
it seems we can just throw the exception to fail this test case
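for example (a sketch; an uncaught exception already fails the test and keeps the original stack trace):
```scala
KafkaRaftManager.maybeDeleteMetadataLogDir(config)
assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
```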
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]