jsancio commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1548472966
##########
core/src/main/scala/kafka/raft/RaftManager.scala:
##########
@@ -69,6 +69,36 @@ object KafkaRaftManager {
lock
}
+
+ /**
+ * Obtain the file lock and delete the metadata log directory completely.
+ *
+ * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+ * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and
+ * it makes recovery from a failed migration much easier. See KAFKA-16463.
+ *
+ * @param config The broker config
+ * @return An error wrapped as an Option, if an error occurred. None otherwise
+ */
+ def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+ // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+ if (config.processRoles.nonEmpty) {
+ Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode."))
+ } else if (!config.migrationEnabled) {
+ Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled."))
+ } else {
+ val metadataDir = new File(config.metadataLogDir)
+ val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+ try {
+ Utils.delete(metadataDir)
Review Comment:
This deletes the entire metadata log directory, not just the
`__cluster_metadata-0` topic partition inside the metadata log dir. In some
configurations `metadata.log.dir` equals `log.dir(s)`. In those
configurations this will delete all of the topic partitions in the log directory.
If the tests pass, this means that we are missing a test that checks this
doesn't happen.
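
For illustration only, here is a minimal sketch of the narrower deletion described above: it removes just the `__cluster_metadata-0` subdirectory and leaves sibling topic partition directories alone. The object and method names (`MetadataPartitionCleanup`, `deleteMetadataPartitionDir`) are hypothetical and not part of this PR, and the sketch uses only `java.io` rather than Kafka's own helpers. It also ignores the file lock and symlinks, so treat it as a starting point rather than the fix.

```scala
import java.io.{File, IOException}

// Hypothetical helper, not part of the PR: names are chosen for illustration.
object MetadataPartitionCleanup {
  // Directory name taken from the review comment above.
  val MetadataPartitionDirName = "__cluster_metadata-0"

  // Delete a file, or a directory and everything beneath it.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      // listFiles can return null on an I/O error, hence the Option wrapper.
      Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    }
    if (!f.delete()) {
      throw new IOException(s"Failed to delete ${f.getAbsolutePath}")
    }
  }

  /**
   * Delete only the metadata partition directory under `logDir`.
   * Returns true if the directory existed and was removed, false if it was absent.
   */
  def deleteMetadataPartitionDir(logDir: File): Boolean = {
    val target = new File(logDir, MetadataPartitionDirName)
    if (!target.isDirectory) {
      false
    } else {
      deleteRecursively(target)
      true
    }
  }
}
```

A test for the shared-directory case could create both `__cluster_metadata-0` and a regular partition such as `foo-0` under the same directory, call the helper, and assert that only the former is gone.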